RxJava3原理解析

建议大家先看看扔物线大神写的《给 Android 开发者的 RxJava 详解》,虽然是1.0版本的,但是RxJava发展这么多年,基本的核心思想并没有变。有了这篇文章作为基础再去理解RxJava3会容易很多。不能完全理解也没关系,本篇将由浅入深,带着大家一起来剖析RxJava3的原理。

  • implementation "io.reactivex.rxjava3:rxjava:3.1.6"
  • implementation 'io.reactivex.rxjava3:rxandroid:3.0.2'

gradle中按如上配置就可以将RxJava3引入到项目中,其中rxandroid基于RxJava3新增了一些在Android上特有的功能,比如常用的切换到主线程AndroidSchedulers.mainThread()

通过本章的学习,我们能够了解:

  • subscribe到底干了啥?
  • 操作符的原理
  • Disposable的原理
  • subscribeOn原理
  • observeOn原理

subscribe到底干了啥?

先看一段RxJava3简单的使用:

Single.just("123")
            .subscribe(object : SingleObserver<String> {
                override fun onSubscribe(d: Disposable) {
                }

                override fun onError(e: Throwable) {
                }

                override fun onSuccess(t: String) {
                }

            })
public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposable.disposed());
        observer.onSuccess(value);
    }

}

Single.just("123")创建了一个SingleJust对象,然后调用SingleJust父类Singlesubscribe()方法,接着调用SingleJust类重写的subscribeActual()方法,在方法内部,调用传入的Observer对象的onSubscribe()方法和onSuccess()方法,将消息传给观察者。

这个简单的例子,展示了RxJava的核心思想——观察者模式。SingleJust被观察者subscribe()方法传入的SingleObserver观察者。当subscribe()方法调用的时候,也就是消息真正开始流转的时候。这也就回答了标题的问题,subscribe()方法实际上就是让消息开始流转起来。

操作符的原理

以map作为例子来讲解,先看看下面这个例子:

val single = Single.just(1)
val singleStr1 = single.map(object: io.reactivex.rxjava3.functions.Function<Int, String> {
    override fun apply(t: Int): String {
        return t.toString()
    }

})
singleStr1.subscribe(object: SingleObserver<String> {
    override fun onSubscribe(d: Disposable) {
    }

    override fun onError(e: Throwable) {
    }

    override fun onSuccess(t: String) {
        println(t)
    }

})

上面例子经过了一次map转换,将Int类型转换成String,然后输出。用法其实很简单,接下来结合源码看下实现的原理。

Single.just()在前面有介绍,返回的是SingleJust,因为SingleJust继承自Single,然后调用了Singlemap()返回SingleMap,然后调用subscribe(),实际上是调用的SingleMapsubscribe()

//注意:这不是Single的源码,只是提取出的我们要关注的部分
public class Single {
    public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
    }
}
public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;

        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}

SingleMap类比较关键,也是我们后面理解Disposable和多线程切换的关键,所以一定要理解。其实也不难,只需要记住几个关键变量——构造方法中的source变量即SingleJustmapper变量即FunctionMapSingleObserver构造方法中的t变量即SingleObserver,当调用subscribe()方法时,消息流就开始运转起来。简单理解就是,source是上游,消息经过mapper处理后,丢给下游t

下面的图更直观些。

看到这里大家可以停下来思考两分钟再继续往下看,脑海中想象下整个链路的调用过程。首先顺着思考,SingleJust先创建,接着调用SingleJustmap()方法创建了SingleMap1,此时source指向了上游,也就是SingleJust,当调用subscribe()的时候实际上最终会调用SingleMap1subscribeActual()方法,这时候才会在内部创建MapSingleObserver,然后出发source.subscribe()调用上游,最终触发上游的消息发送,上游SingleJust将消息发送给传入的内部观察者MapSingleObserver,而内部观察者持有下游SingleObserver,所以接着将消息转发给下游。

理解了上面的单次map,接下来再看多次map的情况就容易些了。

val single = Single.just(1)
val singleStr1 = single.map(object: io.reactivex.rxjava3.functions.Function<Int, String> {
    override fun apply(t: Int): String {
        return t.toString()
    }

})
val singleStr2 = singleStr1.map(object: io.reactivex.rxjava3.functions.Function<String, String> {
    override fun apply(t: String): String {
        return t + "123"
    }
})
singleStr2.subscribe(object: SingleObserver<String> {
    override fun onSubscribe(d: Disposable) {
    }

    override fun onError(e: Throwable) {
    }

    override fun onSuccess(t: String) {
    }

})

