两张图彻底理解 RxJava2 的核心原理

作者: solart

版权声明:本文图文为博主原创,转载请注明出处。

文章似乎有些标题党的嫌疑,根据我的理解画出两幅图相信可以让大家理解 RxJava2 的核心原理,稍后不要吝啬,请叫我灵魂画手:smile:!相信 RxJava 是大家业务中用到比较多的一个依赖库,RxJava 的强大之处在于它改变了程序员的编程习惯,相比较其他的开源项目,Rxjava 是最弯弯绕的一个,但是也是理解后最清晰的一个。对于 RxJava 种类繁多的操作符,大多数同学都表示很是头疼,也有不少同学陷入了学习操作符不能停的怪圈。

操作符要不要学,当然要,但是如果能理解 RxJava 的核心,操作符的使用就像是学会九阳神功的张无忌学招数,必定是手到擒来。

这篇文章我会讲些什么

  • RxJava2 基本的运行流程
  • RxJava2 线程切换的原理(涉及到为什么 subscribeOn() 只有第一次调用时有效)
  • 为什么一订阅就回调了 onSubscribe
  • 为什么 subscribeOn() 对上面的代码生效,observerOn() 对下面代码生效

以下内容如果涉及到自己写的代码我会采用 Kotlin 进行示例展示,涉及到 RxJava2 会展示部分源码。

1、简单的链式调用(无线程切换)

先来看一段示例代码:

Observable.create(object : ObservableOnSubscribe {
            override fun subscribe(emitter: ObservableEmitter) {
                Log.d("solart", "subscribe > ${Thread.currentThread().name}")
                emitter.onNext("test")
                emitter.onComplete()
            }
        }).flatMap(object : Function> {
            override fun apply(t: String): Observable {
                return Observable.just(t)
            }
        }).map(object : Function {
            override fun apply(t: String): Int {
                return 0
            }
        }).subscribe(object : Observer {
                override fun onSubscribe(d: Disposable) {
                    Log.d("solart", "onSubscribe >  ${Thread.currentThread().name}")
                }

                override fun onNext(t: Int) {
                    Log.d("solart", "onNext >  ${Thread.currentThread().name}")
                }
                
                override fun onComplete() {
                    Log.d("solart", "onComplete >  ${Thread.currentThread().name}")
                }

                override fun onError(e: Throwable) {
                    Log.d("solart", "onError >  ${Thread.currentThread().name}")
                }
        })

这段代码中我们简单用了 createflatMapmap 等操作符,进行了流式的数据转换,最后我们通过 subscribe 订阅了数据流,其实通过查看源码我们不难发现, RxJava 本身是个逆向订阅的过程,话不多说先看图

1.1 数据源的包裹

比照着这张图,我们来看一下,首先 蓝色虚线 部分是我们代码中实际调用的顺序,查看 Observable.create 我们不难发现,此处就是产生了一个 ObservableCreate 实例,

public static  Observable create(ObservableOnSubscribe source) {
       ObjectHelper.requireNonNull(source, "source is null");
       return RxJavaPlugins.onAssembly(new ObservableCreate(source));
}

如我们图中所示, ObservableCreate 内部包含一个类型为 ObservableOnSubscribesource 变量,根据我们代码中的调用,这个 source 就是我们 Kotlin 代码中的匿名对象 object : ObservableOnSubscribe

public final class ObservableCreate extends Observable {
    final ObservableOnSubscribe source;

    public ObservableCreate(ObservableOnSubscribe source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer observer) {
        ...
    }
    ...
}

我们顺着代码的调用顺序,继续看一下 flatMap 的方法中又做了什么:

