這篇文章主要介紹“java異步并發(fā)請(qǐng)求和請(qǐng)求合并舉例分析”,在日常操作中,相信很多人在java異步并發(fā)請(qǐng)求和請(qǐng)求合并舉例分析問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對(duì)大家解答”java異步并發(fā)請(qǐng)求和請(qǐng)求合并舉例分析”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!
創(chuàng)新互聯(lián)是專業(yè)的天峻網(wǎng)站建設(shè)公司,天峻接單;提供成都做網(wǎng)站、網(wǎng)站制作,網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行天峻網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊(duì),希望更多企業(yè)前來合作!
在做業(yè)務(wù)系統(tǒng)需求開發(fā)中,經(jīng)常需要從其他服務(wù)獲取數(shù)據(jù),拼接數(shù)據(jù),然后返回?cái)?shù)據(jù)給前端使用;常見的服務(wù)調(diào)用就是通過http接口調(diào)用,而對(duì)于http,通常一個(gè)請(qǐng)求會(huì)分配一個(gè)線程執(zhí)行,在同步調(diào)用接口的情況下,整個(gè)線程是一直被占用或者阻塞的;如果有大量的這種請(qǐng)求,整個(gè)系統(tǒng)的吞吐量就比較低,而在依賴的服務(wù)響應(yīng)時(shí)間比較低的情況下,我們希望先讓出cpu,讓其他請(qǐng)求先執(zhí)行,等依賴的服務(wù)請(qǐng)求返回結(jié)果時(shí)再繼續(xù)往下執(zhí)行,這時(shí)我們會(huì)考慮將請(qǐng)求異步化,或者將相同的請(qǐng)求合并,從而達(dá)到提高系統(tǒng)執(zhí)行效率和吞吐量的目的。
目前常見的幾種調(diào)用方式是同步調(diào)用,線程池+future,異步回調(diào)completableFuture;協(xié)程也是異步調(diào)用的解決方式,但java目前不支持協(xié)程;對(duì)于future方式,只能用get或者while(!isDone)輪詢這種阻塞的方式直到線程執(zhí)行完成,這也不是我們希望的異步執(zhí)行方式,jdk8提供的completableFuture其實(shí)也不是異步的方式,只是對(duì)依賴多服務(wù)的Callback調(diào)用結(jié)果處理做結(jié)果編排,來彌補(bǔ)Callback的不足,從而實(shí)現(xiàn)異步鏈?zhǔn)秸{(diào)用的目的,這也是比較推薦的方式。
RpcService rpcService = new RpcService(); HttpService httpService = new HttpService(); // 假設(shè)rpc1耗時(shí)10ms MaprpcResult1=rpcService.getRpcResult(); // 假設(shè)rpc2耗時(shí)20ms Integer rpcResult2 = httpService.getHttpResult(); // 則線程總耗時(shí):30ms
ExecutorService executor = Executors.newFixedThreadPool(2); RpcService rpcService = new RpcService(); HttpService httpService = new HttpService(); future1 = executor.submit(() -> rpcService.getRpcResult()); future2 = executor.submit(() -> httpService.getHttpResult()); //rpc1耗時(shí)10ms MaprpcResult1 = future1.get(300, TimeUnit.MILLISECONDS); //rpc2耗時(shí)20ms Integer rpcResult2 = future2.get(300, TimeUnit.MILLISECONDS); //則線程總耗時(shí)20ms
/** * 場(chǎng)景:兩個(gè)接口并發(fā)異步調(diào)用,返回CompletableFuture,不阻塞主線程 * 兩個(gè)服務(wù)也是異步非阻塞調(diào)用 **/ CompletableFuture future1 = service.getHttpData("http://www.vip.com/showGoods/50"); CompletableFuture future2 = service.getHttpData("http://www.vip.com/showGoods/50"); CompletableFuture future3 = future1.thenCombine(future2, (f1, f2) -> { //處理業(yè)務(wù).... return f1 + "," + f2; }).exceptionally(e -> { return ""; });
CompletableFuture使用ForkJoinPool執(zhí)行線程,在ForkJoinPool類注冊(cè)ForkJoinWorkerThread線程時(shí)可以看到,F(xiàn)orkJoinPool里面的線程都是daemon線程(垃圾回收線程就是一個(gè)典型的deamon線程),
/** * Callback from ForkJoinWorkerThread constructor to establish and * record its WorkQueue. * * @param wt the worker thread * @return the worker's queue */ final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; #注冊(cè)守護(hù)線程 wt.setDaemon(true); // configure thread if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index int mode = config & MODE_MASK; int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array if ((ws = workQueues) != null && (n = ws.length) > 0) { int s = indexSeed += SEED_INCREMENT; // unlikely to collide int m = n - 1; i = ((s << 1) | 1) & m; // odd-numbered indices if (ws[i] != null) { // collision int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } w.hint = s; // use as random seed w.config = i | mode; w.scanState = i; // publication fence ws[i] = w; } } finally { unlockRunState(rs, rs & ~RSLOCK); } wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; }
當(dāng)主線程執(zhí)行完后,jvm就會(huì)退出,所以需要考慮主線程執(zhí)行完成時(shí)間和fork出去的線程執(zhí)行時(shí)間,也需要考慮線程池的大小,默認(rèn)為當(dāng)前cpu的核數(shù)-1,可以參考下其他系統(tǒng)的故障記錄:CompletableFuture線程池問題。
當(dāng)系統(tǒng)遇到瞬間產(chǎn)生大量請(qǐng)求時(shí),可以考慮將相同的請(qǐng)求合并,最大化利用系統(tǒng)IO,提高系統(tǒng)的吞吐量。
設(shè)計(jì)時(shí),可以將符合條件的url請(qǐng)求,先收集起來,直到滿足以下條件之一時(shí)進(jìn)行合并發(fā)送:
收集到的請(qǐng)求數(shù)超過預(yù)設(shè)的最大請(qǐng)求數(shù)。
距離上次請(qǐng)求發(fā)送時(shí)長超過預(yù)設(shè)的最大時(shí)長。
實(shí)現(xiàn)方案有自行使用阻塞隊(duì)列方式:并發(fā)環(huán)境下的請(qǐng)求合并,也可以考慮Hystrix:Hystrix實(shí)現(xiàn)請(qǐng)求合并/請(qǐng)求緩存
目前公司網(wǎng)關(guān)組件janus也是通過合并auth請(qǐng)求的方式減少網(wǎng)絡(luò)開銷,提高cpu的利用率和系統(tǒng)吞吐量的。
nginx同樣有合并請(qǐng)求模塊nginx-http-concat用來減少請(qǐng)求io,參考:nginx 合并多個(gè)js/css請(qǐng)求為一個(gè)請(qǐng)求
賴澤坤@vipshop.com
到此,關(guān)于“java異步并發(fā)請(qǐng)求和請(qǐng)求合并舉例分析”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!