1、group:
10多年的長(zhǎng)沙網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。全網(wǎng)營(yíng)銷推廣的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整長(zhǎng)沙建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。創(chuàng)新互聯(lián)從事“長(zhǎng)沙網(wǎng)站設(shè)計(jì)”,“長(zhǎng)沙網(wǎng)站推廣”以來(lái),每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。組內(nèi)只有1個(gè)實(shí)例消費(fèi)。如果不設(shè)置group,則stream會(huì)自動(dòng)為每個(gè)實(shí)例創(chuàng)建匿名且獨(dú)立的group——于是每個(gè)實(shí)例都會(huì)消費(fèi)
組內(nèi)單次只有1個(gè)實(shí)例消費(fèi),并且會(huì)輪詢負(fù)載均衡。通常,在將應(yīng)用程序綁定到給定目標(biāo)時(shí),最好始終指定consumer group
2、destination binder:
與外部消息系統(tǒng)通信的組件,為構(gòu)造 Binding提供了 2 個(gè)方法,分別是 bindConsumer 和 bindProducer ,它們分別用于構(gòu)造生產(chǎn)者和消費(fèi)者。Binder使Spring Cloud Stream應(yīng)用程序可以靈活地連接到中間件,目前spring為kafka、rabbitmq提供binder
3、destination binding:
Binding 是連接應(yīng)用程序跟消息中間件的橋梁,用于消息的消費(fèi)和生產(chǎn),由binder創(chuàng)建
4、partition
一個(gè)或多個(gè)生產(chǎn)者將數(shù)據(jù)發(fā)送到多個(gè)消費(fèi)者,并確保有共同特征標(biāo)識(shí)的數(shù)據(jù)由同一個(gè)消費(fèi)者處理。默認(rèn)是對(duì)消息進(jìn)行hashCode,然后根據(jù)分區(qū)個(gè)數(shù)取余,所以對(duì)于相同的消息,總會(huì)落到同一個(gè)消費(fèi)者上
注:嚴(yán)格來(lái)說(shuō)partition不屬于概念,而是一種Stream提高伸縮性、吞吐量的一種方式
1、@Input,使用示例:
public interface MySink {
@Input("my-input")
SubscribableChannel input();
}
作用:
2、@Output,使用示例:
public interface MySource {
@Output("my-output")
MessageChannel output();
}
作用:
@Input
類似,只不過(guò)是用來(lái)生產(chǎn)消息3、@StreamListener,使用示例:
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")
public void receive(String messageBody) {
log.info("Received: {}", messageBody);
}
作用:
4、@SendTo,使用示例:
// 接收INPUT這個(gè)channel的消息,并將返回值發(fā)送到OUTPUT這個(gè)channel
@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public String receive(String receiveMsg) {
return "handle...";
}
作用:
4、@InboundChannelAdapter,使用示例:
@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
public MessageSource producer() {
return () -> new GenericMessage<>("Hello Spring Cloud Stream");
}
作用:
5、@ServiceActivator,使用示例:
@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String transform(String payload) {
return payload.toUpperCase();
}
作用:
6、@Transformer,使用示例:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String message) {
return message.toUpperCase();
}
作用:
@ServiceActivator
類似,標(biāo)注該注解的方法能夠轉(zhuǎn)換消息,消息頭,或消息有效內(nèi)容PollableMessageSource允許消費(fèi)者可以控制消費(fèi)速率。舉個(gè)例子簡(jiǎn)單演示一下,首先定義一個(gè)接口:
public interface PolledProcessor {
@Input("pollable-input")
PollableMessageSource input();
}
使用示例:
@Autowired
private PolledProcessor polledProcessor;
@Scheduled(fixedDelay = 5_000)
public void poll() {
polledProcessor.input().poll(message -> {
byte[] bytes = (byte[]) message.getPayload();
String payload = new String(bytes);
System.out.println(payload);
});
}
參考:
https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。