ClickHouse是一个用于联机分析处理(OLAP)的列式数据库管理系统(DBMS)。它允许用户通过SQL查询实时生成分析数据报告。
此文档深入探讨ClickHouse的集群架构、表结构和查询优化技术,为数据库管理员和数据工程师提供技术洞见。
ClickHouse的集群架构支持高度的可扩展性和灵活性,可以实现混合Replica和Shard的配置,满足不同的数据分布和高可用性需求。
在ClickHouse中,Sharding是通过分布式表实现的,它允许将数据水平分割存储在多个节点上。Replication则通过创建数据的副本来增强数据的可用性和持久性。混合使用Sharding和Replication可以在提升查询性能的同时,增加系统的容错性。
深入研究之后,我们发现其实可以满足在单一物理机上同时支持sharding和replication, 这需要通过单机双活Clickhouse, 和嵌套集群分布式表Distributed + 本地ReplicatedMergeTree来实现
我们看一下示例配置文件
<remote_servers>
<default>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.250</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>192.168.1.250</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.253</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>localhost</host>
<port>9000</port>
</replica>
<replica>
<host>192.168.1.253</host>
<port>9000</port>
</replica>
</shard>
</default>
</remote_servers> -->
然后咱们来看一个sample local table的建表语句:
CREATE TABLE tutorial.order_local
(
`order_id` UInt32,
`order_date` Date,
`quantity` UInt8,
`last_price` UInt32,
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/rep_local', '{replica}')
PARTITION BY toYear(order_date)
ORDER BY (order_date, order_id)
SETTINGS index_granularity = 8192
其中,'{}'包裹的变量需要在config.xml里面的<'macros'>section进行配置,注意每台物理机上的每个clickhouse实例需要不同的宏定义如下
<macros>
<!-- 这里指定的是shard的编号, 在同一台物理机上的不同clickhouse实例里面, shard的编号应该保持一致-->
<shard>01</shard>
<!-- 这里是replica的编号,可以简单由host-shard-replica结构组成,确保全局唯一 -->
<replica>247-01-1</replica>
</macros>
<!-- 引擎默认配置,如果无参调用ReplicatedMergeTree就会自动替代默认参数 -->
<default_replica_path>/clickhouse/tables/{database}/{table}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>
接下来我们需要将每个存储instance上面的shard聚合起来,作为分布式表提供对外查询
CREATE TABLE tutorial.order
(
`order_id` UInt32,
`order_date` Date,
`quantity` UInt8,
`last_price` UInt32,
)
ENGINE = Distributed('default', 'tutorial', 'order_local', xxHash32(order_id))
到此为止我们就完成了一个同时支持sharding和replication的分布式表,之后的查询操作都可以在这个表上面进行,Clickhouse会自动利用sharding的优势进行加速.
需要注意的是,这种分布式表是基于内置zookeeper来实现shard内部的数据一致性的,所以也继承了zookeeper的缺点: 对单点故障敏感. 解决方式是部署zookeeper集群
先安装JDK8, JDK11, JDK 21+
之后我们需要将集群的config.xml里面单独接入zookeeper集群或者单机,用来进行元数据管理
<zookeeper>
<node>
<host>localhost</host> <!-- 这里配置zookeeper的host -->
<port>2181</port>
</node>
</zookeeper>
之后在zookeeper的配置文件/conf/zoo.cfg里面,配置zookeeper监听内网地址,这里为了简便可以设置为0.0.0.0
# The listening port
clientPortAddress=0.0.0.0
该结构设计的初衷是提供最高速的read查询以及数据的安全存储,针对写入和变更,该设计的性能会比单纯的MergeTree要下降.
此外慢节点, 写放大和异步同步的问题仍然会导致整个集群的变更及写入操作在某些特定情况下stuck,需要特别注意.
一个简单的生产实践是禁止用户向distributed表里面进行写入及更改操作,并将所有的上述操作在local table上执行即可.
在上述实践中,我们会发现,分布式表的shard功能正常生效,但是replica的复制会有问题,进入JVM查看tracelog会发现,内网节点的host默认是通过hostname -f指令来获取的,导致对应的DNS解析失败.
这里视情况可能需要将其修改为对应的IP address:
<!-- <interserver_http_host>example.clickhouse.com</interserver_http_host> -->
<interserver_http_host>192.168.1.253</interserver_http_host>
ClickHouse支持多种类型的文件系统和对象存储后端,如JuiceFS、HDFS和Amazon S3,这些可以用于存放大规模的数据。
针对流式数据(例如市场行情数据),我们可能更多的应该将其添加到另一个冷存储区进行备份,例如HDFS,而不是使用ReplicateMergeTree
这个具体文件系统存储的方案可能还需要再深入研究.
鉴于Clickhouse是完整基于文件的, 所以对接IDC的文件系统并不需要特殊的connector支持. S3的情况下,Clickhouse也实现了对应接口,所以我们之后如果迁移,不需要额外的efforts
这部分设计的大前提:我们不使用存算分离的其他框架以及clickhouse单机的运算性能够强.
物化视图(Materialized View)在ClickHouse中用于存储查询的结果作为独立的表,这些表跟踪对其依赖表的修改,并实时更新。
假设我们现在拥有三个节点,每个节点上有一个shard,2个replica,如图所示
现在我们需要扩容集群到4节点,那么一个使用物化视图的扩容方式应该如下所示:
停止底表写入的方法可以直接设置权限,也可以用SQL
ALTER TABLE tablename DISABLE WRITE
ALTER TABLE tablename ENABLE WRITE
我们直接修改之前的rand()哈希函数到jumpConsistentHash算法即可,在分布式表中,查询请求会重新计算哈希函数,从而抽取需要的data part到新的节点中.流程如下:
这个方法避免了全量的重放和数据拷贝,且不会使集群失能,但是迁移的过程中正确性和数据的完整性需要依靠手写的逻辑实现,容易出错,且最终完成的shard内部DP的数量会增多,这一定程度上降低了分布式服务的性能.
我们在本地cluster上做了测试,迁移partition的操作
CREATE MATERIALIZED VIEW tutorial.ic_local_mv
ENGINE = MergeTree
PARTITION BY toYYYYMM(trade_date)
ORDER BY trade_date
AS SELECT *
FROM tutorial.ic_local;
INSERT INTO tutorial.ic_local_mv
SELECT * FROM tutorial.ic_all;
truncate table ic_local;
CREATE TABLE IF NOT EXISTS tutorial.ic_dist AS tutorial.ic_local ON CLUSTER default
ENGINE = Distributed(default, tutorial, ic_local, rand());
INSERT INTO tutorial.ic_local
SELECT * FROM tutorial.ic_local_mv;
select count() from ic_all;
select count() from ic_all_mv;
select count() from ic_local;
迁移 2.5 billion的数据,数据丢失条数 = 0
3shard的分布式写入效率
Elapsed: 46.387 sec. Processed 537.29 million rows, 75.96 GB (11.58 million rows/s., 1.64 GB/s.)
Peak memory usage: 1.37 GiB
在咱们的业务需求中,监控日志文件是一个应该用物化视图进行优化的业务,因为其很多查询依赖于大量的聚合,子查询和合并.这部分的逻辑应该使用物化视图进行固化.
Projection是ClickHouse中的一项新功能,允许定义表的子集合并优化查询。它在创建表时预计算并存储数据的特定视图,加速数据检索。Projection里面包含了SQL rewrite的一些原理优化,可以实现索引排序等功能.
我们用一张price表举个例子:
CREATE TABLE tutorial.price_local
(
`price_id` UInt32,
`trade_date` Date,
`code` String,
`last` Float32,
`prev_close` Float32,
)
ENGINE = MergeTree
PARTITION BY toYear(trade_date)
ORDER BY (trade_date, code)
SETTINGS index_granularity = 8192
然后有两种常规的SQL select:
select * from tutorial.price_local where trade_date = '20240101';
select * from tutorial.price_local where code = '600519.SH';
在默认情况下,只有我们的第一个查询能够命中索引,而code不可以,因为ClickHouse里面的二级索引是跳数索引,而组合索引的规则是左匹配.且clickhouse不能设置两个主键索引同时生效.
常规的解决方案是新建一个物化视图/将表拆分成多个partition based on code,后者影响了分布式表,一般不合适.
前者又有两个问题
这个时候我们可以用Projection来解决
ALTER TABLE tutorial.price_local
ADD PROJECTION p1
(
SELECT
price_id,
trade_date,
code,
last,
ORDER BY code
);
之后就可以正常使用code命中索引了.
跳数索引(Skip Index)是ClickHouse中一种非常高效的数据检索技术,它允许系统跳过不包含查询所需数据的块,从而减少查询的数据量和提高速度。
Null Engine是一种特殊的存储引擎,用于快速创建不存储数据的表。这种类型的表可用于测试和性能调优中的数据流。在很多时候这个表可以作为stream来使用,变相在Clickhouse里提供可以被规范化校验的数据流,例如低频的行情数据
因为JDBC目前不支持Clickhouse的二进制自定义聚合函数,所以显示有问题,但MV可以正常创建和访问,可以使用clickhouse-cli
ref:
huangzhaowei's blog
blog
单机双活clickhouse
Clickhouse自定义聚合函数