真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

使用RxJava怎么實現(xiàn)消息發(fā)送和線程切換-創(chuàng)新互聯(lián)

本篇文章給大家分享的是有關(guān)使用RxJava怎么實現(xiàn)消息發(fā)送和線程切換,小編覺得挺實用的,因此分享給大家學(xué)習(xí),希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

成都創(chuàng)新互聯(lián)公司2013年成立,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項目網(wǎng)站制作、做網(wǎng)站網(wǎng)站策劃,項目實施與項目整合能力。我們以讓每一個夢想脫穎而出為使命,1280元愛民做網(wǎng)站,已為上家服務(wù),為愛民各地企業(yè)和個人服務(wù),聯(lián)系電話:028-86922220

消息訂閱發(fā)送

首先讓我們看看消息訂閱發(fā)送最基本的代碼組成:

 Observable observable = Observable.create(new ObservableOnSubscribe() {
      @Override
      public void subscribe(ObservableEmitter emitter) throws Exception {
        emitter.onNext("Jack1");
        emitter.onNext("Jack2");
        emitter.onNext("Jack3");
        emitter.onComplete();
      }
    });

    Observer observer = new Observer() {
      @Override
      public void onSubscribe(Disposable d) {
        Log.d(TAG, "onSubscribe");
      }

      @Override
      public void onNext(String s) {
        Log.d(TAG, "onNext : " + s);
      }

      @Override
      public void onError(Throwable e) {
        Log.d(TAG, "onError : " + e.toString());
      }

      @Override
      public void onComplete() {
        Log.d(TAG, "onComplete");
      }
    };

    observable.subscribe(observer);

代碼很簡單,observable為被觀察者,observer為觀察者,然后通過observable.subscribe(observer),把觀察者和被觀察者關(guān)聯(lián)起來。被觀察者發(fā)送消息(emitter.onNext("內(nèi)容")),觀察者就可以在onNext()方法里回調(diào)出來。

我們先來看Observable,創(chuàng)建是用Observable.create()方法進行創(chuàng)建,源碼如下:

public static  Observable create(ObservableOnSubscribe source) {
  ObjectHelper.requireNonNull(source, "source is null");
  return RxJavaPlugins.onAssembly(new ObservableCreate(source));
}

public static  T requireNonNull(T object, String message) {
  if (object == null) {
     throw new NullPointerException(message);
  }
  return object;
 }

public static  Observable onAssembly(@NonNull Observable source) {
  Function f = onObservableAssembly;
  if (f != null) {
     return apply(f, source);
  }
  return source;
}

可以看出,create()方法里最主要的還是創(chuàng)建用ObservableOnSubscribe傳入創(chuàng)建了一個ObservableCreate對象并且保存而已。

public final class ObservableCreate extends Observable {
  final ObservableOnSubscribe source;

  public ObservableCreate(ObservableOnSubscribe source) {
    this.source = source;
  }

}

接著是創(chuàng)建Observer,這比較簡單只是單純創(chuàng)建一個接口對象而已

public interface Observer {
  void onSubscribe(@NonNull Disposable d);

  void onNext(@NonNull T t);

  void onError(@NonNull Throwable e);
  
  void onComplete();
}

訂閱發(fā)送消息

observable.subscribe(observer)的subscribe方法如下:

public final void subscribe(Observer observer) {
  ObjectHelper.requireNonNull(observer, "observer is null");
  try {
    observer = RxJavaPlugins.onSubscribe(this, observer);
    ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    subscribeActual(observer);
  } catch (NullPointerException e) { // NOPMD
    throw e;
  } catch (Throwable e) {
    Exceptions.throwIfFatal(e);
    RxJavaPlugins.onError(e);
    NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
    npe.initCause(e);
    throw npe;
  }
}

//ObjectHelper.requireNonNull()方法
public static  T requireNonNull(T object, String message) {
  if (object == null) {
     throw new NullPointerException(message);
  }
  return object;
}

//RxJavaPlugins.onSubscribe()方法
public static  Observer onSubscribe(@NonNull Observable source, @NonNull Observer observer) {
  BiFunction f = onObservableSubscribe;
  if (f != null) {
    return apply(f, source, observer);
  }
  return observer;
}

從上面源碼可以看出requireNonNull()只是做非空判斷而已,而RxJavaPlugins.onSubscribe()也只是返回最終的觀察者而已。所以關(guān)鍵代碼是抽象方法subscribeActual(observer);那么subscribeActual對應(yīng)哪個代碼段呢?

還記得Observable.create()創(chuàng)建的ObservableCreate類嗎,這就是subscribeActual()具體實現(xiàn)類,源碼如下:

