什么是異步調(diào)用?
創(chuàng)新互聯(lián)公司長(zhǎng)期為超過(guò)千家客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對(duì)不同對(duì)象提供差異化的產(chǎn)品和服務(wù);打造開(kāi)放共贏平臺(tái),與合作伙伴共同營(yíng)造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為武江企業(yè)提供專業(yè)的成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè),武江網(wǎng)站改版等技術(shù)服務(wù)。擁有10余年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開(kāi)發(fā)。
異步調(diào)用是相對(duì)于同步調(diào)用而言的,同步調(diào)用是指程序按預(yù)定順序一步步執(zhí)行,每一步必須等到上一步執(zhí)行完后才能執(zhí)行,異步調(diào)用則無(wú)需等待上一步程序執(zhí)行完即可執(zhí)行。異步調(diào)用指,在程序在執(zhí)行時(shí),無(wú)需等待執(zhí)行的返回值即可繼續(xù)執(zhí)行后面的代碼。在我們的應(yīng)用服務(wù)中,有很多業(yè)務(wù)邏輯的執(zhí)行操作不需要同步返回(如發(fā)送郵件、冗余數(shù)據(jù)表等),只需要異步執(zhí)行即可。
本文將介紹 Spring 應(yīng)用中,如何實(shí)現(xiàn)異步調(diào)用。在異步調(diào)用的過(guò)程中,會(huì)出現(xiàn)線程上下文信息的丟失,我們?cè)撊绾谓鉀Q線程上下文信息的傳遞。
Spring 應(yīng)用中實(shí)現(xiàn)異步
Spring 為任務(wù)調(diào)度與異步方法執(zhí)行提供了注解支持。通過(guò)在方法或類上設(shè)置 @Async 注解,可使得方法被異步調(diào)用。調(diào)用者會(huì)在調(diào)用時(shí)立即返回,而被調(diào)用方法的實(shí)際執(zhí)行是交給 Spring 的 TaskExecutor 來(lái)完成的。所以被注解的方法被調(diào)用的時(shí)候,會(huì)在新的線程中執(zhí)行,而調(diào)用它的方法會(huì)在原線程中執(zhí)行,這樣可以避免阻塞,以及保證任務(wù)的實(shí)時(shí)性。
引入依賴
org.springframework.boot spring-boot-starter-web
引入 Spring 相關(guān)的依賴即可。
入口類
@SpringBootApplication @EnableAsync public class AsyncApplication { public static void main(String[] args) { SpringApplication.run(AsyncApplication.class, args); } ``` 入口類增加了 `@EnableAsync` 注解,主要是為了掃描范圍包下的所有 `@Async` 注解。 #### 對(duì)外的接口 這里寫(xiě)了一個(gè)簡(jiǎn)單的接口: ```java @RestController @Slf4j public class TaskController { @Autowired private TaskService taskService; @GetMapping("/task") public String taskExecute() { try { taskService.doTaskOne(); taskService.doTaskTwo(); taskService.doTaskThree(); } catch (Exception e) { log.error("error executing task for {}",e.getMessage()); } return "ok"; } }
調(diào)用 TaskService 執(zhí)行三個(gè)異步方法。
Service 方法
@Component @Slf4j //@Async public class TaskService { @Async public void doTaskOne() throws Exception { log.info("開(kāi)始做任務(wù)一"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務(wù)一,耗時(shí):" + (end - start) + "毫秒"); } @Async public void doTaskTwo() throws Exception { log.info("開(kāi)始做任務(wù)二"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務(wù)二,耗時(shí):" + (end - start) + "毫秒"); } @Async public void doTaskThree() throws Exception { log.info("開(kāi)始做任務(wù)三"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務(wù)三,耗時(shí):" + (end - start) + "毫秒"); } }
@Async 可以用于類上,標(biāo)識(shí)該類的所有方法都是異步方法,也可以單獨(dú)用于某些方法。每個(gè)方法都會(huì) sleep 1000 ms。
結(jié)果展示
運(yùn)行結(jié)果如下:
可以看到 TaskService 中的三個(gè)方法是異步執(zhí)行的,接口的結(jié)果快速返回,日志信息異步輸出。異步調(diào)用,通過(guò)開(kāi)啟新的線程調(diào)用的方法,不影響主線程。異步方法實(shí)際的執(zhí)行交給了 Spring 的 TaskExecutor 來(lái)完成。
Future:獲取異步執(zhí)行的結(jié)果
在上面的測(cè)試中我們也可以發(fā)現(xiàn)主調(diào)用方法并沒(méi)有等到調(diào)用方法執(zhí)行完就結(jié)束了當(dāng)前的任務(wù)。如果想要知道調(diào)用的三個(gè)方法全部執(zhí)行完該怎么辦呢,下面就可以用到異步回調(diào)。
異步回調(diào)就是讓每個(gè)被調(diào)用的方法返回一個(gè) Future 類型的值,Spring 中提供了一個(gè) Future 接口的子類:AsyncResult,所以我們可以返回 AsyncResult 類型的值。
public class AsyncResultimplements ListenableFuture { private final V value; private final ExecutionException executionException; //... }
AsyncResult 實(shí)現(xiàn)了 ListenableFuture 接口,該對(duì)象內(nèi)部有兩個(gè)屬性:返回值和異常信息。
public interface ListenableFutureextends Future { void addCallback(ListenableFutureCallback<? super T> var1); void addCallback(SuccessCallback<? super T> var1, FailureCallback var2); }
ListenableFuture 接口繼承自 Future,在此基礎(chǔ)上增加了回調(diào)方法的定義。Future 接口定義如下:
public interface Future{ // 是否可以打斷當(dāng)前正在執(zhí)行的任務(wù) boolean cancel(boolean mayInterruptIfRunning); // 任務(wù)取消的結(jié)果 boolean isCancelled(); // 異步方法中最后返回的那個(gè)對(duì)象中的值 V get() throws InterruptedException, ExecutionException; // 用來(lái)判斷該異步任務(wù)是否執(zhí)行完成,如果執(zhí)行完成,則返回 true,如果未執(zhí)行完成,則返回false boolean isDone(); // 與 get() 一樣,只不過(guò)這里參數(shù)中設(shè)置了超時(shí)時(shí)間 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
#get() 方法,在執(zhí)行的時(shí)候是需要等待回調(diào)結(jié)果的,阻塞等待。如果不設(shè)置超時(shí)時(shí)間,它就阻塞在那里直到有了任務(wù)執(zhí)行完成。我們?cè)O(shè)置超時(shí)時(shí)間,就可以在當(dāng)前任務(wù)執(zhí)行太久的情況下中斷當(dāng)前任務(wù),釋放線程,這樣就不會(huì)導(dǎo)致一直占用資源。
#cancel(boolean) 方法,參數(shù)是一個(gè) boolean 類型的值,用來(lái)傳入是否可以打斷當(dāng)前正在執(zhí)行的任務(wù)。如果參數(shù)是 true 且當(dāng)前任務(wù)沒(méi)有執(zhí)行完成 ,說(shuō)明可以打斷當(dāng)前任務(wù),那么就會(huì)返回 true;如果當(dāng)前任務(wù)還沒(méi)有執(zhí)行,那么不管參數(shù)是 true 還是 false,返回值都是 true;如果當(dāng)前任務(wù)已經(jīng)完成,那么不管參數(shù)是 true 還是 false,那么返回值都是 false;如果當(dāng)前任務(wù)沒(méi)有完成且參數(shù)是 false,那么返回值也是 false。即:
獲取異步方法返回值的實(shí)現(xiàn)
public FuturedoTaskOne() throws Exception { log.info("開(kāi)始做任務(wù)一"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務(wù)一,耗時(shí):" + (end - start) + "毫秒"); return new AsyncResult<>("任務(wù)一完成,耗時(shí)" + (end - start) + "毫秒"); } //...其他兩個(gè)方法類似,省略
我們將 task 方法的返回值改為 Future
@GetMapping("/task") public String taskExecute() { try { Futurer1 = taskService.doTaskOne(); Future r2 = taskService.doTaskTwo(); Future r3 = taskService.doTaskThree(); while (true) { if (r1.isDone() && r2.isDone() && r3.isDone()) { log.info("execute all tasks"); break; } Thread.sleep(200); } log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get()); } catch (Exception e) { log.error("error executing task for {}",e.getMessage()); } return "ok"; }
在調(diào)用異步方法之后,可以通過(guò)循環(huán)判斷異步方法是否執(zhí)行完成。結(jié)果正如我們所預(yù)期,future 所 get 到的是 AsyncResult 返回的字符串。
配置線程池
前面是最簡(jiǎn)單的使用方法,使用默認(rèn)的 TaskExecutor。如果想使用自定義的 Executor,可以結(jié)合 @Configuration 注解的配置方式。
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration public class TaskPoolConfig { @Bean("taskExecutor") // bean 的名稱,默認(rèn)為首字母小寫(xiě)的方法名 public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); // 核心線程數(shù)(默認(rèn)線程數(shù)) executor.setMaxPoolSize(20); // 最大線程數(shù) executor.setQueueCapacity(200); // 緩沖隊(duì)列數(shù) executor.setKeepAliveSeconds(60); // 允許線程空閑時(shí)間(單位:默認(rèn)為秒) executor.setThreadNamePrefix("taskExecutor-"); // 線程池名前綴 // 線程池對(duì)拒絕任務(wù)的處理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } }
線程池的配置很靈活,對(duì)核心線程數(shù)、最大線程數(shù)等屬性進(jìn)行配置。其中,rejection-policy,當(dāng)線程池已經(jīng)達(dá)到最大線程數(shù)的時(shí)候,如何處理新任務(wù)??蛇x策略有 CallerBlocksPolicy、CallerRunsPolicy 等。CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是由調(diào)用者所在的線程來(lái)執(zhí)行。我們驗(yàn)證下,線程池的設(shè)置是否生效,在 TaskService 中,打印當(dāng)前的線程名稱:
public FuturedoTaskOne() throws Exception { log.info("開(kāi)始做任務(wù)一"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務(wù)一,耗時(shí):" + (end - start) + "毫秒"); log.info("當(dāng)前線程為 {}", Thread.currentThread().getName()); return new AsyncResult<>("任務(wù)一完成,耗時(shí)" + (end - start) + "毫秒"); }
通過(guò)結(jié)果可以看到,線程池配置的線程名前綴已經(jīng)生效。在 Spring @Async 異步線程使用過(guò)程中,需要注意的是以下的用法會(huì)使 @Async 失效:
線程上下文信息傳遞
很多時(shí)候,在微服務(wù)架構(gòu)中的一次請(qǐng)求會(huì)涉及多個(gè)微服務(wù)?;蛘咭粋€(gè)服務(wù)中會(huì)有多個(gè)處理方法,這些方法有可能是異步方法。有些線程上下文信息,如請(qǐng)求的路徑,用戶唯一的 userId,這些信息會(huì)一直在請(qǐng)求中傳遞。如果不做任何處理,我們看下是否能夠正常獲取這些信息。
@GetMapping("/task") public String taskExecute() { try { Futurer1 = taskService.doTaskOne(); Future r2 = taskService.doTaskTwo(); Future r3 = taskService.doTaskThree(); ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = requestAttributes.getRequest(); log.info("當(dāng)前線程為 {},請(qǐng)求方法為 {},請(qǐng)求路徑為:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString()); while (true) { if (r1.isDone() && r2.isDone() && r3.isDone()) { log.info("execute all tasks"); break; } Thread.sleep(200); } log.info("\n" + r1.get() + "\n" + r2.get() + "\n" + r3.get()); } catch (Exception e) { log.error("error executing task for {}", e.getMessage()); } return "ok"; }
在 Spring Boot Web 中我們可以通過(guò) RequestContextHolder 很方便的獲取 request。在接口方法中,輸出請(qǐng)求的方法和請(qǐng)求的路徑。
public FuturedoTaskOne() throws Exception { log.info("開(kāi)始做任務(wù)一"); long start = System.currentTimeMillis(); Thread.sleep(1000); long end = System.currentTimeMillis(); log.info("完成任務(wù)一,耗時(shí):" + (end - start) + "毫秒"); ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = requestAttributes.getRequest(); log.info("當(dāng)前線程為 {},請(qǐng)求方法為 {},請(qǐng)求路徑為:{}", Thread.currentThread().getName(), request.getMethod(), request.getRequestURL().toString()); return new AsyncResult<>("任務(wù)一完成,耗時(shí)" + (end - start) + "毫秒"); }
同時(shí)在 TaskService 中,驗(yàn)證是不是也能輸出請(qǐng)求的信息。運(yùn)行程序,結(jié)果如下:
在 TaskService 中,每個(gè)異步線程的方法獲取 RequestContextHolder 中的請(qǐng)求信息時(shí),報(bào)了空指針異常。這說(shuō)明了請(qǐng)求的上下文信息未傳遞到異步方法的線程中。RequestContextHolder 的實(shí)現(xiàn),里面有兩個(gè) ThreadLocal 保存當(dāng)前線程下的 request。
//得到存儲(chǔ)進(jìn)去的request private static final ThreadLocalrequestAttributesHolder = new NamedThreadLocal ("Request attributes"); //可被子線程繼承的request private static final ThreadLocal inheritableRequestAttributesHolder = new NamedInheritableThreadLocal ("Request context");
再看 #getRequestAttributes() 方法,相當(dāng)于直接獲取 ThreadLocal 里面的值,這樣就使得每一次獲取到的 Request 是該請(qǐng)求的 request。如何將上下文信息傳遞到異步線程呢?Spring 中的 ThreadPoolTaskExecutor 有一個(gè)配置屬性 TaskDecorator,TaskDecorator 是一個(gè)回調(diào)接口,采用裝飾器模式。裝飾模式是動(dòng)態(tài)的給一個(gè)對(duì)象添加一些額外的功能,就增加功能來(lái)說(shuō),裝飾模式比生成子類更為靈活。因此 TaskDecorator 主要用于任務(wù)的調(diào)用時(shí)設(shè)置一些執(zhí)行上下文,或者為任務(wù)執(zhí)行提供一些監(jiān)視/統(tǒng)計(jì)。
public interface TaskDecorator { Runnable decorate(Runnable runnable); }
#decorate 方法,裝飾給定的 Runnable,返回包裝的 Runnable 以供實(shí)際執(zhí)行。
下面我們定義一個(gè)線程上下文拷貝的 TaskDecorator。
import org.springframework.core.task.TaskDecorator; import org.springframework.web.context.request.RequestAttributes; import org.springframework.web.context.request.RequestContextHolder; public class ContextDecorator implements TaskDecorator { @Override public Runnable decorate(Runnable runnable) { RequestAttributes context = RequestContextHolder.currentRequestAttributes(); return () -> { try { RequestContextHolder.setRequestAttributes(context); runnable.run(); } finally { RequestContextHolder.resetRequestAttributes(); } }; } }
實(shí)現(xiàn)較為簡(jiǎn)單,將當(dāng)前線程的 context 裝飾到指定的 Runnable,最后重置當(dāng)前線程上下文。
在線程池的配置中,增加回調(diào)的 TaskDecorator 屬性的配置:
@Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("taskExecutor-"); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60); // 增加 TaskDecorator 屬性的配置 executor.setTaskDecorator(new ContextDecorator()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; }
經(jīng)過(guò)如上配置,我們?cè)俅芜\(yùn)行服務(wù),并訪問(wèn)接口,控制臺(tái)日志信息如下:
由結(jié)果可知,線程的上下文信息傳遞成功。
小結(jié)
本文結(jié)合示例講解了 Spring 中實(shí)現(xiàn)異步方法,獲取異步方法的返回值。并介紹了配置 Spring 線程池的方式。最后介紹如何在異步多線程中傳遞線程上下文信息。線程上下文傳遞在分布式環(huán)境中會(huì)經(jīng)常用到,比如分布式鏈路追蹤中需要一次請(qǐng)求涉及到的 TraceId、SpanId。簡(jiǎn)單來(lái)說(shuō),需要傳遞的信息能夠在不同線程中。異步方法是我們?cè)谌粘i_(kāi)發(fā)中用來(lái)多線程處理業(yè)務(wù)邏輯,這些業(yè)務(wù)邏輯不需要嚴(yán)格的執(zhí)行順序。用好異步解決問(wèn)題的同時(shí),更要用對(duì)異步多線程的方式。
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,謝謝大家對(duì)創(chuàng)新互聯(lián)的支持。