banner
DarkMesh

Dark Flame Master

DarkMesh的小屋

Dive Deep in ClickHouse

Dive Deep in ClickHouse#

ClickHouse 是一个用于联机分析处理 (OLAP) 的列式数据库管理系统 (DBMS)。它允许用户通过 SQL 查询实时生成分析数据报告。
此文档深入探讨 ClickHouse 的集群架构、表结构和查询优化技术,为数据库管理员和数据工程师提供技术洞见。


集群架构#

ClickHouse 的集群架构支持高度的可扩展性和灵活性,可以实现混合 Replica 和 Shard 的配置,满足不同的数据分布和高可用性需求。

实现混合 Replica 和 Shard 的 Cluster#

在 ClickHouse 中,Sharding 是通过分布式表实现的,它允许将数据水平分割存储在多个节点上。Replication 则通过创建数据的副本来增强数据的可用性和持久性。混合使用 Sharding 和 Replication 可以在提升查询性能的同时,增加系统的容错性。

  • Sharding:数据根据某个键(如用户 ID)分布在不同的 shards 中。
  • Replication:每个 shard 的数据在多个 replicas 之间复制,确保数据的持久性和高可用。

深入研究之后,我们发现其实可以满足在单一物理机上同时支持sharding 和 replication, 这需要通过单机双活 Clickhouse, 和嵌套集群分布式表 Distributed + 本地 ReplicatedMergeTree 来实现

我们看一下示例配置文件

<remote_servers>
    <default>
        <shard>
            <replica>
                <host>localhost</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>192.168.1.250</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <replica>
                <host>192.168.1.250</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>192.168.1.253</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <replica>
                <host>localhost</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>192.168.1.253</host>
                <port>9000</port>
            </replica>
        </shard>
    </default>
</remote_servers> -->

然后咱们来看一个 sample local table 的建表语句:

CREATE TABLE tutorial.order_local
(
    `order_id` UInt32,
    `order_date` Date,
    `quantity` UInt8,
    `last_price` UInt32,
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/rep_local', '{replica}')
PARTITION BY toYear(order_date)
ORDER BY (order_date, order_id)
SETTINGS index_granularity = 8192

其中,'{}' 包裹的变量需要在 config.xml 里面的 <'macros'>section 进行配置,注意每台物理机上的每个 clickhouse 实例需要不同的宏定义如下

<macros>
  <!-- 这里指定的是shard的编号, 在同一台物理机上的不同clickhouse实例里面, shard的编号应该保持一致-->
  <shard>01</shard>  
  <!-- 这里是replica的编号,可以简单由host-shard-replica结构组成,确保全局唯一 -->
  <replica>247-01-1</replica> 
</macros> 

<!-- 引擎默认配置,如果无参调用ReplicatedMergeTree就会自动替代默认参数 -->
<default_replica_path>/clickhouse/tables/{database}/{table}</default_replica_path>  
<default_replica_name>{replica}</default_replica_name>

接下来我们需要将每个存储 instance 上面的 shard 聚合起来,作为分布式表提供对外查询

 CREATE TABLE tutorial.order
(
    `order_id` UInt32,
    `order_date` Date,
    `quantity` UInt8,
    `last_price` UInt32,
)
ENGINE = Distributed('default', 'tutorial', 'order_local', xxHash32(order_id))

到此为止我们就完成了一个同时支持sharding 和 replication 的分布式表,之后的查询操作都可以在这个表上面进行,Clickhouse 会自动利用 sharding 的优势进行加速.

需要注意的是,这种分布式表是基于内置 zookeeper 来实现 shard 内部的数据一致性的,所以也继承了 zookeeper 的缺点:对单点故障敏感. 解决方式是部署 zookeeper 集群

对应的元数据管理 Zookeeper 配置#

先安装 JDK8, JDK11, JDK 21+

之后我们需要将集群的 config.xml 里面单独接入 zookeeper 集群或者单机,用来进行元数据管理

    <zookeeper>
        <node>
            <host>localhost</host>  <!-- 这里配置zookeeper的host -->
            <port>2181</port>
        </node>
    </zookeeper>

之后在 zookeeper 的配置文件 /conf/zoo.cfg 里面,配置 zookeeper 监听内网地址,这里为了简便可以设置为 0.0.0.0

# The listening port
clientPortAddress=0.0.0.0

该结构设计的初衷是提供最高速的 read 查询以及数据的安全存储,针对写入和变更,该设计的性能会比单纯的 MergeTree 要下降.
此外慢节点, 写放大异步同步的问题仍然会导致整个集群的变更及写入操作在某些特定情况下 stuck, 需要特别注意.

一个简单的生产实践是禁止用户向 distributed 表里面进行写入及更改操作,并将所有的上述操作在 local table 上执行即可.

踩坑 1. interserver-copy 失效#

在上述实践中,我们会发现,分布式表的 shard 功能正常生效,但是 replica 的复制会有问题,进入 JVM 查看 tracelog 会发现,内网节点的 host 默认是通过 hostname -f 指令来获取的,导致对应的 DNS 解析失败.
这里视情况可能需要将其修改为对应的 IP address:

		<!-- <interserver_http_host>example.clickhouse.com</interserver_http_host> -->
    <interserver_http_host>192.168.1.253</interserver_http_host>

底层的存储如何与 JuiceFS/HDFS/S3 对接#

ClickHouse 支持多种类型的文件系统和对象存储后端,如 JuiceFS、HDFS 和 Amazon S3,这些可以用于存放大规模的数据。
针对流式数据 (例如市场行情数据), 我们可能更多的应该将其添加到另一个冷存储区进行备份,例如 HDFS, 而不是使用 ReplicateMergeTree
这个具体文件系统存储的方案可能还需要再深入研究.

  • JuiceFS:一个基于 Redis 与 S3 的分布式文件系统,可以将 ClickHouse 表直接存储在 JuiceFS 上,实现高性能与可伸缩性。
  • HDFS:使用 HDFS 作为存储后端,可以通过 HDFS 表引擎直接在 ClickHouse 上创建和查询存储在 HDFS 上的数据。
  • S3:通过 S3 表引擎,ClickHouse 可以直接与 S3 兼容的存储交互,无需复制数据到本地存储。

鉴于 Clickhouse 是完整基于文件的,所以对接 IDC 的文件系统并不需要特殊的 connector 支持. S3 的情况下,Clickhouse 也实现了对应接口, 所以我们之后如果迁移,不需要额外的 efforts

弹性扩容#

这部分设计的大前提:我们不使用存算分离的其他框架以及 clickhouse 单机的运算性能够强.

1. 使用物化视图重算哈希#

物化视图(Materialized View)在 ClickHouse 中用于存储查询的结果作为独立的表,这些表跟踪对其依赖表的修改,并实时更新。

假设我们现在拥有三个节点,每个节点上有一个 shard,2 个 replica, 如图所示

现在我们需要扩容集群到 4 节点,那么一个使用物化视图的扩容方式应该如下所示:

  1. 将底表扩容,这部分只需要修改每个节点对应的 config, 最后统一修改 master config 即可
  2. 数据再平衡,首先手动 fetch 负载过高,停机时间过长,一般不能接受
  • 我们考虑采用物化视图来进行扩容,大致流程如下:
    1. 创建一个同结构的 MV’, 将 MV’数据 hash 到 4 个节点;而此时 MV 依然写入到 3 个节点
    2. 新的 shard, 有一个特殊的配置值,名为 status, 值为 new; MV 读取到这类配置项,会自动忽略这类 shard, 因此对于 MV 来说,尽管已经 4 节点配置,但它自己依然是 3 节点
    3. 创建 MV’时,指定配置项 include_state_shard=true, 新 MV 将 hash 到 4 个节点;另外创建视图指定数据初始化能力,这样就能需要不停服的回追底表数据了.
    4. MV’消费底表的历史数据,等历史消费完毕后,开始将 MV’重命名为 MV (后续 MV 会被删除)
    5. 停止底表写入, 这个步骤是为了防止在 rename 阶段,分布式表上有数据积压,因此必须停写清空积压数据,这个停写时间大概是秒 - 分钟级别
    6. MV 重命名为 MV-Temp, 由于 rename 操作是一个元数据操作,因此执行速度比较快
      • 删除物化视图转化器,重命名数据表的本地表和分布式表
    7. MV’ 重命名为 MV,
      • 删除物化视图转化器,重命名数据表的本地表和分布式表,重建物化视图转化器 (名字不一样了)
      • 更新 shard 配置项,去除 status 关键字
      • 修改 MV 的配置项,删除 include_state_shard 配置项.

停止底表写入的方法可以直接设置权限,也可以用 SQL

ALTER TABLE tablename DISABLE WRITE
ALTER TABLE tablename ENABLE WRITE

2. 使用虚拟分片技术#

我们直接修改之前的 rand () 哈希函数到jumpConsistentHash 算法即可,在分布式表中,查询请求会重新计算哈希函数,从而抽取需要的 data part 到新的节点中。流程如下:

  1. 和上文中 1 一样,配置并上线新的 shard
  2. 在所有的旧 shard 上面重算 hash 函数的值,获取需要迁移到新的 shard 的 dp 的列表
  3. 手动将这些 dp 迁移到新的 shard 即可

这个方法避免了全量的重放和数据拷贝,且不会使集群失能,但是迁移的过程中正确性和数据的完整性需要依靠手写的逻辑实现,容易出错,且最终完成的 shard 内部 DP 的数量会增多,这一定程度上降低了分布式服务的性能.

动手做一做#

我们在本地 cluster 上做了测试,迁移 partition 的操作

  1. 先新建一个全量的物化视图 MV
CREATE MATERIALIZED VIEW tutorial.ic_local_mv
ENGINE = MergeTree
PARTITION BY toYYYYMM(trade_date)
ORDER BY trade_date
AS SELECT *
FROM tutorial.ic_local;
  1. 将 ic_all 里面的数据插入到物化视图 MV 中
INSERT INTO tutorial.ic_local_mv
SELECT * FROM tutorial.ic_all;
  1. 在配置中添加新的 shard
    • 建立一样的表结构 ic_local
    • 更新集群里面的 config.xml
  2. truncate 掉每个节点上的 ic_local, 因为物化视图 MV 是数据的复制,所以不会受到影响
truncate table ic_local;
  1. 新建一个新的分布式表 ic_all_new, 修改里面的分布型一致哈希函数的 shard 参数为新集群里 shard 的数量 (例如扩容一个新物理机就是 + 1)
CREATE TABLE IF NOT EXISTS tutorial.ic_dist AS tutorial.ic_local ON CLUSTER default
ENGINE = Distributed(default, tutorial, ic_local, rand());
  1. 将物化视图 MV 里的数据写入到新的分布式表里面,这步要尤其小心,因为涉及到大量的数据写入,需要紧密监控集群的健康状态
INSERT INTO tutorial.ic_local
SELECT * FROM tutorial.ic_local_mv;
  1. 检查迁移前后的数据质量
select count() from ic_all;
select count() from ic_all_mv;
select count() from ic_local;

迁移 2.5 billion 的数据,数据丢失条数 = 0
3shard 的分布式写入效率

Elapsed: 46.387 sec. Processed 537.29 million rows, 75.96 GB (11.58 million rows/s., 1.64 GB/s.)
Peak memory usage: 1.37 GiB

可能的坑#

  1. 集群 cluster 内部访问不能打通,提示 auth fail
    这种情况最常见的问题是你用于访问某个数据库 instance 的账户(与密码)没有在整个集群中均存在且保持一致,要确保你使用的账密在集群中的全部机器上都拥有相同的权限级别,这样才能正确访问修改分布式表。一个常见的解决办法是使用 LDAP 在所有的实例上共享账密,另一个解决办法是为集群设置 secret,详见github

查询优化#

在咱们的业务需求中,监控日志文件是一个应该用物化视图进行优化的业务,因为其很多查询依赖于大量的聚合,子查询和合并。这部分的逻辑应该使用物化视图进行固化.

Projection#

Projection 是 ClickHouse 中的一项新功能,允许定义表的子集合并优化查询。它在创建表时预计算并存储数据的特定视图,加速数据检索。Projection 里面包含了 SQL rewrite 的一些原理优化,可以实现索引排序等功能.

我们用一张 price 表举个例子:

CREATE TABLE tutorial.price_local
(
    `price_id` UInt32,
    `trade_date` Date,
    `code` String,
    `last` Float32,
    `prev_close` Float32,
)
ENGINE = MergeTree
PARTITION BY toYear(trade_date)
ORDER BY (trade_date, code)
SETTINGS index_granularity = 8192

然后有两种常规的 SQL select:

select * from tutorial.price_local where trade_date = '20240101';

select * from tutorial.price_local where code = '600519.SH';

在默认情况下,只有我们的第一个查询能够命中索引,而 code 不可以,因为 ClickHouse 里面的二级索引是跳数索引,而组合索引的规则是左匹配。且 clickhouse 不能设置两个主键索引同时生效.

常规的解决方案是新建一个物化视图 / 将表拆分成多个 partition based on code, 后者影响了分布式表,一般不合适.
前者又有两个问题

  1. 新的查询需要显式指定查询物化视图而不是原表
  2. 本身使用的跳数索引性能也很差

这个时候我们可以用 Projection 来解决

ALTER TABLE tutorial.price_local
    ADD PROJECTION p1
    (
        SELECT
            price_id,
            trade_date,
            code,
            last,
        ORDER BY code
    );

之后就可以正常使用 code 命中索引了.

附录#

什么是跳数索引#

跳数索引(Skip Index)是 ClickHouse 中一种非常高效的数据检索技术,它允许系统跳过不包含查询所需数据的块,从而减少查询的数据量和提高速度。

什么是 Null Engine#

Null Engine 是一种特殊的存储引擎,用于快速创建不存储数据的表。这种类型的表可用于测试和性能调优中的数据流。在很多时候这个表可以作为 stream 来使用,变相在 Clickhouse 里提供可以被规范化校验的数据流,例如低频的行情数据

救命,我的 client 或者 DBMS 在新建物化视图的时候报错:Clickhouse: IllegalArgumentException: Only groupMap is supported at this point#

因为 JDBC 目前不支持 Clickhouse 的二进制自定义聚合函数,所以显示有问题,但 MV 可以正常创建和访问,可以使用 clickhouse-cli

JDBC limitations

TBC#

ref:
huangzhaowei's blog
blog
单机双活 clickhouse
Clickhouse 自定义聚合函数

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。