protected void subscribeActual(Observer observer) {
  CreateEmitter parent = new CreateEmitter(observer);
  observer.onSubscribe(parent);
  try {
    source.subscribe(parent);
  } catch (Throwable ex) {
    Exceptions.throwIfFatal(ex);
    parent.onError(ex);
  }
}

從上面的代碼可以看出,首先創(chuàng)建了一個CreateEmitter對象并傳入observer,然后回到observer的onSubscribe()方法,而source就是我們之前創(chuàng)建ObservableCreate傳入的ObservableOnSubscribe對象。

class CreateEmitter extends AtomicReference
  implements ObservableEmitter, Disposable {

 }

而CreateEmitter又繼承ObservableEmitter接口,又回調(diào)ObservableOnSubscribe的subscribe方法,對應(yīng)著我們的:

Observable observable = Observable.create(new ObservableOnSubscribe() {
   @Override
   public void subscribe(ObservableEmitter emitter) throws Exception {
      emitter.onNext("Jack1");
      emitter.onNext("Jack2");
      emitter.onNext("Jack3");
      emitter.onComplete();
   }
});

當(dāng)它發(fā)送消息既調(diào)用emitter.onNext()方法時,既調(diào)用了CreateEmitter的onNext()方法:

public void onNext(T t) {
  if (t == null) {
    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
    return;
  }
  if (!isDisposed()) {
    observer.onNext(t);
  }
}

可以看到最終又回調(diào)了觀察者的onNext()方法,把被觀察者的數(shù)據(jù)傳輸給了觀察者。有人會問

isDisposed()是什么意思,是判斷要不要終止傳遞的,我們看emitter.onComplete()源碼:

public void onComplete() {
  if (!isDisposed()) {
    try {
      observer.onComplete();
    } finally {
      dispose();
    }
  }
}

public static boolean dispose(AtomicReference field) {
    Disposable current = field.get();
    Disposable d = DISPOSED;
    if (current != d) {
      current = field.getAndSet(d);
      if (current != d) {
        if (current != null) {
          current.dispose();
        }
        return true;
      }
    }
    return false;
 }

public static boolean isDisposed(Disposable d) {
    return d == DISPOSED;
}

dispose()方法是終止消息傳遞,也就付了個DISPOSED常量,而isDisposed()方法就是判斷這個常量而已。這就是整個消息訂閱發(fā)送的過程,用的是觀察者模式。

線程切換

在上面模板代碼的基礎(chǔ)上,線程切換只是改變了如下代碼:

observable.subscribeOn(Schedulers.io())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(observer);

下面我們對線程切換的源碼進行一下分析,分為兩部分:subscribeOn()和observeOn()

subscribeOn()

首先是subscribeOn()源碼如下:

public final Observable subscribeOn(Scheduler scheduler) {
  ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  return RxJavaPlugins.onAssembly(new ObservableSubscribeOn(this, scheduler));
}

我們傳進去了一個Scheduler類,Scheduler是一個調(diào)度類,能夠延時或周期性地去執(zhí)行一個任務(wù)。

Scheduler有如下類型:

類型使用方式含義使用場景
IoSchedulerSchedulers.io()io操作線程讀寫SD卡文件,查詢數(shù)據(jù)庫,訪問網(wǎng)絡(luò)等IO密集型操作
NewThreadSchedulerSchedulers.newThread()創(chuàng)建新線程耗時操作等
SingleSchedulerSchedulers.single()單例線程只需一個單例線程時
ComputationSchedulerSchedulers.computation()CPU計算操作線程圖片壓縮取樣、xml,json解析等CPU密集型計算
TrampolineSchedulerSchedulers.trampoline()當(dāng)前線程需要在當(dāng)前線程立即執(zhí)行任務(wù)時
HandlerSchedulerAndroidSchedulers.mainThread()Android主線程更新UI等

接著就沒什么了,只是返回一個ObservableSubscribeOn對象而已。

observeOn()

首先看源碼如下:

public final Observable observeOn(Scheduler scheduler) {
  return observeOn(scheduler, false, bufferSize());
}

public final Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
  ObjectHelper.requireNonNull(scheduler, "scheduler is null");
  ObjectHelper.verifyPositive(bufferSize, "bufferSize");
  return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}

這里也是沒什么,只是最終返回一個ObservableObserveOn對象而已。

