RxJava3 源码分析

摘要

作为Android主流框架之一,简洁的线程切换和丰富的功能操作符,再配合OkHttp+Retrofit,组合成了APP必备框架,深得Android开发者的喜爱。本文基于最新的RxJava3,分析其工作原理,以及线程切换原理。

简单入门

添加依赖:

1
2
implementation "io.reactivex.rxjava3:rxjava:3.0.11"
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

通过Observable.create()来创建一个Observable对象,Observable表示可观察的,被观察者。create函数接收一个ObservableOnSubscribe类型的参数,ObservableOnSubscribe是一个函数式接口,拥有一个携带ObservableEmitter类型参数的subscribe函数。ObservableEmitter对象可以用来在发生订阅后发出事件,且是可以取消的。例如:

1
2
3
4
5
6
7
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("Hello,RxJava");
emitter.onComplete();
}
});

有了订阅事件和被观察者,接下来创建观察者observer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observer observer=new Observer<String>() {
//当与被观察者发生订阅关系,该函数会被回调, Disposable可以用去取消该订阅关系
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG,"onSubscribe:"+d.isDisposed());
}
//对应`ObservableEmitter`的onNext函数
@Override
public void onNext(@NonNull String s) {
Log.d(TAG,"onNext:"+s);
}
//onComplete与onError只有一个会被触发
@Override
public void onError(@NonNull Throwable e) {
Log.e(TAG,e.getMessage());
}

@Override
public void onComplete() {
Log.d(TAG,"onComplete");
}
};

将观察者订阅到被观察者。

1
observable.subscribe(observer);

打印日志:

1
2
3
onSubscribe:false
onNext:Hello,RxJava
onComplete

RxJava在Android的流行是因为其线程切换简单和方便。

1
2
3
4
observable
.subscribeOn(Schedulers.io())//订阅在后台线程执行,即ObservableEmitter的onNext产生源,例如网络操作
.observeOn(AndroidSchedulers.mainThread())//观察者的观察在Android主线程执行
.subscribe(observer);

源码分析

Observable的创建

1
2
3
4
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}

RxJavaPlugins.onAssembly是一个钩子技术,表示是否对参数内容进行过滤和转换。如果没有自定义,一般Function对象f都是null,所以直接返回参数值。

1
2
3
4
5
6
7
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

因此,create函数这里返回是ObservableCreate对象。

Observable的订阅

创建好被观察者和观察者之后,通过subscribe函数建立起订阅关系。

1
2
3
4
5
6
//Observable
public final void subscribe(@NonNull Observer<? super T> observer) {
...
subscribeActual(observer);
...
}

调用了ObserVable类的subscribeActual函数。subscribeActual是一个抽象函数,看一下前面实现的ObservableCreate对象的subscribeActual函数。

1
2
3
4
5
6
7
8
9
10
11
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);

try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

这里直接创建了CreateEmitter对象,并调用了Observer对象的onSubscribe函数。CreateEmitter类实现了Disposable接口,这样就可以通过CreateEmitter实例去取消订阅。

然后调用了source.subscribe函数,这里的source变量指向的就是前面创建的ObservableOnSubscribe对象。这样在建立订阅关系后就立即触发了上流数据的产生,然后在观察者中消费。

线程切换

作为RxJava在Android使用的核心功能之一就是线程切换。通过RxAndroid,引入Android主线程。

subscribeOn

1
2
3
4
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

Schedulers.io()返回了一个Scheduler对象,暂且不深究,看看subscribeOn函数。

1
2
3
4
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}

创建了ObservableSubscribeOn对象,也是Observable子类,返回了自身。

observeOn

再看看observeOn函数,调用了重载函数。

1
2
3
4
5
6
7
8
9
public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}

创建了ObservableObserveOn对象,也是Observable子类,返回了自身。此时观察者订阅的被观察者是ObservableObserveOn对象,而不是最开始的Observable对象。

subscribe

此时ObservableObserveOn对象持有sourceObservableSubscribeOn对象,schedulerAndroidSchedulers.mainThread()对象。观察者Observer对象observer开始订阅,会调用ObservableObserveOn对象的subscribeActual函数。

