快速掌握 GO 之 RabbitMQ

news2025/6/3 3:54:54

更多个人笔记见:
github个人笔记仓库
gitee 个人笔记仓库
个人学习,学习过程中还会不断补充~ (后续会更新在github和 gitee上)

文章目录

    • 作用
    • 经典例子
        • 生产者(发送端)
        • 消费者(接收端)

作用

类似一个“中间过渡器”,应对突发流量导致数据库连接池耗尽或者请求导致服务崩溃

  • 流量洪峰​​:促销活动时,前置 Nginx 将请求写入 RabbitMQ,后端服务按能力消费
  • 容灾恢复​​:数据库故障期间,消息持久化在队列;恢复后继续消费 (消费指的是 Mysql 取出数据然后存起来)
  • 将任务分发到多个消费者实例,确保高负载下任务均匀分配。这就可以实现负载均衡 (比如多个 worker 处理帖子审核)

需要考虑如果用户的申请不是很多情况下,多引入一层 RabbitMQ 其实会导致实际的速度变慢(毕竟多加了一层)

经典例子

GO 语言相关库:go get -u github.com/streadway/amqp

docker 快速部署 rabbitMQ:docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

  • 5672:AMQP 端口
  • 15672:管理界面端口,访问 http://localhost:15672 ( 默认用户/密码:guest/guest)
生产者(发送端)

创建 producer文件夹下创建producer.go ,然后单独 go run(同时 go run 后面的消费者记得)

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/streadway/amqp"
)

// 统一错误输出
func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 连接 RabbitMQ
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close() //关闭连接

	ch, err := conn.Channel() //建立通道,通过 conn 建立的,可以调用 amqp 中的函数
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 声明队列
	q, err := ch.QueueDeclare(
		"post_queue", // 指定创建或引用的队列名称
		false,        // 持久化  false 表示队列不会持久化到磁盘,重启 RabbitMQ 后会丢失。true 的话重启后就还在
		false,        // 自动删除   设置为 false 表示队列不会自动删除,如果 true,最后一个消费者断开后队列删除
		false,        // 独占   设置为 true 表示该队列只供一个消费者使用,当连接关闭后,队列会自动删除。false表示队列可以被多个连接使用
		false,        // 无等待  false 表示需要服务器确认队列创建,true表示客户端不会等待服务器的确认响应,如果操作失败也不会收到错误通知
		nil,          // 额外参数  额外参数可以用来设置队列的特殊属性,如消息TTL、队列最大长度、死信队列等
	)
	failOnError(err, "Failed to declare a queue")

	// 设置定时器,每5秒发送一次消息
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	// 创建一个函数用于发送消息,这样循环调用函数就是发送多次消息
	sendMessage := func(msgContent string) {
		err = ch.Publish(
			"",     // 交换机名称   这里是默认交换机,能够将消息直接路由到与路由键同名的队列
			q.Name, // 路由键   也就是队列名称,路由键应该与目标队列名称一致,消息才能被正确路由
			false,  // mandatory标志  false 表示消息无法路由到队列,则消息会被丢弃  如果是 true 就是当消息不能路由到队列时,RabbitMQ会返回一个Basic.Return命令给生产者
			false,  // immediate 标志   false 表示如果队列中没有消费者,消息会被存入队列等待消费, true表示当没有消费者能够立即消费该消息时,消息不会入队而是被丢弃
			amqp.Publishing{ //消息内容和性质
				ContentType: "text/plain",       //制定为 MIME 类型
				Body:        []byte(msgContent), //转换为字节类型
			})
		if err != nil {
			log.Printf("Failed to publish a message: %s", err)
			return
		}
		log.Printf(" [x] Sent %s", msgContent)
	}

	count := 1
	log.Println("Starting periodic message sending. Press Ctrl+C to exit.")

	// 等待定时器触发,定期发送消息
	for range ticker.C {
		sendMessage(fmt.Sprintf("Hello, RabbitMQ! Message #%d", count))
		count++
	}
}

  • 这里我将函数设置为每间隔 1s 就发送消息,同时记录数据
  • 如果运行后,隔一段时间再启动消费者,或者说运行中途关闭消费者,过一段时间再启动消费者,会发现中间发出的信号也会打印出来,这说明实际上是有存储在 RabbitMQ 中的(运行的时候,关闭后存储就需要看上面的设置了)
