這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)Golang中怎么利用redis實(shí)現(xiàn)TCC分布式事務(wù),文章內(nèi)容豐富且以專(zhuān)業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
創(chuàng)新互聯(lián)服務(wù)項(xiàng)目包括德欽網(wǎng)站建設(shè)、德欽網(wǎng)站制作、德欽網(wǎng)頁(yè)制作以及德欽網(wǎng)絡(luò)營(yíng)銷(xiāo)策劃等。多年來(lái),我們專(zhuān)注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,德欽網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到德欽省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!
對(duì)于使用者而言這種部分成功部分失敗的情況非常難以處理,所以我們需要保證 MSET 操作要么全部成功要么全部失敗。
MSET 命令在集群模式下的問(wèn)題#
于是問(wèn)題來(lái)了 DEL、MSET 等命令所涉及的 key 可能分布在不同的節(jié)點(diǎn)中,在集群模式下實(shí)現(xiàn)這類(lèi)涉及多個(gè) key 的命令最簡(jiǎn)單的方式當(dāng)然是 For-Each 遍歷 key 并向它們所在的節(jié)點(diǎn)發(fā)送相應(yīng)的操作指令。 以 MGET 命令的實(shí)現(xiàn)為例:
func MGet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) < 2 {
return reply.MakeErrReply("ERR wrong number of arguments for 'mget' command")
}
// 從參數(shù)列表中取出要讀取的 key
keys := make([]string, len(args)-1)
for i := 1; i < len(args); i++ {
keys[i-1] = string(args[i])
}
resultMap := make(map[string][]byte)
// 計(jì)算每個(gè) key 所在的節(jié)點(diǎn),并按照節(jié)點(diǎn)分組
groupMap := cluster.groupBy(keys)
// groupMap 的類(lèi)型為 map[string][]string,key 是節(jié)點(diǎn)的地址,value 是 keys 中屬于該節(jié)點(diǎn)的 key 列表
for peer, group := range groupMap {
// 向每個(gè)節(jié)點(diǎn)發(fā)送 mget 指令,讀取分布在它上面的 key
resp := cluster.Relay(peer, c, makeArgs("MGET", group...))
if reply.IsErrorReply(resp) {
errReply := resp.(reply.ErrorReply)
return reply.MakeErrReply(fmt.Sprintf("ERR during get %s occurs: %v", group[0], errReply.Error()))
}
arrReply, _ := resp.(*reply.MultiBulkReply)
// 將每個(gè)節(jié)點(diǎn)上的結(jié)果 merge 到 map 中
for i, v := range arrReply.Args {
key := group[i]
resultMap[key] = v
}
}
result := make([][]byte, len(keys))
for i, k := range keys {
result[i] = resultMap[k]
}
return reply.MakeMultiBulkReply(result)
}
// 計(jì)算 key 所屬的節(jié)點(diǎn),并按節(jié)點(diǎn)分組
func (cluster *Cluster) groupBy(keys []string) map[string][]string {
result := make(map[string][]string)
for _, key := range keys {
// 使用一致性 hash 計(jì)算所屬節(jié)點(diǎn)
peer := cluster.peerPicker.Get(key)
// 將 key 加入到相應(yīng)節(jié)點(diǎn)的分組中
group, ok := result[peer]
if !ok {
group = make([]string, 0)
}
group = append(group, key)
result[peer] = group
}
return result
}
那么 MSET 命令的實(shí)現(xiàn)能否如法炮制呢?答案是否定的。在上面的代碼中我們注意到,在向各個(gè)節(jié)點(diǎn)發(fā)送指令時(shí)若某個(gè)節(jié)點(diǎn)讀取失敗則會(huì)直接退出整個(gè) MGET 執(zhí)行過(guò)程。
若在執(zhí)行 MSET 指令時(shí)遇到部分節(jié)點(diǎn)失敗或超時(shí),則會(huì)出現(xiàn)部分 key 設(shè)置成功而另一份設(shè)置失敗的情況。對(duì)于緩存使用者而言這種部分成功部分失敗的情況非常難以處理,所以我們需要保證 MSET 操作要么全部成功要么全部失敗。
兩階段提交#
兩階段提交(2-Phase Commit, 2PC)算法是解決我們遇到的一致性問(wèn)題最簡(jiǎn)單的算法。在 2PC 算法中寫(xiě)操作被分為兩個(gè)階段來(lái)執(zhí)行:
Prepare 階段
協(xié)調(diào)者向所有參與者發(fā)送事務(wù)內(nèi)容,詢(xún)問(wèn)是否可以執(zhí)行事務(wù)操作。在 Godis 中收到客戶端 MSET 命令的節(jié)點(diǎn)是事務(wù)的協(xié)調(diào)者,所有持有相關(guān) key 的節(jié)點(diǎn)都要參與事務(wù)。
各參與者鎖定事務(wù)相關(guān) key 防止被其它操作修改。各參與者寫(xiě) undo log 準(zhǔn)備在事務(wù)失敗后進(jìn)行回滾。
參與者回復(fù)協(xié)調(diào)者可以提交。若協(xié)調(diào)者收到所有參與者的YES回復(fù),則準(zhǔn)備進(jìn)行事務(wù)提交。若有參與者回復(fù)NO或者超時(shí),則準(zhǔn)備回滾事務(wù)
Commit 階段
協(xié)調(diào)者向所有參與者發(fā)送提交請(qǐng)求
參與者正式提交事務(wù),并在完成后釋放相關(guān) key 的鎖。
參與者協(xié)調(diào)者回復(fù)ACK,協(xié)調(diào)者收到所有參與者的ACK后認(rèn)為事務(wù)提交成功。
Rollback 階段
在事務(wù)請(qǐng)求階段若有參與者回復(fù)NO或者超時(shí),協(xié)調(diào)者向所有參與者發(fā)出回滾請(qǐng)求
各參與者執(zhí)行事務(wù)回滾,并在完成后釋放相關(guān)資源。
參與者協(xié)調(diào)者回復(fù)ACK,協(xié)調(diào)者收到所有參與者的ACK后認(rèn)為事務(wù)回滾成功。
2PC是一種簡(jiǎn)單的一致性協(xié)議,它存在一些問(wèn)題:
單點(diǎn)服務(wù): 若協(xié)調(diào)者突然崩潰則事務(wù)流程無(wú)法繼續(xù)進(jìn)行或者造成狀態(tài)不一致
無(wú)法保證一致性: 若協(xié)調(diào)者第二階段發(fā)送提交請(qǐng)求時(shí)崩潰,可能部分參與者受到COMMIT請(qǐng)求提交了事務(wù),而另一部分參與者未受到請(qǐng)求而放棄事務(wù)造成不一致現(xiàn)象。
阻塞: 為了保證事務(wù)完成提交,各參與者在完成第一階段事務(wù)執(zhí)行后必須鎖定相關(guān)資源直到正式提交,影響系統(tǒng)的吞吐量。
首先我們定義事務(wù)的描述結(jié)構(gòu):
type Transaction struct {
id string // 事務(wù) ID, 由 snowflake 算法生成
args [][]byte // 命令參數(shù)
cluster *Cluster
conn redis.Connection
keys []string // 事務(wù)中涉及的 key
undoLog map[string][]byte // 每個(gè) key 在事務(wù)執(zhí)行前的值,用于回滾事務(wù)
}
Prepare 階段#
先看事務(wù)參與者 prepare 階段的操作:
// prepare 命令的格式是: PrepareMSet TxID key1, key2 ...
// TxID 是事務(wù) ID,由協(xié)調(diào)者決定
func PrepareMSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) < 3 {
return reply.MakeErrReply("ERR wrong number of arguments for 'preparemset' command")
}
txId := string(args[1])
size := (len(args) - 2) / 2
keys := make([]string, size)
for i := 0; i < size; i++ {
keys[i] = string(args[2*i+2])
}
txArgs := [][]byte{
[]byte("MSet"),
} // actual args for cluster.db
txArgs = append(txArgs, args[2:]...)
tx := NewTransaction(cluster, c, txId, txArgs, keys) // 創(chuàng)建新事務(wù)
cluster.transactions.Put(txId, tx) // 存儲(chǔ)到節(jié)點(diǎn)的事務(wù)列表中
err := tx.prepare() // 準(zhǔn)備事務(wù)
if err != nil {
return reply.MakeErrReply(err.Error())
}
return &reply.OkReply{}
}
實(shí)際的準(zhǔn)備操作在 tx.prepare() 中:
func (tx *Transaction) prepare() error {
// 鎖定相關(guān) key
tx.cluster.db.Locks(tx.keys...)
// 準(zhǔn)備 undo log
tx.undoLog = make(map[string][]byte)
for _, key := range tx.keys {
entity, ok := tx.cluster.db.Get(key)
if ok {
blob, err := gob.Marshal(entity) // 將修改之前的狀態(tài)序列化之后存儲(chǔ)作為 undo log
if err != nil {
return err
}
tx.undoLog[key] = blob
} else {
// 若事務(wù)執(zhí)行前 key 是空的,在回滾時(shí)應(yīng)刪除它
tx.undoLog[key] = []byte{}
}
}
tx.status = PreparedStatus
return nil
}
看看協(xié)調(diào)者在做什么:
func MSet(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
// 解析參數(shù)
argCount := len(args) - 1
if argCount%2 != 0 || argCount < 1 {
return reply.MakeErrReply("ERR wrong number of arguments for 'mset' command")
}
size := argCount / 2
keys := make([]string, size)
valueMap := make(map[string]string)
for i := 0; i < size; i++ {
keys[i] = string(args[2*i+1])
valueMap[keys[i]] = string(args[2*i+2])
}
// 找到所屬的節(jié)點(diǎn)
groupMap := cluster.groupBy(keys)
if len(groupMap) == 1 { // do fast
// 若所有的 key 都在同一個(gè)節(jié)點(diǎn)直接執(zhí)行,不使用較慢的 2pc 算法
for peer := range groupMap {
return cluster.Relay(peer, c, args)
}
}
// 開(kāi)始準(zhǔn)備階段
var errReply redis.Reply
txId := cluster.idGenerator.NextId() // 使用 snowflake 算法決定事務(wù) ID
txIdStr := strconv.FormatInt(txId, 10)
rollback := false
// 向所有參與者發(fā)送 prepare 請(qǐng)求
for peer, group := range groupMap {
peerArgs := []string{txIdStr}
for _, k := range group {
peerArgs = append(peerArgs, k, valueMap[k])
}
var resp redis.Reply
if peer == cluster.self {
resp = PrepareMSet(cluster, c, makeArgs("PrepareMSet", peerArgs...))
} else {
resp = cluster.Relay(peer, c, makeArgs("PrepareMSet", peerArgs...))
}
if reply.IsErrorReply(resp) {
errReply = resp
rollback = true
break
}
}
if rollback {
// 若 prepare 過(guò)程出錯(cuò)則執(zhí)行回滾
RequestRollback(cluster, c, txId, groupMap)
} else {
_, errReply = RequestCommit(cluster, c, txId, groupMap)
rollback = errReply != nil
}
if !rollback {
return &reply.OkReply{}
}
return errReply
}
Commit 階段#
事務(wù)參與者提交本地事務(wù):
func Commit(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) != 2 {
return reply.MakeErrReply("ERR wrong number of arguments for 'commit' command")
}
// 讀取事務(wù)信息
txId := string(args[1])
raw, ok := cluster.transactions.Get(txId)
if !ok {
return reply.MakeIntReply(0)
}
tx, _ := raw.(*Transaction)
// 在提交成功后解鎖 key
defer func() {
cluster.db.UnLocks(tx.keys...)
tx.status = CommitedStatus
//cluster.transactions.Remove(tx.id) // cannot remove, may rollback after commit
}()
cmd := strings.ToLower(string(tx.args[0]))
var result redis.Reply
if cmd == "del" {
result = CommitDel(cluster, c, tx)
} else if cmd == "mset" {
result = CommitMSet(cluster, c, tx)
}
// 提交失敗
if reply.IsErrorReply(result) { 陽(yáng)痿早泄前列腺炎醫(yī)院哪家好http://www.zztjxb.com/
err2 := tx.rollback()
return reply.MakeErrReply(fmt.Sprintf("err occurs when rollback: %v, origin err: %s", err2, result))
}
return result
}
// 執(zhí)行操作
func CommitMSet(cluster *Cluster, c redis.Connection, tx *Transaction) redis.Reply {
size := len(tx.args) / 2
keys := make([]string, size)
values := make([][]byte, size)
for i := 0; i < size; i++ {
keys[i] = string(tx.args[2*i+1])
values[i] = tx.args[2*i+2]鄭州無(wú)痛人流醫(yī)院哪家好http://www.hnzzxb.com/
}
for i, key := range keys {
value := values[i]
cluster.db.Put(key, &db.DataEntity{Data: value})
}
cluster.db.AddAof(reply.MakeMultiBulkReply(tx.args))
return &reply.OkReply{}
}
協(xié)調(diào)者的邏輯也很簡(jiǎn)單:
func RequestCommit(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) ([]redis.Reply, reply.ErrorReply) {
var errReply reply.ErrorReply
txIdStr := strconv.FormatInt(txId, 10)
respList := make([]redis.Reply, 0, len(peers))
for peer := range peers {
var resp redis.Reply
if peer == cluster.self {
resp = Commit(cluster, c, makeArgs("commit", txIdStr))
} else {
resp = cluster.Relay(peer, c, makeArgs("commit", txIdStr))
}
if reply.IsErrorReply(resp) {
errReply = resp.(reply.ErrorReply)
break
}
respList = append(respList, resp)
}
if errReply != nil {
RequestRollback(cluster, c, txId, peers)
return nil, errReply
}
return respList, nil
}
Rollback#
回滾本地事務(wù):
func Rollback(cluster *Cluster, c redis.Connection, args [][]byte) redis.Reply {
if len(args) != 2 {
return reply.MakeErrReply("ERR wrong number of arguments for 'rollback' command")
}
txId := string(args[1])
raw, ok := cluster.transactions.Get(txId)
if !ok {
return reply.MakeIntReply(0)
}
tx, _ := raw.(*Transaction)
err := tx.rollback()
if err != nil {
return reply.MakeErrReply(err.Error())
}
return reply.MakeIntReply(1)
}
func (tx *Transaction) rollback() error {
for key, blob := range tx.undoLog {
if len(blob) > 0 {
entity := &db.DataEntity{}
err := gob.UnMarshal(blob, entity) // 反序列化事務(wù)前的快照
if err != nil {
return err
}
tx.cluster.db.Put(key, entity) // 寫(xiě)入事務(wù)前的數(shù)據(jù)
} else {
tx.cluster.db.Remove(key) // 若事務(wù)開(kāi)始之前 key 不存在則將其刪除
}
}
if tx.status != CommitedStatus {
tx.cluster.db.UnLocks(tx.keys...)
}
tx.status = RollbackedStatus
return nil
}
協(xié)調(diào)者的邏輯與 commit 類(lèi)似:
func RequestRollback(cluster *Cluster, c redis.Connection, txId int64, peers map[string][]string) {
txIdStr := strconv.FormatInt(txId, 10)
for peer := range peers {
if peer == cluster.self {
Rollback(cluster, c, makeArgs("rollback", txIdStr))
} else {
cluster.Relay(peer, c, makeArgs("rollback", txIdStr))
}
}
}
上述就是小編為大家分享的Golang中怎么利用Redis實(shí)現(xiàn)TCC分布式事務(wù)了,如果剛好有類(lèi)似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。