上面例子是在单次map的基础上又增加了一次map,那这个又是怎样的调用过程呢?同样,停下来思考两分钟再往下看。

我看单个map时脑子还能转过来,但是两个map串联起来时,脑子就卡壳了。其实就是source变量和t变量没有拎清导致。记住了上面说的——source是上游,消息经过mapper处理后,丢给下游t,再结合下面的流程图就很好理解了。

图中黑色和红色的线可以理解为指针,比如从上往下第一个source变量指向SingleJust,第二个source变量指向SingleMap1,当调用singleMap2.subscribe()时,整个链路就开始运行起来了。

  1. 首先触发的是SingleMap2subscribeActual()方法,此时创建了SingleMap2内部的MapSingleObserver(图中绿色部分)对象,该对象内部有个t变量指向最终的观察者SingleObserver
  2. SingleMap2subscribeActual()方法内部实际触发的是SingleMap1subscribeActual()方法,此时创建了SingleMap1内部的MapSingleObserver对象,该对象内部有个t变量指向的是上面步骤创建的MapSingleObserver对象。
  3. SingleMap1subscribeActual()方法内部实际触发的是SingleJustsubscribeActual()方法,然后消息由该方法内部发出,先触发SingleMap1MapSingleObserver对象对应的方法,接着出发SingleMap2MapSingleObserver对象,最后触发观察者对象SingleObserver

结合Single、SingleMap的源码以及上面的流程图,整个链的各个对象之间的调用关系就很清晰了。

Disposable的原理

Disposable的dispose()方法可以取消上游未发送的消息,以及让下游也不要处理接收到的消息了。

(1)先来看看既没有后续消息,也没有延迟的例子,以最简单的SingleJust为例。

Single.just("123")
    .subscribe(object : SingleObserver<String> {
        override fun onSubscribe(d: Disposable) {
        }

        override fun onError(e: Throwable) {
        }

        override fun onSuccess(t: String) {
        }
    })

onSubscribe()方法传入了Disposable对象,结合上面介绍的SingleJust的源码可知,onSubscribe()方法是由SingleJust触发,通过如下方法传入Disposable对象,observer.onSubscribe(Disposable.disposed())。传入的是Disposable.disposed(),由此可知,SingleJust消息实际上是不能取消的,因为当订阅subscribe()发生时,消息就被立即发出了,这也符合我们的认知。

(2)接下来看看有后续消息,但无延迟的例子。

Observable.just("1", "2", "3")为例,该方法最终创建了一个ObservableFromArray,我们来看看源码。

//注意:这不是完整的源码,我删减了一些,只保留本篇博客需要的关键代码
public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }

    @Override
    public void subscribeActual(Observer<? super T> observer) {
        FromArrayDisposable<T> d = new FromArrayDisposable<>(observer, array);
        observer.onSubscribe(d);
        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

        final Observer<? super T> downstream;
        final T[] array;
        volatile boolean disposed;

        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.downstream = actual;
            this.array = array;
        }

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void run() {
            T[] a = array;
            int n = a.length;

            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                    return;
                }
                downstream.onNext(value);
            }
            if (!isDisposed()) {
                downstream.onComplete();
            }
        }
    }
}

通过前面的分析我们已经知道,Disposable最终是通过调用观察者ObserveronSubscribe()方法传入的,所以我们看observer.onSubscribe(d),传入的是一个FromArrayDisposable对象,重写了dispose()方法,也就是当我们想取消消息发送的时候,调用该方法将disposed置为true,再看run()方法,for循环发送消息的时候有个isDisposed()判断,当发现disposed=true了,退出循环,这样上游消息也就停止向下游发送了。

(3)再来看看无后续消息,但是有延迟的例子。

Single.just("1")
    .delay(1, TimeUnit.SECONDS)
    .subscribe(object: SingleObserver<String> {
        override fun onSubscribe(d: Disposable) {
        }

        override fun onError(e: Throwable) {
        }

        override fun onSuccess(t: String) {
        }
    })

Single.just()就不多说了,delay()方法调用过程如下:

//只节选核心代码
public abstract class Single<@NonNull T> implements SingleSource<T> {

    public final Single<T> delay(long time, @NonNull TimeUnit unit) {
        return delay(time, unit, Schedulers.computation(), false);
    }

    public final Single<T> delay(long time, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean delayError) {
        return RxJavaPlugins.onAssembly(new SingleDelay<>(this, time, unit, scheduler, delayError));
    }

}

主要关注SingleDelay。

