真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

一篇詳解Redis--延時(shí)隊(duì)列

redis的 list 數(shù)據(jù)結(jié)構(gòu)常用來作為 異步消息隊(duì)列 使用,使用 rpush/lpush 操作 入隊(duì) ,使用 lpop/rpop 來操作 出隊(duì)

專注于為中小企業(yè)提供成都網(wǎng)站設(shè)計(jì)、成都網(wǎng)站建設(shè)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)睢縣免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動(dòng)了超過千家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

一篇詳解Redis -- 延時(shí)隊(duì)列

> rpush my-queue apple banana pear
(integer) 3
> llen my-queue
(integer) 3
> lpop my-queue
"apple"
> llen my-queue
(integer) 2
> lpop my-queue
"banana"
> llen my-queue
(integer) 1
> lpop my-queue
"pear"
> llen my-queue
(integer) 0
> lpop my-queue
(nil)


空隊(duì)列

  1. 如果隊(duì)列為空,客戶端會(huì)陷入 pop的死循環(huán) , 空輪詢 不僅拉高了 客戶端的CPU , Redis的QPS 也會(huì)被拉高
  2. 如果空輪詢的客戶端有幾十個(gè), Redis的慢查詢 也會(huì)顯著增加,可以嘗試讓客戶端線程 sleep 1s
  3. 但睡眠會(huì)導(dǎo)致消息的 延遲增大 ,可以使用 blpop/brpop (blocking, 阻塞讀 )
  4. 阻塞讀在隊(duì)列沒有數(shù)據(jù)時(shí),會(huì)立即進(jìn)入 休眠 狀態(tài),一旦有數(shù)據(jù)到來,會(huì)立即被 喚醒 , 消息延遲幾乎為0


空閑連接

  1. 如果線程一直阻塞在那里,Redis的客戶端連接就成了 閑置連接
  2. 閑置過久, 服務(wù)器 一般會(huì) 主動(dòng)斷開 連接, 減少閑置的資源占用 ,此時(shí) blpop/brpop 會(huì) 拋出異常


鎖沖突處理

  1. 分布式鎖 加鎖失敗 的處理策略
  2. 直接拋出異常 ,通知用戶稍后重試
  3. sleep 后再重試
  4. 將請(qǐng)求轉(zhuǎn)移到 延時(shí)隊(duì)列 ,過一會(huì)重試
  5. 拋出異常
  6. 這種方式比較適合由 用戶直接發(fā)起 的請(qǐng)求
  7. sleep
  8. sleep會(huì) 阻塞 當(dāng)前的消息處理線程,從而導(dǎo)致隊(duì)列的后續(xù)消息處理出現(xiàn) 延遲
  9. 如果 碰撞比較頻繁 ,sleep方案不合適
  10. 延時(shí)隊(duì)列
  11. 比較適合異步消息處理的場景,通過將當(dāng)前沖突的請(qǐng)求轉(zhuǎn)移到另一個(gè)隊(duì)列 延后處理 來 避免沖突


延時(shí)隊(duì)列

  1. 可以通過Redis的 zset 來實(shí)現(xiàn)延時(shí)隊(duì)列
  2. 將消息序列化成一個(gè)字符串作為zet的 value ,將該消息的 到期處理時(shí)間 作為 score
  3. 然后 多線程輪詢 zset獲取 到期的任務(wù) 進(jìn)行處理
  4. 多線程是為了保障 可用性 ,但同時(shí)要考慮 并發(fā)安全 ,確保 任務(wù)不能被多次執(zhí)行

public class RedisDelayingQueue {
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 private static class TaskItem {
 private String id;
 private T msg;
 }
 private Type taskType = new TypeReference>() {
 }.getType();
 private Jedis jedis;
 private String queueKey;
 public RedisDelayingQueue(Jedis jedis, String queueKey) {
 this.jedis = jedis;
 this.queueKey = queueKey;
 }
 public void delay(T msg) {
 TaskItem task = new TaskItem<>(UUID.randomUUID().toString(), msg);
 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task));
 }
 public void loop() {
 // 可以進(jìn)一步優(yōu)化,通過Lua腳本將zrangeByScore和zrem統(tǒng)一挪到Redis服務(wù)端進(jìn)行原子化操作,減少搶奪失敗出現(xiàn)的資源浪費(fèi)
 while (!Thread.interrupted()) {
 // 只取一條
 Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
 if (values.isEmpty()) {
 try {
 Thread.sleep(500);
 } catch (InterruptedException e) {
 break;
 }
 continue;
 }
 String s = values.iterator().next();
 if (jedis.zrem(queueKey, s) > 0) {
 // zrem是多線程多進(jìn)程爭奪任務(wù)的關(guān)鍵
 TaskItem task = JSON.parseObject(s, taskType);
 this.handleMsg(task.msg);
 }
 }
 }
 private void handleMsg(T msg) {
 try {
 System.out.println(msg);
 } catch (Throwable ignored) {
 // 一定要捕獲異常,避免因?yàn)閭€(gè)別任務(wù)處理問題導(dǎo)致循環(huán)異常退出
 }
 }
 public static void main(String[] args) {
 final RedisDelayingQueue queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo");
 Thread producer = new Thread() {
 @Override
 public void run() {
 for (int i = 0; i < 10; i++) {
 queue.delay("zhongmingmao" + i);
 }
 }
 };
 Thread consumer = new Thread() {
 @Override
 public void run() {
 queue.loop();
 }
 };
 producer.start();
 consumer.start();
 try {
 producer.join();
 Thread.sleep(6000);
 consumer.interrupt();
 consumer.join();
 } catch (InterruptedException ignored) {
 }
 }
}

網(wǎng)站名稱:一篇詳解Redis--延時(shí)隊(duì)列
鏈接分享:http://weahome.cn/article/pscpsj.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部