真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

rabbitmq常見功能封裝的示例分析

這篇文章主要介紹rabbitmq常見功能封裝的示例分析,文中介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們一定要看完!

金壇ssl適用于網(wǎng)站、小程序/APP、API接口等需要進(jìn)行數(shù)據(jù)傳輸應(yīng)用場(chǎng)景,ssl證書未來(lái)市場(chǎng)廣闊!成為成都創(chuàng)新互聯(lián)公司的ssl證書銷售渠道,可以享受市場(chǎng)價(jià)格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:13518219792(備注:SSL證書合作)期待與您的合作!

在項(xiàng)目中rabbitmq得到了廣泛的時(shí)候,這里對(duì)rabbitmq的常規(guī)功能做了一個(gè)簡(jiǎn)單的總結(jié),并封裝成了composer包,composer包地址(https://packagist.org/packages/maweibinguo/easyrabbitmq)、github地址(https://github.com/maweibinguo/easyrabbitmq),歡迎fork,由于水平有限,難免存在bug,歡迎提出寶貴意見

easy-rabbitmq 包簡(jiǎn)介

對(duì)php-amqplib/php-amqplib包的二次封裝,為常見功能提供一套開箱即用的生產(chǎn)解決方案
。目前支持的功能列表如下:

  • 推送消息到直連交換機(jī)(含延遲消息)

  • 推送消息到扇形交換機(jī)(含延遲消息)

  • 推送消息到主題交換機(jī)(含延遲消息)

  • 訂閱模式下的可靠消費(fèi), 消費(fèi)者消費(fèi)失敗后將會(huì)嘗試?yán)^續(xù)消費(fèi),最多嘗試5次。

  • 拉取模式下的可靠消費(fèi), 消費(fèi)者消費(fèi)失敗后將會(huì)嘗試?yán)^續(xù)消費(fèi),最多嘗試5次。

如果還有其它場(chǎng)景,歡迎繼續(xù)補(bǔ)充,隨后進(jìn)行迭代??!

