1. DolphinDB Kafka 插件介绍
DolphinDB Kafka 插件支持把 DolphinDB 中生产的数据推送到 Kafka,也支持从 Kafka 订阅数据,并在 DolphinDB 中消费。用户可以在 DolphinDB 中实例化 Producer 对象,把 DolphinDB 中的数据同步到 Kafka 中指定的 Topic。用户也可以在 DolphinDB 中实例化 Consumer 对象,将 Kafka 中指定 Topic 的数据同步到 DolphinDB。DolphinDB Kafka 插件目前支持以下数据类型的序列化和反序列化:
- DolphinDB 标量
- Kafka Java API 的内置类型:String (UTF-8) , Short , Integer , Long , Float , Double , Bytes , byte [] 以及 ByteBuffer
- 以上数据类型所组成的向量
Kafka 插件目前支持版本:relsease200, release130。本教程基于 Kafka Plugin release200 开发,请使用 DolphinDB 2.00.X 版本 server 进行相关测试。若需要测试其它版本 server,请切换至相应插件分支下载插件包进行测试。
2. 基本使用介绍
2.1 安装 DolphinDB Kafka 插件
用户可根据 DolphinDB server 版本和操作系统下载对应的已经编译好的插件文件,官方下载链接。手动编译安装可以参考官网文档教程:DolphinDB Kafka 插件官方教程。
以 Linux 为例,下载好插件文件后需要添加动态库地址到环境变量中,注意插件安装的路径<PluginDir>
,需要根据实际环境修改,本例中插件的安装路径为 /DolphinDB/server/plugins/kafka
,执行命令如下:
export LD_LIBRARY_PATH="LD_LIBRARY_PATH:/DolphinDB/server/plugins/kafka"
2.2 使用 DolphinDB Kafka Producer
语法
kafka::producer(config)
- config:字典类型,表示 DolphinDB Kafka Producer 的配置。字典的键是一个字符串,值是一个字符串或布尔值。有关 Kafka 配置的更多信息,请参阅 配置参数列表。
该函数调用后,会根据指定配置创建一个 Kafka Producer 实例,并返回句柄。
kafka::produce(producer, topic, key, value, json, [partition])
- producer:Kafka 生产者的句柄
- topic:Kafka 的主题
- key:Kafka 生产者配置字典的键
- value:Kafka 生产者配置字典的值
- json:表示是否以 json 格式传递数据
- partition:可选参数,整数,表示 Kafka 的 broker 分区
该函数调用后会,可以把 DolphinDB 中的数据同步到 Kafka 中指定的 Topic。
下面通过例子,展示如何实时同步 DolphinDB 流数据表 KafkaTest
中的增量数据到 Kafka 的 dolphindb-producer-test
Topic 中。
DolphinDB 中创建 Producer 实例
DolphinDB GUI 连接 DolphinDB 节点后执行以下脚本,加载 DolphinDB Kafka 插件:
try{loadPlugin("/DolphinDB/server/plugins/kafka/PluginKafka.txt")} catch(ex){print(ex)}
注意:
本例中插件的安装路径为/DolphinDB/server/plugins/kafka
,用户需要根据自己实际环境进行修改。
每次启动 DolphinDB 服务后,只需手动加载一次即可。也可以设置为自动加载,参考教程:自动加载插件教程。
DolphinDB GUI 中执行以下脚本,创建 Producer 实例,注意需要根据实际环境配置 metadata.broker.list 参数:
producerCfg = dict(STRING, ANY)
producerCfg["metadata.broker.list"] = "192.193.168.4:9092"
producer = kafka::producer(producerCfg)
模拟测试数据生成
DolphinDB GUI 中执行以下脚本,模拟测试数据生成:
share streamTable(take(1, 86400) as id, 2020.01.01T00:00:00 + 0..86399 as datetime, rand(1..100, 86400) as val) as `kafkaTest
测试数据共有 86400 行,包含三列:id (INT 类型), datetime(DATETIME 类型)和 val(INT 类型),如下表所示
Kafka 创建 Topic : dolphindb-producer-test
使用 Kafka 集群自带的 kafka-topics.sh 终端命令创建 Topic:
bin/kafka-topics.sh --create --topic dolphindb-producer-test --bootstrap-server 192.193.168.4:9092
控制台输出结果:
Created topic dolphindb-producer-test.
DolphinDB 流数据表中的数据同步至 Kafka
DolphinDB GUI 中执行以下脚本,声明自定义流数据表订阅的处理函数:
def sendMsgToKafkaFunc(producer, msg){
try {
kafka::produce(producer, "dolphindb-producer-test", 1, msg, true)
}
catch(ex) {
writeLog("[Kafka Plugin] Failed to send msg to kafka with error:" +ex)
}
}
DolphinDB GUI 中执行以下脚本,订阅 DolphinDB 的流数据表 kafkaTest
,处理函数是 sendMsgToKafkaFunc,将流数据表内的增量数据实时推送到 Kafka 的 dolphindb-producer-test
Topic 中:
subscribeTable(tableName="kafkaTest", actionName="sendMsgToKafka", offset=0, handler=sendMsgToKafkaFunc{produc