真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何深入理解Redis事務(wù)

本篇內(nèi)容主要講解“如何深入理解redis事務(wù)”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“如何深入理解Redis事務(wù)”吧!

成都創(chuàng)新互聯(lián)是一家專注于網(wǎng)站設(shè)計制作、網(wǎng)站設(shè)計與策劃設(shè)計,棗強網(wǎng)站建設(shè)哪家好?成都創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設(shè)十余年,網(wǎng)設(shè)計領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:棗強等地區(qū)。棗強做網(wǎng)站價格咨詢:18982081108

定制制作可以根據(jù)自己的需求進行定制,成都做網(wǎng)站、成都網(wǎng)站建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)構(gòu)思過程中功能建設(shè)理應(yīng)排到主要部位公司成都做網(wǎng)站、成都網(wǎng)站建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)的運用實際效果公司網(wǎng)站制作網(wǎng)站建立與制做的實際意義

如何深入理解Redis事務(wù)

Redis可以看成NOSQL類型的數(shù)據(jù)庫系統(tǒng), Redis也提供了事務(wù), 但是和傳統(tǒng)的關(guān)系型數(shù)據(jù)庫的事務(wù)既有相似性, 也存在區(qū)別.因為Redis的架構(gòu)基于操作系統(tǒng)的多路復(fù)用的IO接口,主處理流程是一個單線程,因此對于一個完整的命令, 其處理都是原子性的, 但是如果需要將多個命令作為一個不可分割的處理序列, 就需要使用事務(wù).

Redis事務(wù)有如下一些特點:

  •  事務(wù)中的命令序列執(zhí)行的時候是原子性的,也就是說,其不會被其他客戶端的命令中斷. 這和傳統(tǒng)的數(shù)據(jù)庫的事務(wù)的屬性是類似的.

  •  盡管Redis事務(wù)中的命令序列是原子執(zhí)行的, 但是事務(wù)中的命令序列執(zhí)行可以部分成功,這種情況下,Redis事務(wù)不會執(zhí)行回滾操作. 這和傳統(tǒng)關(guān)系型數(shù)據(jù)庫的事務(wù)是有區(qū)別的.

  •  盡管Redis有RDB和AOF兩種數(shù)據(jù)持久化機制, 但是其設(shè)計目標(biāo)是高效率的cache系統(tǒng). Redis事務(wù)只保證將其命令序列中的操作結(jié)果提交到內(nèi)存中,不保證持久化到磁盤文件. 更進一步的, Redis事務(wù)和RDB持久化機制沒有任何關(guān)系, 因為RDB機制是對內(nèi)存數(shù)據(jù)結(jié)構(gòu)的全量的快照.由于AOF機制是一種增量持久化,所以事務(wù)中的命令序列會提交到AOF的緩存中.但是AOF機制將其緩存寫入磁盤文件是由其配置的實現(xiàn)策略決定的,和Redis事務(wù)沒有關(guān)系.

Redis事務(wù)API

從宏觀上來講, Redis事務(wù)開始后, 會緩存后續(xù)的操作命令及其操作數(shù)據(jù),當(dāng)事務(wù)提交時,原子性的執(zhí)行緩存的命令序列.

從版本2.2開始,Redis提供了一種樂觀的鎖機制, 配合這種機制,Redis事務(wù)提交時, 變成了事務(wù)的條件執(zhí)行. 具體的說,如果樂觀鎖失敗了,事務(wù)提交時, 丟棄事務(wù)中的命令序列,如果樂觀鎖成功了, 事務(wù)提交時,才會執(zhí)行其命令序列.當(dāng)然,也可以不使用樂觀鎖機制, 在事務(wù)提交時, 無條件執(zhí)行事務(wù)的命令序列.

