唯一键
简介
ByConity Unique表主要用于实现 upsert 功能。该能力是 ByConity 团队自研的独有特性,既能保持高效的查询性能、又支持主键更新。主要解决了开源 ClickHouse 不能支持高效更新操作的痛点,帮助业务更简单地开发实时分析应用。用户通过指定唯一键 UNIQUE KEY 来实现 Upsert 更新写语义,查询自动返回每个唯一键的最新值。
Unique 表主要具有以下特点:
- 用户通过 UNIQUE KEY 配置唯一键,提供 upsert 更新写语义,查询自动返回每个唯一键的最新值。
- 在保证实时更新能力的情况下,依然保持较高的查询性能,几乎无损耗。
- 支持通过删除字段,对行进行删除。
- 支持指定字段进行版本管理,仅保留最新版本。
实时更新的使用场景
业务需要对交易类数据进行实时分析,在同步 OLTP 数据库到 OLAP 数据库的过程中,由于订单数据等需要支持更新能力,因此对于 OLAP 数据库也有支持实时更新和删除的要求。
另一类场景虽然不存在更新,但需要去重。在开发实时数据时,很难保证数据流中没有重复数据,因此通常需要存储系统支持数据的幂等写入。
上述场景都可以通过唯一键 upsert 功能来支持,不管是幂等还是更新的需求。
使用示例
Upsert 使用示例
创建数据库和对应的 Unique 表。
CREATE DATABASE upsertdb;
CREATE TABLE IF NOT EXISTS upsertdb.uniquetable
(
`event_time` DateTime,
`product_id` UInt64,
`city` String,
`category` String,
`amount` UInt32,
`revenue` UInt64
)
ENGINE = CnchMergeTree()
PARTITION BY toDate(event_time)
ORDER BY (city, category)
UNIQUE KEY (product_id, sipHash64(city));
-- UNIQUE KEY 可以包含多个字段和表达式
顺序插入以下字段:
INSERT INTO upsertdb.uniquetable VALUES
('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500),
('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200),
('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100),
('2020-10-29 23:50:00', 10002, 'Shanghai', '男装', 4, 400),
('2020-10-29 23:50:00', 10003, 'Beijing', '男装', 2, 200),
('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100);
- 写入相同 key 的数据可以实现更新(upsert 语义),即如果 key 不存在则插入数据,否则更新这条数据。
查询表中数据,已进行了去重:
select * from upsertdb.uniquetable;
┌──────event_time─┬product_id─┬─city──┬─category─┬─amount─┬─revenue─┐
│ 2020-10-29 23:40:00 │ 10001 │ Beijing │ 男装 │ 5 │ 500 │
│ 2020-10-29 23:40:00 │ 10002 │ Beijing │ 男装 │ 2 │ 200 │
│ 2020-10-29 23:50:00 │ 10003 │ Beijing │ 男装 │ 2 │ 200 │
│ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 1 │ 100 │
│ 2020-10-29 23:50:00 │ 10002 │ Shanghai │ 男装 │ 4 │ 400 │
└─────────────────────┴───────┴──────────┴──────┴─────────┴─────────┘
删除:通过设置虚拟列 _delete_flag_=1
来删除指定的 key。
通过如下语句插入数据,并指定删除标记位:
-- 指定删除字段进行数据删除,删除字段设置非 0 时表示删除,设置为 0 时表示正常的 upsert 操作
INSERT INTO upsertdb.uniquetable (event_time, product_id, city, category, amount, revenue, _delete_flag_) VALUES
('2020-10-29 23:50:00', 10001, 'Beijing', '男装', 4, 400, 5),
('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 2, 200, 1),
('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 3, 600, 0);
查看删除后的效果:
select * from upsertdb.uniquetable order by toDate(event_time), product_id;
可以看到,查询结果中包含了替换 product_id=10004
的一行数据,并删除了三行 product_id = 10001
或 10002
且城市为'Beijing' 的旧数据。
┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐
│ 2020-10-29 23:50:00 │ 10002 │ Shanghai │ 男装 │ 4 │ 400 │
│ 2020-10-29 23:50:00 │ 10003 │ Beijing │ 男装 │ 2 │ 200 │
│ 2020-10-29 23:50:00 │ 10004 │ Beijing │ 男装 │ 3 │ 600 │
└─────────────────────┴────────────┴──────────┴─────────┴────────┴─────────┘
UPDATE & DELETE 使用示例
在 Unique 表中,可以参考下面语法更新和删除数据。
-- 仅适用于 Unique Table 的语法
UPDATE [db.]table SET a=b WHERE expr;
DELETE FROM [db.]table WHERE expr;
例如,通过如下语句进行更新和删除操作:
-- 更新指定字段
Update test.uniquetable set str = 'updated' WHERE date = '2023-12-18';
-- 删除指定字段
DELETE FROM test.uniquetable WHERE product_id=10001;
分桶使用示例
假设用户启用了六个计算节点,由于单个分区的数据量较大,超过 2 亿条记录,应用程序经常根据 c1
和 c2
字段进行聚合和连接操作。 因此,决定使用桶表进行优化。 桶表的设计选项如下:
分桶键(CLUSTER Key)选择:选择 c1
和 c2
列作为分桶键。
桶(Bucket)数:取节点数的两倍:12。
-- 创建带有分桶的表 create table with bucketing
create or replace table table_01 (c1 timestamp, c2 string, c3 number) cluster by (to_date(c1), c2) INTO 12 BUCKETS;
-- 将桶添加到现有数据中 add bucket to existing data
ALTER TABLE t CLUSTER BY (column, expression,...) INTO 64 BUCKETS
-- 按多列将桶添加到集群中 add bucket to cluster by multiple columns
ALTER TABLE t CLUSTER BY sipHash(a,b,c) INTO 64 BUCKETS
版本管理示例
Unique 表通过 ENGINE = CnchMergeTree(version)
来进行版本管理,其中定义 version
参数后,在插入数据时,仅会保留最新版本的记录。如下所示:
--创建 unique 表
CREATE TABLE UniqTest
(
`key` Int64,
`val` String,
`eventTime` DateTime
)
ENGINE = CnchMergeTree(eventTime)
ORDER BY `key`
PRIMARY KEY `key`
UNIQUE KEY `key`
--分别插入两条记录,用 eventTime 做版本字段区分
insert into UniqTest VALUES (1, 'first', '2020-01-01 01:01:01');
insert into Uniqtest VALUES (1, 'econd', '2020-01-01 00:00:00');
--插入完成后,执行查询,第二条记录即旧版本的数据会被 ignore 掉
select * from UniqTest
┌─key─┬─val───┬─eventTime───────────┐
│ 1 │ first │ 2020-01-01 01:01:01 │
└─────┴───────┴─────────────────────┘
支持的 SQL 语句
SELECT
INSERT
- INSERT VALUES
- INSERT SELECT
- INSERT FORMAT
- INSERT INFILE
- 说明 在并发 INSERT 时的性能情况:
- 对于每个 unique table,insert 是由单线程执行的。
- 并发的 insert 将按顺序执行,因此大批量处理(insert infile)可能会耗时较长。
CREATE TABLE with UNIQUE KEY
- 注意:
此情况下包含以下限制:
- 只有 [U]Int8/16/32/64, Boolean, Date, DateTime, String 这些数据类型可以用作 UNIQUE KEY。
- UNIQUE KEY 不可以与 CLUSTER BY 一起使用(未来会提供支持)。
- 每个 String 类型的 UNIQUE KEY 大小必须 <= 1 MB(此值取决于 max_string_size_for_unique_key),否则 insert 会失败。
DROP TABLE
DELETE TABLE
DELETE FROM [db.]table WHERE expr;
UNDROP TABLE
TRUNCATE TABLE
ALTER TABLE DROP PARTITION
OPTIMIZE TABLE
RENAME TABLE
UPDATE TABLE
UPDATE table_name
SET assignment_list
[WHERE where_condition]
[ORDER BY...]
[LIMIT...]
使用限制
当前默认为分区级别去重,即 unique id 在相同分区下会实现 upsert 能力,在不同分区可能出现重复。
在 Kafka 数据源导入时,用户需要保证相同唯一键的数据写入同一个的 Topic Partition,并禁用 Topic 扩容。
唯一键 Unique Key 最多支持指定 10 个字段。