Strimzi Kafka Bridge(桥接)实战之二:生产和发送消息

news2025/7/13 21:21:25

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

本篇概览

  • 本文是《Strimzi Kafka Bridge(桥接)实战之》系列的第二篇,咱们直奔bridge的重点:常用接口,用实际操作体验如何用bridge完成常用的消息收发业务

  • 官方的openapi接口文档地址 : https://strimzi.io/docs/bridge/in-development/#_openapi

  • 整篇文章由以下内容构成:

  1. 准备工作:创建topic
  2. 生产消息
  3. 消费消息,strimzi bridge消费消息的逻辑略有些特殊,就是要提前创建strimzi bridge consumer,再通过consumer来调用拉取消息的接口
  • 完成本篇实战后,相信您已经可以数量的通过http来使用kafka的服务了

准备工作:创建topic

  • 遗憾的是,bridge未提供创建topic的API,所以咱们还是用命令来创建吧
  • ssh登录kubernetes的宿主机
  • 执行创建名为bridge-quickstart-topic的topic,共四个分区
kubectl -n aabbcc \
run kafka-producer \
-ti \
--image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
--rm=true \
--restart=Never \
-- bin/kafka-topics.sh \
--bootstrap-server my-cluster-kafka-bootstrap:9092 \
--create \
--topic bridge-quickstart-topic \
--partitions 4 \
--replication-factor 1
  • 检查topic创建是否成功
kubectl -n aabbcc \
run kafka-producer \
-ti \
--image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 \
--rm=true \
--restart=Never \
-- bin/kafka-topics.sh \
--bootstrap-server my-cluster-kafka-bootstrap:9092 \
--describe \
--topic bridge-quickstart-topic
  • 如下图,可见topic的创建符合预期
    在这里插入图片描述
  • 接下来的操作都是向bridge发送http请求完成的,我这边宿主机的IP地址是192.168.0.1,bridge的NodePort端口号31331

查看指定topic的详情

  • 如下请求,可以取得topicbridge-quickstart-topic的详情
curl -X GET \
  http://192.168.0.1:31331/topics/bridge-quickstart-topic
  • 收到响应如下,是这个topic的详细信息
{
	"name": "bridge-quickstart-topic",
	"configs": {
		"compression.type": "producer",
		"leader.replication.throttled.replicas": "",
		"message.downconversion.enable": "true",
		"min.insync.replicas": "1",
		"segment.jitter.ms": "0",
		"cleanup.policy": "delete",
		"flush.ms": "9223372036854775807",
		"follower.replication.throttled.replicas": "",
		"segment.bytes": "1073741824",
		"retention.ms": "604800000",
		"flush.messages": "9223372036854775807",
		"message.format.version": "3.0-IV1",
		"max.compaction.lag.ms": "9223372036854775807",
		"file.delete.delay.ms": "60000",
		"max.message.bytes": "1048588",
		"min.compaction.lag.ms": "0",
		"message.timestamp.type": "CreateTime",
		"preallocate": "false",
		"min.cleanable.dirty.ratio": "0.5",
		"index.interval.bytes": "4096",
		"unclean.leader.election.enable": "false",
		"retention.bytes": "-1",
		"delete.retention.ms": "86400000",
		"segment.ms": "604800000",
		"message.timestamp.difference.max.ms": "9223372036854775807",
		"segment.index.bytes": "10485760"
	},
	"partitions": [
		{
			"partition": 0,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		},
		{
			"partition": 1,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		},
		{
			"partition": 2,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		},
		{
			"partition": 3,
			"leader": 0,
			"replicas": [
				{
					"broker": 0,
					"leader": true,
					"in_sync": true
				}
			]
		}
	]
}

批量生产消息(同步)

  • 试试bridge提供的批量生产消息的API,以下命令会生产了三条消息,第一条通过key的hash值确定分区,第二条用partition参数明确指定了分区是2,第三条的分区是按照轮询策略更新的
curl -X POST \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001"
        },
        {
            "value": "sales-lead-0002",
            "partition": 2
        },
        {
            "value": "sales-lead-0003"
        }
    ]
}'
  • bridge响应如下,会返回每一条消息的partition和offset,这就是同步消息的特点,等到meta信息更新完毕后才会返回
{
	"offsets": [{
		"partition": 0,
		"offset": 0
	}, {
		"partition": 2,
		"offset": 0
	}, {
		"partition": 3,
		"offset": 0
	}]
}