消费者(接收端)

consumer 文件夹下创建 consumer.go 然后单独一个终端 go run

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	//建立连接
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	//连接 channel
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare("post_queue", false, false, false, false, nil)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // 队列
		"",     // 消费者标签
		true,   // 自动确认
		false,  // 独占
		false,  // 无本地
		false,  // 无等待
		nil,    // 额外参数
	)

	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)
	go func() {
		for d := range msgs {
			log.Printf("Received: %s", d.Body)
		}
	}()
	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2394535.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Centos环境下安装/重装MySQL完整教程

目录 一、卸载残留的MySQL环境&#xff1a; 二、安装MySQL&#xff1a; 1、下载MySQL官方的yum源&#xff1a; 2、更新系统yum源&#xff1a; 3、确保系统中有了对应的MySQL安装包&#xff1a; 4、安装MySQL服务&#xff1a; 5、密钥问题安装失败解决方法&#xff1a; …

【Linux】环境变量完全解析

9.环境变量 文章目录 9.环境变量一、命令行参数二、获取环境变量程序中获取环境变量1. 使用命令行参数2. 使用系统调用函数getenv("字符串");3. 使用系统提供的全局变量environ 命令行中查询环境变量 三、常见环境变量1. HOME2. OLDPWD3. PATH4. SHELL 四、环境变量与…

力扣每日一题——找到离给定两个节点最近的节点

目录 题目链接&#xff1a;2359. 找到离给定两个节点最近的节点 - 力扣&#xff08;LeetCode&#xff09; 题目描述 解法一&#xff1a;双指针路径交汇法​ 基本思路 关键步骤 为什么这样可行呢我请问了&#xff1f; 举个例子 特殊情况 Java写法&#xff1a; C写法&a…

卷积神经网络(CNN)入门学习笔记

什么是 CNN&#xff1f; CNN&#xff0c;全称 卷积神经网络&#xff08;Convolutional Neural Network&#xff09;&#xff0c;是一种专门用来处理图片、语音、文本等结构化数据的神经网络。 它模仿人眼识别图像的方式&#xff1a; 从局部到整体&#xff0c;一步步提取特征&a…

VLAN的作用和原理

1. 为什么要有vlan&#xff1f; 分割广播域&#xff0c;避免广播风暴&#xff0c;造成网络资源的浪费 可以灵活的组网&#xff0c;便于管理&#xff0c;同时还有安全加固的功能 2. vlan是怎么实现的&#xff1f;端口的原理&#xff1f; 设置VLAN后&#xff0c;流量之间的转…

深入探讨集合与数组转换方法

目录 1、Arrays.asList() 1.1、方法作用 1.2、内部实现 1.3、修改元素的影响 1.4、注意事项 2、list.toArray() 2.1、方法作用 2.2、内部实现 2.3、修改元素的影响 2.4、特殊情况 1、对象引用 2、数组copy 3、对比总结 4、常见误区与解决方案 5、实际应用建议…

【HarmonyOS 5应用架构详解】深入理解应用程序包与多Module设计机制

⭐本期内容&#xff1a;【HarmonyOS 5应用架构详解】深入理解应用程序包与多Module设计机制 &#x1f3c6;系列专栏&#xff1a;鸿蒙HarmonyOS&#xff1a;探索未来智能生态新纪元 文章目录 前言应用与应用程序包应用程序的基本概念应用程序包的类型标识机制应用安装流程 应用的…

【Oracle】DCL语言

个人主页&#xff1a;Guiat 归属专栏&#xff1a;Oracle 文章目录 1. DCL概述1.1 什么是DCL&#xff1f;1.2 DCL的核心功能 2. 用户管理2.1 创建用户2.2 修改用户2.3 删除用户2.4 用户信息查询 3. 权限管理3.1 系统权限3.1.1 授予系统权限3.1.2 撤销系统权限 3.2 对象权限3.2.1…

MySQL强化关键_017_索引

目 录 一、概述 二、索引 1.主键索引 2.唯一索引 3.查看索引 4.添加索引 &#xff08;1&#xff09;建表时添加 &#xff08;2&#xff09;建表后添加 5.删除索引 三、树 1.二叉树 2.红黑树 3.B树 4.B树 &#xff08;1&#xff09;为什么 MySQL 选择B树作为索引…

【备忘】php命令行异步执行超长时间任务

