Go 分布式事务实战:本地消息表、事务消息、SAGA、TCC 四大方案深度解析与选型指南
Go 分布式事务实战:本地消息表、事务消息、SAGA、TCC 四大方案深度解析与选型指南摘要:在微服务架构中,分布式事务是无法回避的核心难题。本文深入剖析本地消息表、事务消息、SAGA、TCC 四种主流方案的实现原理,提供完整的 Go 语言代码示例,并结合电商、支付等真实场景给出选型建议。一、为什么分布式事务这么难?1.1 从单体到微服务的演变在单体应用中,事务管理相对简单:// 单体应用:本地事务即可保证一致性 func Transfer(ctx context.Context, from, to string, amount decimal.Decimal) error { tx, err := db.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() // 扣减账户 A _, err = tx.ExecContext(ctx, "UPDATE accounts SET balance = balance - ? WHERE user_id = ?", amount, from) if err != nil { return err } // 增加账户 B _, err = tx.ExecContext(ctx, "UPDATE accounts SET balance = balance + ? WHERE user_id = ?", amount, to) if err != nil { return err } return tx.Commit() }但在微服务架构下,问题变得复杂:┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ 订单服务 │ │ 库存服务 │ │ 支付服务 │ │ (MySQL) │─────▶│ (MySQL) │─────▶│ (MySQL) │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ ▼ ▼ ▼ ┌─────────────────────────────────────────────────────────┐ │ 如何保证数据一致性? │ └─────────────────────────────────────────────────────────┘1.2 CAP 定理的权衡理论含义分布式事务中的体现Consistency一致性所有节点同一时刻数据一致Availability可用性每个请求都能得到响应Partition分区容错性网络分区时系统仍能运行结论:分布式系统中 P 是必须的,因此只能在 C 和 A 之间权衡。1.3 一致性模型强一致性 (Strong) ──────▶ 弱一致性 (Weak) │ │ ▼ ▼ ┌─────────────┐ ┌─────────────┐ │ 线性一致性 │ │ 最终一致性 │ │ 2PC/3PC │ │ 消息队列 │ │ 性能差 │ │ 性能好 │ └─────────────┘ └─────────────┘二、方案一:本地消息表(Local Message Table)2.1 核心原理本地消息表是最经典的最终一致性方案,核心思想是:将分布式事务拆分为本地事务 + 异步消息,通过本地事务保证消息的可靠投递┌──────────────────────────────────────────────────────────┐ │ 业务流程 │ ├──────────────────────────────────────────────────────────┤ │ 1. 业务数据 + 消息 写入同一本地事务 │ │ 2. 后台任务轮询消息表,发送到 MQ │ │ 3. 消费者处理消息,实现最终一致性 │ └──────────────────────────────────────────────────────────┘2.2 Go 语言完整实现2.2.1 消息表设计CREATE TABLE local_messages ( id BIGINT PRIMARY KEY AUTO_INCREMENT, message_id VARCHAR(64) NOT NULL UNIQUE, business_type VARCHAR(32) NOT NULL, -- 业务类型 business_data JSON NOT NULL, -- 业务数据 message_data JSON NOT NULL, -- 消息内容 status TINYINT NOT NULL DEFAULT 0, -- 0:待发送 1:已发送 2:已完成 3:失败 retry_count INT DEFAULT 0, max_retry INT DEFAULT 3, next_retry_time DATETIME, created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, INDEX idx_status_retry (status, next_retry_time) );2.2.2 消息表模型与存储层// model/message.go package model import ( "encoding/json" "time" ) type MessageStatus int const ( MessageStatusPending MessageStatus = iota // 待发送 MessageStatusSent // 已发送 MessageStatusCompleted // 已完成 MessageStatusFailed // 失败 ) type LocalMessage struct { ID int64 `db:"id"` MessageID string `db:"message_id"` BusinessType string `db:"business_type"` BusinessData json.RawMessage `db:"business_data"` MessageData json.RawMessage `db:"message_data"` Status MessageStatus `db:"status"` RetryCount int `db:"retry_count"` MaxRetry int `db:"max_retry"` NextRetryTime time.Time `db:"next_retry_time"` CreatedAt time.Time `db:"created_at"` UpdatedAt time.Time `db:"updated_at"` }// repository/message_repo.go package repository import ( "context" "database/sql" "time" "github.com/google/uuid" "your-project/model" ) type MessageRepo struct { db *sql.DB } func NewMessageRepo(db *sql.DB) *MessageRepo { return MessageRepo{db: db} } // CreateInTransaction 在事务中创建消息记录 func (r *MessageRepo) CreateInTransaction( ctx context.Context, tx *sql.Tx, businessType string, businessData, messageData interface{}, ) (string, error) { messageID := uuid.New().String() businessJSON, err := json.Marshal(businessData) if err != nil { return "", err } messageJSON, err := json.Marshal(messageData) if err != nil { return "", err } query := ` INSERT INTO local_messages (message_id, business_type, business_data, message_data, status, next_retry_time) VALUES (?, ?, ?, ?, ?, ?) ` _, err = tx.ExecContext(ctx, query, messageID, businessType, businessJSON, messageJSON, model.MessageStatusPending, time.Now(), // 立即可重试 ) return messageID, err } // FetchPendingMessages 获取待发送的消息 func (r *MessageRepo) FetchPendingMessages(ctx context.Context, limit int) ([]model.LocalMessage, error) { query := ` SELECT id, message_id, business_type, business_data, message_data, status, retry_count, max_retry, next_retry_time, created_at FROM local_messages WHERE status = ? AND next_retry_time = ? ORDER BY created_at ASC LIMIT ? ` rows, err := r.db.QueryContext(ctx, query, model.MessageStatusPending, time.Now(), limit, ) if err != nil { return nil, err } defer rows.Close() var messages []model.LocalMessage for rows.Next() { var msg model.LocalMessage err := rows.Scan( msg.ID, msg.MessageID, msg.BusinessType, msg.BusinessData, msg.MessageData, msg.Status, msg.RetryCount, msg.MaxRetry, msg.NextRetryTime, msg.CreatedAt, ) if err != nil { return nil, err } messages = append(messages, msg) } return messages, rows.Err() } // UpdateStatus 更新消息状态 func (r *MessageRepo) UpdateStatus(ctx context.Context, messageID string, status model.MessageStatus) error { query := ` UPDATE local_messages SET status = ?, updated_at = ?, next_retry_time = ? WHERE message_id = ? ` var nextRetry time.Time if status == model.MessageStatusPending { // 指数退避:1min, 2min, 4min, 8min... nextRetry = time.Now().Add(5 * time.Minute) } _, err := r.db.ExecContext(ctx, query, status, time.Now(), nextRetry, messageID) return err } // MarkCompleted 标记消息处理完成 func (r *MessageRepo) MarkCompleted(ctx context.Context, messageID string) error { query := `UPDATE local_messages SET status = ?, updated_at = ? WHERE message_id = ?` _, err := r.db.ExecContext(ctx, query, model.MessageStatusCompleted, time.Now(), messageID) return err }2.2.3 消息发送器// messenger/messenger.go package messenger import ( "context" "encoding/json" "fmt" "log" "time" "github.com/segmentio/kafka-go" "your-project/model" "your-project/repository" ) type Messenger struct { repo *repository.MessageRepo kafkaWriter *kafka.Writer running chan struct{} } func NewMessenger(repo *repository.MessageRepo, brokers []string) *Messenger { return Messenger{ repo: repo, kafkaWriter: kafka.Writer{ Addr: kafka.TCP(brokers...), Topic: "distributed-tx-events", Balancer: kafka.LeastBytes{}, }, running: make(chan struct{}), } } // Start 启动消息轮询发送 func (m *Messenger) Start(ctx context.Context) { go func() { ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case -ctx.Done(): return case -ticker.C: m.sendPendingMessages(ctx) } } }() } func (m *Messenger) sendPendingMessages(ctx context.Context) { messages, err := m.repo.FetchPendingMessages(ctx, 100) if err != nil { log.Printf("fetch pending messages error: %v", err) return } for _, msg := range messages { if err := m.sendMessage(ctx, msg); err != nil { log.Printf("send message %s error: %v", msg.MessageID, err) // 更新重试次数 m.updateRetryStatus(ctx, msg) } else { // 发送成功,标记为已发送 _ = m.repo.UpdateStatus(ctx, msg.MessageID, model.MessageStatusSent) } } } func (m *Messenger) sendMessage(ctx context.Context, msg model.LocalMessage) error { var messageData map[string]interface{} if err := json.Unmarshal(msg.MessageData, messageData); err != nil { return err } // 添加消息 ID 到 header,用于幂等性校验 kafkaMsg := kafka.Message{
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2412555.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!