Kafka Go客户端--Sarama

news2025/7/17 7:02:08

Kafka Go客户端

在Go中里面有三个比较有名气的Go客户端。

  • Sarama:用户数量最多,早期这个项目是在Shopify下面,现在挪到了IBM下。
  • segmentio/kafka-go:没啥大的缺点。
  • confluent-kafka-go:需要启用cgo,跨平台问题比较多,交叉编译也不支持。

Sarama 使用入门:tools

IBM/sarama: Sarama is a Go library for Apache Kafka.

在 Sarama 里面提供了一些简单的命令行工具,可以看做是 Shell脚本提供的功能一个子集。

Consumer和 producer中的用得比较多

在这里插入图片描述

1.设置 Go 代理(如果内网无法直连 proxy.golang.org)

export GOPROXY=https://goproxy.cn,direct
export GOSUMDB=sum.golang.google.cn

2.在虚拟机上执行安装命令:

  • ​ go install github.com/IBM/sar ama/tools/kafka-console-consumer@latest
  • ​ go install github.com/lBM/sarama/tools/kafka-console-producer@latest

3.把可执行文件所在目录加到 PATH(如果还没加)

export PATH=$PATH:$(go env GOBIN)

4.确认可执行文件在哪里

# 查看 GOBIN,如果你没显式设置,就会是空
go env GOBIN

# 查看 GOPATH,默认是 $HOME/go(对于 root 用户就是 /root/go)
go env GOPATH

#我的是/home/cxz/go/lib:/home/cxz/go/work

5.查看安装结果

ls /home/cxz/go/lib/bin
#应该能够看到kafka-console-consumer  kafka-console-producer

6.临时生效

export PATH=$PATH:/home/cxz/go/lib/bin

# 然后验证
which kafka-console-consumer
# 应该输出 /home/cxz/go/lib/bin/kafka-console-consumer

7.永久生效

echo 'export PATH=$PATH:/home/cxz/go/lib/bin' >> ~/.bashrc
# 或者,如果你用的是 zsh:
# echo 'export PATH=$PATH:/home/cxz/go/lib/bin' >> ~/.zshrc

# 然后重新加载配置
source ~/.bashrc

Sarama 使用入门:发送消息

虚拟机上执行

kafka-console-consumer -topic=test_topic -brokers=192.168.24.101:9094

Goland上执行

package main

import (
	"github.com/IBM/sarama"
	"github.com/stretchr/testify/assert"
	"testing"
)

var addrs = []string{"192.168.24.101:9094"}

func TestSyncProducer(t *testing.T) {
    //创建一个 Sarama 的配置对象。
	cfg := sarama.NewConfig()
    //表示生产者要等待 Kafka 确认消息成功写入后再返回(同步模式)。如果不设置这个,SyncProducer.SendMessage 会一直失败。
	cfg.Producer.Return.Successes = true //同步的Producer一定要设置
    //创建一个同步的生产者实例
	producer, err := sarama.NewSyncProducer(addrs, cfg)
	assert.NoError(t, err)
    //构建消息并发送
	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: "test_topic",
		//消息数据本体
		Value: sarama.StringEncoder("hello world ,这是一条使用kafka的消息"),
		//会在生产者和消费者之间传递,消息头,可传递自定义键值对,比如 trace_id 用于链路追踪。
		Headers: []sarama.RecordHeader{
			{
				Key:   []byte("trace_id"),
				Value: []byte("123456"),
			},
		},
		//只作用于发送过程。元信息,在发送过程中使用,可以用来传递额外信息,发送完成后会原样返回(不会传给消费者)。
		Metadata: "这是metadata",
	})
	assert.NoError(t, err)
}

10.执行结果

Partition:	0
Offset:	0
Key:	
Value:	hello world ,这是一条使用kafka的消息

使用控制台工具连接Kafka

Sarama 使用入门:指定分区

可以注意到,前面所有的消息都被发送到了 Partition 0 上面。

正常来说,在 Sarama 里面,可以通过指定 config 中的Partitioner来指定最终的目标分区。

