Go 分布式事务实战:本地消息表、事务消息、SAGA、TCC 四大方案深度解析与选型指南

news2026/3/14 22:25:33
Go 分布式事务实战:本地消息表、事务消息、SAGA、TCC 四大方案深度解析与选型指南摘要:在微服务架构中,分布式事务是无法回避的核心难题。本文深入剖析本地消息表、事务消息、SAGA、TCC 四种主流方案的实现原理,提供完整的 Go 语言代码示例,并结合电商、支付等真实场景给出选型建议。一、为什么分布式事务这么难?1.1 从单体到微服务的演变在单体应用中,事务管理相对简单:// 单体应用:本地事务即可保证一致性 func Transfer(ctx context.Context, from, to string, amount decimal.Decimal) error { tx, err := db.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() // 扣减账户 A _, err = tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - ? WHERE user_id = ?", amount, from) if err != nil { return err } // 增加账户 B _, err = tx.ExecContext(ctx, "UPDATE accounts SET balance = balance + ? WHERE user_id = ?", amount, to) if err != nil { return err } return tx.Commit() }但在微服务架构下,问题变得复杂:┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ 订单服务 │ │ 库存服务 │ │ 支付服务 │ │ (MySQL) │─────▶│ (MySQL) │─────▶│ (MySQL) │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ ▼ ▼ ▼ ┌─────────────────────────────────────────────────────────┐ │ 如何保证数据一致性? │ └─────────────────────────────────────────────────────────┘1.2 CAP 定理的权衡理论含义分布式事务中的体现Consistency一致性所有节点同一时刻数据一致Availability可用性每个请求都能得到响应Partition分区容错性网络分区时系统仍能运行结论:分布式系统中 P 是必须的,因此只能在 C 和 A 之间权衡。1.3 一致性模型强一致性 (Strong) ──────▶ 弱一致性 (Weak) │ │ ▼ ▼ ┌─────────────┐ ┌─────────────┐ │ 线性一致性 │ │ 最终一致性 │ │ 2PC/3PC │ │ 消息队列 │ │ 性能差 │ │ 性能好 │ └─────────────┘ └─────────────┘二、方案一:本地消息表(Local Message Table)2.1 核心原理本地消息表是最经典的最终一致性方案,核心思想是:将分布式事务拆分为本地事务 + 异步消息,通过本地事务保证消息的可靠投递┌──────────────────────────────────────────────────────────┐ │ 业务流程 │ ├──────────────────────────────────────────────────────────┤ │ 1. 业务数据 + 消息 写入同一本地事务 │ │ 2. 后台任务轮询消息表,发送到 MQ │ │ 3. 消费者处理消息,实现最终一致性 │ └──────────────────────────────────────────────────────────┘2.2 Go 语言完整实现2.2.1 消息表设计CREATE TABLE local_messages ( id BIGINT PRIMARY KEY AUTO_INCREMENT, message_id VARCHAR(64) NOT NULL UNIQUE, business_type VARCHAR(32) NOT NULL, -- 业务类型 business_data JSON NOT NULL, -- 业务数据 message_data JSON NOT NULL, -- 消息内容 status TINYINT NOT NULL DEFAULT 0, -- 0:待发送 1:已发送 2:已完成 3:失败 retry_count INT DEFAULT 0, max_retry INT DEFAULT 3, next_retry_time DATETIME, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_status_retry (status, next_retry_time) );2.2.2 消息表模型与存储层// model/message.go package model import ( "encoding/json" "time" ) type MessageStatus int const ( MessageStatusPending MessageStatus = iota // 待发送 MessageStatusSent // 已发送 MessageStatusCompleted // 已完成 MessageStatusFailed // 失败 ) type LocalMessage struct { ID int64 `db:"id"` MessageID string `db:"message_id"` BusinessType string `db:"business_type"` BusinessData json.RawMessage `db:"business_data"` MessageData json.RawMessage `db:"message_data"` Status MessageStatus `db:"status"` RetryCount int `db:"retry_count"` MaxRetry int `db:"max_retry"` NextRetryTime time.Time `db:"next_retry_time"` CreatedAt time.Time `db:"created_at"` UpdatedAt time.Time `db:"updated_at"` }// repository/message_repo.go package repository import ( "context" "database/sql" "time" "github.com/google/uuid" "your-project/model" ) type MessageRepo struct { db *sql.DB } func NewMessageRepo(db *sql.DB) *MessageRepo { return MessageRepo{db: db} } // CreateInTransaction 在事务中创建消息记录 func (r *MessageRepo) CreateInTransaction( ctx context.Context, tx *sql.Tx, businessType string, businessData, messageData interface{}, ) (string, error) { messageID := uuid.New().String() businessJSON, err := json.Marshal(businessData) if err != nil { return "", err } messageJSON, err := json.Marshal(messageData) if err != nil { return "", err } query := ` INSERT INTO local_messages (message_id, business_type, business_data, message_data, status, next_retry_time) VALUES (?, ?, ?, ?, ?, ?) ` _, err = tx.ExecContext(ctx, query, messageID, businessType, businessJSON, messageJSON, model.MessageStatusPending, time.Now(), // 立即可重试 ) return messageID, err } // FetchPendingMessages 获取待发送的消息 func (r *MessageRepo) FetchPendingMessages(ctx context.Context, limit int) ([]model.LocalMessage, error) { query := ` SELECT id, message_id, business_type, business_data, message_data, status, retry_count, max_retry, next_retry_time, created_at FROM local_messages WHERE status = ? AND next_retry_time = ? ORDER BY created_at ASC LIMIT ? ` rows, err := r.db.QueryContext(ctx, query, model.MessageStatusPending, time.Now(), limit, ) if err != nil { return nil, err } defer rows.Close() var messages []model.LocalMessage for rows.Next() { var msg model.LocalMessage err := rows.Scan( msg.ID, msg.MessageID, msg.BusinessType, msg.BusinessData, msg.MessageData, msg.Status, msg.RetryCount, msg.MaxRetry, msg.NextRetryTime, msg.CreatedAt, ) if err != nil { return nil, err } messages = append(messages, msg) } return messages, rows.Err() } // UpdateStatus 更新消息状态 func (r *MessageRepo) UpdateStatus(ctx context.Context, messageID string, status model.MessageStatus) error { query := ` UPDATE local_messages SET status = ?, updated_at = ?, next_retry_time = ? WHERE message_id = ? ` var nextRetry time.Time if status == model.MessageStatusPending { // 指数退避:1min, 2min, 4min, 8min... nextRetry = time.Now().Add(5 * time.Minute) } _, err := r.db.ExecContext(ctx, query, status, time.Now(), nextRetry, messageID) return err } // MarkCompleted 标记消息处理完成 func (r *MessageRepo) MarkCompleted(ctx context.Context, messageID string) error { query := `UPDATE local_messages SET status = ?, updated_at = ? WHERE message_id = ?` _, err := r.db.ExecContext(ctx, query, model.MessageStatusCompleted, time.Now(), messageID) return err }2.2.3 消息发送器// messenger/messenger.go package messenger import ( "context" "encoding/json" "fmt" "log" "time" "github.com/segmentio/kafka-go" "your-project/model" "your-project/repository" ) type Messenger struct { repo *repository.MessageRepo kafkaWriter *kafka.Writer running chan struct{} } func NewMessenger(repo *repository.MessageRepo, brokers []string) *Messenger { return Messenger{ repo: repo, kafkaWriter: kafka.Writer{ Addr: kafka.TCP(brokers...), Topic: "distributed-tx-events", Balancer: kafka.LeastBytes{}, }, running: make(chan struct{}), } } // Start 启动消息轮询发送 func (m *Messenger) Start(ctx context.Context) { go func() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case -ctx.Done(): return case -ticker.C: m.sendPendingMessages(ctx) } } }() } func (m *Messenger) sendPendingMessages(ctx context.Context) { messages, err := m.repo.FetchPendingMessages(ctx, 100) if err != nil { log.Printf("fetch pending messages error: %v", err) return } for _, msg := range messages { if err := m.sendMessage(ctx, msg); err != nil { log.Printf("send message %s error: %v", msg.MessageID, err) // 更新重试次数 m.updateRetryStatus(ctx, msg) } else { // 发送成功,标记为已发送 _ = m.repo.UpdateStatus(ctx, msg.MessageID, model.MessageStatusSent) } } } func (m *Messenger) sendMessage(ctx context.Context, msg model.LocalMessage) error { var messageData map[string]interface{} if err := json.Unmarshal(msg.MessageData, messageData); err != nil { return err } // 添加消息 ID 到 header,用于幂等性校验 kafkaMsg := kafka.Message{

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2412555.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…