真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

Redis如何實(shí)現(xiàn)延遲隊(duì)列?方法介紹

延遲隊(duì)列,顧名思義它是一種帶有延遲功能的消息隊(duì)列。那么,是在什么場(chǎng)景下我才需要這樣的隊(duì)列呢?

創(chuàng)新互聯(lián)建站服務(wù)項(xiàng)目包括喀喇沁網(wǎng)站建設(shè)、喀喇沁網(wǎng)站制作、喀喇沁網(wǎng)頁(yè)制作以及喀喇沁網(wǎng)絡(luò)營(yíng)銷策劃等。多年來(lái),我們專注于互聯(lián)網(wǎng)行業(yè),利用自身積累的技術(shù)優(yōu)勢(shì)、行業(yè)經(jīng)驗(yàn)、深度合作伙伴關(guān)系等,向廣大中小型企業(yè)、政府機(jī)構(gòu)等提供互聯(lián)網(wǎng)行業(yè)的解決方案,喀喇沁網(wǎng)站推廣取得了明顯的社會(huì)效益與經(jīng)濟(jì)效益。目前,我們服務(wù)的客戶以成都為中心已經(jīng)輻射到喀喇沁省份的部分城市,未來(lái)相信會(huì)繼續(xù)擴(kuò)大服務(wù)區(qū)域并繼續(xù)獲得客戶的支持與信任!

1. 背景

我們先看看以下業(yè)務(wù)場(chǎng)景:

當(dāng)訂單一直處于未支付狀態(tài)時(shí),如何及時(shí)的關(guān)閉訂單如何定期檢查處于退款狀態(tài)的訂單是否已經(jīng)退款成功在訂單長(zhǎng)時(shí)間沒有收到下游系統(tǒng)的狀態(tài)通知的時(shí)候,如何實(shí)現(xiàn)階梯式的同步訂單狀態(tài)的策略在系統(tǒng)通知上游系統(tǒng)支付成功終態(tài)時(shí),上游系統(tǒng)返回通知失敗,如何進(jìn)行異步通知實(shí)行分頻率發(fā)送:15s 3m 10m 30m 30m 1h 2h 6h 15h

1.1 解決方案

最簡(jiǎn)單的方式,定時(shí)掃表。例如對(duì)于訂單支付失效要求比較高的,每2S掃表一次檢查過(guò)期的訂單進(jìn)行主動(dòng)關(guān)單操作。優(yōu)點(diǎn)是簡(jiǎn)單,缺點(diǎn)是每分鐘全局掃表,浪費(fèi)資源,如果遇到表數(shù)據(jù)訂單量即將過(guò)期的訂單量很大,會(huì)造成關(guān)單延遲。

使用RabbitMq或者其他MQ改造實(shí)現(xiàn)延遲隊(duì)列,優(yōu)點(diǎn)是,開源,現(xiàn)成的穩(wěn)定的實(shí)現(xiàn)方案,缺點(diǎn)是:MQ是一個(gè)消息中間件,如果團(tuán)隊(duì)技術(shù)棧本來(lái)就有MQ,那還好,如果不是,那為了延遲隊(duì)列而去部署一套MQ成本有點(diǎn)大

使用Redis的zset、list的特性,我們可以利用redis來(lái)實(shí)現(xiàn)一個(gè)延遲隊(duì)列RedisDelayQueue

2. 設(shè)計(jì)目標(biāo)

實(shí)時(shí)性:允許存在一定時(shí)間的秒級(jí)誤差高可用性:支持單機(jī)、支持集群支持消息刪除:業(yè)務(wù)會(huì)隨時(shí)刪除指定消息消息可靠性:保證至少被消費(fèi)一次消息持久化:基于Redis自身的持久化特性,如果Redis數(shù)據(jù)丟失,意味著延遲消息的丟失,不過(guò)可以做主備和集群保證。這個(gè)可以考慮后續(xù)優(yōu)化將消息持久化到MangoDB中

3. 設(shè)計(jì)方案

設(shè)計(jì)主要包含以下幾點(diǎn):

將整個(gè)Redis當(dāng)做消息池,以KV形式存儲(chǔ)消息使用ZSET做優(yōu)先隊(duì)列,按照Score維持優(yōu)先級(jí)使用LIST結(jié)構(gòu),以先進(jìn)先出的方式消費(fèi)ZSET和LIST存儲(chǔ)消息地址(對(duì)應(yīng)消息池的每個(gè)KEY)自定義路由對(duì)象,存儲(chǔ)ZSET和LIST名稱,以點(diǎn)對(duì)點(diǎn)的方式將消息從ZSET路由到正確的LIST使用定時(shí)器維護(hù)路由根據(jù)TTL規(guī)則實(shí)現(xiàn)消息延遲

