真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Golang中怎么利用Redis實(shí)現(xiàn)TCC分布式事務(wù)

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)Golang中怎么利用redis實(shí)現(xiàn)TCC分布式事務(wù),文章內(nèi)容豐富且以專(zhuān)業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

創(chuàng)新互聯(lián)服務(wù)項(xiàng)目包括德欽網(wǎng)站建設(shè)、德欽網(wǎng)站制作、德欽網(wǎng)頁(yè)制作以及德欽網(wǎng)絡(luò)營(yíng)銷(xiāo)策劃等。多年來(lái),我們專(zhuān)注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,德欽網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到德欽省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

對(duì)于使用者而言這種部分成功部分失敗的情況非常難以處理,所以我們需要保證 MSET 操作要么全部成功要么全部失敗。

MSET 命令在集群模式下的問(wèn)題#

于是問(wèn)題來(lái)了 DEL、MSET 等命令所涉及的 key 可能分布在不同的節(jié)點(diǎn)中,在集群模式下實(shí)現(xiàn)這類(lèi)涉及多個(gè) key 的命令最簡(jiǎn)單的方式當(dāng)然是 For-Each 遍歷 key 并向它們所在的節(jié)點(diǎn)發(fā)送相應(yīng)的操作指令。 以 MGET 命令的實(shí)現(xiàn)為例:

func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

if len(args) < 2 {

return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command")

}

// 從參數(shù)列表中取出要讀取的 key

keys := make([]string, len(args)-1)

for i := 1; i < len(args); i++ {

keys[i-1] = string(args[i])

}

resultMap := make(map[string][]byte)

// 計(jì)算每個(gè) key 所在的節(jié)點(diǎn),并按照節(jié)點(diǎn)分組

groupMap := cluster.groupBy(keys)

// groupMap 的類(lèi)型為 map[string][]string,key 是節(jié)點(diǎn)的地址,value 是 keys 中屬于該節(jié)點(diǎn)的 key 列表

for peer, group := range groupMap {

// 向每個(gè)節(jié)點(diǎn)發(fā)送 mget 指令,讀取分布在它上面的 key

resp := cluster.Relay(peer, c, makeArgs("MGET", group...))

if reply.IsErrorReply(resp) {

errReply := resp.(reply.ErrorReply)

return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error()))

}

arrReply, _ := resp.(*reply.MultiBulkReply)

// 將每個(gè)節(jié)點(diǎn)上的結(jié)果 merge 到 map 中

for i, v := range arrReply.Args {

key := group[i]

resultMap[key] = v

}

}

result := make([][]byte, len(keys))

for i, k := range keys {

result[i] = resultMap[k]

}

return reply.MakeMultiBulkReply(result)

}

// 計(jì)算 key 所屬的節(jié)點(diǎn),并按節(jié)點(diǎn)分組

func (cluster *Cluster) groupBy(keys []string) map[string][]string {

result := make(map[string][]string)

for _, key := range keys {

// 使用一致性 hash 計(jì)算所屬節(jié)點(diǎn)

peer := cluster.peerPicker.Get(key)

// 將 key 加入到相應(yīng)節(jié)點(diǎn)的分組中

group, ok := result[peer]

if !ok {

group = make([]string, 0)

}

group = append(group, key)

result[peer] = group

}

return result

}

那么 MSET 命令的實(shí)現(xiàn)能否如法炮制呢?答案是否定的。在上面的代碼中我們注意到,在向各個(gè)節(jié)點(diǎn)發(fā)送指令時(shí)若某個(gè)節(jié)點(diǎn)讀取失敗則會(huì)直接退出整個(gè) MGET 執(zhí)行過(guò)程。

若在執(zhí)行 MSET 指令時(shí)遇到部分節(jié)點(diǎn)失敗或超時(shí),則會(huì)出現(xiàn)部分 key 設(shè)置成功而另一份設(shè)置失敗的情況。對(duì)于緩存使用者而言這種部分成功部分失敗的情況非常難以處理,所以我們需要保證 MSET 操作要么全部成功要么全部失敗。

