真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網站制作重慶分公司

怎么用Kotlin+RocketMQ實現(xiàn)延時消息-創(chuàng)新互聯(lián)

這篇文章主要講解了“怎么用Kotlin+RocketMQ實現(xiàn)延時消息”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么用Kotlin+RocketMQ實現(xiàn)延時消息”吧!

創(chuàng)新互聯(lián)公司自2013年創(chuàng)立以來,先為興平等服務建站,興平等地企業(yè),進行企業(yè)商務咨詢服務。為興平企業(yè)網站制作PC+手機+微官網三網同步一站式服務解決您的所有建站問題。

一. 延時消息

延時消息是指消息被發(fā)送以后,并不想讓消費者立即拿到消息,而是等待指定時間后,消費者才拿到這個消息進行消費。

使用延時消息的典型場景,例如:

在電商系統(tǒng)中,用戶下完訂單30分鐘內沒支付,則訂單可能會被取消。  在電商系統(tǒng)中,用戶七天內沒有評價商品,則默認好評。

這些場景對應的解決方案,包括:

輪詢遍歷數據庫記錄  JDK 的 DelayQueue  ScheduledExecutorService  基于 Quartz 的定時任務  基于 Redis 的 zset 實現(xiàn)延時隊列。

除此之外,還可以使用消息隊列來實現(xiàn)延時消息,例如 RocketMQ。

二. RocketMQ

RocketMQ 是一個分布式消息和流數據平臺,具有低延遲、高性能、高可靠性、萬億級容量和靈活的可擴展性。RocketMQ 是2012年阿里巴巴開源的第三代分布式消息中間件。

三. RocketMQ 實現(xiàn)延時消息

3.1 業(yè)務背景

我們的系統(tǒng)完成某項操作之后,會推送事件消息到業(yè)務方的接口。當我們調用業(yè)務方的通知接口返回值為成功時,表示本次推送消息成功;當返回值為失敗時,則會多次推送消息,直到返回成功為止(保證至少成功一次)。當我們推送失敗后,雖然會進行多次推送消息,但并不是立即進行。會有一定的延遲,并按照一定的規(guī)則進行推送消息。例如:1小時后嘗試推送、3小時后嘗試推送、1天后嘗試推送、3天后嘗試推送等等。因此,考慮使用延時消息實現(xiàn)該功能。

3.2 生產者(Producer)

生產者負責產生消息,生產者向消息服務器發(fā)送由業(yè)務應用程序系統(tǒng)生成的消息。

首先,定義一個支持延時發(fā)送的 AbstractProducer。

abstract class AbstractProducer :ProducerBean() {  var producerId: String? = null  var topic: String? = null  var tag: String?=null  var timeoutMillis: Int? = null  var delaySendTimeMills: Long? = null  val log = LogFactory.getLog(this.javaClass)  open fun sendMessage(messageBody: Any, tag: String) {    val msgBody = JSON.toJSONString(messageBody)    val message = Message(topic, tag, msgBody.toByteArray())    if (delaySendTimeMills != null) {      val startDeliverTime = System.currentTimeMillis() + delaySendTimeMills!!      message.startDeliverTime = startDeliverTime      log.info( "send delay message producer startDeliverTime:${startDeliverTime}currentTime :${System.currentTimeMillis()}")    }    val logMessageId = buildLogMessageId(message)    try {      val sendResult = send(message)      log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)    } catch (e: Exception) {      log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)    }  }  fun buildLogMessageId(message: Message): String {    return "topic: " + message.topic + "\n" +        "producer: " + producerId + "\n" +        "tag: " + message.tag + "\n" +        "key: " + message.key + "\n"  }}

根據業(yè)務需要,增加一個支持重試機制的 Producer

