真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

RxJava之subscribeOn解惑

RxJava之subscribeOn解惑

有一天,我在使用RxJava和Retrofit實(shí)現(xiàn)Android上面的網(wǎng)絡(luò)請(qǐng)求。突然,遇到了一個(gè)坑,在過(guò)了這些坑之后得到一些經(jīng)驗(yàn),覺(jué)得需要和大家分享一二。

創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),安澤企業(yè)網(wǎng)站建設(shè),安澤品牌網(wǎng)站建設(shè),網(wǎng)站定制,安澤網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營(yíng)銷(xiāo),網(wǎng)絡(luò)優(yōu)化,安澤網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競(jìng)爭(zhēng)力??沙浞譂M(mǎn)足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專(zhuān)業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶(hù)成長(zhǎng)自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。引子

用Retrofit搭配RxJava的朋友應(yīng)該都知道,一般實(shí)現(xiàn)代碼最終大都長(zhǎng)得一幅下面的樣子。

public interface BeanApi { @GET("bean/{id}") Observable getBean(@Path("id") int beanId); } BeanApi api = ···通過(guò)Retrofit的create實(shí)例化··· api.getBean(1) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } );

上面的代碼形式相信各位都寫(xiě)得很熟了,但是我實(shí)在煩透了每個(gè)api調(diào)用的地方都寫(xiě)一次subscribOn,observeOn。然后,我找到一篇不錯(cuò)的文章——Don\'t break the chain: use RxJava\'s compose() operator,里面提到了一個(gè)方法避免這種重復(fù)代碼(其實(shí)作者本意是要體現(xiàn)“不要打破鏈?zhǔn)讲僮?rdquo;,而非避免重復(fù)代碼)。最后改進(jìn)到的代碼就長(zhǎng)下面的樣子了。

// This is a common method. Transformer applySchedulers() { return observable -> observable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()); } api.getBean(1) .compose(applySchedulers()) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } );

改進(jìn)后的代碼比原來(lái)的代碼少了一行。但是寫(xiě)多幾次之后,我還是煩透了這個(gè)applySchedulers()。于是我瘋了,就自己實(shí)現(xiàn)一個(gè)Retrofit的CallAdapter.Factory,讓Retrotfit在每次調(diào)用api的時(shí)候自動(dòng)就給我封裝好subscribeOn和observeOn這些重復(fù)的代碼,具體實(shí)現(xiàn)可以參考我的另外一篇文章——通過(guò)委派模式包裝一個(gè)RxJavaCallAdapterFactory。最后,我的代碼就是長(zhǎng)下面這個(gè)樣子了。

api.getBean(1) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } );

所有的subscribeOn和observeOn不用再寫(xiě)了。因?yàn)槊看握{(diào)用api.getBean(1),Retrotfit就調(diào)用我自定義的CallAdapter.Factory把結(jié)果封裝成Observable對(duì)象的時(shí)候就已經(jīng)把subscribeOn和observeOn添加上去了。

問(wèn)題

好,用得很爽。但是問(wèn)題問(wèn)題比辦法多,所以問(wèn)題來(lái)了。有幾個(gè)特殊的地方我需要網(wǎng)絡(luò)加載和結(jié)果監(jiān)聽(tīng)都在當(dāng)前線(xiàn)程。相信理解了上面代碼的朋友都已經(jīng)看出來(lái)了,現(xiàn)在我通過(guò)api.getBean(1)獲取到的Observable最后被訂閱都會(huì)是在io線(xiàn)程獲取網(wǎng)絡(luò)數(shù)據(jù),然后在mainThread線(xiàn)程進(jìn)行結(jié)果處理。所以,我要想個(gè)辦法出來(lái),覆蓋原來(lái)的Schedulers,包括subscribeOn的Scheduler和observeOn的Scheduler。于是,我寫(xiě)了下面的代碼。

// isAsync is a boolean variable indicate whether the request is a asynchronous or not. api.getBean(1) .subscribeOn(isAsync ? Schedulers.io() : Schedulers.immediate()) .observeOn(isAsync ? AndroidSchedulers.mainThread() : Schedulers.immediate()) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } );

上面的代碼再結(jié)合我之前寫(xiě)的CallAdapter.Factory,其實(shí)就是相當(dāng)于沒(méi)有自定義CallAdapter.Factory之前顯式調(diào)用兩次subscribeOn和兩次observeOn,就像下面的樣子。

api.getBean(1) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(isAsync ? Schedulers.io() : Schedulers.immediate()) .observeOn(isAsync ? AndroidSchedulers.mainThread() : Schedulers.immediate()) .subscribe( bean -> { // do something on main thread. }, throwable -> { // do something on main thread. } ); 前湊

作為一名RxJava的標(biāo)準(zhǔn)菜鳥(niǎo),我被驗(yàn)證了自己的確很菜,我天真的認(rèn)為后面的subscribeOn和observeOn會(huì)覆蓋之前的Scheduler,我理所當(dāng)然的認(rèn)為,當(dāng)isAsync為true的時(shí)候,這次api的調(diào)用就會(huì)在當(dāng)前線(xiàn)程執(zhí)行網(wǎng)絡(luò)訪(fǎng)問(wèn)和結(jié)果處理。于是,我被搞瘋了。我就看了subscribeOn的源碼,如下。

public final Observable subscribeOn(Scheduler scheduler) { ··· 省略一些于邏輯閱讀不重要的代碼 return nest().lift(new OperatorSubscribeOn(scheduler)); }

nest的代碼如下,

public final Observable> nest() { return just(this); }

意思就是新建一個(gè)Observable,并且只會(huì)向訂閱者發(fā)送一個(gè)元素——原來(lái)api.getBean(1)獲得的Observale對(duì)象。所以nest操作獲得的結(jié)果是Observable>對(duì)象。好,這里記著這個(gè)東東。下面我先分析一下lift操作,然后我們?cè)倩仡^把他們串在一起。

Observable結(jié)構(gòu)

在看lift操作之前,我們稍微回顧一下Observable的創(chuàng)建方法,

final OnSubscribe onSubscribe; public final static Observable create(OnSubscribe f) { return new Observable(hook.onCreate(f)); } protected Observable(OnSubscribe f) { this.onSubscribe = f; }

其他的什么from、just等創(chuàng)建方法最后都是把數(shù)據(jù)轉(zhuǎn)化為一個(gè)OnSubscribe對(duì)象再通過(guò)上面的create方法創(chuàng)建。所以我們只關(guān)注這個(gè)create方法。上面代碼的意思很簡(jiǎn)單,就是new一個(gè)Observable對(duì)象,并且設(shè)置onSubscribe。所以這里的關(guān)鍵是onSubscribe這個(gè)對(duì)象。這里我管它做數(shù)據(jù)源,即Observable對(duì)象會(huì)用它來(lái)產(chǎn)生數(shù)據(jù),并且發(fā)布給訂閱者。

看到這里,我們可以發(fā)現(xiàn),Observable其實(shí)沒(méi)有什么,只有兩個(gè)關(guān)鍵點(diǎn):1、裝載著一個(gè)onSubscribe對(duì)象,2、有訂閱者注冊(cè)時(shí),就調(diào)用用這個(gè)onSubscribe的call(Subscriber)方法。

這里我們要看一下這個(gè)call(Subscriber)方法。該方法接受一個(gè)參數(shù)Subscriber,即訂閱者。當(dāng)有訂閱者注冊(cè)到Observable對(duì)象時(shí),Observable對(duì)象就調(diào)用onSubscribe的這個(gè)call方法,并且把當(dāng)前當(dāng)前注冊(cè)的訂閱者作為參數(shù)傳遞過(guò)去。所以在call方法的實(shí)現(xiàn)中就可以調(diào)用訂閱者的onNext方法來(lái)發(fā)布數(shù)據(jù)或者做其他事(不一定是發(fā)布數(shù)據(jù))。

lift操作說(shuō)明

先把lift操作的代碼貼出來(lái)。

public final Observable lift(final Operator operator) { return new Observable(new OnSubscribe() { @Override public void call(Subscriber o) { try { Subscriber st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so \'onStart\' it st.onStart(); onSubscribe.call(st); } catch (Throwable e) { ··· 省略一些于邏輯閱讀不重要的代碼 st.onError(e); } } catch (Throwable e) { ··· 省略一些于邏輯閱讀不重要的代碼 // as we don\'t have the operator available to us o.onError(e); } } }); }

從上面代碼第2行的new Observable可以看出,lift操作其實(shí)是新建一個(gè)Observable對(duì)象然后返回。這里加點(diǎn)高級(jí)的標(biāo)識(shí)方便下面的閱讀,被new出來(lái)的Observable對(duì)象我們叫它做派生Observable,而當(dāng)前Observable就叫父級(jí)Observable。

在上面Observable結(jié)構(gòu)一節(jié)中,我們知道每個(gè)Observable都持有一個(gè)onSubscribe對(duì)象作為數(shù)據(jù)源。通過(guò)lift方法派生所得的Observable也不例外,也有一個(gè),就是上面代碼第二行new OnSubscribe實(shí)例化的匿名對(duì)象。這個(gè)OnSubscribe雖然也是數(shù)據(jù)源,但是自己卻從來(lái)不產(chǎn)生數(shù)據(jù),也不直接向訂閱者直接發(fā)布數(shù)據(jù)。它做的事就只是把自己的訂閱者包裝成為一個(gè)父級(jí)Observable認(rèn)可的訂閱者,然后委派給父級(jí)的數(shù)據(jù)源(上面代碼第十行)。父級(jí)Observable的數(shù)據(jù)源向自己的訂閱者發(fā)布數(shù)據(jù),就是發(fā)送到被包裝出來(lái)的這個(gè)訂閱者中來(lái)。

小結(jié):到這里為止,要記住很重要的一點(diǎn),通過(guò)lift操作產(chǎn)生的派生Observable對(duì)象的數(shù)據(jù)源(onSubscribe)是不實(shí)際產(chǎn)生數(shù)據(jù)的,它做的事就只是把自己的訂閱者包裝成為一個(gè)父級(jí)Observable認(rèn)可的訂閱者,然后委派給父級(jí)的數(shù)據(jù)源。

lift操作實(shí)例分析

那么被包裝出來(lái)的這個(gè)訂閱者是怎么處理父級(jí)數(shù)據(jù)源發(fā)布的數(shù)據(jù)呢?這里就要回到上面代碼的第6行。那里通過(guò)一個(gè)我們調(diào)用lift操作時(shí)傳進(jìn)去的Operator把派生Observable認(rèn)可的Subscriber包裝成一個(gè)父級(jí)Observable認(rèn)可的Subscriber。

下面我看一個(gè)lift操作的例子,用lift模擬了兩次map操作。

代碼視圖1:

class Bean { int value; Bean(int v) { this.value = v; } } Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4)) .lift(new Observable.Operator() { @Override public Subscriber call(Subscriber subscriber) { return new Subscriber(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(Bean bean) { subscriber.onNext(bean.value); } }; } }) .lift(new Observable.Operator() { @Override public Subscriber call(Subscriber subscriber) { return new Subscriber(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(Integer i) { subscriber.onNext(String.valueOf(i)); } }; } }) .subscribe(System.out::println);

上面的代碼中Observable的鏈?zhǔn)讲僮髌鋵?shí)是等價(jià)于下面代碼形式的,

代碼視圖2:

Observable o1 = Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4)); Observable o2 = o1.lift(···); Observable o3 = o2.lift(···); Subscriber subscriber3 = new Subscriber() { @Override public void onCompleted() {} @Override public void onError(Throwable e) {} @Override public void onNext(String s) { System.out.println(s); } }; o3.subscribe(subscriber3);

代碼視圖2中,我們可以發(fā)現(xiàn)這一連串的操作下來(lái),一共產(chǎn)生了3個(gè)Observable對(duì)象:o1、o2、o3。之前我們說(shuō)過(guò)每個(gè)Observable對(duì)象都會(huì)持有一個(gè)onSubscribe對(duì)象作為數(shù)據(jù)源,用來(lái)向訂閱者發(fā)布數(shù)據(jù)。我們以不同的標(biāo)識(shí)來(lái)區(qū)分一下上面三個(gè)Observable對(duì)象對(duì)應(yīng)的onSubscribe對(duì)象:o1 => onSubscribe1, o2 => onSubscribe2, o3 => onSubscribe3。

從上面lift操作說(shuō)明一節(jié),我們知道lift操作產(chǎn)生的Observable對(duì)象的數(shù)據(jù)源是不產(chǎn)生數(shù)據(jù)的,它做的事就只是把自己的訂閱者包裝成為一個(gè)父級(jí)Observable認(rèn)可的訂閱者,然后委派給父級(jí)的數(shù)據(jù)源。o3是一個(gè)通過(guò)lift操作產(chǎn)生的派生Observable,當(dāng)訂閱者subscriber3注冊(cè)到o3時(shí),o3的數(shù)據(jù)源onSubscribe3就會(huì)把subscriber3包裝成一個(gè)父級(jí)(o2)可以的訂閱者(這里命名為subscriber2),然后委派給父級(jí)數(shù)據(jù)源(onSubscribe2)。 現(xiàn)在請(qǐng)回頭看代碼視圖1的第34-49行。這一段代碼就顯示了把subscriber3包裝成為subscriber2的過(guò)程。可以發(fā)現(xiàn),被包裝出來(lái)的subscriber2只做一件事,就是把onSubscribe2發(fā)布給自己的數(shù)據(jù)轉(zhuǎn)化為subscriber3可以消化的數(shù)據(jù),然后就交給subscriber3,相當(dāng)于充當(dāng)了一個(gè)subscriber3和onSubscribe2之間的橋梁。

接著分析,onSubscribe2雖然說(shuō),通過(guò)subscriber2間接把數(shù)據(jù)發(fā)布到了subscriber3。但是實(shí)際上,作為數(shù)據(jù)源,它的持有者o2,也是通過(guò)lift操作產(chǎn)生的派生Observable,所以這個(gè)onSubscribe2也是不直接產(chǎn)生數(shù)據(jù)的。它也是把自己的訂閱者(subscriber2)包裝一個(gè)父級(jí)(o1)認(rèn)可的訂閱者(這里命名為subscriber1),然后委派給父級(jí)數(shù)據(jù)源(onSubscribe1)。 現(xiàn)在請(qǐng)?jiān)倩仡^看代碼視圖1的第13-28行。這段代碼顯示了如何把subscriber2包裝成為subscriber1的過(guò)程。同樣,subscriber1也只做一件事,就是把onSubscribe1發(fā)布給自己的數(shù)據(jù)轉(zhuǎn)化為subscriber2可以消化的數(shù)據(jù),然后就交給subscriber2,相當(dāng)于充當(dāng)了一個(gè)subscriber2和onSubscribe1之間的橋梁。

