深入探討 ClickHouse#
ClickHouse 是一個用於聯機分析處理 (OLAP) 的列式資料庫管理系統 (DBMS)。它允許用戶通過 SQL 查詢即時生成分析資料報告。
此文檔深入探討 ClickHouse 的集群架構、表結構和查詢優化技術,為資料庫管理員和資料工程師提供技術洞見。
集群架構#
ClickHouse 的集群架構支持高度的可擴展性和靈活性,可以實現混合 Replica 和 Shard 的配置,滿足不同的資料分佈和高可用性需求。
實現混合 Replica 和 Shard 的 Cluster#
在 ClickHouse 中,Sharding 是通過分佈式表實現的,它允許將資料水平分割存儲在多個節點上。Replication 則通過創建資料的副本來增強資料的可用性和持久性。混合使用 Sharding 和 Replication 可以在提升查詢性能的同時,增加系統的容錯性。
- Sharding:資料根據某個鍵(如用戶 ID)分佈在不同的 shards 中。
- Replication:每個 shard 的資料在多個 replicas 之間複製,確保資料的持久性和高可用。
深入研究之後,我們發現其實可以滿足在單一物理機上同時支持sharding 和 replication,這需要通過單機雙活 Clickhouse,和嵌套集群分佈式表 Distributed + 本地 ReplicatedMergeTree 來實現。
我們看一下示例配置文件
<remote_servers>
<default>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.250</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>192.168.1.250</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.253</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.253</host>
<port>9000</port>
</replica>
</shard>
</default>
</remote_servers>
然後咱們來看一個 sample local table 的建表語句:
CREATE TABLE tutorial.order_local
(
`order_id` UInt32,
`order_date` Date,
`quantity` UInt8,
`last_price` UInt32,
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/rep_local', '{replica}')
PARTITION BY toYear(order_date)
ORDER BY (order_date, order_id)
SETTINGS index_granularity = 8192
其中,'{}' 包裹的變數需要在 config.xml 裡面的 <'macros'>section 進行配置,注意每台物理機上的每個 clickhouse 實例需要不同的宏定義如下。
<macros>
<!-- 這裡指定的是shard的編號,在同一台物理機上的不同clickhouse實例裡,shard的編號應該保持一致-->
<shard>01</shard>
<!-- 這裡是replica的編號,可以簡單由host-shard-replica結構組成,確保全局唯一 -->
<replica>247-01-1</replica>
</macros>
<!-- 引擎默認配置,如果無參調用ReplicatedMergeTree就會自動替代默認參數 -->
<default_replica_path>/clickhouse/tables/{database}/{table}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>
接下來我們需要將每個存儲 instance 上面的 shard 聚合起來,作為分佈式表提供對外查詢。
CREATE TABLE tutorial.order
(
`order_id` UInt32,
`order_date` Date,
`quantity` UInt8,
`last_price` UInt32,
)
ENGINE = Distributed('default', 'tutorial', 'order_local', xxHash32(order_id))
到此為止我們就完成了一個同時支持sharding 和 replication 的分佈式表,之後的查詢操作都可以在這個表上面進行,Clickhouse 會自動利用 sharding 的優勢進行加速。
需要注意的是,這種分佈式表是基於內置 zookeeper 來實現 shard 內部的資料一致性的,所以也繼承了 zookeeper 的缺點:對單點故障敏感。解決方式是部署 zookeeper 集群。
對應的元資料管理 Zookeeper 配置#
先安裝 JDK8, JDK11, JDK 21+
之後我們需要將集群的 config.xml 裡面單獨接入 zookeeper 集群或者單機,用來進行元資料管理。
<zookeeper>
<node>
<host>localhost</host> <!-- 這裡配置zookeeper的host -->
<port>2181</port>
</node>
</zookeeper>
之後在 zookeeper 的配置文件 /conf/zoo.cfg 裡面,配置 zookeeper 監聽內網地址,這裡為了簡便可以設置為 0.0.0.0。
# The listening port
clientPortAddress=0.0.0.0
該結構設計的初衷是提供最高速的 read 查詢以及資料的安全存儲,針對寫入和變更,該設計的性能會比單純的 MergeTree 要下降。
此外慢節點、寫放大和異步同步的問題仍然會導致整個集群的變更及寫入操作在某些特定情況下 stuck,需要特別注意。
一個簡單的生產實踐是禁止用戶向 distributed 表裡面進行寫入及更改操作,並將所有的上述操作在 local table 上執行即可。
踩坑 1. interserver-copy 失效#
在上述實踐中,我們會發現,分佈式表的 shard 功能正常生效,但是 replica 的複製會有問題,進入 JVM 查看 tracelog 會發現,內網節點的 host 默認是通過 hostname -f 指令來獲取的,導致對應的 DNS 解析失敗。
這裡視情況可能需要將其修改為對應的 IP address:
<!-- <interserver_http_host>example.clickhouse.com</interserver_http_host> -->
<interserver_http_host>192.168.1.253</interserver_http_host>
底層的存儲如何與 JuiceFS/HDFS/S3 對接#
ClickHouse 支持多種類型的文件系統和對象存儲後端,如 JuiceFS、HDFS 和 Amazon S3,這些可以用於存放大規模的資料。
針對流式資料(例如市場行情資料),我們可能更多的應該將其添加到另一個冷存儲區進行備份,例如 HDFS,而不是使用 ReplicateMergeTree。
這個具體文件系統存儲的方案可能還需要再深入研究。
- JuiceFS:一個基於 Redis 與 S3 的分佈式文件系統,可以將 ClickHouse 表直接存儲在 JuiceFS 上,實現高性能與可伸縮性。
- HDFS:使用 HDFS 作為存儲後端,可以通過 HDFS 表引擎直接在 ClickHouse 上創建和查詢存儲在 HDFS 上的資料。
- S3:通過 S3 表引擎,ClickHouse 可以直接與 S3 兼容的存儲互動,無需複製資料到本地存儲。
鑑於 Clickhouse 是完整基於文件的,所以對接 IDC 的文件系統並不需要特殊的 connector 支持。在 S3 的情況下,Clickhouse 也實現了對應接口,所以我們之後如果遷移,不需要額外的 efforts。
彈性擴容#
這部分設計的大前提:我們不使用存算分離的其他框架以及 clickhouse 單機的運算性能夠強。
1. 使用物化視圖重算哈希#
物化視圖(Materialized View)在 ClickHouse 中用於存儲查詢的結果作為獨立的表,這些表跟蹤對其依賴表的修改,並即時更新。
假設我們現在擁有三個節點,每個節點上有一個 shard,2 個 replica,如圖所示。
現在我們需要擴容集群到 4 節點,那麼一個使用物化視圖的擴容方式應該如下所示:
- 將底表擴容,這部分只需要修改每個節點對應的 config,最後統一修改 master config 即可。
- 資料再平衡,首先手動 fetch 負載過高,停機時間過長,一般不能接受。
- 我們考慮採用物化視圖來進行擴容,大致流程如下:
- 創建一個同結構的 MV’,將 MV’資料 hash 到 4 個節點;而此時 MV 依然寫入到 3 個節點。
- 新的 shard,有一個特殊的配置值,名為 status,值為 new;MV 讀取到這類配置項,會自動忽略這類 shard,因此對於 MV 來說,儘管已經 4 節點配置,但它自己依然是 3 節點。
- 創建 MV’時,指定配置項 include_state_shard=true,新 MV 將 hash 到 4 個節點;另外創建視圖指定資料初始化能力,這樣就能需要不停服的回追底表資料了。
- MV’消費底表的歷史資料,等歷史消費完畢後,開始將 MV’重命名為 MV(後續 MV 會被刪除)。
- 停止底表寫入,這個步驟是為了防止在 rename 階段,分佈式表上有資料積壓,因此必須停寫清空積壓資料,這個停寫時間大概是秒 - 分鐘級別。
- MV 重命名為 MV-Temp,由於 rename 操作是一個元資料操作,因此執行速度比較快。
- 刪除物化視圖轉化器,重命名資料表的本地表和分佈式表。
- MV’重命名為 MV,
- 刪除物化視圖轉化器,重命名資料表的本地表和分佈式表,重建物化視圖轉化器(名字不一樣了)。
- 更新 shard 配置項,去除 status 關鍵字。
- 修改 MV 的配置項,刪除 include_state_shard 配置項。
停止底表寫入的方法可以直接設置權限,也可以用 SQL。
ALTER TABLE tablename DISABLE WRITE
ALTER TABLE tablename ENABLE WRITE
2. 使用虛擬分片技術#
我們直接修改之前的 rand () 哈希函數到jumpConsistentHash 算法即可,在分佈式表中,查詢請求會重新計算哈希函數,從而抽取需要的 data part 到新的節點中。流程如下:
- 和上文中 1 一樣,配置並上線新的 shard。
- 在所有的舊 shard 上面重算 hash 函數的值,獲取需要遷移到新的 shard 的 dp 的列表。
- 手動將這些 dp 遷移到新的 shard 即可。
這個方法避免了全量的重放和資料拷貝,且不會使集群失能,但是遷移的過程中正確性和資料的完整性需要依靠手寫的邏輯實現,容易出錯,且最終完成的 shard 內部 DP 的數量會增多,這一定程度上降低了分佈式服務的性能。
動手做一做#
我們在本地 cluster 上做了測試,遷移 partition 的操作。
- 先新建一個全量的物化視圖 MV。
CREATE MATERIALIZED VIEW tutorial.ic_local_mv
ENGINE = MergeTree
PARTITION BY toYYYYMM(trade_date)
ORDER BY trade_date
AS SELECT *
FROM tutorial.ic_local;
- 將 ic_all 裡面的資料插入到物化視圖 MV 中。
INSERT INTO tutorial.ic_local_mv
SELECT * FROM tutorial.ic_all;
- 在配置中添加新的 shard。
- 建立一樣的表結構 ic_local。
- 更新集群裡面的 config.xml。
- truncate 掉每個節點上的 ic_local,因為物化視圖 MV 是資料的複製,所以不會受到影響。
truncate table ic_local;
- 新建一個新的分佈式表 ic_all_new,修改裡面的分佈型一致哈希函數的 shard 參數為新集群裡 shard 的數量(例如擴容一個新物理機就是 + 1)。
CREATE TABLE IF NOT EXISTS tutorial.ic_dist AS tutorial.ic_local ON CLUSTER default
ENGINE = Distributed(default, tutorial, ic_local, rand());
- 將物化視圖 MV 裡的資料寫入到新的分佈式表裡,這步要尤其小心,因為涉及到大量的資料寫入,需要緊密監控集群的健康狀態。
INSERT INTO tutorial.ic_local
SELECT * FROM tutorial.ic_local_mv;
- 檢查遷移前後的資料質量。
select count() from ic_all;
select count() from ic_all_mv;
select count() from ic_local;
遷移 2.5 billion 的資料,資料丟失條數 = 0。
3shard 的分佈式寫入效率。
Elapsed: 46.387 sec. Processed 537.29 million rows, 75.96 GB (11.58 million rows/s., 1.64 GB/s.)
Peak memory usage: 1.37 GiB
可能的坑#
- 集群 cluster 內部訪問不能打通,提示 auth fail。
這種情況最常見的問題是你用於訪問某個資料庫 instance 的賬戶(與密碼)沒有在整個集群中均存在且保持一致,要確保你使用的賬密在集群中的全部機器上都擁有相同的權限級別,這樣才能正確訪問修改分佈式表。一個常見的解決辦法是使用 LDAP 在所有的實例上共享賬密,另一個解決辦法是為集群設置 secret,詳見github。
查詢優化#
在我們的業務需求中,監控日誌文件是一個應該用物化視圖進行優化的業務,因為其很多查詢依賴於大量的聚合、子查詢和合併。這部分的邏輯應該使用物化視圖進行固化。
Projection#
Projection 是 ClickHouse 中的一項新功能,允許定義表的子集合併優化查詢。它在創建表時預計算並存儲資料的特定視圖,加速資料檢索。Projection 裡面包含了 SQL rewrite 的一些原理優化,可以實現索引排序等功能。
我們用一張 price 表舉個例子:
CREATE TABLE tutorial.price_local
(
`price_id` UInt32,
`trade_date` Date,
`code` String,
`last` Float32,
`prev_close` Float32,
)
ENGINE = MergeTree
PARTITION BY toYear(trade_date)
ORDER BY (trade_date, code)
SETTINGS index_granularity = 8192
然後有兩種常規的 SQL select:
select * from tutorial.price_local where trade_date = '20240101';
select * from tutorial.price_local where code = '600519.SH';
在默認情況下,只有我們的第一個查詢能夠命中索引,而 code 不可以,因為 ClickHouse 裡面的二級索引是跳數索引,而組合索引的規則是左匹配。且 clickhouse 不能設置兩個主鍵索引同時生效。
常規的解決方案是新建一個物化視圖 / 將表拆分成多個 partition based on code,後者影響了分佈式表,一般不合適。
前者又有兩個問題:
- 新的查詢需要顯式指定查詢物化視圖而不是原表。
- 本身使用的跳數索引性能也很差。
這個時候我們可以用 Projection 來解決。
ALTER TABLE tutorial.price_local
ADD PROJECTION p1
(
SELECT
price_id,
trade_date,
code,
last,
ORDER BY code
);
之後就可以正常使用 code 命中索引了。
附錄#
什麼是跳數索引#
跳數索引(Skip Index)是 ClickHouse 中一種非常高效的資料檢索技術,它允許系統跳過不包含查詢所需資料的塊,從而減少查詢的資料量和提高速度。
什麼是 Null Engine#
Null Engine 是一種特殊的存儲引擎,用於快速創建不存儲資料的表。這種類型的表可用於測試和性能調優中的資料流。在很多時候這個表可以作為 stream 來使用,變相在 Clickhouse 裡提供可以被規範化校驗的資料流,例如低頻的行情資料。
救命,我的 client 或者 DBMS 在新建物化視圖的時候報錯:Clickhouse: IllegalArgumentException: Only groupMap is supported at this point#
因為 JDBC 目前不支持 Clickhouse 的二進制自定義聚合函數,所以顯示有問題,但 MV 可以正常創建和訪問,可以使用 clickhouse-cli。
TBC#
ref:
huangzhaowei's blog
blog
單機雙活 clickhouse
Clickhouse 自定義聚合函數