Kafka 插件并创建 Kafka Producer 发送

news2025/8/14 2:36:20

相关说明

  • 启动测试前清空所有数据。
  • 每次测试先把所有数据写入 Kafka,再加载 Kafka 插件同步数据到 DolphinDB 中。目的是将同步数据的压力全部集中到 Kafka 插件。
  • 以 Kafka 插件从收到第一批数据到收到最后一批数据的时间差作为同步数据的总耗时。

测试流程

  • 加载 Kafka 插件并创建 Kafka Producer 发送数据到 Kafka 中(以发送 100 万条数据为例)

DolphinDB GUI 连接 DolphinDB 执行以下脚本,本例中插件的安装路径为 /DolphinDB/server/plugins/kafka,用户需要根据自己实际环境进行修改:

// 加载插件
try{
	loadPlugin("/DolphinDB/server/plugins/kafka/PluginKafka.txt")
	loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt")
} catch(ex){print(ex)}

// 创建 Producer
producerCfg = dict(STRING, ANY);
producerCfg["metadata.broker.list"] = "192.193.168.5:8992";
producer = kafka::producer(producerCfg);
kafka::producerFlush(producer);

//向kafka传100万数据
tbl = table("R5L1B3T1N03D01" as deviceId, "2022-02-22 13:55:47.377" as timestamps, "voltage" as deviceType , 1.5 as value )


// 创建 Consume
consumerCfg = dict(STRING, ANY)

consumerCfg["group.id"] = "test10"
consumerCfg["metadata.broker.list"] = "192.193.168.5:8992";

for(i in 1..1000000) {
	aclMsg = select *, string(now()) as pluginSendTime from tbl;
	for(i in aclMsg) {
		kafka::produce(producer, "test3", "1", i, true);
	}
}

consumer = kafka::consumer(consumerCfg)
topics=["test10"];
kafka::subscribe(consumer, topics);

for(i in 1..1000000) {
	aclMsg = select *, string(now()) as pluginSendTime from tbl;
	for(i in aclMsg) {
		kafka::produce(producer, "test10", "1", i, true);
	}
}
  • 订阅 Kafka 中数据进行消费
// 创建存储解析完数据的表
colDefName = ["deviceId","timestamps","deviceType","value", "pluginSendTime", "pluginReceived"]

