Kafka 是目前应用非常广泛的开源消息中间件,一个常用的的场景就是做数据总线收集各个服务的消息日志,下游各种数据服务订阅消费数据,生成各种报表或数据应用等。Clickhouse 的自带了 Kafka Engine,使得 Clickhouse 和 Kafka 的集成变得非常容易。
Clickhouse 的 Kafka Engine 可以将 Kafka 中的流映射成一个表,方便我们的后续处理。只要建表的时候制定
Kafka(broker_list, topic_list, group_name, format[, schema])
broker_list
: 逗号分隔的 Kafka broker 列表topic_list
: 消费的topicgroup_name
: consumer group 的id, 同一个 group_name 的 clickhouse 会在同一个 consumer group 消费数据format
: kafka 消息的格式在前文的所述的3节点 clickhouse 集群上,在每一个节点都建一个 Kafka Engine 的表从 kafka 的events topic读数据。
CREATE TABLE event_stream (ts UInt64, tag String, cnt Int64, val Double)
ENGINE = Kafka('127.0.0.1:9092', 'events', 'group1', 'JSONEachRow');
现在我们试着往 kafka 写一点数据
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic events
>{"ts":1515897449,"tag":"aa","cnt":3,"val":0.7}
>{"ts":1515897450,"tag":"bb","cnt":9,"val":0.28}
>{"ts":1515897451,"tag":"cc","cnt":7,"val":0.93}
>{"ts":1515897452,"tag":"dd","cnt":1,"val":0.78}
然后在每个 clickhose 节点中查看数据。
select * from event_stream;
clickhouse-server-01
┌─────────ts─┬─tag─┬─cnt─┬────────────────val─┐
│ 1515897449 │ aa │ 3 │ 0.7000000000000001 │
└────────────┴─────┴─────┴────────────────────┘
┌─────────ts─┬─tag─┬─cnt─┬──val─┐
│ 1515897452 │ dd │ 1 │ 0.78 │
└────────────┴─────┴─────┴──────┘
clickhouse-server-02
┌─────────ts─┬─tag─┬─cnt─┬──val─┐
│ 1515897450 │ bb │ 9 │ 0.28 │
└────────────┴─────┴─────┴──────┘
clickhouse-server-03
┌─────────ts─┬─tag─┬─cnt─┬──val─┐
│ 1515897451 │ cc │ 7 │ 0.93 │
└────────────┴─────┴─────┴──────┘
注意的是由于一个kafka的partition 只能由一个 group consumer 消费,所以clickhouse 节点数需要大于 topic 的 partition 数。
由于 Kafka 表只是 kafka 流的一个视图而已,当数据被 select 了一次之后,这个数据就会被认为已经消费了,下次 select 就不会再出现。所以Kafka表单独使用是没什么用的,一般是用来和 MaterialView 配合,将Kafka表里面的数据自动导入到 MaterialView 里面。
我们现在每一节点建一个 MaterialView 保存 Kafka 里面的数据, 再顺手建一个全局的Distributed表。
CREATE MATERIALIZED VIEW events ENGINE = MergeTree(day, (day,ts, tag, cnt, val), 8192) AS
SELECT toDate(toDateTime(ts)) AS day, ts, tag, cnt, val FROM event_stream;
CREATE TABLE events_all AS events
ENGINE = Distributed(perftest_3shards_1replicas, default, events, rand());
再往Kafka里面写些数据,就能在各个节点的 events 或 events_all 里面查出来了。
clichouse 和 Kafka的配合可以说是十分的便利,只有配置好,clickhouse 从 kafka 读数据和写入都是如此的方便。不过还是有相当的局限性,因为目前对 kafka 数据格式的支持还是有限。如果能通过插件之类的扩展方式自定义format就好了。另外,clickhouse 是否保证数据的一致性,不重复不丢?详细的情况还需要进一步探究。
参考资料
https://clickhouse.yandex/tutorial.html
https://clickhouse.yandex/docs/en/table_engines/kafka.html