真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

SpringCloudStream異常處理

應(yīng)用處理

當(dāng)消費(fèi)者在處理接收到的消息時(shí),有可能會(huì)由于某些原因而拋出異常。若希望對(duì)拋出來(lái)的異常進(jìn)行處理的話,就需要采取一些異常處理手段,異常處理的方式可分為三種:應(yīng)用層面的處理、系統(tǒng)層面的處理以及通過(guò)RetryTemplate進(jìn)行處理。

創(chuàng)新互聯(lián)公司專(zhuān)注于慈利企業(yè)網(wǎng)站建設(shè),成都響應(yīng)式網(wǎng)站建設(shè),成都做商城網(wǎng)站。慈利網(wǎng)站建設(shè)公司,為慈利等地區(qū)提供建站服務(wù)。全流程按需求定制開(kāi)發(fā),專(zhuān)業(yè)設(shè)計(jì),全程項(xiàng)目跟蹤,創(chuàng)新互聯(lián)公司專(zhuān)業(yè)和態(tài)度為您提供的服務(wù)

本小節(jié)先來(lái)介紹較為常用的應(yīng)用層面的異常處理方式,該方式又細(xì)分為局部處理和全局處理。

局部處理

Stream相關(guān)的配置內(nèi)容如下:

spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.190.129:9876
      bindings:
        input:
          destination: stream-test-topic
          group: binder-group

所謂局部處理就是針對(duì)指定的channel進(jìn)行處理,需要定義一個(gè)處理異常的方法,并在該方法上添加@ServiceActivator注解,該注解有一個(gè)inputChannel屬性,用于指定對(duì)哪個(gè)channel進(jìn)行處理,格式為{destination}.{group}.errors。具體代碼如下:

package com.zj.node.usercenter.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;

/**
 * 消費(fèi)者
 *
 * @author 01
 * @date 2019-08-10
 **/
@Slf4j
@Service
public class TestStreamConsumer {

    @StreamListener(Sink.INPUT)
    public void receive1(String messageBody) {
        log.info("消費(fèi)消息,messageBody = {}", messageBody);
        throw new IllegalArgumentException("參數(shù)錯(cuò)誤");
    }

    /**
     * 處理局部異常的方法
     *
     * @param errorMessage 異常消息對(duì)象
     */
    @ServiceActivator(
        // 通過(guò)特定的格式指定處理哪個(gè)channel的異常
        inputChannel = "stream-test-topic.binder-group.errors"
    )
    public void handleError(ErrorMessage errorMessage) {
        // 獲取異常對(duì)象
        Throwable errorMessagePayload = errorMessage.getPayload();
        log.error("發(fā)生異常", errorMessagePayload);

        // 獲取消息體
        Message originalMessage = errorMessage.getOriginalMessage();
        if (originalMessage != null) {
            log.error("消息體: {}", originalMessage.getPayload());
        } else {
            log.error("消息體為空");
        }
    }
}

全局處理

全局處理則是可以處理所有channel拋出來(lái)的異常,所有的channel拋出異常后會(huì)生成一個(gè)ErrorMessage對(duì)象,即錯(cuò)誤消息。錯(cuò)誤消息會(huì)被放到一個(gè)專(zhuān)門(mén)的channel里,這個(gè)channel就是errorChannel。所以通過(guò)監(jiān)聽(tīng)errorChannel就可以實(shí)現(xiàn)全局異常的處理。具體代碼如下:

@StreamListener(Sink.INPUT)
public void receive1(String messageBody) {
    log.info("消費(fèi)消息,messageBody = {}", messageBody);
    throw new IllegalArgumentException("參數(shù)錯(cuò)誤");
}

/**
 * 處理全局異常的方法
 *
 * @param errorMessage 異常消息對(duì)象
 */
@StreamListener("errorChannel")
public void handleError(ErrorMessage errorMessage) {
    log.error("發(fā)生異常. errorMessage = {}", errorMessage);
}

系統(tǒng)處理

系統(tǒng)處理方式,因消息中間件的不同而異。如果應(yīng)用層面沒(méi)有配置錯(cuò)誤處理,那么error將會(huì)被傳播給binder,而binder則會(huì)將error回傳給消息中間件。消息中間件可以選擇:

  • 丟棄消息:錯(cuò)誤消息將被丟棄。雖然在某些情況下可以接受,但這種方式一般不適用于生產(chǎn)
  • requeue(重新排隊(duì),從而重新處理)
  • 將失敗的消息發(fā)送給DLQ(死信隊(duì)列)