兩階段提交#

兩階段提交(2-Phase Commit, 2PC)算法是解決我們遇到的一致性問(wèn)題最簡(jiǎn)單的算法。在 2PC 算法中寫(xiě)操作被分為兩個(gè)階段來(lái)執(zhí)行:

Prepare 階段

協(xié)調(diào)者向所有參與者發(fā)送事務(wù)內(nèi)容,詢(xún)問(wèn)是否可以執(zhí)行事務(wù)操作。在 Godis 中收到客戶端 MSET 命令的節(jié)點(diǎn)是事務(wù)的協(xié)調(diào)者,所有持有相關(guān) key 的節(jié)點(diǎn)都要參與事務(wù)。

各參與者鎖定事務(wù)相關(guān) key 防止被其它操作修改。各參與者寫(xiě) undo log 準(zhǔn)備在事務(wù)失敗后進(jìn)行回滾。

參與者回復(fù)協(xié)調(diào)者可以提交。若協(xié)調(diào)者收到所有參與者的YES回復(fù),則準(zhǔn)備進(jìn)行事務(wù)提交。若有參與者回復(fù)NO或者超時(shí),則準(zhǔn)備回滾事務(wù)

Commit 階段

協(xié)調(diào)者向所有參與者發(fā)送提交請(qǐng)求

參與者正式提交事務(wù),并在完成后釋放相關(guān) key 的鎖。

參與者協(xié)調(diào)者回復(fù)ACK,協(xié)調(diào)者收到所有參與者的ACK后認(rèn)為事務(wù)提交成功。

Rollback 階段

在事務(wù)請(qǐng)求階段若有參與者回復(fù)NO或者超時(shí),協(xié)調(diào)者向所有參與者發(fā)出回滾請(qǐng)求

各參與者執(zhí)行事務(wù)回滾,并在完成后釋放相關(guān)資源。

參與者協(xié)調(diào)者回復(fù)ACK,協(xié)調(diào)者收到所有參與者的ACK后認(rèn)為事務(wù)回滾成功。

2PC是一種簡(jiǎn)單的一致性協(xié)議,它存在一些問(wèn)題:

單點(diǎn)服務(wù): 若協(xié)調(diào)者突然崩潰則事務(wù)流程無(wú)法繼續(xù)進(jìn)行或者造成狀態(tài)不一致

無(wú)法保證一致性: 若協(xié)調(diào)者第二階段發(fā)送提交請(qǐng)求時(shí)崩潰,可能部分參與者受到COMMIT請(qǐng)求提交了事務(wù),而另一部分參與者未受到請(qǐng)求而放棄事務(wù)造成不一致現(xiàn)象。

阻塞: 為了保證事務(wù)完成提交,各參與者在完成第一階段事務(wù)執(zhí)行后必須鎖定相關(guān)資源直到正式提交,影響系統(tǒng)的吞吐量。

首先我們定義事務(wù)的描述結(jié)構(gòu):

type Transaction struct {

id string // 事務(wù) ID, 由 snowflake 算法生成

args [][]byte // 命令參數(shù)

cluster *Cluster

conn redis.Connection

keys []string // 事務(wù)中涉及的 key

undoLog map[string][]byte // 每個(gè) key 在事務(wù)執(zhí)行前的值,用于回滾事務(wù)

}

Prepare 階段#

先看事務(wù)參與者 prepare 階段的操作:

// prepare 命令的格式是: PrepareMSet TxID key1, key2 ...

// TxID 是事務(wù) ID,由協(xié)調(diào)者決定

func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

if len(args) < 3 {

return reply.MakeErrReply("ERR wrong number of arguments for 'preparemset' command")

}

txId := string(args[1])

size := (len(args) - 2) / 2