Redis事務(wù)涉及到MULTI, EXEC, DISCARD, WATCH和UNWATCH這五個命令:

  •  事務(wù)開始的命令是MULTI, 該命令返回OK提示信息. Redis不支持事務(wù)嵌套,執(zhí)行多次MULTI命令和執(zhí)行一次是相同的效果.嵌套執(zhí)行MULTI命令時,Redis只是返回錯誤提示信息.

  •  EXEC是事務(wù)的提交命令,事務(wù)中的命令序列將被執(zhí)行(或者不被執(zhí)行,比如樂觀鎖失敗等).該命令將返回響應(yīng)數(shù)組,其內(nèi)容對應(yīng)事務(wù)中的命令執(zhí)行結(jié)果.

  •  WATCH命令是開始執(zhí)行樂觀鎖,該命令的參數(shù)是key(可以有多個), Redis將執(zhí)行WATCH命令的客戶端對象和key進行關(guān)聯(lián),如果其他客戶端修改了這些key,則執(zhí)行WATCH命令的客戶端將被設(shè)置樂觀鎖失敗的標(biāo)志.該命令必須在事務(wù)開始前執(zhí)行,即在執(zhí)行MULTI命令前執(zhí)行WATCH命令,否則執(zhí)行無效,并返回錯誤提示信息.

  •  UNWATCH命令將取消當(dāng)前客戶端對象的樂觀鎖key,該客戶端對象的事務(wù)提交將變成無條件執(zhí)行.

  •  DISCARD命令將結(jié)束事務(wù),并且會丟棄全部的命令序列.

需要注意的是,EXEC命令和DISCARD命令結(jié)束事務(wù)時,會調(diào)用UNWATCH命令,取消該客戶端對象上所有的樂觀鎖key.

無條件提交

如果不使用樂觀鎖, 則事務(wù)為無條件提交.下面是一個事務(wù)執(zhí)行的例子:

multi  +OK  incr key1  +QUEUED  set key2 val2  +QUEUED  exec  *2  :1  +OK

當(dāng)客戶端開始事務(wù)后, 后續(xù)發(fā)送的命令將被Redis緩存起來,Redis向客戶端返回響應(yīng)提示字符串QUEUED.當(dāng)執(zhí)行EXEC提交事務(wù)時,緩存的命令依次被執(zhí)行,返回命令序列的執(zhí)行結(jié)果.

事務(wù)的錯誤處理

事務(wù)提交命令EXEC有可能會失敗, 有三種類型的失敗場景:

  •  在事務(wù)提交之前,客戶端執(zhí)行的命令緩存失敗.比如命令的語法錯誤(命令參數(shù)個數(shù)錯誤, 不支持的命令等等).如果發(fā)生這種類型的錯誤,Redis將向客戶端返回包含錯誤提示信息的響應(yīng).

  •  事務(wù)提交時,之前緩存的命令有可能執(zhí)行失敗.

  •  由于樂觀鎖失敗,事務(wù)提交時,將丟棄之前緩存的所有命令序列.

當(dāng)發(fā)生第一種失敗的情況下,客戶端在執(zhí)行事務(wù)提交命令EXEC時,將丟棄事務(wù)中所有的命令序列.下面是一個例子:

multi  +OK  incr num1 num2  -ERR wrong number of arguments for 'incr' command  set key1 val1  +QUEUED  exec  -EXECABORT Transaction discarded because of previous errors.

命令incr num1 num2并沒有緩存成功, 因為incr命令只允許有一個參數(shù),是個語法錯誤的命令.Redis無法成功緩存該命令,向客戶端發(fā)送錯誤提示響應(yīng).接下來的set key1 val1命令緩存成功.最后執(zhí)行事務(wù)提交的時候,因為發(fā)生過命令緩存失敗,所以事務(wù)中的所有命令序列被丟棄.

如果事務(wù)中的所有命令序列都緩存成功,在提交事務(wù)的時候,緩存的命令中仍可能執(zhí)行失敗.但Redis不會對事務(wù)做任何回滾補救操作.下面是一個這樣的例子:

multi  +OK  set key1 val1  +QUEUED  lpop key1  +QUEUED  incr num1  +QUEUED  exec  *3  +OK  -WRONGTYPE Operation against a key holding the wrong kind of value  :1

所有的命令序列都緩存成功,但是在提交事務(wù)的時候,命令set key1 val1和incr num1執(zhí)行成功了,Redis保存了其執(zhí)行結(jié)果,但是命令lpop key1執(zhí)行失敗了.

