导入数据
从 Kafka 导入
CnchKafka 是 ByConity 基于社区 ClickHouse Kafka 表引擎自研实现的适配云原生架构的表引擎,用于高效快速地将用户数据从 Apache Kafka 实时导入 ByConity;其设计与实现既适配了云原生新架构,同时在社区实现基础上增强了部分功能。
使用指南
建表
创建 CnchKafka 消费表和社区原生建 Kafka 表类似,需要通过 Setting 参数配置 Kafka 数据源及消费参数。示例如下:
CREATE TABLE kafka_test.cnch_kafka_consume
(
    `i` Int64,
    `ts` DateTime
)
ENGINE = CnchKafka()
SETTINGS
kafka_broker_list = '10.10.10.10:9092',  -- replace with your own broker list
kafka_topic_list = 'my_kafka_test_topic', -- topic name to subcribe
kafka_group_name = 'hansome_boy_consume_group', -- your consumer-group name
kafka_format = 'JSONEachRow', -- always be json
kafka_row_delimiter = '\n', -- always be \n
kafka_num_consumers = 1
(Setting 参数说明及其他更多参数支持请参考下方说明)
由于 Kafka 消费设计需要三张表,所以还需要同步创建另外两张表。
首先创建存储表(以 CnchMergeTree 为例):
CREATE TABLE kafka_test.cnch_store_kafka
(
    `i` Int64,
    `ts` DateTime
)
ENGINE = CnchMergeTree
PARTITION BY toDate(ts)
ORDER BY ts
最后创建物化视图表(必须 Kafka 表和存储表创建成功后才能创建):
CREATE MATERIALIZED VIEW kafka_test.cnch_kafka_view
TO kafka_test.cnch_store_kafka
(
    `i` Int64,
    `ts` DateTime
)
AS
SELECT * -- you can add virtual columns here if you need
FROM kafka_test.cnch_kafka_consume
如果你有对应 topic 的消费权限,那么三张表创建好以后,消费就会自动开始执行。
虚拟列支持
有时候业务需要获取 Kafka 消息的元数据(e.g. 消息的 partition, offset 等)。此时可以使用 virtual columns 功能来满足这个需求。virtual columns 不需要在建表的时候指定,是表引擎本身的属性。可以放到 VIEW 表的 SELECT 语句中存储到底表中(当底表添加了对应列):
SELECT
    _topic,    -- String
    _partition,    -- UInt64
    _key,    -- String
    _offset,    -- UInt64
    _content,  -- String: 完整的消息内容
    *    -- 正常列可以通过*展开,虚拟列则不能
