Redis事务相关源码探究

news2025/7/9 16:58:24

文章目录

  • Redis事务源码解读
    • 1、MULTI 声明事务
    • 2、命令入队
    • 3、EXEC 执行事务
    • 4、WATCH 监听变量

Redis事务源码解读

源码地址:https://github.com/redis/redis/tree/7.0/src

从源码来简单分析下 Redis 中事务的实现过程

1、MULTI 声明事务

Redis 中使用 MULTI 命令来声明和开启一个事务

// https://github.com/redis/redis/blob/7.0/src/multi.c#L104
void multiCommand(client *c) {
	// 判断是否已经开启了事务
	// 不持之事务的嵌套
    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
	// 设置事务标识
    c->flags |= CLIENT_MULTI;

    addReply(c,shared.ok);
}

1、首先会判断当前客户端是是否已经开启了事务,Redis 中的事务不支持嵌套;

2、给 flags 设置事务标识 CLIENT_MULTI

2、命令入队

开始事务之后,后面所有的命令都会被添加到事务队列中

// https://github.com/redis/redis/blob/7.0/src/multi.c#L59
/* Add a new command into the MULTI commands queue */
void queueMultiCommand(client *c) {
    multiCmd *mc;

    // 这里有两种情况的判断  
    // 1、如果命令在入队是有问题就不入队了,CLIENT_DIRTY_EXEC 表示入队的时候,命令有语法的错误
    // 2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS 表示该客户端监听的键值有变动
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
        return;
        
    // 在原commands后面配置空间以存放新命令
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    // 微信新配置的空间设置执行的命令和参数
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = c->argv;
    mc->argv_len = c->argv_len;
    ...
}

入队的时候会做个判断:

1、如果命令在入队时有语法错误不入队了,CLIENT_DIRTY_EXEC 表示入队的时候,命令有语法的错误;

2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS 表示该客户端监听的键值有变动;

3、client watch 的 key 有更新,当前客户端的 flags 就会被标记成 CLIENT_DIRTY_CASCLIENT_DIRTY_CAS 是在何时被标记,可继续看下文。

3、EXEC 执行事务

命令入队之后,再来看下事务的提交

// https://github.com/redis/redis/blob/7.0/src/multi.c#L140
void execCommand(client *c) {
    ...
    // 判断下是否开启了事务
    if (!(c->flags & CLIENT_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }

    // 事务中不能 watch 有过期时间的键值
    if (isWatchedKeyExpired(c)) {
        c->flags |= (CLIENT_DIRTY_CAS);
    }

     // 检查是否需要中退出事务,有下面两种情况  
     // 1、 watch 的 key 有变化了
     // 2、命令入队的时候,有语法错误  
    if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) {
        if (c->flags & CLIENT_DIRTY_EXEC) {
            addReplyErrorObject(c, shared.execaborterr);
        } else {
            addReply(c, shared.nullarray[c->resp]);
        }
        // 取消事务
        discardTransaction(c);
        return;
    }

    uint64_t old_flags = c->flags;

    /* we do not want to allow blocking commands inside multi */
    // 事务中不允许出现阻塞命令
    c->flags |= CLIENT_DENY_BLOCKING;

    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */

    server.in_exec = 1;

    orig_argv = c->argv;
    orig_argv_len = c->argv_len;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyArrayLen(c,c->mstate.count);
    // 循环处理执行事务队列中的命令
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->argv_len = c->mstate.commands[j].argv_len;
        c->cmd = c->realcmd = c->mstate.commands[j].cmd;

        
        // 权限检查
        int acl_errpos;
        int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
        if (acl_retval != ACL_OK) {
          ...
        } else {
            // 执行命令
            if (c->id == CLIENT_ID_AOF)
                call(c,CMD_CALL_NONE);
            else
                call(c,CMD_CALL_FULL);

            serverAssert((c->flags & CLIENT_BLOCKED) == 0);
        }

        // 命令执行后可能会被修改,需要更新操作
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }

    // restore old DENY_BLOCKING value
    if (!(old_flags & CLIENT_DENY_BLOCKING))
        c->flags &= ~CLIENT_DENY_BLOCKING;
        
    // 恢复原命令
    c->argv = orig_argv;
    c->argv_len = orig_argv_len;
    c->argc = orig_argc;
    c->cmd = c->realcmd = orig_cmd;
    // 清除事务
    discardTransaction(c);

    server.in_exec = 0;
}

事务提交的时候,命令的执行逻辑还是比较简单的

1、首先会进行一些检查;

  • 检查事务有没有嵌套;
  • watch 监听的键值是否有变动;
  • 事务中命令入队列的时候,是否有语法错误;

