spring cloud steam :
Binder和Binding
Binder是SpringCloud Stream的一個抽象概念,是應(yīng)用與消息中間件之間的粘合劑,目前SpringCloud Stream實現(xiàn)了Kafka和RabbitMQ的binder
Binder可以生成Binding,Binding用來綁定消息容器的生產(chǎn)者和消費者,它有兩種類型,INPUT和OUTPUT,INPUT對應(yīng)于消費者,OUTPUT對應(yīng)于生產(chǎn)者。
整合配置yml文件:
spring:
cloud:
# function:
# definition: testChannel
stream:
default-binder: kafka #默認(rèn)的binder是kafka(粘合劑粘合的類型為kafka)
#可以動態(tài)綁定的目標(biāo)列表(如:動態(tài)路由),如果設(shè)置,則只能綁定列出的目的地
# dynamic-destinations:
#綁定信息
bindings:
#消息報錯后,數(shù)據(jù)保存的topic
error:
destination: myError
testChannel-in-0:
#消費者
consumer:
# bindingName: myInConsumer
#消費者并發(fā) 默認(rèn)為1
concurrency: 1
#是否分區(qū)接收數(shù)據(jù) 默認(rèn)false
partitioned: false
#頭信息模式,設(shè)置為raw時,禁用輸入頭文件解析。僅適用于不支持消息頭的消息中間件,并且需要頭部嵌入。入站數(shù)據(jù)來自外部Spring Cloud Stream應(yīng)用程序時很有用。
header-mode: headers
#重試次數(shù)
max-attempts: 3
#初始回退間隔時間
back-off-initial-interval: 1000
#大回退間隔時間
back-off-max-interval: 10000
#回退倍數(shù)
back-off-multiplier: 2.0
#大于0時,表示允許自定義該消費者的實例索引,-1時使用spring.cloud.stream.instance-index
instance-index: -1
#大于0時表示自定義消費者實例技術(shù),-1時默認(rèn)使用spring.cloud.stream.instanceCount
instance-count: -1
content-type: application/json
#生產(chǎn)者
producer:
#一個確定如何分配出站數(shù)據(jù)的SpEL表達(dá)式
partition-key-expression: headers.cs
#一個PartitionKeyExtractorStrategy實現(xiàn)。如果設(shè)置,或者如果設(shè)置了partitionKeyExpression,則該通道上的出站數(shù)據(jù)將被分區(qū),并且partitionCount必須設(shè)置為大于1的值才能生效。這兩個選項是相互排斥的。
#partition-key-extractor-class:
#一個PartitionSelectorStrategy實現(xiàn)。與partitionSelectorExpression相互排斥。如果沒有設(shè)置,則分區(qū)將被選為hashCode(key) % partitionCount,其中key通過partitionKeyExpression或partitionKeyExtractorClass計算。
#partition-selector-class:
#partition-selector-expression:
#如果啟用分區(qū),則數(shù)據(jù)的目標(biāo)分區(qū)數(shù)。如果生產(chǎn)者被分區(qū),則必須設(shè)置為大于1的值。在Kafka,解釋為提示; 而是使用更大的和目標(biāo)主題的分區(qū)計數(shù)。
partition-count: 3
#消息發(fā)送失敗的處理邏輯默認(rèn)是關(guān)閉的 test.errors
error-channel-enabled: true
destination: test #目標(biāo)主題 相當(dāng)于kafka的topic
binder: kafka #粘合器
content-type: application/json
# content-type: text/html
group: group2
testChannel-out-0:
#消費者
consumer:
# bindingName: myOutConsumer
#消費者并發(fā) 默認(rèn)為1
concurrency: 1
#是否分區(qū)接收數(shù)據(jù) 默認(rèn)false
partitioned: false
#頭信息模式,設(shè)置為raw時,禁用輸入頭文件解析。僅適用于不支持消息頭的消息中間件,并且需要頭部嵌入。入站數(shù)據(jù)來自外部Spring Cloud Stream應(yīng)用程序時很有用。
header-mode: headers
#重試次數(shù)
max-attempts: 3
#初始回退間隔時間
back-off-initial-interval: 1000
#大回退間隔時間
back-off-max-interval: 10000
#回退倍數(shù)
back-off-multiplier: 2.0
#大于0時,表示允許自定義該消費者的實例索引,-1時使用spring.cloud.stream.instance-index
instance-index: -1
#大于0時表示自定義消費者實例技術(shù),-1時默認(rèn)使用spring.cloud.stream.instanceCount
instance-count: -1
# #生產(chǎn)者
# producer:
# #一個確定如何分配出站數(shù)據(jù)的SpEL表達(dá)式
# partition-key-expression: headers.cs
# #一個PartitionKeyExtractorStrategy實現(xiàn)。如果設(shè)置,或者如果設(shè)置了partitionKeyExpression,則該通道上的出站數(shù)據(jù)將被分區(qū),并且partitionCount必須設(shè)置為大于1的值才能生效。這兩個選項是相互排斥的。
# #partition-key-extractor-class:
# #一個PartitionSelectorStrategy實現(xiàn)。與partitionSelectorExpression相互排斥。如果沒有設(shè)置,則分區(qū)將被選為hashCode(key) % partitionCount,其中key通過partitionKeyExpression或partitionKeyExtractorClass計算。
# #partition-selector-class:
# #partition-selector-expression:
# #如果啟用分區(qū),則數(shù)據(jù)的目標(biāo)分區(qū)數(shù)。如果生產(chǎn)者被分區(qū),則必須設(shè)置為大于1的值。在Kafka,解釋為提示; 而是使用更大的和目標(biāo)主題的分區(qū)計數(shù)。
# partition-count: 3
# #消息發(fā)送失敗的處理邏輯默認(rèn)是關(guān)閉的 test.errors
# error-channel-enabled: true
destination: test #本例子創(chuàng)建了另外一個topic (test1)用于區(qū)分不同的功能區(qū)分。
binder: kafka
content-type: application/json
group: group1
# producer:
# error-channel-enabled: true
## partitionSelectorName: customPartitionSelector
## partitionKeyExtractorName: customPartitionKeyExtractor
# partitionCount: 3
# partitionKeyExpression: headers.cs
## partition-key-extractor-name: customPartitionKeyExtractor
## partition-selector-name: customPartitionSelector
binders:
kafka:
binder:
#kafka brokers,默認(rèn)localhost
brokers: localhost
#kafka 端口號,默認(rèn)9092
default-broker-port: 9092
#kafka zk節(jié)點,默認(rèn)localhost
zk-nodes: localhost
#zookeeper 端口
default-zk-port: 2181
#配置,map
#configuration:
#自定義標(biāo)題列表
headers: cs
#偏移量保存時間(ms)窗口,0:忽略,默認(rèn)10000(ms)
offset-update-time-window: 10000
#偏移量保存次數(shù),與時間窗口互斥
offset-update-count: 0
#broker 需要的ack數(shù)量
required-acks: 1
#只有設(shè)置autoCreateTopics或autoAddPartitions才有效
min-partition-count: 1
#自動創(chuàng)建topic時 生成的副本數(shù)量
replication-factor: 1
#自動創(chuàng)建主題
auto-create-topics: true
#如果設(shè)置為true,則綁定器將根據(jù)需要創(chuàng)建新的分區(qū)。如果設(shè)置為false,則綁定器將依賴于已配置的主題的分區(qū)大小。如果目標(biāo)主題的分區(qū)計數(shù)小于預(yù)期值,則綁定器將無法啟動。
auto-add-partitions: true
#socket 緩沖區(qū)大小
socket-buffer-size: 2097152
bootstrap-servers: 127.0.0.1:9092 #kafka服務(wù)地址,集群部署的時候需要配置多個
#配置,map
configuration:
acks: -1
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
# value:
# serializer: org.apache.kafka.common.serialization.StringSerializer
max:
poll:
records: 200
retries: 3
session:
timeout:
ms: 40000 # 每次消費的處理時間
#綁定
bindings:
testChannel-out-put:
#消費者
consumer:
#主題分區(qū)消費者組成員之間自動平衡
auto-rebalance-enabled: true
#自動提交偏移量
auto-commit-offset: true
# auto-commit-on-error:
#連接恢復(fù)嘗試之間的間隔,以毫秒為單位。
recovery-interval: 5000
#是否將消費者偏移量重置為start-offset提供的值
reset-offsets: false
#新組的起始偏移量,或resetOffsets為true時的起始偏移量。允許的值:earliest,latest,默認(rèn)值:null(相當(dāng)于earliest)
start-offset: earliest
enable-dlq: false
# configuration:
#接收錯誤消息的DLQ主題的名稱。默認(rèn)值:null(如果未指定,將導(dǎo)致錯誤的消息將轉(zhuǎn)發(fā)到名為error::的主題)。:
# dlq-name:
#生產(chǎn)者
producer:
# configuration:
buffer-size: 16348
#生產(chǎn)者是否是同步的
sync: true
#生產(chǎn)者在發(fā)送之前等待多長時間,以便允許更多消息在同一批次中累積。(通常,生產(chǎn)者根本不等待,并且簡單地發(fā)送在先前發(fā)送進(jìn)行中累積的所有消息。)非零值可能會以延遲為代價增加吞吐量。
batch-timeout: 0
# key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
# value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
client-id: producer1
health-timeout: 60
消息生產(chǎn)者:
@Component
public class MessageProducer {private final StreamBridge streamBridge;
@Autowired(required = false)
private BinderAwareChannelResolver resolver;
public MessageProducer(StreamBridge streamBridge) {this.streamBridge = streamBridge;
}
public String resolverSendMessage(String messages) {SendMessageDto sendMessageDto = new SendMessageDto();
sendMessageDto.setIp(UUID.randomUUID().toString());
sendMessageDto.setMessage(messages);
sendMessageDto.setTiem(new Date().toString());
MessageBuilderstringMessageBuilder = MessageBuilder.withPayload(sendMessageDto);
stringMessageBuilder.setHeader("cs","1");
// stringMessageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY,"1233322");
// Messagebuild = ;
GenericMessage stringMessage = (GenericMessage) stringMessageBuilder.build();
resolver.resolveDestination("testChannel-in-0").send(stringMessage);
return "yes!";
}
public String send(String messages) {SendMessageDto sendMessageDto = new SendMessageDto();
sendMessageDto.setIp(UUID.randomUUID().toString());
sendMessageDto.setMessage(messages);
sendMessageDto.setTiem(new Date().toString());
// String s = JSON.toJSONString(sendMessageDto);
MessageBuilderstringMessageBuilder = MessageBuilder.withPayload(sendMessageDto);
stringMessageBuilder.setHeader("cs","1");
// stringMessageBuilder.setHeader(KafkaHeaders.MESSAGE_KEY,"1233322");
// Messagebuild = ;
GenericMessage stringMessage = (GenericMessage) stringMessageBuilder.build();
streamBridge.send("testChannel-in-0", stringMessage);
return "發(fā)送消息: " + messages;
}
}
消息消費者
//注意這里采用的是函數(shù)式編程,向spring 容器中注入名為testChannel 的bean, 應(yīng)為高版本的spring cloud steam 棄用了
//@StreamListener ,@Input等注解,而是提倡函數(shù)式接口
//testChannel 與生產(chǎn)者寫入消息的通道名“testChannel-in-0” 有所差異,-in-0是spring cloud steam存在的默認(rèn)規(guī)則
@Bean(name = "testChannel")
ConsumertestChannel( ) {return str ->{System.out.println("消費者處理消息:" +str );
};
}
如果需要指定消息的分區(qū),需要在配置文件中自定義分區(qū)的計算邏輯屬性為:
partition-key-expression: headers.cs
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