redis 實(shí)現(xiàn)隊(duì)列原理的實(shí)例詳解
企業(yè)建站必須是能夠以充分展現(xiàn)企業(yè)形象為主要目的,是企業(yè)文化與產(chǎn)品對(duì)外擴(kuò)展宣傳的重要窗口,一個(gè)合格的網(wǎng)站不僅僅能為公司帶來(lái)巨大的互聯(lián)網(wǎng)上的收集和信息發(fā)布平臺(tái),創(chuàng)新互聯(lián)面向各種領(lǐng)域:墻體彩繪等網(wǎng)站設(shè)計(jì)、全網(wǎng)營(yíng)銷(xiāo)推廣解決方案、網(wǎng)站設(shè)計(jì)等建站排名服務(wù)。
場(chǎng)景說(shuō)明:
·用于處理比較耗時(shí)的請(qǐng)求,例如批量發(fā)送郵件,如果直接在網(wǎng)頁(yè)觸發(fā)執(zhí)行發(fā)送,程序會(huì)出現(xiàn)超時(shí)
·高并發(fā)場(chǎng)景,當(dāng)某個(gè)時(shí)刻請(qǐng)求瞬間增加時(shí),可以把請(qǐng)求寫(xiě)入到隊(duì)列,后臺(tái)在去處理這些請(qǐng)求
·搶購(gòu)場(chǎng)景,先入先出的模式
命令:
rpush + blpop 或 lpush + brpop
rpush : 往列表右側(cè)推入數(shù)據(jù)
blpop : 客戶(hù)端阻塞直到隊(duì)列有值輸出
簡(jiǎn)單隊(duì)列:
simple.php $stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->rPush('goods:task', json_encode($row));} $redis->close();
獲取20000萬(wàn)個(gè)商品,并把json化后的數(shù)據(jù)推入goods:task隊(duì)列
queueBlpop.php // 出隊(duì) while (true) { // 阻塞設(shè)置超時(shí)時(shí)間為3秒 $task = $redis->blPop(array('goods:task'), 3); if ($task) { $redis->rPush('goods:success:task', $task[1]); $task = json_decode($task[1], true); echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success'; echo PHP_EOL; } else { echo 'nothing' . PHP_EOL; sleep(5); } }
設(shè)置blpop阻塞時(shí)間為3秒,當(dāng)有數(shù)據(jù)出隊(duì)時(shí)保存到goods:success:task表示執(zhí)行成功,當(dāng)隊(duì)列沒(méi)有數(shù)據(jù)時(shí),程序睡眠10秒重新檢查goods:task是否有數(shù)據(jù)出隊(duì)
cli 模式執(zhí)行命令:
php simple.phpphp queueBlpop.php
優(yōu)先級(jí)隊(duì)列
思路:
blpop 有多個(gè)鍵時(shí),blpop會(huì)從左至右遍歷鍵,一旦一個(gè)鍵能彈出元素,客戶(hù)端立即返回。例如:
blpop key1 key2 key3 key4
從key1到key4遍歷,如果哪個(gè)key有值,則彈出這個(gè)值,若多個(gè)key同時(shí)有值時(shí),優(yōu)先彈出排在左邊的key。
priority.php // 設(shè)置優(yōu)先級(jí)隊(duì)列 $high = 'goods:high:task'; $mid = 'goods:mid:task'; $low = 'goods:low:task'; $stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000'); $stmt->execute(); while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { // cid 小于100放在低級(jí)隊(duì)列 if ($row['cid'] < 100) { $redis->rPush($low, json_encode($row)); // cid 100到600之間放在中級(jí)隊(duì)列 }elseif ($row['cid'] > 100 && $row['cid'] < 600) { $redis->rPush($mid, json_encode($row)); } // cid 大于600放在高級(jí)隊(duì)列 else { $redis->rPush($high, json_encode($row)); } }$redis->close(); priorityBlop.php // 優(yōu)先級(jí)隊(duì)列 $high = 'goods:high:task'; $mid = 'goods:mid:task';$low = 'goods:low:task'; // 出隊(duì) while(true){ // 優(yōu)先級(jí)高的隊(duì)列放在左側(cè) $task = $redis->blPop(array($high, $mid, $low), 3); if ($task) { $task = json_decode($task[1], true); echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success'; echo PHP_EOL; } else { echo 'nothing' . PHP_EOL; sleep(5); } }
優(yōu)先級(jí)高的隊(duì)列放在blpop命令左側(cè),依次排序,blpop命令會(huì)依次彈出high, mid, low隊(duì)列的值
cli 模式執(zhí)行命令:
php priority.phpphp priorityBlpop.php
延遲隊(duì)列
思路:
可以用一個(gè)有序集合來(lái)保存延遲任務(wù),member保存任務(wù)內(nèi)容,score保存(當(dāng)前時(shí)間 + 延時(shí)時(shí)間)。用時(shí)間作為score。程序只要用有序集合的第一條任務(wù)的score和當(dāng)前時(shí)間做比較,如果當(dāng)前時(shí)間比score小,說(shuō)明有序集合的所有任務(wù)還沒(méi)到執(zhí)行時(shí)間。
delay.php $stmt = $pdo->prepare('select id, cid, name from zc_goods limit 200000');$stmt->execute();while ($row = $stmt->fetch(PDO::FETCH_ASSOC)) { $redis->zAdd('goods:delay:task', time() + rand(1, 300), json_encode($row));}
將20萬(wàn)條任務(wù)導(dǎo)入有序集合goods:delay:task,所有任務(wù)延遲到之后的1秒到300秒內(nèi)執(zhí)行
delayHandle.php
while (true) {// 因?yàn)槭怯行蚣?,只要判斷第一條記錄的延時(shí)時(shí)間,例如第一條未到執(zhí)行時(shí)間 // 相對(duì)說(shuō)明集合的其他任務(wù)未到執(zhí)行時(shí)間
$rs = $redis->zRange('goods:delay:task', 0, 0, true);
// 集合沒(méi)有任務(wù),睡眠時(shí)間設(shè)置為5秒
if (empty($rs)) { echo 'no tasks , sleep 5 seconds' . PHP_EOL;sleep(5);continue;} $taskJson = key($rs); $delay = $rs[$taskJson]; $task = json_decode($taskJson, true); $now = time();// 到時(shí)間執(zhí)行延時(shí)任務(wù) if ($delay <= $now) {
// 對(duì)當(dāng)前任務(wù)加鎖,避免移動(dòng)移動(dòng)延時(shí)任務(wù)到任務(wù)隊(duì)列時(shí)被其他客戶(hù)端修改
if (!($identifier = acquireLock($task['id']))) { continue;}
// 移動(dòng)延時(shí)任務(wù)到任務(wù)隊(duì)列
$redis->zRem('goods:delay:task', $taskJson); $redis->rPush('goods:task', $taskJson); echo $task['id'] . ' run ' . PHP_EOL;
// 釋放鎖
releaseLock($task['id'], $identifier); } else {
// 延時(shí)任務(wù)未到執(zhí)行時(shí)間
$sleep = $delay - $now;
// 最大值設(shè)置為2秒,保證如果有新的任務(wù)(延時(shí)時(shí)間1秒)進(jìn)入集合時(shí)能夠及時(shí)的被處理
$sleep = $sleep > 2 ? 2 :$sleep; echo 'wait ' . $sleep . ' seconds ' . PHP_EOL; sleep($sleep); } }
這個(gè)文件對(duì)有序集合內(nèi)的延遲任務(wù)做處理,如果延遲任務(wù)到了執(zhí)行時(shí)間,則把延遲任務(wù)移動(dòng)到任務(wù)隊(duì)列中
queueBlpop.php // 出隊(duì)while (true) { // 阻塞設(shè)置超時(shí)時(shí)間為3秒 $task = $redis->blPop(array('goods:task'), 3); if ($task) { $redis->rPush('goods:success:task', $task[1]); $task = json_decode($task[1], true); echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success'; echo PHP_EOL; } else { echo 'nothing' . PHP_EOL;sleep(5); } }
處理任務(wù)隊(duì)列中的任務(wù)
cli模式下執(zhí)行命令:
php delay.phpphp delayHanlde.phpphp queueBlpop.php
如有疑問(wèn)請(qǐng)留言或者到本站社區(qū)交流討論,感謝閱讀,希望能幫助到大家,謝謝大家對(duì)本站的支持!