樂觀鎖機制

Redis事務(wù)和樂觀鎖一起使用時,事務(wù)將成為有條件提交.

關(guān)于樂觀鎖,需要注意的是:

  •  WATCH命令必須在MULTI命令之前執(zhí)行. WATCH命令可以執(zhí)行多次.

  •  WATCH命令可以指定樂觀鎖的多個key,如果在事務(wù)過程中,任何一個key被其他客戶端改變,則當(dāng)前客戶端的樂觀鎖失敗,事務(wù)提交時,將丟棄所有命令序列.

  •  多個客戶端的WATCH命令可以指定相同的key.

WATCH命令指定樂觀鎖后,可以接著執(zhí)行MULTI命令進入事務(wù)上下文,也可以在WATCH命令和MULTI命令之間執(zhí)行其他命令. 具體使用方式取決于場景需求,不在事務(wù)中的命令將立即被執(zhí)行.

如果WATCH命令指定的樂觀鎖的key,被當(dāng)前客戶端改變,在事務(wù)提交時,樂觀鎖不會失敗.

如果WATCH命令指定的樂觀鎖的key具有超時屬性,并且該key在WATCH命令執(zhí)行后, 在事務(wù)提交命令EXEC執(zhí)行前超時, 則樂觀鎖不會失敗.如果該key被其他客戶端對象修改,則樂觀鎖失敗.

一個執(zhí)行樂觀鎖機制的事務(wù)例子:

rpush list v1 v2 v3  :3  watch list  +OK  multi  +OK lpop list  +QUEUED  exec  *1  $2  v1

下面是另一個例子,樂觀鎖被當(dāng)前客戶端改變, 事務(wù)提交成功:

watch num  +OK  multi  +OK  incr num  +QUEUED  exec  *1  :2

Redis事務(wù)和樂觀鎖配合使用時, 可以構(gòu)造實現(xiàn)單個Redis命令不能完成的更復(fù)雜的邏輯.

Redis事務(wù)的源碼實現(xiàn)機制

首先,事務(wù)開始的MULTI命令執(zhí)行的函數(shù)為multiCommand, 其實現(xiàn)為(multi.c):

void multiCommand(redisClient *c) {      if (c->flags & REDIS_MULTI) {          addReplyError(c,"MULTI calls can not be nested");          return;      }      c->flags |= REDIS_MULTI;      addReply(c,shared.ok);  }

該命令只是在當(dāng)前客戶端對象上加上REDIS_MULTI標(biāo)志, 表示該客戶端進入了事務(wù)上下文.

客戶端進入事務(wù)上下文后,后續(xù)執(zhí)行的命令將被緩存. 函數(shù)processCommand是Redis處理客戶端命令的入口函數(shù), 其實現(xiàn)為(redis.c):

