导入数据
从 Kafka 导入
CnchKafka 是 ByConity 基于社区 ClickHouse Kafka 表引擎自研实现的适配云原生架构的表引擎,用于高效快速地将用户数据从 Apache Kafka 实时导入 ByConity;其设计与实现既适配了云原生新架构,同时在社区实现基础上增强了部分功能。
使用指南
建表
创建 CnchKafka 消费表和社区原生建 Kafka 表类似,需要通过 Setting
参数配置 Kafka 数据源及消费参数。示例如下:
CREATE TABLE kafka_test.cnch_kafka_consume
(
`i` Int64,
`ts` DateTime
)
ENGINE = CnchKafka()
SETTINGS
kafka_broker_list = '10.10.10.10:9092', -- replace with your own broker list
kafka_topic_list = 'my_kafka_test_topic', -- topic name to subcribe
kafka_group_name = 'hansome_boy_consume_group', -- your consumer-group name
kafka_format = 'JSONEachRow', -- always be json
kafka_row_delimiter = '\n', -- always be \n
kafka_num_consumers = 1
(Setting
参数说明及其他更多参数支持请参考下方说明)
由于 Kafka 消费设计需要三张表,所以还需要同步创建另外两张表。
首先创建存储表(以 CnchMergeTree 为例):
CREATE TABLE kafka_test.cnch_store_kafka
(
`i` Int64,
`ts` DateTime
)
ENGINE = CnchMergeTree
PARTITION BY toDate(ts)
ORDER BY ts
最后创建物化视图表(必须 Kafka 表和存储表创建成功后才能创建):
CREATE MATERIALIZED VIEW kafka_test.cnch_kafka_view
TO kafka_test.cnch_store_kafka
(
`i` Int64,
`ts` DateTime
)
AS
SELECT * -- you can add virtual columns here if you need
FROM kafka_test.cnch_kafka_consume
如果你有对应 topic 的消费权限,那么三张表创建好以后,消费就会自动开始执行。
虚拟列支持
有时候业务需要获取 Kafka 消息的元数据(e.g. 消息的 partition, offset 等)。此时可以使用 virtual columns 功能来满足这个需求。virtual columns 不需要在建表的时候指定,是表引擎本身的属性。可以放到 VIEW 表的 SELECT 语句中存储到底表中(当底表添加了对应列):
SELECT
_topic, -- String
_partition, -- UInt64
_key, -- String
_offset, -- UInt64
_content, -- String: 完整的消息内容
* -- 正常列可以通过*展开,虚拟列则不能
FROM kafka_test.cnch_kafka_consume
Setting 参数说明
| **参数名** | **类型** | **必填/默认值** | **说明** |
| :-------------------------------------------- | :-------------- | :-------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| kafka_cluster / kafka_broker_list | String | 必填 | 社区版本 Kafka 请使用 `kafka_broker_list` 参数。 |
| kafka_topic_list | String | 必填 | 可以多个,逗号分隔。 |
| kafka_group_name | String | 必填 | consumer group name,消费组。 |
| kafka_format | String | 必填 | 消息格式;目前最常用 JSONEachRow。 |
| kafka_row_delimiter | String | '\0' | 一般使用 '\n'。 |
| kafka_num_consumers | UInt64 | 1 | 消费者个数,建议不超过 topic 中最大 partition 数目。 |
| kafka_max_block_size | UInt64 | 65536 | 写入 block_size,上限 1M。 |
| kafka_max_poll_interval_ms | Milliseconds | 7500 | the max time to poll from broker each iteration。 |
| kafka_schema | String | "" | schema 文件设置参数,以文件名 + 冒号 + 消息名格式设置。如: `schema.proto:MyMessage`。 |
| kafka_format_schema_path | String | "" | 远端 schema 文件路径(不含文件名)设置参数,目前只支持 hdfs。(如果没有设置这个参数,将从配置文件设置的默认路径读取)。 |
| kafka_protobuf_enable_multiple_message | bool | true | 设置为 true,表示可以从一条 kafka 消息中读取多个 protobuf 的 message,彼此以各自长度为间隔。 |
| kafka_protobuf_default_length_parser | bool | false | 仅在 `kafka_protobuf_enable_multiple_message` 为 true 生效:true 表示消息头部有变量记录长度;false 表示用一个固定的 8 字节作为头部记录长度。 |
| kafka_extra_librdkafka_config | Json format string | "" | 其他 rdkafka 支持的参数,通常用于鉴权 (More params refer to [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#:~:text=see%20dedicated%20API-,ssl.ca.location,-*)). |
| cnch_vw_write | String | "vw_write" | 配置消费使用 Virtual WareHouse,consumer 任务将被调度到配置的 Virtual Warehouse 节点执行。 |
| kafka_cnch_schedule_mode | String | "random" | ConsumeManager 调度 consumer 任务时候采取的调度策略,目前支持:random, hash, and least_consumers;如果是独立 vw 或消费者数目大于 10,推荐使用 least_consumers。 |
修改消费参数
支持通过 ALTER 命令快速修改 Setting 参数,主要用于调整消费者数目等提升消费能力。
命令:
ALTER TABLE <cnch_kafka_name> MODIFY SETTING <name1> = <value1>, <name2> = <value2>
该命令执行会自动重启消费任务。
手动启停消费
在一些场景中用户可能需要手动停止消费,随后手动恢复;我们提供了对应的 SYSTEM 命令实现:
SYSTEM START/STOP/RESTART CONSUME <cnch_kafka_name>
注意:START/STOP 命令会将对应状态持久化到 Catalog,因此在执行 STOP 命令后,如果不执行 START,即使服务重启,消费任务也不会恢复。
重置 offset
由于 CnchKafka 的 offset 由引擎自身管理和保存,当用户需要重启 offset 时,我们同样实现了 SYSTEM 命令操作。具体支持以下三种方式:
重置到特殊位置
- 按最新位置/起始位置
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "offset_value":-1}'
- 可能的特殊位置的 value 值:
enum Offset {
OFFSET_BEGINNING = -2,
OFFSET_END = -1,
OFFSET_STORED = -1000,
OFFSET_INVALID = -1001
};
- 按时间戳重置
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "timestamp":1646125258000}'
其中 timestamp 的值应该为 Kafka 侧数据有效期内的某个时间的时间戳,且为毫秒级。
- 指定 offset 具体 value
system reset consume offset '{"database_name":"XXX", "table_name": "XXX", "topic_name": "XXX", "offset_values":[{"partition":0, "offset":100}, {"partition":10, "offset":101}]}'
指定特定 topic partition 到特定 offset value,比较少见。
运维手册
常见消费性能调优
当消费持续出现 lag,通常为消费能力不足。CnchKafka 建表默认 1 个消费,单次消费写入最大 block size 为 65536. 当消费能力不足时,优先调整消费者和 block-size 参数。调整方式参考上文修改消费参数
调整 max-block-size
- 该参数直接影响消费内存使用,值越大所需内存越大。对于一些单条数据较大的消费表,谨慎调整该参数,避免爆内存。(上限为 1M)
- 当用户对数据延时要求不高,且数据量大 内存充足时,可同步调整此参数以及“kafka_max_poll_interval_ms”参数,让每一轮消费时间增加,每次写入的 part 变大,降低 MERGE 压力,提升查询性能。
调整 num_consumers
- 该参数上限为消费 topic 对应的 partition 数目。
- 在消费无 lag 情况下,尽可能减少此参数(即避免无意义增大此参数),减少资源使用,同时避免消费产生过多碎 part,增大 MERGE 压力,且不利于查询。
用于辅助排查的系统表
- 消费事件:cnch_system.cnch_kafka_log
kakfa_log 表记录了一些消费的基本事件,开启需要在 config.xml 中配置 kafka_log 项(server&worker 均需配置),重启之后生效。
kafka_log 在 Virtual Warehouse 由 consumer 任务写入,实时汇聚到全局的 cnch_system.cnch_kafka_log 表中,实现从 Server 段查看所有消费表的消费记录。
字段说明
事件表(event_table)
列名 | 类型 | 说明 |
---|---|---|
event_type | Enum8 | 见下表 |
event_date | Date | 时间发生日期。分区字段,建议每次查询都带上。 |
event_time | DateTime | 时间发生时间,单位秒 |
duration_ms | UInt64 | 事件持续时间,单位秒 |
cnch_database | String | CnchKafka 库名 |
cnch_table | String | CnchKafka 表名 |
database | String | consumer 任务库名(目前同 cnch_database) |
table | String | consumer 任务表名(通常为 cnch_table 加时间戳及消费者编号后缀) |
consumer | String | 消费者编号 |
metric | UInt64 | 消费行数 |
has_error | UInt8 | 1 代表有异常;0 代表无异常。 |
exception | String | 异常说明 |
事件类型说明(event_type)
UInt8 值 | String 值 | 说明 |
---|---|---|
1 | POLL | metric 表示消费了多少条数据,duration_ms 覆盖了一次完整的消费流程,包含 WRITE 的时间。 |
2 | PARSE_ERROR | metric 表示解析出错的消费条数,如果有多条解析出错,仅挑选一条打印出来。 |
3 | WRITE | metric 表示写入数据的行数,duration_ms 基本上等同于数据持久化的时间 |
4 | EXCEPTION | 消费过程的异常。常见的有:鉴权异常,数据持久化失败,VIEW SELECT 执行失败。 |
5 | EMPTY_MESSAGE | 空消息条数。 |
6 | FILTER | 写入阶段被过滤的数据。 |
7 | COMMIT | 最后事务提交记录,只有该条记录才表示数据写入成功,可作为数据审计标准 |
消费状态表:system.cnch_kafka_tables
kafka_tables 记录了 CnchKafka 表的实时状态,默认开始,为内存表;
字段名 | 数据类型 | 说明 |
---|---|---|
database | String | 数据库名 |
name | String | Kafka 表名 |
uuid | String | kafka 表唯一标识 UUID |
kafka_cluster | String | kafka 集群 |
topics | Array(String) | 消费 topic 列表 |
consumer_group | String | 所属消费组 |
num_consumers | UInt32 | 当前实际正在执行的消费者数目 |
consumer_tables | Array(String) | 各个消费者对应的数据表名 |
consumer_hosts | Array(String) | 各个消费者分发到的执行节点 |
consuemr_partitions | Array(String) | 各个消费者分配到的需要消费的 partition |
常见排查消费异常记录
- 查看 CnchKafka 消费表实时状态
SELECT * FROM system.cnch_kafka_tables
WHERE database = <database_name> AND name = <cnch_kafka_table>
- 查看最近消费记录
SELECT * FROM cnch_system.cnch_kafka_log
WHERE event_date = today()
AND cnch_database = <database_name>
AND cnch_table = <cnch_kafka_table>
AND event_time > now() - 600 -- 最近十分钟
ORDER BY event_time
- 按小时统计当天消费记录
SELECT
toHour(event_time) as hour,
sumIf(metric, event_type = 'POLL') as poll_rows,
sumIf(metric, event_type = 'PARSE_ERROR') as error_rows,
sumIf(metric, event_type = 'COMMIT') as commit_rows
FROM cnch_system.cnch_kafka_log
WHERE event_date = today()
AND cnch_database = <database_name>
AND cnch_table = <cnch_kafka_table>
GROUP BY hour
ORDER BY hour