建议大家先看看扔物线大神写的《给 Android 开发者的 RxJava 详解》,虽然是1.0版本的,但是RxJava发展这么多年,基本的核心思想并没有变。有了这篇文章作为基础再去理解RxJava3会容易很多。不能完全理解也没关系,本篇将由浅入深,带着大家一起来剖析RxJava3的原理。
implementation "io.reactivex.rxjava3:rxjava:3.1.6"implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
gradle中按如上配置就可以将RxJava3引入到项目中,其中rxandroid基于RxJava3新增了一些在Android上特有的功能,比如常用的切换到主线程AndroidSchedulers.mainThread()。
通过本章的学习,我们能够了解:
- subscribe到底干了啥?
- 操作符的原理
- Disposable的原理
- subscribeOn原理
- observeOn原理
subscribe到底干了啥?
先看一段RxJava3简单的使用:
Single.just("123")
.subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
override fun onSuccess(t: String) {
}
})
public final class SingleJust<T> extends Single<T> {
final T value;
public SingleJust(T value) {
this.value = value;
}
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
}
Single.just("123")创建了一个SingleJust对象,然后调用SingleJust父类Single的subscribe()方法,接着调用SingleJust类重写的subscribeActual()方法,在方法内部,调用传入的Observer对象的onSubscribe()方法和onSuccess()方法,将消息传给观察者。
这个简单的例子,展示了RxJava的核心思想——观察者模式。SingleJust是被观察者,subscribe()方法传入的SingleObserver是观察者。当subscribe()方法调用的时候,也就是消息真正开始流转的时候。这也就回答了标题的问题,subscribe()方法实际上就是让消息开始流转起来。
操作符的原理
以map作为例子来讲解,先看看下面这个例子:
val single = Single.just(1)
val singleStr1 = single.map(object: io.reactivex.rxjava3.functions.Function<Int, String> {
override fun apply(t: Int): String {
return t.toString()
}
})
singleStr1.subscribe(object: SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
override fun onSuccess(t: String) {
println(t)
}
})
上面例子经过了一次map转换,将Int类型转换成String,然后输出。用法其实很简单,接下来结合源码看下实现的原理。
Single.just()在前面有介绍,返回的是SingleJust,因为SingleJust继承自Single,然后调用了Single的map()返回SingleMap,然后调用subscribe(),实际上是调用的SingleMap的subscribe()。
//注意:这不是Single的源码,只是提取出的我们要关注的部分
public class Single {
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
}
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e);
return;
}
t.onSuccess(v);
}
@Override
public void onError(Throwable e) {
t.onError(e);
}
}
}
SingleMap类比较关键,也是我们后面理解Disposable和多线程切换的关键,所以一定要理解。其实也不难,只需要记住几个关键变量——构造方法中的source变量即SingleJust,mapper变量即Function,MapSingleObserver构造方法中的t变量即SingleObserver,当调用subscribe()方法时,消息流就开始运转起来。简单理解就是,source是上游,消息经过mapper处理后,丢给下游t。
下面的图更直观些。

看到这里大家可以停下来思考两分钟再继续往下看,脑海中想象下整个链路的调用过程。首先顺着思考,SingleJust先创建,接着调用SingleJust的map()方法创建了SingleMap1,此时source指向了上游,也就是SingleJust,当调用subscribe()的时候实际上最终会调用SingleMap1的subscribeActual()方法,这时候才会在内部创建MapSingleObserver,然后出发source.subscribe()调用上游,最终触发上游的消息发送,上游SingleJust将消息发送给传入的内部观察者MapSingleObserver,而内部观察者持有下游SingleObserver,所以接着将消息转发给下游。
理解了上面的单次map,接下来再看多次map的情况就容易些了。
val single = Single.just(1)
val singleStr1 = single.map(object: io.reactivex.rxjava3.functions.Function<Int, String> {
override fun apply(t: Int): String {
return t.toString()
}
})
val singleStr2 = singleStr1.map(object: io.reactivex.rxjava3.functions.Function<String, String> {
override fun apply(t: String): String {
return t + "123"
}
})
singleStr2.subscribe(object: SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
override fun onSuccess(t: String) {
}
})
上面例子是在单次map的基础上又增加了一次map,那这个又是怎样的调用过程呢?同样,停下来思考两分钟再往下看。
我看单个map时脑子还能转过来,但是两个map串联起来时,脑子就卡壳了。其实就是source变量和t变量没有拎清导致。记住了上面说的——source是上游,消息经过mapper处理后,丢给下游t,再结合下面的流程图就很好理解了。