int processCommand(redisClient *c) {      /* The QUIT command is handled separately. Normal command procs will       * go through checking for replication and QUIT will cause trouble       * when FORCE_REPLICATION is enabled and would be implemented in       * a regular command proc. */      if (!strcasecmp(c->argv[0]->ptr,"quit")) {          addReply(c,shared.ok);          c->flags |= REDIS_CLOSE_AFTER_REPLY;          return REDIS_ERR;     }      /* Now lookup the command and check ASAP about trivial error conditions       * such as wrong arity, bad command name and so forth. */      c->ccmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);      if (!c->cmd) {          flagTransaction(c);          addReplyErrorFormat(c,"unknown command '%s'",              (char*)c->argv[0]->ptr);          return REDIS_OK;      } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||                 (c->argc < -c->cmd->arity)) {          flagTransaction(c);          addReplyErrorFormat(c,"wrong number of arguments for '%s' command",              c->cmd->name);          return REDIS_OK;     }      /* Check if the user is authenticated */      if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)      {          flagTransaction(c);          addReply(c,shared.noautherr);          return REDIS_OK;      }      /* Handle the maxmemory directive.       *       * First we try to free some memory if possible (if there are volatile       * keys in the dataset). If there are not the only thing we can do       * is returning an error. */      if (server.maxmemory) {          int retval = freeMemoryIfNeeded();          /* freeMemoryIfNeeded may flush slave output buffers. This may result           * into a slave, that may be the active client, to be freed. */          if (server.current_client == NULL) return REDIS_ERR;          /* It was impossible to free enough memory, and the command the client           * is trying to execute is denied during OOM conditions? Error. */          if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {              flagTransaction(c);              addReply(c, shared.oomerr);              return REDIS_OK;          }      }      /* Don't accept write commands if there are problems persisting on disk       * and if this is a master instance. */      if (((server.stop_writes_on_bgsave_err &&            server.saveparamslen > 0 &&            server.lastbgsave_status == REDIS_ERR) ||            server.aof_last_write_status == REDIS_ERR) &&          server.masterhost == NULL &&          (c->cmd->flags & REDIS_CMD_WRITE ||           c->cmd->proc == pingCommand))      {          flagTransaction(c);          if (server.aof_last_write_status == REDIS_OK)              addReply(c, shared.bgsaveerr);          else              addReplySds(c,                  sdscatprintf(sdsempty(),                  "-MISCONF Errors writing to the AOF file: %s\r\n",                  strerror(server.aof_last_write_errno)));          return REDIS_OK;      }      /* Don't accept write commands if there are not enough good slaves and       * user configured the min-slaves-to-write option. */      if (server.masterhost == NULL &&          server.repl_min_slaves_to_write &&          server.repl_min_slaves_max_lag &&          c->cmd->flags & REDIS_CMD_WRITE &&          server.repl_good_slaves_count < server.repl_min_slaves_to_write)      {          flagTransaction(c);          addReply(c, shared.noreplicaserr);          return REDIS_OK;      }      /* Don't accept write commands if this is a read only slave. But       * accept write commands if this is our master. */      if (server.masterhost && server.repl_slave_ro &&          !(c->flags & REDIS_MASTER) &&          c->cmd->flags & REDIS_CMD_WRITE)      {          addReply(c, shared.roslaveerr);          return REDIS_OK;      }      /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */      if (c->flags & REDIS_PUBSUB &&          c->cmd->proc != pingCommand &&          c->cmd->proc != subscribeCommand &&          c->cmd->proc != unsubscribeCommand &&          c->cmd->proc != psubscribeCommand &&          c->cmd->proc != punsubscribeCommand) {          addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");          return REDIS_OK;      }      /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and       * we are a slave with a broken link with master. */      if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&          server.repl_serve_stale_data == 0 &&          !(c->cmd->flags & REDIS_CMD_STALE))      {          flagTransaction(c);          addReply(c, shared.masterdownerr);          return REDIS_OK;      }      /* Loading DB? Return an error if the command has not the       * REDIS_CMD_LOADING flag. */      if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {          addReply(c, shared.loadingerr);          return REDIS_OK;      }      /* Lua script too slow? Only allow a limited number of commands. */      if (server.lua_timedout &&            c->cmd->proc != authCommand &&            c->cmd->proc != replconfCommand &&          !(c->cmd->proc == shutdownCommand &&            c->argc == 2 &&            tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&          !(c->cmd->proc == scriptCommand &&            c->argc == 2 &&            tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))      {          flagTransaction(c);          addReply(c, shared.slowscripterr);          return REDIS_OK;      }      /* Exec the command */      if (c->flags & REDIS_MULTI &&          c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&          c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)      {          queueMultiCommand(c);          addReply(c,shared.queued);      } else {          call(c,REDIS_CALL_FULL);          if (listLength(server.ready_keys))              handleClientsBlockedOnLists();      }      return REDIS_OK;  }

Line145:151當(dāng)客戶端處于事務(wù)上下文時, 如果接收的是非事務(wù)命令(MULTI, EXEC, WATCH, DISCARD), 則調(diào)用queueMultiCommand將命令緩存起來,然后向客戶端發(fā)送成功響應(yīng).

在函數(shù)processCommand中, 在緩存命令之前, 如果檢查到客戶端發(fā)送的命令不存在,或者命令參數(shù)個數(shù)不正確等情況, 會調(diào)用函數(shù)flagTransaction標(biāo)命令緩存失敗.也就是說,函數(shù)processCommand中, 所有調(diào)用函數(shù)flagTransaction的條件分支,都是返回失敗響應(yīng).

緩存命令的函數(shù)queueMultiCommand的實現(xiàn)為(multi.c):

/* Add a new command into the MULTI commands queue */  void queueMultiCommand(redisClient *c) {      multiCmd *mc;      int j;      c->mstate.commands = zrealloc(c->mstate.commands,              sizeof(multiCmd)*(c->mstate.count+1));      mc = c->mstate.commands+c->mstate.count;      mc->ccmd = c->cmd;      mc->argc = c->argc;      mc->argv = zmalloc(sizeof(robj*)*c->argc);      memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);      for (j = 0; j < c->argc; j++)          incrRefCount(mc->argv[j]);      c->mstate.count++;  }