public final  Observable flatMap(Function> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        ...
        return RxJavaPlugins.onAssembly(new ObservableFlatMap(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

类似的产生了一个 ObservableFlatMap 实例,而其内部持有一个类型为 ObservableSourcesource 变量,而该 source 则是上一步中的 ObservableCreate 实例,依次我们看 map 依然是类似的代码,这里不在赘述,所以到此我们得到了图中蓝色虚线部分的内容。

1.2 逆向订阅数据源

我们知道以上的代码调用并没有出发数据的流转,只有当我们调用 subscribe 时(图中红色实线部分)才真正触发了 RxJava 的数据流,我们来看代码:

public final void subscribe(Observer observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            // 重点!!! 发生订阅的核心方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            ...
            throw npe;
        }
    }

根据我们上面的分析,执行 subscribeActual 的对象其实是 ObservableMap ,我们来看它的 subscribeActual 的实现

public final class ObservableMap extends AbstractObservableWithUpstream {
    ...

    @Override
    public void subscribeActual(Observer t) {
        source.subscribe(new MapObserver(t, function));
    }
    ...
}

注意,此时产生了一个 MapObserver 对象, MapObserver 中通过 actual 持有了我们自己的匿名对象 object : Observer ,同样的, ObservableMap 执行 subscribeActual 又调用了上层的 source.subscribe ,依次逆向调用,就得到了我们图中上半部分的红线内容。

1.3 触发数据源产生原始数据,数据流转

当订阅发生在最顶层时,也就是 ObservableCreate 中的 subscribeActual ,此时触发了数据源的产生,通过 emitter 发射数据

public final class ObservableCreate extends Observable {
    ...
    @Override
    protected void subscribeActual(Observer observer) {
        CreateEmitter parent = new CreateEmitter(observer);
        observer.onSubscribe(parent); //此时触发了 onSubscribe 回调,这里先提一下

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ...
}

而我们代码中此时产生了真正的数据

override fun subscribe(emitter: ObservableEmitter) {
                Log.d("solart", "subscribe > ${Thread.currentThread().name}")
                emitter.onNext("test")
                emitter.onComplete()
}

此时我们再来看 CreateEmitter 的实现:

static final class CreateEmitter
    extends AtomicReference
    implements ObservableEmitter, Disposable {

        final Observer observer;

        CreateEmitter(Observer observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            ...
            if (!isDisposed()) {
                observer.onNext(t);  //向下层分发数据
            }
        }
        ...
  }

根据我们上面的分析 CreateEmitter 中持有的 observer 即是 FlatMapObserver 的实例,而 FlatMapObserver 调用 onNext 时,又会调用 MapObserver 的 onNext ,依次调用至我们自己实现的观察者的 onNext 处理数据,此时数据流转完毕。

观察我们这个图,你会发现, 操作符 对应产生的被观察者和观察者命名规则很有规律,比如说被观察者的命名 Observable + 操作符 ,例如 ObservableMap = Observable + map,观察者命名大多遵循 操作符 + Observer ,例如 FlatMapObserver = flatMap + Observer。除了命名规则外,我们观察整个流程,你也会发现有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹(蓝色虚线的流程部分),另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹(上部分的红色流程部分)。

2、异步事件流编程(线程切换)

相信有了上面的分析,大家对 RxJava 的逆向订阅以及数据流转有了一定的认识,但是 RxJava 的强大之处在于它的异步事件流编程方式,随心所欲的切换工作线程,下面我们来分析它是如何做到的。

同样的我们还是先给出一个简单的示例:

Observable.create(object : ObservableOnSubscribe {
                override fun subscribe(emitter: ObservableEmitter) {
                    Log.d("solart", "subscribe >  ${Thread.currentThread().name}")
                    emitter.onNext("test")
                    emitter.onComplete()
                }
            }).subscribeOn(Schedulers.io())
                .map(object : Function {
                override fun apply(t: String): Int {
                    return 0
                }
            }).observeOn(AndroidSchedulers.mainThread())
                .subscribe(object : Observer {
               
                override fun onSubscribe(d: Disposable) {
                    Log.d("solart", "onSubscribe >  ${Thread.currentThread().name}")
                }

                override fun onNext(t: Int) {
                    Log.d("solart", "onNext >  ${Thread.currentThread().name}")
                }
                
                override fun onComplete() {
                    Log.d("solart", "onComplete >  ${Thread.currentThread().name}")
                }

                override fun onError(e: Throwable) {
                    Log.d("solart", "onError >  ${Thread.currentThread().name}")
                }

            })

这里简化了操作符的调用,以切换线程为示例,根据这段代码,我画出了这个过程的流程图(灵魂画手有没有?)如下:

图中不同颜色(红、绿、紫)的实线表示流程所属不同线程,体现在不同线程中的过程,且标上了对应的序号,方便大家观看,这个图已经能够揭示 RxJava 运转的核心原理。

2.1 逆向订阅时触发 subscribeOn 的线程切换

根据我们第一部分的分析,我们知道 RxJava 有两个包裹封装的过程,一个是按照代码顺序的操作符产生了一个一层层的数据源包裹,另外一个是在逆向订阅时,将观察者按照订阅顺序打包成一个一层层的观察者包裹,虽然我们在代码调用过程中使用了线程切换(subscribeOn 和 observerOn)这两个特殊的操作符,在整个流程中依然遵循了这两个包裹封装的过程,只不过它的特殊之处在于处理时完成了流程上的线程切换。

我们来看订阅时(图中⑦的流程)切换线程的 ObservableSubscribeOn 的代码:

public final class ObservableSubscribeOn extends AbstractObservableWithUpstream {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer s) {
        final SubscribeOnObserver parent = new SubscribeOnObserver(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    ...
}

在逆向订阅的流程中,通过指定 SchedulerSubscribeTask 任务交给线程池处理,我们先来看一下 SubscribeTask 的代码,就是执行了订阅:

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver parent;

        SubscribeTask(SubscribeOnObserver parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent); // 仅仅订阅了一下
        }
    }

我们再来看 scheduler.scheduleDirect() 中是如何做到线程切换的:

public abstract class Scheduler {
    ...
    @NonNull
    public abstract Worker createWorker(); // 实现类中实现
    ...
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker(); // 创建一个 worker

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit); //执行任务

        return task;
    }
    ...
}

