What does ListenerContainerConfiguration do in rocketmq? This article answers that question by walking through its source code, together with the RocketMQMessageListener annotation it processes.
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    private AtomicLong counter = new AtomicLong(0);

    private StandardEnvironment environment;

    private RocketMQProperties rocketMQProperties;

    private ObjectMapper objectMapper;

    public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper,
        StandardEnvironment environment,
        RocketMQProperties rocketMQProperties) {
        this.objectMapper = rocketMQMessageObjectMapper;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

        if (Objects.nonNull(beans)) {
            beans.forEach(this::registerContainer);
        }
    }

    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }

        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
        validate(annotation);

        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener) bean);
        container.setObjectMapper(objectMapper);
        container.setName(name);  // REVIEW ME, use the same clientId or multiple?

        return container;
    }

    private void validate(RocketMQMessageListener annotation) {
        if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
            annotation.messageModel() == MessageModel.BROADCASTING) {
            throw new BeanDefinitionValidationException(
                "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
        }
    }
}
ListenerContainerConfiguration implements the ApplicationContextAware and SmartInitializingSingleton interfaces.
Its setApplicationContext method stores the applicationContext; its afterSingletonsInstantiated method looks up every bean annotated with RocketMQMessageListener and calls registerContainer on each one.
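The discovery step can be illustrated without Spring. The following is a minimal sketch only: DemoMessageListener, OrderListener, PlainService and the helper methods are hypothetical stand-ins (not part of rocketmq-spring), and a plain Map plays the role of the application context. It shows the same idea as getBeansWithAnnotation followed by a per-bean callback.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ListenerDiscoverySketch {

    // Hypothetical stand-in for @RocketMQMessageListener (assumption, not the real annotation).
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoMessageListener {
        String topic();
    }

    @DemoMessageListener(topic = "orders")
    static class OrderListener { }

    // Not annotated, so it must be skipped by the discovery step.
    static class PlainService { }

    // Rough equivalent of getBeansWithAnnotation: keep only beans whose class carries the annotation.
    static Map<String, Object> beansWithAnnotation(Map<String, Object> allBeans) {
        Map<String, Object> result = new LinkedHashMap<>();
        allBeans.forEach((name, bean) -> {
            if (bean.getClass().isAnnotationPresent(DemoMessageListener.class)) {
                result.put(name, bean);
            }
        });
        return result;
    }

    // Mirrors the shape of afterSingletonsInstantiated: find annotated beans, then handle each one.
    static List<String> registerAll(Map<String, Object> allBeans) {
        List<String> registered = new ArrayList<>();
        beansWithAnnotation(allBeans).forEach((name, bean) -> {
            String topic = bean.getClass().getAnnotation(DemoMessageListener.class).topic();
            registered.add(name + "->" + topic);
        });
        return registered;
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("orderListener", new OrderListener());
        beans.put("plainService", new PlainService());
        System.out.println(registerAll(beans)); // prints [orderListener->orders]
    }
}
```

In the real class the callback runs only after all singletons have been instantiated, which is what SmartInitializingSingleton guarantees; the sketch does not model that lifecycle.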
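registerContainer first checks that the bean implements RocketMQListener and throws IllegalStateException if it does not; it then reads the RocketMQMessageListener annotation and validates it, rejecting the one unsupported combination (consumeMode ORDERLY together with messageModel BROADCASTING); next it creates a DefaultRocketMQListenerContainer through createRocketMQListenerContainer and registers it with the applicationContext; finally it calls start on the container if it is not already running.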
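The order of those steps can be sketched in plain Java. This is an illustration under stated assumptions, not the library code: DemoListener, DemoContainer and the Map-based registry are hypothetical stand-ins for RocketMQListener, DefaultRocketMQListenerContainer and the Spring context, and IllegalArgumentException stands in for BeanDefinitionValidationException. Only the validation rule and the "%s_%s" naming scheme with an AtomicLong counter are taken from the source above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

final class RegisterContainerSketch {

    enum ConsumeMode { CONCURRENTLY, ORDERLY }
    enum MessageModel { CLUSTERING, BROADCASTING }

    // Hypothetical stand-in for RocketMQListener.
    interface DemoListener {
        void onMessage(String message);
    }

    // Hypothetical stand-in for DefaultRocketMQListenerContainer.
    static final class DemoContainer {
        final String name;
        final DemoListener listener;
        private boolean running;

        DemoContainer(String name, DemoListener listener) {
            this.name = name;
            this.listener = listener;
        }

        boolean isRunning() { return running; }
        void start() { running = true; }
    }

    private final AtomicLong counter = new AtomicLong(0);
    private final Map<String, DemoContainer> registry = new LinkedHashMap<>();

    // Same rule as validate() in the source: ORDERLY cannot be combined with BROADCASTING.
    static void validate(ConsumeMode mode, MessageModel model) {
        if (mode == ConsumeMode.ORDERLY && model == MessageModel.BROADCASTING) {
            throw new IllegalArgumentException("messageModel BROADCASTING does not support ORDERLY message");
        }
    }

    DemoContainer register(Object bean, ConsumeMode mode, MessageModel model) {
        // Step 1: type check, as in registerContainer.
        if (!(bean instanceof DemoListener)) {
            throw new IllegalStateException(bean.getClass() + " is not instance of " + DemoListener.class.getName());
        }
        // Step 2: reject unsupported attribute combinations before anything is created.
        validate(mode, model);
        // Step 3: unique container name built from the class name and an incrementing counter.
        String containerBeanName = String.format("%s_%s", DemoContainer.class.getName(), counter.incrementAndGet());
        DemoContainer container = new DemoContainer(containerBeanName, (DemoListener) bean);
        registry.put(containerBeanName, container);
        // Step 4: start only if the container is not already running.
        if (!container.isRunning()) {
            container.start();
        }
        return container;
    }

    public static void main(String[] args) {
        RegisterContainerSketch sketch = new RegisterContainerSketch();
        DemoListener listener = message -> System.out.println("got " + message);
        DemoContainer container = sketch.register(listener, ConsumeMode.CONCURRENTLY, MessageModel.CLUSTERING);
        System.out.println(container.name + " running=" + container.isRunning());
    }
}
```

Because validation happens before the counter is incremented, a rejected listener does not consume a container name, which matches the order of calls in registerContainer.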
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {

    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    /**
     * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
     * load balance. It's required and needs to be globally unique.
     *
     *
     * See here for further discussion.
     */
    String consumerGroup();

    /**
     * Topic name.
     */
    String topic();

    /**
     * Control how to selector message.
     *
     * @see SelectorType
     */
    SelectorType selectorType() default SelectorType.TAG;

    /**
     * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
     */
    String selectorExpression() default "*";

    /**
     * Control consume mode, you can choice receive message concurrently or orderly.
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /**
     * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /**
     * Max consumer thread number.
     */
    int consumeThreadMax() default 64;

    /**
     * Max consumer timeout, default 30s.
     */
    long consumeTimeout() default 30000L;

    /**
     * The property of "access-key".
     */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    /**
     * The property of "secret-key".
     */
    String secretKey() default SECRET_KEY_PLACEHOLDER;

    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default true;

    /**
     * The name value of message trace topic.If you don't config,you can use the default trace topic name.
     */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    /**
     * The property of "name-server".
     */
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    /**
     * The property of "access-channel".
     */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
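The RocketMQMessageListener annotation defines the attributes consumerGroup, topic, selectorType, selectorExpression, consumeMode, messageModel, consumeThreadMax, consumeTimeout, accessKey, secretKey, enableMsgTrace, customizedTraceTopic, nameServer and accessChannel. Only consumerGroup and topic have no default value and must be set explicitly.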
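To make the role of the defaults concrete, here is a small sketch that mirrors a subset of those attributes on a stand-in annotation and reads them back through reflection. DemoMessageListener, MinimalListener and describe are hypothetical names introduced for illustration; the default values are copied from the annotation source above, while the group and topic names are made up.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

final class ListenerAnnotationDefaultsSketch {

    enum ConsumeMode { CONCURRENTLY, ORDERLY }
    enum MessageModel { CLUSTERING, BROADCASTING }

    // Hypothetical stand-in mirroring a subset of @RocketMQMessageListener.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoMessageListener {
        String consumerGroup();                 // required, no default
        String topic();                         // required, no default
        String selectorExpression() default "*";
        ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
        MessageModel messageModel() default MessageModel.CLUSTERING;
        int consumeThreadMax() default 64;
        long consumeTimeout() default 30000L;
    }

    // Only the two required attributes are set; everything else falls back to its default.
    @DemoMessageListener(consumerGroup = "demo-group", topic = "demo-topic")
    static class MinimalListener { }

    // Reads the annotation at runtime, the same way registerContainer obtains it via getAnnotation.
    static String describe(Class<?> listenerClass) {
        DemoMessageListener a = listenerClass.getAnnotation(DemoMessageListener.class);
        return String.format("%s/%s selector=%s mode=%s model=%s threads=%d timeoutMs=%d",
            a.consumerGroup(), a.topic(), a.selectorExpression(),
            a.consumeMode(), a.messageModel(), a.consumeThreadMax(), a.consumeTimeout());
    }

    public static void main(String[] args) {
        System.out.println(describe(MinimalListener.class));
    }
}
```

With the real annotation a listener class would declare it the same way, typically setting just topic and consumerGroup and overriding other attributes only when the defaults do not fit.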