环境说明&#xff1a; 操作系统&#xff1a;windows10 IDE&#xff1a;phpstorm 开发语言&#xff1a;php7.4 框架&#xff1a;thinkphp5.1 测试环境&#xff1a;linuxwindows均测试通过。 初级方法&#xff1a; function longRunningTask() {$root_path Tools::get_ro…

在 RK3588 上通过 VSCode 远程开发配置指南

在 RK3588 上通过 VSCode 远程开发配置指南 RK3588 设备本身不具备可视化编程环境&#xff0c;但可以通过 VSCode 的 Remote - SSH 插件 实现远程代码编写与调试。以下是完整的配置流程。 一、连接 RK3588 1. 安装 Debian 系统 先在 RK3588 上安装 Debian 操作系统。 2. 安…

OpenHarmony标准系统-HDF框架之音频驱动开发

文章目录 引言OpenHarmony音频概述OpenHarmony音频框图HDF音频驱动框架概述HDF音频驱动框图HDF音频驱动框架分析之音频设备驱动HDF音频驱动框架分析之supportlibs实现HDF音频驱动框架分析之hdi-passthrough实现HDF音频驱动框架分析之hdi-bindev实现HDF音频驱动加载过程HDF音频驱…

HTML Day03

Day03 0. 引言1. CSS1.1 CSS的3种使用方法1.2 内联样式1.3 内部样式表1.4 外部CSS文件 2. 图像3. 表格3.1单元格间距和单元格边框 4. 列表4.1 有序表格的不同类型4.2 不同类型的无序表格4.3 嵌套列表 5. 区块6. 布局6.1 div布局6.2 表格布局 0. 引言 HELLO ^ _ ^大家好&#xf…

篇章六 数据结构——链表(二)

目录 1. LinkedList的模拟实现 1.1 双向链表结构图​编辑 1.2 三个简单方法的实现 1.3 头插法 1.4 尾插法 1.5 中间插入 1.6 删除 key 1.7 删除所有key 1.8 clear 2.LinkedList的使用 2.1 什么是LinkedList 5.2 LinkedList的使用 1.LinkedList的构造 2. LinkedList的…

吴恩达MCP课程(3):mcp_chatbot

原课程代码是用Anthropic写的&#xff0c;下面代码是用OpenAI改写的&#xff0c;模型则用阿里巴巴的模型做测试 .env 文件为&#xff1a; OPENAI_API_KEYsk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx OPENAI_API_BASEhttps://dashscope.aliyuncs.com/compatible-mode…

【清晰教程】查看和修改Git配置情况

目录 查看安装版本 查看特定配置 查看全局配置 查看本地仓库配置 设置或修改配置 查看安装版本 打开命令行工具&#xff0c;通过version命令检查Git版本号。 git --version 如果显示出 Git 的版本号&#xff0c;说明 Git 已经成功安装。 查看特定配置 如果想要查看特定…

JAVA 常用 API 正则表达式

1 正则表达式作用 作用一&#xff1a;校验字符串是否满足规则作用二&#xff1a;在一段文本中查找满足要求的内容 2 正则表达式规则 2.1 字符类 package com.bjpowernode.test14;public class RegexDemo1 {public static void main(String[] args) {//public boolean matche…

光电设计大赛智能车激光对抗方案分享:低成本高效备赛攻略

一、赛题核心难点与备赛痛点解析 全国大学生光电设计竞赛的 “智能车激光对抗” 赛题&#xff0c;要求参赛队伍设计具备激光对抗功能的智能小车&#xff0c;需实现光电避障、目标识别、轨迹规划及激光精准打击等核心功能。从历年参赛情况看&#xff0c;选手普遍面临三大挑战&a…

Python实现P-PSO优化算法优化BP神经网络回归模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档&#xff09;&#xff0c;如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在当今数据驱动的时代&#xff0c;回归分析作为预测和建模的重要工具&#xff0c;在科学研究和工业应用中占据着重要…

Microsoft的在word中选择文档中的所有表格进行字体和格式的调整时的解决方案

找到宏 创建 并粘贴 使用 Sub 全选所有表格() Dim t As Table an MsgBox("即将选择选区内所有表格&#xff0c;若无选区&#xff0c;则选择全文表格。", vbYesNo, "reboot提醒您!") If an - 6 Then Exit Sub Set rg IIf(Selection.Type wdSelectionIP, …