常见的方法:

  • ​ Random:随机挑一个。
  • ​ RoundRobin:轮询。
  • ​ Hash(默认):根据 key 的哈希值来筛选一个。
  • ​ Manual: 根据 Message 中的 partition 字段来选择。
  • ​ ConsistentCRC:一致性哈希,用的是 CRC32 算法。
  • ​ Custom:实际上不 Custom,而是自定义一部分Hash 的参数,本质上是一个 Hash 的实现。
//默认HashPartitioner  适合: 按用户 ID、订单 ID 等字段分区场景
cfg.Producer.Partitioner = sarama.NewHashPartitioner
//使用 CRC32 算法 计算 Key 的哈希。 适合: 需要高一致性分布的业务,例如日志收集系统
cfg.Producer.Partitioner = sarama.NewConsistentCRCHashPartitioner
//忽略 Key,每条消息随机分配 partition。  适合: 普通消息队列、广播类场景。
cfg.Producer.Partitioner = sarama.NewRandomPartitioner
//需要手动指定 partition(ProducerMessage.Partition 字段)。适合: 明确知道要写哪个 partition,例如做数据分流
cfg.Producer.Partitioner = sarama.NewManualPartitioner
//用于实现你自己的 Partitioner  一般不推荐使用这个空参函数(它会 panic),应实现完整接口。
cfg.Producer.Partitioner = sarama.NewCustomPartitioner()
//允许你使用自定义哈希函数来做 key 分区。  适合: 有特定哈希策略需求时,例如分布要尽可能均匀。
cfg.Producer.Partitioner = sarama.NewCustomHashPartitioner(func() hash.Hash32 {

})



Topic: "test_topic",
//分区依据
Key:   sarama.StringEncoder("user_123"), // 🔑 这里是分区依据
//消息数据本体
Value: sarama.StringEncoder("hello world ,这是一条使用kafka的消息"),

最典型的场景,就是利用Partitioner来保证同一个业务的消息一定发送到同一个分区上,从而保证业 有序。

Sarama 使用入门:异步发送

Sarama有一个异步发送的producer,它的用法稍微复杂一点。

  • 把Return.Success和 Errors都设置为true,这是为了后面能够拿到发送结果。
  • 初始化异步producer。
  • 从producer里面拿到Input的channel,并且发送 一条消息。
  • ​ 利用select case,同时**监听Success和Error两个channel,**来获得发送成功与否的信息。
func TestAsyncProducer(t *testing.T) {
	cfg := sarama.NewConfig()
	//怎么知道发送是否成功
	cfg.Producer.Return.Errors = true
	cfg.Producer.Return.Successes = true
	producer, err := sarama.NewAsyncProducer(addrs, cfg)
	require.NoError(t, err)
	messages := producer.Input()
	go func() {
		for {
			messages <- &sarama.ProducerMessage{
				Topic: "test_topic",
				//分区依据
				Key: sarama.StringEncoder("user_123"), // 🔑 这里是分区依据
				//消息数据本体
				Value: sarama.StringEncoder("hello world ,这是一条使用kafka的消息"),
				//会在生产者和消费者之间传递
				Headers: []sarama.RecordHeader{
					{
						Key:   []byte("trace_id"),
						Value: []byte("123456"),
					},
				},
				//只作用于发送过程
				Metadata: "这是metadata",
			}
		}
	}()

	errCh := producer.Errors()
	succCh := producer.Successes()
	for {
		//两个都不满足就会阻塞
		select {
		case err := <-errCh:
			t.Log("发送出了问题", err.Err)
		case <-succCh:
			t.Log("发送成功")
		}
	}
}

Sarama 使用入门:acks

在Kafka里面,生产者在发送数据的时候,有一个很关键的参数,就是 acks。
有三个取值:

  • ​ 0:客户端发一次,不需要服务端的确认。
  • ​ 1:客户端发送,并且需要服务端写入到主分区。
  • ​ -1:客户端发送,并且需要服务端同步到所有的ISR 上。