在事務(wù)上下文中, 使用multiCmd結(jié)構(gòu)來緩存命令, 該結(jié)構(gòu)定義為(redis.h):

/* Client MULTI/EXEC state */  typedef struct multiCmd {      robj **argv;      int argc;      struct redisCommand *cmd;  } multiCmd;

其中argv字段指向命令的參數(shù)內(nèi)存地址,argc為命令參數(shù)個數(shù), cmd為命令描述結(jié)構(gòu), 包括名字和函數(shù)指針等.

命令參數(shù)的內(nèi)存空間已經(jīng)使用動態(tài)分配記錄于客戶端對象的argv字段了, multiCmd結(jié)構(gòu)的argv字段指向客戶端對象redisClient的argv即可.

無法緩存命令時, 調(diào)用函數(shù)flagTransaction,該函數(shù)的實現(xiàn)為(multi.c):

/* Flag the transacation as DIRTY_EXEC so that EXEC will fail.   * Should be called every time there is an error while queueing a command. */  void flagTransaction(redisClient *c) {      if (c->flags & REDIS_MULTI)          c->flags |= REDIS_DIRTY_EXEC;  }

該函數(shù)在客戶端對象中設(shè)置REDIS_DIRTY_EXEC標(biāo)志, 如果設(shè)置了這個標(biāo)志, 事務(wù)提交時, 命令序列將被丟棄.

最后,在事務(wù)提交時, 函數(shù)processCommand中將調(diào)用call(c,REDIS_CALL_FULL);, 其實現(xiàn)為(redis.c):

/* Call() is the core of Redis execution of a command */  void call(redisClient *c, int flags) {      long long dirty, start, duration;      int cclient_old_flags = c->flags;      /* Sent the command to clients in MONITOR mode, only if the commands are       * not generated from reading an AOF. */      if (listLength(server.monitors) &&          !server.loading &&          !(c->cmd->flags & (REDIS_CMD_SKIP_MONITOR|REDIS_CMD_ADMIN)))      {          replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);     }      /* Call the command. */      c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);      redisOpArrayInit(&server.also_propagate);      dirty = server.dirty;      start = ustime();      c->cmd->proc(c);      duration = ustime()-start;      dirty = server.dirty-dirty;      if (dirty < 0) dirty = 0;      /* When EVAL is called loading the AOF we don't want commands called       * from Lua to go into the slowlog or to populate statistics. */      if (server.loading && c->flags & REDIS_LUA_CLIENT)          flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);      /* If the caller is Lua, we want to force the EVAL caller to propagate       * the script if the command flag or client flag are forcing the       * propagation. */      if (c->flags & REDIS_LUA_CLIENT && server.lua_caller) {          if (c->flags & REDIS_FORCE_REPL)              server.lua_caller->flags |= REDIS_FORCE_REPL;          if (c->flags & REDIS_FORCE_AOF)              server.lua_caller->flags |= REDIS_FORCE_AOF;      }     /* Log the command into the Slow log if needed, and populate the       * per-command statistics that we show in INFO commandstats. */      if (flags & REDIS_CALL_SLOWLOG && c->cmd->proc != execCommand) {          char *latency_event = (c->cmd->flags & REDIS_CMD_FAST) ?                                "fast-command" : "command";          latencyAddSampleIfNeeded(latency_event,duration/1000);          slowlogPushEntryIfNeeded(c->argv,c->argc,duration);      }      if (flags & REDIS_CALL_STATS) {          c->cmd->microseconds += duration;          c->cmd->calls++;      }      /* Propagate the command into the AOF and replication link */      if (flags & REDIS_CALL_PROPAGATE) {          int flags = REDIS_PROPAGATE_NONE;          if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;          if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;          if (dirty)              flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);          if (flags != REDIS_PROPAGATE_NONE)              propagate(c->cmd,c->db->id,c->argv,c->argc,flags);      }      /* Restore the old FORCE_AOF/REPL flags, since call can be executed       * recursively. */      c->flags &= ~(REDIS_FORCE_AOF|REDIS_FORCE_REPL);      c->flags |= client_old_flags & (REDIS_FORCE_AOF|REDIS_FORCE_REPL);      /* Handle the alsoPropagate() API to handle commands that want to propagate       * multiple separated commands. */      if (server.also_propagate.numops) {          int j;          redisOp *rop;          for (j = 0; j < server.also_propagate.numops; j++) {              rop = &server.also_propagate.ops[j];              propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);          }          redisOpArrayFree(&server.also_propagate);      }      server.stat_numcommands++;  }