2、循环执行,事务队列中的命令。

通过源码可以看到语法错误的时候事务才会结束执行,如果命令操作的类型不对,事务是不会停止的,还是会把正确的命令执行。

4、WATCH 监听变量

WATCH 命令用于在事务开始之前监视任意数量的键: 当调用 EXEC 命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。

看下 watch 的键值对是如何和客户端进行映射的

// https://github.com/redis/redis/blob/7.0/src/server.h#L918
typedef struct redisDb {
    ...
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    ...
} redisDb;

// https://github.com/redis/redis/blob/7.0/src/server.h#L1083
typedef struct client {
    ...
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    ...
} client;

// https://github.com/redis/redis/blob/7.0/src/multi.c#L262
// 服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端。   
//
// 每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key .
typedef struct watchedKey {
    // 键值
    robj *key;
    // 键值所在的db
    redisDb *db;
    // 客户端
    client *client;
    // 正在监听过期key 的标识
    unsigned expired:1; /* Flag that we're watching an already expired key. */
} watchedKey;

变量映射关系吐下所示

redis

分析完数据结构,看下 watch 的代码实现

// https://github.com/redis/redis/blob/7.0/src/multi.c#L441
void watchCommand(client *c) {
    int j;

    if (c->flags & CLIENT_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    /* No point in watching if the client is already dirty. */
    if (c->flags & CLIENT_DIRTY_CAS) {
        addReply(c,shared.ok);
        return;
    }
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
}

// https://github.com/redis/redis/blob/7.0/src/multi.c#L270
/* Watch for the specified key */
void watchForKey(client *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    // 检查是否正在 watch 传入的 key 
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    // 没有监听,添加监听的 key 到 db 中的 watched_keys 中
    clients = dictFetchValue(c->db->**watched_keys**,key);
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    // 添加 key 到 client 中的  watched_keys 中
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->client = c;
    wk->db = c->db;
    wk->expired = keyIsExpired(c->db, key);
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
    listAddNodeTail(clients,wk);
}

1、服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端;

2、每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key ;

3、当用 watch 命令的时候,过期键会被分别添加到 redisDb 中的 watched_keys 中,和 client 中的 watched_keys 中。

上面事务的执行的时候,客户端有一个 flags, CLIENT_DIRTY_CAS 标识当前客户端 watch 的键值对有更新,那么 CLIENT_DIRTY_CAS 是在何时被标记的呢?

// https://github.com/redis/redis/blob/7.0/src/db.c#L535
/*-----------------------------------------------------------------------------
 * Hooks for key space changes.
 *
 * Every time a key in the database is modified the function
 * signalModifiedKey() is called.
 *
 * Every time a DB is flushed the function signalFlushDb() is called.
 *----------------------------------------------------------------------------*/

// 每次修改数据库中的一个键时,都会调用函数signalModifiedKey()。
// 每次DB被刷新时,函数signalFlushDb()被调用。
/* Note that the 'c' argument may be NULL if the key was modified out of
 * a context of a client. */
// 当 键值对有变动的时候,会调用 touchWatchedKey 标识对应的客户端状态为 CLIENT_DIRTY_CAS
void signalModifiedKey(client *c, redisDb *db, robj *key) {
    touchWatchedKey(db,key);
    trackingInvalidateKey(c,key,1);
}

// https://github.com/redis/redis/blob/7.0/src/multi.c#L348
/* "Touch" a key, so that if this key is being WATCHed by some client the
 * next EXEC will fail. */
// 修改 key 对应的客户端状态为 CLIENT_DIRTY_CAS,当前客户端 watch 的 key 已经发生了更新
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    // 如果 redisDb 中的 watched_keys 为空,直接返回
    if (dictSize(db->watched_keys) == 0) return;
    // 通过传入的 key 在 redisDb 的 watched_keys 中找到监听该 key 的客户端信息
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
    /* Check if we are already watching for this key */
    // 将监听该 key 的所有客户端信息标识成 CLIENT_DIRTY_CAS 状态  
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        watchedKey *wk = listNodeValue(ln);
        client *c = wk->client;

        if (wk->expired) {
            /* The key was already expired when WATCH was called. */
            if (db == wk->db &&
                equalStringObjects(key, wk->key) &&
                dictFind(db->dict, key->ptr) == NULL)
            {
                /* Already expired key is deleted, so logically no change. Clear
                 * the flag. Deleted keys are not flagged as expired. */
                wk->expired = 0;
                goto skip_client;
            }
            break;
        }

        c->flags |= CLIENT_DIRTY_CAS;
        /* As the client is marked as dirty, there is no point in getting here
         * again in case that key (or others) are modified again (or keep the
         * memory overhead till EXEC). */
         // 这个客户端应该被表示成 dirty,这个客户端就不需要在判断监听了,取消这个客户端监听的 key
        unwatchAllKeys(c);

    skip_client:
        continue;
    }
}

