真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

springcloudsteam整合kafka進(jìn)行消息發(fā)送與接收-創(chuàng)新互聯(lián)

spring cloud steam :
Binder和Binding
Binder是SpringCloud Stream的一個抽象概念,是應(yīng)用與消息中間件之間的粘合劑,目前SpringCloud Stream實現(xiàn)了Kafka和RabbitMQ的binder
Binder可以生成Binding,Binding用來綁定消息容器的生產(chǎn)者和消費者,它有兩種類型,INPUT和OUTPUT,INPUT對應(yīng)于消費者,OUTPUT對應(yīng)于生產(chǎn)者。
整合配置yml文件:

成都創(chuàng)新互聯(lián)專注于涼州企業(yè)網(wǎng)站建設(shè),自適應(yīng)網(wǎng)站建設(shè),購物商城網(wǎng)站建設(shè)。涼州網(wǎng)站建設(shè)公司,為涼州等地區(qū)提供建站服務(wù)。全流程按需求定制制作,專業(yè)設(shè)計,全程項目跟蹤,成都創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)
spring:
  cloud:
    #    function:
    #      definition: testChannel
    stream:
      default-binder: kafka #默認(rèn)的binder是kafka(粘合劑粘合的類型為kafka)

      #可以動態(tài)綁定的目標(biāo)列表(如:動態(tài)路由),如果設(shè)置,則只能綁定列出的目的地
      #     dynamic-destinations:
      #綁定信息
      bindings:
        #消息報錯后,數(shù)據(jù)保存的topic
        error:
          destination: myError
        testChannel-in-0:
          #消費者
          consumer:
            #            bindingName: myInConsumer
            #消費者并發(fā) 默認(rèn)為1
            concurrency: 1
            #是否分區(qū)接收數(shù)據(jù) 默認(rèn)false
            partitioned: false
            #頭信息模式,設(shè)置為raw時,禁用輸入頭文件解析。僅適用于不支持消息頭的消息中間件,并且需要頭部嵌入。入站數(shù)據(jù)來自外部Spring Cloud Stream應(yīng)用程序時很有用。
            header-mode: headers
            #重試次數(shù)
            max-attempts: 3
            #初始回退間隔時間
            back-off-initial-interval: 1000
            #大回退間隔時間
            back-off-max-interval: 10000
            #回退倍數(shù)
            back-off-multiplier: 2.0
            #大于0時,表示允許自定義該消費者的實例索引,-1時使用spring.cloud.stream.instance-index
            instance-index: -1
            #大于0時表示自定義消費者實例技術(shù),-1時默認(rèn)使用spring.cloud.stream.instanceCount
            instance-count: -1
            content-type: application/json

          #生產(chǎn)者
          producer:
            #一個確定如何分配出站數(shù)據(jù)的SpEL表達(dá)式
            partition-key-expression: headers.cs
            #一個PartitionKeyExtractorStrategy實現(xiàn)。如果設(shè)置,或者如果設(shè)置了partitionKeyExpression,則該通道上的出站數(shù)據(jù)將被分區(qū),并且partitionCount必須設(shè)置為大于1的值才能生效。這兩個選項是相互排斥的。
            #partition-key-extractor-class:
            #一個PartitionSelectorStrategy實現(xiàn)。與partitionSelectorExpression相互排斥。如果沒有設(shè)置,則分區(qū)將被選為hashCode(key) % partitionCount,其中key通過partitionKeyExpression或partitionKeyExtractorClass計算。
            #partition-selector-class:
            #partition-selector-expression:
            #如果啟用分區(qū),則數(shù)據(jù)的目標(biāo)分區(qū)數(shù)。如果生產(chǎn)者被分區(qū),則必須設(shè)置為大于1的值。在Kafka,解釋為提示; 而是使用更大的和目標(biāo)主題的分區(qū)計數(shù)。
            partition-count: 3
            #消息發(fā)送失敗的處理邏輯默認(rèn)是關(guān)閉的   test.errors
            error-channel-enabled: true
          destination: test #目標(biāo)主題 相當(dāng)于kafka的topic
          binder: kafka  #粘合器
          content-type: application/json
          #          content-type: text/html
          group: group2
        testChannel-out-0:
          #消費者
          consumer:
            #            bindingName: myOutConsumer
            #消費者并發(fā) 默認(rèn)為1
            concurrency: 1
            #是否分區(qū)接收數(shù)據(jù) 默認(rèn)false
            partitioned: false
            #頭信息模式,設(shè)置為raw時,禁用輸入頭文件解析。僅適用于不支持消息頭的消息中間件,并且需要頭部嵌入。入站數(shù)據(jù)來自外部Spring Cloud Stream應(yīng)用程序時很有用。
            header-mode: headers
            #重試次數(shù)
            max-attempts: 3
            #初始回退間隔時間
            back-off-initial-interval: 1000
            #大回退間隔時間
            back-off-max-interval: 10000
            #回退倍數(shù)
            back-off-multiplier: 2.0
            #大于0時,表示允許自定義該消費者的實例索引,-1時使用spring.cloud.stream.instance-index
            instance-index: -1
            #大于0時表示自定義消費者實例技術(shù),-1時默認(rèn)使用spring.cloud.stream.instanceCount
            instance-count: -1

          #          #生產(chǎn)者
          #          producer:
          #            #一個確定如何分配出站數(shù)據(jù)的SpEL表達(dá)式
          #            partition-key-expression: headers.cs
          #            #一個PartitionKeyExtractorStrategy實現(xiàn)。如果設(shè)置,或者如果設(shè)置了partitionKeyExpression,則該通道上的出站數(shù)據(jù)將被分區(qū),并且partitionCount必須設(shè)置為大于1的值才能生效。這兩個選項是相互排斥的。
          #            #partition-key-extractor-class:
          #            #一個PartitionSelectorStrategy實現(xiàn)。與partitionSelectorExpression相互排斥。如果沒有設(shè)置,則分區(qū)將被選為hashCode(key) % partitionCount,其中key通過partitionKeyExpression或partitionKeyExtractorClass計算。
          #            #partition-selector-class:
          #            #partition-selector-expression:
          #            #如果啟用分區(qū),則數(shù)據(jù)的目標(biāo)分區(qū)數(shù)。如果生產(chǎn)者被分區(qū),則必須設(shè)置為大于1的值。在Kafka,解釋為提示; 而是使用更大的和目標(biāo)主題的分區(qū)計數(shù)。
          #            partition-count: 3
          #            #消息發(fā)送失敗的處理邏輯默認(rèn)是關(guān)閉的   test.errors
          #            error-channel-enabled: true
          destination: test #本例子創(chuàng)建了另外一個topic (test1)用于區(qū)分不同的功能區(qū)分。
          binder: kafka
          content-type: application/json
          group: group1

      #          producer:
      #            error-channel-enabled: true
      ##            partitionSelectorName: customPartitionSelector
      ##            partitionKeyExtractorName: customPartitionKeyExtractor
      #            partitionCount: 3
      #            partitionKeyExpression: headers.cs
      ##            partition-key-extractor-name: customPartitionKeyExtractor
      ##            partition-selector-name: customPartitionSelector
      binders:
        kafka:
          binder:
            #kafka brokers,默認(rèn)localhost
            brokers: localhost
            #kafka 端口號,默認(rèn)9092
            default-broker-port: 9092
            #kafka zk節(jié)點,默認(rèn)localhost
            zk-nodes: localhost
            #zookeeper 端口
            default-zk-port: 2181
            #配置,map
            #configuration:
            #自定義標(biāo)題列表
            headers:  cs
            #偏移量保存時間(ms)窗口,0:忽略,默認(rèn)10000(ms)
            offset-update-time-window: 10000
            #偏移量保存次數(shù),與時間窗口互斥
            offset-update-count: 0
            #broker 需要的ack數(shù)量
            required-acks: 1
            #只有設(shè)置autoCreateTopics或autoAddPartitions才有效
            min-partition-count: 1
            #自動創(chuàng)建topic時 生成的副本數(shù)量
            replication-factor: 1
            #自動創(chuàng)建主題
            auto-create-topics: true
            #如果設(shè)置為true,則綁定器將根據(jù)需要創(chuàng)建新的分區(qū)。如果設(shè)置為false,則綁定器將依賴于已配置的主題的分區(qū)大小。如果目標(biāo)主題的分區(qū)計數(shù)小于預(yù)期值,則綁定器將無法啟動。
            auto-add-partitions: true
            #socket 緩沖區(qū)大小
            socket-buffer-size: 2097152

            bootstrap-servers: 127.0.0.1:9092 #kafka服務(wù)地址,集群部署的時候需要配置多個

            #配置,map
            configuration:
              acks: -1
              key:
                serializer: org.apache.kafka.common.serialization.StringSerializer
              #            value:
              #              serializer: org.apache.kafka.common.serialization.StringSerializer
              max:
                poll:
                  records: 200
              retries: 3
              session:
                timeout:
                  ms: 40000   # 每次消費的處理時間
          #綁定
          bindings:
            testChannel-out-put:
              #消費者
              consumer:
                #主題分區(qū)消費者組成員之間自動平衡
                auto-rebalance-enabled: true
                #自動提交偏移量
                auto-commit-offset: true
