跳到主要内容
版本:0.4.x

唯一键

简介

ByConity Unique表主要用于实现 upsert 功能。该能力是 ByConity 团队自研的独有特性,既能保持高效的查询性能、又支持主键更新。主要解决了开源 ClickHouse 不能支持高效更新操作的痛点,帮助业务更简单地开发实时分析应用。用户通过指定唯一键 UNIQUE KEY 来实现 Upsert 更新写语义,查询自动返回每个唯一键的最新值。

Unique 表主要具有以下特点:

  • 用户通过 UNIQUE KEY 配置唯一键,提供 upsert 更新写语义,查询自动返回每个唯一键的最新值。
  • 在保证实时更新能力的情况下,依然保持较高的查询性能,几乎无损耗。
  • 支持通过删除字段,对行进行删除。
  • 支持指定字段进行版本管理,仅保留最新版本。

实时更新的使用场景

业务需要对交易类数据进行实时分析,在同步 OLTP 数据库到 OLAP 数据库的过程中,由于订单数据等需要支持更新能力,因此对于 OLAP 数据库也有支持实时更新和删除的要求。

另一类场景虽然不存在更新,但需要去重。在开发实时数据时,很难保证数据流中没有重复数据,因此通常需要存储系统支持数据的幂等写入。

上述场景都可以通过唯一键 upsert 功能来支持,不管是幂等还是更新的需求。

使用示例

Upsert 使用示例

创建数据库和对应的 Unique 表。

CREATE DATABASE upsertdb;
CREATE TABLE IF NOT EXISTS upsertdb.uniquetable
(
`event_time` DateTime,
`product_id` UInt64,
`city` String,
`category` String,
`amount` UInt32,
`revenue` UInt64
)
ENGINE = CnchMergeTree()
PARTITION BY toDate(event_time)
ORDER BY (city, category)
UNIQUE KEY (product_id, sipHash64(city));
-- UNIQUE KEY 可以包含多个字段和表达式

顺序插入以下字段:

INSERT INTO upsertdb.uniquetable VALUES
('2020-10-29 23:40:00', 10001, 'Beijing', '男装', 5, 500),
('2020-10-29 23:40:00', 10002, 'Beijing', '男装', 2, 200),
('2020-10-29 23:40:00', 10003, 'Beijing', '男装', 1, 100),
('2020-10-29 23:50:00', 10002, 'Shanghai', '男装', 4, 400),
('2020-10-29 23:50:00', 10003, 'Beijing', '男装', 2, 200),
('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 1, 100);
  • 写入相同 key 的数据可以实现更新(upsert 语义),即如果 key 不存在则插入数据,否则更新这条数据。

查询表中数据,已进行了去重:

select * from upsertdb.uniquetable;
┌──────event_time─┬product_id─┬─city──┬─category─┬─amount─┬─revenue─┐
2020-10-29 23:40:0010001 │ Beijing │ 男装 │ 5500
2020-10-29 23:40:0010002 │ Beijing │ 男装 │ 2200
2020-10-29 23:50:0010003 │ Beijing │ 男装 │ 2200
2020-10-29 23:50:0010004 │ Beijing │ 男装 │ 1100
2020-10-29 23:50:0010002 │ Shanghai │ 男装 │ 4400
└─────────────────────┴───────┴──────────┴──────┴─────────┴─────────┘

删除:通过设置虚拟列 _delete_flag_=1 来删除指定的 key。

通过如下语句插入数据,并指定删除标记位:

-- 指定删除字段进行数据删除,删除字段设置非 0 时表示删除,设置为 0 时表示正常的 upsert 操作
INSERT INTO upsertdb.uniquetable (event_time, product_id, city, category, amount, revenue, _delete_flag_) VALUES
('2020-10-29 23:50:00', 10001, 'Beijing', '男装', 4, 400, 5),
('2020-10-29 23:50:00', 10002, 'Beijing', '男装', 2, 200, 1),
('2020-10-29 23:50:00', 10004, 'Beijing', '男装', 3, 600, 0);

查看删除后的效果:

select * from upsertdb.uniquetable order by toDate(event_time), product_id;

可以看到,查询结果中包含了替换 product_id=10004 的一行数据,并删除了三行 product_id = 1000110002 且城市为'Beijing' 的旧数据。

┌──────────event_time─┬─product_id─┬─city────┬─category─┬─amount─┬─revenue─┐
2020-10-29 23:50:0010002 │ Shanghai │ 男装 │ 4400
2020-10-29 23:50:0010003 │ Beijing │ 男装 │ 2200
2020-10-29 23:50:0010004 │ Beijing │ 男装 │ 3600
└─────────────────────┴────────────┴──────────┴─────────┴────────┴─────────┘

UPDATE & DELETE 使用示例

在 Unique 表中,可以参考下面语法更新和删除数据。

-- 仅适用于 Unique Table 的语法
UPDATE [db.]table SET a=b WHERE expr;
DELETE FROM [db.]table WHERE expr;

例如,通过如下语句进行更新和删除操作:

-- 更新指定字段
Update test.uniquetable set str = 'updated' WHERE date = '2023-12-18';

-- 删除指定字段
DELETE FROM test.uniquetable WHERE product_id=10001;

分桶使用示例

假设用户启用了六个计算节点,由于单个分区的数据量较大,超过 2 亿条记录,应用程序经常根据 c1c2 字段进行聚合和连接操作。 因此,决定使用桶表进行优化。 桶表的设计选项如下:

分桶键(CLUSTER Key)选择:选择 c1c2 列作为分桶键。

桶(Bucket)数:取节点数的两倍:12。

-- 创建带有分桶的表 create table with bucketing
create or replace table table_01 (c1 timestamp, c2 string, c3 number) cluster by (to_date(c1), c2) INTO 12 BUCKETS;

-- 将桶添加到现有数据中 add bucket to existing data
ALTER TABLE t CLUSTER BY (column, expression,...) INTO 64 BUCKETS

-- 按多列将桶添加到集群中 add bucket to cluster by multiple columns
ALTER TABLE t CLUSTER BY sipHash(a,b,c) INTO 64 BUCKETS

版本管理示例

Unique 表通过 ENGINE = CnchMergeTree(version) 来进行版本管理,其中定义 version 参数后,在插入数据时,仅会保留最新版本的记录。如下所示:

--创建 unique 表
CREATE TABLE UniqTest
(
`key` Int64,
`val` String,
`eventTime` DateTime
)
ENGINE = CnchMergeTree(eventTime)
ORDER BY `key`
PRIMARY KEY `key`
UNIQUE KEY `key`

--分别插入两条记录,用 eventTime 做版本字段区分
insert into UniqTest VALUES (1, 'first', '2020-01-01 01:01:01');
insert into Uniqtest VALUES (1, 'econd', '2020-01-01 00:00:00');

--插入完成后,执行查询,第二条记录即旧版本的数据会被 ignore 掉
select * from UniqTest
┌─key─┬─val───┬─eventTime───────────┐
1first2020-01-01 01:01:01
└─────┴───────┴─────────────────────┘

支持的 SQL 语句

SELECT

INSERT

  • INSERT VALUES
  • INSERT SELECT
  • INSERT FORMAT
  • INSERT INFILE
  • 说明 在并发 INSERT 时的性能情况:
  • 对于每个 unique table,insert 是由单线程执行的。
  • 并发的 insert 将按顺序执行,因此大批量处理(insert infile)可能会耗时较长。

CREATE TABLE with UNIQUE KEY

  • 注意:

此情况下包含以下限制:

  • 只有 [U]Int8/16/32/64, Boolean, Date, DateTime, String 这些数据类型可以用作 UNIQUE KEY。
  • UNIQUE KEY 不可以与 CLUSTER BY 一起使用(未来会提供支持)。
  • 每个 String 类型的 UNIQUE KEY 大小必须 <= 1 MB(此值取决于 max_string_size_for_unique_key),否则 insert 会失败。

DROP TABLE

DELETE TABLE

DELETE FROM [db.]table WHERE expr;

UNDROP TABLE

TRUNCATE TABLE

ALTER TABLE DROP PARTITION

OPTIMIZE TABLE

RENAME TABLE

UPDATE TABLE

 UPDATE table_name
SET assignment_list
[WHERE where_condition]
[ORDER BY...]
[LIMIT...]

使用限制

当前默认为分区级别去重,即 unique id 在相同分区下会实现 upsert 能力,在不同分区可能出现重复。

在 Kafka 数据源导入时,用户需要保证相同唯一键的数据写入同一个的 Topic Partition,并禁用 Topic 扩容。

唯一键 Unique Key 最多支持指定 10 个字段。