keys := make([]string, size)

for i := 0; i < size; i++ {

keys[i] = string(args[2*i+2])

}

txArgs := [][]byte{

[]byte("MSet"),

} // actual args for cluster.db

txArgs = append(txArgs, args[2:]...)

tx := NewTransaction(cluster, c, txId, txArgs, keys) // 創(chuàng)建新事務(wù)

cluster.transactions.Put(txId, tx) // 存儲(chǔ)到節(jié)點(diǎn)的事務(wù)列表中

err := tx.prepare() // 準(zhǔn)備事務(wù)

if err != nil {

return reply.MakeErrReply(err.Error())

}

return &reply.OkReply{}

}

實(shí)際的準(zhǔn)備操作在 tx.prepare() 中:

func (tx *Transaction) prepare() error {

// 鎖定相關(guān) key

tx.cluster.db.Locks(tx.keys...)

// 準(zhǔn)備 undo log

tx.undoLog = make(map[string][]byte)

for _, key := range tx.keys {

entity, ok := tx.cluster.db.Get(key)

if ok {

blob, err := gob.Marshal(entity) // 將修改之前的狀態(tài)序列化之后存儲(chǔ)作為 undo log

if err != nil {

return err

}

tx.undoLog[key] = blob

} else {

// 若事務(wù)執(zhí)行前 key 是空的,在回滾時(shí)應(yīng)刪除它

tx.undoLog[key] = []byte{}

}

}

tx.status = PreparedStatus

return nil

}

看看協(xié)調(diào)者在做什么:

func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

// 解析參數(shù)

argCount := len(args) - 1

if argCount%2 != 0 || argCount < 1 {

return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command")

}

size := argCount / 2

keys := make([]string, size)

valueMap := make(map[string]string)

for i := 0; i < size; i++ {

keys[i] = string(args[2*i+1])

valueMap[keys[i]] = string(args[2*i+2])

}

// 找到所屬的節(jié)點(diǎn)

groupMap := cluster.groupBy(keys)

if len(groupMap) == 1 { // do fast

// 若所有的 key 都在同一個(gè)節(jié)點(diǎn)直接執(zhí)行,不使用較慢的 2pc 算法

for peer := range groupMap {

return cluster.Relay(peer, c, args)

}

}

// 開(kāi)始準(zhǔn)備階段

var errReply redis.Reply

txId := cluster.idGenerator.NextId() // 使用 snowflake 算法決定事務(wù) ID

txIdStr := strconv.FormatInt(txId, 10)

rollback := false

// 向所有參與者發(fā)送 prepare 請(qǐng)求

for peer, group := range groupMap {

peerArgs := []string{txIdStr}

for _, k := range group {

peerArgs = append(peerArgs, k, valueMap[k])

}

var resp redis.Reply

if peer == cluster.self {

resp = PrepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...))

} else {

resp = cluster.Relay(peer, c, makeArgs("PrepareMSet", peerArgs...))

}

if reply.IsErrorReply(resp) {

errReply = resp

rollback = true

break

}

}

if rollback {

// 若 prepare 過(guò)程出錯(cuò)則執(zhí)行回滾

RequestRollback(cluster, c, txId, groupMap)

} else {

_, errReply = RequestCommit(cluster, c, txId, groupMap)

rollback = errReply != nil

}

if !rollback {

return &reply.OkReply{}

}

return errReply

}

Commit 階段#

事務(wù)參與者提交本地事務(wù):

func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

if len(args) != 2 {

return reply.MakeErrReply("ERR wrong number of arguments for 'commit' command")

}

// 讀取事務(wù)信息

txId := string(args[1])

raw, ok := cluster.transactions.Get(txId)

if !ok {

return reply.MakeIntReply(0)

}

tx, _ := raw.(*Transaction)

// 在提交成功后解鎖 key

