本篇內(nèi)容介紹了“golang如何實(shí)現(xiàn)rabbitmq監(jiān)聽(tīng)”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
成都創(chuàng)新互聯(lián)公司2013年成立,先為北林等服務(wù)建站,北林等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為北林企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問(wèn)題。
準(zhǔn)備工作
在開(kāi)始之前,需要確保已經(jīng)安裝RabbitMQ。由于RabbitMQ依賴Erlang,所以還需要安裝Erlang。
安裝完成之后,我們需要安裝Golang第三方包。其中,AMQP包是必不可少的,它可以讓我們很方便地連接和操作RabbitMQ。
go get github.com/streadway/amqp
代碼實(shí)現(xiàn)
首先,我們需要連接到RabbitMQ。連接成功后,我們需要聲明一個(gè)名為“test”、類型為“fanout”的exchange。exchange是RabbitMQ中實(shí)現(xiàn)消息路由的重要組成部分,它負(fù)責(zé)接收消息并將它們分發(fā)給隊(duì)列。在這種情況下,我們將聲明一個(gè)名為“test”的exchange,并將其類型設(shè)置為“fanout”,這意味著它將消息廣播給所有訂閱了它的隊(duì)列。
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"test", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
接下來(lái),我們需要?jiǎng)?chuàng)建一個(gè)新的、非持久的、具有自動(dòng)生成名稱的隊(duì)列。在這里,我們將使用隊(duì)列的名稱來(lái)綁定它們與剛剛聲明的“test”exchange。
q, err := ch.QueueDeclare(
"", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments
)
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name "", // routing key "test", // exchange false, nil,
)
failOnError(err, "Failed to bind a queue")
現(xiàn)在,RabbitMQ已經(jīng)準(zhǔn)備就緒,我們可以開(kāi)始監(jiān)聽(tīng)它的消息了。我們可以使用Consume函數(shù)來(lái)實(shí)現(xiàn)消息監(jiān)聽(tīng),它可以使我們持續(xù)不斷地接收來(lái)自隊(duì)列的消息,并對(duì)它們進(jìn)行處理。
msgs, err := ch.Consume(
q.Name, // queue name "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args
)
failOnError(err, "Failed to register a consumer")
for msg := range msgs {
log.Printf("Received a message: %s", msg.Body)
}
在以上代碼中,我們使用了ch.Consume()方法來(lái)監(jiān)聽(tīng)指定隊(duì)列中的消息,并通過(guò)打印日志的方式輸出了消息內(nèi)容。需要注意的是,我們使用了一個(gè)死循環(huán)來(lái)部署進(jìn)行消息監(jiān)聽(tīng),這意味著我們會(huì)一直監(jiān)聽(tīng)隊(duì)列,直至程序被停止或者出現(xiàn)錯(cuò)誤。
完整代碼如下:
package main
import (
"log" "github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil { log.Fatalf("%s: %s", msg, err) }
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "test", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") err = ch.QueueBind( q.Name, // queue name "", // routing key "test", // exchange false, nil, ) failOnError(err, "Failed to bind a queue") msgs, err := ch.Consume( q.Name, // queue name "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") for msg := range msgs { log.Printf("Received a message: %s", msg.Body) }
}
“golang如何實(shí)現(xiàn)rabbitmq監(jiān)聽(tīng)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!