建议大家先看看扔物线大神写的《给 Android 开发者的 RxJava 详解》,虽然是1.0版本的,但是RxJava发展这么多年,基本的核心思想并没有变。有了这篇文章作为基础再去理解RxJava3会容易很多。不能完全理解也没关系,本篇将由浅入深,带着大家一起来剖析RxJava3的原理。
implementation "io.reactivex.rxjava3:rxjava:3.1.6"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
gradle中按如上配置就可以将RxJava3引入到项目中,其中rxandroid基于RxJava3新增了一些在Android上特有的功能,比如常用的切换到主线程AndroidSchedulers.mainThread()
。
通过本章的学习,我们能够了解:
- subscribe到底干了啥?
- 操作符的原理
- Disposable的原理
- subscribeOn原理
- observeOn原理
subscribe到底干了啥?
先看一段RxJava3简单的使用:
Single.just("123") .subscribe(object : SingleObserver<String> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: String) { } })
public final class SingleJust<T> extends Single<T> { final T value; public SingleJust(T value) { this.value = value; } @Override protected void subscribeActual(SingleObserver<? super T> observer) { observer.onSubscribe(Disposable.disposed()); observer.onSuccess(value); } }
Single.just("123")
创建了一个SingleJust
对象,然后调用SingleJust
父类Single
的subscribe()
方法,接着调用SingleJust
类重写的subscribeActual()
方法,在方法内部,调用传入的Observer
对象的onSubscribe()
方法和onSuccess()
方法,将消息传给观察者。
这个简单的例子,展示了RxJava的核心思想——观察者模式。SingleJust
是被观察者,subscribe()
方法传入的SingleObserver
是观察者。当subscribe()
方法调用的时候,也就是消息真正开始流转的时候。这也就回答了标题的问题,subscribe()
方法实际上就是让消息开始流转起来。
操作符的原理
以map作为例子来讲解,先看看下面这个例子:
val single = Single.just(1) val singleStr1 = single.map(object: io.reactivex.rxjava3.functions.Function<Int, String> { override fun apply(t: Int): String { return t.toString() } }) singleStr1.subscribe(object: SingleObserver<String> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: String) { println(t) } })
上面例子经过了一次map转换,将Int类型转换成String,然后输出。用法其实很简单,接下来结合源码看下实现的原理。
Single.just()
在前面有介绍,返回的是SingleJust
,因为SingleJust
继承自Single
,然后调用了Single
的map()
返回SingleMap
,然后调用subscribe()
,实际上是调用的SingleMap
的subscribe()
。
//注意:这不是Single的源码,只是提取出的我们要关注的部分 public class Single { public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) { Objects.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper)); } }
public final class SingleMap<T, R> extends Single<R> { final SingleSource<? extends T> source; final Function<? super T, ? extends R> mapper; public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) { this.source = source; this.mapper = mapper; } @Override protected void subscribeActual(final SingleObserver<? super R> t) { source.subscribe(new MapSingleObserver<T, R>(t, mapper)); } static final class MapSingleObserver<T, R> implements SingleObserver<T> { final SingleObserver<? super R> t; final Function<? super T, ? extends R> mapper; MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) { this.t = t; this.mapper = mapper; } @Override public void onSubscribe(Disposable d) { t.onSubscribe(d); } @Override public void onSuccess(T value) { R v; try { v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value."); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); return; } t.onSuccess(v); } @Override public void onError(Throwable e) { t.onError(e); } } }
SingleMap
类比较关键,也是我们后面理解Disposable
和多线程切换的关键,所以一定要理解。其实也不难,只需要记住几个关键变量——构造方法中的source
变量即SingleJust
,mapper
变量即Function
,MapSingleObserver
构造方法中的t
变量即SingleObserver
,当调用subscribe()
方法时,消息流就开始运转起来。简单理解就是,source
是上游,消息经过mapper
处理后,丢给下游t
。
下面的图更直观些。

看到这里大家可以停下来思考两分钟再继续往下看,脑海中想象下整个链路的调用过程。首先顺着思考,SingleJust
先创建,接着调用SingleJust
的map()
方法创建了SingleMap1
,此时source
指向了上游,也就是SingleJust
,当调用subscribe()
的时候实际上最终会调用SingleMap1
的subscribeActual()
方法,这时候才会在内部创建MapSingleObserver
,然后出发source.subscribe()
调用上游,最终触发上游的消息发送,上游SingleJust
将消息发送给传入的内部观察者MapSingleObserver
,而内部观察者持有下游SingleObserver
,所以接着将消息转发给下游。
理解了上面的单次map
,接下来再看多次map
的情况就容易些了。
val single = Single.just(1) val singleStr1 = single.map(object: io.reactivex.rxjava3.functions.Function<Int, String> { override fun apply(t: Int): String { return t.toString() } }) val singleStr2 = singleStr1.map(object: io.reactivex.rxjava3.functions.Function<String, String> { override fun apply(t: String): String { return t + "123" } }) singleStr2.subscribe(object: SingleObserver<String> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: String) { } })
上面例子是在单次map
的基础上又增加了一次map
,那这个又是怎样的调用过程呢?同样,停下来思考两分钟再往下看。
我看单个map时脑子还能转过来,但是两个map串联起来时,脑子就卡壳了。其实就是source变量和t变量没有拎清导致。记住了上面说的——source
是上游,消息经过mapper
处理后,丢给下游t
,再结合下面的流程图就很好理解了。

