Redisson中怎么實現一個延時消息組件,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
創新互聯專注于長葛企業網站建設,成都響應式網站建設,商城網站制作。長葛網站建設公司,為長葛等地區提供建站服務。全流程按需定制網站,專業設計,全程項目跟蹤,創新互聯專業和態度為您提供的服務。
定義主題隊列注解
/**
 * Binds the annotated class to a message queue and a topic.
 *
 * <p>The annotation is itself meta-annotated with {@code @Component}, and it is
 * retained at runtime so its attributes can be read reflectively.
 */
@Documented
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RMessage {

    /**
     * Name of the message queue the annotated class works with.
     *
     * @return the queue name
     */
    String queue();

    /**
     * Name of the topic the annotated class works with.
     *
     * @return the topic name; {@code "system"} when not specified
     */
    String topic() default "system";
}
springboot啟動監聽初始化任務隊列與消息主題,消費者訂閱主題
@Slf4j @Component public class RMessageListener implements ApplicationListener{ /** * consumer monitoringMethod monitorMessage */ private final static String METHOD_MONITOR_MESSAGE = "monitorMessage"; /** * redisson topic name */ private final static String ATTRIBUTE_NAME_TOPIC = "topic"; /** * redisson messageQueue name */ private final static String ATTRIBUTE_NAME_QUEUE = "queue"; /** * redisson queue map */ public static Map > messageQueue = new ConcurrentHashMap<>(); /** * redisson offQueue map */ public static Map > offQueue = new ConcurrentHashMap<>(); /** * redisson topic map */ public static Map topicMap = new ConcurrentHashMap<>(); @Override public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) { ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false); provider.addIncludeFilter(new AnnotationTypeFilter(RMessage.class)); String basePackage = applicationStartedEvent.getSpringApplication().getMainApplicationClass().getPackage().getName(); Set beanDefinitions = provider.findCandidateComponents(basePackage); ConfigurableListableBeanFactory beanFactory = applicationStartedEvent.getApplicationContext().getBeanFactory(); mqInit(beanDefinitions, beanFactory); provider.clearCache(); provider.resetFilters(false); provider.addIncludeFilter(new AssignableTypeFilter(RMessageConsumer.class)); Set consumers = provider.findCandidateComponents(basePackage); consumerSubscribe(beanFactory, consumers); } /** * consumer subscription news * * @param beanFactory * @param consumers */ private void consumerSubscribe(ConfigurableListableBeanFactory beanFactory, Set consumers) { consumers.forEach(beanDefinition -> { log.info("rMessage init consumer {}",beanDefinition.getBeanClassName()); try { Object bean = beanFactory.getBean(Class.forName(beanDefinition.getBeanClassName())); Method method = bean.getClass().getMethod(METHOD_MONITOR_MESSAGE); ReflectionUtils.invokeMethod(method,bean); } catch 
(ClassNotFoundException | NoSuchMethodException e) { e.printStackTrace(); } }); } /** * Parameter initialization * * @param beanDefinitions * @param beanFactory */ private void mqInit(Set beanDefinitions,final ConfigurableListableBeanFactory beanFactory) { RedissonClient redissonClient = beanFactory.getBean(RedissonClient.class); beanDefinitions.stream().filter(beanDefinition -> beanDefinition instanceof AnnotatedBeanDefinition).forEach(beanDefinition->{ AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition)beanDefinition; AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata(); MergedAnnotation mergedAnnotation = annotationMetadata.getAnnotations().get(RMessage.class); String queryName = mergedAnnotation.getString(ATTRIBUTE_NAME_QUEUE); String topicName = mergedAnnotation.getString(ATTRIBUTE_NAME_TOPIC); String shortName = topicName+"."+queryName; RBlockingDeque super Serializable> blockingDeque = redissonClient.getBlockingDeque(shortName); messageQueue.put(shortName,blockingDeque); RDelayedQueue super Serializable> delayedQueue = redissonClient.getDelayedQueue(blockingDeque); offQueue.put(shortName,delayedQueue); RTopic topic = redissonClient.getTopic(topicName); topicMap.put(shortName,topic); }); } }
抽象隊列主題列表
public abstract class AbstractQueue { Map> offQueue = RMessageListener.offQueue; Map > messageQueue = RMessageListener.messageQueue; Map topicMap = RMessageListener.topicMap; protected RDelayedQueue super Serializable> getRDelayedQueue() { return offQueue.get(shortName()); } protected RBlockingDeque super Serializable> getMessageQueue() { return messageQueue.get(shortName()); } private String shortName() { Annotation[] annotations = this.getClass().getAnnotations(); RMessage rMessage = Arrays.stream(annotations).filter(annotation -> annotation instanceof RMessage) .map(annotation -> (RMessage)annotation).findAny().get(); String queryName = rMessage.queue(); String topicName = rMessage.topic(); return topicName+"."+queryName; } protected RTopic getTopic() { return topicMap.get(shortName()); } }
抽象生產者模板
@Slf4j public abstract class RMessageProducerextends AbstractQueue { /** * 發(fā)送延時(shí)消息 * @param message * @param delay * @param timeUnit */ public void sendMessage(T message, long delay, TimeUnit timeUnit) { log.info("rMessage sendMessage: {}, delayTime {}",message.toString(),delay+timeUnit.name()); super.getRDelayedQueue().offer(message,delay,timeUnit); super.getTopic().publish(this.hashCode()); } /** * 發(fā)送異步消息 * @param message */ public void sendMessage(T message) { this.sendMessage(message,0,TimeUnit.MILLISECONDS); } }
抽象消費者模板
@Slf4j public abstract class RMessageConsumerextends AbstractQueue { public void monitorMessage() { CompletableFuture.runAsync(this::pastConsumption); super.getTopic().addListener(Object.class,(c,m)-> { try { Object take = super.getMessageQueue().take(); log.info("rMessage receiveMessage: {}, receiving time {}",take.toString(), LocalDateTime.now()); this.useMessage((T)take); } catch (InterruptedException e) { e.printStackTrace(); } }); } protected abstract void useMessage(T message); public void pastConsumption() { while (super.getRDelayedQueue().size() > 0 || super.getMessageQueue().size() > 0) { try { Object take = super.getMessageQueue().take(); log.info("rMessage receiveMessage: {}, receiving time {}",take.toString(), LocalDateTime.now()); this.useMessage((T)take); } catch (InterruptedException e) { e.printStackTrace(); } } } }
生產者
/**
 * Example producer bound to queue {@code redisQuery} on topic {@code order},
 * sending {@code HashMap<String, String>} payloads.
 */
