本篇內容主要講解“RxJava怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“RxJava怎么使用”吧!
創(chuàng)新互聯(lián)公司于2013年開始,是專業(yè)互聯(lián)網(wǎng)技術服務公司,擁有項目網(wǎng)站設計、成都網(wǎng)站設計網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元拉薩做網(wǎng)站,已為上家服務,為拉薩各地企業(yè)和個人服務,聯(lián)系電話:18982081108
本次詩函通過打印日志來觀察代碼執(zhí)行情況,會打印時間和執(zhí)行線程,這里用的是slf4j+log4j的方式;
工程創(chuàng)建完畢后,結構如下:
log4j.propertieds文件的位置請注意,需要放在上圖紅框位置;
為了在日志中打印當前線程,log4j的配置如上圖綠框所示, %t表示當前線程, %r表示程序已經(jīng)執(zhí)行的時間;
在pom文件中,對日志的依賴為:
org.slf4j slf4j-log4j12 1.8.0-alpha2
驗證代碼是通過單元測試實現(xiàn)的,pom.xml文件中,對單元測試的依賴為:
junit junit 3.8.1 test
單元測試代碼在如下圖紅框位置:
支持lambda表達式表現(xiàn)在maven支持和intellij idea工具支持兩個方面,具體設置請參照《設置Intellij idea和maven,支持lambda表達式》
準備工作結束,可以正式開發(fā)了
依賴庫選用1.0.10版本,如下:
io.reactivex rxjava 1.0.10
第一個例子,我們實踐最簡單的用法:
創(chuàng)建App.java類,聲明日志服務:
public class App { private static final Logger logger = LoggerFactory.getLogger(App.class);
開發(fā)doExecute方法實現(xiàn)基于Rxjava的觀察者模式:
public void doExecute(){ logger.debug("start doExecute"); //聲明一個觀察者,用來響應被觀察者發(fā)布的事件 Observerobserver = new Observer () { /** * 被觀察者發(fā)布結束事件的時候,該方法會被調用 */ public void onCompleted() { logger.debug("start onCompleted"); } /** * 被觀察者發(fā)布事件期間,和觀察者處理事件期間,發(fā)生異常的時候,該方法都會被調用 */ public void onError(Throwable throwable) { logger.debug("start onError : " + throwable); } /** * 被觀察者發(fā)布事件后,該方法會被調用 * @param s */ public void onNext(String s) { logger.debug("start onNext [" + s + "]"); } }; Observable observable = Observable.create(new Observable.OnSubscribe () { public void call(Subscriber super String> subscriber) { //向觀察者發(fā)布事件 subscriber.onNext("Hello"); //再次向觀察者發(fā)布事件 subscriber.onNext("world"); //通知觀察者,訂閱結束 subscriber.onCompleted(); } }); logger.debug("try subscribe"); //執(zhí)行訂閱 observable.subscribe(observer); logger.debug("finish doExecute"); }
代碼的邏輯很簡單,定義觀察者(observer),被觀察者(observable),執(zhí)行訂閱; 3. 本次測試用junit來執(zhí)行,在test目錄下創(chuàng)建一個AppTest類,具體的目錄和內容如下圖:
打開控制臺,在pom.xml文件所在目錄下執(zhí)行mvn test,即可看到日志:
2017-06-10 10:02:02 [ main:0 ] - [ DEBUG ] start doExecute 2017-06-10 10:02:02 [ main:19 ] - [ DEBUG ] try subscribe 2017-06-10 10:02:02 [ main:22 ] - [ DEBUG ] start onNext [Hello] 2017-06-10 10:02:02 [ main:22 ] - [ DEBUG ] start onNext [world] 2017-06-10 10:02:02 [ main:22 ] - [ DEBUG ] start onCompleted 2017-06-10 10:02:02 [ main:23 ] - [ DEBUG ] finish doExecute
執(zhí)行的代碼是observable.subscribe,此代碼執(zhí)行后,觀察者的onNext和onCompleted被回調;
在上面的doExecute方法中,我們創(chuàng)建的被觀察者實現(xiàn)了onNext,onError,onCompleted這三個方法,有的場景下我們只關注onNext,對onError和onCompleted都不關心,此時我們可以使用Action1對象來替代Observer,代碼如下:
public void doAction(){ logger.debug("start doAction"); Action1onNextAction = new Action1 () { public void call(String s) { logger.debug("start Action1 onNextAction [" + s + "]"); } }; Observable observable = Observable.create(new Observable.OnSubscribe () { public void call(Subscriber super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("world"); subscriber.onCompleted(); } }); logger.debug("try subscribe"); observable.subscribe(onNextAction); logger.debug("finish doAction"); }
可以看到,只要一個Action1對象即可;
另外,對于錯誤回調也可以用Action1來實現(xiàn),事件完成的回調用Action0,Action0的特點是方法沒有返回,對于的這些Action,observable.subscribe方法提供了各種重載,我們可以按照自己需要來決定使用哪種,傳入哪些Action;
在上面的doExecute方法中,被觀察者發(fā)布了兩個事件:onNext("Hello")和onNext("world"),我們創(chuàng)建被觀察者是通過Observable.create,然后在call方法中寫入onNext("Hello"),onNext("world")最后在寫上subscriber.onCompleted(),對于這種發(fā)布確定的對象事件的場景,rxjava已經(jīng)做了簡化,直接上代碼:
public void doFromChain(){ logger.debug("start doFromChain"); //聲明一個觀察者,用來響應被觀察者發(fā)布的事件 Action1observer = new Action1 () { /** * 被觀察者發(fā)布事件后,該方法會被調用 * @param s */ public void call(String s) { logger.debug("start onNext [" + s + "]"); } }; String[] array = {"Hello", "world"}; //from方法可以直接創(chuàng)建被觀察者,并且發(fā)布array中的元素對應的事件 Observable.from(array).subscribe(observer); logger.debug("finish doFromChain"); }
如上代碼,之前我們創(chuàng)建被觀察者,并且在call方法中依次執(zhí)行onNext的操作,這些事情都被Observable.from(array)簡化了;
Observable.from接受的是一個數(shù)組,而Observable.just可以直接接受多個元素,我們連創(chuàng)建數(shù)組的步驟都省略掉了,再把Action1簡化為lambda,可以得到更加簡化的代碼:
public void doJustChain(){ logger.debug("start doJustChain"); Observable.just("Hello", "world") .subscribe(s -> logger.debug("start onNext [" + s + "]")); logger.debug("finish doJustChain"); }
經(jīng)歷了以上的實戰(zhàn),我們對Rxjava的基本能力有了了解,下面了解一些更復雜的用法;
試想,如果被觀察者發(fā)布的事件是int型,但是觀察者是處理String型事件的,那么此觀察者如何才能處理被觀察者發(fā)布的事件呢,除了修改觀察者或者被觀察者的代碼,我們還可以使用Rxjava的變換方法-map:
public void doMap(){ logger.debug("start doMap"); Observable.just(1001, 1002) .map(intValue -> "int[" + intValue + "]") .subscribe(s -> logger.debug("Action1 call invoked [" + s + "]")); logger.debug("finish doMap"); }
代碼中可以看到,map方法接受的是Func1接口的實現(xiàn),由于此接口只聲明了一個方法,所以這里被簡化成了lambda表達式,lambda表達式的入?yún)⒂蒵ust的入?yún)㈩愋屯茢喽鴣?,是int型,返回的是字符串,后面的代碼就可以直接用String型的消費者來處理事件了;
map方法提供了一對一的映射,但是實際場景中未必是一對一的,例如一個int數(shù)字要發(fā)起兩個String事件,map就不合適了,RxJava還有個flatMap方法,可以提供這種能力,此處沒用lambda來簡化,可以看的更清楚:
public void doFlatMap(){ logger.debug("start doFlatMap"); Observable.just(101, 102, 103) .flatMap(new Func1>() { public Observable call(final Integer integer) { return Observable.create(new Observable.OnSubscribe () { public void call(Subscriber super String> subscriber) { subscriber.onNext("after flatMap (" + integer + ")"); subscriber.onNext("after flatMap (" + (integer+1000) + ")"); } }); } }) .subscribe(s -> logger.debug("Action1 call invoked [" + s + "]")); logger.debug("finish doFlatMap"); }
可以看到,被觀察者發(fā)布了三個int事件:101, 102, 103,在flatMap中訂閱了這三個事件,每個事件都可以新建一個被觀察者,這個被觀察者拿到了101,102,103,然后可以按實際需求,選擇發(fā)布一個或者多個String事件,甚至不發(fā)布,這里發(fā)布的事件,都會被觀察者收到;
Rxjava可以指定被觀察者發(fā)布事件的線程,也可以制定觀察者處理事件的線程:
public void doSchedule(){ logger.debug("start doSchedule"); Observable.create(subscriber -> { logger.debug("enter subscribe"); subscriber.onNext("Hello"); subscriber.onCompleted(); }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .flatMap(str -> { logger.debug("enter flatMap"); return Observable.create( subscriber -> subscriber.onNext("after flatMap (" + str + ")") ); } ) .observeOn(Schedulers.newThread()) .subscribe(s -> logger.debug("Observer's onNext invoked [" + s + "]")); logger.debug("finish doSchedule"); }
subscribeOn()方法指定了被觀察者發(fā)布事件的時候使用io類型的線程處理,參數(shù)Schedulers.io()表示指定的線程來自內部實現(xiàn)的一個無數(shù)量上限的線程池,可以重用空閑的線程,適合處理io相關的業(yè)務,特點是等待時間長,cup占用低;
observeOn()方法表示觀察者處理事件的時候使用新線程處理,Schedulers.newThread()表示總是啟用新線程,并在新線程執(zhí)行操作; 上面代碼用了兩次observeOn,分別用來指定flatMap中處理事件以及觀察者中處理事件的線程;
執(zhí)行代碼的結果:
2017-06-10 12:15:42 [ main:0 ] - [ DEBUG ] start doSchedule 2017-06-10 12:15:42 [ RxCachedThreadScheduler-1:156 ] - [ DEBUG ] enter subscribe 2017-06-10 12:15:42 [ main:156 ] - [ DEBUG ] finish doSchedule 2017-06-10 12:15:42 [ RxNewThreadScheduler-2:157 ] - [ DEBUG ] enter flatMap 2017-06-10 12:15:42 [ RxNewThreadScheduler-1:164 ] - [ DEBUG ] Observer's onNext invoked [after flatMap (Hello)]
RxCachedThreadScheduler-1:156表示來自線程池的緩存線程; RxNewThreadScheduler-2:157和RxNewThreadScheduler-1:164表示新的線程;
常用的參數(shù)類型還有: Schedulers.immediate(): 直接在當前線程運行,相當于不指定線程; Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
到此,相信大家對“RxJava怎么使用”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續(xù)學習!