FROM kafka_test.cnch_kafka_consume
Setting 参数说明
| **参数名**                                    | **类型**        | **必填/默认值** | **说明**                                                                                                                                                                                                                          |
| :-------------------------------------------- | :-------------- | :-------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| kafka_cluster / kafka_broker_list             | String          | 必填            | 社区版本 Kafka 请使用 `kafka_broker_list` 参数。                                                                                                                                                             |
| kafka_topic_list                              | String          | 必填            | 可以多个,逗号分隔。                                                                                                                                                                                                              |
| kafka_group_name                              | String          | 必填            | consumer group name,消费组。                                                                                                                                                                                                     |
| kafka_format                                  | String          | 必填            | 消息格式;目前最常用 JSONEachRow。                                                                                                                                                                                                |
| kafka_row_delimiter                           | String          | '\0'            | 一般使用 '\n'。                                                                                                                                                                                                                   |
| kafka_num_consumers                           | UInt64          | 1               | 消费者个数,建议不超过 topic 中最大 partition 数目。                                                                                                                                                                              |
| kafka_max_block_size                          | UInt64          | 65536           | 写入 block_size,上限 1M。                                                                                                                                                                                                         |
| kafka_max_poll_interval_ms                    | Milliseconds    | 7500            | the max time to poll from broker each iteration。                                                                                                                                                                                  |
| kafka_schema                                  | String          | ""              | schema 文件设置参数,以文件名 + 冒号 + 消息名格式设置。如: `schema.proto:MyMessage`。                                                                                                                                             |
| kafka_format_schema_path                      | String          | ""              | 远端 schema 文件路径(不含文件名)设置参数,目前只支持 hdfs。(如果没有设置这个参数,将从配置文件设置的默认路径读取)。                                                                                                               |
| kafka_protobuf_enable_multiple_message        | bool            | true            | 设置为 true,表示可以从一条 kafka 消息中读取多个 protobuf 的 message,彼此以各自长度为间隔。                                                                                                                                        |
| kafka_protobuf_default_length_parser          | bool            | false           | 仅在 `kafka_protobuf_enable_multiple_message` 为 true 生效:true 表示消息头部有变量记录长度;false 表示用一个固定的 8 字节作为头部记录长度。                                                                                      |
| kafka_extra_librdkafka_config                 | Json format string | ""           | 其他 rdkafka 支持的参数,通常用于鉴权 (More params refer to [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#:~:text=see%20dedicated%20API-,ssl.ca.location,-*)).                                      |
| cnch_vw_write                                 | String          | "vw_write"      | 配置消费使用 Virtual WareHouse,consumer 任务将被调度到配置的 Virtual Warehouse 节点执行。                                                                                                                                        |
| kafka_cnch_schedule_mode                      | String          | "random"        | ConsumeManager 调度 consumer 任务时候采取的调度策略,目前支持:random, hash, and least_consumers;如果是独立 vw 或消费者数目大于 10,推荐使用 least_consumers。                                                                   |
修改消费参数
支持通过 ALTER 命令快速修改 Setting 参数,主要用于调整消费者数目等提升消费能力。
命令:
ALTER TABLE <cnch_kafka_name> MODIFY SETTING <name1> = <value1>, <name2> = <value2>
该命令执行会自动重启消费任务。
手动启停消费
在一些场景中用户可能需要手动停止消费,随后手动恢复;我们提供了对应的 SYSTEM 命令实现:
SYSTEM START/STOP/RESTART CONSUME <cnch_kafka_name>
注意:START/STOP 命令会将对应状态持久化到 Catalog,因此在执行 STOP 命令后,如果不执行 START,即使服务重启,消费任务也不会恢复。
重置 offset
由于 CnchKafka 的 offset 由引擎自身管理和保存,当用户需要重启 offset 时,我们同样实现了 SYSTEM 命令操作。具体支持以下三种方式:
重置到特殊位置
- 按最新位置/起始位置
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "offset_value":-1}'
- 可能的特殊位置的 value 值:
    enum Offset {
        OFFSET_BEGINNING = -2,
        OFFSET_END = -1,
        OFFSET_STORED = -1000,
        OFFSET_INVALID = -1001
    };
- 按时间戳重置
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "timestamp":1646125258000}'
其中 timestamp 的值应该为 Kafka 侧数据有效期内的某个时间的时间戳,且为毫秒级。
- 指定 offset 具体 value
system reset consume offset '{"database_name":"XXX", "table_name": "XXX", "topic_name": "XXX", "offset_values":[{"partition":0, "offset":100}, {"partition":10, "offset":101}]}'
指定特定 topic partition 到特定 offset value,比较少见。
运维手册
常见消费性能调优
当消费持续出现 lag,通常为消费能力不足。CnchKafka 建表默认 1 个消费,单次消费写入最大 block size 为 65536. 当消费能力不足时,优先调整消费者和 block-size 参数。调整方式参考上文修改消费参数
调整 max-block-size
- 该参数直接影响消费内存使用,值越大所需内存越大。对于一些单条数据较大的消费表,谨慎调整该参数,避免爆内存。(上限为 1M)
- 当用户对数据延时要求不高,且数据量大 内存充足时,可同步调整此参数以及“kafka_max_poll_interval_ms”参数,让每一轮消费时间增加,每次写入的 part 变大,降低 MERGE 压力,提升查询性能。
调整 num_consumers
- 该参数上限为消费 topic 对应的 partition 数目。
- 在消费无 lag 情况下,尽可能减少此参数(即避免无意义增大此参数),减少资源使用,同时避免消费产生过多碎 part,增大 MERGE 压力,且不利于查询。
用于辅助排查的系统表
- 消费事件:cnch_system.cnch_kafka_log
kakfa_log 表记录了一些消费的基本事件,开启需要在 config.xml 中配置 kafka_log 项(server&worker 均需配置),重启之后生效。
kafka_log 在 Virtual Warehouse 由 consumer 任务写入,实时汇聚到全局的 cnch_system.cnch_kafka_log 表中,实现从 Server 段查看所有消费表的消费记录。
字段说明
事件表(event_table)
| 列名 | 类型 | 说明 | 
|---|---|---|
| event_type | Enum8 | 见下表 | 
| event_date | Date | 时间发生日期。分区字段,建议每次查询都带上。 | 
| event_time | DateTime | 时间发生时间,单位秒 | 
| duration_ms | UInt64 | 事件持续时间,单位秒 | 
| cnch_database | String | CnchKafka 库名 | 
| cnch_table | String | CnchKafka 表名 | 
| database | String | consumer 任务库名(目前同 cnch_database) | 
| table | String | consumer 任务表名(通常为 cnch_table 加时间戳及消费者编号后缀) | 
| consumer | String | 消费者编号 | 
| metric | UInt64 | 消费行数 | 
| has_error | UInt8 | 1 代表有异常;0 代表无异常。 | 
| exception | String | 异常说明 | 
事件类型说明(event_type)
| UInt8 值 | String 值 | 说明 | 
|---|---|---|
| 1 | POLL | metric 表示消费了多少条数据,duration_ms 覆盖了一次完整的消费流程,包含 WRITE 的时间。 | 
| 2 | PARSE_ERROR | metric 表示解析出错的消费条数,如果有多条解析出错,仅挑选一条打印出来。 | 
| 3 | WRITE | metric 表示写入数据的行数,duration_ms 基本上等同于数据持久化的时间 | 
| 4 | EXCEPTION | 消费过程的异常。常见的有:鉴权异常,数据持久化失败,VIEW SELECT 执行失败。 | 
| 5 | EMPTY_MESSAGE | 空消息条数。 | 
| 6 | FILTER | 写入阶段被过滤的数据。 | 
| 7 | COMMIT | 最后事务提交记录,只有该条记录才表示数据写入成功,可作为数据审计标准 | 
消费状态表:system.cnch_kafka_tables
kafka_tables 记录了 CnchKafka 表的实时状态,默认开始,为内存表;
| 字段名 | 数据类型 | 说明 | 
|---|---|---|
| database | String | 数据库名 | 
| name | String | Kafka 表名 | 
| uuid | String | kafka 表唯一标识 UUID | 
| kafka_cluster | String | kafka 集群 | 
| topics | Array(String) | 消费 topic 列表 | 
| consumer_group | String | 所属消费组 | 
| num_consumers | UInt32 | 当前实际正在执行的消费者数目 | 
| consumer_tables | Array(String) | 各个消费者对应的数据表名 | 
| consumer_hosts | Array(String) | 各个消费者分发到的执行节点 | 
| consuemr_partitions | Array(String) | 各个消费者分配到的需要消费的 partition | 
常见排查消费异常记录
- 查看 CnchKafka 消费表实时状态
SELECT * FROM system.cnch_kafka_tables
WHERE database = <database_name> AND name = <cnch_kafka_table>
- 查看最近消费记录
SELECT * FROM cnch_system.cnch_kafka_log
WHERE event_date = today()
 AND cnch_database = <database_name>
 AND cnch_table = <cnch_kafka_table>
 AND event_time > now() - 600 -- 最近十分钟
ORDER BY event_time
- 按小时统计当天消费记录
SELECT
 toHour(event_time) as hour,
 sumIf(metric, event_type = 'POLL') as poll_rows,
 sumIf(metric, event_type = 'PARSE_ERROR') as error_rows,
 sumIf(metric, event_type = 'COMMIT') as commit_rows
FROM cnch_system.cnch_kafka_log
WHERE event_date = today()
 AND cnch_database = <database_name>
 AND cnch_table = <cnch_kafka_table>
GROUP BY hour
ORDER BY hour