這篇文章給大家分享的是有關(guān)Spring Cloud Stream異常處理的示例分析的內(nèi)容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
鹽津網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)公司!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項目制作,到程序開發(fā),運營維護(hù)。創(chuàng)新互聯(lián)公司自2013年起到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進(jìn)行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)公司。
應(yīng)用處理
當(dāng)消費者在處理接收到的消息時,有可能會由于某些原因而拋出異常。若希望對拋出來的異常進(jìn)行處理的話,就需要采取一些異常處理手段,異常處理的方式可分為三種:應(yīng)用層面的處理、系統(tǒng)層面的處理以及通過RetryTemplate進(jìn)行處理。
局部處理
Stream相關(guān)的配置內(nèi)容如下:
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: input: destination: stream-test-topic group: binder-group
所謂局部處理就是針對指定的channel進(jìn)行處理,需要定義一個處理異常的方法,并在該方法上添加@ServiceActivator注解,該注解有一個inputChannel屬性,用于指定對哪個channel進(jìn)行處理,格式為{destination}.{group}.errors。具體代碼如下:
package com.zj.node.usercenter.rocketmq; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.messaging.support.ErrorMessage; import org.springframework.stereotype.Service; /** * 消費者 * * @author 01 * @date 2019-08-10 **/ @Slf4j @Service public class TestStreamConsumer { @StreamListener(Sink.INPUT) public void receive1(String messageBody) { log.info("消費消息,messageBody = {}", messageBody); throw new IllegalArgumentException("參數(shù)錯誤"); } /** * 處理局部異常的方法 * * @param errorMessage 異常消息對象 */ @ServiceActivator( // 通過特定的格式指定處理哪個channel的異常 inputChannel = "stream-test-topic.binder-group.errors" ) public void handleError(ErrorMessage errorMessage) { // 獲取異常對象 Throwable errorMessagePayload = errorMessage.getPayload(); log.error("發(fā)生異常", errorMessagePayload); // 獲取消息體 Message> originalMessage = errorMessage.getOriginalMessage(); if (originalMessage != null) { log.error("消息體: {}", originalMessage.getPayload()); } else { log.error("消息體為空"); } } }
全局處理
全局處理則是可以處理所有channel拋出來的異常,所有的channel拋出異常后會生成一個ErrorMessage對象,即錯誤消息。錯誤消息會被放到一個專門的channel里,這個channel就是errorChannel。所以通過監(jiān)聽errorChannel就可以實現(xiàn)全局異常的處理。具體代碼如下:
@StreamListener(Sink.INPUT) public void receive1(String messageBody) { log.info("消費消息,messageBody = {}", messageBody); throw new IllegalArgumentException("參數(shù)錯誤"); } /** * 處理全局異常的方法 * * @param errorMessage 異常消息對象 */ @StreamListener("errorChannel") public void handleError(ErrorMessage errorMessage) { log.error("發(fā)生異常. errorMessage = {}", errorMessage); }
系統(tǒng)處理
系統(tǒng)處理方式,因消息中間件的不同而異。如果應(yīng)用層面沒有配置錯誤處理,那么error將會被傳播給binder,而binder則會將error回傳給消息中間件。消息中間件可以選擇:
丟棄消息:錯誤消息將被丟棄。雖然在某些情況下可以接受,但這種方式一般不適用于生產(chǎn)
requeue(重新排隊,從而重新處理)
將失敗的消息發(fā)送給DLQ(死信隊列)
DLQ
目前RabbitMQ對DLQ的支持比較好,這里以RabbitMQ為例,只需要添加DLQ相關(guān)的配置:
spring: cloud: stream: bindings: input: destination: stream-test-topic group: binder-group rabbit: bindings: input: consumer: # 自動將失敗的消息發(fā)送給DLQ auto-bind-dlq: true
消息消費失敗后,就會放入死信隊列。在控制臺操作一下,即可將死信放回消息隊列,這樣,客戶端就可以重新處理。
如果想獲取原始錯誤的異常堆棧,可添加如下配置:
spring: cloud: stream: rabbit: bindings: input: consumer: republish-to-dlq: true
requeue
Rabbit及Kafka的binder依賴RetryTemplate實現(xiàn)消息重試,從而提升消息處理的成功率。然而,如果設(shè)置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那么RetryTemplate則不會再重試。此時可以通過requeue方式來處理異常。
需要添加如下配置:
# 默認(rèn)是3,設(shè)為1則禁用重試 spring.cloud.stream.bindings..consumer.max-attempts=1 # 表示是否要requeue被拒絕的消息(即:requeue處理失敗的消息) spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true
這樣,失敗的消息將會被重新提交到同一個handler進(jìn)行處理,直到handler拋出 AmqpRejectAndDontRequeueException 異常為止。
RetryTemplate
RetryTemplate主要用于實現(xiàn)消息重試,也是錯誤處理的一種手段。有兩種配置方式,一種是通過配置文件進(jìn)行配置,如下示例:
spring: cloud: stream: bindings: : consumer: # 最多嘗試處理幾次,默認(rèn)3 maxAttempts: 3 # 重試時初始避退間隔,單位毫秒,默認(rèn)1000 backOffInitialInterval: 1000 # 重試時最大避退間隔,單位毫秒,默認(rèn)10000 backOffMaxInterval: 10000 # 避退乘數(shù),默認(rèn)2.0 backOffMultiplier: 2.0 # 當(dāng)listen拋出retryableExceptions未列出的異常時,是否要重試 defaultRetryable: true # 異常是否允許重試的map映射 retryableExceptions: java.lang.RuntimeException: true java.lang.IllegalStateException: false
另一種則是通過代碼配置,在多數(shù)場景下,使用配置文件定制重試行為都是可以滿足需求的,但配置文件里支持的配置項可能無法滿足一些復(fù)雜需求。此時可使用代碼方式配置RetryTemplate,如下示例:
@Configuration class RetryConfiguration { @StreamRetryTemplate public RetryTemplate sinkConsumerRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.setRetryPolicy(retryPolicy()); retryTemplate.setBackOffPolicy(backOffPolicy()); return retryTemplate; } private ExceptionClassifierRetryPolicy retryPolicy() { BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier( Collections.singletonList(IllegalAccessException.class )); keepRetryingClassifier.setTraverseCauses(true); SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3); AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy(); ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy(); retryPolicy.setExceptionClassifier( classifiable -> keepRetryingClassifier.classify(classifiable) ? alwaysRetryPolicy : simpleRetryPolicy); return retryPolicy; } private FixedBackOffPolicy backOffPolicy() { final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(2); return backOffPolicy; } }
最后還需要添加一段配置:
spring.cloud.stream.bindings..consumer.retry-template-name=myRetryTemplate
注:Spring Cloud Stream 2.2才支持設(shè)置retry-template-name
感謝各位的閱讀!關(guān)于“Spring Cloud Stream異常處理的示例分析”這篇文章就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,讓大家可以學(xué)到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!