从上到下,性能变差,但是数据可靠性上升。需要性能,选 0,需要消息不丢失,选-1。

理解acks你就要抓住核心点,谁ack才算数?

  • 0:TCP协议返回了ack就可以。
  • 1:主分区确认写入了就可以。
  • -1:所有的ISR都确认了就可以。

在这里插入图片描述

ISR (In Sync Replicas),用通俗易懂的话来说,就是跟上了节奏的从分区。

什么叫做跟上了节奏?就是它和主分区保持了数据同步。

所以,当消息被同步到从分区之后,如果主分区崩溃了那么依旧可以保证在从分区上还有数据。

在这里插入图片描述

sarama 使用入门:启动消费者

Sarama的消费者设计不是很直观,稍微有点复杂。

  • ​ 首先要初始化一个ConsumerGroup。
  • ​ 调用ConsumerGroup上的Consume方法。
  • ​ 为 Consume 方法传入一个 ConsumerGroupHandler的辅助方法。
package main

import (
	"context"
	"github.com/IBM/sarama"
	"github.com/stretchr/testify/assert"
	"log"
	"testing"
)

func TestConsumer(t *testing.T) {
	cfg := sarama.NewConfig()
	//正常来说,一个消费者都是归属一个消费者组的
	//消费者就是你的业务
	consumerGroup, err := sarama.NewConsumerGroup(addrs, "test_group", cfg)

	assert.NoError(t, err)
	err = consumerGroup.Consume(context.Background(), []string{"test_topic"}, testConsumerGroupHandler{})
	//你消费结束,就会到这里
	t.Log(err)
}

type testConsumerGroupHandler struct {
}

func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	log.Println("Setup session:", session)
	return nil
}

func (t testConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	log.Println("Cleanup session:", session)
	return nil
}

func (t testConsumerGroupHandler) ConsumeClaim(
	//代表的是你和Kafka的会话(从建立连接到连接彻底断掉的那一段时间)
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error {
	msgs := claim.Messages()
	for msg := range msgs {
		//var bizMsg MyBizMsg
		//err := json.Unmarshal(msg.Value, &bizMsg)
		//if err != nil {
		//	//这就是消费消息出错
		//	//大多数时候就是重试
		//	//记录日志
		//	continue
		//}
		log.Println(string(msg.Value))
		session.MarkMessage(msg, "")
	}
	//什么情况下会到这里
	//msg被人关了,也就是要退出消费逻辑
	return nil
}

type MyBizMsg struct {
	Name string
}

sarama 使用入门:ConsumerGroupHandler

下面的代码就是对ConsumerGroupHandler的实现,关键就是在消费了msg之后,如果消费成功了,要记得提交。

也就是调用MarkMessage方法。

至于 Setup 和 Cleanup 方法反而用得不多。

type testConsumerGroupHandler struct {
}

func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	log.Println("Setup session:", session)
	return nil
}

func (t testConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	log.Println("Cleanup session:", session)
	return nil
}

