ClickHouse in a general analytical workload (based on Star Schema Benchmark)
Jun 26, 2017
This is a crosspost from https://www.percona.com/blog/2017/06/22/clickhouse-general-analytical-workload-based-star-schema-benchmark/
In this blog post, we’ll look at how ClickHouse performs in a general analytical workload using the star schema benchmark test.
We have mentioned ClickHouse in some recent posts (https://www.percona.com/blog/2017/02/13/clickhouse-new-opensource-columnar-database/, https://www.percona.com/blog/2017/03/17/column-store-database-benchmarks-mariadb-columnstore-vs-clickhouse-vs-apache-spark/), where it showed excellent results. ClickHouse by itself seems to be an event-oriented RDBMS, as its name suggests (clicks). Its primary use case, powering Yandex Metrica (a system similar to Google Analytics), also points to an event-based nature. We can also see this in its requirement for date-stamped columns.
It is possible, however, to use ClickHouse in a general analytical workload. This blog post shares my findings. For these tests, I used the Star Schema Benchmark, slightly modified so that it can handle ClickHouse specifics.
First, let’s talk about schemas. We need to adjust the schema to ClickHouse data types. For example, the biggest fact table in SSB is “lineorder”. Below is how it is defined for Amazon RedShift (as taken from https://docs.aws.amazon.com/redshift/latest/dg/tutorial-tuning-tables-create-test-data.html):
CREATE TABLE lineorder
(
lo_orderkey INTEGER NOT NULL,
lo_linenumber INTEGER NOT NULL,
lo_custkey INTEGER NOT NULL,
lo_partkey INTEGER NOT NULL,
lo_suppkey INTEGER NOT NULL,
lo_orderdate INTEGER NOT NULL,
lo_orderpriority VARCHAR(15) NOT NULL,
lo_shippriority VARCHAR(1) NOT NULL,
lo_quantity INTEGER NOT NULL,
lo_extendedprice INTEGER NOT NULL,
lo_ordertotalprice INTEGER NOT NULL,
lo_discount INTEGER NOT NULL,
lo_revenue INTEGER NOT NULL,
lo_supplycost INTEGER NOT NULL,
lo_tax INTEGER NOT NULL,
lo_commitdate INTEGER NOT NULL,
lo_shipmode VARCHAR(10) NOT NULL
);
For ClickHouse the table definition looks like:
CREATE TABLE lineorderfull (
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY String,
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE String
) ENGINE = MergeTree(LO_ORDERDATE, (LO_ORDERKEY, LO_LINENUMBER), 8192);
From this we can see that we need to use data types like UInt8 and UInt32, which are somewhat unusual in the database world.
The second table (RedShift definition):
CREATE TABLE customer
(
c_custkey INTEGER NOT NULL,
c_name VARCHAR(25) NOT NULL,
c_address VARCHAR(25) NOT NULL,
c_city VARCHAR(10) NOT NULL,
c_nation VARCHAR(15) NOT NULL,
c_region VARCHAR(12) NOT NULL,
c_phone VARCHAR(15) NOT NULL,
c_mktsegment VARCHAR(10) NOT NULL
);
And for ClickHouse I define it as:
CREATE TABLE customerfull (
C_CUSTKEY UInt32,
C_NAME String,
C_ADDRESS String,
C_CITY String,
C_NATION String,
C_REGION String,
C_PHONE String,
C_MKTSEGMENT String,
C_FAKEDATE Date
) ENGINE = MergeTree(C_FAKEDATE, (C_CUSTKEY), 8192);
For reference, the full schema for the benchmark is here: https://github.com/vadimtk/ssb-clickhouse/blob/master/create.sql.
For this table, we need to define the rudimentary column C_FAKEDATE Date in order to use ClickHouse’s most advanced engine (MergeTree), which requires a Date column. I was told by the ClickHouse team that they plan to remove this limitation in the future.
To generate data acceptable by ClickHouse, I made modifications to ssb-dbgen. You can find my version here: https://github.com/vadimtk/ssb-dbgen. The most notable change is that ClickHouse can’t accept dates in CSV files formatted as “19971125”. It has to be “1997-11-25”. This is something to keep in mind when loading data into ClickHouse.
It is possible to do some preformatting during the load, but I don’t have experience with that. A common approach is to create a staging table with data types that match the loaded data, and then convert them using SQL functions when inserting into the main table, as sketched below.
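As a minimal sketch of that staging approach (the table name lineorder_stage and its two-column layout are hypothetical, just to illustrate the conversion), the raw "19971125"-style value can be kept as a String and reassembled into a proper Date on insert:
-- Hypothetical staging table: keeps the raw date as a String
CREATE TABLE lineorder_stage (
LO_ORDERKEY UInt32,
LO_ORDERDATE_RAW String
) ENGINE = Log;
-- Rebuild "19971125" as "1997-11-25" with string functions;
-- columns not listed in the INSERT get their default values
INSERT INTO lineorderfull (LO_ORDERKEY, LO_ORDERDATE)
SELECT
LO_ORDERKEY,
toDate(concat(substring(LO_ORDERDATE_RAW, 1, 4), '-', substring(LO_ORDERDATE_RAW, 5, 2), '-', substring(LO_ORDERDATE_RAW, 7, 2)))
FROM lineorder_stage;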
Hardware setup
One of the goals of this benchmark is to see how ClickHouse scales on multiple nodes, so I will compare a setup of 1 node against a setup of 3 nodes.
Each node has 24 cores of “Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz” CPUs, and the data is located on very fast PCIe Flash storage.
For the SSB benchmark I use scale factor 2500, which gives the following raw data sizes:
- Table lineorder: 15 billion rows, raw size 1.7TB
- Table customer: 75 million rows
When loaded into ClickHouse, the table lineorder takes 464GB, which corresponds to a 3.7x compression ratio.
We compare the 1-node setup (table names lineorderfull, customerfull) vs. the 3-node setup (table names lineorderd, customerd).
Single table operations:
Query
SELECT
toYear(LO_ORDERDATE) AS yod,
sum(LO_REVENUE)
FROM lineorderfull
GROUP BY yod
1 node:
7 rows in set. Elapsed: 9.741 sec. Processed 15.00 billion rows, 90.00 GB (1.54 billion rows/s., 9.24 GB/s.)
3 nodes:
7 rows in set. Elapsed: 3.258 sec. Processed 15.00 billion rows, 90.00 GB (4.60 billion rows/s., 27.63 GB/s.)
We see a practically 3x speedup. Handling 4.6 billion rows/s is blazingly fast.
One table with filtering
SELECT sum(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorderfull
WHERE (toYear(LO_ORDERDATE) = 1993) AND ((LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3)) AND (LO_QUANTITY < 25)
1 node:
1 rows in set. Elapsed: 3.175 sec. Processed 2.28 billion rows, 18.20 GB (716.60 million rows/s., 5.73 GB/s.)
3 nodes:
1 rows in set. Elapsed: 1.295 sec. Processed 2.28 billion rows, 18.20 GB (1.76 billion rows/s., 14.06 GB/s.)
It is worth mentioning that during the execution of this query, ClickHouse is able to utilize ALL 24 cores on each box. This confirms that ClickHouse is a massively parallel processing (MPP) system.
Two tables (independent subquery)
In this case I want to show the ability of ClickHouse to handle independent subqueries.
SELECT sum(LO_REVENUE)
FROM lineorderfull
WHERE LO_CUSTKEY IN
(
SELECT C_CUSTKEY AS LO_CUSTKEY
FROM customerfull
WHERE C_REGION = 'ASIA'
)
1 node:
1 rows in set. Elapsed: 28.934 sec. Processed 15.00 billion rows, 120.00 GB (518.43 million rows/s., 4.15 GB/s.)
3 nodes:
1 rows in set. Elapsed: 14.189 sec. Processed 15.12 billion rows, 121.67 GB (1.07 billion rows/s., 8.57 GB/s.)
However, we do not see a ~3x speedup on 3 nodes, because of the data transfer required to match LO_CUSTKEY with C_CUSTKEY.
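As an aside (I did not benchmark this variant here), ClickHouse has a GLOBAL IN operator for exactly this situation: the subquery is executed once on the initiator node and its result set is shipped to all shards, instead of the subquery being re-executed on every node. Against the distributed tables, a sketch looks like:
SELECT sum(LO_REVENUE)
FROM lineorderd
WHERE LO_CUSTKEY GLOBAL IN
(
SELECT C_CUSTKEY
FROM customerd
WHERE C_REGION = 'ASIA'
)
Whether this wins depends on the size of the shipped set; here it is the list of all ASIA customer keys.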
Two tables JOIN
Things become more complicated when columns from the subquery are needed to return the result or for GROUP BY. In this case, we want to GROUP BY a column from the second table. ClickHouse does not support the traditional subquery syntax for this, so we need to use a JOIN. For JOINs, ClickHouse also strictly prescribes how they should be written (a limitation which will also be lifted in the future). So our JOIN should look like:
SELECT
C_REGION,
sum(LO_EXTENDEDPRICE * LO_DISCOUNT)
FROM lineorderfull
ANY INNER JOIN
(
SELECT
C_REGION,
C_CUSTKEY AS LO_CUSTKEY
FROM customerfull
) USING (LO_CUSTKEY)
WHERE (toYear(LO_ORDERDATE) = 1993) AND ((LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3)) AND (LO_QUANTITY < 25)
GROUP BY C_REGION
1 node:
5 rows in set. Elapsed: 31.443 sec. Processed 2.35 billion rows, 28.79 GB (74.75 million rows/s., 915.65 MB/s.)
3 nodes:
5 rows in set. Elapsed: 25.160 sec. Processed 2.58 billion rows, 33.25 GB (102.36 million rows/s., 1.32 GB/s.)
In this case the speedup is not even 2x. This again comes down to the random data distribution of the tables lineorderd and customerd. Both tables were defined as:
CREATE TABLE lineorderd AS lineorder ENGINE = Distributed(3shards, default, lineorder, rand());
CREATE TABLE customerd AS customer ENGINE = Distributed(3shards, default, customer, rand());
The rand() part defines that records will be distributed randomly across the 3 nodes, so when we perform the join on LO_CUSTKEY = C_CUSTKEY, matching records may be located on different nodes.
One possibility to deal with this is to define data locality, sharding both tables by the customer key so that matching rows land on the same node. For example:
CREATE TABLE lineorderLD AS lineorderL ENGINE = Distributed(3shards, default, lineorderL, LO_CUSTKEY);
CREATE TABLE customerLD AS customerL ENGINE = Distributed(3shards, default, customerL, C_CUSTKEY);
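With that layout (a sketch, not something I benchmarked here), a non-GLOBAL distributed join can in principle be satisfied shard-locally: each shard joins its slice of lineorderL against its own slice of customerL, with no cross-node transfer of customer rows:
SELECT
C_REGION,
sum(LO_EXTENDEDPRICE * LO_DISCOUNT)
FROM lineorderLD
ANY INNER JOIN
(
-- customerL is a local table, so this subquery runs independently on each shard
SELECT
C_REGION,
C_CUSTKEY AS LO_CUSTKEY
FROM customerL
) USING (LO_CUSTKEY)
WHERE (toYear(LO_ORDERDATE) = 1993) AND (LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3) AND (LO_QUANTITY < 25)
GROUP BY C_REGION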
Three tables JOIN
This is where it becomes really complicated. Let’s consider the query that you normally would write:
SELECT
sum(LO_REVENUE),
P_MFGR,
toYear(LO_ORDERDATE) AS yod
FROM lineorderfull, customerfull, partfull
WHERE (C_REGION = 'ASIA') AND (LO_CUSTKEY = C_CUSTKEY) AND (P_PARTKEY = LO_PARTKEY)
GROUP BY P_MFGR, yod
ORDER BY P_MFGR, yod;
With ClickHouse’s limitations on JOIN syntax, the query becomes:
SELECT
sum(LO_REVENUE),
P_MFGR,
toYear(LO_ORDERDATE) AS yod
FROM
(
SELECT
LO_PARTKEY,
LO_ORDERDATE,
LO_REVENUE
FROM lineorderfull
ALL INNER JOIN
(
SELECT
C_REGION,
C_CUSTKEY AS LO_CUSTKEY
FROM customerfull
) USING (LO_CUSTKEY)
WHERE C_REGION = 'ASIA'
)
ALL INNER JOIN
(
SELECT
P_MFGR,
P_PARTKEY AS LO_PARTKEY
FROM partfull
) USING (LO_PARTKEY)
GROUP BY
P_MFGR,
yod
ORDER BY
P_MFGR ASC,
yod ASC
Writing queries this way, we also force ClickHouse to use the prescribed JOIN order, as at this moment there is no optimizer in ClickHouse and it is totally unaware of data distribution.
Also, there is not much speedup when we compare 1 node vs. 3 nodes:
1 node execution time:
35 rows in set. Elapsed: 697.806 sec. Processed 15.08 billion rows, 211.53 GB (21.61 million rows/s., 303.14 MB/s.)
3 nodes execution time:
35 rows in set. Elapsed: 622.536 sec. Processed 15.12 billion rows, 211.71 GB (24.29 million rows/s., 340.08 MB/s.)
There is, however, a way to make the query faster for this 3-way JOIN (thanks to Alexander Zaytsev for the help).
Optimized query:
SELECT
sum(revenue),
P_MFGR,
yod
FROM
(
SELECT
LO_PARTKEY AS P_PARTKEY,
toYear(LO_ORDERDATE) AS yod,
SUM(LO_REVENUE) AS revenue
FROM lineorderfull
WHERE LO_CUSTKEY IN
(
SELECT C_CUSTKEY
FROM customerfull
WHERE C_REGION = 'ASIA'
)
GROUP BY
P_PARTKEY,
yod
)
ANY INNER JOIN partfull USING (P_PARTKEY)
GROUP BY
P_MFGR,
yod
ORDER BY
P_MFGR ASC,
yod ASC
Optimized query time
1 node:
35 rows in set. Elapsed: 106.732 sec. Processed 15.00 billion rows, 210.05 GB (140.56 million rows/s., 1.97 GB/s.)
3 nodes:
35 rows in set. Elapsed: 75.854 sec. Processed 15.12 billion rows, 211.71 GB (199.36 million rows/s., 2.79 GB/s.)
That’s an improvement of about 6.5 times compared to the original query. This shows the importance of understanding data distribution, and writing the optimal query to process the data.
Another option for dealing with JOIN complexity, and to improve performance, is to use ClickHouse’s dictionaries. These dictionaries are described here: https://altinity.com/blog/2017/4/12/dictionaries-explained.
I will review dictionary performance in future posts.
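To give a taste of the dictionary approach (hypothetical: this assumes an external dictionary named customer_dict, keyed by the customer key and with C_REGION as an attribute, configured separately in the server’s dictionary config), the customer lookup becomes a function call instead of a JOIN:
-- Hypothetical: requires a 'customer_dict' external dictionary on the server
SELECT sum(LO_REVENUE)
FROM lineorderfull
WHERE dictGetString('customer_dict', 'C_REGION', toUInt64(LO_CUSTKEY)) = 'ASIA'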
Another, more traditional, way to deal with JOIN complexity in an analytics workload is to use denormalization. We can move some columns (for example, P_MFGR from the last query) into the fact table (lineorder), as sketched below.
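As a minimal sketch (the table name lineorder_denorm and its reduced column list are hypothetical), P_MFGR could be baked into the fact table at load time, which turns the three-way JOIN above into a single-table scan:
-- Hypothetical denormalized fact table carrying P_MFGR directly
CREATE TABLE lineorder_denorm (
LO_ORDERKEY UInt32,
LO_PARTKEY UInt32,
LO_ORDERDATE Date,
LO_REVENUE UInt32,
P_MFGR String
) ENGINE = MergeTree(LO_ORDERDATE, (LO_ORDERKEY), 8192);
INSERT INTO lineorder_denorm
SELECT
LO_ORDERKEY,
LO_PARTKEY,
LO_ORDERDATE,
LO_REVENUE,
P_MFGR
FROM lineorderfull
ANY INNER JOIN
(
SELECT
P_MFGR,
P_PARTKEY AS LO_PARTKEY
FROM partfull
) USING (LO_PARTKEY);
The price is extra storage for the repeated P_MFGR values, which ClickHouse’s column compression keeps fairly cheap.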
Observations
- ClickHouse can handle general analytical queries (it requires special schema design and considerations, however)
- Linear speedup is possible, but it depends on query design and requires advance planning; the proper speedup depends on data locality
- ClickHouse is blazingly fast (beyond what I’ve seen before) because it can use all available CPU cores for a query, as shown above: 24 cores for a single server and 72 cores for three nodes
- Multi-table JOINs are cumbersome and require manual work to achieve better performance, so consider using dictionaries or denormalization