Home

从kafka导入数据到Clickhouse

从kafka导入数据到Clickhouse

Kafka 是目前应用非常广泛的开源消息中间件,一个常用的的场景就是做数据总线收集各个服务的消息日志,下游各种数据服务订阅消费数据,生成各种报表或数据应用等。Clickhouse 的自带了 Kafka Engine,使得 Clickhouse 和 Kafka 的集成变得非常容易。

创建 Kafka 表

Clickhouse 的 Kafka Engine 可以将 Kafka 中的流映射成一个表,方便我们的后续处理。只要建表的时候制定

Kafka(broker_list, topic_list, group_name, format[, schema])

在前文的所述的3节点 clickhouse 集群上,在每一个节点都建一个 Kafka Engine 的表从 kafka 的events topic读数据。

CREATE TABLE event_stream (ts UInt64, tag String, cnt Int64, val Double) 
ENGINE = Kafka('127.0.0.1:9092', 'events', 'group1', 'JSONEachRow');

写入数据

现在我们试着往 kafka 写一点数据

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic events

>{"ts":1515897449,"tag":"aa","cnt":3,"val":0.7}
>{"ts":1515897450,"tag":"bb","cnt":9,"val":0.28}
>{"ts":1515897451,"tag":"cc","cnt":7,"val":0.93}
>{"ts":1515897452,"tag":"dd","cnt":1,"val":0.78}

然后在每个 clickhose 节点中查看数据。

select * from event_stream;

clickhouse-server-01

┌─────────ts─┬─tag─┬─cnt─┬────────────────val─┐
│ 1515897449 │ aa  │   3 │ 0.7000000000000001 │
└────────────┴─────┴─────┴────────────────────┘
┌─────────ts─┬─tag─┬─cnt─┬──val─┐
│ 1515897452 │ dd  │   1 │ 0.78 │
└────────────┴─────┴─────┴──────┘

clickhouse-server-02

┌─────────ts─┬─tag─┬─cnt─┬──val─┐
│ 1515897450 │ bb  │   9 │ 0.28 │
└────────────┴─────┴─────┴──────┘

clickhouse-server-03

┌─────────ts─┬─tag─┬─cnt─┬──val─┐
│ 1515897451 │ cc  │   7 │ 0.93 │
└────────────┴─────┴─────┴──────┘

注意的是由于一个kafka的partition 只能由一个 group consumer 消费,所以clickhouse 节点数需要大于 topic 的 partition 数。

由于 Kafka 表只是 kafka 流的一个视图而已,当数据被 select 了一次之后,这个数据就会被认为已经消费了,下次 select 就不会再出现。所以Kafka表单独使用是没什么用的,一般是用来和 MaterialView 配合,将Kafka表里面的数据自动导入到 MaterialView 里面。

与 MaterialView 集成

我们现在每一节点建一个 MaterialView 保存 Kafka 里面的数据, 再顺手建一个全局的Distributed表。

CREATE MATERIALIZED VIEW events ENGINE = MergeTree(day, (day,ts, tag, cnt, val), 8192) AS
SELECT toDate(toDateTime(ts)) AS day, ts, tag, cnt, val FROM event_stream;

CREATE TABLE events_all AS events
ENGINE = Distributed(perftest_3shards_1replicas, default, events, rand());

再往Kafka里面写些数据,就能在各个节点的 events 或 events_all 里面查出来了。

总结

clichouse 和 Kafka的配合可以说是十分的便利,只有配置好,clickhouse 从 kafka 读数据和写入都是如此的方便。不过还是有相当的局限性,因为目前对 kafka 数据格式的支持还是有限。如果能通过插件之类的扩展方式自定义format就好了。另外,clickhouse 是否保证数据的一致性,不重复不丢?详细的情况还需要进一步探究。


参考资料

https://clickhouse.yandex/tutorial.html
https://clickhouse.yandex/docs/en/table_engines/kafka.html