colDefType = [SYMBOL,TIMESTAMP,SYMBOL,DOUBLE,TIMESTAMP,TIMESTAMP]
dest = table(1:0, colDefName, colDefType);
share dest as `streamZd

// 解析函数
def temporalHandle(mutable dictVar, mutable dest){
	try{
		t = dictVar
		t.replaceColumn!(`timestamps, temporalParse(dictVar[`timestamps],"yyyy-MM-dd HH:mm:ss.SSS"))
		t.replaceColumn!(`pluginSendTime, timestamp(dictVar[`pluginSendTime]))
		t.update!(`received, now());
		dest.append!(t);
	}catch(ex){
		print("kafka errors : " + ex)
	}
}

// 创建 decoder
name = colDefName[0:5];
type = colDefType[0:5];
type[1] = STRING;
type[4] = STRING;
decoder = EncoderDecoder::jsonDecoder(name, type, temporalHandle{, dest}, 15, 100000, 0.5)

// 创建subjob函数
kafka::createSubJob(consumer, , decoder, `DecoderKafka)

此时我们观察共享流表的数据量,当达到 100 万条时说明消费完成,测试结束。

5. Kerberos 认证

5.1 什么是 Kerberos ?

Kerberos 是一种基于加密 Ticket 的身份认证协议,支持双向认证且性能较高。主要有三个组成部分:Kdc, Client 和 Service。

生产环境的 Kafka 一般需要开启 Kerberos 认证,为 Kafka 提供权限管理,提高安全性。

5.2 前置条件

  • Java 8+
  • kerberos:包括 Kdc 和 Client
  • keytab 证书

5.3 认证相关配置说明

环境相关配置说明

以下是 Kerberos 认证涉及的关键配置信息,具体配置文件的路径根据实际情况调整

  1. 安装 kdc
yum install -y krb5-server krb5-libs krb5-workstation krb5-devel krb5-auth-dialog

2. 配置 /etc/krb5.conf

[realms]
 HADOOP.COM = {
  kdc = cnserver9:88
  admin_server = cnserver9:749
  default_domain = HADOOP.COM
 }

3. 配置 /var/kerberos/krb5kdc/kadm5.acl

*/admin@HADOOP.COM	*

4. 创建生成 kdc 数据库文件

sudo kdb5_util create -r HADOOP.COM –s

5. 启动 kerberos 服务

sudo systemctl start krb5kdc
sudo systemctl status krb5kdc

6. 安装 kerberos 客户端

yum install -y krb5-devel krb5-workstation krb5-client

7. 启动 kerberos 客户端

sudo kadmin.local -q "addprinc hadoop/admin"

DolphinDB Kafka Plugin 配置说明

  • 关键配置参数说明
    • security.protocol:指定通信协议
    • sasl.kerberos.service.name:指定 service 名称
    • sasl.mechanism:SASL 机制,包括 GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
    • sasl.kerberos.keytab:keytab 文件的路径
    • sasl.kerberos.principal:指定 principal

  • 具体代码实现
// 加载插件
try{loadPlugin("/path/to/DolphinDBPlugin/kafka/bin/linux/PluginKafka.txt")} catch(ex){print(ex)}

// 生产者配置
producerCfg = dict(STRING, ANY);
producerCfg["bootstrap.servers"] = "cnserver9:9992";
producerCfg["security.protocol"] = "SASL_PLAINTEXT";
producerCfg["sasl.kerberos.service.name"] = "kafka";
producerCfg["sasl.mechanism"] = "GSSAPI";
producerCfg["sasl.kerberos.keytab"] = "/home/test/hadoop.keytab";
producerCfg["sasl.kerberos.principal"] = "kafka/cnserver9@HADOOP.COM";
producer = kafka::producer(producerCfg);

// 消费者配置
consumerCfg = dict(STRING, ANY)
consumerCfg["group.id"] = "test"
consumerCfg["bootstrap.servers"] = "cnserver9:9992";
consumerCfg["security.protocol"] = "SASL_PLAINTEXT";
consumerCfg["sasl.kerberos.service.name"] = "kafka";
consumerCfg["sasl.mechanism"] = "GSSAPI";
consumerCfg["sasl.kerberos.keytab"] = "/home/test/hadoop.keytab";
consumerCfg["sasl.kerberos.principal"] = "kafka/cnserver9@HADOOP.COM";
consumer = kafka::consumer(consumerCfg)
注意:适配 Kerberos 认证只需修改 Kafka 插件有关生产者和消费者的配置即可,其余脚本无需改动。

6. 其他说明

本教程展示了 DolphinDB Kafka Plugin 中常用的接口函数,完整的函数支持请参考官网文档:DolphinDB Kafka 插件官方教程

使用过程中如果遇到任何问题,欢迎大家在项目仓库反馈

  • Github 仓库:DolphinDB Kafka Plugin
  • Gitee 仓库:DolphinDB Kafka Plugin

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/14942.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

公网远程连接内网Everything实现快速搜索私有云文件

企业外派出差,已经是稀松平常的事,通常出差的同事都会带一个优盘或移动硬盘。但优盘和硬盘是离线设备,所存储的文件数据无法及时更新,因此能够连接公司主机获得最新文件才是出差的首选。可是公司主机位于内网,且面对浩…

【搭建NextCloud私有云盘服务】采用docker在linux上进行部署,内含nextCloud移植(迁移服务器)方法

1、前言 完成的效果: 在linux上搭建NextCloud云盘服务,可以通过域名访问到云盘服务,并且安装有SSL证书,可进行https访问。 例如: 服务器公网ip为47.110.66.88 域名为:test.huahua.com 可直接通过访问https…

Java#10(String 类的构造方法和练习)

目录 一.String类的构造方法 1.public String()空参构造 2.public String(char[ ] ch2);(对堆区已有的值没有办法复用,数据多会浪费内存空间,而直接赋值如果已有相同数据可以复用,不会在浪费太多内存) 3.public String(byte[ ] bytes) 二.字符串的比较 1.前提基础: 比较…

解决使用svg绘制后下载图片以及下载svg内部嵌套image图片失败的问题。

在使用svg进行图形绘制之后&#xff0c;可能需要下载已经绘制的svg图片&#xff0c;我们可能会遇到以下两种情况&#xff1a; 情况1&#xff1a; <svg width"640" height"400" xmlns"http://www.w3.org/2000/svg" id"svgColumn">…

解决问题 - 错误:不支持发行版本 5

文章目录一、提出问题二、解决问题&#xff08;一&#xff09;设置项目SDK与语言等级&#xff08;二&#xff09;设置模块语言等级&#xff08;三&#xff09;设置Java编译器等级&#xff08;四&#xff09;运行程序&#xff0c;测试问题是否已解决一、提出问题 基于JDK11创建…

阿里Redis最全面试全攻略,读完这个就可以和阿里面试官好好聊聊

简述Redis常用的数据结构及其如何实现的&#xff1f; Redis支持的常用5种数据类型指的是value类型&#xff0c;分别为&#xff1a;字符串String、列表List、哈希Hash、集合Set、有序集合Zset&#xff0c;但是Redis后续又丰富了几种数据类型分别是Bitmaps、HyperLogLogs、GEO。…

年搜索量超 7 亿次背后:这款 APP 用火山引擎 DataTester 完成“数据驱动”

更多技术交流、求职机会&#xff0c;欢迎关注字节跳动数据平台微信公众号&#xff0c;回复【1】进入官方交流群 双十一刚过&#xff0c;双十二在即&#xff0c;随着线上营销玩法的层出不穷&#xff0c;各平台之间的价格逐渐“内卷”。消费者对跨平台比价的需求越来越强烈&#…

【实时语音转文本】PC端实时语音转文本(麦克风外音系统内部音源)

语音转文字这个功能可以应用在视频动态字幕&#xff0c;语音快速输入&#xff0c;实时记录通话内容&#xff0c;高级应用可以在人工智能&#xff0c;语音识别&#xff0c;智能助手方面&#xff0c;还需要一点机器学习可以做出一些好玩的东西&#xff0c;比如PC端AI助理&#xf…

给开源项目做一个漂亮简洁的版本迭代更新图,生成固定链接复制到介绍中、公众号菜单链接中、博客中和网页中等

背景 开源项目的版本迭代与更新经常需要更新迭代文档&#xff0c;但是readme.md没有比较美观一点的效果&#xff0c;所以文本分享一种第三方的方式&#xff1a;用TexSpire的免费在线文档分享功能&#xff0c;手机、PC、Pad都可以适配。 效果预览 使用 视频教程 第一步&…

重磅 | 思特威获得ISO 26262:2018汽车功能安全ASIL D流程认证证书

确保安全是汽车制造商和系统供应商的责任。为了从芯片IP级开始解决功能安全问题&#xff0c;国际标准化组织&#xff08;ISO&#xff09;在2018年追加了汽车半导体的功能安全评估指南。 彼时&#xff0c;新车搭载的芯片数量、种类以及软件代码行数开始呈现倍数增长。按照ISO 2…

重磅!华秋电子再次入选“中国产业数字化百强榜”

11月16日&#xff0c;由江苏省商务厅、南京市人民政府指导&#xff0c;南京市商务局主办的江苏电子商务大会暨第九届中国产业数字化年会在南京开幕。 据了解&#xff0c;会议上公开发布了“2022中国产业数字化百强榜”&#xff0c;这也是托比网自2015年以来发布的第13个榜单。榜…

ES6解构赋值及ES6的一些简写介绍

1、ES6解构赋值&#xff1a; ● 解构赋值&#xff0c;就是快速地从对象或者数组中取出成员的一个语法方式 (1) 解构数组&#xff1a; ● 快速从数组中获取成员 <script>//ES5的方式从数组中获取成员var arr [Jack,Rose,Tom]var a arr[0] //Jackvar b arr[1] //Ro…

实验六 数组(山东建筑大学)

第1关:实验6.1 任务描述 输入3个整数,按由大到小的顺序输出。 输入样例 1 1 2 3 输出样例 1 3 2 1 开始你的任务吧,祝你成功! 第2关:实验6.2 任务描述 输入10个整数,将其中最小的数与第一个数对换,把最大的数与最后一个数对换。 输入样例 1 2 1 3 4 5 6 7 8 10 9 输…

感性认识一下Linux的进程地址空间和写时拷贝技术

虽然本篇文章对操作系统的理解不怎么深入&#xff0c;或者说仅仅是一些皮毛知识(也可能皮毛也算不上)&#xff0c;但还是需要读者有一些Linux的基础理解&#xff0c;如何确定是否有这些基础呢&#xff1f;可以参考我的这一篇博客&#xff1a;Linux —— 进程概念超详解! 1.“奇…

LeetCode 318 周赛

2460. 对数组执行操作 给你一个下标从 0 开始的数组 nums &#xff0c;数组大小为 n &#xff0c;且由 非负 整数组成。 你需要对数组执行 n - 1 步操作&#xff0c;其中第 i 步操作&#xff08;从 0 开始计数&#xff09;要求对 nums 中第 i 个元素执行下述指令&#xff1a;…

阿里 P8 架构师力荐 java 程序员人手一套 116 页 JVM 吊打面试官专属秘籍

只要是 java 程序员&#xff0c;肯定对于 JVM 来说并不陌生&#xff0c;甚至是从熟悉到陌生&#xff0c;为什么这样说呢&#xff1f;因为你看似熟悉的东西&#xff0c;其实对于源码层级了解得少之又少&#xff0c;到头来只有一种陌生的感觉&#xff0c;使用了吗&#xff1f;使用…

技术分享 | 多测试环境的动态伸缩实践

本文将从敏捷研发团队的环境需求与痛点出发&#xff0c;分享如何基于云构建可弹性伸缩的自动化生成式多测试环境&#xff1b;更在经济效益层面&#xff0c;提供了多种成本优化方案&#xff0c;以满足研发团队低成本、高效益的多测试环境运行目标。 一、当前遇到的环境问题 初…

论文阅读笔记 | 三维目标检测——AVOD算法

如有错误&#xff0c;恳请指出。 文章目录1. 背景2. 网络结构3. 实验结果paper&#xff1a;《Joint 3D Proposal Generation and Object Detection from View Aggregation》 1. 背景 AVOD同样是一个two-stage(使用了RPN提取候选框)、anchor-based网络结构。获得较高的召回率对…

【WPF】DiffPlex 文本比对工具

【WPF】DiffPlex 文本比对工具背景关于 DiffPlex准备代码实现效果图源码下载地址背景 现行的文本编辑器大多都具备文本查询的能力&#xff0c;但是并不能直观的告诉用户两段文字的细微差异&#xff0c;所以对比工具在某种情况下&#xff0c;就起到了很便捷的效率。 关于 DiffPl…

D. Extreme Subtraction(差分)

Problem - 1443D - Codeforces 给你一个由n个正整数组成的数组a。 你可以随意使用下面的操作&#xff1a;选择任何一个1≤k≤n的整数&#xff0c;做两件事中的一件。 将数组中的前k个元素递减1。 将数组的最后k个元素递减1。 例如&#xff0c;如果n5&#xff0c;a[3,2,2,1,4]…