ClickHouse の深層ダイブ#
ClickHouse は、オンライン分析処理(OLAP)用の列指向データベース管理システム(DBMS)です。ユーザーは SQL クエリを通じてリアルタイムで分析データレポートを生成できます。
この文書では、ClickHouse のクラスタアーキテクチャ、テーブル構造、クエリ最適化技術について深く掘り下げ、データベース管理者やデータエンジニアに技術的洞察を提供します。
クラスタアーキテクチャ#
ClickHouse のクラスタアーキテクチャは、高度なスケーラビリティと柔軟性をサポートし、異なるデータ分布と高可用性のニーズを満たすために、混合 Replica と Shard の構成を実現できます。
混合 Replica と Shard の Cluster の実現#
ClickHouse では、Sharding は分散テーブルを通じて実現され、データを水平方向に複数のノードに分割して保存できます。Replication はデータのコピーを作成することでデータの可用性と永続性を強化します。Sharding と Replication を混合して使用することで、クエリ性能を向上させつつ、システムのフォールトトレランスを高めることができます。
- Sharding:データは特定のキー(例:ユーザー ID)に基づいて異なる shards に分散されます。
- Replication:各 shard のデータは複数の replicas 間でコピーされ、データの永続性と高可用性を確保します。
深く研究した結果、実際には単一の物理マシン上で同時にsharding と replication をサポートできることがわかりました。これは単機双活 Clickhouseと、ネストされたクラスタ分散テーブル Distributed + ローカル ReplicatedMergeTree を通じて実現されます。
サンプル構成ファイルを見てみましょう。
<remote_servers>
<default>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.250</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>192.168.1.250</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.253</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.253</host>
<port>9000</port>
</replica>
</shard>
</default>
</remote_servers> -->
次に、サンプルローカルテーブルの作成文を見てみましょう。
CREATE TABLE tutorial.order_local
(
`order_id` UInt32,
`order_date` Date,
`quantity` UInt8,
`last_price` UInt32,
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/rep_local', '{replica}')
PARTITION BY toYear(order_date)
ORDER BY (order_date, order_id)
SETTINGS index_granularity = 8192
ここで、'{}' で囲まれた変数は config.xml の <'macros'> セクションで設定する必要があります。注意すべきは、各物理マシン上の各 ClickHouse インスタンスには異なるマクロ定義が必要です。
<macros>
<!-- ここで指定するのはshardの番号で、同一物理マシン上の異なるClickHouseインスタンスではshardの番号は一致させる必要があります -->
<shard>01</shard>
<!-- ここはreplicaの番号で、host-shard-replica構造から簡単に構成され、グローバルにユニークであることを保証します -->
<replica>247-01-1</replica>
</macros>
<!-- エンジンのデフォルト設定、引数なしでReplicatedMergeTreeを呼び出すとデフォルトパラメータが自動的に置き換えられます -->
<default_replica_path>/clickhouse/tables/{database}/{table}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>
次に、各ストレージインスタンス上の shard を集約し、分散テーブルとして外部クエリを提供する必要があります。
CREATE TABLE tutorial.order
(
`order_id` UInt32,
`order_date` Date,
`quantity` UInt8,
`last_price` UInt32,
)
ENGINE = Distributed('default', 'tutorial', 'order_local', xxHash32(order_id))
これで、同時にsharding と replication をサポートする分散テーブルが完成しました。以降のクエリ操作はこのテーブル上で行うことができ、ClickHouse は自動的に sharding の利点を利用して加速します。
注意が必要なのは、この分散テーブルは内蔵の Zookeeper を基にして shard 内部のデータの一貫性を実現しているため、Zookeeper の欠点も引き継いでいます:単一障害点に敏感です。解決策は Zookeeper クラスタを展開することです。
対応するメタデータ管理 Zookeeper 設定#
まず JDK8、JDK11、JDK21 + をインストールします。
その後、クラスタの config.xml に Zookeeper クラスタまたは単一ノードを個別に接続し、メタデータ管理を行います。
<zookeeper>
<node>
<host>localhost</host> <!-- ここでZookeeperのhostを設定 -->
<port>2181</port>
</node>
</zookeeper>
次に、Zookeeper の設定ファイル /conf/zoo.cfg で、Zookeeper が内網アドレスをリッスンするように設定します。ここでは簡便のために 0.0.0.0 に設定できます。
# リッスンポート
clientPortAddress=0.0.0.0
この構造設計の初衷は、最高速の読み取りクエリとデータの安全な保存を提供することです。書き込みと変更に関しては、この設計の性能は単純な MergeTree よりも低下します。
さらに、遅いノード、書き込み増幅、および非同期同期の問題は、特定の状況下でクラスタ全体の変更および書き込み操作がスタックする原因となるため、特に注意が必要です。
簡単な運用実践として、ユーザーが分散テーブルに書き込みや変更操作を行うことを禁止し、すべての上記操作をローカルテーブルで実行することをお勧めします。
踏み外し 1. interserver-copy の無効化#
上記の実践の中で、分散テーブルの shard 機能は正常に機能しますが、replica のコピーに問題が発生することがあります。JVM でトレースログを確認すると、内網ノードの host はデフォルトで hostname -f コマンドを使用して取得されているため、対応する DNS 解決に失敗します。
この場合、必要に応じて対応する IP アドレスに変更する必要があります。
<!-- <interserver_http_host>example.clickhouse.com</interserver_http_host> -->
<interserver_http_host>192.168.1.253</interserver_http_host>
低レベルのストレージが JuiceFS/HDFS/S3 と接続する方法#
ClickHouse は、JuiceFS、HDFS、Amazon S3 などのさまざまなタイプのファイルシステムおよびオブジェクトストレージバックエンドをサポートしており、これらは大規模データの保存に使用できます。
ストリーミングデータ(例:市場データ)に関しては、HDFS などの別の冷ストレージにバックアップすることを検討するべきであり、ReplicateMergeTree を使用するべきではありません。
この具体的なファイルシステムストレージのソリューションは、さらに深く研究する必要があります。
- JuiceFS:Redis と S3 に基づく分散ファイルシステムで、ClickHouse テーブルを直接 JuiceFS に保存し、高性能とスケーラビリティを実現できます。
- HDFS:HDFS をストレージバックエンドとして使用することで、HDFS テーブルエンジンを介して ClickHouse 上で HDFS に保存されたデータを直接作成およびクエリできます。
- S3:S3 テーブルエンジンを介して、ClickHouse は S3 互換のストレージと直接対話でき、データをローカルストレージにコピーする必要がありません。
ClickHouse は完全にファイルベースであるため、IDC のファイルシステムと接続するために特別なコネクタサポートは必要ありません。S3 の場合、ClickHouse は対応するインターフェースを実装しているため、今後移行する際に追加の手間はかかりません。
弾性スケーリング#
この部分の設計の前提条件:計算とストレージの分離を使用しない他のフレームワークおよび ClickHouse の単一インスタンスの計算能力が強力であること。
1. 物化ビューを使用してハッシュを再計算#
物化ビュー(Materialized View)は、ClickHouse でクエリの結果を独立したテーブルとして保存するために使用され、これらのテーブルは依存するテーブルの変更を追跡し、リアルタイムで更新されます。
現在、3 つのノードを持ち、それぞれのノードに 1 つの shard と 2 つの replica があると仮定します。
現在、クラスタを 4 ノードに拡張する必要がある場合、物化ビューを使用した拡張方法は次のようになります。
- ベーステーブルを拡張します。この部分は、各ノードの config を変更するだけで済み、最終的にマスター config を統一して変更できます。
- データの再バランスを行います。まず、手動で負荷が高すぎる場合、ダウンタイムが長すぎる場合は、一般的には受け入れられません。
- 物化ビューを使用して拡張を行うことを検討します。大まかな流れは次のとおりです。
- 同じ構造の MV' を作成し、MV' のデータを 4 つのノードにハッシュします。この時、MV は依然として 3 つのノードに書き込まれます。
- 新しい shard には、status という特別な設定値があり、値は new です。MV はこのような設定項目を読み取ると、自動的にこの shard を無視します。したがって、MV にとっては、すでに 4 ノードの設定があるにもかかわらず、依然として 3 ノードです。
- MV' を作成する際に、設定項目 include_state_shard=true を指定します。新しい MV は 4 つのノードにハッシュされます。さらに、ビューを作成する際にデータ初期化機能を指定することで、ダウンタイムなしでベーステーブルデータを追跡できます。
- MV' はベーステーブルの履歴データを消費し、履歴の消費が完了した後、MV' の名前を MV に変更します(以降、MV は削除されます)。
- ベーステーブルへの書き込みを停止します。このステップは、rename 段階で分散テーブルにデータが蓄積されるのを防ぐために必要です。したがって、書き込みを停止して蓄積データをクリアする必要があります。この停止時間は数秒から数分程度です。
- MV を MV-Temp に名前変更します。rename 操作はメタデータ操作であるため、実行速度は比較的速いです。
- 物化ビュー変換器を削除し、ローカルテーブルと分散テーブルのデータテーブルの名前を変更します。
- MV' を MV に名前変更します。
- 物化ビュー変換器を削除し、ローカルテーブルと分散テーブルのデータテーブルの名前を変更し、物化ビュー変換器を再構築します(名前が異なります)。
- shard の設定項目を更新し、status キーワードを削除します。
- MV の設定項目を変更し、include_state_shard 設定項目を削除します。
ベーステーブルへの書き込みを停止する方法は、権限を直接設定することも、SQL を使用することもできます。
ALTER TABLE tablename DISABLE WRITE
ALTER TABLE tablename ENABLE WRITE
2. 仮想シャーディング技術を使用#
以前の rand () ハッシュ関数をjumpConsistentHash アルゴリズムに変更するだけで済みます。分散テーブル内では、クエリリクエストがハッシュ関数を再計算し、新しいノードに必要なデータパートを抽出します。流れは次のとおりです。
- 上記の 1 と同様に、新しい shard を設定してオンラインにします。
- すべての古い shard でハッシュ関数の値を再計算し、新しい shard に移行する必要がある dp のリストを取得します。
- 手動でこれらの dp を新しい shard に移行します。
この方法は、全量の再生とデータコピーを回避し、クラスタが無効になることはありませんが、移行中の正確性とデータの完全性は手動のロジック実装に依存するため、エラーが発生しやすく、最終的に完成した shard 内部の DP の数が増加するため、分散サービスの性能が低下する可能性があります。
実際にやってみる#
私たちはローカルクラスタでパーティションの移行操作をテストしました。
- まず、完全な物化ビュー MV を新規作成します。
CREATE MATERIALIZED VIEW tutorial.ic_local_mv
ENGINE = MergeTree
PARTITION BY toYYYYMM(trade_date)
ORDER BY trade_date
AS SELECT *
FROM tutorial.ic_local;
- ic_all 内のデータを物化ビュー MV に挿入します。
INSERT INTO tutorial.ic_local_mv
SELECT * FROM tutorial.ic_all;
- 設定に新しい shard を追加します。
- 同じテーブル構造 ic_local を作成します。
- クラスタ内の config.xml を更新します。
- 各ノードの ic_local を truncate します。物化ビュー MV はデータのコピーであるため、影響を受けません。
truncate table ic_local;
- 新しい分散テーブル ic_all_new を新規作成し、内部の分散型一貫ハッシュ関数の shard パラメータを新しいクラスタの shard の数に変更します(例えば、新しい物理マシンを追加する場合は + 1)。
CREATE TABLE IF NOT EXISTS tutorial.ic_dist AS tutorial.ic_local ON CLUSTER default
ENGINE = Distributed(default, tutorial, ic_local, rand());
- 物化ビュー MV 内のデータを新しい分散テーブルに書き込みます。このステップは特に注意が必要で、大量のデータ書き込みが関与するため、クラスタの健康状態を厳密に監視する必要があります。
INSERT INTO tutorial.ic_local
SELECT * FROM tutorial.ic_local_mv;
- 移行前後のデータ品質を確認します。
select count() from ic_all;
select count() from ic_all_mv;
select count() from ic_local;
2.5 億のデータを移行し、データ喪失件数 = 0
3shard の分散書き込み効率
Elapsed: 46.387 sec. Processed 537.29 million rows, 75.96 GB (11.58 million rows/s., 1.64 GB/s.)
Peak memory usage: 1.37 GiB
可能な落とし穴#
- クラスタ内部のアクセスが通じない、auth fail のエラーが表示される
この状況で最も一般的な問題は、特定のデータベースインスタンスにアクセスするために使用するアカウント(およびパスワード)がクラスタ全体で一貫して存在しないことです。使用するアカウントとパスワードがクラスタ内のすべてのマシンで同じ権限レベルを持っていることを確認する必要があります。これにより、分散テーブルに正しくアクセスし、変更できるようになります。一般的な解決策は、LDAP を使用してすべてのインスタンスでアカウントとパスワードを共有することです。もう一つの解決策は、クラスタに secret を設定することです。詳細はgithubを参照してください。
クエリ最適化#
私たちのビジネスニーズでは、監視ログファイルは物化ビューを使用して最適化すべきビジネスであり、多くのクエリが大量の集約、サブクエリ、マージに依存しています。この部分のロジックは物化ビューを使用して固定化する必要があります。
Projection#
Projection は ClickHouse の新機能で、テーブルのサブセットを定義してクエリを最適化することを可能にします。テーブル作成時にデータの特定のビューを事前計算して保存し、データの取得を加速します。Projection には SQL リライトの原理最適化が含まれており、インデックスのソートなどの機能を実現できます。
価格テーブルの例を挙げてみましょう。
CREATE TABLE tutorial.price_local
(
`price_id` UInt32,
`trade_date` Date,
`code` String,
`last` Float32,
`prev_close` Float32,
)
ENGINE = MergeTree
PARTITION BY toYear(trade_date)
ORDER BY (trade_date, code)
SETTINGS index_granularity = 8192
次に、2 つの一般的な SQL select があります。
select * from tutorial.price_local where trade_date = '20240101';
select * from tutorial.price_local where code = '600519.SH';
デフォルトでは、最初のクエリのみがインデックスにヒットし、code はヒットしません。なぜなら、ClickHouse の二次インデックスはスキップインデックスであり、複合インデックスのルールは左マッチングだからです。また、ClickHouse では 2 つの主キーインデックスを同時に有効にすることはできません。
一般的な解決策は、新しい物化ビューを作成するか、テーブルを code に基づいて複数のパーティションに分割することですが、後者は分散テーブルに影響を与えるため、一般的には適切ではありません。
前者には 2 つの問題があります。
- 新しいクエリは元のテーブルではなく、物化ビューを明示的に指定する必要があります。
- 使用されるスキップインデックスの性能も非常に悪いです。
この時、Projection を使用して解決できます。
ALTER TABLE tutorial.price_local
ADD PROJECTION p1
(
SELECT
price_id,
trade_date,
code,
last,
ORDER BY code
);
その後、code を使用してインデックスにヒットさせることができるようになります。
附録#
スキップインデックスとは何か#
スキップインデックス(Skip Index)は、ClickHouse における非常に効率的なデータ検索技術であり、システムが必要なデータを含まないブロックをスキップすることを可能にし、クエリのデータ量を減らし、速度を向上させます。
Null Engine とは何か#
Null Engine は、データを保存しないテーブルを迅速に作成するための特別なストレージエンジンです。このタイプのテーブルは、テストやパフォーマンス調整におけるデータフローに使用できます。多くの場合、このテーブルはストリームとして使用され、Clickhouse 内で規範化検証可能なデータフローを提供します。例えば、低頻度の市場データなどです。
助けて、クライアントまたは DBMS が新しい物化ビューを作成するときにエラーが発生します:Clickhouse: IllegalArgumentException: Only groupMap is supported at this point#
JDBC が現在 Clickhouse のバイナリカスタム集約関数をサポートしていないため、表示に問題がありますが、MV は正常に作成およびアクセスできます。clickhouse-cli を使用できます。
TBC#
ref:
huangzhaowei's blog
blog
単機双活 clickhouse
Clickhouse カスタム集約関数