接著還是像原來那樣調(diào)用subscribe()方法進行訂閱,看起來好像整體變化不大,就是封裝了一些對象而已,不過著恰恰是RxJava源碼的精華,當(dāng)他再次調(diào)用subscribeActual()方法時,已經(jīng)不是之前的ObservableCreate()里subscribeActual方法了,而是最先調(diào)用ObservableObserveOn的subscribeActual()方法,對應(yīng)源碼如下:

protected void subscribeActual(Observer observer) {
  if (scheduler instanceof TrampolineScheduler) {
    source.subscribe(observer);
  } else {
    Scheduler.Worker w = scheduler.createWorker();
    source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
  }
}

在這里有兩點要講,一點是ObserveOnObserver是執(zhí)行觀察者的線程,后面還會詳解,然后就是source.subscribe,這個source.subscribe調(diào)的是ObservableSubscribeOn的subscribe方法,而subscribe方法因為繼承的也是Observable,是Observable里的方法,所以和上面的ObservableCreate一樣的方法,所以會調(diào)用ObservableSubscribeOn里的subscribeActual()方法,對應(yīng)的代碼如下:

public void subscribeActual(final Observer s) {
  final SubscribeOnObserver parent = new SubscribeOnObserver(s);
  s.onSubscribe(parent);
  parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

上面代碼中,首先把ObserveOnObserver返回給來的用SubscribeOnObserver“包裝”起來,然后在回調(diào)Observer的onSubscribe(),就是對應(yīng)模板代碼的onSubscribe()方法。

接著看SubscribeTask類的源碼:

final class SubscribeTask implements Runnable {
  private final SubscribeOnObserver parent;
  SubscribeTask(SubscribeOnObserver parent) {
    this.parent = parent;
  }
  @Override
  public void run() {
    source.subscribe(parent);
  }
}

其中的source.subscribe(parent),就是我們執(zhí)行子線程的回調(diào)方法,對應(yīng)我們模板代碼里的被觀察者的subscribe()方法。它放在run()方法里,并且繼承Runnable,說明這個類主要是線程運行。接著看scheduler.scheduleDirect()方法對應(yīng)的源碼如下:

public Disposable scheduleDirect(@NonNull Runnable run) {
  return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
  final Worker w = createWorker();
  final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
  DisposeTask task = new DisposeTask(decoratedRun, w);
  w.schedule(task, delay, unit);
  return task;
}

在這里,createWorker()也是一個抽象方法,調(diào)用的是我們的調(diào)度類對應(yīng)的Schedulers類里面的方法,這里是IoScheduler類,

public final class IoScheduler extends Scheduler{

  final AtomicReference pool;

  //省略....

  public Worker createWorker() {
    return new EventLoopWorker(pool.get());
  }

  static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
      this.pool = pool;
      this.tasks = new CompositeDisposable();
      this.threadWorker = pool.get();
    }

    //省略....

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
      if (tasks.isDisposed()) {
        // don't schedule, we are unsubscribed
        return EmptyDisposable.INSTANCE;
      }
      return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
  }

}

 static final class CachedWorkerPool implements Runnable {

  //省略....

  ThreadWorker get() {
    if (allWorkers.isDisposed()) {
      return SHUTDOWN_THREAD_WORKER;
    }
    while (!expiringWorkerQueue.isEmpty()) {
      ThreadWorker threadWorker = expiringWorkerQueue.poll();
      if (threadWorker != null) {
        return threadWorker;
      }
    }

    ThreadWorker w = new ThreadWorker(threadFactory);
    allWorkers.add(w);
    return w;
   }
   //省略....
}

這就是IoScheduler的createWorker()的方法,其實最主要的意思就是獲取線程池,以便于生成子線程,讓SubscribeTask()可以運行。然后直接調(diào)用 w.schedule(task, delay, unit)方法讓它在線程池里執(zhí)行。上面中那ThreadWorker的源碼如下:

static final class ThreadWorker extends NewThreadWorker {
  private long expirationTime;
  ThreadWorker(ThreadFactory threadFactory) {
    super(threadFactory);
    this.expirationTime = 0L;
  }

  //省略代碼....
 }

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
  private final ScheduledExecutorService executor;

  public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);
  }

  public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
      if (!parent.add(sr)) {
        return sr;
      }
    }

    Future f;
    try {
      if (delayTime <= 0) {
        f = executor.submit((Callable)sr);
      } else {
        f = executor.schedule((Callable)sr, delayTime, unit);
      }
      sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
      if (parent != null) {
        parent.remove(sr);
      }
      RxJavaPlugins.onError(ex);
    }

    return sr;
  }
}

可以看到,這就調(diào)了原始的javaAPI來進行線程池操作。

