跳到主要内容
版本:0.3.x

导入数据

从 Kafka 导入

CnchKafka 是 ByConity 基于社区 ClickHouse Kafka 表引擎自研实现的适配云原生架构的表引擎,用于高效快速地将用户数据从 Apache Kafka 实时导入 ByConity;其设计与实现既适配了云原生新架构,同时在社区实现基础上增强了部分功能。

使用指南

建表

创建 CnchKafka 消费表和社区原生建 Kafka 表类似,需要通过 Setting 参数配置 Kafka 数据源及消费参数。示例如下:

CREATE TABLE kafka_test.cnch_kafka_consume
(
`i` Int64,
`ts` DateTime
)
ENGINE = CnchKafka()
SETTINGS
kafka_broker_list = '10.10.10.10:9092', -- replace with your own broker list
kafka_topic_list = 'my_kafka_test_topic', -- topic name to subcribe
kafka_group_name = 'hansome_boy_consume_group', -- your consumer-group name
kafka_format = 'JSONEachRow', -- always be json
kafka_row_delimiter = '\n', -- always be \n
kafka_num_consumers = 1

Setting 参数说明及其他更多参数支持请参考下方说明)

由于 Kafka 消费设计需要三张表,所以还需要同步创建另外两张表。

首先创建存储表(以 CnchMergeTree 为例):

CREATE TABLE kafka_test.cnch_store_kafka
(
`i` Int64,
`ts` DateTime
)
ENGINE = CnchMergeTree
PARTITION BY toDate(ts)
ORDER BY ts

最后创建物化视图表(必须 Kafka 表和存储表创建成功后才能创建):

CREATE MATERIALIZED VIEW kafka_test.cnch_kafka_view
TO kafka_test.cnch_store_kafka
(
`i` Int64,
`ts` DateTime
)
AS
SELECT * -- you can add virtual columns here if you need
FROM kafka_test.cnch_kafka_consume

如果你有对应 topic 的消费权限,那么三张表创建好以后,消费就会自动开始执行。

虚拟列支持

有时候业务需要获取 Kafka 消息的元数据(e.g. 消息的 partition, offset 等)。此时可以使用 virtual columns 功能来满足这个需求。virtual columns 不需要在建表的时候指定,是表引擎本身的属性。可以放到 VIEW 表的 SELECT 语句中存储到底表中(当底表添加了对应列):

SELECT
_topic, -- String
_partition, -- UInt64
_key, -- String
_offset, -- UInt64
_content, -- String: 完整的消息内容
* -- 正常列可以通过*展开,虚拟列则不能
FROM kafka_test.cnch_kafka_consume

Setting 参数说明

| **参数名**                                    | **类型**        | **必填/默认值** | **说明**                                                                                                                                                                                                                          |
| :-------------------------------------------- | :-------------- | :-------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| kafka_cluster / kafka_broker_list | String | 必填 | 公司内部 Kafka 集群;社区版本 Kafka 请使用 `kafka_broker_list` 参数。 |
| kafka_topic_list | String | 必填 | 可以多个,逗号分隔。 |
| kafka_group_name | String | 必填 | consumer group name,消费组。 |
| kafka_format | String | 必填 | 消息格式;目前最常用 JSONEachRow。 |
| kafka_row_delimiter | String | '\0' | 一般使用 '\n'。 |
| kafka_num_consumers | UInt64 | 1 | 消费者个数,建议不超过 topic 中最大 partition 数目。 |
| kafka_max_block_size | UInt64 | 65536 | 写入 block_size,上限 1M。 |
| kafka_max_poll_interval_ms | Milliseconds | 7500 | the max time to poll from broker each iteration。 |
| kafka_schema | String | "" | schema 文件设置参数,以文件名 + 冒号 + 消息名格式设置。如: `schema.proto:MyMessage`|
| kafka_format_schema_path | String | "" | 远端 schema 文件路径(不含文件名)设置参数,目前只支持 hdfs。(如果没有设置这个参数,将从配置文件设置的默认路径读取)。 |
| kafka_protobuf_enable_multiple_message | bool | true | 设置为 true,表示可以从一条 kafka 消息中读取多个 protobuf 的 message,彼此以各自长度为间隔。 |
| kafka_protobuf_default_length_parser | bool | false | 仅在 `kafka_protobuf_enable_multiple_message` 为 true 生效:true 表示消息头部有变量记录长度;false 表示用一个固定的 8 字节作为头部记录长度。 |
| kafka_extra_librdkafka_config | Json format string | "" | 其他 rdkafka 支持的参数,通常用于鉴权 (More params refer to [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#:~:text=see%20dedicated%20API-,ssl.ca.location,-*)). |
| cnch_vw_write | String | "vw_write" | 配置消费使用 Virtual WareHouse,consumer 任务将被调度到配置的 Virtual Warehouse 节点执行。 |
| kafka_cnch_schedule_mode | String | "random" | ConsumeManager 调度 consumer 任务时候采取的调度策略,目前支持:random, hash, and least_consumers;如果是独立 vw 或消费者数目大于 10,推荐使用 least_consumers。 |

修改消费参数

支持通过 ALTER 命令快速修改 Setting 参数,主要用于调整消费者数目等提升消费能力。

命令:

ALTER TABLE <cnch_kafka_name> MODIFY SETTING <name1> = <value1>, <name2> = <value2>

该命令执行会自动重启消费任务。

手动启停消费

在一些场景中用户可能需要手动停止消费,随后手动恢复;我们提供了对应的 SYSTEM 命令实现:

SYSTEM START/STOP/RESTART CONSUME <cnch_kafka_name>

注意:START/STOP 命令会将对应状态持久化到 Catalog,因此在执行 STOP 命令后,如果不执行 START,即使服务重启,消费任务也不会恢复。

重置 offset

由于 CnchKafka 的 offset 由引擎自身管理和保存,当用户需要重启 offset 时,我们同样实现了 SYSTEM 命令操作。具体支持以下三种方式:

重置到特殊位置

  • 按最新位置/起始位置
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "offset_value":-1}'

  • 可能的特殊位置的 value 值:
    enum Offset {
OFFSET_BEGINNING = -2,
OFFSET_END = -1,
OFFSET_STORED = -1000,
OFFSET_INVALID = -1001
};

  • 按时间戳重置
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "timestamp":1646125258000}'