func (t testConsumerGroupHandler) ConsumeClaim(
	//代表的是你和Kafka的会话(从建立连接到连接彻底断掉的那一段时间)
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error {
	msgs := claim.Messages()
	for msg := range msgs {
		//var bizMsg MyBizMsg
		//err := json.Unmarshal(msg.Value, &bizMsg)
		//if err != nil {
		//	//这就是消费消息出错
		//	//大多数时候就是重试
		//	//记录日志
		//	continue
		//}
		log.Println(string(msg.Value))
		session.MarkMessage(msg, "")
	}
	//什么情况下会到这里
	//msg被人关了,也就是要退出消费逻辑
	return nil
}

sarama 使用入门:利用context来控制消费者退出

可以利用初始化ConsumerGroup 时候传入的ctx来控制消费者组退出消息。

下图中,我传入了一个超时的context,那么:

	start := time.Now()
	//这里是测试,我们就控制消费10s
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	//开始消费,会在这里阻塞住
	err = consumerGroup.Consume(ctx, []string{"test_topic"}, testConsumerGroupHandler{})
	//你消费结束,就会到这里
	t.Log(err, time.Since(start).String())	

下图中,我主动调用了cancel,那么:

	start := time.Now()
	//这里是测试,我们就控制消费5s
	ctx, cancel := context.WithCancel(context.Background())
	time.AfterFunc(time.Second*5, func() {
		cancel()
	})
	//开始消费,会在这里阻塞住
	err = consumerGroup.Consume(ctx, []string{"test_topic"}, testConsumerGroupHandler{})
	//你消费结束,就会到这里
	t.Log(err, time.Since(start).String())
  • 如果超时了
  • 如果我主动调用了cancel

以上两种情况,任何一种情况出现了,都会让消费者退出消息。

sarama 使用入门:指定偏移量消费

在部分场景下,我们会希望消费历史消息,或者从某个消息开始消费,那么可以考虑在Setup里面设置偏移量。

关键调用是 ResetOffset。

不过一般建议走离线渠道,操作Kafka集群去重置对应的偏移量。

核心在于,你并不是每次重新部署,重新启动都是要重置这个偏移量的。

只要你的消费者组在这个分区上有过“已提交的 offset”,Kafka 就会优先使用这个提交的 offset,而忽略你在 Setup() 中设置的 offset

// 在每次 rebalance 或初次连接 Kafka 后调用,用于初始化。
func (t testConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	//执行一些初始化的事情
	log.Println("Setup")
	//假设要重置到0
	var offset int64 = 0
	//遍历所有的分区
	partitions := session.Claims()["test_topic"]
	for _, p := range partitions {
		session.ResetOffset("test_topic", p, offset, "")
		//session.ResetOffset("test_topic", p, sarama.OffsetNewest, "")
		//session.ResetOffset("test_topic", p, sarama.OffsetOldest, "")
	}
	return nil
}

sarama使用入门:异步消费,批量提交

正常来说,为了在异步消费失败之后还能继续重试,可以考虑异步消费一批,提交一批。

下图中,ctx.Done分支用来控制凑够一批的超时机制,防止生产者的速率很低,一直凑不够一批。

func (t testConsumerGroupHandler) ConsumeClaim(
	//代表的是你和Kafka的会话(从建立连接到连接彻底断掉的那一段时间)
	//可以通过 session 控制 offset 提交,获取消费者信息,并感知退出时机。
	session sarama.ConsumerGroupSession,
	//claim 是你获取消息的入口
	claim sarama.ConsumerGroupClaim) error {
	msgs := claim.Messages()

	//设置批量处理的条数
	const batchSize = 10
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
		var eg errgroup.Group
		var last *sarama.ConsumerMessage
		for i := 0; i < batchSize; i++ {
			done := false
			select {
			case <-ctx.Done():
				//这边表示超时了
				done = true
			case msg, ok := <-msgs:
				if !ok {
					cancel()
					return nil
				}
				last = msg
				msg1 := msg
				eg.Go(func() error {
					//我就在这里消费
					time.Sleep(time.Second)
					//你在这里重试
					log.Println(string(msg1.Value))
					return nil
				})
			}
			if done {
				break
			}
		}
		cancel()
		err := eg.Wait()
		if err != nil {
			//这边能怎么办?
			//记录日志
			continue
		}
		//就这样
		session.MarkMessage(last, "")
	}
	return nil
}

另外一个分支就是读取消息,并且提交到errgroup里面执行。

Sleep是模拟长时间业务执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2375964.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RustDesk:开源电脑远程控制软件

RustDesk&#xff1a;开源电脑远程控制软件 RustDesk&#xff1a;开源电脑远程控制软件一、RustDesk 简介二、下载教程2.1 桌面版下载2.2 Android 版下载 三、安装教程3.1 桌面版安装 四、功能讲解4.1 远程控制4.2 文件传输4.3 安全可靠4.4 自定义服务器 五、RustDesk技术架构解…

[操作系统] 策略模式进行日志模块设计

