两张图彻底理解 RxJava2 的核心原理
作者: solart
版权声明:本文图文为博主原创,转载请注明出处。
文章似乎有些标题党的嫌疑,根据我的理解画出两幅图相信可以让大家理解 RxJava2 的核心原理,稍后不要吝啬,请叫我灵魂画手:smile:!相信 RxJava
是大家业务中用到比较多的一个依赖库,RxJava 的强大之处在于它改变了程序员的编程习惯,相比较其他的开源项目,Rxjava 是最弯弯绕的一个,但是也是理解后最清晰的一个。对于 RxJava 种类繁多的操作符,大多数同学都表示很是头疼,也有不少同学陷入了学习操作符不能停的怪圈。
操作符要不要学,当然要,但是如果能理解 RxJava 的核心,操作符的使用就像是学会九阳神功的张无忌学招数,必定是手到擒来。
这篇文章我会讲些什么
- RxJava2 基本的运行流程
- RxJava2 线程切换的原理(涉及到为什么 subscribeOn() 只有第一次调用时有效)
- 为什么一订阅就回调了 onSubscribe
- 为什么 subscribeOn() 对上面的代码生效,observerOn() 对下面代码生效
以下内容如果涉及到自己写的代码我会采用 Kotlin 进行示例展示,涉及到 RxJava2 会展示部分源码。
1、简单的链式调用(无线程切换)
先来看一段示例代码:
Observable.create(object : ObservableOnSubscribe{ override fun subscribe(emitter: ObservableEmitter ) { Log.d("solart", "subscribe > ${Thread.currentThread().name}") emitter.onNext("test") emitter.onComplete() } }).flatMap(object : Function > { override fun apply(t: String): Observable { return Observable.just(t) } }).map(object : Function { override fun apply(t: String): Int { return 0 } }).subscribe(object : Observer { override fun onSubscribe(d: Disposable) { Log.d("solart", "onSubscribe > ${Thread.currentThread().name}") } override fun onNext(t: Int) { Log.d("solart", "onNext > ${Thread.currentThread().name}") } override fun onComplete() { Log.d("solart", "onComplete > ${Thread.currentThread().name}") } override fun onError(e: Throwable) { Log.d("solart", "onError > ${Thread.currentThread().name}") } })
这段代码中我们简单用了 create
、 flatMap
、 map
等操作符,进行了流式的数据转换,最后我们通过 subscribe
订阅了数据流,其实通过查看源码我们不难发现, RxJava 本身是个逆向订阅的过程,话不多说先看图
1.1 数据源的包裹
比照着这张图,我们来看一下,首先 蓝色虚线
部分是我们代码中实际调用的顺序,查看 Observable.create
我们不难发现,此处就是产生了一个 ObservableCreate
实例,
public staticObservable create(ObservableOnSubscribe source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate (source)); }
如我们图中所示, ObservableCreate
内部包含一个类型为 ObservableOnSubscribe
的 source
变量,根据我们代码中的调用,这个 source
就是我们 Kotlin 代码中的匿名对象 object : ObservableOnSubscribe
。
public final class ObservableCreateextends Observable { final ObservableOnSubscribe source; public ObservableCreate(ObservableOnSubscribe source) { this.source = source; } @Override protected void subscribeActual(Observer super T> observer) { ... } ... }
我们顺着代码的调用顺序,继续看一下 flatMap
的方法中又做了什么:
public finalObservable flatMap(Function super T, ? extends ObservableSource extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { ... return RxJavaPlugins.onAssembly(new ObservableFlatMap (this, mapper, delayErrors, maxConcurrency, bufferSize)); }
类似的产生了一个 ObservableFlatMap
实例,而其内部持有一个类型为 ObservableSource
的 source
变量,而该 source 则是上一步中的 ObservableCreate
实例,依次我们看 map
依然是类似的代码,这里不在赘述,所以到此我们得到了图中蓝色虚线部分的内容。
1.2 逆向订阅数据源
我们知道以上的代码调用并没有出发数据的流转,只有当我们调用 subscribe
时(图中红色实线部分)才真正触发了 RxJava 的数据流,我们来看代码:
public final void subscribe(Observer super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); // 重点!!! 发生订阅的核心方法 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { ... throw npe; } }
根据我们上面的分析,执行 subscribeActual
的对象其实是 ObservableMap
,我们来看它的 subscribeActual
的实现
public final class ObservableMapextends AbstractObservableWithUpstream { ... @Override public void subscribeActual(Observer super U> t) { source.subscribe(new MapObserver (t, function)); } ... }
注意,此时产生了一个 MapObserver
对象, MapObserver
中通过 actual
持有了我们自己的匿名对象 object : Observer
,同样的, ObservableMap
执行 subscribeActual 又调用了上层的 source.subscribe
,依次逆向调用,就得到了我们图中上半部分的红线内容。
1.3 触发数据源产生原始数据,数据流转
当订阅发生在最顶层时,也就是 ObservableCreate
中的 subscribeActual
,此时触发了数据源的产生,通过 emitter
发射数据
public final class ObservableCreateextends Observable { ... @Override protected void subscribeActual(Observer super T> observer) { CreateEmitter parent = new CreateEmitter (observer); observer.onSubscribe(parent); //此时触发了 onSubscribe 回调,这里先提一下 try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } ... }
而我们代码中此时产生了真正的数据
override fun subscribe(emitter: ObservableEmitter) { Log.d("solart", "subscribe > ${Thread.currentThread().name}") emitter.onNext("test") emitter.onComplete() }
此时我们再来看 CreateEmitter
的实现:
static final class CreateEmitterextends AtomicReference implements ObservableEmitter , Disposable { final Observer super T> observer; CreateEmitter(Observer super T> observer) { this.observer = observer; } @Override public void onNext(T t) { ... if (!isDisposed()) { observer.onNext(t); //向下层分发数据 } } ... }
根据我们上面的分析 CreateEmitter
中持有的 observer
即是 FlatMapObserver
的实例,而 FlatMapObserver
调用 onNext 时,又会调用 MapObserver
的 onNext ,依次调用至我们自己实现的观察者的 onNext 处理数据,此时数据流转完毕。
观察我们这个图,你会发现, 操作符
对应产生的被观察者和观察者命名规则很有规律,比如说被观察者的命名 Observable + 操作符
,例如 ObservableMap
= Observable + map,观察者命名大多遵循 操作符 + Observer
,例如 FlatMapObserver
= flatMap + Observer。除了命名规则外,我们观察整个流程,你也会发现有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹(蓝色虚线的流程部分),另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹(上部分的红色流程部分)。
2、异步事件流编程(线程切换)
相信有了上面的分析,大家对 RxJava 的逆向订阅以及数据流转有了一定的认识,但是 RxJava 的强大之处在于它的异步事件流编程方式,随心所欲的切换工作线程,下面我们来分析它是如何做到的。
同样的我们还是先给出一个简单的示例:
Observable.create(object : ObservableOnSubscribe{ override fun subscribe(emitter: ObservableEmitter ) { Log.d("solart", "subscribe > ${Thread.currentThread().name}") emitter.onNext("test") emitter.onComplete() } }).subscribeOn(Schedulers.io()) .map(object : Function { override fun apply(t: String): Int { return 0 } }).observeOn(AndroidSchedulers.mainThread()) .subscribe(object : Observer { override fun onSubscribe(d: Disposable) { Log.d("solart", "onSubscribe > ${Thread.currentThread().name}") } override fun onNext(t: Int) { Log.d("solart", "onNext > ${Thread.currentThread().name}") } override fun onComplete() { Log.d("solart", "onComplete > ${Thread.currentThread().name}") } override fun onError(e: Throwable) { Log.d("solart", "onError > ${Thread.currentThread().name}") } })
这里简化了操作符的调用,以切换线程为示例,根据这段代码,我画出了这个过程的流程图(灵魂画手有没有?)如下:
图中不同颜色(红、绿、紫)的实线表示流程所属不同线程,体现在不同线程中的过程,且标上了对应的序号,方便大家观看,这个图已经能够揭示 RxJava 运转的核心原理。
2.1 逆向订阅时触发 subscribeOn 的线程切换
根据我们第一部分的分析,我们知道 RxJava 有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹,另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹,虽然我们在代码调用过程中使用了线程切换(subscribeOn 和 observerOn)这两个特殊的操作符,在整个流程中依然遵循了这两个包裹封装的过程,只不过它的特殊之处在于处理时完成了流程上的线程切换。
我们来看订阅时(图中⑦的流程)切换线程的 ObservableSubscribeOn
的代码:
public final class ObservableSubscribeOnextends AbstractObservableWithUpstream { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer super T> s) { final SubscribeOnObserver parent = new SubscribeOnObserver (s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } ... }
在逆向订阅的流程中,通过指定 Scheduler
将 SubscribeTask
任务交给线程池处理,我们先来看一下 SubscribeTask
的代码,就是执行了订阅:
final class SubscribeTask implements Runnable { private final SubscribeOnObserverparent; SubscribeTask(SubscribeOnObserver parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); // 仅仅订阅了一下 } }
我们再来看 scheduler.scheduleDirect()
中是如何做到线程切换的:
public abstract class Scheduler { ... @NonNull public abstract Worker createWorker(); // 实现类中实现 ... @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); // 创建一个 worker final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); //执行任务 return task; } ... }
我们示例中是切换到了 io
线程,所以我们对应的看一下 IoScheduler
的部分代码:
public final class IoScheduler extends Scheduler { ... @NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); } ... static final class EventLoopWorker extends Scheduler.Worker { private final CompositeDisposable tasks; private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once = new AtomicBoolean(); EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); } ... @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } static final class ThreadWorker extends NewThreadWorker { ... // 此处粘贴了了父类中的实现 @NonNull public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); ... Future> f; try { // 线程池执行任务 if (delayTime <= 0) { f = executor.submit((Callable
综合上面的代码,我们来总结一下,其实 ObservableSubscribeOn
本身就是在 subscribeActual
中将上层数据源在异步线程中执行订阅,这样就完成了线程的切换,后续的流程都会在这个切换后的线程中执行,直到再次切换线程。因为 RxJava 本身是逆向订阅的流程,所以这里就解释了两个问题:1、为什么 subscribeOn() 对上面的代码生效?2、为什么 subscribeOn() 只有第一次调用时有效?归根结底都是因为逆向订阅的流程决定了 subscribeOn 是在订阅流程中起作用,此时数据还未产生。这里还有一点要提一下, ObservableSubscribeOn
在执行 subscribeActual
时,回调了下层产生的 Observer
的 onSubscribe
,如图中的④⑤⑥流程,所以这也是为什么,在观察者一订阅后就会收到 onSubscribe
的回调,且在当前订阅时的线程中。
2.2 正向数据流触发 observerOn 的线程切换
同第一部分一样的,订阅到最上层时,触发数据源产生原始数据,从而又正向的流转数据,此过程我们不在详细分析,参照1.3,我们着重看一下 ObserveOnObserver
的 onNext 处理的逻辑,也就是图中步骤⑬⑭:
public final class ObservableObserveOnextends AbstractObservableWithUpstream { final Scheduler scheduler; ... @Override protected void subscribeActual(Observer super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver (observer, w, delayError, bufferSize)); } } ... static final class ObserveOnObserver extends BasicIntQueueDisposable implements Observer , Runnable { ... final Observer super T> actual; final Scheduler.Worker worker; ... @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); //类似 ObservableSubscribeOn.subscribeActual() 异步线程执行 } ... void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } ... @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } } ... } }
示例中我们此时切换到了 Main
线程中执行,我们来看对应的 HandlerScheduler
实现:
final class HandlerScheduler extends Scheduler { private final Handler handler; private final boolean async; ... @Override public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); handler.postDelayed(scheduled, unit.toMillis(delay)); return scheduled; } @Override public Worker createWorker() { return new HandlerWorker(handler, async); } private static final class HandlerWorker extends Worker { ... @Override @SuppressLint("NewApi") // Async will only be true when the API is available to call. public Disposable schedule(Runnable run, long delay, TimeUnit unit) { ... run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. if (async) { message.setAsynchronous(true); } handler.sendMessageDelayed(message, unit.toMillis(delay)); ... return scheduled; } }
从代码中我们可以看到,此时将 Runnable
通过 Handler 发到了住线程去执行,所以经过此步骤后,后续的 onNext 的处理已经切换为主线程。同样的,这里也解释了上面我们提到的另一个问题:为什么 observerOn() 对下面代码生效?正是因为,数据的流向决定了 observerOn() 对后续的 onNext 产生影响。
总结
至此 RxJava 运转机制我们已经分析完毕,大家可以比照图中流程,跟踪代码流转,相信会有很大收获。 RxJava 本身是一个变种的观察者模式,正是因为框架本身要实现 异步事件流编程
,所以产生了逆向订阅的过程,同时数据又是正向流转的,这个过程中大家还需要理解两个包裹封装(被观察者、观察者)的过程,不管操作符怎么变换,都不会脱离这样的运作核心。
另外根据不同的操作符的实现,我们依照同样的模式,可实现自己的自定义操作符,只要能在订阅时和数据回流时做好上下层的衔接就好,这个大家可以自己实践。