Redis 中 redisClient 的 flags 设置被设置成 REDIS_DIRTY_CAS 位,有下面两种情况:

1、每次修改数据库中的一个键值时;

2、每次DB被 flush 时,整个 Redis 的键值被清空;

上面的这两种情况发生,redis 就会修改 watch 对应的 key 的客户端 flags 为 CLIENT_DIRTY_CAS 表示该客户端 watch 有更新,事务处理就能通过这个状态来进行判断。

几乎所有对 key 进行操作的函数都会调用 signalModifiedKey 函数,比如 setKey、delCommand、hsetCommand 等。也就所有修改 key 的值的函数,都会去调用 signalModifiedKey 来检查是否修改了被 watch 的 key,只要是修改了被 watch 的 key,就会对 redisClient 的 flags 设置 REDIS_DIRTY_CAS 位。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/9014.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu18.04 安装完成后的开发配置

Ubuntu18.04 安装完成后的开发配置1 终端工具2 屏幕录制工具3 屏幕截图工具4 向日葵远程操作与开机自启动设置5 MarkDown笔记工具6 ROS 机器人操作系统安装7 编程开发工具8 机器人仿真环境Gazebo 安装Webots 安装CoppeliaSim 安装Mujuco 安装Pybullet 安装测试平台&#xff1a;…

【c#】前后端分离练习小项目学习笔记----纯干货

c#实现前端页面CRUD结果图涉及到的技术设计流程代码展示总结结果图 涉及到的技术 1、HttpClient请求网络 2、webapi接口用法 3、sqlsugar数据库使用 4、Json序列化、反序列化 设计流程 刚开始做这个小项目&#xff0c;很懵。按照平常中项目的写法&#xff0c;就是先配置好数据…

模式识别与机器学习(更新中)

模式识别与机器学习 使用的教材&#xff0c;PPT为1 公式推导部分直接去看白板推导2&#xff0c;不管是书上还是ppt都写的极其模糊 先说重点&#xff1a; 贝叶斯算概率参数估计 第二讲 贝叶斯学习基础 贝叶斯公式 先验概率是非条件概率 似然概率是给定变量的条件下观测变…

贤鱼的刷题日常--P1665 正方形计数--题目详解

&#x1f3c6;今日学习目标&#xff1a; &#x1f340;学习了解–P1665 正方形计数 ✅创作者&#xff1a;贤鱼 ⏰预计时间&#xff1a;5分钟 &#x1f389;个人主页&#xff1a;贤鱼的个人主页 &#x1f525;专栏系列&#xff1a;c &#x1f341;贤鱼的个人社区&#xff0c;欢迎…

基于matlab的瑞利衰落信道建模和仿真

目录 1.算法概述 2.仿真效果预览 3.核心MATLAB预览 4.完整MATLAB程序 1.算法概述 无线信道的小尺度衰弱特征可以分为三大类&#xff1a; 一类是由于多径传播导致短时间内幅度衰落&#xff1b; 一类是由于多径的时延扩展引起时间色散导致的信道衰弱&#xff1b; 一类是由于…

点成分享 | 微流控技术集成系统的应用

一、背景 微流控技术是指把化学和生物等领域中涉及的样品制备、反应、分离、检测、细胞培养、分选、裂解等基本操作单元集成到一块几平方厘米甚至更小的芯片上&#xff0c;由微通道形成网络&#xff0c;以可控流体贯穿整个系统&#xff0c;用以实现常规化学、生物、材料、光学…

SAP UI5 SmartTable 控件的使用介绍试读版

本文来自笔者 SAP 开发技术交流知识星球内一位朋友的提问&#xff1a; smartfilter bar 有个输入框Cost Element绑定了cds实现value help 请问其对应的suggestion功能是通过cds的注解实现的嘛&#xff1f; 要回答这个问题&#xff0c;我们必须首先掌握 SAP UI5 SmartTable 控件…

软考应该如何备考?

首先应该选择软考的科目&#xff0c;软考科目众多&#xff0c;计算机软件资格考试设置了27个专业资格&#xff0c;涵盖5个专业领域&#xff0c; 3个级别层次&#xff08;初级、中级、高级&#xff09;。 首先软考报名是不限学历&#xff0c;经验&#xff0c;资历的&#xff0c;…

cpu设计和实现(iverilog工具)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 编写verilog的工具不少。大家熟知的modelsim、quartus和vivado都可以用来编写的。前者主要是用来仿真&#xff0c;quartus主要用于altera芯片&…

