真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

PHP如何實(shí)現(xiàn)php-amqplib/php-amqplib實(shí)例RabbitMq

這篇文章主要為大家展示了“PHP如何實(shí)現(xiàn)php-amqplib/php-amqplib實(shí)例RabbitMq”,內(nèi)容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領(lǐng)大家一起研究并學(xué)習(xí)一下“PHP如何實(shí)現(xiàn)php-amqplib/php-amqplib實(shí)例RabbitMq”這篇文章吧。

創(chuàng)新互聯(lián)是專業(yè)的晉城網(wǎng)站建設(shè)公司,晉城接單;提供成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè),網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行晉城網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!

項(xiàng)目代碼

https://gitee.com/owenzhang24/tp5

其他筆記:

1: 列出隊(duì)列(Listing queues)

如果你想查看Rabbitmq隊(duì)列,并且想知道有多少消息存在其中,你(作為特權(quán)用戶)可以使用rabbitmqctl 工具:

rabbitmqctl list_queues。

在Windows中,省略sudo:

rabbitmqctl.bat list_queues

2: 工作隊(duì)列

我們發(fā)現(xiàn)即使使用CTRL+C殺掉了一個(gè)工作者(worker)進(jìn)程,消息也不會(huì)丟失。當(dāng)工作者(worker)掛掉這后,所有沒有響應(yīng)的消息都會(huì)重新發(fā)送。

一個(gè)很容易犯的錯(cuò)誤就是忘了basic_ack,后果很嚴(yán)重。消息在你的程序退出之后就會(huì)重新發(fā)送,如果它不能夠釋放沒響應(yīng)的消息,RabbitMQ就會(huì)占用越來越多的內(nèi)存。

為了排除這種錯(cuò)誤,你可以使用rabbitmqctl命令,輸出messages_unacknowledged字段:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在window系統(tǒng)運(yùn)行,去掉sudo:

$ rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

3: rabbitmqctl能夠列出服務(wù)器上所有的交換器:

$ sudo rabbitmqctl list_exchanges

這個(gè)列表中有一些叫做amq.*的交換器。這些都是默認(rèn)創(chuàng)建的,不過這時(shí)候你還不需要使用他們。

4:列出所有現(xiàn)存的綁定 rabbitmqctl list_bindings

5: 如果你想把日志保存到文件中,只需要打開控制臺(tái)輸入: ( receive_logs.php  源代碼)

$ php receive_logs.php > logs_from_rabbit.log

如果你希望所有的日志信息都輸出到屏幕中,打開一個(gè)新的終端,然后輸入:

$ php receive_logs_direct.php info warning error
# => [*] Waiting for logs. To exit press CTRL+C

如果要觸發(fā)一個(gè)error級(jí)別的日志,只需要輸入:

$ php emit_log_direct.php error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

第一:安裝RabbitMq環(huán)境

windows環(huán)境的rabbitmq安裝與啟動(dòng)

https://my.oschina.net/owenzhang24/blog/5051652

