小編給大家分享一下服務(wù)器中并發(fā)處理技巧有哪些,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
創(chuàng)新互聯(lián)公司是一家專注于網(wǎng)站制作、成都網(wǎng)站設(shè)計與策劃設(shè)計,棲霞網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)10年,網(wǎng)設(shè)計領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:棲霞等地區(qū)。棲霞做網(wǎng)站價格咨詢:028-86922220
1.捕獲InterruptedException錯誤
請檢查下面的代碼片段:
public class Task implements Runnable { private final BlockingQueue queue = ...; @Override public void run() { while (!Thread.currentThread().isInterrupted()) { String result = getOrDefault(() -> queue.poll(1L, TimeUnit.MINUTES), "default"); //do smth with the result } } T getOrDefault(Callable supplier, T defaultValue) { try { return supplier.call(); } catch (Exception e) { logger.error("Got exception while retrieving value.", e); return defaultValue; } } }
代碼的問題是,在等待隊列中的新元素時,是不可能終止線程的,因為中斷的標(biāo)志永遠(yuǎn)不會被恢復(fù):
1.運行代碼的線程被中斷。
2.BlockingQueue # poll()方法拋出InterruptedException異常,并清除了中斷的標(biāo)志。
3.while中的循環(huán)條件 (!Thread.currentThread().isInterrupted())的判斷是true,因為標(biāo)記已被清除。
為了防止這種行為,當(dāng)一個方法被顯式拋出(通過聲明拋出InterruptedException)或隱式拋出(通過聲明/拋出一個原始異常)時,總是捕獲InterruptedException異常,并恢復(fù)中斷的標(biāo)志。
T getOrDefault(Callable supplier, T defaultValue) { try { return supplier.call(); } catch (InterruptedException e) { logger.error("Got interrupted while retrieving value.", e); Thread.currentThread().interrupt(); return defaultValue; } catch (Exception e) { logger.error("Got exception while retrieving value.", e); return defaultValue; } }
2.使用特定的執(zhí)行程序來阻止操作
因為一個緩慢的操作而使整個服務(wù)器變得無響應(yīng),這通常不是開發(fā)人員想要的。不幸的是,對于RPC,響應(yīng)時間通常是不可預(yù)測的。
假設(shè)服務(wù)器有100個工作線程,有一個端點,稱為100 RPS。在內(nèi)部,它發(fā)出一個RPC調(diào)用,通常需要10毫秒。在某個時間點,此RPC的響應(yīng)時間變?yōu)?秒,在峰值期間服務(wù)器能夠做的惟一的一件事就是等待這些調(diào)用,而其他端點則無法訪問。
@GET @Path("/genre/{name}") @Produces(MediaType.APPLICATION_JSON) public Response getGenre(@PathParam("name") String genreName) { Genre genre = potentiallyVerySlowSynchronousCall(genreName); return Response.ok(genre).build(); }
解決這個問題最簡單的方法是提交代碼,它將阻塞調(diào)用變成一個線程池:
@GET @Path("/genre/{name}") @Produces(MediaType.APPLICATION_JSON) public void getGenre(@PathParam("name") String genreName, @Suspended AsyncResponse response) { response.setTimeout(1L, TimeUnit.SECONDS); executorService.submit(() -> { Genre genre = potentiallyVerySlowSynchronousCall(genreName); return response.resume(Response.ok(genre).build()); } ); }
3.傳MDC的值
MDC(Mapped Diagnostic Context)通常用于存儲單個任務(wù)的特定值。例如,在web應(yīng)用程序中,它可能為每個請求存儲一個請求id和一個用戶id,因此MDC查找與單個請求或整個用戶活動相關(guān)的日志記錄變得更加容易。
2017-08-27 14:38:30,893 INFO [server-thread-0] [requestId=060d8c7f, userId=2928ea66] c.g.s.web.Controller - Message.
可是如果代碼的某些部分是在專用線程池中執(zhí)行的,則線程(提交任務(wù)的線程)中MDC就不會被繼續(xù)傳值。在下面的示例中,第7行的日志中包含“requestId”,而第9行的日志則沒有:
@GET @Path("/genre/{name}") @Produces(MediaType.APPLICATION_JSON) public void getGenre(@PathParam("name") String genreName, @Suspended AsyncResponse response) { try (MDC.MDCCloseable ignored = MDC.putCloseable("requestId", UUID.randomUUID().toString())) { String genreId = getGenreIdbyName(genreName); //Sync call logger.trace("Submitting task to find genre with id '{}'.", genreId); //'requestId' is logged executorService.submit(() -> { logger.trace("Starting task to find genre with id '{}'.", genreId); //'requestId' is not logged Response result = getGenre(genreId) //Async call .map(artist -> Response.ok(artist).build()) .orElseGet(() -> Response.status(Response.Status.NOT_FOUND).build()); response.resume(result); } ); } }
這可以通過MDC#getCopyOfContextMap()方法來解決:
... public void getGenre(@PathParam("name") String genreName, @Suspended AsyncResponse response) { try (MDC.MDCCloseable ignored = MDC.putCloseable("requestId", UUID.randomUUID().toString())) { ... logger.trace("Submitting task to find genre with id '{}'.", genreId); //'requestId' is logged withCopyingMdc(executorService, () -> { logger.trace("Starting task to find genre with id '{}'.", genreId); //'requestId' is logged ... }); } } private void withCopyingMdc(ExecutorService executorService, Runnable function) { Map
4.更改線程名稱
為了簡化日志讀取和線程轉(zhuǎn)儲,可以自定義線程的名稱。這可以通過創(chuàng)建ExecutorService時用一個ThreadFactory來完成。在流行的實用程序庫中有許多ThreadFactory接口的實現(xiàn):
com.google.common.util.concurrent.ThreadFactoryBuilde+r in Guava. org.springframework.scheduling.concurrent.CustomizableThreadFactory in Spring. org.apache.commons.lang3.concurrent.BasicThreadFactory in Apache Commons Lang 3.
ThreadFactory threadFactory = new BasicThreadFactory.Builder() .namingPattern("computation-thread-%d") .build(); ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads, threadFactory);
盡管ForkJoinPool不使用ThreadFactory接口,但也支持對線程的重命名:
ForkJoinPool.ForkJoinWorkerThreadFactory forkJoinThreadFactory = pool -> { ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); thread.setName("computation-thread-" + thread.getPoolIndex()); return thread; }; ForkJoinPool forkJoinPool = new ForkJoinPool(numberOfThreads, forkJoinThreadFactory, null, false);
將線程轉(zhuǎn)儲與默認(rèn)命名進(jìn)行比較:
"pool-1-thread-3" #14 prio=5 os_prio=31 tid=0x00007fc06b19f000 nid=0x5703 runnable [0x0000700001ff9000] java.lang.Thread.State: RUNNABLE at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16) ... "pool-2-thread-3" #15 prio=5 os_prio=31 tid=0x00007fc06aa10800 nid=0x5903 runnable [0x00007000020fc000] java.lang.Thread.State: RUNNABLE at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java:21) at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java:9) ... "pool-1-thread-2" #12 prio=5 os_prio=31 tid=0x00007fc06aa10000 nid=0x5303 runnable [0x0000700001df3000] java.lang.Thread.State: RUNNABLE at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16) ...
與自定義命名進(jìn)行比較:
"task-handler-thread-1" #14 prio=5 os_prio=31 tid=0x00007fb49c9df000 nid=0x5703 runnable [0x000070000334a000] java.lang.Thread.State: RUNNABLE at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16) ... "authentication-service-ping-thread-0" #15 prio=5 os_prio=31 tid=0x00007fb49c9de000 nid=0x5903 runnable [0x0000700003247000] java.lang.Thread.State: RUNNABLE at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java:21) at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java:9) ... "task-handler-thread-0" #12 prio=5 os_prio=31 tid=0x00007fb49b9b5000 nid=0x5303 runnable [0x0000700003144000] java.lang.Thread.State: RUNNABLE at com.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16) ...
想象一下,可能會不止3個線程。
5.使用LongAdder計數(shù)器
在高競爭的情況下,會采用java.util.concurrent.atomic.LongAdder進(jìn)行計數(shù),而不會采用AtomicLong/AtomicInteger。
LongAdder可以跨越多個單元間仍保持值不變,但是如果需要的話,也可以增加它們的值,但與父類AtomicXX比較,這會導(dǎo)致更高的吞吐量,也會增加內(nèi)存消耗。
LongAdder counter = new LongAdder(); counter.increment(); ... long currentValue = counter.sum();
以上是“服務(wù)器中并發(fā)處理技巧有哪些”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!