最后,整個(gè)過(guò)程可以描述為下面的一個(gè)示意圖,

subscribeOn與lift

RxJava中,lift操作幾乎貫穿了整個(gè)Observable,因?yàn)椴畈欢嗨兴械牟僮鞣际峭ㄟ^(guò)lift操作來(lái)實(shí)現(xiàn)的。比如map操作符,其實(shí)就是通過(guò)lift操作產(chǎn)生的派生Observable而已。所以這個(gè)派生Observable的數(shù)據(jù)源也就如上面我所述,自己不產(chǎn)生數(shù)據(jù),而是把自己的訂閱者包裝成一個(gè)父級(jí)認(rèn)可的訂閱者。怎么包裝呢?上面的講述中,這個(gè)包裝過(guò)程其實(shí)是通過(guò)我們調(diào)用lift操作時(shí)傳遞的參數(shù)Operator來(lái)完成的。

我們?cè)倩仡檚ubscribeOn操作符的源碼。首先,通過(guò)nest操作產(chǎn)生一個(gè)Observable>(我們?cè)瓉?lái)操作的是Observable),然后它上面調(diào)用lift操作,那么Observable>就是父級(jí)了,lift操作最終產(chǎn)生的派生Observable就是整個(gè)subscribeOn操作產(chǎn)生的結(jié)果??磗ubscribeOn操作的源碼我們可以發(fā)現(xiàn),它是通過(guò)OperatorSubscribeOn這么一個(gè)Operator來(lái)實(shí)現(xiàn)Subscriber的包裝。那么我們來(lái)看一下OperatorSubscribeOn的源碼,分析一下具體的包裝過(guò)程。