3.1 設(shè)計(jì)圖

還是基于有贊的延遲隊(duì)列設(shè)計(jì),進(jìn)行優(yōu)化改造及代碼實(shí)現(xiàn)。有贊設(shè)計(jì)

3.2 數(shù)據(jù)結(jié)構(gòu)

ZING:DELAY_QUEUE:JOB_POOL是一個(gè)Hash_Table結(jié)構(gòu),里面存儲(chǔ)了所有延遲隊(duì)列的信息。KV結(jié)構(gòu):K=prefix+projectName field = topic+jobId V=CONENT;V由客戶端傳入的數(shù)據(jù),消費(fèi)的時(shí)候回傳ZING:DELAY_QUEUE:BUCKET延遲隊(duì)列的有序集合ZSET,存放K=ID和需要的執(zhí)行時(shí)間戳,根據(jù)時(shí)間戳排序ZING:DELAY_QUEUE:QUEUELIST結(jié)構(gòu),每個(gè)Topic一個(gè)LIST,list存放的都是當(dāng)前需要被消費(fèi)的JOB


圖片僅供參考,基本可以描述整個(gè)流程的執(zhí)行過(guò)程,圖片源于文末的參考博客中

3.3 任務(wù)的生命周期

新增一個(gè)JOB,會(huì)在ZING:DELAY_QUEUE:JOB_POOL中插入一條數(shù)據(jù),記錄了業(yè)務(wù)方消費(fèi)方。ZING:DELAY_QUEUE:BUCKET也會(huì)插入一條記錄,記錄執(zhí)行的時(shí)間戳搬運(yùn)線程會(huì)去ZING:DELAY_QUEUE:BUCKET中查找哪些執(zhí)行時(shí)間戳的RunTimeMillis比現(xiàn)在的時(shí)間小,將這些記錄全部刪除;同時(shí)會(huì)解析出每個(gè)任務(wù)的Topic是什么,然后將這些任務(wù)PUSH到TOPIC對(duì)應(yīng)的列表ZING:DELAY_QUEUE:QUEUE中每個(gè)TOPIC的LIST都會(huì)有一個(gè)監(jiān)聽線程去批量獲取LIST中的待消費(fèi)數(shù)據(jù),獲取到的數(shù)據(jù)全部扔給這個(gè)TOPIC的消費(fèi)線程池消費(fèi)線程池執(zhí)行會(huì)去ZING:DELAY_QUEUE:JOB_POOL查找數(shù)據(jù)結(jié)構(gòu),返回給回調(diào)結(jié)構(gòu),執(zhí)行回調(diào)方法。

3.4 設(shè)計(jì)要點(diǎn)

3.4.1 基本概念

JOB:需要異步處理的任務(wù),是延遲隊(duì)列里的基本單元Topic:一組相同類型Job的集合(隊(duì)列)。供消費(fèi)者來(lái)訂閱

3.4.2 消息結(jié)構(gòu)

每個(gè)JOB必須包含以下幾個(gè)屬性

jobId:Job的唯一標(biāo)識(shí)。用來(lái)檢索和刪除指定的Job信息topic:Job類型。可以理解成具體的業(yè)務(wù)名稱delay:Job需要延遲的時(shí)間。單位:秒。(服務(wù)端會(huì)將其轉(zhuǎn)換為絕對(duì)時(shí)間)body:Job的內(nèi)容,供消費(fèi)者做具體的業(yè)務(wù)處理,以json格式存儲(chǔ)retry:失敗重試次數(shù)url:通知URL

3.5 設(shè)計(jì)細(xì)節(jié)

3.5.1 如何快速消費(fèi)ZING:DELAY_QUEUE:QUEUE

