通过对 Redis 源码中的 multi.c 文件进行分析,解释 Redis 事务(transaction)功能的实现原理。
Redis 的事务
在开始研究 multi.c 的源码之前,不妨先来回顾一下 Redis 的事务功能的用法。
Redis 的事务使用 MULTI 命令 和 EXEC 命令 包围,处在这两条命令之间的一条或多条命令,会以 FIFO 的方式运行:
redis> MULTI # 标记事务开始 OK redis> INCR user_id QUEUED redis> SET greeting “hello moto” QUEUED redis> GET replay QUEUED redis> EXEC # 标记事务结束,并执行事务 1) (integer) 1 2) OK 3) “hello world” |
需要注意的是,Redis 的事务和关系数据库的事务并不一样:Redis 的事务并不保证 ACID 性质。
也就是说,在 Redis 事务的执行过程中,因为服务器失败而造成数据不一致的情况是可能存在的,在后面对代码进行分析的时候,就会清晰地看到这一点。
MULTI 命令
每个 Redis 事务都以 MULTI 命令开始,而 MULTI 命令本身的实现则非常简单:
void multiCommand(redisClient *c) { // MULTI 不可以嵌套使用 if (c->flags & REDIS_MULTI) { addReplyError(c,”MULTI calls can not be nested”); return; } c->flags |= REDIS_MULTI; // 打开 FLAG addReply(c,shared.ok); } |
multiCommand 的主要动作就是对 redisClient 结构的 flags 进行检查和设置。
它首先检查 flags ,确保没有嵌套使用 MULTI 命令。
如果检查通过,那么就使用位或操作,将 REDIS_MULTI 这个 FLAG 打开。
最后向客户端返回 OK 。
命令的入队
当 REDIS_MULTI 这个 FLAG 被打开之后, 传入 Redis 客户端的命令就不会马上被执行(部分命令如EXEC 除外), 这些未被执行的命令会被 queueMultiCommand 以 FIFO 的方式放入一个数组里,储存起来。
redis.c 文件里的 processCommand 函数说明了这一点:
int processCommand(redisClient *c) { // 其他代码 … /* Exec the command */ // 如果 REDIS_MULTI 被打开 // 且要执行的命令不是 EXEC 、 DISCARD 、 MULTI 或 WATCH // 那么将这个命令入队 if (c->flags & REDIS_MULTI && c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) { queueMultiCommand(c); // 入队 addReply(c,shared.queued); // 返回已入队信息 } else { call(c,REDIS_CALL_FULL); } return REDIS_OK; } |
queueMultiCommand 函数将要执行的命令、命令的参数个数以及命令的参数放进 multiCmd 结构中,并将这个结构保存到 redisClient.mstate.command 数组的末尾,从而形成一个保存了要执行的命令的 FIFO 队列:
/* Add a new command into the MULTI commands queue */ // 添加新命令到 MULTI 的执行队列中(FIFO) void queueMultiCommand(redisClient *c) { multiCmd *mc; int j; // 为新命令分配储存结构,并放到数组的末尾 c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.count+1)); // 设置新命令 mc = c->mstate.commands+c->mstate.count; // 指向储存新命令的结构体 mc->cmd = c->cmd; // 设置命令 mc->argc = c->argc; // 设置参数数量 mc->argv = zmalloc(sizeof(robj*)*c->argc); // 生成参数空间 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc); // 设置参数 for (j = 0; j < c->argc; j++) incrRefCount(mc->argv[j]); // 更新命令数量的计数器 c->mstate.count++; } |
以下是 queueMultiCommand 函数用到的几个结构,放在 redis.h 文件中,定义都非常直观:
typedef struct redisClient { // 其他属性 … redisDb *db; // 当前 DB int flags; // 标记事务状态,以及 WATCH 状态 multiState mstate; // 事务中的所有命令 list *watched_keys; // 这个客户端 WATCH 的所有 KEY // 其他属性 … } redisClient; typedef struct multiState { multiCmd *commands; // 保存事务中所有命令的数组 int count; // 命令的数量 } multiState; typedef struct multiCmd { robj **argv; // 命令参数 int argc; // 命令参数数量 struct redisCommand *cmd; // 命令 } multiCmd; |
回到文章开头的例子,在执行以下几个命令之后:
redis> MULTI # 标记事务开始 OK redis> INCR user_id QUEUED redis> SET greeting “hello moto” QUEUED redis> GET replay QUEUED |
redisClient.mstate 的值应该类似这个样子(用 JSON 结构来表示):
redisClient.mstate = { ’count’: 3, ’commands’: [ { ’argv’: [‘user_id’], ’argc’: 1, ’cmd’: ‘incrCommand’, }, { ’argv’: [‘greeting’, ‘hello moto’], ’argc’: 2, ’cmd’: ‘setCommand’, }, { ’argv’: [‘replay’], ’argc’: 1, ’cmd’: ‘getCommand’, } ] } |
执行事务
既然事务里已经有了等待执行的命令,那么此时不运行事务,更待何时?!
事务的执行由 execCommand 函数进行,它的定义如下:
void execCommand(redisClient *c) { int j; robj **orig_argv; int orig_argc; struct redisCommand *orig_cmd; // 如果没执行过 MULTI ,报错 if (!(c->flags & REDIS_MULTI)) { addReplyError(c,”EXEC without MULTI”); return; } /* Check if we need to abort the EXEC if some WATCHed key was touched. * A failed EXEC will return a multi bulk nil object. */ // 如果在执行事务之前,有监视中(WATCHED)的 key 被改变 // 那么取消这个事务 if (c->flags & REDIS_DIRTY_CAS) { freeClientMultiState(c); initClientMultiState(c); c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); unwatchAllKeys(c); addReply(c,shared.nullmultibulk); return; } /* Replicate a MULTI request now that we are sure the block is executed. * This way we’ll deliver the MULTI/…./EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ // 为保证一致性和原子性 // 如果处在 AOF 模式中,向 AOF 文件发送 MULTI // 如果处在复制模式中,向附属节点发送 MULTI execCommandReplicateMulti(c); /* Exec all the queued commands */ // 开始执行所有事务中的命令(FIFO 方式) unwatchAllKeys(c); /* Unwatch ASAP otherwise we’ll waste CPU cycles */ // 备份所有参数和命令 orig_argv = c->argv; orig_argc = c->argc; orig_cmd = c->cmd; addReplyMultiBulkLen(c,c->mstate.count); for (j = 0; j < c->mstate.count; j++) { c->argc = c->mstate.commands[j].argc; // 取出参数数量 c->argv = c->mstate.commands[j].argv; // 取出参数 c->cmd = c->mstate.commands[j].cmd; // 取出要执行的命令 call(c,REDIS_CALL_FULL); // 执行命令 /* Commands may alter argc/argv, restore mstate. */ c->mstate.commands[j].argc = c->argc; c->mstate.commands[j].argv = c->argv; c->mstate.commands[j].cmd = c->cmd; } // 恢复所有参数和命令 c->argv = orig_argv; c->argc = orig_argc; c->cmd = orig_cmd; // 重置事务状态 freeClientMultiState(c); initClientMultiState(c); c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); /* Make sure the EXEC command is always replicated / AOF, since we * always send the MULTI command (we can’t know beforehand if the * next operations will contain at least a modification to the DB). */ // 更新状态值,确保事务执行之后的状态为脏 server.dirty++; } |
execCommand 函数的所有行为代码中都有注释,这里就不再赘述了。
需要提醒注意的是,在关键部分的 for 循环代码里,我们可以看见, execCommand 的主要作用只是一个个地执行储存在 redisClient.mstate 数组中的命令, 命令在执行之前并没有使用日志之类的保护机制, 这是为什么 Redis 的事务并不支持 ACID 这些性质的(其中一个)原因。
另外要注意的是,在 execCommand 的前半部分,调用了 execCommandReplicateMulti 函数, 如果有需要的话, Redis 就会向 AOF 文件和其他复制实例(replication)发送 MULTI 命令, 告诉它们:『哥要开始执行事务了,各单位请注意!』。
这样的话,如果事务在执行过程中失败,那么 AOF 文件和复制实例都会察觉到, 这时 Redis 实例会报错并退出,然后等待管理员使用 redis-check-aof 命令来进行数据修复, 具体请参考: Redis 官方网站上的 Transaction 介绍 。
取消事务
在一些情况下,我们也想在中途取消事务的执行。
DISCARD 命令就是用来中途取消事务的, 它的实现由 discardTransaction 和 discardCommand两个函数实现:
// 放弃执行事务 void discardTransaction(redisClient *c) { freeClientMultiState(c); // 释放事务资源 initClientMultiState(c); // 重置事务状态 c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);; // 关闭 FLAG unwatchAllKeys(c); // 取消对所有 key 的 WATCH } // 放弃执行事务(命令) void discardCommand(redisClient *c) { // 如果没有调用过 MULTI ,报错 if (!(c->flags & REDIS_MULTI)) { addReplyError(c,”DISCARD without MULTI”); return; } discardTransaction(c); addReply(c,shared.ok); } |
其中 freeClientMultiState 和 initClientMultiState 两个函数用于重置redisClient.mstate 数组,从而达到删除所有入队命令的作用:
/* Client state initialization for MULTI/EXEC */ // 初始化客户端状态,为执行事务作准备 void initClientMultiState(redisClient *c) { c->mstate.commands = NULL; // 清空命令数组 c->mstate.count = 0; // 清空命令计数器 } /* Release all the resources associated with MULTI/EXEC state */ // 释放所有事务资源 void freeClientMultiState(redisClient *c) { int j; // 释放所有命令 for (j = 0; j < c->mstate.count; j++) { int i; multiCmd *mc = c->mstate.commands+j; // 将指针指向目标命令 // 释放所有命令的参数,以及保存参数的数组 for (i = 0; i < mc->argc; i++) decrRefCount(mc->argv[i]); zfree(mc->argv); } // 释放保存命令的数组 zfree(c->mstate.commands); } |
小结
对 Redis 事务的实现分析就到此结束了,希望这篇文章对你理解 Redis 的事务有所帮助。
跟之前一样,我将带有完整注释的代码放到了 GITHUB 上,有兴趣了解全部细节的朋友可以参考源码: https://github.com/huangz1990/reading_redis_source 。
最后,和 Redis 的事务有关的命令还有 WATCH 和 UNWATCH ,在下篇文章中,会继续探讨它们的实现方式。
我们一直都在努力坚持原创.......请不要一声不吭,就悄悄拿走。
我原创,你原创,我们的内容世界才会更加精彩!
【所有原创内容版权均属TechTarget,欢迎大家转发分享。但未经授权,严禁任何媒体(平面媒体、网络媒体、自媒体等)以及微信公众号复制、转载、摘编或以其他方式进行使用。】
微信公众号
TechTarget
官方微博
TechTarget中国
作者
相关推荐
-
OpenWorld18大会:Ellison宣布数据库的搜寻和破坏任务
在旧金山举行的甲骨文OpenWorld 2018大会中,甲骨文首席技术官(CTO)兼创始人Larry Elli […]
-
ObjectRocket着力发展Azure MongoDB服务
MongoDB吸引了微软公司的注意力,微软公司计划针对运行于该公司2017年发布的Azure Cosmos D […]
-
2017年1月数据库流行度排行榜 新年新气象
新年新气象,数据库知识网站DB-engines最近更新了2017年1月份数据库流行度榜单。TechTarget数据库网站将与您分享1月份的榜单排名情况,让我们拭目以待。
-
创建NoSQL数据建模符号 企业架构师亲自上阵
新兴的NoSQL数据风格促使创新的应用程序快速发展,但NoSQL同时也带来了挑战。NoSQL系统能够快速投入生产,有时甚至根本不用创建任何的前期模式。