public class OperatorSubscribeOn implements Operator> { private final Scheduler scheduler; public OperatorSubscribeOn(Scheduler scheduler) { this.scheduler = scheduler; } @Override public Subscriber> call(final Subscriber subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); return new Subscriber>(subscriber) { ···這里省略了一些于邏輯閱讀無(wú)關(guān)的代碼。 @Override public void onNext(final Observable o) { inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(T t) { subscriber.onNext(t); } ···這里省略了一些于邏輯閱讀無(wú)關(guān)的代碼。 }); } }); } }; } }

從上面的分析中,我們知道subscribeOn操作其實(shí)是先通過(guò)nest操作產(chǎn)生一個(gè)Observable>對(duì)象,再通過(guò)lift操作產(chǎn)生派生Observable(即(Observable)對(duì)象的,所以O(shè)bservable>對(duì)象就是父級(jí)。所以O(shè)peratorSubscribeOn的職責(zé)就是為包裝一個(gè)Observable>認(rèn)可的訂閱者。被包裝出來(lái)的訂立者接受到父級(jí)發(fā)布的數(shù)據(jù)(即一個(gè)Observable對(duì)象)時(shí),它這里沒(méi)有把數(shù)據(jù)轉(zhuǎn)換成下級(jí)subscriber(即上面代碼第10行傳遞給call方法的參數(shù))可消化的數(shù)據(jù),而是通過(guò)inner對(duì)象來(lái)安裝一次訂閱。

小結(jié),經(jīng)過(guò)subscribeOn操作產(chǎn)生了一個(gè)派生Observable對(duì)象,這個(gè)對(duì)象的數(shù)據(jù)源(onSubscribe)的工作是為自己的訂閱者在某個(gè)線(xiàn)程上安排訂閱。

樣例分析

代碼視圖3

Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4)) .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.immediate()) .subscribe(bean -> System.out.println(bean.value));