文章目录 [toc]一、什么是设计模式&#xff1f;二、日志系统的基本构成三、策略模式在日志系统中的落地实现✦ 1. 策略基类 LogStrategy✦ 2. 具体策略类▸ 控制台输出&#xff1a;ConsoleLogStrategy▸ 文件输出&#xff1a;FileLogStrategy 四、日志等级枚举与转换函数五、日…

MoonBit正式入驻GitCode!AI时代的编程语言新星,开启高性能开发新纪元

在AI与编程语言深度交融的今天&#xff0c;开发者们正见证一场技术生产力的革命。由IDEA研究院基础软件中心倾力打造的MoonBit&#xff08;月兔&#xff09;编程语言&#xff0c;自2023年横空出世以来&#xff0c;凭借高性能、低延迟、轻量化的特性&#xff0c;迅速成为全球开发…

关于vue学习的经常性错误

目录 常见问题&#xff1a; 1关于引用本地下载es6模块文件&#xff0c;报404错误 2 使用createApp函数后没有调用mount函数挂载到浏览器 3 在mount函数中&#xff0c;忘记引用插值表达式所在标签的定位符如 标签选择器&#xff0c;类选择器等 4在直接使用Vue3函数时&#…

AtCoder Beginner Contest 403

再来一场atCoder&#xff0c;这一场简直血虐&#xff0c;让你回忆起了审题的重要性 A - Odd Position Sum 思路&#xff1a;题意很简单&#xff0c;求一个数组奇数位上数字和。很简单的问题&#xff0c;但你如果不仔细审题&#xff0c;就会浪费大量的时间 /* Author Owen_Q…

关于 Golang GC 机制的一些细节:什么是根对象?GC 机制的触发时机?

文章目录 关于 Golang GC 机制的一些细节&#xff1a;什么是根对象&#xff1f;GC 机制的触发时机&#xff1f;简要回顾 Golang GC 三色标记法的工作流程什么是根对象&#xff1f;GC 的触发时机&#xff1f; 关于 Golang GC 机制的一些细节&#xff1a;什么是根对象&#xff1f…

Python笔记:c++内嵌python,c++主窗口如何传递给脚本中的QDialog,使用的是pybind11

1. 问题描述 用的是python 3.8.20, qt版本使用的是5.15.2, PySide的版本是5.15.2, pybind11的版本为2.13.6 网上说在python脚本中直接用PySide2自带的QWinWidget&#xff0c;如from PySide2.QtWinExtras import QWinWidget&#xff0c;但我用的版本中说没有QWinWidget&#x…

C++效率掌握之STL库:map set底层剖析及迭代器万字详解

文章目录 1.map、set的基本结构2.map、set模拟实现2.1 初步定义2.2 仿函数实现2.3 Find功能实现2.4 迭代器初步功能实现2.4.1 运算符重载2.4.2 --运算符重载2.4.3 *运算符重载2.4.4 ->运算符重载2.4.5 !运算符重载2.4.6 begin()2.4.7 end() 2.5 迭代器进阶功能实现2.5.1 set…

新三消示例项目《Gem Hunter》中的光照和视觉效果

《Gem Hunter》是 Unity 的全新官方示例项目&#xff0c;展示了如何在 Unity 2022 LTS 使用通用渲染管线 (URP) 打造抢眼的光效和视效&#xff0c;让 2D 益智/三消游戏在竞争中脱颖而出。 下载示例项目及其说明文档。准备潜入清澈湛蓝的海水中探寻财富吧&#xff0c;因为那里到…

单向循环链表C语言实现实现(全)

#include<stdio.h> #include<stdlib.h> #define TRUE 1 #define FASLE 0//定义宏标识判断是否成功 typedef struct Node {int data;struct Node* next; }Node;Node* InitList() {Node* list (Node*)malloc(sizeof(Node));list->data 0;//创建节点保存datalist…

【AI大模型】赋能【传统业务】