@Component@ConfigurationProperties("mqs.ons.producers.xxx-producer")@Configuration@Dataclass CleanReportPushEventProducer :AbstractProducer() {  lateinit var delaySecondList:List  fun sendMessage(messageBody: CleanReportPushEventMessage){    //重試超過次數之后不再發(fā)事件    if (delaySecondList!=null) {      if(messageBody.times>=delaySecondList.size){        return      }      val msgBody = JSON.toJSONString(messageBody)      val message = Message(topic, tag, msgBody.toByteArray())      val delayTimeMills = delaySecondList[messageBody.times]*1000L      message.startDeliverTime = System.currentTimeMillis() + delayTimeMills      log.info( "messageBody: " + msgBody+ "startDeliverTime: "+message.startDeliverTime )      val logMessageId = buildLogMessageId(message)      try {        val sendResult = send(message)        log.info(logMessageId + "producer messageId: " + sendResult.getMessageId() + "\n" + "messageBody: " + msgBody)      } catch (e: Exception) {        log.error(logMessageId + "messageBody: " + msgBody + "\n" + " error: " + e.message, e)      }    }  }}

在 CleanReportPushEventProducer 中,超過了重試的次數就不會再發(fā)送消息了。

每一次延時消息的時間也會不同,因此需要根據重試的次數來獲取這個delayTimeMills 。

通過 System.currentTimeMillis() + delayTimeMills 可以設置 message 的 startDeliverTime。然后調用 send(message) 即可發(fā)送延時消息。

我們使用商用版的 RocketMQ,因此支持精度為秒級別的延遲消息。在開源版本中,RocketMQ 只支持18個特定級別的延遲消息。:(

3.3 消費者(Consumer)

消費者負責消費消息,消費者從消息服務器拉取信息并將其輸入用戶應用程序。

定義 Push 類型的 AbstractConsumer:

@Dataabstract class AbstractConsumer ():MessageListener{  var consumerId: String? = null  lateinit var subscribeOptions: List  var threadNums: Int? = null  val log = LogFactory.getLog(this.javaClass)  override fun consume(message: Message, context: ConsumeContext): Action {    val logMessageId = buildLogMessageId(message)    val body = String(message.body)    try {      log.info(logMessageId + " body: " + body)      val result = consumeInternal(message, context, JSON.parseObject(body, getMessageBodyType(message.tag)))      log.info(logMessageId + " result: " + result.name)      return result    } catch (e: Exception) {      if (message.reconsumeTimes >= 3) {        log.error(logMessageId + " error: " + e.message, e)      }      return Action.ReconsumeLater    }  }  abstract fun getMessageBodyType(tag: String): Type?  abstract fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action  protected fun buildLogMessageId(message: Message): String {    return "topic: " + message.topic + "\n" +        "consumer: " + consumerId + "\n" +        "tag: " + message.tag + "\n" +        "key: " + message.key + "\n" +        "MsgId:" + message.msgID + "\n" +        "BornTimestamp" + message.bornTimestamp + "\n" +        "StartDeliverTime:" + message.startDeliverTime + "\n" +        "ReconsumeTimes:" + message.reconsumeTimes + "\n"  }}

再定義具體的消費者,并且在消費失敗之后能夠再發(fā)送一次消息。

@Configuration@ConfigurationProperties("mqs.ons.consumers.clean-report-push-event-consumer")@Dataclass CleanReportPushEventConsumer(val cleanReportService: CleanReportService,val eventProducer:CleanReportPushEventProducer):AbstractConsumer() {  val logger: Logger = LoggerFactory.getLogger(this.javaClass)  override fun consumeInternal(message: Message, context: ConsumeContext, obj: Any): Action {    if(obj is CleanReportPushEventMessage){      //清除事件      logger.info("consumer clean-report event report_id:${obj.id} ")      //消費失敗之后再發(fā)送一次消息      if(!cleanReportService.sendCleanReportEvent(obj.id)){        val times = obj.times+1        eventProducer.sendMessage(CleanReportPushEventMessage(obj.id,times))      }    }    return Action.CommitMessage  }  override fun getMessageBodyType(tag: String): Type? {    return CleanReportPushEventMessage::class.java  }}

其中,cleanReportService 的 sendCleanReportEvent() 會通過 http 的方式調用業(yè)務方提供的接口,進行事件消息的推送。如果推送失敗了,則會進行下一次的推送。(這里使用了 eventProducer 的 sendMessage() 方法再次投遞消息,是因為要根據調用的http接口返回的內容來判斷消息是否發(fā)送成功。)

最后,定義 ConsumerFactory

@Componentclass ConsumerFactory(val consumers: List,val aliyunOnsOptions: AliyunOnsOptions) {  val logger: Logger = LoggerFactory.getLogger(this.javaClass)  @PostConstruct  fun start() {    CompletableFuture.runAsync{      consumers.stream().forEach {        val properties = buildProperties(it.consumerId!!, it.threadNums)        val consumer = ONSFactory.createConsumer(properties)        if (it.subscribeOptions != null && !it.subscribeOptions!!.isEmpty()) {          for (options in it.subscribeOptions!!) {            consumer.subscribe(options.topic, options.tag, it)          }          consumer.start()          val message = "\n".plus(              it.subscribeOptions!!.stream().map{ a -> String.format("topic: %s, tag: %s has been started", a.topic, a.tag)}                  .collect(Collectors.toList()))          logger.info(String.format("consumer: %s\n", message))        }      }    }  }  private fun buildProperties(consumerId: String,threadNums: Int?): Properties {    val properties = Properties()    properties.put(PropertyKeyConst.ConsumerId, consumerId)    properties.put(PropertyKeyConst.AccessKey, aliyunOnsOptions.accessKey)    properties.put(PropertyKeyConst.SecretKey, aliyunOnsOptions.secretKey)    if (StringUtils.isNotEmpty(aliyunOnsOptions.onsAddr)) {      properties.put(PropertyKeyConst.ONSAddr, aliyunOnsOptions.onsAddr)    } else {      // 測試環(huán)境接入RocketMQ      properties.put(PropertyKeyConst.NAMESRV_ADDR, aliyunOnsOptions.nameServerAddress)    }    properties.put(PropertyKeyConst.ConsumeThreadNums, threadNums!!)    return properties  }}

四. 總結

正如本文開頭曾介紹過,可以使用多種方式來實現(xiàn)延時消息。然而,我們的系統(tǒng)本身就大量使用了 RocketMQ,借助成熟的 RocketMQ 實現(xiàn)延時消息不失為一種可靠而又方便的方式。

感謝各位的閱讀,以上就是“怎么用Kotlin+RocketMQ實現(xiàn)延時消息”的內容了,經過本文的學習后,相信大家對怎么用Kotlin+RocketMQ實現(xiàn)延時消息這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是創(chuàng)新互聯(lián)網站建設公司,,小編將為大家推送更多相關知識點的文章,歡迎關注!


標題名稱:怎么用Kotlin+RocketMQ實現(xiàn)延時消息-創(chuàng)新互聯(lián)
鏈接URL:http://weahome.cn/article/cdicoh.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部