第二:composer require php-amqplib/php-amqplib
第三:代碼類
  1. rabbitMq實(shí)現(xiàn)的基礎(chǔ)類:application/common/lib/classes/rabbitmq/RabbitMq.php

  2. 供外部調(diào)用的rabbitMq類:application/common/lib/classes/RabbitMqWork.php

  3. 測試發(fā)送消息到rabbitMq中的方法:application/index/controller/Index.php

  4. 添加php think命令實(shí)現(xiàn)接收rabbitMq中的消息:application/common/command/*.php

第四:使用說明
  1. 發(fā)送消息時(shí)直接在自己的方法中調(diào)用RabbitMqWork.php類中的幾個(gè)送消息的方法即可。

  2. application/common/command/下的類都是實(shí)現(xiàn)添加php think命令的類,在configure方法中的setName()中設(shè)置命令名稱,execute()方法是為了執(zhí)行接收rabbitMq中的消息,同時(shí)在application/command.php中return添加設(shè)置的命令名稱及對應(yīng)的命令目錄地址。

  3. 貢獻(xiàn)文檔

  4. RabbitMQ 中文文檔-PHP版。https://xiaoxiami.gitbook.io/rabbitmq_into_chinese_php/

  5. RabbitMQ官方文檔。https://www.rabbitmq.com/getstarted.html

第五:源碼

application/common/lib/classes/rabbitmq/RabbitMq.php

 'direct_exchange',
        self::TOPIC => 'topic_exchange',
        self::HEADERS => 'headers_exchange',
        self::FANOUT => 'fanout_exchange',
    ];
    const SEVERITYS = [
        'info',
        'warning',
        'error'
    ];
    static private $exchangeName = '';

    /**
     * RabbitMq constructor.
     * @param $exchangeType
     */
    private function __construct($exchangeType)
    {
        self::$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
        self::$channel = self::$connection->channel();
        if (!empty($exchangeType)) {
            self::$exchangeName = self::$exchangeNames[$exchangeType];
            self::$channel->exchange_declare(
                self::$exchangeName, //交換機(jī)名稱
                $exchangeType, //路由類型
                false, //don't check if a queue with the same name exists 是否檢測同名隊(duì)列
                true, //the queue will not survive server restarts 是否開啟隊(duì)列持久化
                false //the queue will be deleted once the channel is closed. 通道關(guān)閉后是否刪除隊(duì)列
            );
        }
    }

    /**
     * 實(shí)例化
     * @param string $exchangeType
     * @return RabbitMq
     */
    public static function instance($exchangeType = '')
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self($exchangeType);
        }
        return self::$instance;
    }

    /**
     * 防止被外部復(fù)制
     */
    private function __clone()
    {
    }

    /**
     * 簡單的發(fā)送
     */
    public function send()
    {
        self::$channel->queue_declare('hello', false, false, false);
        $msg = new AMQPMessage('Hello World!');
        self::$channel->basic_publish($msg, '', 'hello');
        echo "[X] Sent 'Hello World!'\n";
    }

    /**
     * 簡單的接收
     * @param $queueName
     * @param $callback
     */
    public function receive($callback)
    {
        self::$channel->queue_declare('hello', false, false, false, true);
        echo "[*] Waiting for messages. To exit press CTRL+C\n";

        self::$channel->basic_consume('hello', '', false, true, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * 添加工作隊(duì)列
     * @param string $data
     */
    public function addTask($data = '')
    {
        self::$channel->queue_declare('task_queue', false, true, false, true);
        if (empty($data)) $data = 'Hello World!';
        $msg = new AMQPMessage(
            $data,
            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );
        self::$channel->basic_publish($msg, '', 'task_queue');

        echo "[x] Sent $data \n";
    }

    /**
     * 執(zhí)行工作隊(duì)列
     * @param $callback
     */
    public function workTask($callback)
    {
        self::$channel->queue_declare('task_queue', false, true, false, true);
        echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

        self::$channel->basic_qos(null, 1, null);
        self::$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * 發(fā)布
     * @param string $data
     */
    public function sendQueue($data = '')
    {
        if (empty($data)) $data = 'info:Hello World!';
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName);
        echo "[x] Sent $data \n";
    }

    /**
     * 訂閱
     * @param $callback
     */
    public function subscribeQueue($callback)
    {
        list($queue_name, ,) = self::$channel->queue_declare(
            "", //隊(duì)列名稱
            false, //don't check if a queue with the same name exists 是否檢測同名隊(duì)列
            true, //the queue will not survive server restarts 是否開啟隊(duì)列持久化
            true, //the queue might be accessed by other channels 隊(duì)列是否可以被其他隊(duì)列訪問
            false //the queue will be deleted once the channel is closed. 通道關(guān)閉后是否刪除隊(duì)列
        );
        self::$channel->queue_bind($queue_name, self::$exchangeName);
        echo "[*] Waiting for logs. To exit press CTRL+C \n";
        self::$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * 發(fā)送(直接交換機(jī))
     * @param $routingKey
     * @param string $data
     */
    public function sendDirect($routingKey, $data = '')
    {
        if (empty($data)) $data = "Hello World!";
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
        echo "[x] Sent $routingKey:$data \n";
    }

    /**
     * 接收(直接交換機(jī))
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        list($queue_namme, ,) = self::$channel->queue_declare('', false, true, true, false);
        foreach ($bindingKeys as $bindingKey) {
            self::$channel->queue_bind($queue_namme, self::$exchangeName, $bindingKey);
        }
        echo "[x] Waiting for logs. To exit press CTRL+C \n";
        self::$channel->basic_consume($queue_namme, '', false, true, false, false, $callback);
        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * 發(fā)送(主題交換機(jī))
     * @param $routingKey
     * @param string $data
     */
    public function sendTopic($routingKey, $data = '')
    {
        if (empty($data)) $data = "Hello World!";
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg, self::$exchangeName, $routingKey);
        echo " [x] Sent ", $routingKey, ':', $data, " \n";
    }

    /**
     * 接收(主題交換機(jī))
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        list($queueName, ,) = self::$channel->queue_declare("", false, true, true, false);
        foreach ($bindingKeys as $bindingKey) {
            self::$channel->queue_bind($queueName, self::$exchangeName, $bindingKey);
        }

        echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
        self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);

        while (count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * 銷毀
     */
    public function __destruct()
    {
        // TODO: Implement __destruct() method.
        self::$channel->close();
        self::$connection->close();
    }
}

application/common/lib/classes/RabbitMqWork.php

RabbitMq = RabbitMq::instance($exchageType);
    }

    /**
     * 發(fā)送(普通)
     */
    public function send()
    {
        $this->RabbitMq->send();
    }

    /**
     * 接收(普通)
     * @param $callback
     */
    public function receive($callback)
    {
        $this->RabbitMq->receive($callback);
    }

    /**
     * 發(fā)送(工作隊(duì)列)
     * @param $data
     */
    public function addTask($data)
    {
        $this->RabbitMq->addTask($data);
    }

    /**
     * 接收(工作隊(duì)列)
     * @param $callback
     */
    public function workTask($callback)
    {
        $this->RabbitMq->workTask($callback);
    }

    /**
     * 發(fā)布(扇形交換機(jī))
     * @param $data
     */
    public function sendQueue($data)
    {
        $this->RabbitMq->sendQueue($data);
    }

    /**
     * 訂閱(扇形交換機(jī))
     * @param $callback
     */
    public function subscribeQueue($callback)
    {
        $this->RabbitMq->subscribeQueue($callback);
    }

    /**
     * 發(fā)送(直接交換機(jī))
     * @param $bindingKey
     * @param $data
     */
    public function sendDirect($routingKey, $data)
    {
        $this->RabbitMq->sendDirect($routingKey, $data);
    }

    /**
     * 接收(直接交換機(jī))
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        $this->RabbitMq->receiveDirect($callback, $bindingKeys);
    }

    /**
     * 發(fā)送(主題交換機(jī))
     * @param $routingKey
     * @param $data
     */
    public function sendTopic($routingKey, $data)
    {
        $this->RabbitMq->sendTopic($routingKey, $data);
    }

    /**
     * 接收(主題交換機(jī))
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        $this->RabbitMq->receiveTopic($callback, $bindingKeys);
    }
}

application/index/controller/Index.php

send();
//        $this->addTask();
//        $this->sendQueue();
//        $this->sendDirect();
        $this->sendTopic();
        var_dump(11);
        die();
    }
    public function searchBlog()
    {
//        $id=1;
//        $res = SyncBlog::getInstance()->syncBlog($id);
        $search='11';
        $res = SearchBlog::getInstance()->searchBlog($search, 1, 100);
        var_dump($res);
        die();
        var_dump(1111);
        die();
    }

    /**
     * 發(fā)送(普通)
     */
    public function send()
    {
        $RabbitMqWork = new RabbitMqWork();
        $RabbitMqWork->send();
    }

    /**
     * 發(fā)送(工作隊(duì)列)
     */
    public function addTask()
    {
        $data = input('data', 'This is work task!');
        $RabbitMqWork = new RabbitMqWork();
        $RabbitMqWork->addTask($data);
    }

    /**
     * 發(fā)送(扇形交換機(jī))
     */
    public function sendQueue()
    {
        $data = input('data', 'This is send queue1');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
        $RabbitMqWork->sendQueue($data);
    }

    /**
     * 發(fā)送(直接交換機(jī))
     */
    public function sendDirect()
    {
        $data = input('data', 'Hello World!');
        $routingKey = input('routingKey', 'info');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
        $RabbitMqWork->sendDirect($routingKey, $data);
    }

    /**
     * 發(fā)送(主題交換機(jī))
     */
    public function sendTopic()
    {
        $data = input('data', 'Hello World!');
        $routingKey = input('routingKey', 'lazy.boy');
        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
        $RabbitMqWork->sendTopic($routingKey, $data);
    }
}