public final class SingleDelay<T> extends Single<T> {

    final SingleSource<? extends T> source;
    final long time;
    final TimeUnit unit;
    final Scheduler scheduler;
    final boolean delayError;

    public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {
        this.source = source;
        this.time = time;
        this.unit = unit;
        this.scheduler = scheduler;
        this.delayError = delayError;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {

        final SequentialDisposable sd = new SequentialDisposable();
        observer.onSubscribe(sd);
        source.subscribe(new Delay(sd, observer));
    }

    final class Delay implements SingleObserver<T> {
        private final SequentialDisposable sd;
        final SingleObserver<? super T> downstream;

        Delay(SequentialDisposable sd, SingleObserver<? super T> observer) {
            this.sd = sd;
            this.downstream = observer;
        }

        @Override
        public void onSubscribe(Disposable d) {
            sd.replace(d);
        }

        @Override
        public void onSuccess(final T value) {
            sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
        }

        @Override
        public void onError(final Throwable e) {
            sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
        }

        final class OnSuccess implements Runnable {
            private final T value;

            OnSuccess(T value) {
                this.value = value;
            }

            @Override
            public void run() {
                downstream.onSuccess(value);
            }
        }

        final class OnError implements Runnable {
            private final Throwable e;

            OnError(Throwable e) {
                this.e = e;
            }

            @Override
            public void run() {
                downstream.onError(e);
            }
        }
    }
}

同前面的分析一样的套路,通过observer.onSubscribe(sd)SequentialDisposable传给观察者,消息开始发送的时候,首先会调用SequentialDisposableonSubscribe(),其中sd.replace(d)有点不好理解,它的作用其实是在不改变观察者持有的Disposable对象的前提下,偷偷的置换掉真正disposed()的对象。这句话不是很好理解,还是结合源码来看。

我们一起看下SequentialDisposableDisposableHelper

//只提取核心代码
public final class SequentialDisposable
extends AtomicReference<Disposable>
implements Disposable {

    public boolean replace(Disposable next) {
        return DisposableHelper.replace(this, next);
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

}
//只提取核心代码
public enum DisposableHelper implements Disposable {
    public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
        Objects.requireNonNull(d, "d is null");
        if (!field.compareAndSet(null, d)) {
            d.dispose();
            if (field.get() != DISPOSED) {
                reportDisposableSet();
            }
            return false;
        }
        return true;
    }

    public static boolean replace(AtomicReference<Disposable> field, Disposable d) {
        for (;;) {
            Disposable current = field.get();
            if (current == DISPOSED) {
                if (d != null) {
                    d.dispose();
                }
                return false;
            }
            if (field.compareAndSet(current, d)) {
                return true;
            }
        }
    }

    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

    //Verifies that current is null, next is not null, otherwise signals errors to the RxJavaPlugins and returns false.
    public static boolean validate(Disposable current, Disposable next) {
        if (next == null) {
            RxJavaPlugins.onError(new NullPointerException("next is null"));
            return false;
        }
        if (current != null) {
            next.dispose();
            reportDisposableSet();
            return false;
        }
        return true;
    }
}

sd.replace(d)最终调用DisposableHelper.replace(this, next),将传入的Disposable对象保存起来。仔细观察下,会发现onSuccess()onError()都有调用replace()方法。当调用disposed()的时候,如果上游消息还未发送,此时取消上游消息的发送,因为是Single.just(),在前面的无后续消息中讲过,当subscribe()调用的时候消息就开始立即发送了,所以没有机会取消。当消息发出来后,进入onSuccess(),启动一个定时器,此时如果调用disposed()取消,取消的是定时器发送。

(4)最后再看看有后续消息,并且有延迟的例子。

Observable.just(1, 2, 3)
    .delay(1, TimeUnit.SECONDS)
    .subscribe(object: Observer<Int> {
        override fun onSubscribe(d: Disposable) {
        }

        override fun onError(e: Throwable) {
        }

        override fun onComplete() {
        }

        override fun onNext(t: Int) {
        }

    })

Observable.just()返回ObservableFromArray对象,该类的核心代码上面有贴出来过,delay()返回ObservableDelay对象,上面也介绍过。其实取消上游的消息发送,以及取消下游的定时器。

接下来开始讲解线程是如何切换的。

subscribeOn原理

先看下面简单的例子:

Single.just(1)
    .subscribeOn(Schedulers.io())
    .subscribe(object: SingleObserver<Int> {
        override fun onSubscribe(d: Disposable) {
        }

        override fun onError(e: Throwable) {
        }

        override fun onSuccess(t: Int) {
        }
    })

上面在subscribe()之前切了一次线程,这时候观察者的几个回调方法实际上都是在子线程执行的,那它是怎么做到的呢?

subscribeOn()返回了一个SingleSubscribeOn对象,该类的完整源码如下:

public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;