defer func() {

cluster.db.UnLocks(tx.keys...)

tx.status = CommitedStatus

//cluster.transactions.Remove(tx.id) // cannot remove, may rollback after commit

}()

cmd := strings.ToLower(string(tx.args[0]))

var result redis.Reply

if cmd == "del" {

result = CommitDel(cluster, c, tx)

} else if cmd == "mset" {

result = CommitMSet(cluster, c, tx)

}

// 提交失敗

if reply.IsErrorReply(result) {  陽(yáng)痿早泄前列腺炎醫(yī)院哪家好http://www.zztjxb.com/

err2 := tx.rollback()

return reply.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result))

}

return result

}

// 執(zhí)行操作

func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {

size := len(tx.args) / 2

keys := make([]string, size)

values := make([][]byte, size)

for i := 0; i < size; i++ {

keys[i] = string(tx.args[2*i+1])

values[i] = tx.args[2*i+2]鄭州無(wú)痛人流醫(yī)院哪家好http://www.hnzzxb.com/

}

for i, key := range keys {

value := values[i]

cluster.db.Put(key, &db.DataEntity{Data: value})

}

cluster.db.AddAof(reply.MakeMultiBulkReply(tx.args))

return &reply.OkReply{}

}

協(xié)調(diào)者的邏輯也很簡(jiǎn)單:

func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) {

var errReply reply.ErrorReply

txIdStr := strconv.FormatInt(txId, 10)

respList := make([]redis.Reply, 0, len(peers))

for peer := range peers {

var resp redis.Reply

if peer == cluster.self {

resp = Commit(cluster, c, makeArgs("commit", txIdStr))

} else {

resp = cluster.Relay(peer, c, makeArgs("commit", txIdStr))

}

if reply.IsErrorReply(resp) {

errReply = resp.(reply.ErrorReply)

break

}

respList = append(respList, resp)

}

if errReply != nil {

RequestRollback(cluster, c, txId, peers)

return nil, errReply

}

return respList, nil

}

Rollback#

回滾本地事務(wù):

func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {

if len(args) != 2 {

return reply.MakeErrReply("ERR wrong number of arguments for 'rollback' command")

}

txId := string(args[1])

raw, ok := cluster.transactions.Get(txId)

if !ok {

return reply.MakeIntReply(0)

}

tx, _ := raw.(*Transaction)

err := tx.rollback()

if err != nil {

return reply.MakeErrReply(err.Error())

}

return reply.MakeIntReply(1)

}

func (tx *Transaction) rollback() error {

for key, blob := range tx.undoLog {

if len(blob) > 0 {

entity := &db.DataEntity{}

err := gob.UnMarshal(blob, entity) // 反序列化事務(wù)前的快照

if err != nil {

return err

}

tx.cluster.db.Put(key, entity) // 寫(xiě)入事務(wù)前的數(shù)據(jù)

} else {

tx.cluster.db.Remove(key) // 若事務(wù)開(kāi)始之前 key 不存在則將其刪除

}

}

if tx.status != CommitedStatus {

tx.cluster.db.UnLocks(tx.keys...)

}

tx.status = RollbackedStatus

return nil

}

協(xié)調(diào)者的邏輯與 commit 類(lèi)似:

func RequestRollback(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) {

txIdStr := strconv.FormatInt(txId, 10)

for peer := range peers {

if peer == cluster.self {

Rollback(cluster, c, makeArgs("rollback", txIdStr))

} else {

cluster.Relay(peer, c, makeArgs("rollback", txIdStr))

}

}

}

上述就是小編為大家分享的Golang中怎么利用Redis實(shí)現(xiàn)TCC分布式事務(wù)了,如果剛好有類(lèi)似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


分享名稱(chēng):Golang中怎么利用Redis實(shí)現(xiàn)TCC分布式事務(wù)
文章源于:http://weahome.cn/article/iesdio.html

其他資訊

在線咨詢(xún)

微信咨詢(xún)

電話咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部