我们示例中是切换到了 io 线程,所以我们对应的看一下 IoScheduler 的部分代码:

public final class IoScheduler extends Scheduler {
    ...
    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
   ...
   static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        ...

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    
    static final class ThreadWorker extends NewThreadWorker {
        ...
        // 此处粘贴了了父类中的实现
        @NonNull
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

            ...

            Future f;
            try {
                // 线程池执行任务
                if (delayTime <= 0) {
                     f = executor.submit((Callable)sr);
                } else {
                     f = executor.schedule((Callable)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                ...
            }
            
            return sr;
        }
    }
}

综合上面的代码,我们来总结一下,其实 ObservableSubscribeOn 本身就是在 subscribeActual 中将上层数据源在异步线程中执行订阅,这样就完成了线程的切换,后续的流程都会在这个切换后的线程中执行,直到再次切换线程。因为 RxJava 本身是逆向订阅的流程,所以这里就解释了两个问题:1、为什么 subscribeOn() 对上面的代码生效?2、为什么 subscribeOn() 只有第一次调用时有效?归根结底都是因为逆向订阅的流程决定了 subscribeOn 是在订阅流程中起作用,此时数据还未产生。这里还有一点要提一下, ObservableSubscribeOn 在执行 subscribeActual 时,回调了下层产生的 ObserveronSubscribe ,如图中的④⑤⑥流程,所以这也是为什么,在观察者一订阅后就会收到 onSubscribe 的回调,且在当前订阅时的线程中。

2.2 正向数据流触发 observerOn 的线程切换

同第一部分一样的,订阅到最上层时,触发数据源产生原始数据,从而又正向的流转数据,此过程我们不在详细分析,参照1.3,我们着重看一下 ObserveOnObserver 的 onNext 处理的逻辑,也就是图中步骤⑬⑭:

public final class ObservableObserveOn extends AbstractObservableWithUpstream {
    final Scheduler scheduler;
    ...
    @Override
    protected void subscribeActual(Observer observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver(observer, w, delayError, bufferSize));
        }
    }
    ...
    static final class ObserveOnObserver extends BasicIntQueueDisposable
    implements Observer, Runnable {

        ...
        final Observer actual;
        final Scheduler.Worker worker;
        ...
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule(); //类似 ObservableSubscribeOn.subscribeActual() 异步线程执行
        }
        ...
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
        ...
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
        ...
    }
}

示例中我们此时切换到了 Main 线程中执行,我们来看对应的 HandlerScheduler 实现:

final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    private final boolean async;
    ...
    @Override
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, unit.toMillis(delay));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }
    
    private static final class HandlerWorker extends Worker {
        ...
        @Override
        @SuppressLint("NewApi") // Async will only be true when the API is available to call.
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            ...

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            if (async) {
                message.setAsynchronous(true);
            }

            handler.sendMessageDelayed(message, unit.toMillis(delay));

            ...

            return scheduled;
        }
    }

从代码中我们可以看到,此时将 Runnable 通过 Handler 发到了住线程去执行,所以经过此步骤后,后续的 onNext 的处理已经切换为主线程。同样的,这里也解释了上面我们提到的另一个问题:为什么 observerOn() 对下面代码生效?正是因为,数据的流向决定了 observerOn() 对后续的 onNext 产生影响。

总结

至此 RxJava 运转机制我们已经分析完毕,大家可以比照图中流程,跟踪代码流转,相信会有很大收获。 RxJava 本身是一个变种的观察者模式,正是因为框架本身要实现 异步事件流编程 ,所以产生了逆向订阅的过程,同时数据又是正向流转的,这个过程中大家还需要理解两个包裹封装(被观察者、观察者)的过程,不管操作符怎么变换,都不会脱离这样的运作核心。

另外根据不同的操作符的实现,我们依照同样的模式,可实现自己的自定义操作符,只要能在订阅时和数据回流时做好上下层的衔接就好,这个大家可以自己实践。