看過(guò)subscribeOn的源碼之后,我們應(yīng)該知道上面的代碼幾乎等價(jià)于下面的寫(xiě)法,

代碼視圖4

Observable o1 = Observable.just(new Bean(1), new Bean(2), new Bean(3), new Bean(4)); Observable> o2 = o1.nest(); Observable o3 = o2.lift(new OperatorSubscribeOn(Schedulers.io())); Observable> o4 = o3.nest(); Observable o5 = o4.lift(new OperatorSubscribeOn(Schedulers.immediate())); Subscriber subscriber4 = new Subscriber() { @Override public void onCompleted() {} @Override public void onError(Throwable e) {} @Override public void onNext(Bean s) { System.out.println(s.value); } }; o5.subscribe(subscriber5);

上面代碼的執(zhí)行過(guò)程,可以表示成如下示意圖,

通過(guò)上面示意圖可以看出,最后整個(gè)整個(gè)訂閱過(guò)程的運(yùn)行線(xiàn)程是 currentThread -> immediateThread -> ioThread。

聲名

By 啪嗒科技 AtanL(atanl@padakeji.com)

©啪嗒科技版本所有,沒(méi)有經(jīng)作者允許,只能發(fā)表于padakeji.com相關(guān)域名下。


文章標(biāo)題:RxJava之subscribeOn解惑
標(biāo)題URL:http://weahome.cn/article/cjjdoo.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部