批量生产消息(异步)

  • 有的场景下,例如追求高QPS并且对返回的meta信息不关注,可以考虑异步的方式发送消息,也就是说bridge收到响应后立即返回200,这种异步模式和前面的同步模式只有一个参数的差别:在请求url中增加async=true即可
curl -X POST \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic?async=true \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "key": "my-key",
            "value": "sales-lead-0001"
        },
        {
            "value": "sales-lead-0002",
            "partition": 2
        },
        {
            "value": "sales-lead-0003"
        }
    ]
}'
  • 没有响应body,请您自行请求感受一下,响应明显比同步模式快

查看partition

  • 查看tipic的parition情况
curl -X GET \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions
  • 响应
[{
	"partition": 0,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}, {
	"partition": 1,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}, {
	"partition": 2,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}, {
	"partition": 3,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}]
  • 查看指定partition
curl -X GET \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions/0
  • 响应
{
	"partition": 0,
	"leader": 0,
	"replicas": [{
		"broker": 0,
		"leader": true,
		"in_sync": true
	}]
}
  • 查看指定partition的offset情况
curl -X GET \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic/partitions/0/offsets
  • 响应
{
	"beginning_offset": 0,
	"end_offset": 5
}

创建bridge consumer

  • 通过bridge消费消息,有个特别且重要的前提:创建bridge consumer,只有先创建了bridge consumer,才能顺利从kafka的broker取到消息
  • 以下命令创建了一个bridge consumer,各参数的含义稍后会说明
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "name": "bridge-quickstart-consumer",
    "auto.offset.reset": "earliest",
    "format": "json",
    "enable.auto.commit": false,
    "fetch.min.bytes": 16,
    "consumer.request.timeout.ms": 300000
  }'
  • 上述请求的参数解释:
  1. 对应kafka的group为bridge-quickstart-consumer-group
  2. 此bridge consumer的name等于bridge-quickstart-consumer
  3. 参数enable.auto.commit表示是否自动提交offset,这里设置成false,表示无需自动提交,后面的操作中会调用API请求来更新offset
  4. 参数fetch.min.bytes要特别注意,其值等于16,表示唯有消息内容攒够了16字节,拉取消息的请求才能获取到消息,如果消息内容长度不到16字节,收到的响应body就是空
  5. 参数consumer.request.timeout.ms也要注意,这里我设置了300秒,如果超过300秒没有去拉取消息,这个消费者就会被kafka移除(被移除后如果再去拉取消息,kafka会报错:Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the grou)
  • 收到响应如下,instance_id表示这个bridge consumer的身份id,base_uri则是订阅消息时必须使用的请求地址
{
	"instance_id": "bridge-quickstart-consumer",
	"base_uri": "http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer"
}

如何删除bridge consumer

  • 以下命令可以删除consumer,重点是将身份id放入path中
curl -X DELETE http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer

订阅指定topic的消息

  • 创建bridge consumer成功后,接下来就能以这个consumer的身份去订阅kafka消息了
  • 执行以下命令可以订阅topic为bridge-quickstart-topic的kafka消息,注意请求地址就是前面创建bridge consumer时返回的base_uri字段
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/subscription \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "topics": [
        "bridge-quickstart-topic"
    ]
}'
  • 从上述请求body可以看出,此请求可以一次订阅多个topic,而且还可以使用topic_pattern(正则表达式)的形式来一次订阅多个topic
  • 订阅完成后,接下来就能主动拉取消息了

拉取消息

  • 在拉取消息之前,请确保已经提前生产了消息
  • 执行以下命令拉取一条消息
curl -X GET http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/records \
  -H 'accept: application/vnd.kafka.json.v2+json'
  • 然而,当您执行了上述命令后,会发现返回body为空,别担心,这是正常的现象,按照官方的说法,拉取到的第一条消息就是空的,这是因为拉取操作出触发了rebalancing逻辑(rebalancing是kafka的概览,是处理多个partition消费的操作),再次执行上述命令去拉取消息,这下正常了,body如下
[
	{
		"topic": "bridge-quickstart-topic",
		"key": "my-key",
		"value": "sales-lead-0001",
		"partition": 0,
		"offset": 0
	}, {
		"topic": "bridge-quickstart-topic",
		"key": "my-key",
		"value": "sales-lead-0001",
		"partition": 0,
		"offset": 1
	}
]

