Dive Deep into ClickHouse#
ClickHouse is a columnar database management system (DBMS) for online analytical processing (OLAP). It lets users generate analytical reports in real time through SQL queries. This document delves into ClickHouse's cluster architecture, table structure, and query optimization techniques, offering technical insights for database administrators and data engineers.
Cluster Architecture#
ClickHouse's cluster architecture supports high scalability and flexibility, enabling mixed Replica and Shard configurations to meet different data distribution and high availability needs.
Implementing Mixed Replica and Shard Cluster#
In ClickHouse, Sharding is achieved through distributed tables, allowing data to be horizontally partitioned across multiple nodes. Replication enhances data availability and durability by creating copies of the data. The combined use of Sharding and Replication can improve query performance while increasing system fault tolerance.
- Sharding: Data is distributed across different shards based on a key (e.g., user ID).
- Replication: Data in each shard is replicated across multiple replicas to ensure data durability and high availability.
After some in-depth research, we found it is possible to support sharding and replication simultaneously on a single physical machine: run dual-active ClickHouse instances on each machine and build nested-cluster distributed tables using Distributed on top of local ReplicatedMergeTree tables.
Let's take a look at the example configuration file:
<remote_servers>
<default>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.250</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>192.168.1.250</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.253</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.253</host>
<port>9000</port>
</replica>
</shard>
</default>
</remote_servers>
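One setting worth checking in this layout (an assumption about your defaults, not shown above): when the shard-local tables are ReplicatedMergeTree, each <shard> should normally contain <internal_replication>true</internal_replication>, so that the Distributed table writes each block to a single healthy replica and lets ReplicatedMergeTree propagate it. Without it, the Distributed table writes to every replica itself, and correctness relies on insert block deduplication.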
Now let's look at a sample local table creation statement:
CREATE TABLE tutorial.order_local
(
`order_id` UInt32,
`order_date` Date,
`quantity` UInt8,
`last_price` UInt32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/rep_local', '{replica}')
PARTITION BY toYear(order_date)
ORDER BY (order_date, order_id)
SETTINGS index_granularity = 8192
Here, the variables wrapped in '{}' must be defined in the <macros> section of config.xml. Note that each ClickHouse instance on each physical machine needs its own macro definitions, as follows:
<macros>
<!-- Here specifies the shard number, the shard number should remain consistent across different ClickHouse instances on the same physical machine -->
<shard>01</shard>
<!-- Here is the replica number, which can be simply composed of the host-shard-replica structure, ensuring global uniqueness -->
<replica>247-01-1</replica>
</macros>
<!-- Default engine arguments: if ReplicatedMergeTree is declared without parameters, these defaults are substituted automatically -->
<default_replica_path>/clickhouse/tables/{shard}/{database}/{table}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>
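To confirm which macro values a given instance actually picked up, query the built-in system.macros table:
-- Run on each instance: shows what '{shard}' and '{replica}' expand to
SELECT * FROM system.macros;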
Next, we aggregate the shards on each storage instance into a distributed table that serves external queries:
CREATE TABLE tutorial.order
(
`order_id` UInt32,
`order_date` Date,
`quantity` UInt8,
`last_price` UInt32
)
ENGINE = Distributed('default', 'tutorial', 'order_local', xxHash32(order_id))
At this point, we have a distributed table that supports both sharding and replication. Subsequent queries can be issued against this table, and ClickHouse will automatically exploit the shards for acceleration.
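As a quick smoke test of the new table (a sketch with made-up values, following the DDL above):
-- Routed to one shard by xxHash32(order_id), then replicated within that shard
INSERT INTO tutorial.order VALUES (1001, '2024-01-02', 5, 172);
-- Fans out to all shards and merges the partial results
SELECT toYear(order_date) AS y, count() FROM tutorial.order GROUP BY y;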
It is important to note that this distributed table relies on ZooKeeper for data consistency within each shard, so it also inherits ZooKeeper's drawbacks: it is sensitive to single points of failure. The solution is to deploy a ZooKeeper ensemble.
Configuring ZooKeeper for Metadata Management#
First, install a JDK supported by your ZooKeeper release (e.g., JDK 8 or 11).
Then point each node's config.xml at the ZooKeeper cluster (or standalone instance) that will manage metadata:
<zookeeper>
<node>
<host>localhost</host> <!-- Here configure the Zookeeper host -->
<port>2181</port>
</node>
</zookeeper>
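For a ZooKeeper ensemble rather than a single node, list each member in its own <node> element (typically three or five of them); ClickHouse treats the set as one quorum and fails over between members automatically.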
Next, in the ZooKeeper configuration file conf/zoo.cfg, configure ZooKeeper to listen on the internal network address; for simplicity it can be set to 0.0.0.0 (avoid exposing this to untrusted networks):
# The listening port
clientPortAddress=0.0.0.0
The intent of this design is to provide the fastest possible reads together with durable data storage. For writes and mutations, it performs worse than a plain MergeTree. Additionally, slow nodes, write amplification, and asynchronous replication can still stall the whole cluster's writes and mutations under certain conditions, which deserves special attention.
A simple production practice is to prohibit users from writing to and modifying the distributed table, and to perform all of the above operations on the local table.
Pitfall 1: Interserver Copy Failure#
With the setup above, the distributed table's sharding works normally, but replica replication fails. Checking the trace log reveals that the interserver host defaults to the output of the hostname -f command, whose DNS resolution then fails. Depending on the situation, it may need to be changed to the corresponding IP address:
<!-- <interserver_http_host>example.clickhouse.com</interserver_http_host> -->
<interserver_http_host>192.168.1.253</interserver_http_host>
How to Integrate Underlying Storage with JuiceFS/HDFS/S3#
ClickHouse supports various file systems and object-storage backends, such as JuiceFS, HDFS, and Amazon S3, which can be used to store large-scale data. For streaming data (e.g., market data), we may want to back it up to a cold-storage area such as HDFS rather than keeping it in ReplicatedMergeTree; the concrete file-system storage choice still needs further research.
- JuiceFS: A distributed file system that keeps metadata in a database such as Redis and file data in object storage such as S3; ClickHouse tables can be stored directly on JuiceFS for high performance and scalability.
- HDFS: Using HDFS as a storage backend, data stored in HDFS can be created and queried directly on ClickHouse through the HDFS table engine.
- S3: Through the S3 table engine, ClickHouse can interact directly with S3-compatible storage without needing to copy data to local storage.
Since ClickHouse stores everything as files, integrating with our IDC's file system requires no special connector support. For S3, ClickHouse likewise implements the corresponding interface, so a later migration would need no extra work.
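As one concrete sketch of the S3 path (bucket, credentials, and file name below are placeholders, not part of our setup), the documented s3 table function can archive a table without local staging:
-- Hypothetical cold backup of the order table to S3 as Parquet
INSERT INTO FUNCTION s3(
    'https://example-bucket.s3.amazonaws.com/cold/order.parquet',
    'ACCESS_KEY', 'SECRET_KEY', 'Parquet',
    'order_id UInt32, order_date Date, quantity UInt8, last_price UInt32'
)
SELECT * FROM tutorial.order;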
Elastic Scaling#
This design assumes that we are not adopting a separate storage-compute framework and that ClickHouse's single-machine compute capability is strong.
1. Using Materialized Views to Recalculate the Hash#
Materialized views in ClickHouse store the result of a query as an independent table; they are populated by inserts into their source tables (effectively insert triggers) and therefore update in real time.
Assume we currently have three nodes, each with one shard and two replicas.
Now we need to scale the cluster to 4 nodes. One way to do this with materialized views is as follows:
- Expand the base table: this only requires updating each node's config, after which the master config can be updated uniformly.
- Rebalance the data: manually moving the hottest data off overloaded nodes is slow, and long downtime is generally unacceptable.
- We therefore consider using materialized views for scaling (a custom scheme built on our own configuration items, not a built-in ClickHouse feature). The general process:
- Create a new MV' with the same structure that hashes its data across 4 nodes, while the old MV continues writing to 3 nodes.
- The new shard carries a special configuration item named status with the value new; MV reads this item and automatically ignores shards in that state. So although the configuration now describes 4 nodes, MV still sees itself as 3.
- When creating MV', specify include_state_shard=true so the new MV hashes across all 4 nodes; additionally, create it as a view with data-initialization capability, so it can keep backfilling the base table's history without downtime.
- MV' consumes the base table's historical data; once that backlog is drained, start renaming MV' to MV (the old MV will be deleted).
- Stop writes to the base table. This prevents data from piling up on the distributed table during the rename phase; the write stop lasts roughly seconds to minutes.
- Rename MV to MV-Temp; a rename is a metadata-only operation, so it executes quickly.
- Delete the materialized-view converter and rename the data table's local and distributed tables.
- Rename MV' to MV.
- Delete the materialized-view converter, rename the data table's local and distributed tables, and rebuild the materialized-view converter (its name has changed).
- Update the shard configuration, removing the status keyword.
- Update the MV configuration, removing the include_state_shard item.
Writes to the base table can be stopped either via permissions or via SQL. Stock ClickHouse has no DISABLE WRITE statement, so the portable route is revoking and re-granting the INSERT privilege:
-- writer_role is a placeholder for whichever role or user performs ingestion
REVOKE INSERT ON tablename FROM writer_role;
-- Re-enable writes once the rename dance is finished
GRANT INSERT ON tablename TO writer_role;
2. Using Virtual Sharding Technology#
We can directly change the earlier rand() sharding function to the jumpConsistentHash algorithm; the distributed table then recomputes the hash when routing queries, so the data parts required by the new nodes can be extracted and moved there. The process is as follows (see the sketch after this list):
- As in the previous section, configure and launch the new shard.
- Recalculate the hash function values on all old shards to obtain the list of data parts that need to be migrated to the new shard.
- Manually migrate these data parts to the new shard.
This method avoids a full replay and bulk data copying and keeps the cluster online, but during migration, correctness and data integrity rely on hand-written logic, which is error-prone; moreover, the number of data parts inside the finished shards grows, which somewhat degrades the performance of the distributed service.
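A minimal sketch of this sharding-key change (reusing the order tables from earlier; the bucket count 4 is an assumption that must match the number of shards in the cluster config):
-- jumpConsistentHash(key, buckets) is a built-in ClickHouse hash function;
-- growing buckets from 3 to 4 relocates only about 1/4 of the keys.
CREATE TABLE tutorial.order_jch AS tutorial.order_local
ENGINE = Distributed('default', 'tutorial', 'order_local',
                     jumpConsistentHash(toUInt64(order_id), 4));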
Hands-On Practice#
We tested the migration of partitions on our local cluster.
- First, create a full materialized view MV:
CREATE MATERIALIZED VIEW tutorial.ic_local_mv
ENGINE = MergeTree
PARTITION BY toYYYYMM(trade_date)
ORDER BY trade_date
AS SELECT *
FROM tutorial.ic_local;
- Backfill data from ic_all into the materialized view MV (an MV only captures inserts made after it is created, so historical data must be inserted explicitly):
INSERT INTO tutorial.ic_local_mv
SELECT * FROM tutorial.ic_all;
- Add a new shard to the configuration:
- Establish the same table structure ic_local.
- Update the config.xml in the cluster.
- Truncate ic_local on each node; since the materialized view MV is a copy of the data, it will not be affected:
truncate table ic_local;
- Create a new distributed table ic_all_new, changing the shard-count parameter of the consistent hash function in the distributed table to the new cluster's shard count (adding one physical machine means +1); in this local test we simply used rand():
CREATE TABLE IF NOT EXISTS tutorial.ic_all_new AS tutorial.ic_local ON CLUSTER default
ENGINE = Distributed(default, tutorial, ic_local, rand());
- Write the materialized view MV's data into the new distributed table; this step requires special caution, as it writes a large volume of data, and the cluster's health should be monitored closely:
INSERT INTO tutorial.ic_all_new
SELECT * FROM tutorial.ic_local_mv;
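For a bulk backfill like this, running the INSERT with the setting insert_distributed_sync = 1 (a standard ClickHouse setting) makes the statement return only after data reaches the remote shards, rather than queuing asynchronously on the initiating node.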
- Check the data quality before and after migration:
select count() from ic_all;
select count() from ic_local_mv;
select count() from ic_all_new;
Migrating 2.5 billion rows, the number of lost entries was 0.
Distributed write efficiency for 3 shards:
Elapsed: 46.387 sec. Processed 537.29 million rows, 75.96 GB (11.58 million rows/s., 1.64 GB/s.)
Peak memory usage: 1.37 GiB
Possible Pitfalls#
- Internal access within the cluster cannot be established, failing with auth fail.
The most common cause is that the account (and password) used to reach a given instance does not exist consistently across the whole cluster. Ensure the account you use has the same permission level on every machine so it can access and modify the distributed table anywhere. Common solutions are sharing accounts via LDAP across all instances, or configuring a cluster secret (the <secret> element in the cluster definition); see GitHub.
Query Optimization#
In our business requirements, monitoring log files is a workload that should be optimized with materialized views: many of its queries depend on heavy aggregation, subqueries, and merges, and this logic is best solidified into materialized views.
Projection#
Projection is a newer ClickHouse feature that defines precomputed alternative layouts of a table's data (column subsets with their own sort order, or pre-aggregations) to optimize queries. Projections are computed at write time and stored inside each data part, speeding up retrieval; they also incorporate some SQL-rewrite optimization, effectively providing an additional index sort order.
Let's take a price table as an example:
CREATE TABLE tutorial.price_local
(
`price_id` UInt32,
`trade_date` Date,
`code` String,
`last` Float32,
`prev_close` Float32
)
ENGINE = MergeTree
PARTITION BY toYear(trade_date)
ORDER BY (trade_date, code)
SETTINGS index_granularity = 8192
Then there are two common SQL selects:
select * from tutorial.price_local where trade_date = '20240101';
select * from tutorial.price_local where code = '600519.SH';
By default, only the first query hits the index; filtering on code alone cannot, because a composite primary index matches left-to-right, ClickHouse cannot keep two primary key orders active at once, and its secondary indexes are only skip indexes.
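You can confirm which index stages a query uses with EXPLAIN (indexes = 1 is a documented EXPLAIN option):
EXPLAIN indexes = 1
SELECT * FROM tutorial.price_local WHERE code = '600519.SH';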
The conventional workarounds are creating a materialized view ordered by code, or re-partitioning the table by code; the latter affects the distributed table and is generally unsuitable, while the former has two issues:
- New queries need to explicitly specify the materialized view instead of the original table.
- The performance of the skip index used is also poor.
At this point, we can use Projection to solve it:
ALTER TABLE tutorial.price_local
ADD PROJECTION p1
(
SELECT
price_id,
trade_date,
code,
last
ORDER BY code
);
After that, queries filtering on code can hit the index normally.
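Note that a projection added via ALTER applies only to newly written parts; build it for existing data with an explicit materialization:
ALTER TABLE tutorial.price_local MATERIALIZE PROJECTION p1;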
Appendix#
What is a Skip Index#
A skip index is an efficient data-retrieval technique in ClickHouse: the system skips granules whose index summary shows they cannot contain rows matching the query, reducing the amount of data read and speeding up the query.
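A minimal sketch of adding one, reusing the price table from above (the column choice is illustrative):
-- A minmax skip index lets ClickHouse skip granules whose [min, max] range excludes the predicate
ALTER TABLE tutorial.price_local ADD INDEX idx_last last TYPE minmax GRANULARITY 4;
-- Build the index for parts written before it existed
ALTER TABLE tutorial.price_local MATERIALIZE INDEX idx_last;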
What is a Null Engine#
The Null engine is a special storage engine for quickly creating tables that store no data. Such tables are useful for testing and for tuning data flows: used as a stream, a Null table indirectly provides a flow that can be normalized and validated inside ClickHouse, e.g., for low-frequency market data.
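A minimal sketch of this pattern (table and column names are hypothetical):
-- The Null table keeps nothing; it exists only to receive inserts
CREATE TABLE tutorial.ticks_raw
(
    `code` String,
    `last` Float32
)
ENGINE = Null;
-- The attached MV validates/normalizes each inserted block and persists the result
CREATE MATERIALIZED VIEW tutorial.ticks_clean_mv
ENGINE = MergeTree ORDER BY code
AS SELECT code, last FROM tutorial.ticks_raw
WHERE last > 0;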
Help, my client or DBMS reports an error when creating a materialized view: Clickhouse: IllegalArgumentException: Only groupMap is supported at this point#
The JDBC driver does not yet support ClickHouse's binary custom aggregate-function states, so such MVs trigger display errors in JDBC clients; the MV itself can still be created and accessed normally from the native CLI (clickhouse-client).
TBC#