最簡(jiǎn)單的實(shí)現(xiàn)方式就是使用定時(shí)器進(jìn)行秒級(jí)掃描,為了保證消息執(zhí)行的時(shí)效性,可以設(shè)置每1S請(qǐng)求Redis一次,判斷隊(duì)列中是否有待消費(fèi)的JOB。但是這樣會(huì)存在一個(gè)問(wèn)題,如果queue中一直沒有可消費(fèi)的JOB,那頻繁的掃描就失去了意義,也浪費(fèi)了資源,幸好LIST中有一個(gè)BLPOP阻塞原語(yǔ),如果list中有數(shù)據(jù)就會(huì)立馬返回,如果沒有數(shù)據(jù)就會(huì)一直阻塞在那里,直到有數(shù)據(jù)返回,可以設(shè)置阻塞的超時(shí)時(shí)間,超時(shí)會(huì)返回NULL;具體的實(shí)現(xiàn)方式及策略會(huì)在代碼中進(jìn)行具體的實(shí)現(xiàn)介紹

3.5.2 避免定時(shí)導(dǎo)致的消息重復(fù)搬運(yùn)及消費(fèi)

使用Redis的分布式鎖來(lái)控制消息的搬運(yùn),從而避免消息被重復(fù)搬運(yùn)導(dǎo)致的問(wèn)題使用分布式鎖來(lái)保證定時(shí)器的執(zhí)行頻率

4. 核心代碼實(shí)現(xiàn)

4.1 技術(shù)說(shuō)明

技術(shù)棧:SpringBoot,Redisson,Redis,分布式鎖,定時(shí)器

注意:本項(xiàng)目沒有實(shí)現(xiàn)設(shè)計(jì)方案中的多Queue消費(fèi),只開啟了一個(gè)QUEUE,這個(gè)待以后優(yōu)化

4.2 核心實(shí)體

4.2.1 Job新增對(duì)象

/**
 * 消息結(jié)構(gòu)
 *
 * @author 睜眼看世界
 * @date 2020年1月15日
 */
@Data
public class Job implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一標(biāo)識(shí)。用來(lái)檢索和刪除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job類型??梢岳斫獬删唧w的業(yè)務(wù)名稱
     */
    @NotBlank
    private String topic;

    /**
     * Job需要延遲的時(shí)間。單位:秒。(服務(wù)端會(huì)將其轉(zhuǎn)換為絕對(duì)時(shí)間)
     */
    private Long delay;

    /**
     * Job的內(nèi)容,供消費(fèi)者做具體的業(yè)務(wù)處理,以json格式存儲(chǔ)
     */
    @NotBlank
    private String body;

    /**
     * 失敗重試次數(shù)
     */
    private int retry = 0;

    /**
     * 通知URL
     */
    @NotBlank
    private String url;
}

4.2.2 Job刪除對(duì)象

/**
 * 消息結(jié)構(gòu)
 *
 * @author 睜眼看世界
 * @date 2020年1月15日
 */
@Data
public class JobDie implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Job的唯一標(biāo)識(shí)。用來(lái)檢索和刪除指定的Job信息
     */
    @NotBlank
    private String jobId;


    /**
     * Job類型??梢岳斫獬删唧w的業(yè)務(wù)名稱
     */
    @NotBlank
    private String topic;
}

4.3 搬運(yùn)線程

/**
 * 搬運(yùn)線程
 *
 * @author 睜眼看世界
 * @date 2020年1月17日
 */