DLQ

目前RabbitMQ對(duì)DLQ的支持比較好,這里以RabbitMQ為例,只需要添加DLQ相關(guān)的配置:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: stream-test-topic
          group: binder-group
      rabbit:
        bindings:
          input:
            consumer:
              # 自動(dòng)將失敗的消息發(fā)送給DLQ
              auto-bind-dlq: true

消息消費(fèi)失敗后,就會(huì)放入死信隊(duì)列。在控制臺(tái)操作一下,即可將死信放回消息隊(duì)列,這樣,客戶(hù)端就可以重新處理。

如果想獲取原始錯(cuò)誤的異常堆棧,可添加如下配置:

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          input:
            consumer:
              republish-to-dlq: true

requeue

Rabbit及Kafka的binder依賴(lài)RetryTemplate實(shí)現(xiàn)消息重試,從而提升消息處理的成功率。然而,如果設(shè)置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那么RetryTemplate則不會(huì)再重試。此時(shí)可以通過(guò)requeue方式來(lái)處理異常。

需要添加如下配置:

# 默認(rèn)是3,設(shè)為1則禁用重試
spring.cloud.stream.bindings..consumer.max-attempts=1
# 表示是否要requeue被拒絕的消息(即:requeue處理失敗的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

這樣,失敗的消息將會(huì)被重新提交到同一個(gè)handler進(jìn)行處理,直到handler拋出 AmqpRejectAndDontRequeueException 異常為止。


RetryTemplate

RetryTemplate主要用于實(shí)現(xiàn)消息重試,也是錯(cuò)誤處理的一種手段。有兩種配置方式,一種是通過(guò)配置文件進(jìn)行配置,如下示例:

spring:
  cloud:
    stream:
      bindings:
        :
          consumer:
            # 最多嘗試處理幾次,默認(rèn)3
            maxAttempts: 3
            # 重試時(shí)初始避退間隔,單位毫秒,默認(rèn)1000
            backOffInitialInterval: 1000
            # 重試時(shí)最大避退間隔,單位毫秒,默認(rèn)10000
            backOffMaxInterval: 10000
            # 避退乘數(shù),默認(rèn)2.0
            backOffMultiplier: 2.0
            # 當(dāng)listen拋出retryableExceptions未列出的異常時(shí),是否要重試
            defaultRetryable: true
            # 異常是否允許重試的map映射
            retryableExceptions:
              java.lang.RuntimeException: true
              java.lang.IllegalStateException: false

另一種則是通過(guò)代碼配置,在多數(shù)場(chǎng)景下,使用配置文件定制重試行為都是可以滿(mǎn)足需求的,但配置文件里支持的配置項(xiàng)可能無(wú)法滿(mǎn)足一些復(fù)雜需求。此時(shí)可使用代碼方式配置RetryTemplate,如下示例:

@Configuration
class RetryConfiguration {
    @StreamRetryTemplate
    public RetryTemplate sinkConsumerRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy());
        retryTemplate.setBackOffPolicy(backOffPolicy());

        return retryTemplate;
    }

    private ExceptionClassifierRetryPolicy retryPolicy() {
        BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
                Collections.singletonList(IllegalAccessException.class
                ));
        keepRetryingClassifier.setTraverseCauses(true);

        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
        AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();

        ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
        retryPolicy.setExceptionClassifier(
                classifiable -> keepRetryingClassifier.classify(classifiable) ?
                        alwaysRetryPolicy : simpleRetryPolicy);

        return retryPolicy;
    }

    private FixedBackOffPolicy backOffPolicy() {
        final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(2);

        return backOffPolicy;
    }
}

最后還需要添加一段配置:

spring.cloud.stream.bindings..consumer.retry-template-name=myRetryTemplate

注:Spring Cloud Stream 2.2才支持設(shè)置retry-template-name


本文標(biāo)題:SpringCloudStream異常處理
當(dāng)前網(wǎng)址:http://weahome.cn/article/giidig.html

其他資訊

在線咨詢(xún)

微信咨詢(xún)

電話咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部