本篇文章給大家分享的是有關(guān)java中怎么實(shí)現(xiàn)異步處理,小編覺得挺實(shí)用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
創(chuàng)新互聯(lián)主要從事網(wǎng)站制作、成都網(wǎng)站制作、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)清水,十載網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):028-86922220
1.DeferredResult 加線程池 (DeferredResult 提供了超時(shí)、錯(cuò)誤處理,功能非常完善,再加上多線程處理請(qǐng)求效果很不錯(cuò))
2.新開個(gè)定時(shí)任務(wù)線程池 定時(shí)輪詢當(dāng)前任務(wù)列表 超時(shí)就停止(需要自己維護(hù)任務(wù)列表)Hystrix就是這種方案
3.JDK9 可以采用CompletableFuture orTimeout、completeOnTimeout 方法處理 前者拋出異常后者返回默認(rèn)值
總結(jié),其實(shí)線程池統(tǒng)一設(shè)置超時(shí)這個(gè)需求本身就是偽需求,線程執(zhí)行任務(wù)時(shí)間本身就是參差不齊的,而且這個(gè)控制權(quán)應(yīng)該交給Runable或Callable內(nèi)部業(yè)務(wù)處理,不同的業(yè)務(wù)處理超時(shí)、異常、報(bào)警等各不相同。CompletableFuture、ListenableFuture 、DeferredResult 的功能相當(dāng)豐富,建議在多線程處理的場(chǎng)景多使用這些api。
具體實(shí)現(xiàn):
DeferredResult 先建個(gè)工具類。調(diào)用方使用execute方法,傳入new的DeferredResultDTO(DeferredResultDTO只有msgId,也可以自定義一些成員變量方便后期業(yè)務(wù)擴(kuò)展使用)
然后在其他線程業(yè)務(wù)處理完設(shè)置結(jié)果,調(diào)用setResult方法,傳入msgId相同的DeferredResultDTO和result對(duì)象
/** * DeferredResult 工具類 * * @author tiancong * @date 2020/10/14 19:23 */ @UtilityClass @Slf4j public class DeferredResultUtil { private Map>> taskMap = new ConcurrentHashMap<>(16); public DeferredResult > execute(DeferredResultDTO dto) { return execute(dto, 5000L); } public DeferredResult > execute(DeferredResultDTO dto, Long time) { if (taskMap.containsKey(dto)) { throw new BusinessException(String.format("msgId=%s 已經(jīng)存在,請(qǐng)勿重發(fā)消息", dto.getMsgId())); } DeferredResult > deferredResult = new DeferredResult<>(time); deferredResult.onError((e) -> { taskMap.remove(dto); log.info("處理失敗 ", e); deferredResult.setResult(ResultVoUtil.fail("處理失敗")); }); deferredResult.onTimeout(() -> { taskMap.remove(dto); if (dto.getType().equals(DeferredResultTypeEnum.CLOTHES_DETECTION)) { ExamController.getCURRENT_STUDENT().remove(dto.getMsgId()); } deferredResult.setResult(ResultVoUtil.fail("請(qǐng)求超時(shí),請(qǐng)聯(lián)系工作人員!")); }); taskMap.putIfAbsent(dto, deferredResult); return deferredResult; } public void setResult(DeferredResultDTO dto, ResultVO
2. 新開個(gè)定時(shí)任務(wù)線程池 定時(shí)輪詢當(dāng)前任務(wù)列表
/** * @author tiancong * @date 2021/4/10 11:06 */ @Slf4j public class T { private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 2, r -> { Thread thread = new Thread(r); thread.setName("failAfter-%d"); thread.setDaemon(true); return thread; }); private static int timeCount; public static void main(String[] args) throws InterruptedException { ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor(); executorService.setCorePoolSize(4); executorService.setQueueCapacity(10); executorService.setMaxPoolSize(100); executorService.initialize(); // executorService.setAwaitTerminationSeconds(5); // executorService.getThreadPoolExecutor().awaitTermination(3, TimeUnit.SECONDS); executorService.setWaitForTasksToCompleteOnShutdown(true); Random random = new Random(); long start = System.currentTimeMillis(); List> asyncResultList = new ArrayList<>(); for (int i = 0; i < 100; i++) { ListenableFuture asyncResult = executorService.submitListenable(() -> { int r = random.nextInt(10); log.info("{} 開始睡{}s", Thread.currentThread().getName(), r); TimeUnit.SECONDS.sleep(r); log.info("{} 干完了 {}s", Thread.currentThread().getName(), r); //throw new RuntimeException("出現(xiàn)異常"); return true; }); asyncResult.addCallback(data -> { try { // 休息3毫秒模擬獲取到執(zhí)行結(jié)果后的操作 TimeUnit.MILLISECONDS.sleep(3); log.info("{} 收到結(jié)果:{}", Thread.currentThread().getName(), data); } catch (Exception e) { e.printStackTrace(); } }, ex -> log.info("**異常信息**", ex)); asyncResultList.add(asyncResult); } System.out.println(String.format("總結(jié)耗時(shí):%s ms", System.currentTimeMillis() - start)); // 守護(hù)進(jìn)程 定時(shí)輪詢 終止超時(shí)的任務(wù) scheduler.scheduleAtFixedRate(() -> { // 模擬守護(hù)進(jìn)程 終止超過6s的任務(wù) timeCount++; if (timeCount > 6) { for (ListenableFuture future : asyncResultList) { if (!future.isDone()) { log.error("future 因超時(shí)終止任務(wù),{}", future); future.cancel(true); } } } }, 0, 1000, TimeUnit.MILLISECONDS); } }
額外補(bǔ)充:
CompletableFuture實(shí)現(xiàn)了CompletionStage接口,里面很多豐富的異步編程接口。
applyToEither方法是哪個(gè)先完成,就apply哪一個(gè)結(jié)果(但是兩個(gè)任務(wù)都會(huì)最終走完)
/** * @author tiancong * @date 2021/4/10 11:06 */ @Slf4j public class T { public static void main(String[] args) throws InterruptedException { // CompletableFutureresponseFuture = within( // createTaskSupplier("5"), 3000, TimeUnit.MILLISECONDS); // responseFuture // .thenAccept(T::send) // .exceptionally(throwable -> { // log.error("Unrecoverable error", throwable); // return null; // }); // // 注意 exceptionally是new 的CompletableFuture CompletableFuture
以上就是java中怎么實(shí)現(xiàn)異步處理,小編相信有部分知識(shí)點(diǎn)可能是我們?nèi)粘9ぷ鲿?huì)見到或用到的。希望你能通過這篇文章學(xué)到更多知識(shí)。更多詳情敬請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。