1
2
3
4
5
6
7
8
9
10
//ObservableObserveOn
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}

AndroidSchedulers.mainThread()对象并不是TrampolineScheduler的实例,所以走else分支。AndroidSchedulers.mainThread()返回的是一个HandlerScheduler对象,其createWorker函数返回HandlerWorker对象,然后将相关信息封装成ObserveOnObserver对象。然后调用了ObservableSubscribeOn对象的subscribe函数。

ObservableSubscribeOn.subscribeActual

ObservableSubscribeOn对象的subscribe函数,也终触发subscribeActual函数。此时的函数实参就是ObserveOnObserver对象。

1
2
3
4
5
6
7
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

observer.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

subscribeActual函数,先将ObserveOnObserver对象包装成SubscribeOnObserver对象parent,并调用的SubscribeOnObserver对象的onSubscribe函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void onSubscribe(Disposable d) {
//upstream为null且d为非null,返回true,upstream默认为null,条件成立。
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {//条件不成立
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;

int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}

queue = new SpscLinkedArrayQueue<>(bufferSize);
//触发最后Observer的onSubscribe函数
downstream.onSubscribe(this);
}
}

回到subscribeActual函数。

1
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))

创建了SubscribeTask任务,并直接提交给scheduler进行调度。此时的scheduler对象是Schudlers.IO()创建的IoScheduler对象。

Scheduler.scheduleDirect

scheduleDirect函数定位到Scheduler类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();

final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

DisposeTask task = new DisposeTask(decoratedRun, w);

w.schedule(task, delay, unit);

return task;
}

createWorker函数返回了EventLoopWorker对象。因为SubscribeTask本身是Runable的子类,所以这里的Runnable对象decoratedRun还是SubscribeTask对象。将EventLoopWorker对象和SubscribeTask对象封装到DisposeTask对象中,然后提交给个EventLoopWorker对象的schedule函数。

1
2
3
4
5
6
7
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {//默认false
return EmptyDisposable.INSTANCE;
}

return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

这里的poolWorker是通过CachedWorkerPoolget函数获得的ThreadWorker对象。IoScheduler创建时候会创建CachedWorkerPool对象pool。而在创建EventLoopWorker对象的时候,会通过pool.get函数获得ThreadWorker对象并赋值给EventLoopWorker实例的threadWorker变量,ThreadWorkerNewThreadWorker的子类。

NewThreadWorker.scheduleActual

看看NewThreadWorker类的scheduleActual函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
if (!parent.add(sr)) {//将当前任务添加集合OpenHashSet,添加成功则返回true
return sr;
}
}

Future<?> f;
try {
if (delayTime <= 0) {//根据是否延迟提交到线程池中
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}

return sr;
}

这里的executor创建。

1
2
3
4
5
6
7
8
//NewThreadWorker()
executor = SchedulerPoolFactory.create(threadFactory);
//SchedulerPoolFactory
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}

create函数实参:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//CachedWorkerPool#get
ThreadWorker w = new ThreadWorker(threadFactory)
//IoScheduler#start
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory)
//IoScheduler
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<>(NONE);
start();
}

public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}

WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority)

public RxThreadFactory(String prefix, int priority) {
this(prefix, priority, false);
}

public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
this.prefix = prefix;
this.priority = priority;
this.nonBlocking = nonBlocking;
}

RxThreadFactory继承自ThreadFactory,其nonBlocking=false,看看newThread函数。

1
2
3
4
5
6
7
8
public Thread newThread(@NonNull Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}

因此IoScheduler调度的线程池核心线程只有1条,且创建的线程都是普通的守护线程。

Runable.run的调用

这时候要回到[CachedWorkerPool.scheduleActual](# CachedWorkerPool.scheduleActual)函数的ScheduledRunnable对象的run函数。

1
2
3
4
5
6
7
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}

这里delegateScheduler.scheduleDirect函数中的DisposeTask对象。看看run函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void run() {
runner = Thread.currentThread();
try {
try {
decoratedRun.run();
} catch (Throwable ex) {
RxJavaPlugins.onError(ex);
throw ex;
}
} finally {
dispose();
runner = null;
}
}