在函數(shù)call中通過執(zhí)行c->cmd->proc(c);調(diào)用具體的命令函數(shù).事務(wù)提交命令EXEC對應(yīng)的執(zhí)行函數(shù)為execCommand, 其實現(xiàn)為(multi.c):

void execCommand(redisClient *c) {      int j;      robj **orig_argv;      int orig_argc;      struct redisCommand *orig_cmd;      int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */      if (!(c->flags & REDIS_MULTI)) {          addReplyError(c,"EXEC without MULTI");          return;      }      /* Check if we need to abort the EXEC because:       * 1) Some WATCHed key was touched.       * 2) There was a previous error while queueing commands.       * A failed EXEC in the first case returns a multi bulk nil object       * (technically it is not an error but a special behavior), while       * in the second an EXECABORT error is returned. */      if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {          addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :                                                    shared.nullmultibulk);          discardTransaction(c);          goto handle_monitor;      }      /* Exec all the queued commands */      unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */      orig_argv = c->argv;      orig_argc = c->argc;      orig_cmd = c->cmd;      addReplyMultiBulkLen(c,c->mstate.count);      for (j = 0; j < c->mstate.count; j++) {          c->argc = c->mstate.commands[j].argc;          c->argv = c->mstate.commands[j].argv;          c->ccmd = c->mstate.commands[j].cmd;          /* Propagate a MULTI request once we encounter the first write op.           * This way we'll deliver the MULTI/..../EXEC block as a whole and           * both the AOF and the replication link will have the same consistency           * and atomicity guarantees. */          if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {              execCommandPropagateMulti(c);              must_propagate = 1;          }          call(c,REDIS_CALL_FULL);          /* Commands may alter argc/argv, restore mstate. */          c->mstate.commands[j].argc = c->argc;          c->mstate.commands[j].argv = c->argv;          c->mstate.commands[j].cmd = c->cmd;      }      c->argv = orig_argv;      c->argc = orig_argc;      c->cmd = orig_cmd;      discardTransaction(c);      /* Make sure the EXEC command will be propagated as well if MULTI       * was already propagated. */      if (must_propagate) server.dirty++; handle_monitor:      /* Send EXEC to clients waiting data from MONITOR. We do it here       * since the natural order of commands execution is actually:       * MUTLI, EXEC, ... commands inside transaction ...       * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command       * table, and we do it here with correct ordering. */      if (listLength(server.monitors) && !server.loading)          replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);  }

LINE8:11檢查EXEC命令和MULTI命令是否配對使用, 單獨執(zhí)行EXEC命令是沒有意義的.

