延遲任務(wù)應(yīng)用場(chǎng)景
創(chuàng)新互聯(lián)是一家業(yè)務(wù)范圍包括IDC托管業(yè)務(wù),網(wǎng)絡(luò)空間、主機(jī)租用、主機(jī)托管,四川、重慶、廣東電信服務(wù)器租用,內(nèi)江服務(wù)器托管,成都網(wǎng)通服務(wù)器托管,成都服務(wù)器租用,業(yè)務(wù)范圍遍及中國大陸、港澳臺(tái)以及歐美等多個(gè)國家及地區(qū)的互聯(lián)網(wǎng)數(shù)據(jù)服務(wù)公司。場(chǎng)景一:物聯(lián)網(wǎng)系統(tǒng)經(jīng)常會(huì)遇到向終端下發(fā)命令,如果命令一段時(shí)間沒有應(yīng)答,就需要設(shè)置成超時(shí)。
場(chǎng)景二:訂單下單之后30分鐘后,如果用戶沒有付錢,則系統(tǒng)自動(dòng)取消訂單。
實(shí)現(xiàn)方案
定時(shí)任務(wù)輪詢數(shù)據(jù)庫,看是否有產(chǎn)生新任務(wù),如果產(chǎn)生則消費(fèi)任務(wù)
pcntl_alarm為進(jìn)程設(shè)置一個(gè)鬧鐘信號(hào)
swoole的異步高精度定時(shí)器:swoole_time_tick(類似javascript的setInterval)和swoole_time_after(相當(dāng)于javascript的setTimeout)
rabbitmq延遲任務(wù)
以上四種方案,如果生產(chǎn)環(huán)境有使用到swoole建議使用第三種方案。此篇文章重點(diǎn)講述第四種方案實(shí)現(xiàn)
Rabbitmq延遲隊(duì)列實(shí)現(xiàn)
RabbitMQ沒有直接去實(shí)現(xiàn)延遲隊(duì)列這個(gè)功能。而是需要通過消息的TTL和死信Exchange這兩者的組合來實(shí)現(xiàn)。
消息的TTL就是消息的存活時(shí)間。RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。對(duì)隊(duì)列設(shè)置就是隊(duì)列沒有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱之為死信。如果隊(duì)列設(shè)置了,消息也設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息死亡的時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。這里單講單個(gè)消息的TTL,因?yàn)樗攀菍?shí)現(xiàn)延遲任務(wù)的關(guān)鍵。
可以通過設(shè)置消息的expiration字段或者隊(duì)列x-message-ttl屬性來設(shè)置時(shí)間,兩者是一樣的效果。下面例子是通過隊(duì)列的ttl實(shí)現(xiàn)死信
$queue = new AMQPQueue($channel); $queue->setName($params['queueName']?:''); $queue->setFlags(AMQP_DURABLE); $queue->setArguments(array( 'x-dead-letter-exchange' => 'delay_exchange', 'x-dead-letter-routing-key' => 'delay_route', 'x-message-ttl' => 60000, )); $queue->declareQueue();當(dāng)上面的消息扔到該隊(duì)列中后,過了60秒,如果沒有被消費(fèi),它就死了。不會(huì)被消費(fèi)者消費(fèi)到。這個(gè)消息后面的,沒有“死掉”的消息對(duì)頂上來,被消費(fèi)者消費(fèi)。死信在隊(duì)列中并不會(huì)被刪除和釋放,它會(huì)被統(tǒng)計(jì)到隊(duì)列的消息數(shù)中去。單靠死信還不能實(shí)現(xiàn)延遲任務(wù),還要靠Dead Letter Exchange。
Exchage的概念在這里就不在贅述,可以從這里進(jìn)行了解。一個(gè)消息在滿足如下條件下,會(huì)進(jìn)死信路由,記住這里是路由而不是隊(duì)列,一個(gè)路由可以對(duì)應(yīng)很多隊(duì)列。
1. 一個(gè)消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。
2. 上面的消息的TTL到了,消息過期了。
3. 隊(duì)列的長度限制滿了。排在前面的消息會(huì)被丟棄或者扔到死信路由上。
Dead Letter Exchange其實(shí)就是一種普通的exchange,和創(chuàng)建其他exchange沒有兩樣。只是在某一個(gè)設(shè)置Dead Letter Exchange的隊(duì)列中有消息過期了,會(huì)自動(dòng)觸發(fā)消息的轉(zhuǎn)發(fā),發(fā)送到Dead Letter Exchange中去。
示例
生產(chǎn)者:
'test_cache_exchange', 'queueName' => 'test_cache_queue', 'routeKey' => 'test_cache_route', ); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'rabbitmq', 'password' => 'rabbitmq', 'vhost' => '/' ); //var_dump(extension_loaded('amqp')); 判斷是否加載amqp擴(kuò)展 //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 記錄日志 echo 'rabbit-mq 連接錯(cuò)誤:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 記錄日志 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setFlags(AMQP_DURABLE);//持久化 $exchange->setName($params['exchangeName']?:''); $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $exchange->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($channel); $queue->setName($params['queueName']?:''); $queue->setFlags(AMQP_DURABLE); $queue->setArguments(array( 'x-dead-letter-exchange' => 'delay_exchange', 'x-dead-letter-routing-key' => 'delay_route', 'x-message-ttl' => 60000, )); $queue->declareQueue(); //綁定 $queue->bind($params['exchangeName'], $params['routeKey']); } catch(Exception $e) { } //$num = mt_rand(100, 500); $num = 1; //生成消息 $exchange->publish("this is test message..", $params['routeKey'], AMQP_MANDATORY, array('delivery_mode'=>2));消費(fèi)者:
'delay_exchange', 'queueName' => 'delay_queue', 'routeKey' => 'delay_route', ); $connectConfig = array( 'host' => 'localhost', 'port' => 5672, 'login' => 'rabbitmq', 'password' => 'rabbitmq', 'vhost' => '/' ); //var_dump(extension_loaded('amqp')); //exit(); try { $conn = new AMQPConnection($connectConfig); $conn->connect(); if (!$conn->isConnected()) { //die('Conexiune esuata'); //TODO 記錄日志 echo 'rabbit-mq 連接錯(cuò)誤:', json_encode($connectConfig); exit(); } $channel = new AMQPChannel($conn); if (!$channel->isConnected()) { // die('Connection through channel failed'); //TODO 記錄日志 echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig); exit(); } $exchange = new AMQPExchange($channel); $exchange->setFlags(AMQP_DURABLE);//聲明一個(gè)已存在的交換器的,如果不存在將拋出異常,這個(gè)一般用在consume端 $exchange->setName($params['exchangeName']?:''); $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $exchange->declareExchange(); //$channel->startTransaction(); $queue = new AMQPQueue($channel); $queue->setName($params['queueName']?:''); $queue->setFlags(AMQP_DURABLE); $queue->declareQueue(); //綁定 $queue->bind($params['exchangeName'], $params['routeKey']); } catch(Exception $e) { echo $e->getMessage(); exit(); } function callback(AMQPEnvelope $message) { global $queue; if ($message) { $body = $message->getBody(); echo $body . PHP_EOL; $queue->ack($message->getDeliveryTag()); } else { echo 'no message' . PHP_EOL; } } //$queue->consume('callback'); 第一種消費(fèi)方式,但是會(huì)阻塞,程序一直會(huì)卡在此處 //第二種消費(fèi)方式,非阻塞 $start = time(); while(true) { $message = $queue->get(); if(!empty($message)) { echo $message->getBody(); $queue->ack($message->getDeliveryTag()); //應(yīng)答,代表該消息已經(jīng)消費(fèi) $end = time(); echo '這個(gè)示例注意要跟上一篇博文示例作對(duì)比rabbitmq以及php amqp擴(kuò)展使用,最關(guān)鍵的點(diǎn)就是在生產(chǎn)者那里
$queue->setArguments(array( 'x-dead-letter-exchange' => 'delay_exchange', 'x-dead-letter-routing-key' => 'delay_route', 'x-message-ttl' => 60000, ));詳細(xì)過程:
首先由正常隊(duì)列(test_cache_queue)和正常exchange(test_cache_exchange),兩者相綁定。
該正常隊(duì)列設(shè)置了死信路由(delay_exchange)和死信路由key以及TTL,生產(chǎn)者生產(chǎn)消息到正常隊(duì)列和正常路由上.
當(dāng)正常隊(duì)列設(shè)置TTL時(shí)間一到,那延遲消息就會(huì)自動(dòng)發(fā)布到死信路由
消費(fèi)者通過死信路由(delay_exchange)和死信隊(duì)列(delay_queue)來消費(fèi)
參考文章:
https://www.cnblogs.com/haoxinyue/p/6613706.html
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。