Importing Data
Importing from Kafka
CnchKafka is a table engine developed by ByConity, based on the community ClickHouse Kafka table engine and adapted to ByConity's cloud-native architecture. It imports user data from Apache Kafka into ByConity efficiently and in real time, and it extends several capabilities beyond the community implementation.
User Guide
Table Creation
Creating a CnchKafka consumer table is similar to creating a Kafka table in the community version: the Kafka data source and consumption parameters are configured through the SETTINGS clause. Here's an example:
CREATE TABLE kafka_test.cnch_kafka_consume
(
`i` Int64,
`ts` DateTime
)
ENGINE = CnchKafka()
SETTINGS
kafka_broker_list = '10.10.10.10:9092', -- replace with your own broker list
kafka_topic_list = 'my_kafka_test_topic', -- topic name to subscribe
kafka_group_name = 'hansome_boy_consume_group', -- your consumer-group name
kafka_format = 'JSONEachRow', -- message format; JSONEachRow is the most common
kafka_row_delimiter = '\n', -- row delimiter, usually '\n'
kafka_num_consumers = 1
(Please refer to the Setting Description section below for explanations of these parameters and the additional parameters supported.)
Due to the design of Kafka consumption, three tables are required in total, so you need to create two more tables.
First, create a storage table (using CnchMergeTree as an example):
CREATE TABLE kafka_test.cnch_store_kafka
(
`i` Int64,
`ts` DateTime
)
ENGINE = CnchMergeTree
PARTITION BY toDate(ts)
ORDER BY ts
Finally, create a materialized view table (only after successfully creating the Kafka table and storage table):
CREATE MATERIALIZED VIEW kafka_test.cnch_kafka_view
TO kafka_test.cnch_store_kafka
(
`i` Int64,
`ts` DateTime
)
AS
SELECT * -- you can add virtual columns here if needed
FROM kafka_test.cnch_kafka_consume
Once all three tables have been created, consumption starts automatically, provided you have the necessary permissions for the corresponding topic.
Virtual Column Support
Sometimes, there's a business need to access metadata from Kafka messages (e.g., message partition, offset, etc.). In such cases, you can use the virtual columns feature to meet this requirement. Virtual columns don't need to be specified during table creation; they are inherent properties of the table engine. They can be included in the SELECT statement of the VIEW table and stored in the underlying table (provided the underlying table has the corresponding columns added):
SELECT
_topic, -- String
_partition, -- UInt64
_key, -- String
_offset, -- UInt64
_content, -- String: complete message content
* -- Normal columns can be expanded with *, but virtual columns cannot
FROM kafka_test.cnch_kafka_consume
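For example, to persist the message partition and offset alongside the data, the storage table and VIEW could be created with extra columns as shown below, instead of the plain versions above. This is only a sketch: the table and view names, as well as the storage column names kafka_partition and kafka_offset, are illustrative:
CREATE TABLE kafka_test.cnch_store_kafka_meta
(
`i` Int64,
`ts` DateTime,
`kafka_partition` UInt64,
`kafka_offset` UInt64
)
ENGINE = CnchMergeTree
PARTITION BY toDate(ts)
ORDER BY ts

CREATE MATERIALIZED VIEW kafka_test.cnch_kafka_view_meta
TO kafka_test.cnch_store_kafka_meta
AS
SELECT
_partition AS kafka_partition, -- virtual column provided by the engine
_offset AS kafka_offset, -- virtual column provided by the engine
* -- normal columns i and ts
FROM kafka_test.cnch_kafka_consume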
Setting Description
Table of Parameters
Parameter Name | Type | Required/Default | Description |
---|---|---|---|
kafka_cluster / kafka_broker_list | String | Required | For the community version of Kafka, use the kafka_broker_list parameter. |
kafka_topic_list | String | Required | Can be multiple, separated by commas. |
kafka_group_name | String | Required | Consumer group name. |
kafka_format | String | Required | Message format; currently, JSONEachRow is the most commonly used. |
kafka_row_delimiter | String | '\0' | Generally used as '\n'. |
kafka_num_consumers | UInt64 | 1 | Number of consumers, recommended not to exceed the maximum number of partitions in the topic. |
kafka_max_block_size | UInt64 | 65536 | Block size (number of rows) for a single write, with an upper limit of 1M. |
kafka_max_poll_interval_ms | Milliseconds | 7500 | The maximum time to poll from the broker during each iteration. |
kafka_schema | String | "" | Schema file parameter, set in the format filename + colon + message name, e.g. schema.proto:MyMessage. |
kafka_format_schema_path | String | "" | Remote schema file path (excluding the filename); currently only HDFS is supported. If this parameter is not set, the default path from the configuration file is used. |
kafka_protobuf_enable_multiple_message | bool | true | Set to true to indicate that multiple protobuf messages can be read from a single kafka message, separated by their respective lengths. |
kafka_protobuf_default_length_parser | bool | false | Only effective when kafka_protobuf_enable_multiple_message is true: true indicates that the message header has a variable record length; false indicates that a fixed 8 bytes are used as the header record length. |
kafka_extra_librdkafka_config | JSON-format string | "" | Other parameters supported by rdkafka, typically used for authentication (for more parameters, refer to here); see the example below this table. |
cnch_vw_write | String | "vw_write" | Specifies the Virtual Warehouse used for consumption; consumer tasks will be scheduled to run on the nodes of the configured Virtual Warehouse. |
kafka_cnch_schedule_mode | String | "random" | Scheduling strategy adopted by the ConsumeManager when scheduling consumer tasks. Currently supports random, hash, and least_consumers; when using an independent vw or more than 10 consumers, least_consumers is recommended. |
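As an illustration of kafka_extra_librdkafka_config, the sketch below passes SASL/PLAIN authentication settings through to rdkafka. The JSON shape (a flat map of librdkafka property names) and the broker address are assumptions for illustration; consult the rdkafka parameter list referenced above and your own Kafka deployment for the exact keys and values:
CREATE TABLE kafka_test.cnch_kafka_consume_sasl
(
`i` Int64,
`ts` DateTime
)
ENGINE = CnchKafka()
SETTINGS
kafka_broker_list = '10.10.10.10:9093', -- hypothetical SASL listener
kafka_topic_list = 'my_kafka_test_topic',
kafka_group_name = 'my_consume_group',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n',
kafka_num_consumers = 1,
kafka_extra_librdkafka_config = '{"security.protocol":"sasl_plaintext","sasl.mechanisms":"PLAIN","sasl.username":"demo_user","sasl.password":"demo_password"}'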
Modifying Consumption Parameters
SETTINGS parameters can be modified quickly through the ALTER command, which is mainly used to adjust the consumer count and other parameters in order to increase consumption capacity.
Command:
ALTER TABLE <cnch_kafka_name> MODIFY SETTING <name1> = <value1>, <name2> = <value2>
Executing this command will automatically restart the consumption task.
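For example, to double the consumer count on the table created earlier (the value is illustrative):
ALTER TABLE kafka_test.cnch_kafka_consume MODIFY SETTING kafka_num_consumers = 2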
Manual Start/Stop Consumption
In some scenarios, users may need to manually stop consumption and then resume it later; we provide corresponding SYSTEM commands to achieve this:
SYSTEM START/STOP/RESTART CONSUME <cnch_kafka_name>
Note: The START/STOP commands will persist the corresponding state to the Catalog. Therefore, after executing the STOP command, if START is not executed, even if the service is restarted, the consumption task will not resume.
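For instance, to pause consumption on the example table and resume it later:
SYSTEM STOP CONSUME kafka_test.cnch_kafka_consume
-- ... maintenance work ...
SYSTEM START CONSUME kafka_test.cnch_kafka_consume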
Resetting Offset
Since CnchKafka manages and saves offsets internally, we provide SYSTEM commands for users to reset offsets when needed. The following three methods are supported (a concrete example follows the list):
Reset to Special Positions
- By latest/earliest position
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "offset_value":-1}'
- Possible values for special positions:
enum Offset {
OFFSET_BEGINNING = -2,
OFFSET_END = -1,
OFFSET_STORED = -1000,
OFFSET_INVALID = -1001
};
- Reset by timestamp
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "timestamp":1646125258000}'
The timestamp value should be within the valid data retention period on the Kafka side and should be in milliseconds.
- Specify offset value
SYSTEM RESET CONSUME OFFSET '{"database_name":"XXX", "table_name": "XXX", "topic_name": "XXX", "offset_values":[{"partition":0, "offset":100}, {"partition":10, "offset":101}]}'
This is less common; it is used to set an exact offset for specific partitions of a given topic.
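For instance, resetting the example table created earlier to the earliest available offsets uses the first method with OFFSET_BEGINNING (-2):
SYSTEM RESET CONSUME OFFSET '{"database_name":"kafka_test", "table_name":"cnch_kafka_consume", "offset_value":-2}'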
Operation and Maintenance Manual
Common Consumption Performance Tuning
When consumption lags persistently, the cause is usually insufficient consumption capacity. By default, CnchKafka creates one consumer, and each consumption round writes a block of at most 65536 rows. When consumption capacity is insufficient, it is recommended to adjust the consumer count and block-size parameters; refer to the Modifying Consumption Parameters section above for how to change them.
Adjusting max-block-size
- This parameter directly affects consumption memory usage, and a larger value requires more memory. For consumption tables with larger individual data sizes, adjust this parameter carefully to avoid exceeding memory limits (the upper limit is 1M).
- If data latency requirements are low, the data volume is large, and memory is sufficient, this parameter can be increased together with kafka_max_poll_interval_ms to lengthen each consumption round. This makes each written part larger, which reduces MERGE pressure and improves query performance (see the sketch after these lists).
Adjusting num_consumers
- The upper limit for this parameter is the number of partitions corresponding to the consumed topic.
- When there is no lag in consumption, it is recommended to minimize this parameter (i.e., avoid increasing it unnecessarily) to reduce resource usage, avoid generating too many small parts during consumption, reduce MERGE pressure, and facilitate queries.
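Putting the advice above together, a tuning change on the example table for a lagging topic with at least four partitions might look like this (the values are illustrative, not recommendations):
ALTER TABLE kafka_test.cnch_kafka_consume MODIFY SETTING kafka_num_consumers = 4, kafka_max_block_size = 262144, kafka_max_poll_interval_ms = 15000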
System Tables for Troubleshooting
- Consumption Events: cnch_system.cnch_kafka_log
The kafka_log table records basic consumption events. To enable it, configure the kafka_log item in config.xml (required for both server and worker), and restart the service for the changes to take effect.
The kafka_log is written by the consumer task in the Virtual Warehouse and aggregated in real-time into the global cnch_system.cnch_kafka_log table, allowing you to view consumption records for all consumption tables from the server side.
Field Description
Event Table (event_table)
Column Name | Type | Description |
---|---|---|
event_type | Enum8 | See the table below for details |
event_date | Date | The date when the event occurred. It is a partition field and is recommended to be included in every query. |
event_time | DateTime | The time when the event occurred, in seconds |
duration_ms | UInt64 | The duration of the event, in milliseconds |
cnch_database | String | The name of the CnchKafka database |
cnch_table | String | The name of the CnchKafka table |
database | String | The database name of the consumer task (currently the same as cnch_database) |
table | String | The table name of the consumer task (usually suffixed with a timestamp and consumer ID based on cnch_table) |
consumer | String | The consumer ID |
metric | UInt64 | The number of consumed rows |
has_error | UInt8 | 1 indicates an exception; 0 indicates no exception |
exception | String | Exception description |
Event Type Description (event_type)
UInt8 Value | String Value | Description |
---|---|---|
1 | POLL | The metric indicates the number of data items consumed, and duration_ms covers the entire consumption process including WRITE time. |
2 | PARSE_ERROR | The metric represents the number of consumption entries with parsing errors. If multiple entries have errors, only one will be selected for printing. |
3 | WRITE | The metric represents the number of data rows written, and duration_ms is basically equivalent to the data persistence time. |
4 | EXCEPTION | Exceptions during the consumption process. Common examples include: authentication exceptions, data persistence failures, and VIEW SELECT execution failures. |
5 | EMPTY_MESSAGE | The number of empty messages. |
6 | FILTER | Data filtered during the write phase. |
7 | COMMIT | Final transaction commit record. Only this record indicates successful data writing and can be used as a data audit standard. |
- Consumption Status Table: system.cnch_kafka_tables
The kafka_tables table records the real-time status of CnchKafka tables; it is an in-memory table by default.
Field Name | Data Type | Description |
---|---|---|
database | String | Database name |
name | String | Kafka table name |
uuid | String | Unique identifier UUID for the kafka table |
kafka_cluster | String | Kafka cluster |
topics | Array(String) | List of consumed topics |
consumer_group | String | Associated consumer group |
num_consumers | UInt32 | The current number of actively executing consumers |
consumer_tables | Array(String) | Data table names corresponding to each consumer |
consumer_hosts | Array(String) | Execution nodes assigned to each consumer |
consumer_partitions | Array(String) | Partitions assigned to each consumer for consumption |
Common Queries for Troubleshooting Consumption Issues
- Checking the real-time status of CnchKafka consumption tables
SELECT * FROM system.cnch_kafka_tables
WHERE database = '<database_name>' AND name = '<cnch_kafka_table>'
- Viewing recent consumption records
SELECT * FROM cnch_system.cnch_kafka_log
WHERE event_date = today()
AND cnch_database = '<database_name>'
AND cnch_table = '<cnch_kafka_table>'
AND event_time > now() - 600 -- Last 10 minutes
ORDER BY event_time
- Summarizing consumption records for the current day by hour
SELECT
toHour(event_time) as hour,
sumIf(metric, event_type = 'POLL') as poll_rows,
sumIf(metric, event_type = 'PARSE_ERROR') as error_rows,
sumIf(metric, event_type = 'COMMIT') as commit_rows
FROM cnch_system.cnch_kafka_log
WHERE event_date = today()
AND cnch_database = '<database_name>'
AND cnch_table = '<cnch_kafka_table>'
GROUP BY hour
ORDER BY hour