LINE19:24檢查客戶端對象是否具有REDIS_DIRTY_CAS或者REDIS_DIRTY_EXEC標(biāo)志, 如果存在,則調(diào)用函數(shù)discardTransaction丟棄命令序列, 向客戶端返回失敗響應(yīng).

如果沒有檢查到任何錯誤,則先執(zhí)行unwatchAllKeys(c);取消該客戶端上所有的樂觀鎖key.

LINE32:52依次執(zhí)行緩存的命令序列,這里有兩點需要注意的是:

事務(wù)可能需要同步到AOF緩存或者replica備份節(jié)點中.如果事務(wù)中的命令序列都是讀操作, 則沒有必要向AOF和replica進行同步.如果事務(wù)的命令序列中包含寫命令,則MULTI, EXEC和相關(guān)的寫命令會向AOF和replica進行同步.根據(jù)LINE41:44的條件判斷,執(zhí)行execCommandPropagateMulti(c);保證MULTI命令同步, LINE59檢查EXEC命令是否需要同步, 即MULTI命令和EXEC命令必須保證配對同步.EXEC命令的同步執(zhí)行在函數(shù)的call中LINE62propagate(c->cmd,c->db->id,c->argv,c->argc,flags);, 具體的寫入命令由各自的執(zhí)行函數(shù)負(fù)責(zé)同步.

這里執(zhí)行命令序列時, 通過執(zhí)行call(c,REDIS_CALL_FULL);所以call函數(shù)是遞歸調(diào)用.

所以,綜上所述, Redis事務(wù)其本質(zhì)就是,以不可中斷的方式依次執(zhí)行緩存的命令序列,將結(jié)果保存到內(nèi)存cache中.

事務(wù)提交時, 丟棄命令序列會調(diào)用函數(shù)discardTransaction, 其實現(xiàn)為(multi.c):

void discardTransaction(redisClient *c) {      freeClientMultiState(c);      initClientMultiState(c);      c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);      unwatchAllKeys(c);  }

該函數(shù)調(diào)用freeClientMultiState釋放multiCmd對象內(nèi)存.調(diào)用initClientMultiState復(fù)位客戶端對象的緩存命令管理結(jié)構(gòu).調(diào)用unwatchAllKeys取消該客戶端的樂觀鎖.

WATCH命令執(zhí)行樂觀鎖, 其對應(yīng)的執(zhí)行函數(shù)為watchCommand, 其實現(xiàn)為(multi.c):

void watchCommand(redisClient *c) {      int j;      if (c->flags & REDIS_MULTI) {          addReplyError(c,"WATCH inside MULTI is not allowed");          return;      }      for (j = 1; j < c->argc; j++)          watchForKey(c,c->argv[j]);      addReply(c,shared.ok);  }

進而調(diào)用函數(shù)watchForKey, 其實現(xiàn)為(multi.c):

/* Watch for the specified key */  void watchForKey(redisClient *c, robj *key) {      list *clients = NULL;      listIter li;      listNode *ln;      watchedKey *wk;      /* Check if we are already watching for this key */      listRewind(c->watched_keys,&li);      while((ln = listNext(&li))) {          wk = listNodeValue(ln);          if (wk->db == c->db && equalStringObjects(key,wk->key))              return; /* Key already watched */      }      /* This key is not already watched in this DB. Let's add it */      clients = dictFetchValue(c->db->watched_keys,key);      if (!clients) {          clients = listCreate();          dictAdd(c->db->watched_keys,key,clients);          incrRefCount(key);      }      listAddNodeTail(clients,c);      /* Add the new key to the list of keys watched by this client */      wk = zmalloc(sizeof(*wk));      wk->keykey = key;      wk->db = c->db;      incrRefCount(key);      listAddNodeTail(c->watched_keys,wk);  }

關(guān)于樂觀鎖的key, 既保存于其客戶端對象的watched_keys鏈表中, 也保存于全局?jǐn)?shù)據(jù)庫對象的watched_keys哈希表中.