要求

  • 安裝包對(duì)PHP版本對(duì)要求主要取決于php-amqplib/php-amqplib包本身對(duì)要求,這里為了兼顧php5.0的使用者,我們使用了php-amqplib/php-amqplib包V2.9.0的版本。
    具體的要求參照這里(https://packagist.org/packages/php-amqplib/php-amqplib#v2.9.0)。
    不過(guò)筆者推薦使用php7.0及其以上版本, 這個(gè)開發(fā)包也是在7.0這個(gè)版本上面開發(fā)完成的!

安裝

      composer require maweibinguo/easyrabbitmq

使用

在這里我們推薦php腳本+supervisor結(jié)合使用,用以保證消費(fèi)進(jìn)程的可靠性、增強(qiáng)worker的消費(fèi)能力! 如果你還沒(méi)有聽說(shuō)過(guò)supervisor,可以點(diǎn)擊這里(http://www.supervisord.org/introduction.html)了解.

1、推送消息

1-1、推送消息到直連交換機(jī)
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延遲消息,30 秒中后才會(huì)到達(dá)指定的交換機(jī)
      $instance->pushToDirect(
                        $msg = time(), //消息體內(nèi)容
                        $exchange = "easy_direct_exchange", //交換機(jī)名稱
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
                        $delaySec = 30 //延遲秒數(shù)
      );

      //無(wú)延遲,推入到指定到直鏈交換機(jī)
      $instance->pushToDirect(
                        $msg = time(), //消息體內(nèi)容
                        $exchange = "easy_direct_exchange", //交換機(jī)名稱
                        $routingKey = "direct_test_queue", //消息的routingKey,consume(get) 方法到bingdingKey 要和routingKey保持一致
      );
1-2、推送消息到扇形交換機(jī)
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延遲消息,30 秒中后才會(huì)到達(dá)指定的交換機(jī)
      $instance->pushToFanout(
                        $msg = time(), //消息體內(nèi)容
                        $exchange = "easy_fanout_exchange", //交換機(jī)名稱
                        $delaySec = 30 //延遲秒數(shù)
      );

      //無(wú)延遲,推入到指定到直鏈交換機(jī)
      $instance->pushToFanout(
                        $msg = time(), //消息體內(nèi)容
                        $exchange = "easy_fanout_exchange" //交換機(jī)名稱
      );
1-3、推送消息到主題交換機(jī)
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      
      //延遲消息,30 秒中后才會(huì)到達(dá)指定的交換機(jī)
      $instance->pushToTopic(
                        $msg = time(), //消息體內(nèi)容
                        $exchange = "easy_topic_exchange", //交換機(jī)名稱
                        /**
                         * routingKey 要同consum(get)方法的bindingKey相匹配
                         * bindingKey支持兩種特殊的字符"*"、“#”,用作模糊匹配, 其中"*"用于匹配一個(gè)單詞、“#”用于匹配多個(gè)單詞(也可以是0個(gè))
                         * 無(wú)論是bindingKey還是routingKey, 被"."分隔開的每一段獨(dú)立的字符串就是一個(gè)單詞, easy.topic.queue, 包含三個(gè)單詞easy、topic、queue
                         */
                        $routingKey = "easy.topic.queue",
                        $delaySec = 30 //延遲秒數(shù)
      );

      //無(wú)延遲,推入到指定到直鏈交換機(jī)
      $instance->pushToTopic(
                        $msg = time(), //消息體內(nèi)容
                        $exchange = "easy_topic_exchange", //交換機(jī)名稱
                        $routingKey = "easy.topic.queue"    
      );

2、消費(fèi)消息

消費(fèi)支持自動(dòng)重試,最多嘗試重試5次,每次消費(fèi)失敗后該消息將會(huì)被重新投入到消費(fèi)隊(duì)列中。重新的時(shí)間將會(huì)隨著失敗的次數(shù)增多逐漸推移,本客戶端支持的推移策略如下:
失敗1次(1秒鐘后會(huì)再被投遞), 失敗2次(2秒鐘后會(huì)再被投遞), 失敗3次(4秒鐘后會(huì)再被投遞), 失敗4次(8秒鐘后會(huì)再被投遞), 失敗5次(16秒鐘后會(huì)再被投遞)

2-1、訂閱模式
訂閱模式下的可靠消費(fèi)
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->consume(
            $queueName = "direct_test_queue",//訂閱的隊(duì)列名稱
            $consumerTag = "c1",//消費(fèi)標(biāo)記
            $exchange = "easy_direct_exchange",//交換機(jī)名稱
            $bindingKey = "direct_test_queue",//bindingkey,如果是直鏈交換機(jī)需要同routingKey保持一致
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回結(jié)果不絕對(duì)等于(===)true,那么將觸發(fā)消息重試機(jī)制
                return false;
            },
            //5次消費(fèi)消費(fèi)失敗后,失敗消息將會(huì)投遞到的隊(duì)列名稱
            $failedQueue = "easymq@failed"
      );
2-2、拉取模式
拉取模式下的可靠消費(fèi)
      $config = [
          "host" => "127.0.0.1",
            "port" => "5672",
            "user" => "guest",
            "password" => "guest",
            "vhost" => "/",
            "channel_max_num" => 10,
      ];    
      $instance = RabbitMq::getInstance($config);
      $instance->get(
            $queue = "get_queue",
            $exchange = "easy_fanout_exchange",
            $bindingKey = "",
            $callback = function($msg){
                $body = $msg->body;
                file_put_contents("./test.log", "time => " . time() . "\t" . " body => " . $body . PHP_EOL , FILE_APPEND);
                //如果返回結(jié)果不絕對(duì)等于(===)true,那么將觸發(fā)消息重試機(jī)制
                return false;
            },
            //5次消費(fèi)消費(fèi)失敗后,失敗消息將會(huì)投遞到的隊(duì)列名稱
            $failedQueue = 'easymq@failed'
      );

以上是“rabbitmq常見功能封裝的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!希望分享的內(nèi)容對(duì)大家有幫助,更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!


當(dāng)前標(biāo)題:rabbitmq常見功能封裝的示例分析
網(wǎng)站路徑:http://weahome.cn/article/pioojs.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部