這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)碛嘘P(guān)如何進(jìn)行redis5新特性中Streams作消息隊(duì)列的分析,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
十載的湖州網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。全網(wǎng)整合營(yíng)銷推廣的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整湖州建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)公司從事“湖州網(wǎng)站設(shè)計(jì)”,“湖州網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。
前言
Redis 5 新特性中,Streams 數(shù)據(jù)結(jié)構(gòu)的引入,可以說它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作為消息隊(duì)列使用時(shí),得到更完善,更強(qiáng)大的原生支持,其中尤為明顯的是持久化消息隊(duì)列。同時(shí),stream 借鑒了 kafka 的消費(fèi)組模型概念和設(shè)計(jì),使消費(fèi)消息處理上更加高效快速。 數(shù)據(jù)結(jié)構(gòu)中常用 API 進(jìn)行分析。
準(zhǔn)備
本文所使用 Redis 版本為 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,與本文中描述略有不同。
添加消息
Streams 添加數(shù)據(jù)使用 XADD 指令進(jìn)行添加,消息中的數(shù)據(jù)以 K-V 鍵值對(duì)的形式進(jìn)行操作。一條消息可以存在多個(gè)鍵值對(duì),添加命令格式:
XADD key ID field string [field string ...]
其中 key 為 Streams 的名稱,ID 為消息的唯一標(biāo)志,不可重復(fù),field string 就為鍵值對(duì)。下面我們就添加以 person 為名稱的流,進(jìn)行操作。
XADD person * name ytao des https://ytao.top
上面添加案例中,ID 使用 * 號(hào)復(fù)制,這里代表著服務(wù)端自動(dòng)生成 Id,添加后返回?cái)?shù)據(jù) "1578238486193-0"
這里自動(dòng)生成的 Id 格式為
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
millisecondsTime 為當(dāng)前服務(wù)器時(shí)間毫秒時(shí)間戳。
sequenceNumber 當(dāng)前序列號(hào),取值來源于當(dāng)前毫秒內(nèi),生成消息的順序,默認(rèn)從 0 開始加 1 遞增。
比如:1578238486193-3 表示在 1578238486193 毫秒的時(shí)間戳?xí)r,添加的第 4 條消息。
除了服務(wù)端自動(dòng)生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下條件限制:
鴻蒙官方戰(zhàn)略合作共建——HarmonyOS技術(shù)社區(qū)
Id 中的前后部分必須為數(shù)字。
最小 Id 為 0-1,不能為 0-0,但是 2-0,3-0 .... 是被允許的。
添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。
否則,當(dāng)不滿足上述條件時(shí),添加后會(huì)拋出異常:
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
實(shí)際上,當(dāng)添加一條消息時(shí),會(huì)進(jìn)行兩部操作。第一步,先判斷如果不存在 Streams,則創(chuàng)建 Streams 的名稱,再添加消息到 Streams 中。即使添加消息時(shí),由于 Id 異常,也可以在 Redis 中存在以當(dāng)前 Streams 的名稱。 Streams 中 Id 也可作為指針使用,因?yàn)樗且粋€(gè)有序的標(biāo)記。
生產(chǎn)中,如果這樣使用添加消息,會(huì)存在一個(gè)問題,那就是消息數(shù)量太大時(shí),會(huì)使服務(wù)宕機(jī)。這里 Streams 的設(shè)計(jì)初期也有考慮到這個(gè)問題,那就是可以指定 Streams 的容量。如果容量操作這個(gè)設(shè)定的值,就會(huì)對(duì)調(diào)舊的消息。在添加消息時(shí),設(shè)置 MAXLEN 參數(shù)。
XADD person MAXLEN 5 * name ytao des https://ytao.top
這樣就指定該了 Streams 中的容量為 5 條消息。也可使用 XTRIM 截取消息,從小到大剔除多余的消息:
XTRIM person MAXLEN 8
消息數(shù)量
查看消息數(shù)量使用 XLEN 指令進(jìn)行操作。
XLEN key
例:查看 person 流中的消息數(shù)量:
> XLEN person (integer) 5
查詢消息
查詢 Streams 中的消息使用 XRANGE 和 XREVRANGE 指令。
XRANGE
查詢數(shù)據(jù)時(shí),可以按照指定 Id 范圍進(jìn)行查詢,XRANGE 查詢指令格式:
XRANGE key start end [COUNT count]
參數(shù)說明:
key 為 Streams 的名稱
start 為范圍查詢開始 Id,包含本 Id。
start 為范圍查詢結(jié)束 Id,包含本 Id。
Count 為查詢返回最大的消息數(shù)量,非必填。
這里 start 和 end 有-和+兩個(gè)非指定值,他們分別表示無(wú)窮小和無(wú)窮大,所以當(dāng)使用這個(gè)兩個(gè)值時(shí),會(huì)查詢出全部的消息。
> XRANGE person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!"
上面查詢的消息數(shù)據(jù),可以看到是按照先進(jìn)先出的順序查詢出來的。
使用 COUNT 指定查詢返回的數(shù)量:
# 查詢所有的消息,并且返回一條數(shù)據(jù) > XRANGE person - + COUNT 1 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top"
在范圍查詢中,Id 的后半部分可省略,后半部分中的數(shù)據(jù)會(huì)全部查詢到。
XREVRANGE
XREVRANGE 的查詢和 XRANGE 指令中的使用類似,但查詢的 start 和 end 參數(shù)順序進(jìn)行了調(diào)換:
XREVRANGE key end start [COUNT count]
使用案例:
> XREVRANGE person + - 1) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top"
查詢后的結(jié)果與 XRANGE 的結(jié)果順序剛好相反,其他都一樣,這兩個(gè)指令可進(jìn)行消息的升序和降序的返回。
刪除消息
刪除消息使用 XDEL 指令操作,只需指定將要?jiǎng)h除的 Streams 名稱和 Id 即可,支持一次刪除多個(gè)消息 。
XDEL key ID [ID ...]
刪除案例:
# 查詢所有消息 > XRANGE person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" 3) 1) "2-0" 2) 1) "name" 2) "gaga" 3) "des" 4) "fishion!" # 刪除消息 > XDEL person 2-0 (integer) 1 # 再次查詢刪除后的所有消息 > XRANGE person - + 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!" # 查詢刪除后的長(zhǎng)度 > XLEN person (integer) 2
從上面可以看到,刪除消息后,長(zhǎng)度也會(huì)減少相應(yīng)的數(shù)量。
消費(fèi)消息
在 Redis 的 PUB/SUB 中,我們是通過訂閱來消費(fèi)消息,在 Streams 數(shù)據(jù)結(jié)構(gòu)中,同樣也能實(shí)現(xiàn)同等功能,當(dāng)沒有新的消息時(shí),可進(jìn)行阻塞等待。不僅支持單獨(dú)消費(fèi),而且還可以支持群組消費(fèi)。
單獨(dú)消費(fèi)
單獨(dú)消費(fèi)使用 XREAD 指令。可以看到,下面命令中,STREAMS,key, 以及 ID 為必填項(xiàng)。ID 表示將要讀取大于該 ID 的消息。當(dāng) ID 值使用 $ 賦予時(shí),表示已存在消息的最大 Id 值。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
上面的 COUNT 參數(shù)用來指定讀取的最大數(shù)量,與 XRANGE 的用法一樣。
> XREAD COUNT 1 STREAMS person 0 1) 1) "person" 2) 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" > XREAD COUNT 2 STREAMS person 0 1) 1) "person" 2) 1) 1) "0-1" 2) 1) "name" 2) "ytao" 3) "des" 4) "https://ytao.top" 2) 1) "0-2" 2) 1) "name" 2) "luffy" 3) "des" 4) "valiant!"
在 XREAD 里面還有個(gè) BLOCK 參數(shù),這個(gè)是用來阻塞訂閱消息的,BLOCK 攜帶的參數(shù)為阻塞時(shí)間,單位為毫秒,如果在這個(gè)時(shí)間內(nèi)沒有新的消息消費(fèi),那么就會(huì)釋放該阻塞。當(dāng)這里的時(shí)間指定為 0 時(shí),會(huì)一直阻塞,直到有新的消息來消費(fèi)到。
# 窗口 1 開啟阻塞,等待新消息的到來 > XREAD BLOCK 0 STREAMS person $ # 另開一個(gè)連接窗口 2,添加一條新的消息 > XADD person 2-2 name tao des coder "2-2" # 窗口 1,獲取到有新的消息來消費(fèi),并且?guī)в凶枞臅r(shí)間 > XREAD BLOCK 0 STREAMS person $ 1) 1) "person" 2) 1) 1) "2-2" 2) 1) "name" 2) "tao" 3) "des" 4) "coder" (60.81s)
當(dāng)使用 XREAD 進(jìn)行順序消費(fèi)時(shí),需要額外記錄下讀取到位置的 Id,方便下次繼續(xù)消費(fèi)。
群組消費(fèi)
群組消費(fèi)的主要目的也就是為了分流消息給不同的客戶端處理,以更高效的速率處理消息。為達(dá)到這一肝功能需求,我們需要做三件事:創(chuàng)建群組,群組讀取消息,向服務(wù)端確認(rèn)消息以處理。
群組操作
操作群組使用 XGROUP 指令:
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
上面命令中,包含操作有:
CREATE 創(chuàng)建消費(fèi)組。
SETID 修改下一個(gè)處理消息的 Id。
DESTROY 銷毀消費(fèi)組。
DELCONSUMER 刪除消費(fèi)組中指定的消費(fèi)者。
我們當(dāng)前需要使用的是創(chuàng)建消費(fèi)組:
# 以當(dāng)前存在的最大 Id 作為消費(fèi)起始 > XGROUP CREATE person group1 $ OK
群組讀取消息
群組讀取使用 XREADGROUP 指令,COUNT和BLOCK的使用類似 XREAD 的操作,只是多了個(gè)群組和消費(fèi)者的指定:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
由于群組消費(fèi)和單獨(dú)消費(fèi)類似,這里只進(jìn)行個(gè)阻塞分析,這里 Id 也有個(gè)特殊值>,表示還未進(jìn)行消費(fèi)的消息:
# 窗口 1,消費(fèi)群組中,taotao 消費(fèi)者建立阻塞監(jiān)聽 XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person > # 窗口 2,消費(fèi)群組中,yangyang 消費(fèi)者建立阻塞監(jiān)聽 XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person > # 窗口 3,添加消費(fèi)消息 > XADD person 3-1 name tony des 666 "3-1" # 窗口 1,讀取到新消息,此時(shí) 窗口 2 沒有任何反應(yīng) > XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person > 1) 1) "person" 2) 1) 1) "3-1" 2) 1) "name" 2) "tony" 3) "des" 4) "666" (77.54s) # 窗口 3,再次添加消費(fèi)消息 > XADD person 3-2 name james des abc! "3-2" # 窗口 2,讀取到新消息,此時(shí) 窗口 1 沒有任何反應(yīng) > XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person > 1) 1) "person" 2) 1) 1) "3-2" 2) 1) "name" 2) "james" 3) "des" 4) "abc!" (76.36s)
以上執(zhí)行流程中,group1 群組中有兩個(gè)消費(fèi)者,當(dāng)添加兩條消息后,這兩個(gè)消費(fèi)者輪流消費(fèi)。
消息ACK
消息消費(fèi)后,為避免再次重復(fù)消費(fèi),這是需要向服務(wù)端發(fā)送 ACK,確保消息被消費(fèi)后的標(biāo)記。 例如下列情況,我們上面我們將最新兩條消息已進(jìn)行了消費(fèi),但是當(dāng)我們?cè)俅巫x取消息時(shí),還是被讀到:
> XREADGROUP GROUP group1 yangyang STREAMS person 0 1) 1) "person" 2) 1) 1) "3-2" 2) 1) "name" 2) "james" 3) "des" 4) "abc!"
這時(shí),我們使用 XACK 指令告訴服務(wù)器,我們已處理的消息:
XACK key group ID [ID ...]0
讓服務(wù)器標(biāo)記 3-2 已處理:
> XACK person group1 3-2 (integer) 1
再次獲取群組讀取消息:
> XREADGROUP GROUP group1 yangyang STREAMS person 0 1) 1) "person" 2) (empty list or set)
隊(duì)列中沒有了可讀消息。 除了上面以講解到的 API 外,查看消費(fèi)群組信息可使用 XINFO 指令查看。
上面對(duì) Streams 常用 API 進(jìn)行了分析,我們可以感受到 Redis 在消息隊(duì)列支持的道路上,也越來越強(qiáng)大。如果使用過它的 PUB/SUB 功能的話,就會(huì)感受到 5.x 迭代正是將你的一些痛點(diǎn)進(jìn)行了優(yōu)化。
上述就是小編為大家分享的如何進(jìn)行Redis5新特性中Streams作消息隊(duì)列的分析了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。