這篇文章主要講解了Spring Cloud Stream微服務消息框架的詳細解析,內(nèi)容清晰明了,對此有興趣的小伙伴可以學習一下,相信大家閱讀完之后會有幫助。
創(chuàng)新互聯(lián)建站提供網(wǎng)站設計制作、成都做網(wǎng)站、網(wǎng)頁設計,品牌網(wǎng)站制作,一元廣告等致力于企業(yè)網(wǎng)站建設與公司網(wǎng)站制作,10多年的網(wǎng)站開發(fā)和建站經(jīng)驗,助力企業(yè)信息化建設,成功案例突破上1000家,是您實現(xiàn)網(wǎng)站建設的好選擇.
隨著近些年微服務在國內(nèi)的盛行,消息驅(qū)動被提到的越來越多。主要原因是系統(tǒng)被拆分成多個模塊后,一個業(yè)務往往需要在多個服務間相互調(diào)用,不管是采用HTTP還是RPC都是同步的,不可避免快等慢的情況發(fā)生,系統(tǒng)性能上很容易遇到瓶頸。在這樣的背景下,將業(yè)務中實時性要求不是特別高且非主干的部分放到消息隊列中是很好的選擇,達到了異步解耦的效果。
目前消息隊列有很多優(yōu)秀的中間件,目前使用較多的主要有 RabbitMQ,Kafka,RocketMQ 等,這些中間件各有優(yōu)勢,有的對 AMQP(應用層標準高級消息隊列協(xié)議)支持完善,有的提供了更高的可靠性,有的對大數(shù)據(jù)支持良好,同時各種消息中間件概念不統(tǒng)一,使得選擇和使用一款合適的消息中間件成為難題。Spring跳出來給出了解決方案:Spring Cloud Stream,使用它可以很方便高效的操作消息中間件,程序員只要關(guān)心業(yè)務代碼即可,目前官方支持 RabbitMQ,Kafka兩大主流MQ,RocketMQ 則自己提供了相應支持。
首先看一下Spring Cloud Stream做了什么,如下圖所示,框架目前官方把消息中間件抽象成了 Binder,業(yè)務代碼通過進出管道連接 Binder,各消息中間件的差異性統(tǒng)一交給了框架處理,程序員只需要了解框架的抽象出來的一些統(tǒng)一概念即可
Spring Cloud Stream將業(yè)務代碼和消息中間件解耦,帶來的好處可以從下圖很直觀的感受到,很簡潔的代碼,我們便能從RabbitMQ中接受消息然后經(jīng)過業(yè)務處理再向Kafka發(fā)送一條消息,只需要更改相關(guān)配置就能快速改變系統(tǒng)行為。
細心的讀者可能會好奇,上圖的代碼只是注入了一個簡單的 Function 而已,實際上,Spring Cloud Stream3.0后集成了Spring Cloud Function框架 ,提倡函數(shù)式的風格,棄用先前版本基于注解的開發(fā)方式。Spring Cloud Function是 Serverless 和 Faas 的產(chǎn)物,強調(diào)面向函數(shù)編程,一份代碼各云平臺運行,和Spring Cloud Stream一樣也是解決了基礎設施的差異性問題,通過強大的自動裝配機制,可以根據(jù)配置自動暴露 HTTP 服務或者消息服務,并且同時支持命令式和響應式編程模式,可以說是很強大了。下面通過一個簡單的例子來理解下上圖的代碼和框架的使用把。
簡單案例
模擬一個簡單的下單,收到訂單之后處理完,返回成功,然后發(fā)送消息給庫存模塊,庫存模塊再發(fā)送消息給報表模塊
項目地址
springcloud-stream
項目結(jié)構(gòu)
項目依賴
org.springframework.boot spring-boot-starter-web org.springframework.cloud spring-cloud-starter-stream-rabbit
表單
@Data public class OrderForm { private String productName; }
消息管道注冊
@Configuration @Slf4j public class MessageQueueConfig { @Bean public Functioninventory() { return orderForm -> { log.info("Inventory Received Message: " + orderForm); return orderForm; }; } @Bean public Consumer report() { return orderForm -> { log.info("Report Received Message: " + orderForm); }; } }
Controller
@Slf4j @RestController public class OrderController { @Autowired private BeanFactoryChannelResolver resolver; @PostMapping("order") public String order(@RequestBody OrderForm orderForm) { log.info("Received Request " + orderForm); resolver.resolveDestination("inventory-in-0").send(new GenericMessage<>(orderForm)); return "success"; } }
配置
框架會按照中間件默認端口去連接,這里自定義了一個名為myLocalRabbit的類型是RabbitMQ的Binder配置,bindings下面 inventory-in-0 是通道名,接受inventory主題(對應RabbitMQ的ExChange)的消息,然后處理完通過 inventory-out-0 通道發(fā)送消息到 report 主題, report-in-0通道負責接受report主題的消息。
注:通道名=注冊的 function 方法名 + in或者out + 參數(shù)位置(詳見注釋)
spring: cloud: stream: # 配置消息中間件信息 binders: myLocalRabbit: type: rabbit environment: spring: rabbitmq: host: localhost port: 31003 username: guest password: guest virtual-host: / # 重點,如何綁定通道,這里有個約定,開頭是函數(shù)名,in表示消費消息,out表示生產(chǎn)消息,最后的數(shù)字是函數(shù)接受的參數(shù)的位置,destination后面為訂閱的主題 # 比如Function, Flux >, Flux > gather() # gather函數(shù)接受的第一個String參數(shù)對應 gather-in-0,第二個Integer參數(shù)對應 gather-in-1,輸出對應 gather-out-0 bindings: inventory-in-0: destination: inventory inventory-out-0: destination: report report-in-0: destination: report # 注冊聲明的三個函數(shù) function: definition: inventory;report
測試
POST http://localhost:8080/order Content-Type: application/json { "productName": "999" }
結(jié)果
POST http://localhost:8080/order HTTP/1.1 200 Content-Type: text/plain;charset=UTF-8 Content-Length: 7 Date: Sat, 30 May 2020 15:27:56 GMT Keep-Alive: timeout=60 Connection: keep-alive success Response code: 200; Time: 56ms; Content length: 7 bytes
后臺日志
可以看到消息成功發(fā)送到了庫存和報表服務
2020-05-30 23:27:56.956 INFO 8760 --- [nio-8080-exec-1] c.e.springcloudstream.OrderController : Received Request OrderForm(productName=999) 2020-05-30 23:27:56.956 INFO 8760 --- [nio-8080-exec-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one. 2020-05-30 23:27:56.957 INFO 8760 --- [nio-8080-exec-1] c.e.s.MessageQueueConfig : Inventory Received Message: OrderForm(productName=999) 2020-05-30 23:27:56.958 INFO 8760 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:31003] 2020-05-30 23:27:56.964 INFO 8760 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#6131841e:0/SimpleConnection@192fe472 [delegate=amqp://guest@127.0.0.1:31003/, localPort= 2672] 2020-05-30 23:27:56.965 INFO 8760 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (inventory.anonymous.wtaFwHlNRkql5IUh3JCNAA) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost. 2020-05-30 23:27:56.965 INFO 8760 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable, auto-delete, or exclusive Queue (report.anonymous.SJgpJKiJQf2tudszgf623w) durable:false, auto-delete:true, exclusive:true. It will be redeclared if the broker stops and is restarted while the connection factory is alive, but all messages will be lost. 2020-05-30 23:27:56.979 INFO 8760 --- [f2tudszgf623w-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one. 2020-05-30 23:27:56.980 INFO 8760 --- [f2tudszgf623w-1] c.e.s.MessageQueueConfig : Report Received Message: OrderForm(productName=999)
看完上述內(nèi)容,是不是對Spring Cloud Stream微服務消息框架的詳細解析有進一步的了解,如果還想學習更多內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。