這篇文章主要介紹“最簡(jiǎn)消息隊(duì)列的實(shí)現(xiàn)方法”,在日常操作中,相信很多人在最簡(jiǎn)消息隊(duì)列的實(shí)現(xiàn)方法問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”最簡(jiǎn)消息隊(duì)列的實(shí)現(xiàn)方法”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!
創(chuàng)新互聯(lián)是一家集網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì)、網(wǎng)站頁(yè)面設(shè)計(jì)、網(wǎng)站優(yōu)化SEO優(yōu)化為一體的專業(yè)網(wǎng)絡(luò)公司,已為成都等多地近百家企業(yè)提供網(wǎng)站建設(shè)服務(wù)。追求良好的瀏覽體驗(yàn),以探求精品塑造與理念升華,設(shè)計(jì)最適合用戶的網(wǎng)站頁(yè)面。 合作只是第一步,服務(wù)才是根本,我們始終堅(jiān)持講誠(chéng)信,負(fù)責(zé)任的原則,為您進(jìn)行細(xì)心、貼心、認(rèn)真的服務(wù),與眾多客戶在蓬勃發(fā)展的市場(chǎng)環(huán)境中,互促共生。
結(jié)合其他 mq
的使用經(jīng)歷,基本的使用流程:
創(chuàng)建 producer
或 consumer
啟動(dòng) mq
生產(chǎn)消息/消費(fèi)消息
對(duì)應(yīng)到 queue
中,大致也是這個(gè):
// 生產(chǎn)者創(chuàng)建工廠 producer := newMockedProducer() // 消費(fèi)者創(chuàng)建工廠 consumer := newMockedConsumer() // 將生產(chǎn)者以及消費(fèi)者的創(chuàng)建工廠函數(shù)傳遞給 NewQueue() q := queue.NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil })
我們看看 NewQueue
需要什么構(gòu)建條件:
producer constructor
consumer constructor
將雙方的工廠函數(shù)傳遞給 queue
,由它去執(zhí)行以及重試。
這兩個(gè)需要的目的是將生產(chǎn)者/消費(fèi)者的構(gòu)建和消息生產(chǎn)/消費(fèi)都封裝在 mq
中,而且將生產(chǎn)者/消費(fèi)者的整套邏輯交給開(kāi)發(fā)者處理:
type ( // 開(kāi)發(fā)者需要實(shí)現(xiàn)此接口 Producer interface { AddListener(listener ProduceListener) Produce() (string, bool) } ... // ProducerFactory定義了生成Producer的方法 ProducerFactory func() (Producer, error) )
其實(shí)也就是將生產(chǎn)者的邏輯交個(gè)開(kāi)發(fā)者自己完成,mq
只負(fù)責(zé)生產(chǎn)者/消費(fèi)者的消息傳遞和之間的調(diào)度。
工廠方法的設(shè)計(jì),是將生產(chǎn)者本身和生產(chǎn)消息,這兩個(gè)任務(wù)都交給 queue
自己來(lái)做調(diào)度或者重試。
生產(chǎn)消息當(dāng)然要回到生產(chǎn)者本身:
type mockedProducer struct { total int32 count int32 // 使用waitgroup來(lái)模擬任務(wù)的完成 wait sync.WaitGroup } // 實(shí)現(xiàn) Producer interface 的方法:Produce() func (p *mockedProducer) Produce() (string, bool) { if atomic.AddInt32(&p.count, 1) <= p.total { p.wait.Done() return "item", true } time.Sleep(time.Second) return "", false }
queue
中的生產(chǎn)者編寫(xiě)都必須實(shí)現(xiàn):
Produce()
:由開(kāi)發(fā)者編寫(xiě)生產(chǎn)消息的邏輯
AddListener()
:生產(chǎn)者
和生產(chǎn)者類似:
type mockedConsumer struct { count int32 } func (c *mockedConsumer) Consume(string) error { atomic.AddInt32(&c.count, 1) return nil }
啟動(dòng),然后驗(yàn)證我們上述的生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)是否傳輸成功:
func TestQueue(t *testing.T) { producer := newMockedProducer(rounds) consumer := newMockedConsumer() // 創(chuàng)建 queue q := NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil }) // 當(dāng)生產(chǎn)者生產(chǎn)完畢,執(zhí)行 Stop() 關(guān)閉生產(chǎn)端生產(chǎn) go func() { producer.wait.Wait() // mq生產(chǎn)端停止生產(chǎn),不是mq本身 Stop 運(yùn)行 q.Stop() }() // 啟動(dòng) q.Start() // 驗(yàn)證生產(chǎn)消費(fèi)端是否消息消費(fèi)完成 assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) }
以上就是 queue
最簡(jiǎn)易的入門(mén)使用代碼。開(kāi)發(fā)者可以根據(jù)自己的業(yè)務(wù)實(shí)際情況:自由定義生產(chǎn)者/消費(fèi)者已經(jīng)生產(chǎn)/消費(fèi)邏輯。
![image-20210506224102836](/Users/dyhxl/Library/Application Support/typora-user-images/image-20210506224102836.png)
整體流程如上圖:
全體的通信都由 channel
進(jìn)行
通過(guò)加入監(jiān)聽(tīng)器 listener
,以及事件觸發(fā) event
,相當(dāng)于將觸發(fā)器邏輯分離出來(lái)
生產(chǎn)者有 produceone
,這個(gè)是生產(chǎn)消息的邏輯,但是其中的 Produce()
是由開(kāi)發(fā)者編寫(xiě)【上面的 interface
中正是這個(gè)函數(shù)】
同理消費(fèi)者,Consume()
基本的消息流動(dòng)就入上圖以及上述描寫(xiě)的,具體的代碼分析我們就留到下一篇,我們????分析里面,尤其是如何控制 channel
是整個(gè)設(shè)計(jì)的核心。
到此,關(guān)于“最簡(jiǎn)消息隊(duì)列的實(shí)現(xiàn)方法”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!