这里decoratedRunObservableSubscribeOn.subscribeActual函数中的SubscribeTask对象。看看run函数。

1
2
3
public void run() {
source.subscribe(parent);
}

ObservableSubscribeOn对象source就是我们最开始的Observable对象,这里就触发到上流数据的开始发送,且是在后台线程执行的。而parentSubscribeOnObserver对象,直接调用了下流观察者的onNext函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void onNext(T t) {
downstream.onNext(t);
}

@Override
public void onError(Throwable t) {
downstream.onError(t);
}

@Override
public void onComplete() {
downstream.onComplete();
}

这里的downStreamObserveOnObserver对象,看一下其onNext函数。

1
2
3
4
5
6
7
8
9
10
public void onNext(T t) {
if (done) {
return;
}

if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}

回到主线程

将观察事件添加到队列后,调用了schedule函数。

1
2
3
4
5
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}

前面知道这里的wokerHandlerWorker对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");

if (disposed) {
return Disposable.disposed();
}

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.

if (async) {
message.setAsynchronous(true);
}

handler.sendMessageDelayed(message, unit.toMillis(delay));

// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposable.disposed();
}

return scheduled;
}

schedule函数将handler对象和Runnable对象封装成ScheduledRunnable对象,然后通过handler发送出去。这里的Handler在创建的时候与主线程的Looper绑定,所以ScheduledRunnable对象将在主线程执行。看看ScheduledRunnablerun函数。

1
2
3
4
5
6
7
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}

这里delegate就是ObserveOnObserver对象,查看其run函数。

1
2
3
4
5
6
7
public void run() {
if (outputFused) {//outputFused在异步情况下才会等于true
drainFused();
} else {
drainNormal();
}
}

看看drainNormal函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void drainNormal() {
int missed = 1;

final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;

for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}

for (;;) {
boolean d = done;
T v;

try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;

if (checkTerminated(d, empty, a)) {
return;
}

if (empty) {
break;
}

a.onNext(v);
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

drainNormal函数检查一些合法性之后,最终还是调用downstream.onNext函数,而这里的downstream就是我们最终的观察者对象。也就实现了线程切换。

rxjava线程切换

多个subscribeOnobserveOn

通过上图,可以理解如果同时有多个subscribeOnobserveOn情况会怎么样?

多个subscribeOn,只有离发射源最近的subscribeOn有效,虽然中间的subscribeOn也会切换线程,但其仅仅是发生订阅关系,都会被封装成Obserser然后去订阅上个observable。直到最后一个订阅关系建立,并触发源开始运作并发送数据,但此时,所有的onNext函数都是最后一个发生订阅关系(即调用链中的第一个subscribeOn)的subscribeOn所创建的调度器的线程中执行。

类似的,写一个Java项目,最终打印是在B线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
},"B").start();
}
},"A").start();
}

多个observeOn,每一个都会影响紧接着的观察者所在的线程,因为onNext函数的调用会被切换到对应的线程中执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
observable
.subscribeOn(Schedulers.newThread())
.map(o -> {
Log.d(TAG, "2threadName:" + Thread.currentThread().getName());
return "Sorry,I'm Android";
}
)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map(o->{
Log.d(TAG, "3threadName:" + Thread.currentThread().getName());
return "I want to RxJava";
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);

打印日志:

1
2
3
4
5
6
7
onSubscribe:false
threadName:RxNewThreadScheduler-2 //源所在线程
2threadName:RxNewThreadScheduler-2 //map所在线程
3threadName:RxNewThreadScheduler-3 //observeOn map 所在线程
4threadName:main//最后observer所在线程
onNext:I want to RxJava
onComplet

总结

RxJava基本每个操作符会产生中间的观察者和被观察者对象,所产生的被观察者对象被被下流的观察者所订阅,而产生的观察者会去订阅上流的被观察者。这样从最下流我们消费事件的观察者一直订阅到最开始的源被观察者。然后触发源发送数据给观察者,最后传递到最后观察者,而中间操作符从中进行拦截和转化。

RxJava操作符