在数字化转型的浪潮下&#xff0c;传统业务流程&#xff08;如通知公告管理、文档处理等&#xff09;仍依赖人工操作&#xff0c;面临效率低、成本高、易出错等问题。以企业通知公告为例&#xff0c;从内容撰写、摘要提炼到信息分发&#xff0c;需耗费大量人力与时间&#xff0…

团结引擎开源车模 Sample 发布:光照渲染优化 动态交互全面体验升级

光照、材质与交互效果的精细控制&#xff0c;通常意味着复杂的技术挑战&#xff0c;但借助 Shader Graph 14.1.0(已内置在团结引擎官方 1.5.0 版本中)&#xff0c;这一切都变得简单易用。通过最新团结引擎官方车模 Sample&#xff0c;开发者能切身感受到全新光照优化与编辑功能…

精准测量“双雄会”:品致与麦科信光隔离探头谁更胜一筹

在电子技术飞速发展的当下&#xff0c;每一次精准测量都如同为科技大厦添砖加瓦。光隔离探头作为测量领域的关键角色&#xff0c;能有效隔绝电气干扰&#xff0c;保障测量安全与精准。在众多品牌中&#xff0c;PINTECH品致与麦科信的光隔离探头脱颖而出&#xff0c;成为工程师们…

NSSCTF [HNCTF 2022 WEEK4]

题解前的吐槽&#xff1a;紧拖慢拖还是在前段时间开始学了堆的UAF(虽然栈还没学明白&#xff0c;都好难[擦汗])&#xff0c;一直觉得学的懵懵懂懂&#xff0c;不太敢发题解&#xff0c;这题算是入堆题后一段时间的学习成果&#xff0c;有什么问题各位师傅可以提出来&#xff0c…

tornado_登录页面(案例)

目录 1.基础知识​编辑 2.脚手架&#xff08;模版&#xff09; 3.登录流程图&#xff08;processon&#xff09; 4.登录表单 4.1后&#xff08;返回值&#xff09;任何值&#xff1a;username/password &#xff08;4.1.1&#xff09;app.py &#xff08;4.1.2&#xff…

YOLOv12模型部署(保姆级)

一、下载YOLOv12源码 1.通过网盘分享的文件&#xff1a;YOLOv12 链接: https://pan.baidu.com/s/12-DEbWx1Gu7dC-ehIIaKtQ 提取码: sgqy &#xff08;网盘下载&#xff09; 2.进入github克隆YOLOv12源码包 二、安装Anaconda/pycharm 点击获取官网链接(anaconda) 点击获取…

BGP实验练习1

需求&#xff1a; 要求五台路由器的环回地址均可以相互访问 需求分析&#xff1a; 1.图中存在五个路由器 AR1、AR2、AR3、AR4、AR5&#xff0c;分属不同自治系统&#xff08;AS&#xff09;&#xff0c;AR1 在 AS 100&#xff0c;AR2 - AR4 在 AS 200&#xff0c;AR5 在 AS …

HTML、CSS 和 JavaScript 基础知识点

HTML、CSS 和 JavaScript 基础知识点 一、HTML 基础 1. HTML 文档结构 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.…

数据结构与算法分析实验12 实现二叉查找树

实现二叉查找树 1、二叉查找树介绍2.上机要求3.上机环境4.程序清单(写明运行结果及结果分析)4.1 程序清单4.1.1 头文件 TreeMap.h 内容如下&#xff1a;4.1.2 实现文件 TreeMap.cpp 文件内容如下&#xff1a;4.1.3 源文件 main.cpp 文件内容如下&#xff1a; 4.2 实现展效果示5…

使用 Semantic Kernel 调用 Qwen-VL 多模态模型

使用 Semantic Kernel 调用 Qwen-VL 多模态模型 一、引言 随着人工智能技术的不断发展&#xff0c;多模态模型逐渐成为研究的热点。Qwen-VL 是阿里云推出的大规模视觉语言模型&#xff0c;支持图像、文本等多种输入形式&#xff0c;并能够进行图像描述、视觉问答等多种任务。…