建议大家先看看扔物线大神写的《给 Android 开发者的 RxJava 详解》,虽然是1.0版本的,但是RxJava发展这么多年,基本的核心思想并没有变。有了这篇文章作为基础再去理解RxJava3会容易很多。不能完全理解也没关系,本篇将由浅入深,带着大家一起来剖析RxJava3的原理。
implementation "io.reactivex.rxjava3:rxjava:3.1.6"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'
gradle中按如上配置就可以将RxJava3引入到项目中,其中rxandroid基于RxJava3新增了一些在Android上特有的功能,比如常用的切换到主线程AndroidSchedulers.mainThread()
。
通过本章的学习,我们能够了解:
- subscribe到底干了啥?
- 操作符的原理
- Disposable的原理
- subscribeOn原理
- observeOn原理
subscribe到底干了啥?
先看一段RxJava3简单的使用:
Single.just("123") .subscribe(object : SingleObserver<String> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: String) { } })
public final class SingleJust<T> extends Single<T> { final T value; public SingleJust(T value) { this.value = value; } @Override protected void subscribeActual(SingleObserver<? super T> observer) { observer.onSubscribe(Disposable.disposed()); observer.onSuccess(value); } }
Single.just("123")
创建了一个SingleJust
对象,然后调用SingleJust
父类Single
的subscribe()
方法,接着调用SingleJust
类重写的subscribeActual()
方法,在方法内部,调用传入的Observer
对象的onSubscribe()
方法和onSuccess()
方法,将消息传给观察者。
这个简单的例子,展示了RxJava的核心思想——观察者模式。SingleJust
是被观察者,subscribe()
方法传入的SingleObserver
是观察者。当subscribe()
方法调用的时候,也就是消息真正开始流转的时候。这也就回答了标题的问题,subscribe()
方法实际上就是让消息开始流转起来。
操作符的原理
以map作为例子来讲解,先看看下面这个例子:
val single = Single.just(1) val singleStr1 = single.map(object: io.reactivex.rxjava3.functions.Function<Int, String> { override fun apply(t: Int): String { return t.toString() } }) singleStr1.subscribe(object: SingleObserver<String> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: String) { println(t) } })
上面例子经过了一次map转换,将Int类型转换成String,然后输出。用法其实很简单,接下来结合源码看下实现的原理。
Single.just()
在前面有介绍,返回的是SingleJust
,因为SingleJust
继承自Single
,然后调用了Single
的map()
返回SingleMap
,然后调用subscribe()
,实际上是调用的SingleMap
的subscribe()
。
//注意:这不是Single的源码,只是提取出的我们要关注的部分 public class Single { public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) { Objects.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper)); } }
public final class SingleMap<T, R> extends Single<R> { final SingleSource<? extends T> source; final Function<? super T, ? extends R> mapper; public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) { this.source = source; this.mapper = mapper; } @Override protected void subscribeActual(final SingleObserver<? super R> t) { source.subscribe(new MapSingleObserver<T, R>(t, mapper)); } static final class MapSingleObserver<T, R> implements SingleObserver<T> { final SingleObserver<? super R> t; final Function<? super T, ? extends R> mapper; MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) { this.t = t; this.mapper = mapper; } @Override public void onSubscribe(Disposable d) { t.onSubscribe(d); } @Override public void onSuccess(T value) { R v; try { v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value."); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); return; } t.onSuccess(v); } @Override public void onError(Throwable e) { t.onError(e); } } }
SingleMap
类比较关键,也是我们后面理解Disposable
和多线程切换的关键,所以一定要理解。其实也不难,只需要记住几个关键变量——构造方法中的source
变量即SingleJust
,mapper
变量即Function
,MapSingleObserver
构造方法中的t
变量即SingleObserver
,当调用subscribe()
方法时,消息流就开始运转起来。简单理解就是,source
是上游,消息经过mapper
处理后,丢给下游t
。
下面的图更直观些。
![](https://www.longdw.com/wp-content/uploads/2023/07/image-5-879x1024.png)
看到这里大家可以停下来思考两分钟再继续往下看,脑海中想象下整个链路的调用过程。首先顺着思考,SingleJust
先创建,接着调用SingleJust
的map()
方法创建了SingleMap1
,此时source
指向了上游,也就是SingleJust
,当调用subscribe()
的时候实际上最终会调用SingleMap1
的subscribeActual()
方法,这时候才会在内部创建MapSingleObserver
,然后出发source.subscribe()
调用上游,最终触发上游的消息发送,上游SingleJust
将消息发送给传入的内部观察者MapSingleObserver
,而内部观察者持有下游SingleObserver
,所以接着将消息转发给下游。
理解了上面的单次map
,接下来再看多次map
的情况就容易些了。
val single = Single.just(1) val singleStr1 = single.map(object: io.reactivex.rxjava3.functions.Function<Int, String> { override fun apply(t: Int): String { return t.toString() } }) val singleStr2 = singleStr1.map(object: io.reactivex.rxjava3.functions.Function<String, String> { override fun apply(t: String): String { return t + "123" } }) singleStr2.subscribe(object: SingleObserver<String> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: String) { } })
上面例子是在单次map
的基础上又增加了一次map
,那这个又是怎样的调用过程呢?同样,停下来思考两分钟再往下看。
我看单个map时脑子还能转过来,但是两个map串联起来时,脑子就卡壳了。其实就是source变量和t变量没有拎清导致。记住了上面说的——source
是上游,消息经过mapper
处理后,丢给下游t
,再结合下面的流程图就很好理解了。
![](https://www.longdw.com/wp-content/uploads/2023/07/image-7-714x1024.png)
图中黑色和红色的线可以理解为指针,比如从上往下第一个source
变量指向SingleJust
,第二个source
变量指向SingleMap1
,当调用singleMap2.subscribe()
时,整个链路就开始运行起来了。
- 首先触发的是
SingleMap2
的subscribeActual()
方法,此时创建了SingleMap2
内部的MapSingleObserver
(图中绿色部分)对象,该对象内部有个t
变量指向最终的观察者SingleObserver
。 SingleMap2
的subscribeActual()
方法内部实际触发的是SingleMap1
的subscribeActual()
方法,此时创建了SingleMap1
内部的MapSingleObserver
对象,该对象内部有个t
变量指向的是上面步骤创建的MapSingleObserver
对象。SingleMap1
的subscribeActual()
方法内部实际触发的是SingleJust
的subscribeActual()
方法,然后消息由该方法内部发出,先触发SingleMap1
的MapSingleObserver
对象对应的方法,接着出发SingleMap2
的MapSingleObserver
对象,最后触发观察者对象SingleObserver
。
结合Single、SingleMap的源码以及上面的流程图,整个链的各个对象之间的调用关系就很清晰了。
Disposable的原理
Disposable的dispose()方法可以取消上游未发送的消息,以及让下游也不要处理接收到的消息了。
(1)先来看看既没有后续消息,也没有延迟的例子,以最简单的SingleJust为例。
Single.just("123") .subscribe(object : SingleObserver<String> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: String) { } })
onSubscribe()
方法传入了Disposable
对象,结合上面介绍的SingleJust
的源码可知,onSubscribe()
方法是由SingleJust
触发,通过如下方法传入Disposable
对象,observer.onSubscribe(Disposable.disposed())
。传入的是Disposable.disposed()
,由此可知,SingleJust
消息实际上是不能取消的,因为当订阅subscribe()
发生时,消息就被立即发出了,这也符合我们的认知。
(2)接下来看看有后续消息,但无延迟的例子。
以Observable.just("1", "2", "3")
为例,该方法最终创建了一个ObservableFromArray
,我们来看看源码。
//注意:这不是完整的源码,我删减了一些,只保留本篇博客需要的关键代码 public final class ObservableFromArray<T> extends Observable<T> { final T[] array; public ObservableFromArray(T[] array) { this.array = array; } @Override public void subscribeActual(Observer<? super T> observer) { FromArrayDisposable<T> d = new FromArrayDisposable<>(observer, array); observer.onSubscribe(d); d.run(); } static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> { final Observer<? super T> downstream; final T[] array; volatile boolean disposed; FromArrayDisposable(Observer<? super T> actual, T[] array) { this.downstream = actual; this.array = array; } @Override public void dispose() { disposed = true; } @Override public boolean isDisposed() { return disposed; } void run() { T[] a = array; int n = a.length; for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { downstream.onError(new NullPointerException("The element at index " + i + " is null")); return; } downstream.onNext(value); } if (!isDisposed()) { downstream.onComplete(); } } } }
通过前面的分析我们已经知道,Disposable
最终是通过调用观察者Observer
的onSubscribe()
方法传入的,所以我们看observer.onSubscribe(d)
,传入的是一个FromArrayDisposable
对象,重写了dispose()
方法,也就是当我们想取消消息发送的时候,调用该方法将disposed
置为true
,再看run()
方法,for
循环发送消息的时候有个isDisposed()
判断,当发现disposed=true
了,退出循环,这样上游消息也就停止向下游发送了。
(3)再来看看无后续消息,但是有延迟的例子。
Single.just("1") .delay(1, TimeUnit.SECONDS) .subscribe(object: SingleObserver<String> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: String) { } })
Single.just()
就不多说了,delay()
方法调用过程如下:
//只节选核心代码 public abstract class Single<@NonNull T> implements SingleSource<T> { public final Single<T> delay(long time, @NonNull TimeUnit unit) { return delay(time, unit, Schedulers.computation(), false); } public final Single<T> delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean delayError) { return RxJavaPlugins.onAssembly(new SingleDelay<>(this, time, unit, scheduler, delayError)); } }
主要关注SingleDelay。
public final class SingleDelay<T> extends Single<T> { final SingleSource<? extends T> source; final long time; final TimeUnit unit; final Scheduler scheduler; final boolean delayError; public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) { this.source = source; this.time = time; this.unit = unit; this.scheduler = scheduler; this.delayError = delayError; } @Override protected void subscribeActual(final SingleObserver<? super T> observer) { final SequentialDisposable sd = new SequentialDisposable(); observer.onSubscribe(sd); source.subscribe(new Delay(sd, observer)); } final class Delay implements SingleObserver<T> { private final SequentialDisposable sd; final SingleObserver<? super T> downstream; Delay(SequentialDisposable sd, SingleObserver<? super T> observer) { this.sd = sd; this.downstream = observer; } @Override public void onSubscribe(Disposable d) { sd.replace(d); } @Override public void onSuccess(final T value) { sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit)); } @Override public void onError(final Throwable e) { sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit)); } final class OnSuccess implements Runnable { private final T value; OnSuccess(T value) { this.value = value; } @Override public void run() { downstream.onSuccess(value); } } final class OnError implements Runnable { private final Throwable e; OnError(Throwable e) { this.e = e; } @Override public void run() { downstream.onError(e); } } } }
同前面的分析一样的套路,通过observer.onSubscribe(sd)
将SequentialDisposable
传给观察者,消息开始发送的时候,首先会调用SequentialDisposable
的onSubscribe()
,其中sd.replace(d)
有点不好理解,它的作用其实是在不改变观察者持有的Disposable
对象的前提下,偷偷的置换掉真正disposed()
的对象。这句话不是很好理解,还是结合源码来看。
我们一起看下SequentialDisposable
和DisposableHelper
。
//只提取核心代码 public final class SequentialDisposable extends AtomicReference<Disposable> implements Disposable { public boolean replace(Disposable next) { return DisposableHelper.replace(this, next); } @Override public void dispose() { DisposableHelper.dispose(this); } }
//只提取核心代码 public enum DisposableHelper implements Disposable { public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) { Objects.requireNonNull(d, "d is null"); if (!field.compareAndSet(null, d)) { d.dispose(); if (field.get() != DISPOSED) { reportDisposableSet(); } return false; } return true; } public static boolean replace(AtomicReference<Disposable> field, Disposable d) { for (;;) { Disposable current = field.get(); if (current == DISPOSED) { if (d != null) { d.dispose(); } return false; } if (field.compareAndSet(current, d)) { return true; } } } public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; } //Verifies that current is null, next is not null, otherwise signals errors to the RxJavaPlugins and returns false. public static boolean validate(Disposable current, Disposable next) { if (next == null) { RxJavaPlugins.onError(new NullPointerException("next is null")); return false; } if (current != null) { next.dispose(); reportDisposableSet(); return false; } return true; } }
sd.replace(d)
最终调用DisposableHelper.replace(this, next)
,将传入的Disposable
对象保存起来。仔细观察下,会发现onSuccess()
和onError()
都有调用replace()
方法。当调用disposed()
的时候,如果上游消息还未发送,此时取消上游消息的发送,因为是Single.just()
,在前面的无后续消息中讲过,当subscribe()
调用的时候消息就开始立即发送了,所以没有机会取消。当消息发出来后,进入onSuccess()
,启动一个定时器,此时如果调用disposed()
取消,取消的是定时器发送。
(4)最后再看看有后续消息,并且有延迟的例子。
Observable.just(1, 2, 3) .delay(1, TimeUnit.SECONDS) .subscribe(object: Observer<Int> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onComplete() { } override fun onNext(t: Int) { } })
Observable.just()
返回ObservableFromArray
对象,该类的核心代码上面有贴出来过,delay()
返回ObservableDelay
对象,上面也介绍过。其实取消上游的消息发送,以及取消下游的定时器。
接下来开始讲解线程是如何切换的。
subscribeOn原理
先看下面简单的例子:
Single.just(1) .subscribeOn(Schedulers.io()) .subscribe(object: SingleObserver<Int> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: Int) { } })
上面在subscribe()
之前切了一次线程,这时候观察者的几个回调方法实际上都是在子线程执行的,那它是怎么做到的呢?
subscribeOn()
返回了一个SingleSubscribeOn
对象,该类的完整源码如下:
public final class SingleSubscribeOn<T> extends Single<T> { final SingleSource<? extends T> source; final Scheduler scheduler; public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) { this.source = source; this.scheduler = scheduler; } @Override protected void subscribeActual(final SingleObserver<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source); observer.onSubscribe(parent); Disposable f = scheduler.scheduleDirect(parent); parent.task.replace(f); } static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements SingleObserver<T>, Disposable, Runnable { private static final long serialVersionUID = 7000911171163930287L; final SingleObserver<? super T> downstream; final SequentialDisposable task; final SingleSource<? extends T> source; SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) { this.downstream = actual; this.source = source; this.task = new SequentialDisposable(); } @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this, d); } @Override public void onSuccess(T value) { downstream.onSuccess(value); } @Override public void onError(Throwable e) { downstream.onError(e); } @Override public void dispose() { DisposableHelper.dispose(this); task.dispose(); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } @Override public void run() { source.subscribe(this); } } }
scheduler.scheduleDirect()
开启线程,会触发调用SubscribeOnObserver
的run()
方法,然后在子线程中调用source.subscribe(this)
,后续的消息发送都是在这个子线程中完成的,比如onSubscribe()
,onSuccess()
。看图可能更直观点。
![](https://www.longdw.com/wp-content/uploads/2023/08/image-819x1024.png)
当我们调用subscribe()
的时候,调用链开始运转起来,首先调用SingleSubscribeOn
的subscribeActual()
方法,前面我们说过的案例中,subscribeActual()
方法中就直接触发了上游的subscribe()
方法的调用,而这里比较特殊,是起了一个线程,对应的是run()
方法中的source.subscribe(this)
,这是在线程中调用的。图中SingleSubscribeOn
中的source
指的就是上游SingleJust
,所以后续的消息发送都是在子线程中完成,对应图中红色箭头。
那disposed()
取消是怎么做到的呢?
上游SingleJust
触发调用了SubscribeOnObserver
中的onSubscribe(d)
方法,d
变量就是上游传入的,DisposableHelper.setOnce(this, d)
就是把上游的Disposable
保存起来,当下游想取消的时候,首先调用dispose()
方法,接着调用DisposableHelper.dispose(this)
,也就取消了上游的消息发送,然后调用task.dispose()
,取消scheduler.scheduleDirect(parent)
任务。
一次subscribeOn
我们已经理解了,那多次subscribeOn
会如何切换线程呢?
![](https://www.longdw.com/wp-content/uploads/2023/08/image-3-717x1024.png)
可以对照着图,脑海中想象着走一遍流程。首先SingleObserver
看到的是蓝色的Observable
(被观察对象,SingleSubscribeOn
实际上就是一个被观察的对象),然后创建一个线程,并在线程里面订阅上游红色的Observable
,所以这是第一次切线程。在红色的Observable
中同样创建子线程,并在子线程中订阅上游的SingleJust
,所以这是第二次切线程。后续所有链路的调用,都是在红色线程里面完成的。
所以总结一句就是:无论多少次subscribeOn,只有第一个生效。
observeOn原理
先看例子
Single.just(1) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(object: SingleObserver<Int> { override fun onSubscribe(d: Disposable) { } override fun onError(e: Throwable) { } override fun onSuccess(t: Int) { } })
observerOn
创建了SingleObserveOn
public final class SingleObserveOn<T> extends Single<T> { final SingleSource<T> source; final Scheduler scheduler; public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) { this.source = source; this.scheduler = scheduler; } @Override protected void subscribeActual(final SingleObserver<? super T> observer) { source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler)); } static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable> implements SingleObserver<T>, Disposable, Runnable { private static final long serialVersionUID = 3528003840217436037L; final SingleObserver<? super T> downstream; final Scheduler scheduler; T value; Throwable error; ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) { this.downstream = actual; this.scheduler = scheduler; } @Override public void onSubscribe(Disposable d) { if (DisposableHelper.setOnce(this, d)) { downstream.onSubscribe(this); } } @Override public void onSuccess(T value) { this.value = value; Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void onError(Throwable e) { this.error = e; Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void run() { Throwable ex = error; if (ex != null) { downstream.onError(ex); } else { downstream.onSuccess(value); } } @Override public void dispose() { DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } } }
subscribeActual()
方法中,直接让上游订阅ObserveOnSingleObserver
,这里不需要在订阅过程中切线程,因为observeOn的作用是在收到消息后去切线程。所以看到onSuccess()
和onError()
里面都做了线程切换。
![](https://www.longdw.com/wp-content/uploads/2023/08/image-6-950x1024.png)
所以我们看图,在订阅的时候也就是黑色箭头部分没有切线程,而是在ObserveOnSingleObserver
收到上游发送的消息后才开始切线程。
同样我们看看dispose()
取消是如何实现的。首先我们看ObserveOnSingleObserver
中的onSubscribe()
,这里接收上游传入的Disposable
,并将其保存在AtomicReference
中,然后让下游订阅当前类,这样当下游想取消的时候调用dispose()
方法,这样就直接取消了AtomicReference
中保存的上游传入的Disposable
。仔细观察onSuccess()和onError()中,都使用了DisposableHelper.replace
来替换AtomicReference
中保存的Disposable
,这样就可以在消息发出来后不要再将消息往下游发送了。
多个observeOn又是怎么切换线程的呢?
![](https://www.longdw.com/wp-content/uploads/2023/08/image-8-716x1024.png)
订阅的时候不会发生线程切换,每次在SingleObserveOn收到消息后再切换线程。
所以总结一句就是:每次observeOn都会切线程。
到这里,整个RxJava的核心原理解析都讲完了。
转载请注明出处。