#                auto-commit-on-error:
                #連接恢復(fù)嘗試之間的間隔,以毫秒為單位。
                recovery-interval: 5000
                #是否將消費者偏移量重置為start-offset提供的值
                reset-offsets: false
                #新組的起始偏移量,或resetOffsets為true時的起始偏移量。允許的值:earliest,latest,默認(rèn)值:null(相當(dāng)于earliest)
                start-offset: earliest
                enable-dlq: false
                #              configuration:
                #接收錯誤消息的DLQ主題的名稱。默認(rèn)值:null(如果未指定,將導(dǎo)致錯誤的消息將轉(zhuǎn)發(fā)到名為error::的主題)。:
              #              dlq-name:
              #生產(chǎn)者
              producer:
                #              configuration:
                buffer-size: 16348
                #生產(chǎn)者是否是同步的
                sync: true
                #生產(chǎn)者在發(fā)送之前等待多長時間,以便允許更多消息在同一批次中累積。(通常,生產(chǎn)者根本不等待,并且簡單地發(fā)送在先前發(fā)送進(jìn)行中累積的所有消息。)非零值可能會以延遲為代價增加吞吐量。
                batch-timeout: 0
                #                key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
                #                value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
                client-id: producer1
              health-timeout: 60

消息生產(chǎn)者:

@Component
public class MessageProducer {private final StreamBridge streamBridge;

    @Autowired(required = false)
    private BinderAwareChannelResolver resolver;


    public MessageProducer(StreamBridge streamBridge) {this.streamBridge = streamBridge;
    }

    public String resolverSendMessage(String messages) {SendMessageDto sendMessageDto = new SendMessageDto();
        sendMessageDto.setIp(UUID.randomUUID().toString());
        sendMessageDto.setMessage(messages);
        sendMessageDto.setTiem(new Date().toString());
        MessageBuilderstringMessageBuilder = MessageBuilder.withPayload(sendMessageDto);
        stringMessageBuilder.setHeader("cs","1");
//        stringMessageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY,"1233322");
//        Messagebuild = ;
        GenericMessage stringMessage = (GenericMessage) stringMessageBuilder.build();
        resolver.resolveDestination("testChannel-in-0").send(stringMessage);
        return "yes!";

    }

    public String send(String messages) {SendMessageDto sendMessageDto = new SendMessageDto();
        sendMessageDto.setIp(UUID.randomUUID().toString());
        sendMessageDto.setMessage(messages);
        sendMessageDto.setTiem(new Date().toString());
//        String s = JSON.toJSONString(sendMessageDto);
        MessageBuilderstringMessageBuilder = MessageBuilder.withPayload(sendMessageDto);
        stringMessageBuilder.setHeader("cs","1");
//        stringMessageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY,"1233322");
//        Messagebuild = ;
        GenericMessage stringMessage = (GenericMessage) stringMessageBuilder.build();
        streamBridge.send("testChannel-in-0", stringMessage);
        return "發(fā)送消息: " + messages;
    }
}

消息消費者

//注意這里采用的是函數(shù)式編程,向spring 容器中注入名為testChannel 的bean,  應(yīng)為高版本的spring cloud steam 棄用了
//@StreamListener ,@Input等注解,而是提倡函數(shù)式接口
//testChannel 與生產(chǎn)者寫入消息的通道名“testChannel-in-0”  有所差異,-in-0是spring cloud steam存在的默認(rèn)規(guī)則
 @Bean(name = "testChannel")
    ConsumertestChannel( ) {return str ->{System.out.println("消費者處理消息:" +str );
        };
    }

如果需要指定消息的分區(qū),需要在配置文件中自定義分區(qū)的計算邏輯屬性為:

partition-key-expression: headers.cs

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧


當(dāng)前標(biāo)題:springcloudsteam整合kafka進(jìn)行消息發(fā)送與接收-創(chuàng)新互聯(lián)
網(wǎng)頁路徑:http://weahome.cn/article/jscoi.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部