@RMessage(queue = "redisQuery", topic = "order")
public class RedissonProducer extends RMessageProducer<HashMap<String, String>> {
}

/**
 * Example REST endpoint that sends a delayed message through {@link RedissonProducer}.
 */
@RestController
@RequestMapping("producer")
@AllArgsConstructor
public class ProducerController {

    /** Producer supplied through the Lombok-generated all-args constructor. */
    private final RedissonProducer redissonProducer;

    /**
     * Builds a sample payload and sends it with a five-minute delay.
     *
     * @return the fixed acknowledgement string {@code "send msg"}
     */
    @PostMapping
    public String send() {
        HashMap<String, String> map = new HashMap<>();
        map.put("name", "張三");
        // Literal repaired: the scraped source had pinyin fragments injected into it.
        map.put("time", "測試順序第二條" + LocalDateTime.now());
        redissonProducer.sendMessage(map, 5, TimeUnit.MINUTES);
        return "send msg";
    }
}
消費者
/**
 * Example consumer bound to queue {@code redisQuery} on topic {@code order},
 * receiving {@code HashMap<String, String>} payloads.
 */
@RMessage(queue = "redisQuery", topic = "order")
public class RedissonConsumer extends RMessageConsumer<HashMap<String, String>> {

    /**
     * Prints the received payload to standard output.
     *
     * @param message the payload taken from the queue
     */
    @Override
    protected void useMessage(HashMap<String, String> message) {
        System.out.println("接收到消息:" + message);
    }
}
關于 Redisson 中怎么實現一個延時消息組件問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注創新互聯行業資訊頻道了解更多相關知識。