提交offset

  • 前面在创建bridge consumer的时候,参数enable.auto.commit的值等于fasle,表示由调用方主动提交offset到kafka,因此在拉取到消息之后,需要手动更新kafka consumer的offset
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/offsets
  • 该请求无返回body,只要返回码是204就表示成功

设定offset

  • 试想这样的场景:共生产了100条消息,消费者也已经将这100条全部消费完毕,现在由于某种原因,需要从91条开始,重新消费91-100这10条消息(例如需要重新计算),此时可以主动设定offset
  • 先执行以下命令,生产一条消息
curl -X POST \
  http://42.193.162.141:31331/topics/bridge-quickstart-topic \
  -H 'content-type: application/vnd.kafka.json.v2+json' \
  -d '{
    "records": [
        {
            "value": "sales-lead-a002-01234567890123456789",
            "partition": 2
        }
    ]
}'
  • 如下图红色箭头,可见当前partition已经生产了75条消息了
    在这里插入图片描述
  • 咱们先拉取消息,将消息都消费掉
    在这里插入图片描述
  • 由于没有新生产消息,此时再拉去应该拉取不到了
  • 现在执行以下请求,就可以将offset设置到74
curl -X POST http://42.193.162.141:31331/consumers/bridge-quickstart-consumer-group/instances/bridge-quickstart-consumer/positions \
  -H 'content-type: application/vnd.kafka.v2+json' \
  -d '{
    "offsets": [
        {
            "topic": "bridge-quickstart-topic",
            "partition": 2,
            "offset": 74
        }
    ]
}'
  • 再次拉取消息,发现74和之后的所有消息都可以拉去到了(注意,包含了74)
    在这里插入图片描述
  • 至此,咱们对生产和发送消息的常用接口都已经操作了一遍,对于常规的业务场景已经够用,接下来的文章,咱们以此为基础,玩出更多花样来

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/39417.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

27. Ubuntu 20.04 开机自动挂载文件/etc/fstab

自动挂载文件/etc/fstab1.fstab2. 参数含义3.开机自动挂载3.1 查看要挂载的磁盘UUID3.2 向fstab文件中添加不同于热插拔的设备,对于硬盘可能需要长期挂载在系统下,所以如果每次开机都去手动mount是非常痛苦的,当然Ubuntu系统的GNOME桌面自带的…

Map学习笔记——深入理解ConcurrentHashMap

ConcurrentHashMap 是我们日常开发中使用频率最高的并发容器之一了,具有如下特点: 基于JDK8分析 存储结构和HashMap一样,都是数组 链表 红黑树是线程安全的容器,底层是通过CAS自旋 sychronized 来保证的key 和 value 都不允许为空&#xf…

【华为OD机试真题 python】叠积木【2022 Q4 | 200分】

■ 题目描述 【叠积木】 有一堆长方体积木,它们的长度和宽度都相同,但长度不一。 小橙想把这堆积木叠成一面墙,墙的每层可以放一个积木,也可以将两个积木拼接起来,要求每层的长度相同。 若必须用完这些积木,叠成的墙最多为多少层? 如下是叠成的一面墙的图示,积木仅…

太强了,全面解析缓存应用经典问题

1、前言 随着互联网从简单的单向浏览请求,发展为基于用户个性信息的定制化以及社交化的请求,这要求产品需要做到以用户和关系为基础,对海量数据进行分析和计算。对于后端服务来说,意味着用户的每次请求都需要查询用户的个人信息和…

jdk-synchronized源码学习

synchronized介绍java中jdk1.6之前和jdk1.6及之后synchronized完全不一样。1.6之前直接上来都是重量级锁导致java的性能很低效,而1.6及之后甲骨文公司对其进行优化,通过一个锁的升级过程从而来支持一些非复杂的场景。那么本文主要是针对synchronized的源…

Java并发-ThreadLocal的使用

ThreadLocal 概述 当使用 ThreadLocal 维护变量时,ThreadLocal 为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。当多个线程操作这个变量时,实际操作的是自…

python实现熵权法

原文:https://mp.weixin.qq.com/s/vPNPdbZy11q1qsfEz9etZQ 1 熵权法简介 熵源自于希腊语 “ 变化 ” 表示变化的容量,德国物理学家克劳修斯为了将热力学第二定律格式化而引入熵的概念 。 熵的概念来源于热力学,是用来描述过程的不可逆现象…

sql注入手法详解