伪元素和伪类的概念和区别

目录 伪类 伪元素 区别&#xff1a; 对于区别的解释&#xff1a; 伪类 这是菜鸟教程下方的一个笔记&#xff0c;看着多但是干货满满。 伪类选择元素基于的是当前元素处于的状态&#xff0c;或者说元素当前所具有的特性&#xff0c;而不是元素的id、class、属性等静态的标志。…

小程序获取不到用户头像和昵称返回微信用户问题解决,即小程序授权获取用户头像规则调整的最新解决方案

最近好多同学在学习石头哥小程序课程的时候&#xff0c;遇到了下面这样的问题&#xff0c;在小程序授权获取用户头像和昵称时&#xff0c;获取到的是下面这样的。 到底是什么原因导致的呢&#xff0c;去小程序官方文档一看&#xff0c;又是官方改规则了。 点进去一看&#x…

超详细!linux系统nlg-eval安装指南

前言&#xff1a;最近在做文本生成的任务&#xff0c;需要用到bleu等评价指标&#xff0c;看到其他研究工作中都在用nlg-eval这个github库&#xff0c;就想把它拿过来用&#xff0c;然而安装过程并不是一帆风顺的&#xff0c;谨以此篇博客记录之&#xff0c;为后来者提供一些经…

【菜菜的sklearn课堂笔记】逻辑回归与评分卡-重要参数penalty C

视频作者&#xff1a;菜菜TsaiTsai 链接&#xff1a;【技术干货】菜菜的机器学习sklearn【全85集】Python进阶_哔哩哔哩_bilibili 正则化是用来防止模型过拟合的过程&#xff0c;常用的有L1正则化和L2正则化两种选项。这个增加的范式&#xff0c;被称为“正则项”&#xff0c;也…

11个开源测试自动化框架,如何选?

以下为作者观点&#xff1a; 如果你正在考虑建立你自己的测试自动化框架&#xff0c;请再想一想。在大多数情况下&#xff0c;你最好可以考虑一个或多个可用的开源选项。 这是因为&#xff0c;一般来说&#xff0c;框架是一套可以跨团队使用的最佳实践、假设、通用工具和库。…

操作符详解(C语言)

算术操作符(运算符)&#xff1a; - * / % 在算术运算符中&#xff0c; - * 跟我们数学中的运算是一样的 在这里主要说一下 / 跟 % 这两个操作符 1、/ &#xff08;除法&#xff09; 在除法运算中&#xff0c;若除号两边的操作数都为整型&#xff0c;则结果…

布隆过滤器(Bloom Filter)

[TOC](布隆过滤器(Bloom Filter)) &#xff08;1&#xff09;什么是布隆过滤器 &#xff08;1&#xff09;布隆过滤器的简单介绍 布隆过滤器&#xff08;Bloom Filter&#xff09;是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以…

Python中的print()

本专栏为学习B站杨淑娟老师视频所记&#xff0c;仅做个人笔记使用&#x1f60b;&#x1f60b;&#x1f60b; &#x1f449;杨淑娟老师视频课&#x1f448; Python 职位方向 一、chapter1 1.使用print函数进行简单的输出 a100 # 变量a,值为100 b50 # 变量b,值为50print(90) …

【Linux实验】软中断通信

实验目的&#xff1a; 1&#xff0e;了解什么是信号&#xff0c;熟练掌握signal&#xff08;&#xff09;&#xff0c;wait&#xff08;&#xff09;&#xff0c;exit&#xff08;&#xff09;&#xff0c;kill&#xff08;&#xff09;函数。 2&#xff0e;熟悉并掌握Linux系统…

Learning to Enhance Low-Light Image via Zero-Reference Deep Curve Estimation

学习目标&#xff1a; Learning to Enhance Low-Light Image via Zero-Reference Deep Curve Estimation&#xff08;零参考深度曲线估计&#xff09; 个人体会&#xff1a; 本文的特色就是使用了PA和CA,对不同通道和不同像素做不同处理,虽然本文的实现过程懂了,但是实现去雾…

缝纫机牙架的数控工艺工装设计与编程

目 录 绪 论 1 2.差动牙架的工艺设计 3 2.1 机械加工工艺规程概述 3 2.2.1 零件的技术条件 4 2.2.2 加工表面及其要求 5 2.2.3零件的材料 6 2.3毛坯的选择 6 2.3.1毛坯的种类 6 2.4 基准的选择 8 2.5 机械加工工艺路线的拟订 10 3 差动牙架的工装设计 17 3.1 夹具概述 17 3.2 …