EasyNetQ是基于官方.NET組件RabbitMQ.Client 的又一層封裝,使用起來更加方便,開發(fā)者不用關(guān)心具體隊(duì)列聲明,路由聲明等細(xì)節(jié),幾句簡(jiǎn)單代碼即可發(fā)送消息到隊(duì)列,接收消息也很簡(jiǎn)單,下面將簡(jiǎn)單介紹EasyNetQ的使用方法。不知道什么是RabbitMQ?您可以關(guān)閉網(wǎng)頁了。
成都創(chuàng)新互聯(lián)公司"三網(wǎng)合一"的企業(yè)建站思路。企業(yè)可建設(shè)擁有電腦版、微信版、手機(jī)版的企業(yè)網(wǎng)站。實(shí)現(xiàn)跨屏營銷,產(chǎn)品發(fā)布一步更新,電腦網(wǎng)絡(luò)+移動(dòng)網(wǎng)絡(luò)一網(wǎng)打盡,滿足企業(yè)的營銷需求!成都創(chuàng)新互聯(lián)公司具備承接各種類型的成都網(wǎng)站制作、網(wǎng)站設(shè)計(jì)、外貿(mào)網(wǎng)站建設(shè)項(xiàng)目的能力。經(jīng)過十載的努力的開拓,為不同行業(yè)的企事業(yè)單位提供了優(yōu)質(zhì)的服務(wù),并獲得了客戶的一致好評(píng)。
從NuGet上安裝即可,由于EasyNetQ是依賴RabbitMQ.Client所以,會(huì)同時(shí)安裝兩個(gè)dll。
PM> Install-Package EasyNetQ
使用EasyNetQ連接RabbitMQ,是在應(yīng)用程序啟動(dòng)時(shí)創(chuàng)建一個(gè)IBus對(duì)象,并且,在應(yīng)用程序關(guān)閉時(shí)釋放該對(duì)象。RabbitMQ連接是基于IBus接口的,當(dāng)IBus中的方法被調(diào)用,連接才會(huì)開啟。創(chuàng)建一個(gè)IBus對(duì)象的方法如下:
var bus = RabbitHutch.CreateBus(“host=myServer;virtualHost=myVirtualHost;username=mike;password=topsecret”);
根據(jù)上述代碼可以看出,連接字符串中是基于Key/Value形式的,每個(gè)Key中間用分號(hào)(;)斷開。其中host是必須要寫的,其他的值都是可以不用寫,會(huì)采用默認(rèn)的配置。連接中可能用到的Key如下:
host,host=localhost 或者h(yuǎn)ost =192.168.1.102或者h(yuǎn)ost=my.rabbitmq.com,如果用到集群配置的話,那么可以用逗號(hào)將服務(wù)地址隔開,例如host=a.com,b.com,c.com
virtualHost,虛擬主機(jī),默認(rèn)為'/'
username,用戶登錄名
password,用戶登錄密碼
requestedHeartbeat,心跳設(shè)置,默認(rèn)是10秒
prefetchcount,默認(rèn)是50
pubisherConfirms,默認(rèn)為false
persistentMessages,消息持久化,默認(rèn)為true
product,產(chǎn)品名
platform,平臺(tái)
timeout,默認(rèn)為10秒
關(guān)閉連接,可以使用 bus.Dispose();
EasyNetQ提供了一個(gè)日志接口 IEasyNetQLogger
public interface IEasyNetQLogger { void DebugWrite(string format, params object[] args); void InfoWrite(string format, params object[] args); void ErrorWrite(string format, params object[] args); void ErrorWrite(Exception exception); }
內(nèi)部默認(rèn)用的是NullLogger,即什么也不做,不記錄日志。在測(cè)試的時(shí)候也可以用ConsoleLogger來顯示EasyNetQ運(yùn)行中的各種信息。不過一般在正式使用環(huán)境中,可以自定義日志并實(shí)現(xiàn)IEasyNetQLogger接口。然后在RabbitHutch.CreateBus的重載方法中注冊(cè)想用的日志類型。(日志中會(huì)記錄連接RabbitMQ的過程和隊(duì)列創(chuàng)建細(xì)節(jié)等信息,對(duì)于不懂RabbitMQ的同學(xué),可能這些日志沒有什么意義)。代碼如下:
var logger = new MyLogger() // 繼承自 IEasyNetQLoggervar bus = RabbitHutch.CreateBus(“my connection”, x => x.Register(_ => logger));
EasyNetQ支持最簡(jiǎn)單的消息模式是發(fā)布和訂閱。發(fā)布消息后,任意消費(fèi)者可以訂閱該消息,也可以多個(gè)消費(fèi)者訂閱。并且不需要額外配置。首先,如上文中需要先創(chuàng)建一個(gè)IBus對(duì)象,然后,在創(chuàng)建一個(gè)可序列化的.NET對(duì)象。調(diào)用Publish方法即可。
message = MyMessage { Text =
警告,Publish只顧發(fā)送消息到隊(duì)列,但是不管有沒有消費(fèi)端訂閱,所以,發(fā)布之后,如果沒有消費(fèi)者,該消息將不會(huì)被消費(fèi)甚至丟失。
EasyNetQ提供了消息訂閱,當(dāng)調(diào)用Subscribe方法時(shí)候,EasyNetQ會(huì)創(chuàng)建一個(gè)用于接收消息的隊(duì)列,不過與消息發(fā)布不同的是,消息訂閱增加了一個(gè)參數(shù),subscribe_id.代碼如下:
bus.Subscribe("my_subscription_id", msg => Console.WriteLine(msg.Text));
第一個(gè)參數(shù)是訂閱id,另外一個(gè)是delegate參數(shù),用于處理接收到的消息。這里要注意的是,subscribe_id參數(shù)很重要,假如開發(fā)者用同一個(gè)subscribeid訂閱了同一種消息類型兩次或者多次,RabbitMQ會(huì)以輪訓(xùn)的方式給每個(gè)訂閱的隊(duì)列發(fā)送消息。接收到之后,其他隊(duì)列就接收不到該消息。如果用不同的subscribeid訂閱同一種消息類型,那么生成的每一個(gè)隊(duì)列都會(huì)收到該消息。
舉個(gè)例子:出庫發(fā)貨,我們有五個(gè)商品倉庫,每個(gè)倉庫的商品都是一樣的,假如來了一堆訂單,那么我們需要五個(gè)倉庫共同工作,分別處理訂單。而同樣,總倉庫需要知道總出貨量,正常情況下,可以用每個(gè)倉庫的出貨量相加即可。不過如果我們?cè)诳倐}庫也監(jiān)聽商品訂單消息,那么,每次來訂單,總倉庫也都會(huì)收到一份,那么可以作相應(yīng)的統(tǒng)計(jì)了。
需要注意的是,在收到消息處理消息時(shí)候,不要占用太多的時(shí)間,會(huì)影響消息的處理效率,所以,遇到占用長(zhǎng)時(shí)間的處理方法,最好用異步處理。代碼如下:
bus.SubscribeAsync("subscribe_async_test", message => new WebClient().DownloadStringTask(new Uri("http://localhost:1338/?timeout=500")) .ContinueWith(task => Console.WriteLine("Received: '{0}', Downloaded: '{1}'", message.Text, task.Result)));
取消訂閱,可以用如下方法:
var subscriptionResult = bus.Subscribe("sub_id", MyHandler); ... subscriptionResult.Dispose();
或者直接IBus.Dispose();
與Publish/Subscribe略有不同的是,Send/Receive 可以自己定義隊(duì)列名稱。
bus.Send("my.queue", new MyMessage{ Text = "Hello Widgets!" });
bus.Receive("my.queue", message => Console.WriteLine("MyMessage: {0}", message.Text));
并且,也可以在同一個(gè)隊(duì)列上發(fā)送不同的消息類型,Receive方法可以這么寫:
bus.Receive("my.queue", x => x .Add(message => deliveredMyMessage = message) .Add (message => deliveredMyOtherMessage = message));
如果消息到達(dá)隊(duì)列,但是沒有發(fā)現(xiàn)相應(yīng)消息類型的處理時(shí),EasyNetQ會(huì)發(fā)送一條消息到error隊(duì)列,并且,帶上一個(gè)異常信息:No handler found for message type
Publish方法,可以加一個(gè)topic參數(shù)。
bus.Publish(message, "X.A");
消息訂閱方可以通過路由來過濾相應(yīng)的消息。
* 匹配一個(gè)字符
#匹配0個(gè)或者多個(gè)字符
所以 X.A.2 會(huì)匹配到 "#", "X.#", "*.A.*" 但不會(huì)匹配 "X.B.*" 或者 "A". 當(dāng)消息訂閱需要用到topic時(shí)候,需要調(diào)用Subscribe的重載方法
bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*")); bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));
上述這種方式,會(huì)將消息輪詢發(fā)送給兩個(gè)訂閱者,如果只需要一個(gè)訂閱者的話,可以這么調(diào)用:
bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));
以上就是EasyNetQ的一些基本用法了,是不是很簡(jiǎn)單呢,就這么輕松實(shí)現(xiàn)了消息隊(duì)列的使用。當(dāng)然,要深入內(nèi)部還是有很多東西的。比如依賴注入,自定義EasyNetQ組件,RPC實(shí)現(xiàn)等。而且,他的源碼也是比較有參考價(jià)值的,相對(duì)于之前自己寫的基于RabbitMQ的封裝,自己的簡(jiǎn)直是不能看呀。希望本文能給讀完的你帶來幫助。