application/command.php


// +----------------------------------------------------------------------

return [
    'simpleMq' => 'application\command\SimpleWork',
    'workQueue' => 'application\command\WorkQueue',
    'sendQueue' => 'application\command\SendQueue',
    'directQueue' => 'application\command\DirectQueue',
    'topicQueue' => 'application\command\TopicQueue',
];

application/common/command/*.php

application/command/DirectQueue.php

setName('directQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
        $callback = function ($msg){
            echo "[x] ".$msg->delivery_info['routing_key'].":$msg->body \n";
        };
        $RabbitMqWork->receiveDirect($callback,RabbitMq::SEVERITYS);
    }
}

application/command/SendQueue.php

setName('sendQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
        $callback = function ($msg) {
            echo 'Receive:';
            echo "Msg:$msg->body \n";
            \Log::error("Msg:$msg->body");
        };
        $RabbitMqWork->subscribeQueue($callback);
    }
}

application/command/SimpleWork.php

setName('simpleMq');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork= new RabbitMqWork();
        $callback = function ($msg){
            echo 'Receive:';
            $queueName = $msg->delivery_info['routing_key'];
            $msgData = $msg->body;
            $isAck = true;
            echo 'Msg:'.$msgData."\n";
            echo 'QueueName:'.$queueName."\n";
            if($isAck) {
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
            }
        };
        $RabbitMqWork->receive($callback);
    }
}

application/command/TopicQueue.php

setName('topicQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
        $callback = function ($msg){
            echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
        };
        $bindingKeys = [
            '*.orange.*',
            '*.*.rabbit',
            'lazy.#'
        ];
        $RabbitMqWork->receiveTopic($callback,$bindingKeys);
    }
}

application/command/WorkQueue.php

setName('workQueue');
    }

    protected function execute(Input $input, Output $output)
    {
        $RabbitMqWork = new RabbitMqWork();
        $callback = function ($msg){
            echo " [x] Received ", $msg->body, "\n";
            sleep(substr_count($msg->body, '.'));
            echo " [x] Done", "\n";
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
        $RabbitMqWork->workTask($callback);
    }
}

以上是“PHP如何實(shí)現(xiàn)php-amqplib/php-amqplib實(shí)例RabbitMq”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!


名稱欄目:PHP如何實(shí)現(xiàn)php-amqplib/php-amqplib實(shí)例RabbitMq
網(wǎng)站網(wǎng)址:http://weahome.cn/article/jssjhi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部