图中黑色和红色的线可以理解为指针,比如从上往下第一个source
变量指向SingleJust
,第二个source
变量指向SingleMap1
,当调用singleMap2.subscribe()
时,整个链路就开始运行起来了。
- 首先触发的是
SingleMap2
的subscribeActual()
方法,此时创建了SingleMap2
内部的MapSingleObserver
(图中绿色部分)对象,该对象内部有个t
变量指向最终的观察者SingleObserver
。 SingleMap2
的subscribeActual()
方法内部实际触发的是SingleMap1
的subscribeActual()
方法,此时创建了SingleMap1
内部的MapSingleObserver
对象,该对象内部有个t
变量指向的是上面步骤创建的MapSingleObserver
对象。SingleMap1
的subscribeActual()
方法内部实际触发的是SingleJust
的subscribeActual()
方法,然后消息由该方法内部发出,先触发SingleMap1
的MapSingleObserver
对象对应的方法,接着出发SingleMap2
的MapSingleObserver
对象,最后触发观察者对象SingleObserver
。
结合Single、SingleMap的源码以及上面的流程图,整个链的各个对象之间的调用关系就很清晰了。
Disposable的原理
Disposable的dispose()方法可以取消上游未发送的消息,以及让下游也不要处理接收到的消息了。
(1)先来看看既没有后续消息,也没有延迟的例子,以最简单的SingleJust为例。
Single.just("123") .subscribe(object : SingleObserver<String> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: String) { } })
onSubscribe()
方法传入了Disposable
对象,结合上面介绍的SingleJust
的源码可知,onSubscribe()
方法是由SingleJust
触发,通过如下方法传入Disposable
对象,observer.onSubscribe(Disposable.disposed())
。传入的是Disposable.disposed()
,由此可知,SingleJust
消息实际上是不能取消的,因为当订阅subscribe()
发生时,消息就被立即发出了,这也符合我们的认知。
(2)接下来看看有后续消息,但无延迟的例子。
以Observable.just("1", "2", "3")
为例,该方法最终创建了一个ObservableFromArray
,我们来看看源码。
//注意:这不是完整的源码,我删减了一些,只保留本篇博客需要的关键代码 public final class ObservableFromArray<T> extends Observable<T> { final T[] array; public ObservableFromArray(T[] array) { this.array = array; } @Override public void subscribeActual(Observer<? super T> observer) { FromArrayDisposable<T> d = new FromArrayDisposable<>(observer, array); observer.onSubscribe(d); d.run(); } static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { final Observer<? super T> downstream; final T[] array; volatile boolean disposed; FromArrayDisposable(Observer<? super T> actual, T[] array) { this.downstream = actual; this.array = array; } @Override public void dispose() { disposed = true; } @Override public boolean isDisposed() { return disposed; } void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { downstream.onError(new NullPointerException("The element at index " + i + " is null")); return; } downstream.onNext(value); } if (!isDisposed()) { downstream.onComplete(); } } } }
通过前面的分析我们已经知道,Disposable
最终是通过调用观察者Observer
的onSubscribe()
方法传入的,所以我们看observer.onSubscribe(d)
,传入的是一个FromArrayDisposable
对象,重写了dispose()
方法,也就是当我们想取消消息发送的时候,调用该方法将disposed
置为true
,再看run()
方法,for
循环发送消息的时候有个isDisposed()
判断,当发现disposed=true
了,退出循环,这样上游消息也就停止向下游发送了。
(3)再来看看无后续消息,但是有延迟的例子。
Single.just("1") .delay(1, TimeUnit.SECONDS) .subscribe(object: SingleObserver<String> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: String) { } })
Single.just()
就不多说了,delay()
方法调用过程如下:
//只节选核心代码 public abstract class Single<@NonNull T> implements SingleSource<T> { public final Single<T> delay(long time, @NonNull TimeUnit unit) { return delay(time, unit, Schedulers.computation(), false); } public final Single<T> delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean delayError) { return RxJavaPlugins.onAssembly(new SingleDelay<>(this, time, unit, scheduler, delayError)); } }
主要关注SingleDelay。
public final class SingleDelay<T> extends Single<T> { final SingleSource<? extends T> source; final long time; final TimeUnit unit; final Scheduler scheduler; final boolean delayError; public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) { this.source = source; this.time = time; this.unit = unit; this.scheduler = scheduler; this.delayError = delayError; } @Override protected void subscribeActual(final SingleObserver<? super T> observer) { final SequentialDisposable sd = new SequentialDisposable(); observer.onSubscribe(sd); source.subscribe(new Delay(sd, observer)); } final class Delay implements SingleObserver<T> { private final SequentialDisposable sd; final SingleObserver<? super T> downstream; Delay(SequentialDisposable sd, SingleObserver<? super T> observer) { this.sd = sd; this.downstream = observer; } @Override public void onSubscribe(Disposable d) { sd.replace(d); } @Override public void onSuccess(final T value) { sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit)); } @Override public void onError(final Throwable e) { sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit)); } final class OnSuccess implements Runnable { private final T value; OnSuccess(T value) { this.value = value; } @Override public void run() { downstream.onSuccess(value); } } final class OnError implements Runnable { private final Throwable e; OnError(Throwable e) { this.e = e; } @Override public void run() { downstream.onError(e); } } } }
同前面的分析一样的套路,通过observer.onSubscribe(sd)
将SequentialDisposable
传给观察者,消息开始发送的时候,首先会调用SequentialDisposable
的onSubscribe()
,其中sd.replace(d)
有点不好理解,它的作用其实是在不改变观察者持有的Disposable
对象的前提下,偷偷的置换掉真正disposed()
的对象。这句话不是很好理解,还是结合源码来看。
我们一起看下SequentialDisposable
和DisposableHelper
。
//只提取核心代码 public final class SequentialDisposable extends AtomicReference<Disposable> implements Disposable { public boolean replace(Disposable next) { return DisposableHelper.replace(this, next); } @Override public void dispose() { DisposableHelper.dispose(this); } }
//只提取核心代码 public enum DisposableHelper implements Disposable { public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) { Objects.requireNonNull(d, "d is null"); if (!field.compareAndSet(null, d)) { d.dispose(); if (field.get() != DISPOSED) { reportDisposableSet(); } return false; } return true; } public static boolean replace(AtomicReference<Disposable> field, Disposable d) { for (;;) { Disposable current = field.get(); if (current == DISPOSED) { if (d != null) { d.dispose(); } return false; } if (field.compareAndSet(current, d)) { return true; } } } public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; } //Verifies that current is null, next is not null, otherwise signals errors to the RxJavaPlugins and returns false. public static boolean validate(Disposable current, Disposable next) { if (next == null) { RxJavaPlugins.onError(new NullPointerException("next is null")); return false; } if (current != null) { next.dispose(); reportDisposableSet(); return false; } return true; } }
sd.replace(d)
最终调用DisposableHelper.replace(this, next)
,将传入的Disposable
对象保存起来。仔细观察下,会发现onSuccess()
和onError()
都有调用replace()
方法。当调用disposed()
的时候,如果上游消息还未发送,此时取消上游消息的发送,因为是Single.just()
,在前面的无后续消息中讲过,当subscribe()
调用的时候消息就开始立即发送了,所以没有机会取消。当消息发出来后,进入onSuccess()
,启动一个定时器,此时如果调用disposed()
取消,取消的是定时器发送。
(4)最后再看看有后续消息,并且有延迟的例子。
Observable.just(1, 2, 3) .delay(1, TimeUnit.SECONDS) .subscribe(object: Observer<Int> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onComplete() { } override fun onNext(t: Int) { } })
Observable.just()
返回ObservableFromArray
对象,该类的核心代码上面有贴出来过,delay()
返回ObservableDelay
对象,上面也介绍过。其实取消上游的消息发送,以及取消下游的定时器。
接下来开始讲解线程是如何切换的。
subscribeOn原理
先看下面简单的例子:
Single.just(1) .subscribeOn(Schedulers.io()) .subscribe(object: SingleObserver<Int> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: Int) { } })
上面在subscribe()
之前切了一次线程,这时候观察者的几个回调方法实际上都是在子线程执行的,那它是怎么做到的呢?
subscribeOn()
返回了一个SingleSubscribeOn
对象,该类的完整源码如下:
public final class SingleSubscribeOn<T> extends Single<T> { final SingleSource<? extends T> source; final Scheduler scheduler; public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) { this.source = source; this.scheduler = scheduler; } @Override protected void subscribeActual(final SingleObserver<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source); observer.onSubscribe(parent); Disposable f = scheduler.scheduleDirect(parent); parent.task.replace(f); } static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements SingleObserver<T>, Disposable, Runnable { private static final long serialVersionUID = 7000911171163930287L; final SingleObserver<? super T> downstream; final SequentialDisposable task; final SingleSource<? extends T> source; SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) { this.downstream = actual; this.source = source; this.task = new SequentialDisposable(); } @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this, d); } @Override public void onSuccess(T value) { downstream.onSuccess(value); } @Override public void onError(Throwable e) { downstream.onError(e); } @Override public void dispose() { DisposableHelper.dispose(this); task.dispose(); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } @Override public void run() { source.subscribe(this); } } }
scheduler.scheduleDirect()
开启线程,会触发调用SubscribeOnObserver
的run()
方法,然后在子线程中调用source.subscribe(this)
,后续的消息发送都是在这个子线程中完成的,比如onSubscribe()
,onSuccess()
。看图可能更直观点。

