ClickHouse in a general analytical workload (based on Star Schema Benchmark)

 

Jun 26, 2017

This is a crosspost from https://www.percona.com/blog/2017/06/22/clickhouse-general-analytical-workload-based-star-schema-benchmark/

In this blog post, we’ll look at how ClickHouse performs in a general analytical workload using the star schema benchmark test.

We have mentioned ClickHouse in some recent posts (https://www.percona.com/blog/2017/02/13/clickhouse-new-opensource-columnar-database/, https://www.percona.com/blog/2017/03/17/column-store-database-benchmarks-mariadb-columnstore-vs-clickhouse-vs-apache-spark/), where it showed excellent results. ClickHouse by itself seems to be an event-oriented RDBMS, as its name suggests (clicks). Its primary use case, powering Yandex Metrica (a system similar to Google Analytics), also points to an event-based nature. We can also see this in the requirement for date-stamped columns.

It is possible, however, to use ClickHouse in a general analytical workload. This blog post shares my findings. For these tests, I used the Star Schema Benchmark, slightly modified to handle ClickHouse specifics.
First, let's talk about schemas. We need to adjust to ClickHouse data types. For example, the biggest fact table in SSB is "lineorder". Below is how it is defined for Amazon Redshift (as taken from https://docs.aws.amazon.com/redshift/latest/dg/tutorial-tuning-tables-create-test-data.html):

CREATE TABLE lineorder 
    (
      lo_orderkey          INTEGER NOT NULL,
      lo_linenumber        INTEGER NOT NULL,
      lo_custkey           INTEGER NOT NULL,
      lo_partkey           INTEGER NOT NULL,
      lo_suppkey           INTEGER NOT NULL,
      lo_orderdate         INTEGER NOT NULL,
      lo_orderpriority     VARCHAR(15) NOT NULL,
      lo_shippriority      VARCHAR(1) NOT NULL,
      lo_quantity          INTEGER NOT NULL,
      lo_extendedprice     INTEGER NOT NULL,
      lo_ordertotalprice   INTEGER NOT NULL,
      lo_discount          INTEGER NOT NULL,
      lo_revenue           INTEGER NOT NULL,
      lo_supplycost        INTEGER NOT NULL,
      lo_tax               INTEGER NOT NULL,
      lo_commitdate        INTEGER NOT NULL,
      lo_shipmode          VARCHAR(10) NOT NULL
    );

For ClickHouse, the table definition looks like this:

CREATE TABLE lineorderfull ( 
            LO_ORDERKEY             UInt32,
            LO_LINENUMBER           UInt8,
            LO_CUSTKEY              UInt32,
            LO_PARTKEY              UInt32,
            LO_SUPPKEY              UInt32,
            LO_ORDERDATE            Date,
            LO_ORDERPRIORITY        String,
            LO_SHIPPRIORITY         UInt8,
            LO_QUANTITY             UInt8,
            LO_EXTENDEDPRICE        UInt32,
            LO_ORDTOTALPRICE        UInt32,
            LO_DISCOUNT             UInt8,
            LO_REVENUE              UInt32,
            LO_SUPPLYCOST           UInt32,
            LO_TAX                  UInt8,
            LO_COMMITDATE           Date,
            LO_SHIPMODE             String
    )Engine=MergeTree(LO_ORDERDATE,(LO_ORDERKEY,LO_LINENUMBER),8192);

From this we can see that we need to use data types like UInt8 and UInt32, which are somewhat unusual in the database world.

The second table (Redshift definition):

CREATE TABLE customer 
(
  c_custkey      INTEGER NOT NULL,
  c_name         VARCHAR(25) NOT NULL,
  c_address      VARCHAR(25) NOT NULL,
  c_city         VARCHAR(10) NOT NULL,
  c_nation       VARCHAR(15) NOT NULL,
  c_region       VARCHAR(12) NOT NULL,
  c_phone        VARCHAR(15) NOT NULL,
  c_mktsegment   VARCHAR(10) NOT NULL
);

And for ClickHouse I define it as:

CREATE TABLE customerfull ( 
        C_CUSTKEY       UInt32,
        C_NAME          String,
        C_ADDRESS       String,
        C_CITY          String,
        C_NATION        String,
        C_REGION        String,
        C_PHONE         String,
        C_MKTSEGMENT    String,
        C_FAKEDATE      Date
)Engine=MergeTree(C_FAKEDATE,(C_CUSTKEY),8192);

For reference, the full schema for the benchmark is here: https://github.com/vadimtk/ssb-clickhouse/blob/master/create.sql.
For this table, we need to define an artificial column C_FAKEDATE Date in order to use ClickHouse's most advanced engine (MergeTree). I was told by the ClickHouse team that they plan to remove this limitation in the future.
To generate data acceptable to ClickHouse, I made modifications to ssb-dbgen. You can find my version here: https://github.com/vadimtk/ssb-dbgen. The most notable change is that ClickHouse can't accept dates in CSV files formatted as "19971125". It has to be "1997-11-25". This is something to keep in mind when loading data into ClickHouse.
It is possible to do some preformatting on load, but I don't have experience with that. A common approach is to create a staging table with data types that match the loaded data, and then convert them using SQL functions when inserting into the main table.
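
As a rough sketch of that staging approach (the staging table name is hypothetical, and only a few columns are shown for brevity), the raw date string can be rebuilt and cast to Date on insert:

CREATE TABLE lineorder_staging (
        LO_ORDERKEY        UInt32,
        LO_LINENUMBER      UInt8,
        LO_ORDERDATE       String   -- raw value such as '19971125'
) ENGINE = Log;

INSERT INTO lineorderfull (LO_ORDERKEY, LO_LINENUMBER, LO_ORDERDATE)
SELECT
    LO_ORDERKEY,
    LO_LINENUMBER,
    -- rebuild '19971125' into '1997-11-25' and cast it to Date
    toDate(concat(substring(LO_ORDERDATE, 1, 4), '-',
                  substring(LO_ORDERDATE, 5, 2), '-',
                  substring(LO_ORDERDATE, 7, 2)))
FROM lineorder_staging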

Hardware setup.

One of the goals of this benchmark is to see how ClickHouse scales on multiple nodes, so I will compare a 1-node setup against a 3-node setup.
Each node has 24 cores of "Intel(R) Xeon(R) CPU E5-2643 v2 @ 3.50GHz" CPUs, and the data is located on very fast PCIe flash storage.

For the SSB benchmark, I use scale factor 2500, which gives the following raw data:
Table lineorder – 15 billion rows, raw size 1.7TB; table customer – 75 million rows.

When loaded into ClickHouse, the table lineorder takes 464GB, which corresponds to a 3.7x compression ratio.

We compare the 1-node setup (table names lineorderfull, customerfull) against the 3-node setup (table names lineorderd, customerd).

Single table operations:

Query

SELECT 
    toYear(LO_ORDERDATE) AS yod, 
    sum(LO_REVENUE)
FROM lineorderfull 
GROUP BY yod

1 node:
7 rows in set. Elapsed: 9.741 sec. Processed 15.00 billion rows, 90.00 GB (1.54 billion rows/s., 9.24 GB/s.)

3 nodes:
7 rows in set. Elapsed: 3.258 sec. Processed 15.00 billion rows, 90.00 GB (4.60 billion rows/s., 27.63 GB/s.)

We see a practically 3x speedup. Handling 4.6 billion rows/s is blazingly fast.

1 table with filtering

SELECT sum(LO_EXTENDEDPRICE * LO_DISCOUNT) AS revenue
FROM lineorderfull
WHERE (toYear(LO_ORDERDATE) = 1993) AND ((LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3)) AND (LO_QUANTITY < 25)

1 node:
1 rows in set. Elapsed: 3.175 sec. Processed 2.28 billion rows, 18.20 GB (716.60 million rows/s., 5.73 GB/s.)

3 nodes:
1 rows in set. Elapsed: 1.295 sec. Processed 2.28 billion rows, 18.20 GB (1.76 billion rows/s., 14.06 GB/s.)

It is worth mentioning that during execution of this query, ClickHouse is able to utilize ALL 24 cores on each box. This confirms that ClickHouse is a massively parallel processing (MPP) system.
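
As a side note (not part of the benchmark itself), the degree of parallelism per query is controlled by the max_threads setting, whose default is derived from the number of CPU cores, e.g.:

SET max_threads = 24   -- allow queries in this session to use up to 24 threads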

Two tables (independent subquery)

In this case, I want to show ClickHouse's ability to handle independent subqueries.

SELECT sum(LO_REVENUE)
FROM lineorderfull
WHERE LO_CUSTKEY IN 
(
    SELECT C_CUSTKEY AS LO_CUSTKEY
    FROM customerfull 
    WHERE C_REGION = 'ASIA'
)

1 node:
1 rows in set. Elapsed: 28.934 sec. Processed 15.00 billion rows, 120.00 GB (518.43 million rows/s., 4.15 GB/s.)

3 nodes:
1 rows in set. Elapsed: 14.189 sec. Processed 15.12 billion rows, 121.67 GB (1.07 billion rows/s., 8.57 GB/s.)

However, we do not see a ~3x speedup on 3 nodes, because of the data transfer required to match LO_CUSTKEY with C_CUSTKEY.

Two tables JOIN

Things become more complicated with a subquery whose columns are used in the result or in a GROUP BY.
In this case, we want to GROUP BY a column from the second table.

First, ClickHouse does not support traditional subquery syntax for this, so we need to use a JOIN.
And for JOINs, ClickHouse strictly prescribes how the query should be written (a limitation that should also be lifted in the future). So our JOIN should look like:

SELECT 
    C_REGION,
    sum(LO_EXTENDEDPRICE * LO_DISCOUNT)
FROM lineorderfull
ANY INNER JOIN
(
    SELECT 
        C_REGION,
        C_CUSTKEY AS LO_CUSTKEY
    FROM customerfull
) USING (LO_CUSTKEY)
WHERE (toYear(LO_ORDERDATE) = 1993) AND ((LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3)) AND (LO_QUANTITY < 25)
GROUP BY C_REGION

1 node:
5 rows in set. Elapsed: 31.443 sec. Processed 2.35 billion rows, 28.79 GB (74.75 million rows/s., 915.65 MB/s.)

3 nodes:
5 rows in set. Elapsed: 25.160 sec. Processed 2.58 billion rows, 33.25 GB (102.36 million rows/s., 1.32 GB/s.)

In this case, the speedup is not even 2x. This again comes down to the random data distribution for the tables lineorderd and customerd. Both tables were defined as:

CREATE TABLE lineorderd AS lineorder ENGINE = Distributed(3shards, default, lineorder, rand());
CREATE TABLE customerd AS customer ENGINE = Distributed(3shards, default, customer, rand());

The rand() part specifies that records are distributed randomly across the 3 nodes, so when we perform the join on LO_CUSTKEY = C_CUSTKEY, matching records may end up on different nodes.
One way to deal with this is to define data locality, for example as:

CREATE TABLE lineorderLD AS lineorderL ENGINE = Distributed(3shards, default, lineorderL, LO_CUSTKEY);
CREATE TABLE customerLD AS customerL ENGINE = Distributed(3shards, default, customerL, C_CUSTKEY);

Three tables JOIN

This is when it becomes very complicated.
Let's consider the query you would normally write:

SELECT 
    sum(LO_REVENUE), 
    P_MFGR, 
    toYear(LO_ORDERDATE) AS yod
FROM lineorderfull, customerfull, partfull
WHERE (C_REGION = 'ASIA') AND (LO_CUSTKEY = C_CUSTKEY) AND (P_PARTKEY = LO_PARTKEY)
GROUP BY P_MFGR, yod
ORDER BY P_MFGR, yod;

With ClickHouse's limitations on JOIN syntax, the query becomes:

SELECT 
    sum(LO_REVENUE), 
    P_MFGR, 
    toYear(LO_ORDERDATE) AS yod
FROM 
(
    SELECT 
        LO_PARTKEY, 
        LO_ORDERDATE, 
        LO_REVENUE
    FROM lineorderfull 
    ALL INNER JOIN 
    (
        SELECT 
            C_REGION, 
            C_CUSTKEY AS LO_CUSTKEY
        FROM customerfull 
    ) USING (LO_CUSTKEY)
    WHERE C_REGION = 'ASIA'
) 
ALL INNER JOIN 
(
    SELECT 
        P_MFGR, 
        P_PARTKEY AS LO_PARTKEY
    FROM partfull 
) USING (LO_PARTKEY)
GROUP BY 
    P_MFGR, 
    yod
ORDER BY 
    P_MFGR ASC, 
    yod ASC

Writing queries this way, we also force ClickHouse to use the prescribed JOIN order, as at the moment there is no query optimizer in ClickHouse and it is totally unaware of data distribution.

There is also not much speedup when we compare 1 node vs. 3 nodes:

1 node execution time:

35 rows in set. Elapsed: 697.806 sec. Processed 15.08 billion rows, 211.53 GB (21.61 million rows/s., 303.14 MB/s.)

3 nodes execution time:

35 rows in set. Elapsed: 622.536 sec. Processed 15.12 billion rows, 211.71 GB (24.29 million rows/s., 340.08 MB/s.)

There is, however, a way to make this 3-way JOIN query faster (thanks to Alexander Zaytsev for the help).

Optimized query:

SELECT 
    sum(revenue), 
    P_MFGR, 
    yod
FROM 
(
    SELECT 
        LO_PARTKEY AS P_PARTKEY, 
        toYear(LO_ORDERDATE) AS yod, 
        SUM(LO_REVENUE) AS revenue
    FROM lineorderfull 
    WHERE LO_CUSTKEY IN 
    (
        SELECT C_CUSTKEY
        FROM customerfull 
        WHERE C_REGION = 'ASIA'
    )
    GROUP BY 
        P_PARTKEY, 
        yod
) 
ANY INNER JOIN partfull USING (P_PARTKEY)
GROUP BY 
    P_MFGR, 
    yod
ORDER BY 
    P_MFGR ASC, 
    yod ASC

Optimized query time:

1 node:

35 rows in set. Elapsed: 106.732 sec. Processed 15.00 billion rows, 210.05 GB (140.56 million rows/s., 1.97 GB/s.)

3 nodes:

35 rows in set. Elapsed: 75.854 sec. Processed 15.12 billion rows, 211.71 GB (199.36 million rows/s., 2.79 GB/s.)

That’s an improvement of about 6.5 times compared to the original query. This shows the importance of understanding data distribution, and writing the optimal query to process the data.
Another option for dealing with JOIN complexity, and to improve performance, is to use ClickHouse’s dictionaries. These dictionaries are described here: https://altinity.com/blog/2017/4/12/dictionaries-explained.
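
Purely as an illustration (I have not benchmarked this variant), if the customer data were configured as an external dictionary in the server config, say a hypothetical customer_dict with a C_REGION attribute keyed by C_CUSTKEY, the two-table JOIN shown earlier could be replaced with a dictGetString() lookup:

SELECT 
    dictGetString('customer_dict', 'C_REGION', toUInt64(LO_CUSTKEY)) AS C_REGION, 
    sum(LO_EXTENDEDPRICE * LO_DISCOUNT)
FROM lineorderfull
WHERE (toYear(LO_ORDERDATE) = 1993) AND ((LO_DISCOUNT >= 1) AND (LO_DISCOUNT <= 3)) AND (LO_QUANTITY < 25)
GROUP BY C_REGION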

I will review dictionary performance in future posts.

Another traditional way to deal with JOIN complexity in an analytical workload is to use denormalization. We can move some columns (for example, P_MFGR from the last query) into the fact table (lineorder).
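
Again purely as a sketch (not benchmarked here): if the fact table carried a denormalized manufacturer column, say LO_P_MFGR in a hypothetical table lineorderdenorm populated at load time, the three-table query would collapse into a single-table scan plus one independent subquery:

SELECT 
    sum(LO_REVENUE), 
    LO_P_MFGR AS P_MFGR, 
    toYear(LO_ORDERDATE) AS yod
FROM lineorderdenorm
WHERE LO_CUSTKEY IN 
(
    SELECT C_CUSTKEY
    FROM customerfull 
    WHERE C_REGION = 'ASIA'
)
GROUP BY 
    P_MFGR, 
    yod
ORDER BY 
    P_MFGR ASC, 
    yod ASC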

Observations

  • ClickHouse can handle general analytical queries (it requires special schema design and considerations, however)
  • Linear speedup is possible, but it depends on query design and requires advanced planning — proper speedup depends on data locality
  • ClickHouse is blazingly fast (beyond what I’ve seen before) because it can use all available CPU cores for a query, as shown above using 24 cores for a single server and 72 cores for three nodes
  • Multi-table JOINs are cumbersome and require manual work to achieve better performance, so consider using dictionaries or denormalization
 