Version: 0.3.x

Importing Data

Importing from Kafka

CnchKafka is a table engine developed by ByConity, based on the community ClickHouse Kafka table engine and optimized for cloud-native architectures. It imports user data from Apache Kafka into ByConity efficiently and in real time. Its design and implementation are tailored to the cloud-native architecture, and it extends some functionality beyond the community implementation.

User Guide

Table Creation

Creating a CnchKafka consumer table is similar to creating a Kafka table in the community version: the Kafka data source and consumption parameters are configured through the SETTINGS clause. Here's an example:

CREATE TABLE kafka_test.cnch_kafka_consume
(
    `i` Int64,
    `ts` DateTime
)
ENGINE = CnchKafka()
SETTINGS
    kafka_broker_list = '10.10.10.10:9092',          -- replace with your own broker list
    kafka_topic_list = 'my_kafka_test_topic',        -- topic name(s) to subscribe
    kafka_group_name = 'hansome_boy_consume_group',  -- your consumer-group name
    kafka_format = 'JSONEachRow',                    -- message format; JSONEachRow is the most commonly used
    kafka_row_delimiter = '\n',                      -- row delimiter, usually '\n'
    kafka_num_consumers = 1                          -- number of consumers

(See the Setting Description section below for explanations of these parameters and for the additional parameters that are supported.)

Due to the design of CnchKafka consumption, three tables are required in total: the consumer table above, a storage table, and a materialized view that connects them. You therefore need to create two more tables.

First, create a storage table (using CnchMergeTree as an example):

CREATE TABLE kafka_test.cnch_store_kafka
(
    `i` Int64,
    `ts` DateTime
)
ENGINE = CnchMergeTree
PARTITION BY toDate(ts)
ORDER BY ts

Finally, create a materialized view table (only after successfully creating the Kafka table and storage table):

CREATE MATERIALIZED VIEW kafka_test.cnch_kafka_view
TO kafka_test.cnch_store_kafka
(
    `i` Int64,
    `ts` DateTime
)
AS
SELECT * -- you can add virtual columns here if you need
FROM kafka_test.cnch_kafka_consume

As long as you have the necessary permissions for the corresponding topic, consumption starts automatically once all three tables have been created.
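
To confirm that consumption has started, one simple check is to query the storage table from the example above and watch the row count grow (the table name follows the example; your schema may differ):

SELECT count()
FROM kafka_test.cnch_store_kafka -- should increase as new messages are committed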

Virtual Column Support

Sometimes, there's a business need to access metadata from Kafka messages (e.g., message partition, offset, etc.). In such cases, you can use the virtual columns feature to meet this requirement. Virtual columns don't need to be specified during table creation; they are inherent properties of the table engine. They can be included in the SELECT statement of the VIEW table and stored in the underlying table (provided the underlying table has the corresponding columns added):

SELECT
    _topic,      -- String
    _partition,  -- UInt64
    _key,        -- String
    _offset,     -- UInt64
    _content,    -- String: complete message content
    *            -- normal columns can be expanded with *, but virtual columns cannot
FROM kafka_test.cnch_kafka_consume
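
As a sketch of persisting message metadata, assuming you want to keep _partition and _offset alongside the business columns, the storage table and VIEW from the earlier example could instead be created as follows (the column names msg_partition and msg_offset are illustrative):

CREATE TABLE kafka_test.cnch_store_kafka
(
    `i` Int64,
    `ts` DateTime,
    `msg_partition` UInt64, -- receives the _partition virtual column
    `msg_offset` UInt64     -- receives the _offset virtual column
)
ENGINE = CnchMergeTree
PARTITION BY toDate(ts)
ORDER BY ts

CREATE MATERIALIZED VIEW kafka_test.cnch_kafka_view
TO kafka_test.cnch_store_kafka
AS
SELECT
    i,
    ts,
    _partition AS msg_partition, -- virtual columns must be selected explicitly
    _offset AS msg_offset
FROM kafka_test.cnch_kafka_consume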

Setting Description

Table of Parameters

| Parameter Name | Type | Required/Default | Description |
| --- | --- | --- | --- |
| kafka_cluster / kafka_broker_list | String | Required | Internal Kafka cluster for the company; for the community version of Kafka, use the kafka_broker_list parameter. |
| kafka_topic_list | String | Required | Can contain multiple topics, separated by commas. |
| kafka_group_name | String | Required | Consumer group name. |
| kafka_format | String | Required | Message format; currently, JSONEachRow is the most commonly used. |
| kafka_row_delimiter | String | '\0' | Generally set to '\n'. |
| kafka_num_consumers | UInt64 | 1 | Number of consumers; recommended not to exceed the number of partitions in the topic. |
| kafka_max_block_size | UInt64 | 65536 | Write block size, with an upper limit of 1M. |
| kafka_max_poll_interval_ms | Milliseconds | 7500 | The maximum time to poll from the broker in each round. |
| kafka_schema | String | "" | Schema file parameter, set in the format filename + colon + message name, e.g. schema.proto:MyMessage. |
| kafka_format_schema_path | String | "" | Remote schema file path (excluding the filename); currently only HDFS is supported. If this parameter is not set, the default path from the configuration file is used. |
| kafka_protobuf_enable_multiple_message | bool | true | When true, a single Kafka message may contain multiple protobuf messages, separated by their respective lengths. |
| kafka_protobuf_default_length_parser | bool | false | Only effective when kafka_protobuf_enable_multiple_message is true: true means the message header records a variable length; false means a fixed 8 bytes are used as the header length record. |
| kafka_extra_librdkafka_config | JSON-format string | "" | Other parameters supported by librdkafka, typically used for authentication; see the librdkafka configuration documentation for more parameters. |
| cnch_vw_write | String | "vw_write" | Virtual Warehouse used for consumption; consumer tasks are scheduled to execute on the configured Virtual Warehouse nodes. |
| kafka_cnch_schedule_mode | String | "random" | Scheduling strategy adopted by the ConsumeManager when scheduling consumer tasks. Currently supports random, hash, and least_consumers; least_consumers is recommended when using a dedicated vw or more than 10 consumers. |
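
As a rough sketch of the last few settings: kafka_extra_librdkafka_config takes a JSON string whose keys are passed through to librdkafka, so authenticating against a SASL-secured cluster might look roughly like the table below. The broker port, credentials, and SASL mechanism here are placeholders; check your cluster's actual security setup.

CREATE TABLE kafka_test.cnch_kafka_consume_sasl
(
    `i` Int64,
    `ts` DateTime
)
ENGINE = CnchKafka()
SETTINGS
    kafka_broker_list = '10.10.10.10:9093',   -- SASL listener of your brokers (placeholder)
    kafka_topic_list = 'my_kafka_test_topic',
    kafka_group_name = 'hansome_boy_consume_group',
    kafka_format = 'JSONEachRow',
    kafka_row_delimiter = '\n',
    kafka_num_consumers = 1,
    kafka_extra_librdkafka_config = '{"security.protocol":"sasl_plaintext","sasl.mechanisms":"PLAIN","sasl.username":"demo_user","sasl.password":"demo_password"}',
    cnch_vw_write = 'vw_write',               -- schedule consumers onto this Virtual Warehouse
    kafka_cnch_schedule_mode = 'least_consumers'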

Modifying Consumption Parameters

CnchKafka supports quickly modifying SETTINGS parameters through the ALTER command, which is primarily used to adjust the number of consumers and other parameters in order to increase consumption capacity.

Command:

ALTER TABLE <cnch_kafka_name> MODIFY SETTING <name1> = <value1>, <name2> = <value2>

Executing this command will automatically restart the consumption task.
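
For example, to raise the consumer count of the table created earlier from 1 to 2:

ALTER TABLE kafka_test.cnch_kafka_consume MODIFY SETTING kafka_num_consumers = 2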

Manual Start/Stop Consumption

In some scenarios, users may need to manually stop consumption and then resume it later; we provide corresponding SYSTEM commands to achieve this:

SYSTEM START/STOP/RESTART CONSUME <cnch_kafka_name>

Note: The START/STOP commands will persist the corresponding state to the Catalog. Therefore, after executing the STOP command, if START is not executed, even if the service is restarted, the consumption task will not resume.
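
For example, to pause the table from the earlier example before planned maintenance and resume it afterwards:

SYSTEM STOP CONSUME kafka_test.cnch_kafka_consume
-- ... perform maintenance ...
SYSTEM START CONSUME kafka_test.cnch_kafka_consume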

Resetting Offset

Since CnchKafka manages and saves offsets internally, we have implemented SYSTEM commands for users to reset offsets when needed. Specifically, the following three methods are supported:

Reset to Special Positions

  • By latest/earliest position
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "offset_value":-1}'
  • Possible values for special positions:
    enum Offset {
        OFFSET_BEGINNING = -2,
        OFFSET_END = -1,
        OFFSET_STORED = -1000,
        OFFSET_INVALID = -1001
    };
  • Reset by timestamp
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "timestamp":1646125258000}'

The timestamp value should be within the valid data retention period on the Kafka side and should be in milliseconds.

  • Specify offset value
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "topic_name": "XXX", "offset_values":[{"partition":0, "offset":100}, {"partition":10, "offset":101}]}'

This is less common and used to specify a specific offset value for a particular topic partition.
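
As a concrete example of the first method, resetting the table from the earlier example to the earliest retained offsets (OFFSET_BEGINNING = -2 in the enum above):

SYSTEM RESET CONSUME OFFSET '{"database_name":"kafka_test", "table_name":"cnch_kafka_consume", "offset_value":-2}'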

Operation and Maintenance Manual

Common Consumption Performance Tuning

When consumption lags persistently, the cause is usually insufficient consumption capacity. By default, CnchKafka creates a single consumer and writes blocks of at most 65536 rows per consumption round. When consumption capacity is insufficient, it is recommended to adjust the consumer count and block-size parameters; refer to the Modifying Consumption Parameters section above for how to change them.

Adjusting max-block-size

  • This parameter directly affects consumption memory usage, and a larger value requires more memory. For consumption tables with larger individual data sizes, adjust this parameter carefully to avoid exceeding memory limits (the upper limit is 1M).
  • When data latency requirements are relaxed and the data volume is large with sufficient memory, you can raise this parameter together with kafka_max_poll_interval_ms to lengthen each consumption round; this makes each written part larger, reduces MERGE pressure, and improves query performance (see the sketch below).
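
A minimal sketch of such an adjustment, with illustrative values (choose them according to your row size and memory budget):

ALTER TABLE kafka_test.cnch_kafka_consume MODIFY SETTING
kafka_max_block_size = 262144,       -- larger write blocks, larger parts (upper limit 1M)
kafka_max_poll_interval_ms = 15000   -- longer consumption round per iteration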

Adjusting num_consumers

  • The upper limit for this parameter is the number of partitions corresponding to the consumed topic.
  • When there is no lag in consumption, it is recommended to minimize this parameter (i.e., avoid increasing it unnecessarily) to reduce resource usage, avoid generating too many small parts during consumption, reduce MERGE pressure, and facilitate queries.

System Tables for Troubleshooting

  • Consumption Events: cnch_system.cnch_kafka_log

The kafka_log table records basic consumption events. To enable it, configure the kafka_log item in config.xml (required for both server and worker), and restart the service for the changes to take effect.

The kafka_log is written by the consumer task in the Virtual Warehouse and aggregated in real-time into the global cnch_system.cnch_kafka_log table, allowing you to view consumption records for all consumption tables from the server side.

Field Description

Event Table (event_table)
| Column Name | Type | Description |
| --- | --- | --- |
| event_type | Enum8 | See the event type table below for details |
| event_date | Date | Date when the event occurred; this is a partition column and should be included in every query |
| event_time | DateTime | Time when the event occurred, in seconds |
| duration_ms | UInt64 | Duration of the event, in milliseconds |
| cnch_database | String | Name of the CnchKafka database |
| cnch_table | String | Name of the CnchKafka table |
| database | String | Database name of the consumer task (currently the same as cnch_database) |
| table | String | Table name of the consumer task (usually cnch_table suffixed with a timestamp and consumer ID) |
| consumer | String | Consumer ID |
| metric | UInt64 | Number of consumed rows |
| has_error | UInt8 | 1 indicates an exception; 0 indicates no exception |
| exception | String | Exception description |

Event Type Description (event_type)

| UInt8 Value | String Value | Description |
| --- | --- | --- |
| 1 | POLL | The metric indicates the number of messages consumed; duration_ms covers the entire consumption process, including WRITE time. |
| 2 | PARSE_ERROR | The metric represents the number of messages with parsing errors. If multiple messages have errors, only one is selected for printing. |
| 3 | WRITE | The metric represents the number of rows written; duration_ms is roughly equivalent to the data persistence time. |
| 4 | EXCEPTION | Exceptions during the consumption process. Common examples include authentication failures, data persistence failures, and VIEW SELECT execution failures. |
| 5 | EMPTY_MESSAGE | The number of empty messages. |
| 6 | FILTER | Data filtered during the write phase. |
| 7 | COMMIT | Final transaction commit record. Only this record indicates that data was written successfully; it can be used as the data audit standard. |

Consumption Status Table: system.cnch_kafka_tables

The kafka_tables table records the real-time status of CnchKafka tables; it is an in-memory table by default.

| Field Name | Data Type | Description |
| --- | --- | --- |
| database | String | Database name |
| name | String | Kafka table name |
| uuid | String | Unique UUID of the Kafka table |
| kafka_cluster | String | Kafka cluster |
| topics | Array(String) | List of consumed topics |
| consumer_group | String | Associated consumer group |
| num_consumers | UInt32 | Current number of actively executing consumers |
| consumer_tables | Array(String) | Data table name corresponding to each consumer |
| consumer_hosts | Array(String) | Execution node assigned to each consumer |
| consumer_partitions | Array(String) | Partitions assigned to each consumer |

Common Queries for Troubleshooting Consumption Issues

  • Checking the real-time status of CnchKafka consumption tables
SELECT * FROM system.cnch_kafka_tables
WHERE database = '<database_name>' AND name = '<cnch_kafka_table>'
  • Viewing recent consumption records
SELECT * FROM cnch_system.cnch_kafka_log
WHERE event_date = today()
AND cnch_database = '<database_name>'
AND cnch_table = '<cnch_kafka_table>'
AND event_time > now() - 600 -- Last 10 minutes
ORDER BY event_time
  • Summarizing consumption records for the current day by hour
SELECT
    toHour(event_time) AS hour,
    sumIf(metric, event_type = 'POLL') AS poll_rows,
    sumIf(metric, event_type = 'PARSE_ERROR') AS error_rows,
    sumIf(metric, event_type = 'COMMIT') AS commit_rows
FROM cnch_system.cnch_kafka_log
WHERE event_date = today()
  AND cnch_database = '<database_name>'
  AND cnch_table = '<cnch_kafka_table>'
GROUP BY hour
ORDER BY hour