在微服務架構中,我們將一個項目拆分成很多個獨立的模塊,這些獨立的模塊通過遠程調(diào)用來互相配合工作,但是,在高并發(fā)情況下,通信次數(shù)的增加會導致總的通信時間增加,同時,線程池的資源也是有限的,高并發(fā)環(huán)境會導致有大量的線程處于等待狀態(tài),進而導致響應延遲,為了解決這些問題,我們需要來了解Hystrix的請求合并。
成都創(chuàng)新互聯(lián)公司專業(yè)提供成都主機托管四川主機托管成都服務器托管四川服務器托管,支持按月付款!我們的承諾:貴族品質(zhì)、平民價格,機房位于中國電信/網(wǎng)通/移動機房,成都移動云計算中心服務有保障!
Hystrix中的請求合并,就是利用一個合并處理器,將對同一個服務發(fā)起的連續(xù)請求合并成一個請求進行處理(這些連續(xù)請求的時間窗默認為10ms),在這個過程中涉及到的一個核心類就是HystrixCollapser,OK,接下來我們就來看看如何實現(xiàn)Hystrix的請求合并。
服務提供者接口
我需在在服務提供者中提供兩個接口供服務消費者調(diào)用,如下:
@RequestMapping("/getbook6") public Listbook6(String ids) { System.out.println("ids>>>>>>>>>>>>>>>>>>>>>" + ids); ArrayList books = new ArrayList<>(); books.add(new Book("《李自成》", 55, "姚雪垠", "人民文學出版社")); books.add(new Book("中國文學簡史", 33, "林庚", "清華大學出版社")); books.add(new Book("文學改良芻議", 33, "胡適", "無")); books.add(new Book("ids", 22, "helloworld", "haha")); return books; } @RequestMapping("/getbook6/{id}") public Book book61(@PathVariable Integer id) { Book book = new Book("《李自成》2", 55, "姚雪垠2", "人民文學出版社2"); return book; }
第一個接口是一個批處理接口,第二個接口是一個處理單個請求的接口。在批處理接口中,服務消費者傳來的ids參數(shù)格式是1,2,3,4…這種格式,正常情況下我們需要根據(jù)ids查詢到對應的數(shù)據(jù),然后組裝成一個集合返回,我這里為了處理方便,不管什么樣的請求統(tǒng)統(tǒng)都返回一樣的數(shù)據(jù)集;處理單個請求的接口就比較簡單了,不再贅述。
服務消費者
OK,服務提供者處理好之后,接下來我們來看看服務消費者要怎么處理。
BookService
首先在BookService中添加兩個方法用來調(diào)用服務提供者提供的接口,如下:
public Book test8(Long id) { return restTemplate.getForObject("http://HELLO-SERVICE/getbook6/{1}", Book.class, id); } public Listtest9(List ids) { System.out.println("test9---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName()); Book[] books = restTemplate.getForObject("http://HELLO-SERVICE/getbook6?ids={1}", Book[].class, StringUtils.join(ids, ",")); return Arrays.asList(books); }
test8用來調(diào)用提供單個id的接口,test9用來調(diào)用批處理的接口,在test9中,我將test9執(zhí)行時所處的線程打印出來,方便我們觀察執(zhí)行結果,另外,在RestTemplate中,如果返回值是一個集合,我們得先用一個數(shù)組接收,然后再轉(zhuǎn)為集合(或許也有其他辦法,小伙伴們有更好的建議可以提)。
BookBatchCommand
OK,BookService中的方法準備好了后,我們就可以來創(chuàng)建一個BookBatchCommand,這是一個批處理命令,如下:
public class BookBatchCommand extends HystrixCommand> { private List
ids; private BookService bookService; public BookBatchCommand(List ids, BookService bookService) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CollapsingGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("CollapsingKey"))); this.ids = ids; this.bookService = bookService; } @Override protected List run() throws Exception { return bookService.test9(ids); } }
這個類實際上和我們在上篇博客中介紹的類差不多,都是繼承自HystrixCommand,用來處理合并之后的請求,在run方法中調(diào)用BookService中的test9方法。
BookCollapseCommand
接下來我們需要創(chuàng)建BookCollapseCommand繼承自HystrixCollapser來實現(xiàn)請求合并。如下:
public class BookCollapseCommand extends HystrixCollapser, Book, Long> { private BookService bookService; private Long id; public BookCollapseCommand(BookService bookService, Long id) { super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("bookCollapseCommand")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100))); this.bookService = bookService; this.id = id; } @Override public Long getRequestArgument() { return id; } @Override protected HystrixCommand
> createCommand(Collection
> collapsedRequests) { List ids = new ArrayList<>(collapsedRequests.size()); ids.addAll(collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList())); BookBatchCommand bookBatchCommand = new BookBatchCommand(ids, bookService); return bookBatchCommand; } @Override protected void mapResponseToRequests(List batchResponse, Collection > collapsedRequests) { System.out.println("mapResponseToRequests"); int count = 0; for (CollapsedRequest collapsedRequest : collapsedRequests) { Book book = batchResponse.get(count++); collapsedRequest.setResponse(book); } } }
關于這個類,我說如下幾點:
1.首先在構造方法中,我們設置了請求時間窗為100ms,即請求時間間隔在100ms之內(nèi)的請求會被合并為一個請求。
2.createCommand方法主要用來合并請求,在這里獲取到各個單個請求的id,將這些單個的id放到一個集合中,然后再創(chuàng)建出一個BookBatchCommand對象,用該對象去發(fā)起一個批量請求。
3.mapResponseToRequests方法主要用來為每個請求設置請求結果。該方法的第一個參數(shù)batchResponse表示批處理請求的結果,第二個參數(shù)collapsedRequests則代表了每一個被合并的請求,然后我們通過遍歷batchResponse來為collapsedRequests設置請求結果。
OK,所有的這些操作完成后,我們就可以來測試啦。
測試
我們在服務消費者端創(chuàng)建訪問接口,來測試合并請求,測試接口如下:
@RequestMapping("/test7") @ResponseBody public void test7() throws ExecutionException, InterruptedException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); BookCollapseCommand bc1 = new BookCollapseCommand(bookService, 1l); BookCollapseCommand bc2 = new BookCollapseCommand(bookService, 2l); BookCollapseCommand bc3 = new BookCollapseCommand(bookService, 3l); BookCollapseCommand bc4 = new BookCollapseCommand(bookService, 4l); Futureq1 = bc1.queue(); Future q2 = bc2.queue(); Future q3 = bc3.queue(); Book book1 = q1.get(); Book book2 = q2.get(); Book book3 = q3.get(); Thread.sleep(3000); Future q4 = bc4.queue(); Book book4 = q4.get(); System.out.println("book1>>>"+book1); System.out.println("book2>>>"+book2); System.out.println("book3>>>"+book3); System.out.println("book4>>>"+book4); context.close(); }
關于這個測試接口我說如下兩點:
1.首先要初始化HystrixRequestContext
2.創(chuàng)建BookCollapseCommand類的實例來發(fā)起請求,先發(fā)送3個請求,然后睡眠3秒鐘,再發(fā)起1個請求,這樣,前3個請求就會被合并為一個請求,第四個請求因為間隔的時間比較久,所以不會被合并,而是單獨創(chuàng)建一個線程去處理。
OK,我們來看看執(zhí)行結果,如下:
通過注解實現(xiàn)請求合并
OK,上面這種請求合并方式寫起來稍微有一點麻煩,我們可以使用注解來更優(yōu)雅的實現(xiàn)這一功能。首先在BookService中添加兩個方法,如下:
@HystrixCollapser(batchMethod = "test11",collapserProperties = {@HystrixProperty(name ="timerDelayInMilliseconds",value = "100")}) public Futuretest10(Long id) { return null; } @HystrixCommand public List test11(List ids) { System.out.println("test9---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName()); Book[] books = restTemplate.getForObject("http://HELLO-SERVICE/getbook6?ids={1}", Book[].class, StringUtils.join(ids, ",")); return Arrays.asList(books); }
在test10方法上添加@HystrixCollapser注解實現(xiàn)請求合并,用batchMethod屬性指明請求合并后的處理方法,collapserProperties屬性指定其他屬性。
OK,在BookService中寫好之后,直接調(diào)用就可以了,如下:
@RequestMapping("/test8") @ResponseBody public void test8() throws ExecutionException, InterruptedException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); Futuref1 = bookService.test10(1l); Future f2 = bookService.test10(2l); Future f3 = bookService.test10(3l); Book b1 = f1.get(); Book b2 = f2.get(); Book b3 = f3.get(); Thread.sleep(3000); Future f4 = bookService.test10(4l); Book b4 = f4.get(); System.out.println("b1>>>"+b1); System.out.println("b2>>>"+b2); System.out.println("b3>>>"+b3); System.out.println("b4>>>"+b4); context.close(); }
和前面的一樣,前三個請求會進行合并,第四個請求會單獨執(zhí)行,OK,執(zhí)行結果如下:
總結
請求合并的優(yōu)點小伙伴們已經(jīng)看到了,多個請求被合并為一個請求進行一次性處理,可以有效節(jié)省網(wǎng)絡帶寬和線程池資源,但是,有優(yōu)點必然也有缺點,設置請求合并之后,本來一個請求可能5ms就搞定了,但是現(xiàn)在必須再等10ms看看還有沒有其他的請求一起的,這樣一個請求的耗時就從5ms增加到15ms了,不過,如果我們要發(fā)起的命令本身就是一個高延遲的命令,那么這個時候就可以使用請求合并了,因為這個時候時間窗的時間消耗就顯得微不足道了,另外高并發(fā)也是請求合并的一個非常重要的場景。
Ok,我們的請求合并就說到這里,有問題歡迎小伙伴們留言討論。希望對大家的學習有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。