图中黑色和红色的线可以理解为指针,比如从上往下第一个source变量指向SingleJust,第二个source变量指向SingleMap1,当调用singleMap2.subscribe()时,整个链路就开始运行起来了。
- 首先触发的是
SingleMap2的subscribeActual()方法,此时创建了SingleMap2内部的MapSingleObserver(图中绿色部分)对象,该对象内部有个t变量指向最终的观察者SingleObserver。 SingleMap2的subscribeActual()方法内部实际触发的是SingleMap1的subscribeActual()方法,此时创建了SingleMap1内部的MapSingleObserver对象,该对象内部有个t变量指向的是上面步骤创建的MapSingleObserver对象。SingleMap1的subscribeActual()方法内部实际触发的是SingleJust的subscribeActual()方法,然后消息由该方法内部发出,先触发SingleMap1的MapSingleObserver对象对应的方法,接着出发SingleMap2的MapSingleObserver对象,最后触发观察者对象SingleObserver。
结合Single、SingleMap的源码以及上面的流程图,整个链的各个对象之间的调用关系就很清晰了。
Disposable的原理
Disposable的dispose()方法可以取消上游未发送的消息,以及让下游也不要处理接收到的消息了。
(1)先来看看既没有后续消息,也没有延迟的例子,以最简单的SingleJust为例。
Single.just("123")
.subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
override fun onSuccess(t: String) {
}
})
onSubscribe()方法传入了Disposable对象,结合上面介绍的SingleJust的源码可知,onSubscribe()方法是由SingleJust触发,通过如下方法传入Disposable对象,observer.onSubscribe(Disposable.disposed())。传入的是Disposable.disposed(),由此可知,SingleJust消息实际上是不能取消的,因为当订阅subscribe()发生时,消息就被立即发出了,这也符合我们的认知。
(2)接下来看看有后续消息,但无延迟的例子。
以Observable.just("1", "2", "3")为例,该方法最终创建了一个ObservableFromArray,我们来看看源码。
//注意:这不是完整的源码,我删减了一些,只保留本篇博客需要的关键代码
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
FromArrayDisposable<T> d = new FromArrayDisposable<>(observer, array);
observer.onSubscribe(d);
d.run();
}
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> downstream;
final T[] array;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.downstream = actual;
this.array = array;
}
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
downstream.onError(new NullPointerException("The element at index " + i + " is null"));
return;
}
downstream.onNext(value);
}
if (!isDisposed()) {
downstream.onComplete();
}
}
}
}
通过前面的分析我们已经知道,Disposable最终是通过调用观察者Observer的onSubscribe()方法传入的,所以我们看observer.onSubscribe(d),传入的是一个FromArrayDisposable对象,重写了dispose()方法,也就是当我们想取消消息发送的时候,调用该方法将disposed置为true,再看run()方法,for循环发送消息的时候有个isDisposed()判断,当发现disposed=true了,退出循环,这样上游消息也就停止向下游发送了。
(3)再来看看无后续消息,但是有延迟的例子。
Single.just("1")
.delay(1, TimeUnit.SECONDS)
.subscribe(object: SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
override fun onSuccess(t: String) {
}
})
Single.just()就不多说了,delay()方法调用过程如下:
//只节选核心代码
public abstract class Single<@NonNull T> implements SingleSource<T> {
public final Single<T> delay(long time, @NonNull TimeUnit unit) {
return delay(time, unit, Schedulers.computation(), false);
}
public final Single<T> delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean delayError) {
return RxJavaPlugins.onAssembly(new SingleDelay<>(this, time, unit, scheduler, delayError));
}
}
主要关注SingleDelay。
public final class SingleDelay<T> extends Single<T> {
final SingleSource<? extends T> source;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final boolean delayError;
public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {
this.source = source;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
this.delayError = delayError;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SequentialDisposable sd = new SequentialDisposable();
observer.onSubscribe(sd);
source.subscribe(new Delay(sd, observer));
}
final class Delay implements SingleObserver<T> {
private final SequentialDisposable sd;
final SingleObserver<? super T> downstream;
Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
this.sd = sd;
this.downstream = observer;
}
@Override
public void onSubscribe(Disposable d) {
sd.replace(d);
}
@Override
public void onSuccess(final T value) {
sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
}
@Override
public void onError(final Throwable e) {
sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
}
final class OnSuccess implements Runnable {
private final T value;
OnSuccess(T value) {
this.value = value;
}
@Override
public void run() {
downstream.onSuccess(value);
}
}
final class OnError implements Runnable {
private final Throwable e;
OnError(Throwable e) {
this.e = e;
}
@Override
public void run() {
downstream.onError(e);
}
}
}
}
同前面的分析一样的套路,通过observer.onSubscribe(sd)将SequentialDisposable传给观察者,消息开始发送的时候,首先会调用SequentialDisposable的onSubscribe(),其中sd.replace(d)有点不好理解,它的作用其实是在不改变观察者持有的Disposable对象的前提下,偷偷的置换掉真正disposed()的对象。这句话不是很好理解,还是结合源码来看。
我们一起看下SequentialDisposable和DisposableHelper。
//只提取核心代码
public final class SequentialDisposable
extends AtomicReference<Disposable>
implements Disposable {
public boolean replace(Disposable next) {
return DisposableHelper.replace(this, next);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
}
//只提取核心代码
public enum DisposableHelper implements Disposable {
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
Objects.requireNonNull(d, "d is null");
if (!field.compareAndSet(null, d)) {
d.dispose();
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
public static boolean replace(AtomicReference<Disposable> field, Disposable d) {
for (;;) {
Disposable current = field.get();
if (current == DISPOSED) {
if (d != null) {
d.dispose();
}
return false;
}
if (field.compareAndSet(current, d)) {
return true;
}
}
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
//Verifies that current is null, next is not null, otherwise signals errors to the RxJavaPlugins and returns false.
public static boolean validate(Disposable current, Disposable next) {
if (next == null) {
RxJavaPlugins.onError(new NullPointerException("next is null"));
return false;
}
if (current != null) {
next.dispose();
reportDisposableSet();
return false;
}
return true;
}
}
sd.replace(d)最终调用DisposableHelper.replace(this, next),将传入的Disposable对象保存起来。仔细观察下,会发现onSuccess()和onError()都有调用replace()方法。当调用disposed()的时候,如果上游消息还未发送,此时取消上游消息的发送,因为是Single.just(),在前面的无后续消息中讲过,当subscribe()调用的时候消息就开始立即发送了,所以没有机会取消。当消息发出来后,进入onSuccess(),启动一个定时器,此时如果调用disposed()取消,取消的是定时器发送。
(4)最后再看看有后续消息,并且有延迟的例子。
Observable.just(1, 2, 3)
.delay(1, TimeUnit.SECONDS)
.subscribe(object: Observer<Int> {
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
override fun onComplete() {
}
override fun onNext(t: Int) {
}
})
Observable.just()返回ObservableFromArray对象,该类的核心代码上面有贴出来过,delay()返回ObservableDelay对象,上面也介绍过。其实取消上游的消息发送,以及取消下游的定时器。
接下来开始讲解线程是如何切换的。
subscribeOn原理
先看下面简单的例子:
Single.just(1)
.subscribeOn(Schedulers.io())
.subscribe(object: SingleObserver<Int> {
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
override fun onSuccess(t: Int) {
}
})
上面在subscribe()之前切了一次线程,这时候观察者的几个回调方法实际上都是在子线程执行的,那它是怎么做到的呢?
subscribeOn()返回了一个SingleSubscribeOn对象,该类的完整源码如下:
public final class SingleSubscribeOn<T> extends Single<T> {
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
observer.onSubscribe(parent);
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 7000911171163930287L;
final SingleObserver<? super T> downstream;
final SequentialDisposable task;
final SingleSource<? extends T> source;
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
this.downstream = actual;
this.source = source;
this.task = new SequentialDisposable();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}
@Override
public void onSuccess(T value) {
downstream.onSuccess(value);
}
@Override
public void onError(Throwable e) {
downstream.onError(e);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
task.dispose();
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public void run() {
source.subscribe(this);
}
}
}
scheduler.scheduleDirect()开启线程,会触发调用SubscribeOnObserver的run()方法,然后在子线程中调用source.subscribe(this),后续的消息发送都是在这个子线程中完成的,比如onSubscribe(),onSuccess()。看图可能更直观点。

当我们调用subscribe()的时候,调用链开始运转起来,首先调用SingleSubscribeOn的subscribeActual()方法,前面我们说过的案例中,subscribeActual()方法中就直接触发了上游的subscribe()方法的调用,而这里比较特殊,是起了一个线程,对应的是run()方法中的source.subscribe(this),这是在线程中调用的。图中SingleSubscribeOn中的source指的就是上游SingleJust,所以后续的消息发送都是在子线程中完成,对应图中红色箭头。
那disposed()取消是怎么做到的呢?
上游SingleJust触发调用了SubscribeOnObserver中的onSubscribe(d)方法,d变量就是上游传入的,DisposableHelper.setOnce(this, d)就是把上游的Disposable保存起来,当下游想取消的时候,首先调用dispose()方法,接着调用DisposableHelper.dispose(this),也就取消了上游的消息发送,然后调用task.dispose(),取消scheduler.scheduleDirect(parent)任务。
一次subscribeOn我们已经理解了,那多次subscribeOn会如何切换线程呢?

可以对照着图,脑海中想象着走一遍流程。首先SingleObserver看到的是蓝色的Observable(被观察对象,SingleSubscribeOn实际上就是一个被观察的对象),然后创建一个线程,并在线程里面订阅上游红色的Observable,所以这是第一次切线程。在红色的Observable中同样创建子线程,并在子线程中订阅上游的SingleJust,所以这是第二次切线程。后续所有链路的调用,都是在红色线程里面完成的。
所以总结一句就是:无论多少次subscribeOn,只有第一个生效。
observeOn原理
先看例子
Single.just(1)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object: SingleObserver<Int> {
override fun onSubscribe(d: Disposable) {
}
override fun onError(e: Throwable) {
}
override fun onSuccess(t: Int) {
}
})
observerOn创建了SingleObserveOn
public final class SingleObserveOn<T> extends Single<T> {
final SingleSource<T> source;
final Scheduler scheduler;
public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
downstream.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void onError(Throwable e) {
this.error = e;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onSuccess(value);
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
}
subscribeActual()方法中,直接让上游订阅ObserveOnSingleObserver,这里不需要在订阅过程中切线程,因为observeOn的作用是在收到消息后去切线程。所以看到onSuccess()和onError()里面都做了线程切换。

所以我们看图,在订阅的时候也就是黑色箭头部分没有切线程,而是在ObserveOnSingleObserver收到上游发送的消息后才开始切线程。
同样我们看看dispose()取消是如何实现的。首先我们看ObserveOnSingleObserver中的onSubscribe(),这里接收上游传入的Disposable,并将其保存在AtomicReference中,然后让下游订阅当前类,这样当下游想取消的时候调用dispose()方法,这样就直接取消了AtomicReference中保存的上游传入的Disposable。仔细观察onSuccess()和onError()中,都使用了DisposableHelper.replace来替换AtomicReference中保存的Disposable,这样就可以在消息发出来后不要再将消息往下游发送了。
多个observeOn又是怎么切换线程的呢?

订阅的时候不会发生线程切换,每次在SingleObserveOn收到消息后再切换线程。
所以总结一句就是:每次observeOn都会切线程。
到这里,整个RxJava的核心原理解析都讲完了。
转载请注明出处。