sql定义 sql--结构化查询语句 sql注入:首先我们通过前端将我们的payload(恶意代码)传送到后台服务器 传送到后台以后 我们提交的payload拼接到sql语句中 作为sql语句的一部分被执行 从而导致数据库又被脱库甚至删库的风险 使得数据库受损 sql注入手法 sql注入可…

领悟《信号与系统》之 非周期信号的傅里叶变换及性质

非周期信号的傅里叶变换及性质一、非周期信号的傅里叶变换二、 典型信号的傅立叶变换1.单边指数信号2.偶双边指数3. 矩阵脉冲信号4. 奇双边指数5. 符号函数6. 冲激信号7. 阶跃信号三、常用傅里叶变换表这里记录的信号都是非周期信号的傅里叶变化,频谱变换的特点就是…

VsCode 配置eslint,支持typescript的语法检查,及时发现低级语法错误,包括函数未定义等行为

背景 最近学习cocos的小游戏制作,参考游戏管理器的代码进行调试的时候,发现自己运行的结果一直都是跟官方效果不一样,没有自动生成更多的方块。肉眼检查代码差异,基本上代码是一样的,浏览器页面调试的时候也看不出问题…

[附源码]Python计算机毕业设计高校教室申请管理系统

项目运行 环境配置: Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术: django python Vue 等等组成,B/S模式 pychram管理等等。 环境需要 1.运行环境:最好是python3.7.7,…

【仿牛客网笔记】项目进阶,构建安全高效的企业服务——热帖排行

p:投票数 T:发布时间间隔 G:系数,通常为1.5,1.8 计算帖子的分数 注入RedisTemplate 帖子刷新 实现定时任务 刷新帖子 实现更新帖子分数 刷新帖子分数任务 配置Trigger 注释掉定时任务,注释注解就可以 启动服务之…

人口数据集:地级市常住人口与户籍人口、人口1%抽样调查数据两大维度指标数据

一、地级市常住人口与户籍人口 1、数据来源:地级市常住人口数据(主要来源于各地政府公报),户籍人口数据来源于《中国城市统计年鉴》 2、时间跨度:2003-2019年 3、区域范围:280个地级市 4、指标说明&…

[附源码]Python计算机毕业设计电影网站系统设计

项目运行 环境配置: Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术: django python Vue 等等组成,B/S模式 pychram管理等等。 环境需要 1.运行环境:最好是python3.7.7,…

【Spring】Bean生命周期

一、背景: 自动注入 UserService 对象, UserService 结构如下 二、创建 Bean 的整体流程: UserService.class ------> 无参的构造方法 ------> 普通对象(无值) ------> 依赖注入 ------> 初始化前&#…

FreeCAD二次开发-基于控制台模式FC外部开发

版本 FreeCAD0.18.2+PyCharm Community 2020.3.3 演示效果 环境搭建步骤 1.先安装好FreeCAD和PyCharm 2.添加环境变量 点击确定,全部关掉。 3.测试变量是否生效(CMD打开控制台,输入python回车) 弹出如下,说明可以进入FreeCAD自带的python解释器 4.打开PyCharm新建项目 …

Windows静态库用C++代码调用C语言的旧库方法extern ”c“

静态库特点 运行不存在 静态库源码被链接到调用程序中 目标程序的归档 C语言静态库 C静态库的创建 1.创建一个静态库项目 2.添加库程序,源文件使用C文件 C静态库的使用 库路径设置:可以使用pragma关键字设置 #pragma comment(lib,“./lib/cli…

15、Mybatis获取参数值的情况1(mapper接口方法的参数为单个字面量类型)

Mybatis获取参数值的情况1(mapper接口方法的参数为单个字面量类型) #{}和${}可以通过任意名称来获取 mapper接口方法的参数为单个字面量类型第一步: 第二步:#{} 结果: 第二步:${} 结果 第三步: 这种也行&…

站酷基于服务网格 ASM 的生产实践

01背景介绍Aliware站酷(ZCOOL)2006 年 8 月创立于北京,深耕设计领域多年,聚集了 1500 万设计师、摄影师、插画师、艺术家、创意人,在设计创意群体中具有一定的影响力与号召力。站酷在创立之初,就以“让设计…

libcurl 库的编译

因为要用到 libcurl 库的接口进行练习,而现在手上只有curl相关的头文件,那没办法只能下载源码进行编译了,这里记录一下编译 x86 与 hisi dv300 版本的动态库。 根据头文件 curlver.h 里的版本信息,我是直接下载 7.67.0 版本的源码…