摘要
作为Android主流框架之一,简洁的线程切换和丰富的功能操作符,再配合OkHttp+Retrofit,组合成了APP必备框架,深得Android开发者的喜爱。本文基于最新的RxJava3,分析其工作原理,以及线程切换原理。
简单入门
添加依赖:
1 2 implementation "io.reactivex.rxjava3:rxjava:3.0.11" implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
通过Observable.create()
来创建一个Observable
对象,Observable
表示可观察的,被观察者。create函数接收一个ObservableOnSubscribe
类型的参数,ObservableOnSubscribe
是一个函数式接口,拥有一个携带ObservableEmitter
类型参数的subscribe
函数。ObservableEmitter
对象可以用来在发生订阅后发出事件,且是可以取消的。例如:
1 2 3 4 5 6 7 Observable observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe (@NonNull ObservableEmitter<String> emitter) throws Throwable { emitter.onNext("Hello,RxJava" ); emitter.onComplete(); } });
有了订阅事件和被观察者,接下来创建观察者observer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Observer observer=new Observer<String>() { @Override public void onSubscribe (@NonNull Disposable d) { Log.d(TAG,"onSubscribe:" +d.isDisposed()); } @Override public void onNext (@NonNull String s) { Log.d(TAG,"onNext:" +s); } @Override public void onError (@NonNull Throwable e) { Log.e(TAG,e.getMessage()); } @Override public void onComplete () { Log.d(TAG,"onComplete" ); } };
将观察者订阅到被观察者。
1 observable.subscribe(observer);
打印日志:
1 2 3 onSubscribe:false onNext:Hello,RxJava onComplete
RxJava在Android的流行是因为其线程切换简单和方便。
1 2 3 4 observable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer);
源码分析
Observable
的创建
1 2 3 4 public static <T> Observable<T> create (@NonNull ObservableOnSubscribe<T> source) { Objects.requireNonNull(source, "source is null" ); return RxJavaPlugins.onAssembly(new ObservableCreate<>(source)); }
RxJavaPlugins.onAssembly
是一个钩子技术,表示是否对参数内容进行过滤和转换。如果没有自定义,一般Function
对象f
都是null
,所以直接返回参数值。
1 2 3 4 5 6 7 public static <T> Observable<T> onAssembly (@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null ) { return apply(f, source); } return source; }
因此,create
函数这里返回是ObservableCreate
对象。
Observable
的订阅
创建好被观察者和观察者之后,通过subscribe
函数建立起订阅关系。
1 2 3 4 5 6 public final void subscribe (@NonNull Observer<? super T> observer) { ... subscribeActual(observer); ... }
调用了ObserVable
类的subscribeActual
函数。subscribeActual
是一个抽象函数,看一下前面实现的ObservableCreate
对象的subscribeActual
函数。
1 2 3 4 5 6 7 8 9 10 11 protected void subscribeActual (Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
这里直接创建了CreateEmitter
对象,并调用了Observer
对象的onSubscribe
函数。CreateEmitter
类实现了Disposable
接口,这样就可以通过CreateEmitter
实例去取消订阅。
然后调用了source.subscribe
函数,这里的source
变量指向的就是前面创建的ObservableOnSubscribe
对象。这样在建立订阅关系后就立即触发 了上流数据的产生,然后在观察者中消费。
线程切换
作为RxJava在Android使用的核心功能之一就是线程切换。通过RxAndroid
,引入Android
主线程。
subscribeOn
1 2 3 4 observable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer);
Schedulers.io()
返回了一个Scheduler对象,暂且不深究,看看subscribeOn
函数。
1 2 3 4 public final Observable<T> subscribeOn (@NonNull Scheduler scheduler) { Objects.requireNonNull(scheduler, "scheduler is null" ); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this , scheduler)); }
创建了ObservableSubscribeOn
对象,也是Observable
子类,返回了自身。
observeOn
再看看observeOn
函数,调用了重载函数。
1 2 3 4 5 6 7 8 9 public final Observable<T> observeOn (@NonNull Scheduler scheduler) { return observeOn(scheduler, false , bufferSize()); } public final Observable<T> observeOn (@NonNull Scheduler scheduler, boolean delayError, int bufferSize) { Objects.requireNonNull(scheduler, "scheduler is null" ); ObjectHelper.verifyPositive(bufferSize, "bufferSize" ); return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this , scheduler, delayError, bufferSize)); }
创建了ObservableObserveOn
对象,也是Observable
子类,返回了自身。此时观察者订阅的被观察者是ObservableObserveOn
对象,而不是最开始的Observable
对象。
subscribe
此时ObservableObserveOn
对象持有source
是ObservableSubscribeOn
对象,scheduler
是AndroidSchedulers.mainThread()
对象。观察者Observer
对象observer
开始订阅,会调用ObservableObserveOn
对象的subscribeActual
函数。
1 2 3 4 5 6 7 8 9 10 protected void subscribeActual (Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize)); } }
AndroidSchedulers.mainThread()
对象并不是TrampolineScheduler
的实例,所以走else
分支。AndroidSchedulers.mainThread()
返回的是一个HandlerScheduler
对象,其createWorker
函数返回HandlerWorker
对象,然后将相关信息封装成ObserveOnObserver
对象。然后调用了ObservableSubscribeOn
对象的subscribe
函数。
ObservableSubscribeOn.subscribeActual
ObservableSubscribeOn
对象的subscribe
函数,也终触发subscribeActual
函数。此时的函数实参就是ObserveOnObserver
对象。
1 2 3 4 5 6 7 public void subscribeActual (final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
subscribeActual
函数,先将ObserveOnObserver
对象包装成SubscribeOnObserver
对象parent
,并调用的SubscribeOnObserver
对象的onSubscribe
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void onSubscribe (Disposable d) { if (DisposableHelper.validate(this .upstream, d)) { this .upstream = d; if (d instanceof QueueDisposable) { @SuppressWarnings("unchecked") QueueDisposable<T> qd = (QueueDisposable<T>) d; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { sourceMode = m; queue = qd; done = true ; downstream.onSubscribe(this ); schedule(); return ; } if (m == QueueDisposable.ASYNC) { sourceMode = m; queue = qd; downstream.onSubscribe(this ); return ; } } queue = new SpscLinkedArrayQueue<>(bufferSize); downstream.onSubscribe(this ); } }
回到subscribeActual
函数。
1 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)))
创建了SubscribeTask
任务,并直接提交给scheduler
进行调度。此时的scheduler
对象是Schudlers.IO()
创建的IoScheduler
对象。
Scheduler.scheduleDirect
scheduleDirect
函数定位到Scheduler
类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public Disposable scheduleDirect (@NonNull Runnable run) { return scheduleDirect(run, 0L , TimeUnit.NANOSECONDS); } public Disposable scheduleDirect (@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
createWorker
函数返回了EventLoopWorker
对象。因为SubscribeTask
本身是Runable
的子类,所以这里的Runnable
对象decoratedRun
还是SubscribeTask
对象。将EventLoopWorker
对象和SubscribeTask
对象封装到DisposeTask
对象中,然后提交给个EventLoopWorker
对象的schedule
函数。
1 2 3 4 5 6 7 public Disposable schedule (@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); }
这里的poolWorker
是通过CachedWorkerPool
的get
函数获得的ThreadWorker
对象。IoScheduler
创建时候会创建CachedWorkerPool
对象pool
。而在创建EventLoopWorker
对象的时候,会通过pool.get
函数获得ThreadWorker
对象并赋值给EventLoopWorker
实例的threadWorker
变量,ThreadWorker
是NewThreadWorker
的子类。
NewThreadWorker.scheduleActual
看看NewThreadWorker
类的scheduleActual
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public ScheduledRunnable scheduleActual (final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null ) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0 ) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null ) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }
这里的executor
创建。
1 2 3 4 5 6 7 8 executor = SchedulerPoolFactory.create(threadFactory); public static ScheduledExecutorService create (ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1 , factory); tryPutIntoPool(PURGE_ENABLED, exec); return exec; }
create
函数实参:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 ThreadWorker w = new ThreadWorker(threadFactory) CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory) public IoScheduler (ThreadFactory threadFactory) { this .threadFactory = threadFactory; this .pool = new AtomicReference<>(NONE); start(); } public IoScheduler () { this (WORKER_THREAD_FACTORY); } WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority) public RxThreadFactory (String prefix, int priority) { this (prefix, priority, false ); } public RxThreadFactory (String prefix, int priority, boolean nonBlocking) { this .prefix = prefix; this .priority = priority; this .nonBlocking = nonBlocking; }
RxThreadFactory
继承自ThreadFactory
,其nonBlocking=false
,看看newThread
函数。
1 2 3 4 5 6 7 8 public Thread newThread (@NonNull Runnable r) { StringBuilder nameBuilder = new StringBuilder(prefix).append('-' ).append(incrementAndGet()); String name = nameBuilder.toString(); Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name); t.setPriority(priority); t.setDaemon(true ); return t; }
因此IoScheduler
调度的线程池核心线程只有1条,且创建的线程都是普通的守护线程。
Runable.run
的调用
这时候要回到[CachedWorkerPool.scheduleActual
](# CachedWorkerPool.scheduleActual
)函数的ScheduledRunnable
对象的run
函数。
1 2 3 4 5 6 7 public void run () { try { delegate.run(); } catch (Throwable t) { RxJavaPlugins.onError(t); } }
这里delegate
是Scheduler.scheduleDirect
函数中的DisposeTask
对象。看看run
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void run () { runner = Thread.currentThread(); try { try { decoratedRun.run(); } catch (Throwable ex) { RxJavaPlugins.onError(ex); throw ex; } } finally { dispose(); runner = null ; } }
这里decoratedRun
是ObservableSubscribeOn.subscribeActual
函数中的SubscribeTask
对象。看看run
函数。
1 2 3 public void run () { source.subscribe(parent); }
ObservableSubscribeOn
对象source
就是我们最开始的Observable
对象,这里就触发到上流数据的开始发送,且是在后台线程执行的。而parent
是SubscribeOnObserver
对象,直接调用了下流观察者的onNext
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 public void onNext (T t) { downstream.onNext(t); } @Override public void onError (Throwable t) { downstream.onError(t); } @Override public void onComplete () { downstream.onComplete(); }
这里的downStream
是ObserveOnObserver
对象,看一下其onNext
函数。
1 2 3 4 5 6 7 8 9 10 public void onNext (T t) { if (done) { return ; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }
回到主线程
将观察事件添加到队列后,调用了schedule
函数。
1 2 3 4 5 void schedule () { if (getAndIncrement() == 0 ) { worker.schedule(this ); } }
前面知道这里的woker
是HandlerWorker
对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public Disposable schedule (Runnable run, long delay, TimeUnit unit) { if (run == null ) throw new NullPointerException("run == null" ); if (unit == null ) throw new NullPointerException("unit == null" ); if (disposed) { return Disposable.disposed(); } run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this ; if (async) { message.setAsynchronous(true ); } handler.sendMessageDelayed(message, unit.toMillis(delay)); if (disposed) { handler.removeCallbacks(scheduled); return Disposable.disposed(); } return scheduled; }
schedule
函数将handler
对象和Runnable
对象封装成ScheduledRunnable
对象,然后通过handler
发送出去。这里的Handler在创建的时候与主线程的Looper
绑定,所以ScheduledRunnable
对象将在主线程执行。看看ScheduledRunnable
的run
函数。
1 2 3 4 5 6 7 public void run () { try { delegate.run(); } catch (Throwable t) { RxJavaPlugins.onError(t); } }
这里delegate
就是ObserveOnObserver
对象,查看其run
函数。
1 2 3 4 5 6 7 public void run () { if (outputFused) { drainFused(); } else { drainNormal(); } }
看看drainNormal
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 void drainNormal () { int missed = 1 ; final SimpleQueue<T> q = queue; final Observer<? super T> a = downstream; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return ; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); disposed = true ; upstream.dispose(); q.clear(); a.onError(ex); worker.dispose(); return ; } boolean empty = v == null ; if (checkTerminated(d, empty, a)) { return ; } if (empty) { break ; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0 ) { break ; } } }
drainNormal
函数检查一些合法性之后,最终还是调用downstream.onNex
t函数,而这里的downstream
就是我们最终的观察者对象。也就实现了线程切换。
多个subscribeOn
与observeOn
通过上图,可以理解如果同时有多个subscribeOn
和observeOn
情况会怎么样?
多个subscribeOn
,只有离发射源最近的subscribeOn
有效,虽然中间的subscribeOn
也会切换线程,但其仅仅是发生订阅关系,都会被封装成Obserser
然后去订阅上个observable
。直到最后一个订阅关系建立,并触发源开始运作并发送数据,但此时,所有的onNext
函数都是最后一个发生订阅关系(即调用链中的第一个subscribeOn
)的subscribeOn
所创建的调度器的线程中执行。
类似的,写一个Java项目,最终打印是在B线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) { new Thread(new Runnable() { @Override public void run () { new Thread(new Runnable() { @Override public void run () { System.out.println(Thread.currentThread().getName()); } },"B" ).start(); } },"A" ).start(); }
多个observeOn
,每一个都会影响紧接着的观察者所在的线程,因为onNext
函数的调用会被切换到对应的线程中执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 observable .subscribeOn(Schedulers.newThread()) .map(o -> { Log.d(TAG, "2threadName:" + Thread.currentThread().getName()); return "Sorry,I'm Android" ; } ) .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .map(o->{ Log.d(TAG, "3threadName:" + Thread.currentThread().getName()); return "I want to RxJava" ; }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(observer);
打印日志:
1 2 3 4 5 6 7 onSubscribe:false threadName:RxNewThreadScheduler-2 2threadName:RxNewThreadScheduler-2 3threadName:RxNewThreadScheduler-3 4threadName:main onNext:I want to RxJava onComplet
总结
RxJava基本每个操作符会产生中间的观察者和被观察者对象,所产生的被观察者对象被被下流的观察者所订阅,而产生的观察者会去订阅上流的被观察者。这样从最下流我们消费事件的观察者一直订阅到最开始的源被观察者。然后触发源发送数据给观察者,最后传递到最后观察者,而中间操作符从中进行拦截和转化。
RxJava操作符