xChar

Dive Deep in ClickHouse

ClickHouse是一个用于联机分析处理(OLAP)的列式数据库管理系统(DBMS)。它允许用户通过SQL查询实时生成分析数据报告。
此文档深入探讨ClickHouse的集群架构、表结构和查询优化技术,为数据库管理员和数据工程师提供技术洞见。


集群架构

ClickHouse的集群架构支持高度的可扩展性和灵活性,可以实现混合Replica和Shard的配置,满足不同的数据分布和高可用性需求。

实现混合Replica和Shard的Cluster

在ClickHouse中,Sharding是通过分布式表实现的,它允许将数据水平分割存储在多个节点上。Replication则通过创建数据的副本来增强数据的可用性和持久性。混合使用Sharding和Replication可以在提升查询性能的同时,增加系统的容错性。

  • Sharding:数据根据某个键(如用户ID)分布在不同的shards中。
  • Replication:每个shard的数据在多个replicas之间复制,确保数据的持久性和高可用。

深入研究之后,我们发现其实可以满足在单一物理机上同时支持sharding和replication, 这需要通过单机双活Clickhouse, 和嵌套集群分布式表Distributed + 本地ReplicatedMergeTree来实现

我们看一下示例配置文件

<remote_servers>
    <default>
        <shard>
            <replica>
                <host>localhost</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>192.168.1.250</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <replica>
                <host>192.168.1.250</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>192.168.1.253</host>
                <port>9000</port>
            </replica>
        </shard>
        <shard>
            <replica>
                <host>localhost</host>
                <port>9000</port>
            </replica>
            <replica>
                <host>192.168.1.253</host>
                <port>9000</port>
            </replica>
        </shard>
    </default>
</remote_servers> -->

然后咱们来看一个sample local table的建表语句:

CREATE TABLE tutorial.order_local
(
    `order_id` UInt32,
    `order_date` Date,
    `quantity` UInt8,
    `last_price` UInt32,
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/rep_local', '{replica}')
PARTITION BY toYear(order_date)
ORDER BY (order_date, order_id)
SETTINGS index_granularity = 8192

其中,'{}'包裹的变量需要在config.xml里面的<'macros'>section进行配置,注意每台物理机上的每个clickhouse实例需要不同的宏定义如下

<macros>
  <!-- 这里指定的是shard的编号, 在同一台物理机上的不同clickhouse实例里面, shard的编号应该保持一致-->
  <shard>01</shard>  
  <!-- 这里是replica的编号,可以简单由host-shard-replica结构组成,确保全局唯一 -->
  <replica>247-01-1</replica> 
</macros> 

<!-- 引擎默认配置,如果无参调用ReplicatedMergeTree就会自动替代默认参数 -->
<default_replica_path>/clickhouse/tables/{database}/{table}</default_replica_path>  
<default_replica_name>{replica}</default_replica_name>

接下来我们需要将每个存储instance上面的shard聚合起来,作为分布式表提供对外查询

 CREATE TABLE tutorial.order
(
    `order_id` UInt32,
    `order_date` Date,
    `quantity` UInt8,
    `last_price` UInt32,
)
ENGINE = Distributed('default', 'tutorial', 'order_local', xxHash32(order_id))

到此为止我们就完成了一个同时支持sharding和replication的分布式表,之后的查询操作都可以在这个表上面进行,Clickhouse会自动利用sharding的优势进行加速.

需要注意的是,这种分布式表是基于内置zookeeper来实现shard内部的数据一致性的,所以也继承了zookeeper的缺点: 对单点故障敏感. 解决方式是部署zookeeper集群

对应的元数据管理Zookeeper配置

先安装JDK8, JDK11, JDK 21+

之后我们需要将集群的config.xml里面单独接入zookeeper集群或者单机,用来进行元数据管理

    <zookeeper>
        <node>
            <host>localhost</host>  <!-- 这里配置zookeeper的host -->
            <port>2181</port>
        </node>
    </zookeeper>

之后在zookeeper的配置文件/conf/zoo.cfg里面,配置zookeeper监听内网地址,这里为了简便可以设置为0.0.0.0

# The listening port
clientPortAddress=0.0.0.0

该结构设计的初衷是提供最高速的read查询以及数据的安全存储,针对写入和变更,该设计的性能会比单纯的MergeTree要下降.
此外慢节点, 写放大异步同步的问题仍然会导致整个集群的变更及写入操作在某些特定情况下stuck,需要特别注意.

一个简单的生产实践是禁止用户向distributed表里面进行写入及更改操作,并将所有的上述操作在local table上执行即可.

踩坑1. interserver-copy失效

在上述实践中,我们会发现,分布式表的shard功能正常生效,但是replica的复制会有问题,进入JVM查看tracelog会发现,内网节点的host默认是通过hostname -f指令来获取的,导致对应的DNS解析失败.
这里视情况可能需要将其修改为对应的IP address:

		<!-- <interserver_http_host>example.clickhouse.com</interserver_http_host> -->
    <interserver_http_host>192.168.1.253</interserver_http_host>

底层的存储如何与JuiceFS/HDFS/S3对接

ClickHouse支持多种类型的文件系统和对象存储后端,如JuiceFS、HDFS和Amazon S3,这些可以用于存放大规模的数据。
针对流式数据(例如市场行情数据),我们可能更多的应该将其添加到另一个冷存储区进行备份,例如HDFS,而不是使用ReplicateMergeTree
这个具体文件系统存储的方案可能还需要再深入研究.

  • JuiceFS:一个基于Redis与S3的分布式文件系统,可以将ClickHouse表直接存储在JuiceFS上,实现高性能与可伸缩性。
  • HDFS:使用HDFS作为存储后端,可以通过HDFS表引擎直接在ClickHouse上创建和查询存储在HDFS上的数据。
  • S3:通过S3表引擎,ClickHouse可以直接与S3兼容的存储交互,无需复制数据到本地存储。

鉴于Clickhouse是完整基于文件的, 所以对接IDC的文件系统并不需要特殊的connector支持. S3的情况下,Clickhouse也实现了对应接口,所以我们之后如果迁移,不需要额外的efforts

弹性扩容

这部分设计的大前提:我们不使用存算分离的其他框架以及clickhouse单机的运算性能够强.

1. 使用物化视图重算哈希

物化视图(Materialized View)在ClickHouse中用于存储查询的结果作为独立的表,这些表跟踪对其依赖表的修改,并实时更新。

假设我们现在拥有三个节点,每个节点上有一个shard,2个replica,如图所示
image-20210713155425389.png

现在我们需要扩容集群到4节点,那么一个使用物化视图的扩容方式应该如下所示:

  1. 将底表扩容,这部分只需要修改每个节点对应的config, 最后统一修改master config即可
  2. 数据再平衡, 首先手动fetch负载过高,停机时间过长,一般不能接受
  • 我们考虑采用物化视图来进行扩容,大致流程如下:
    1. 创建一个同结构的MV’, 将MV’数据hash到4个节点; 而此时MV依然写入到3个节点
    2. 新的shard, 有一个特殊的配置值, 名为status, 值为new; MV读取到这类配置项, 会自动忽略这类shard, 因此对于MV来说, 尽管已经4节点配置, 但它自己依然是3节点
    3. 创建MV’时, 指定配置项include_state_shard=true, 新MV将hash到4个节点; 另外创建视图指定数据初始化能力, 这样就能需要不停服的回追底表数据了.
    4. MV’消费底表的历史数据, 等历史消费完毕后, 开始将MV’重命名为MV(后续MV会被删除)
    5. 停止底表写入, 这个步骤是为了防止在rename阶段, 分布式表上有数据积压, 因此必须停写清空积压数据, 这个停写时间大概是秒 - 分钟级别
    6. MV 重命名为MV-Temp, 由于rename操作是一个元数据操作, 因此执行速度比较快
      • 删除物化视图转化器, 重命名数据表的本地表和分布式表
    7. MV’ 重命名为MV,
      • 删除物化视图转化器, 重命名数据表的本地表和分布式表,重建物化视图转化器(名字不一样了)
      • 更新shard配置项, 去除status关键字
      • 修改MV的配置项, 删除include_state_shard配置项.

停止底表写入的方法可以直接设置权限,也可以用SQL

ALTER TABLE tablename DISABLE WRITE
ALTER TABLE tablename ENABLE WRITE

2. 使用虚拟分片技术

我们直接修改之前的rand()哈希函数到jumpConsistentHash算法即可,在分布式表中,查询请求会重新计算哈希函数,从而抽取需要的data part到新的节点中.流程如下:

  1. 和上文中1一样,配置并上线新的shard
  2. 在所有的旧shard上面重算hash函数的值,获取需要迁移到新的shard的dp的列表
  3. 手动将这些dp迁移到新的shard即可

这个方法避免了全量的重放和数据拷贝,且不会使集群失能,但是迁移的过程中正确性和数据的完整性需要依靠手写的逻辑实现,容易出错,且最终完成的shard内部DP的数量会增多,这一定程度上降低了分布式服务的性能.

动手做一做

我们在本地cluster上做了测试,迁移partition的操作

  1. 先新建一个全量的物化视图MV
CREATE MATERIALIZED VIEW tutorial.ic_local_mv
ENGINE = MergeTree
PARTITION BY toYYYYMM(trade_date)
ORDER BY trade_date
AS SELECT *
FROM tutorial.ic_local;
  1. 将ic_all里面的数据插入到物化视图MV中
INSERT INTO tutorial.ic_local_mv
SELECT * FROM tutorial.ic_all;
  1. 在配置中添加新的shard
    • 建立一样的表结构ic_local
    • 更新集群里面的config.xml
  2. truncate掉每个节点上的ic_local,因为物化视图MV是数据的复制,所以不会受到影响
truncate table ic_local;
  1. 新建一个新的分布式表ic_all_new, 修改里面的分布型一致哈希函数的shard参数为新集群里shard的数量(例如扩容一个新物理机就是+1)
CREATE TABLE IF NOT EXISTS tutorial.ic_dist AS tutorial.ic_local ON CLUSTER default
ENGINE = Distributed(default, tutorial, ic_local, rand());
  1. 将物化视图MV里的数据写入到新的分布式表里面,这步要尤其小心,因为涉及到大量的数据写入,需要紧密监控集群的健康状态
INSERT INTO tutorial.ic_local
SELECT * FROM tutorial.ic_local_mv;
  1. 检查迁移前后的数据质量
select count() from ic_all;
select count() from ic_all_mv;
select count() from ic_local;

迁移 2.5 billion的数据,数据丢失条数 = 0
3shard的分布式写入效率

Elapsed: 46.387 sec. Processed 537.29 million rows, 75.96 GB (11.58 million rows/s., 1.64 GB/s.)
Peak memory usage: 1.37 GiB

可能的坑

  1. 集群cluster内部访问不能打通,提示auth fail
    这种情况最常见的问题是你用于访问某个数据库instance的账户(与密码)没有在整个集群中均存在且保持一致,要确保你使用的账密在集群中的全部机器上都拥有相同的权限级别,这样才能正确访问修改分布式表。一个常见的解决办法是使用LDAP在所有的实例上共享账密,另一个解决办法是为集群设置secret,详见github

查询优化

在咱们的业务需求中,监控日志文件是一个应该用物化视图进行优化的业务,因为其很多查询依赖于大量的聚合,子查询和合并.这部分的逻辑应该使用物化视图进行固化.

Projection

Projection是ClickHouse中的一项新功能,允许定义表的子集合并优化查询。它在创建表时预计算并存储数据的特定视图,加速数据检索。Projection里面包含了SQL rewrite的一些原理优化,可以实现索引排序等功能.

我们用一张price表举个例子:

CREATE TABLE tutorial.price_local
(
    `price_id` UInt32,
    `trade_date` Date,
    `code` String,
    `last` Float32,
    `prev_close` Float32,
)
ENGINE = MergeTree
PARTITION BY toYear(trade_date)
ORDER BY (trade_date, code)
SETTINGS index_granularity = 8192

然后有两种常规的SQL select:

select * from tutorial.price_local where trade_date = '20240101';

select * from tutorial.price_local where code = '600519.SH';

在默认情况下,只有我们的第一个查询能够命中索引,而code不可以,因为ClickHouse里面的二级索引是跳数索引,而组合索引的规则是左匹配.且clickhouse不能设置两个主键索引同时生效.

常规的解决方案是新建一个物化视图/将表拆分成多个partition based on code,后者影响了分布式表,一般不合适.
前者又有两个问题

  1. 新的查询需要显式指定查询物化视图而不是原表
  2. 本身使用的跳数索引性能也很差

这个时候我们可以用Projection来解决

ALTER TABLE tutorial.price_local
    ADD PROJECTION p1
    (
        SELECT
            price_id,
            trade_date,
            code,
            last,
        ORDER BY code
    );

之后就可以正常使用code命中索引了.

附录

什么是跳数索引

跳数索引(Skip Index)是ClickHouse中一种非常高效的数据检索技术,它允许系统跳过不包含查询所需数据的块,从而减少查询的数据量和提高速度。

什么是Null Engine

Null Engine是一种特殊的存储引擎,用于快速创建不存储数据的表。这种类型的表可用于测试和性能调优中的数据流。在很多时候这个表可以作为stream来使用,变相在Clickhouse里提供可以被规范化校验的数据流,例如低频的行情数据

救命,我的client或者DBMS在新建物化视图的时候报错:Clickhouse: IllegalArgumentException: Only groupMap is supported at this point

因为JDBC目前不支持Clickhouse的二进制自定义聚合函数,所以显示有问题,但MV可以正常创建和访问,可以使用clickhouse-cli

JDBC limitations

TBC

ref:
huangzhaowei's blog
blog
单机双活clickhouse
Clickhouse自定义聚合函数

Loading comments...