然后最后一環(huán)在子線程調(diào)用source.subscribe(parent)方法,然后回調(diào)剛開始創(chuàng)建的ObservableCreate的subscribeActual(),既:

protected void subscribeActual(Observer observer) {
    CreateEmitter parent = new CreateEmitter(observer);
    observer.onSubscribe(parent);
    try {
      source.subscribe(parent);
    } catch (Throwable ex) {
      Exceptions.throwIfFatal(ex);
      parent.onError(ex);
    }
}

進行消息的訂閱綁定。

當(dāng)我們在調(diào)用 emitter.onNext(內(nèi)容)時,是在io線程里的,那回調(diào)的onNext()又是什么時候切換的?那就是前面為了整個流程流暢性沒講的在observeOn()里的ObserveOnObserver是執(zhí)行觀察者的線程的過程。

class ObserveOnObserver extends BasicIntQueueDisposable
  implements Observer, Runnable {

    //省略代碼....

    ObserveOnObserver(Observer actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
      this.actual = actual;
      this.worker = worker;
      this.delayError = delayError;
      this.bufferSize = bufferSize;
    }

    @Override
    public void onSubscribe(Disposable s) {
      if (DisposableHelper.validate(this.s, s)) {
        this.s = s;
        if (s instanceof QueueDisposable) {
          @SuppressWarnings("unchecked")
          QueueDisposable qd = (QueueDisposable) s;
          int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
          if (m == QueueDisposable.SYNC) {
            sourceMode = m;
            queue = qd;
            done = true;
            actual.onSubscribe(this);
            schedule();
            return;
          }
          if (m == QueueDisposable.ASYNC) {
            sourceMode = m;
            queue = qd;
            actual.onSubscribe(this);
            return;
          }
        }
        queue = new SpscLinkedArrayQueue(bufferSize);
        actual.onSubscribe(this);
      }
    }

    @Override
    public void onNext(T t) {
      if (done) {
        return;
      }
      if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
      }
      schedule();
    }  

    void schedule() {
      if (getAndIncrement() == 0) {
        worker.schedule(this);
      }
    }
    //省略代碼....
  }

當(dāng)調(diào)用emitter.onNext(內(nèi)容)方法,會調(diào)用上面的onNext()方法,然后在這個方法里會把數(shù)據(jù)壓入一個隊列,然后執(zhí)行worker.schedule(this)方法,work是什么呢,還記得AndroidSchedulers.mainThread()嗎,這個對應(yīng)這個HandlerScheduler這個類,所以createWorker()對應(yīng)著:

private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}


public Worker createWorker() {
  return new HandlerWorker(handler);
}

private static final class HandlerWorker extends Worker {
    private final Handler handler;
    private volatile boolean disposed;

    HandlerWorker(Handler handler) {
      this.handler = handler;
    }

    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
      if (run == null) throw new NullPointerException("run == null");
      if (unit == null) throw new NullPointerException("unit == null");
      if (disposed) {
        return Disposables.disposed();
      }
      run = RxJavaPlugins.onSchedule(run);
      ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
      Message message = Message.obtain(handler, scheduled);
      message.obj = this; // Used as token for batch disposal of this worker's runnables.
      handler.sendMessageDelayed(message, unit.toMillis(delay));
      if (disposed) {
        handler.removeCallbacks(scheduled);
        return Disposables.disposed();
      }
      return scheduled;
    }
}

在next()方法里,運用android自帶的Handler消息機制,通過把方法包裹在Message里,同通過handler.sendMessageDelayed()發(fā)送消息,就會在ui線程里回調(diào)Next()方法,從而實現(xiàn)從子線程切換到android主線程的操作。我們在主線程拿到數(shù)據(jù)就可以進行各種在主線程的操作了。

總結(jié)一下:

使用RxJava怎么實現(xiàn)消息發(fā)送和線程切換

ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn為初始化順序

當(dāng)調(diào)用observable.subscribe(observer)時的執(zhí)行順序
ObservableObserveOn 一> ObservableSubscribeOn 一> ObservableCreate

當(dāng)發(fā)送消息的執(zhí)行順序
ObservableCreate 一> ObservableSubscribeOn 一> ObservableObserveOn

以上就是使用RxJava怎么實現(xiàn)消息發(fā)送和線程切換,小編相信有部分知識點可能是我們?nèi)粘9ぷ鲿姷交蛴玫降?。希望你能通過這篇文章學(xué)到更多知識。更多詳情敬請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


分享文章:使用RxJava怎么實現(xiàn)消息發(fā)送和線程切換-創(chuàng)新互聯(lián)
文章源于:http://weahome.cn/article/cdsjgo.html

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部