其中 timestamp 的值应该为 Kafka 侧数据有效期内的某个时间的时间戳,且为毫秒级。

  • 指定 offset 具体 value
system reset consume offset '{"database_name":"XXX", "table_name": "XXX", "topic_name": "XXX", "offset_values":[{"partition":0, "offset":100}, {"partition":10, "offset":101}]}'

指定特定 topic partition 到特定 offset value,比较少见。

运维手册

常见消费性能调优

当消费持续出现 lag,通常为消费能力不足。CnchKafka 建表默认 1 个消费,单次消费写入最大 block size 为 65536. 当消费能力不足时,优先调整消费者和 block-size 参数。调整方式参考上文修改消费参数

调整 max-block-size

  • 该参数直接影响消费内存使用,值越大所需内存越大。对于一些单条数据较大的消费表,谨慎调整该参数,避免爆内存。(上限为 1M)
  • 当用户对数据延时要求不高,且数据量大 内存充足时,可同步调整此参数以及“kafka_max_poll_interval_ms”参数,让每一轮消费时间增加,每次写入的 part 变大,降低 MERGE 压力,提升查询性能。

调整 num_consumers

  • 该参数上限为消费 topic 对应的 partition 数目。
  • 在消费无 lag 情况下,尽可能减少此参数(即避免无意义增大此参数),减少资源使用,同时避免消费产生过多碎 part,增大 MERGE 压力,且不利于查询。

用于辅助排查的系统表

  • 消费事件:cnch_system.cnch_kafka_log

kakfa_log 表记录了一些消费的基本事件,开启需要在 config.xml 中配置 kafka_log 项(server&worker 均需配置),重启之后生效。

kafka_log 在 Virtual Warehouse 由 consumer 任务写入,实时汇聚到全局的 cnch_system.cnch_kafka_log 表中,实现从 Server 段查看所有消费表的消费记录。

字段说明

事件表(event_table)
列名类型说明
event_typeEnum8见下表
event_dateDate时间发生日期。分区字段,建议每次查询都带上。
event_timeDateTime时间发生时间,单位秒
duration_msUInt64事件持续时间,单位秒
cnch_databaseStringCnchKafka 库名
cnch_tableStringCnchKafka 表名
databaseStringconsumer 任务库名(目前同 cnch_database)
tableStringconsumer 任务表名(通常为 cnch_table 加时间戳及消费者编号后缀)
consumerString消费者编号
metricUInt64消费行数
has_errorUInt81 代表有异常;0 代表无异常。
exceptionString异常说明
事件类型说明(event_type)
UInt8 值String 值说明
1POLLmetric 表示消费了多少条数据,duration_ms 覆盖了一次完整的消费流程,包含 WRITE 的时间。
2PARSE_ERRORmetric 表示解析出错的消费条数,如果有多条解析出错,仅挑选一条打印出来。
3WRITEmetric 表示写入数据的行数,duration_ms 基本上等同于数据持久化的时间
4EXCEPTION消费过程的异常。常见的有:鉴权异常,数据持久化失败,VIEW SELECT 执行失败。
5EMPTY_MESSAGE空消息条数。
6FILTER写入阶段被过滤的数据。
7COMMIT最后事务提交记录,只有该条记录才表示数据写入成功,可作为数据审计标准
消费状态表:system.cnch_kafka_tables

kafka_tables 记录了 CnchKafka 表的实时状态,默认开始,为内存表;

字段名数据类型说明
databaseString数据库名
nameStringKafka 表名
uuidStringkafka 表唯一标识 UUID
kafka_clusterStringkafka 集群
topicsArray(String)消费 topic 列表
consumer_groupString所属消费组
num_consumersUInt32当前实际正在执行的消费者数目
consumer_tablesArray(String)各个消费者对应的数据表名
consumer_hostsArray(String)各个消费者分发到的执行节点
consuemr_partitionsArray(String)各个消费者分配到的需要消费的 partition

常见排查消费异常记录

  • 查看 CnchKafka 消费表实时状态
SELECT * FROM system.cnch_kafka_tables
WHERE database = <database_name> AND name = <cnch_kafka_table>

  • 查看最近消费记录
SELECT * FROM cnch_system.cnch_kafka_log
WHERE event_date = today()
AND cnch_database = <database_name>
AND cnch_table = <cnch_kafka_table>
AND event_time > now() - 600 -- 最近十分钟
ORDER BY event_time

  • 按小时统计当天消费记录
SELECT
toHour(event_time) as hour,
sumIf(metric, event_type = 'POLL') as poll_rows,
sumIf(metric, event_type = 'PARSE_ERROR') as error_rows,
sumIf(metric, event_type = 'COMMIT') as commit_rows
FROM cnch_system.cnch_kafka_log
WHERE event_date = today()
AND cnch_database = <database_name>
AND cnch_table = <cnch_kafka_table>
GROUP BY hour
ORDER BY hour