這篇文章給大家介紹java中如何實現(xiàn)異步編程,內(nèi)容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
創(chuàng)新互聯(lián)為您提適合企業(yè)的網(wǎng)站設計?讓您的網(wǎng)站在搜索引擎具有高度排名,讓您的網(wǎng)站具備超強的網(wǎng)絡競爭力!結合企業(yè)自身,進行網(wǎng)站設計及把握,最后結合企業(yè)文化和具體宗旨等,才能創(chuàng)作出一份性化解決方案。從網(wǎng)站策劃到網(wǎng)站制作、成都網(wǎng)站設計, 我們的網(wǎng)頁設計師為您提供的解決方案。
很多時候我們都希望能夠最大的利用資源,比如在進行IO操作的時候盡可能的避免同步阻塞的等待,因為這會浪費CPU的資源。如果在有可讀的數(shù)據(jù)的時候能夠通知程序執(zhí)行讀操作甚至由操作系統(tǒng)內(nèi)核幫助我們完成數(shù)據(jù)的拷貝,這再好不過了。從NIO到CompletableFuture、Lambda、Fork/Join,java一直在努力讓程序盡可能變的異步甚至擁有更高的并行度,這一點一些函數(shù)式語言做的比較好,因此java也或多或少的借鑒了某些特性。下面介紹一種非常常用的實現(xiàn)異步操作的方式。
考慮有一個耗時的操作,操作完后會返回一個結果(不管是正常結果還是異常),程序如果想擁有比較好的性能不可能由線程去等待操作的完成,而是應該采用listener模式。jdk并發(fā)包里的Future代表了未來的某個結果,當我們向線程池中提交任務的時候會返回該對象。代碼例子:
/** * jdk1.8之前的Future * * @author Administrator * */ public class JavaFuture { public static void main(String[] args) throws Throwable, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(1); // Future代表了線程執(zhí)行完以后的結果,可以通過future獲得執(zhí)行的結果 // 但是jdk1.8之前的Future有點雞肋,并不能實現(xiàn)真正的異步,需要阻塞的獲取結果,或者不斷的輪詢 // 通常我們希望當線程執(zhí)行完一些耗時的任務后,能夠自動的通知我們結果,很遺憾這在原生jdk1.8之前 // 是不支持的,但是我們可以通過第三方的庫實現(xiàn)真正的異步回調(diào) Futuref = executor.submit(new Callable () { @Override public String call() throws Exception { System.out.println("task started!"); Thread.sleep(3000); System.out.println("task finished!"); return "hello"; } }); //此處阻塞main線程 System.out.println(f.get()); System.out.println("main thread is blocked"); } }
如果想獲得耗時操作的結果,可以通過get方法獲取,但是該方法會阻塞當前線程,我們可以在做完剩下的某些工作的時候調(diào)用get方法試圖去獲取結果,也可以調(diào)用非阻塞的方法isDone來確定操作是否完成,這種方式有點兒類似下面的過程:
這種方式對流程的控制很混亂,但是在jdk1.8之前只提供了這種笨拙的實現(xiàn)方式,以至于很多高性能的框架都實現(xiàn)了自己的一套異步框架,比如Netty和Guava,下面分別介紹下這三種異步的實現(xiàn)方式(包括jdk1.8)。首先是Guava中的實現(xiàn)方式:
package guava; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; /** * Guava中的Future * * @author Administrator * */ public class GuavaFuture { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(1); // 使用guava提供的MoreExecutors工具類包裝原始的線程池 ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor); //向線程池中提交一個任務后,將會返回一個可監(jiān)聽的Future,該Future由Guava框架提供 ListenableFuturelf = listeningExecutor.submit(new Callable () { @Override public String call() throws Exception { System.out.println("task started!"); //模擬耗時操作 Thread.sleep(3000); System.out.println("task finished!"); return "hello"; } }); //添加回調(diào),回調(diào)由executor中的線程觸發(fā),但也可以指定一個新的線程 Futures.addCallback(lf, new FutureCallback () { //耗時任務執(zhí)行失敗后回調(diào)該方法 @Override public void onFailure(Throwable t) { System.out.println("failure"); } //耗時任務執(zhí)行成功后回調(diào)該方法 @Override public void onSuccess(String s) { System.out.println("success " + s); } }); //主線程可以繼續(xù)做其他的工作 System.out.println("main thread is running"); } }
Guava提供了一套完整的異步框架,核心是可監(jiān)聽的Future,通過注冊監(jiān)聽器或者回調(diào)方法實現(xiàn)及時獲取操作結果的能力。需要提一點的是,假設添加監(jiān)聽的時候耗時操作已經(jīng)執(zhí)行完了,此時回調(diào)方法會被立即執(zhí)行并不會丟失。想探究其實現(xiàn)方式的話可以跟一下源碼,底層的原理并不難。
談到異步編程就不得不提一下Promise,很多函數(shù)式語言比如js原生支持Promise,但是在java界也有一些promise框架,其中就有大名鼎鼎的Netty。從Future、Callback到Promise甚至線程池,Netty實現(xiàn)了一套完整的異步框架,并且netty代碼中也大量使用了Promise,下面是Netty中的例子:
package netty_promise; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; /** * netty中的promise * * @author Administrator * */ public class PromiseTest { @SuppressWarnings({ "unchecked", "rawtypes" }) public static void main(String[] args) throws Throwable { //線程池 EventExecutorGroup group = new DefaultEventExecutorGroup(1); //向線程池中提交任務,并返回Future,該Future是netty自己實現(xiàn)的future //位于io.netty.util.concurrent包下,此處運行時的類型為PromiseTask Future> f = group.submit(new Runnable() { @Override public void run() { System.out.println("任務正在執(zhí)行"); //模擬耗時操作,比如IO操作 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("任務執(zhí)行完畢"); } }); //增加監(jiān)聽 f.addListener( new FutureListener() { @Override public void operationComplete(Future arg0) throws Exception { System.out.println("ok!!!"); } }); System.out.println("main thread is running."); } }
直到jdk1.8才算真正支持了異步操作,其中借鑒了某些框架的實現(xiàn)思想,但又有新的功能,同時在jdk1.8中提供了lambda表達式,使得java向函數(shù)式語言又靠近了一步。借助jdk原生的CompletableFuture可以實現(xiàn)異步的操作,同時結合lambada表達式大大簡化了代碼量。代碼例子如下:
package netty_promise; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Supplier; /** * 基于jdk1.8實現(xiàn)任務異步處理 * * @author Administrator * */ public class JavaPromise { public static void main(String[] args) throws Throwable, ExecutionException { // 兩個線程的線程池 ExecutorService executor = Executors.newFixedThreadPool(2); //jdk1.8之前的實現(xiàn)方式 CompletableFuturefuture = CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { System.out.println("task started!"); try { //模擬耗時操作 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "task finished!"; } }, executor); //采用lambada的實現(xiàn)方式 future.thenAccept(e -> System.out.println(e + " ok")); System.out.println("main thread is running"); } }
關于java中如何實現(xiàn)異步編程就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。