当我们调用subscribe()
的时候,调用链开始运转起来,首先调用SingleSubscribeOn
的subscribeActual()
方法,前面我们说过的案例中,subscribeActual()
方法中就直接触发了上游的subscribe()
方法的调用,而这里比较特殊,是起了一个线程,对应的是run()
方法中的source.subscribe(this)
,这是在线程中调用的。图中SingleSubscribeOn
中的source
指的就是上游SingleJust
,所以后续的消息发送都是在子线程中完成,对应图中红色箭头。
那disposed()
取消是怎么做到的呢?
上游SingleJust
触发调用了SubscribeOnObserver
中的onSubscribe(d)
方法,d
变量就是上游传入的,DisposableHelper.setOnce(this, d)
就是把上游的Disposable
保存起来,当下游想取消的时候,首先调用dispose()
方法,接着调用DisposableHelper.dispose(this)
,也就取消了上游的消息发送,然后调用task.dispose()
,取消scheduler.scheduleDirect(parent)
任务。
一次subscribeOn
我们已经理解了,那多次subscribeOn
会如何切换线程呢?

可以对照着图,脑海中想象着走一遍流程。首先SingleObserver
看到的是蓝色的Observable
(被观察对象,SingleSubscribeOn
实际上就是一个被观察的对象),然后创建一个线程,并在线程里面订阅上游红色的Observable
,所以这是第一次切线程。在红色的Observable
中同样创建子线程,并在子线程中订阅上游的SingleJust
,所以这是第二次切线程。后续所有链路的调用,都是在红色线程里面完成的。
所以总结一句就是:无论多少次subscribeOn,只有第一个生效。
observeOn原理
先看例子
Single.just(1) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object: SingleObserver<Int> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: Int) { } })
observerOn
创建了SingleObserveOn
public final class SingleObserveOn<T> extends Single<T> { final SingleSource<T> source; final Scheduler scheduler; public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) { this.source = source; this.scheduler = scheduler; } @Override protected void subscribeActual(final SingleObserver<? super T> observer) { source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler)); } static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable> implements SingleObserver<T>, Disposable, Runnable { private static final long serialVersionUID = 3528003840217436037L; final SingleObserver<? super T> downstream; final Scheduler scheduler; T value; Throwable error; ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) { this.downstream = actual; this.scheduler = scheduler; } @Override public void onSubscribe(Disposable d) { if (DisposableHelper.setOnce(this, d)) { downstream.onSubscribe(this); } } @Override public void onSuccess(T value) { this.value = value; Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void onError(Throwable e) { this.error = e; Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void run() { Throwable ex = error; if (ex != null) { downstream.onError(ex); } else { downstream.onSuccess(value); } } @Override public void dispose() { DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } } }
subscribeActual()
方法中,直接让上游订阅ObserveOnSingleObserver
,这里不需要在订阅过程中切线程,因为observeOn的作用是在收到消息后去切线程。所以看到onSuccess()
和onError()
里面都做了线程切换。

所以我们看图,在订阅的时候也就是黑色箭头部分没有切线程,而是在ObserveOnSingleObserver
收到上游发送的消息后才开始切线程。
同样我们看看dispose()
取消是如何实现的。首先我们看ObserveOnSingleObserver
中的onSubscribe()
,这里接收上游传入的Disposable
,并将其保存在AtomicReference
中,然后让下游订阅当前类,这样当下游想取消的时候调用dispose()
方法,这样就直接取消了AtomicReference
中保存的上游传入的Disposable
。仔细观察onSuccess()和onError()中,都使用了DisposableHelper.replace
来替换AtomicReference
中保存的Disposable
,这样就可以在消息发出来后不要再将消息往下游发送了。
多个observeOn又是怎么切换线程的呢?

订阅的时候不会发生线程切换,每次在SingleObserveOn收到消息后再切换线程。
所以总结一句就是:每次observeOn都会切线程。
到这里,整个RxJava的核心原理解析都讲完了。
转载请注明出处。