    final Scheduler scheduler;

    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
        observer.onSubscribe(parent);

        Disposable f = scheduler.scheduleDirect(parent);

        parent.task.replace(f);

    }

    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {

        private static final long serialVersionUID = 7000911171163930287L;

        final SingleObserver<? super T> downstream;

        final SequentialDisposable task;

        final SingleSource<? extends T> source;

        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.downstream = actual;
            this.source = source;
            this.task = new SequentialDisposable();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }

        @Override
        public void onSuccess(T value) {
            downstream.onSuccess(value);
        }

        @Override
        public void onError(Throwable e) {
            downstream.onError(e);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
            task.dispose();
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public void run() {
            source.subscribe(this);
        }
    }

}

scheduler.scheduleDirect()开启线程,会触发调用SubscribeOnObserverrun()方法,然后在子线程中调用source.subscribe(this),后续的消息发送都是在这个子线程中完成的,比如onSubscribe()onSuccess()。看图可能更直观点。

当我们调用subscribe()的时候,调用链开始运转起来,首先调用SingleSubscribeOnsubscribeActual()方法,前面我们说过的案例中,subscribeActual()方法中就直接触发了上游的subscribe()方法的调用,而这里比较特殊,是起了一个线程,对应的是run()方法中的source.subscribe(this),这是在线程中调用的。图中SingleSubscribeOn中的source指的就是上游SingleJust,所以后续的消息发送都是在子线程中完成,对应图中红色箭头。

disposed()取消是怎么做到的呢?

上游SingleJust触发调用了SubscribeOnObserver中的onSubscribe(d)方法,d变量就是上游传入的,DisposableHelper.setOnce(this, d)就是把上游的Disposable保存起来,当下游想取消的时候,首先调用dispose()方法,接着调用DisposableHelper.dispose(this),也就取消了上游的消息发送,然后调用task.dispose(),取消scheduler.scheduleDirect(parent)任务。

一次subscribeOn我们已经理解了,那多次subscribeOn会如何切换线程呢?

可以对照着图,脑海中想象着走一遍流程。首先SingleObserver看到的是蓝色的Observable(被观察对象,SingleSubscribeOn实际上就是一个被观察的对象),然后创建一个线程,并在线程里面订阅上游红色的Observable,所以这是第一次切线程。在红色的Observable中同样创建子线程,并在子线程中订阅上游的SingleJust,所以这是第二次切线程。后续所有链路的调用,都是在红色线程里面完成的。

所以总结一句就是:无论多少次subscribeOn,只有第一个生效。

observeOn原理

先看例子

Single.just(1)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object: SingleObserver<Int> {
        override fun onSubscribe(d: Disposable) {
        }

        override fun onError(e: Throwable) {
        }

        override fun onSuccess(t: Int) {
        }
    })

observerOn创建了SingleObserveOn

public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;

    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
    }

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> downstream;

        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.downstream = actual;
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                downstream.onError(ex);
            } else {
                downstream.onSuccess(value);
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}

subscribeActual()方法中,直接让上游订阅ObserveOnSingleObserver,这里不需要在订阅过程中切线程,因为observeOn的作用是在收到消息后去切线程。所以看到onSuccess()onError()里面都做了线程切换。

所以我们看图,在订阅的时候也就是黑色箭头部分没有切线程,而是在ObserveOnSingleObserver收到上游发送的消息后才开始切线程。

同样我们看看dispose()取消是如何实现的。首先我们看ObserveOnSingleObserver中的onSubscribe(),这里接收上游传入的Disposable,并将其保存在AtomicReference中,然后让下游订阅当前类,这样当下游想取消的时候调用dispose()方法,这样就直接取消了AtomicReference中保存的上游传入的Disposable。仔细观察onSuccess()和onError()中,都使用了DisposableHelper.replace来替换AtomicReference中保存的Disposable,这样就可以在消息发出来后不要再将消息往下游发送了。

多个observeOn又是怎么切换线程的呢?

订阅的时候不会发生线程切换,每次在SingleObserveOn收到消息后再切换线程。

所以总结一句就是:每次observeOn都会切线程。

到这里,整个RxJava的核心原理解析都讲完了。

转载请注明出处。