LINE10:14檢查客戶端對象的鏈表中是否已經(jīng)存在該key, 如果已經(jīng)存在, 則直接返回.LINE16在全局?jǐn)?shù)據(jù)庫中返回該key對應(yīng)的客戶端對象鏈表, 如果鏈表不存在, 說明其他客戶端沒有使用該key作為樂觀鎖, 如果鏈表存在, 說明其他客戶端已經(jīng)使用該key作為樂觀鎖. LINE22將當(dāng)前客戶端對象記錄于該key對應(yīng)的鏈表中. LINE28將該key記錄于當(dāng)前客戶端的key鏈表中.

當(dāng)前客戶端執(zhí)行樂觀鎖以后, 其他客戶端的寫入命令可能修改該key值.所有具有寫操作屬性的命令都會執(zhí)行函數(shù)signalModifiedKey, 其實現(xiàn)為(db.c):

void signalModifiedKey(redisDb *db, robj *key) {      touchWatchedKey(db,key);  }

函數(shù)touchWatchedKey的實現(xiàn)為(multi.c):

/* "Touch" a key, so that if this key is being WATCHed by some client the   * next EXEC will fail. */  void touchWatchedKey(redisDb *db, robj *key) {      list *clients;      listIter li;      listNode *ln;      if (dictSize(db->watched_keys) == 0) return;      clients = dictFetchValue(db->watched_keys, key);      if (!clients) return;     /* Mark all the clients watching this key as REDIS_DIRTY_CAS */      /* Check if we are already watching for this key */      listRewind(clients,&li);      while((ln = listNext(&li))) {          redisClient *c = listNodeValue(ln);          c->flags |= REDIS_DIRTY_CAS;      }  }

語句if (dictSize(db->watched_keys) == 0) return;檢查全局?jǐn)?shù)據(jù)庫中的哈希表watched_keys是否為空, 如果為空,說明沒有任何客戶端執(zhí)行WATCH命令, 直接返回.如果該哈希表不為空, 取回該key對應(yīng)的客戶端鏈表結(jié)構(gòu),并把該鏈表中的每個客戶端對象設(shè)置REDIS_DIRTY_CAS標(biāo)志. 前面在EXEC的執(zhí)行命令中,進行過條件判斷, 如果客戶端對象具有這個標(biāo)志, 則丟棄事務(wù)中的命令序列.

在執(zhí)行EXEC, DISCARD, UNWATCH命令以及在客戶端結(jié)束連接的時候,都會取消樂觀鎖, 最終都會執(zhí)行函數(shù)unwatchAllKeys, 其實現(xiàn)為(multi.c):

/* Unwatch all the keys watched by this client. To clean the EXEC dirty   * flag is up to the caller. */  void unwatchAllKeys(redisClient *c) {      listIter li;      listNode *ln;      if (listLength(c->watched_keys) == 0) return;      listRewind(c->watched_keys,&li);      while((ln = listNext(&li))) {          list *clients;          watchedKey *wk;          /* Lookup the watched key -> clients list and remove the client           * from the list */          wk = listNodeValue(ln);          clients = dictFetchValue(wk->db->watched_keys, wk->key);          redisAssertWithInfo(c,NULL,clients != NULL);          listDelNode(clients,listSearchKey(clients,c));          /* Kill the entry at all if this was the only client */          if (listLength(clients) == 0)              dictDelete(wk->db->watched_keys, wk->key);          /* Remove this watched key from the client->watched list */          listDelNode(c->watched_keys,ln);          decrRefCount(wk->key);          zfree(wk);      }  }

語句if (listLength(c->watched_keys) == 0) return;判斷如果當(dāng)前客戶端對象的watched_keys鏈表為空,說明當(dāng)前客戶端沒有執(zhí)行WATCH命令,直接返回.如果該鏈表非空, 則依次遍歷該鏈表中的key, 并從該鏈表中刪除key, 同時,獲得全局?jǐn)?shù)據(jù)庫中的哈希表watched_keys中該key對應(yīng)的客戶端鏈表, 刪除當(dāng)前客戶端對象.

到此,相信大家對“如何深入理解Redis事務(wù)”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!


當(dāng)前標(biāo)題:如何深入理解Redis事務(wù)
URL地址:http://weahome.cn/article/iijpsi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部