@Slf4j
@Component
public class CarryJobScheduled {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 啟動(dòng)定時(shí)開啟搬運(yùn)JOB信息
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void carryJobToQueue() {
        System.out.println("carryJobToQueue --->");
        RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            RScoredSortedSet bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
            long now = System.currentTimeMillis();
            Collection jobCollection = bucketSet.valueRange(0, false, now, true);
            List jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
            RList readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
            readyQueue.addAll(jobList);
            bucketSet.removeAllAsync(jobList);
        } catch (InterruptedException e) {
            log.error("carryJobToQueue error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}

4.4 消費(fèi)線程

@Slf4j
@Component
public class ReadyQueueContext {

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private ConsumerService consumerService;

    /**
     * TOPIC消費(fèi)線程
     */
    @PostConstruct
    public void startTopicConsumer() {
        TaskManager.doTask(this::runTopicThreads, "開啟TOPIC消費(fèi)線程");
    }

    /**
     * 開啟TOPIC消費(fèi)線程
     * 將所有可能出現(xiàn)的異常全部catch住,確保While(true)能夠不中斷
     */
    @SuppressWarnings("InfiniteLoopStatement")
    private void runTopicThreads() {
        while (true) {
            RLock lock = null;
            try {
                lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
            } catch (Exception e) {
                log.error("runTopicThreads getLock error", e);
            }
            try {
                if (lock == null) {
                    continue;
                }
                // 分布式鎖時(shí)間比Blpop阻塞時(shí)間多1S,避免出現(xiàn)釋放鎖的時(shí)候,鎖已經(jīng)超時(shí)釋放,unlock報(bào)錯(cuò)
                boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
                if (!lockFlag) {
                    continue;
                }

                // 1. 獲取ReadyQueue中待消費(fèi)的數(shù)據(jù)
                RBlockingQueue queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
                String topicId = queue.poll(60, TimeUnit.SECONDS);
                if (StringUtils.isEmpty(topicId)) {
                    continue;
                }

                // 2. 獲取job元信息內(nèi)容
                RMap jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
                Job job = jobPoolMap.get(topicId);

                // 3. 消費(fèi)
                FutureTask taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消費(fèi)JobId-->" + job.getJobId());
                if (taskResult.get()) {
                    // 3.1 消費(fèi)成功,刪除JobPool和DelayBucket的job信息
                    jobPoolMap.remove(topicId);
                } else {
                    int retrySum = job.getRetry() + 1;
                    // 3.2 消費(fèi)失敗,則根據(jù)策略重新加入Bucket

                    // 如果重試次數(shù)大于5,則將jobPool中的數(shù)據(jù)刪除,持久化到DB
                    if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
                        jobPoolMap.remove(topicId);
                        continue;
                    }
                    job.setRetry(retrySum);
                    long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
                    log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
                    RScoredSortedSet delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
                    delayBucket.add(nextTime, topicId);
                    // 3.3 更新元信息失敗次數(shù)
                    jobPoolMap.put(topicId, job);
                }
            } catch (Exception e) {
                log.error("runTopicThreads error", e);
            } finally {
                if (lock != null) {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        log.error("runTopicThreads unlock error", e);
                    }
                }
            }
        }
    }
}

4.5 添加及刪除JOB

/**
 * 提供給外部服務(wù)的操作接口
 *
 * @author why
 * @date 2020年1月15日
 */
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {

    @Autowired
    private RedissonClient redissonClient;


    /**
     * 添加job元信息
     *
     * @param job 元信息
     */
    @Override
    public void addJob(Job job) {

        RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());

            // 1. 將job添加到 JobPool中
            RMap jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            if (jobPool.get(topicId) != null) {
                throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
            }

            jobPool.put(topicId, job);

            // 2. 將job添加到 DelayBucket中
            RScoredSortedSet delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.add(job.getDelay(), topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }


    /**
     * 刪除job信息
     *
     * @param job 元信息
     */
    @Override
    public void deleteJob(JobDie jobDie) {

        RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
        try {
            boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());

            RMap jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            jobPool.remove(topicId);

            RScoredSortedSet delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.remove(topicId);
        } catch (InterruptedException e) {
            log.error("addJob error", e);
        } finally {
            if (lock != null) {
                lock.unlock();
            }
        }
    }
}

5. 待優(yōu)化的內(nèi)容

目前只有一個(gè)Queue隊(duì)列存放消息,當(dāng)需要消費(fèi)的消息大量堆積后,會(huì)影響消息通知的時(shí)效。改進(jìn)的辦法是,開啟多個(gè)Queue,進(jìn)行消息路由,再開啟多個(gè)消費(fèi)線程進(jìn)行消費(fèi),提供吞吐量消息沒有進(jìn)行持久化,存在風(fēng)險(xiǎn),后續(xù)會(huì)將消息持久化到MangoDB中

6. 源碼

更多詳細(xì)源碼請(qǐng)?jiān)谙旅娴刂分蝎@取

RedisDelayQueue實(shí)現(xiàn)zing-delay-queue(https://gitee.com/whyCodeData/zing-project/tree/master/zing-delay-queue)RedissonStarterredisson-spring-boot-starter(https://gitee.com/whyCodeData/zing-project/tree/master/zing-starter/redisson-spring-boot-starter)項(xiàng)目應(yīng)用zing-pay(https://gitee.com/whyCodeData/zing-pay)

7. 參考

https://tech.youzan.com/queuing_delay/https://blog.csdn.net/u010634066/article/details/98864764

更多redis知識(shí),請(qǐng)關(guān)注:redis入門教程欄目。
文章題目:Redis如何實(shí)現(xiàn)延遲隊(duì)列?方法介紹
轉(zhuǎn)載注明:http://weahome.cn/article/cjshdd.html

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部