今天就跟大家聊聊有關(guān)RxJava線程切換過程是怎樣的,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
創(chuàng)新互聯(lián)專注于企業(yè)全網(wǎng)整合營銷推廣、網(wǎng)站重做改版、金堂縣網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5頁面制作、商城網(wǎng)站制作、集團(tuán)公司官網(wǎng)建設(shè)、外貿(mào)網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為金堂縣等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。
線程切換過程
下面我們就來看看它的又一利器,調(diào)度器Scheduler
:就像我們所知道的,Scheduler
是給Observable
數(shù)據(jù)流添加多線程功能所準(zhǔn)備的,一般我們會(huì)通過使用subscribeOn()
、observeOn()
方法傳入對(duì)應(yīng)的Scheduler
去指定數(shù)據(jù)流的每部分操作應(yīng)該以何種方式運(yùn)行在何種線程。對(duì)于我們而言,最常見的莫過于在非主線程獲取并處理數(shù)據(jù)之后在主線程更新UI這樣的場(chǎng)景了:
這是我們十分常見的調(diào)用方法,一氣呵成就把不同線程之間的處理都搞定了,因?yàn)槭擎準(zhǔn)剿越Y(jié)構(gòu)也很清晰,我們現(xiàn)在來看看這其中的線程切換流程。
subscribeOn()
當(dāng)我們調(diào)用subscribeOn()
的時(shí)候:
可以看到這里也是調(diào)用了create()
去生成一個(gè)Observable
,而OperatorSubscribeOn
則是實(shí)現(xiàn)了OnSubscribe
接口,同時(shí)將原始的Observable
和我們需要的scheduler
傳入:
可以看出來,這里對(duì)subscriber
的處理與前文中OperatorMap
中call()
對(duì)subscriber
的處理很相似。在這里我們同樣會(huì)根據(jù)傳入的subscriber
構(gòu)造出新的Subscribers
,不過這一系列的過程大部分都是由worker通過schedule()
去執(zhí)行的,從后面setProducer()
中對(duì)于線程的判斷,再結(jié)合subscribeOn()
方法的目的我們能大概推測(cè)出,這個(gè)worker在一定程度上就相當(dāng)于一個(gè)新線程的代理執(zhí)行者,schedule()
所實(shí)現(xiàn)的與Thread類中run()應(yīng)該十分類似。我們現(xiàn)在來看看這個(gè)worker的執(zhí)行過程。
首先從Schedulers.io()
進(jìn)入:
這個(gè)通過hook拿到scheduler的過程我們先不管,直接進(jìn)CachedThreadScheduler
,看它的createWorker()
方法:
這里的pool是一個(gè)原子變量引用AtomicReference
,所持有的則是CachedWorkerPool
,因而這個(gè)pool顧名思義就是用來保存worker的緩存池啦,我們從緩存池里拿到需要的worker并作了一層封裝成為EventLoopWorker
:
在這里我們終于發(fā)現(xiàn)目標(biāo)ThreadWorker
,它繼承自NewThreadWorker
,之前的schedule()方法最終都會(huì)到這個(gè)scheduleActual()
方法里:
這里我們看到了executor線程池,我們用Schedulers.io()
最終實(shí)現(xiàn)的線程切換的本質(zhì)就在這里了?,F(xiàn)在再結(jié)合之前的過程我們從頭梳理一下:
在subscribeOn()
時(shí),我們會(huì)新生成一個(gè)Observable
,它的成員onSubscribe
會(huì)在目標(biāo)Subscriber
訂閱時(shí)使用傳入的Scheduler
的worker作為線程調(diào)度執(zhí)行者,在對(duì)應(yīng)的線程中通知原始Observable
發(fā)送消息給這個(gè)過程中臨時(shí)生成的Subscriber
,這個(gè)Subscriber
又會(huì)通知到目標(biāo)Subscriber,這樣就完成了subscribeOn()
的過程。
observeOn()
下面我們接著來看看observeOn()
:
我們直接看最終調(diào)用的部分,可以看到這里又是一個(gè)lift()
,在這里傳入了OperatorObserveOn
,它與OperatorSubscribeOn
不同,是一個(gè)Operator
(Operator
的功能我們上文中已經(jīng)講過就不贅述了),它構(gòu)造出了新的觀察者ObserveOnSubscriber
并實(shí)現(xiàn)了Action0
接口:
可以看出來,這里ObserveOnSubscriber
所有的發(fā)送給目標(biāo)Subscriber child
的消息都被切換到了recursiveScheduler
的線程作處理,也就達(dá)到了將線程切回的目的。
總結(jié)observeOn()
整體流程如下:
對(duì)比subscribeOn()
和observeOn()
這兩個(gè)過程,我們不難發(fā)現(xiàn)兩者的區(qū)別:subscribeOn()
將初始Observable
的訂閱事件整體都切換到了另一個(gè)線程;而observeOn()
則是將初始Observable
發(fā)送的消息切換到另一個(gè)線程通知到目標(biāo)Subscriber。前者把 “訂閱 + 發(fā)送” 的切換了一個(gè)線程,后者把 “發(fā)送” 切換了一個(gè)線程。所以,我們的代碼中所實(shí)現(xiàn)的功能其實(shí)是:
這樣就能很容易實(shí)現(xiàn)耗時(shí)任務(wù)在子線程操作,在主線程作更新操作等這些常見場(chǎng)景的功能啦。
Subject
Subject在Rx系列是一個(gè)比較特殊的角色,它繼承了Observable的同時(shí)也實(shí)現(xiàn)了Observer接口,也就是說它既可作為觀察者,也可作為被觀察者,他一般被用來作為連接多個(gè)不同Observable、Observer之間的紐帶??赡苣銜?huì)奇怪,我們不是已經(jīng)有了像map()
、flatMap()
這類的操作符去變化 Observable數(shù)據(jù)流了嗎,為什么還要引入Subject這個(gè)東西呢?這是因?yàn)镾ubject所承擔(dān)的工作并非是針對(duì)Observable數(shù)據(jù)流內(nèi)容的轉(zhuǎn)換連接,而是數(shù)據(jù)流本身在Observable、Observer之間的調(diào)度。光這么說可能還是很模糊,我們舉個(gè)《RxJava Essentials》中的例子:
我們通過create()
創(chuàng)建了一個(gè)PublishSubject,觀察者成功訂閱了這個(gè)subject,然而這個(gè)subject卻沒有任何數(shù)據(jù)要發(fā)送,我們只是知道他未來會(huì)發(fā)送的會(huì)是String值而已。之后,當(dāng)我們調(diào)用subject.onNext()
時(shí),消息才被發(fā)送,Observer的onNext()
被觸發(fā)調(diào)用,輸出了"Hello World"。
這里我們注意到,當(dāng)訂閱事件發(fā)生時(shí),我們的subject是沒有產(chǎn)生數(shù)據(jù)流的,直到它發(fā)射了"Hello World",數(shù)據(jù)流才開始運(yùn)轉(zhuǎn),試想我們?nèi)绻麑⒂嗛嗊^程和subject.onNext()
調(diào)換一下位置,那么Observer就一定不會(huì)接受到"Hello World"了(這不是廢話嗎- -|||),因而這也在根本上反映了Observable的冷熱區(qū)別。
一般而言,我們的Observable都屬于Cold Observables,就像看視頻,每次點(diǎn)開新視頻我們都要從頭開始播放;而Subject
則默認(rèn)屬于Hot Observables,就像看直播,視頻數(shù)據(jù)永遠(yuǎn)都是新的。
基于這種屬性,Subject
自然擁有了對(duì)接收到的數(shù)據(jù)流進(jìn)行選擇調(diào)度等的能力了,因此,我們對(duì)于Subject
的使用也就通?;谌缦碌乃悸罚?/p>
在前面的例子里我們用到的是PublishSubject,它只會(huì)把在訂閱發(fā)生的時(shí)間點(diǎn)之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者。等一下,這功能聽起來是不是有些似曾相識(shí)呢?
沒錯(cuò),就是EventBus和Otto。(RxJava的出現(xiàn)慢慢讓Otto退出了舞臺(tái),現(xiàn)在Otto的Repo已經(jīng)是Deprecated狀態(tài)了,而EventBus依舊堅(jiān)挺)基于RxJava的觀察訂閱取消的能力和PublishSubject的功能,我們十分容易就能寫出實(shí)現(xiàn)了最基本功能的簡(jiǎn)易事件總線框架:
當(dāng)然Subject還有其他如BehaviorSubject
、ReplaySubject
、AsyncSubject
等類型,大家可以去看官方文檔,寫得十分詳細(xì),這里就不介紹了。
前面相信最近這段日子里,提到RxJava,大家就會(huì)想到Google最近剛剛開源的Agera。Agera作為專門為Android打造的Reactive Programming框架,難免會(huì)被拿來與RxJava做對(duì)比。本文前面RxJava的主體流程分析已近尾聲,現(xiàn)在我們?cè)賮砜纯碅gera這東東又是怎么一回事。
首先先上結(jié)論:
Agera最初是為了Google Play Movies而開發(fā)的一個(gè)內(nèi)部框架,現(xiàn)在開源出來了,它雖然是在RxJava之后才出現(xiàn),但是完全獨(dú)立于RxJava,與它沒有任何關(guān)系(只不過開源的時(shí)間十分微妙罷了233333)。 與RxJava比起來,Agera更加專注于Android的生命周期,而RxJava則更加純粹地面向Java平臺(tái)而非Android。
也許你可能會(huì)問:“那么RxAndroid呢,不是還有它嗎?”事實(shí)上,RxAndroid早在1.0版本的時(shí)候就進(jìn)行了很大的重構(gòu),很多模塊被拆分到其他的項(xiàng)目中去了,同時(shí)也刪除了部分代碼,僅存下來的部分多是和Android線程相關(guān)的部分,比如AndroidSchedulers
、MainThreadSubscription
等。鑒于這種情況,我們暫且不去關(guān)注RxAndroid,先把目光放在Agera上。
同樣也是基于觀察者模式,Agera和RxJava的角色分類大致相似,在Agera中,主要角色有兩個(gè):Observable
(被觀察者)、Updatable
(觀察者)。
是的,相較于RxJava中的Observable
,Agera中的Observable
只是一個(gè)簡(jiǎn)單的接口,也沒有范性的存在,Updatable亦是如此,這樣我們要如何做到消息的傳遞呢?這就需要另外一個(gè)接口了:
終于看到了泛型T,我們的消息的傳遞能力就是依賴于此接口了。所以我們將這個(gè)接口和基礎(chǔ)的Observable
結(jié)合一下:
這里的Repository
在一定程度上就是我們想要的RxJava中的Observable
啦。類似地,Repository也有兩種類型的實(shí)現(xiàn):
Direct - 所包含的數(shù)據(jù)總是可用的或者是可被同步計(jì)算出來的;一個(gè)Direct的Repository總是處于活躍(active)狀態(tài)下
Deferred - 所包含的數(shù)據(jù)是異步計(jì)算或拉去所得;一個(gè)Deffered的Repository直到有Updatable被添加進(jìn)來之前都會(huì)是非活躍(inactive)狀態(tài)下
是不是感到似曾相識(shí)呢?沒錯(cuò),Repository也是有冷熱區(qū)分的,不過我們現(xiàn)在暫且不去關(guān)注這一點(diǎn)。回到上面接著看,既然現(xiàn)在發(fā)數(shù)據(jù)的角色有了,那么我們要如何接收數(shù)據(jù)呢?答案就是Receiver
:
相信看到這里,大家應(yīng)該也隱約感覺到了:在Agera的世界里,數(shù)據(jù)的傳輸與事件的傳遞是相互隔離開的,這是目前Agera與Rx系列的最大本質(zhì)區(qū)別。Agera所使用的是一種push event, pull data的模型,這意味著event并不會(huì)攜帶任何data,Updatable
在需要更新時(shí),它自己會(huì)承擔(dān)起從數(shù)據(jù)源拉取數(shù)據(jù)的任務(wù)。這樣,提供數(shù)據(jù)的責(zé)任就從Observable
中拆分了出來交給了Repository
,讓其自身能夠?qū)W⒂诎l(fā)送一些簡(jiǎn)單的事件如按鈕點(diǎn)擊、一次下拉刷新的觸發(fā)等等。
那么,這樣的實(shí)現(xiàn)有什么好處呢?
當(dāng)這兩種處理分發(fā)邏輯分離開時(shí),Updatable
就不必觀察到來自Repository
的完整數(shù)據(jù)變化的歷史,畢竟在大多數(shù)場(chǎng)景下,尤其是更新UI的場(chǎng)景下,最新的數(shù)據(jù)往往才是有用的數(shù)據(jù)。
但是我就是需要看到變化的歷史數(shù)據(jù),怎么辦?
不用擔(dān)心,這里我們?cè)僬?qǐng)出一個(gè)角色Reservoir
:
顧名思義,Reservoir
就是我們用來存儲(chǔ)變化中的數(shù)據(jù)的地方,它繼承了Receiver
、Repository
,也就相當(dāng)于同時(shí)具有了接收數(shù)據(jù),發(fā)送數(shù)據(jù)的能力。通過查看其具體實(shí)現(xiàn)我們可以知道它的本質(zhì)操作都是使用內(nèi)部的Queue實(shí)現(xiàn)的:通過accept()接收到數(shù)據(jù)后入列,通過get()
拿到數(shù)據(jù)后出列。若一個(gè)Updatable
觀察了此Reservoir
,其隊(duì)列中發(fā)生調(diào)度變化后即將出列的下一個(gè)數(shù)據(jù)如果是可用的(非空),就會(huì)通知該Updatable,進(jìn)一步拉取這個(gè)數(shù)據(jù)發(fā)送給Receiver
。
現(xiàn)在,我們已經(jīng)大概了解了這幾個(gè)角色的功能屬性了,接下來我們來看一段官方示例代碼:
是不是有些云里霧里的感覺呢?多虧有注釋,我們大概能夠猜出到底上面都做了什么:使用需要的圖片規(guī)格作為參數(shù)拼接到url中,拉取對(duì)應(yīng)的圖片并用ImageView顯示出來。我們結(jié)合API來看看整個(gè)過程:
Repositories.repositoryWithInitialValue(Result.absent())
創(chuàng)建一個(gè)可運(yùn)行(抑或說執(zhí)行)的repository。
初始化傳入值是Result,它用來概括一些諸如apply()
、merge()
的操作的結(jié)果的不可變對(duì)象,并且存在兩種狀態(tài)succeeded()
、failed()
。
返回REventSource
observe()
用于添加新的Observable作為更新我們的圖片的Event source,本例中不需要。
返回RFrequency
onUpdatesPerLoop()
在每一個(gè)Looper Thread loop中若有來自多個(gè)Event Source的update()處理時(shí),只需開啟一個(gè)數(shù)據(jù)處理流。
返回RFlow
getFrom(new Supplier(…))
忽略輸入值,使用來自給定Supplier的新獲取的數(shù)據(jù)作為輸出值。
返回RFlow
goTo(executor)
切換到給定的executor繼續(xù)數(shù)據(jù)處理流。
attemptTransform(function())
使用給定的function()變換輸入值,若變換失敗,則終止數(shù)據(jù)流;若成功,則取新的變換后的值作為當(dāng)前流指令的輸出。
返回RTermination
orSkip()
若前面的操作檢查為失敗,就跳過剩下的數(shù)據(jù)處理流,并且不會(huì)通知所有已添加的Updatable。
thenTransform(function())
與attemptTransform(function())相似,區(qū)別在于當(dāng)必要時(shí)會(huì)發(fā)出通知。
返回RConfig
onDeactivation(SEND_INTERRUPT)
用于明確repository不再active時(shí)的行為。
返回RConfig
compile()
執(zhí)行這個(gè)repository。
返回Repository
整體流程乍看起來并沒有什么特別的地方,但是真正的玄機(jī)其實(shí)藏在執(zhí)行每一步的返回值里:
初始的REventSource
代表著事件源的開端,它從傳入值接收了T initialValue
,這里的中,第一個(gè)T是當(dāng)前repository的數(shù)據(jù)的類型,第二個(gè)T則是數(shù)據(jù)處理流開端的時(shí)候的數(shù)據(jù)的類型。
之后,當(dāng)observe()調(diào)用后,我們傳入事件源給REventSource
,相當(dāng)于設(shè)定好了需要的事件源和對(duì)應(yīng)的開端,這里返回的是RFrequency
,它繼承自REventSource
,為其添加了事件源的發(fā)送頻率的屬性。
之后,我們來到了onUpdatesPerLoop()
,這里明確了所開啟的數(shù)據(jù)流的個(gè)數(shù)(也就是前面所講的頻率)后,返回了RFlow,這里也就意味著我們的數(shù)據(jù)流正式生成了。同時(shí),這里也是流式調(diào)用的起點(diǎn)。
拿到我們的RFlow之后,我們就可以為其提供數(shù)據(jù)源了,也就是前面說的Supplier
,于是調(diào)用getFrom()
,這樣我們的數(shù)據(jù)流也就真正意義擁有了數(shù)據(jù)“干貨”。
有了數(shù)據(jù)之后我們就可以按具體需要進(jìn)行數(shù)據(jù)轉(zhuǎn)換了,這里我們可以直接使用transform()
,返回RFlow,以便進(jìn)一步進(jìn)行流式調(diào)用;也可以調(diào)用attemptTransform()來對(duì)可能出現(xiàn)的異常進(jìn)行處理,比如orSkip()、orEnd()之后繼續(xù)進(jìn)行流式調(diào)用。
經(jīng)過一系列的流式調(diào)用之后,我們終于對(duì)數(shù)據(jù)處理完成啦,現(xiàn)在我們可以選擇先對(duì)成型的數(shù)據(jù)在做一次最后的包裝thenTransform()
,或是與另一個(gè)Supplier合并thenMergeIn()
等。這些處理之后,我們的返回值也就轉(zhuǎn)為了RConfig,進(jìn)入了最終配置和repository聲明結(jié)束的狀態(tài)。
在最終的這個(gè)配置過程中,我們調(diào)用了onDeactivation()
,為這個(gè)repository明確了最終進(jìn)入非活躍狀態(tài)時(shí)的行為,如果不需要其他多余的配置的話,我們就可以進(jìn)入最終的compile()
方法了。當(dāng)我們調(diào)用compile()
時(shí),就會(huì)按照前面所走過的所有流程與配置去執(zhí)行并生成這個(gè)repository。到此,我們的repository才真正被創(chuàng)建了出來。
以上就是repository從無到有的全過程。當(dāng)repository誕生后,我們也就可以傳輸需要的數(shù)據(jù)啦。再回到上面的示例代碼:
我們?cè)?code>onResume()、onPause()
這兩個(gè)生命周期下分別添加、移除了Updatable。相較于RxJava中通過Subscription去取消訂閱的做法,Agera的這種寫法顯然更為清晰也更為整潔。我們的Activity實(shí)現(xiàn)了Updatable和Receiver接口,直接看其實(shí)現(xiàn)方法:
可以看到這里repository
將數(shù)據(jù)發(fā)送給了receiver
,也就是自己,在對(duì)應(yīng)的accept()方法中接收到我們想要的bitmap后,這張圖片也就顯示出來了,示例代碼中的完整流程也就結(jié)束了。
總結(jié)一下上述過程:
首先Repositories.repositoryWithInitialValue()
生成原點(diǎn)REventSource。
配置完Observable之后進(jìn)入RFrequency狀態(tài),接著配置數(shù)據(jù)流的流數(shù)。
前面配置完成后,數(shù)據(jù)流RFlow生成,之后通過getFrom()
、mergeIn()
、transform()等方法可進(jìn)一步進(jìn)行流式調(diào)用;也可以使用attemptXXX()
方法代替原方法,后面接著調(diào)用orSkip()
、orEnd()
進(jìn)行error handling
處理。當(dāng)使用attemptXXX()
方法時(shí),數(shù)據(jù)流狀態(tài)會(huì)變?yōu)镽Termination,它代表此時(shí)的狀態(tài)已具有終結(jié)數(shù)據(jù)流的能力,是否終結(jié)數(shù)據(jù)流要根據(jù)failed check觸發(fā),結(jié)合后面跟著調(diào)用的orSkip()
、orEnd()
,我們的數(shù)據(jù)流會(huì)從RTermination
再次切換為RFlow
,以便進(jìn)行后面的流式調(diào)用。
經(jīng)過前面一系列的流式處理,我們需要結(jié)束數(shù)據(jù)流時(shí),可以選擇調(diào)用thenXXX()
方法,對(duì)數(shù)據(jù)流進(jìn)行最終的處理,處理之后,數(shù)據(jù)流狀態(tài)會(huì)變?yōu)?RConfig;也可以為此行為添加error handling處理,選擇thenAttemptXXX()
方法,后面同樣接上orSkip()
、orEnd()
即可,最終數(shù)據(jù)流也會(huì)轉(zhuǎn)為Rconfig狀態(tài)。
此時(shí),我們可以在結(jié)束前按需要選擇對(duì)數(shù)據(jù)流進(jìn)行最后的配置,例如:調(diào)用onDeactivation()
配置從“訂閱”到“取消訂閱”的過程是否需要繼續(xù)執(zhí)行數(shù)據(jù)流等等。
一切都部署完畢后,我們compile()
這個(gè)RConfig,得到最終的成型的Repository,它具有添加Updatable、發(fā)送數(shù)據(jù)通知Receiver的能力。
我們根據(jù)需要添加Updatable
,repository
在數(shù)據(jù)流處理完成后會(huì)通過update()
發(fā)送event通知Updatable
。
Updatable收到通知后則會(huì)拉取repository的成果數(shù)據(jù),并將數(shù)據(jù)通過accept()發(fā)送給Receiver。完成 Push event, pull data 的流程。
看完上述內(nèi)容,你們對(duì)RxJava線程切換過程是怎樣的有進(jìn)一步的了解嗎?如果還想了解更多知識(shí)或者相關(guān)內(nèi)容,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。