摘要
本文分析Android主流的事件框架EventBus
,尤其在不同的界面触发函数的调用,避免层层嵌套回调。通过源码分析,了解EventBus
如何查找订阅方法,切换线程,发布事件的原理。
前言
我们将根据下面的例子来分析EventBus
的源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class MainActivity : AppCompatActivity() { private val TAG = "MainActivity" override fun onCreate (savedInstanceState: Bundle?) { super .onCreate(savedInstanceState) EventBus.getDefault().register(this ); EventBus.getDefault().post(1 ) } @Subscribe(threadMode = ThreadMode.MAIN) fun get (event:Int) { } override fun onDestroy () { super .onDestroy() EventBus.getDefault().unregister(this ) } }
EventBus.getDefault
1 2 3 4 5 6 7 8 9 10 11 12 public static EventBus getDefault () { EventBus instance = defaultInstance; if (instance == null ) { synchronized (EventBus.class) { instance = EventBus.defaultInstance; if (instance == null ) { instance = EventBus.defaultInstance = new EventBus(); } } } return instance; }
通过双重检查单例模式创建EventBus
单例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public EventBus () { this (DEFAULT_BUILDER); } EventBus(EventBusBuilder builder) { logger = builder.getLogger(); subscriptionsByEventType = new HashMap<>(); typesBySubscriber = new HashMap<>(); stickyEvents = new ConcurrentHashMap<>(); mainThreadSupport = builder.getMainThreadSupport(); mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this ) : null ; backgroundPoster = new BackgroundPoster(this ); asyncPoster = new AsyncPoster(this ); indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0 ; subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex); logSubscriberExceptions = builder.logSubscriberExceptions; logNoSubscriberMessages = builder.logNoSubscriberMessages; sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent; sendNoSubscriberEvent = builder.sendNoSubscriberEvent; throwSubscriberException = builder.throwSubscriberException; eventInheritance = builder.eventInheritance; executorService = builder.executorService; }
DEFAULT_BUILDER
是EventBusBuilder
的实例,所有配置都采用默认值。这里涉及到几个主要对象的创建,例如HandlerPoster
对象mainThreadPoster
,其实是用来切换到主线程的。
register
接着我们看EventBus
如何将一个类与订阅事件绑定。查看EventBus
的register
函数。
1 2 3 4 5 6 7 8 9 public void register (Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this ) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } }
register
函数通过SubscriberMethodFinder
对象findSubscriberMethods
函数,获取当前订阅类所有的订阅方法subscriberMethods
。然后遍历所有订阅方法SubscriberMethod
,然后调用subscribe
函数,将当前订阅类和订阅方法进行绑定。
findSubscriberMethods
我们查看SubscriberMethodFinder
对象如何通过findSubscriberMethod
函数如何寻找当前订阅类中合法的订阅方法,并封装成SubscriberMethod
实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 List<SubscriberMethod> findSubscriberMethods (Class<?> subscriberClass) { List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null ) { return subscriberMethods; } if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { subscriberMethods = findUsingInfo(subscriberClass); } if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation" ); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } }
这里的METHOD_CACHE
是ConcurrentHashMap
类型,键值对分别是Class<?>, List<SubscriberMethod>
,也就是findSubscriberMethods
函数会查找订阅方法,然后缓存到METHOD_CACHE
中,下次优先通过通过缓存查找。初次注册,那肯定是没有缓存。ignoreGeneratedIndex
默认情况下是flase
,所以会调用findUsingInfo(subscriberClass)
查找类中的注册方法。
findUsingInfo
findUsingInfo
函数会将相关的信息封装到FindState
对象,最后获取到订阅方法SubscriberMethod
列表后释放FindState
对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private List<SubscriberMethod> findUsingInfo (Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null ) { findState.subscriberInfo = getSubscriberInfo(findState); if (findState.subscriberInfo != null ) { SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods(); for (SubscriberMethod subscriberMethod : array) { if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { findState.subscriberMethods.add(subscriberMethod); } } } else { findUsingReflectionInSingleClass(findState); } findState.moveToSuperclass(); } return getMethodsAndRelease(findState); }
通过prepareFindState
函数返回了FindState
对象,该函数主要遍历大小为4的FIND_STATE_POOL
数组,取第一个不为null
的FindState
对象,并将数组该位置引用置为null
,如果数组不存在元素,则直接新建一个FindState
对象。
1 2 3 4 5 6 7 8 9 10 11 12 private FindState prepareFindState () { synchronized (FIND_STATE_POOL) { for (int i = 0 ; i < POOL_SIZE; i++) { FindState state = FIND_STATE_POOL[i]; if (state != null ) { FIND_STATE_POOL[i] = null ; return state; } } } return new FindState(); }
然后调用FindState
对象的initForSubscriber
函数。将当前订阅类类型赋值FindState
对象的subscriberClass
和clazz
变量。
1 2 3 4 5 void initForSubscriber (Class<?> subscriberClass) { this .subscriberClass = clazz = subscriberClass; skipSuperClasses = false ; subscriberInfo = null ; }
回到findUsingInfo
函数,由于clazz
被赋值为subscriberClass
,所以while
循环为ture
。接着调用了getSubscriberInfo(findState)
函数。
getSubscriberInfo
在getSubscriberInfo(findState)
函数中,由于第一次注册该类,所以FindState
对象的subscriberInfo=null
。而subscriberInfoIndexes
在SubscriberMethodFinder
实例创建的时候传递进来。subscriberInfoIndexes
是通过EventBusBuilder
传递进来的,默认为null
。所以getSubscriberInfo
函数返回了null
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private SubscriberInfo getSubscriberInfo (FindState findState) { if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null ) { SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo(); if (findState.clazz == superclassInfo.getSubscriberClass()) { return superclassInfo; } } if (subscriberInfoIndexes != null ) { for (SubscriberInfoIndex index : subscriberInfoIndexes) { SubscriberInfo info = index.getSubscriberInfo(findState.clazz); if (info != null ) { return info; } } } return null ; }
findUsingReflectionInSingleClass
回到[findUsingInfo
](# findUsingInfo
)函数中,由于findState.subscriberInfo=null,
将执行findUsingReflectionInSingleClass(findState)
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 private void findUsingReflectionInSingleClass (FindState findState) { Method[] methods; try { methods = findState.clazz.getDeclaredMethods(); } catch (Throwable th) { try { methods = findState.clazz.getMethods(); } catch (LinkageError error) { ... } findState.skipSuperClasses = true ; } for (Method method : methods) { int modifiers = method.getModifiers(); if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0 ) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1 ) { Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null ) { Class<?> eventType = parameterTypes[0 ]; if (findState.checkAdd(method, eventType)) { ThreadMode threadMode = subscribeAnnotation.threadMode(); findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract" ); } } }
getDeclaredMethods
函数是获取订阅类自身声明的所有方法,包括public
、protected
、private
方法,而getMethods
函数是类所有公共方法,自身、父类和接口的公共方法。如果前者抛出异常,将采用后者,并设置findState.skipSuperClasses = true
。
接着遍历所有获取的方法,访问方法的修饰符。判断条件(modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0
表示当前方法是public
修饰符且不是编译器生成的。然后通过method.getParameterTypes()
函数,获取方法的参数类型列表,这里只是用来判断参数的数量,当参数数量为1才继续下一步。
然后通过method.getAnnotation(Subscribe.class)
函数判断方法被Subscribe
注解着。然后调用了FindState
对象的checkAdd
函数,将方法method
和事件类型eventType
(参数类型)作为参数传递 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 boolean checkAdd (Method method, Class<?> eventType) { Object existing = anyMethodByEventType.put(eventType, method); if (existing == null ) { return true ; } else { if (existing instanceof Method) { if (!checkAddWithMethodSignature((Method) existing, eventType)) { throw new IllegalStateException(); } anyMethodByEventType.put(eventType, this ); } return checkAddWithMethodSignature(method, eventType); } }
anyMethodByEventType
是HashMap
类型,键值对是<Class, Object>
。将注解类型和方法put
到anyMethodByEventType
,如果返回null
,说明该类只有一个参数的类型为该eventType
的方法,没有冲突,直接返回ture
。如果返回了existing
对象不为null
,说明类中已经找到了相同参数类型的方法,进一步检查方法签名。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private boolean checkAddWithMethodSignature (Method method, Class<?> eventType) { methodKeyBuilder.setLength(0 ); methodKeyBuilder.append(method.getName()); methodKeyBuilder.append('>' ).append(eventType.getName()); String methodKey = methodKeyBuilder.toString(); Class<?> methodClass = method.getDeclaringClass(); Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass); if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) { return true ; } else { subscriberClassByMethodKey.put(methodKey, methodClassOld); return false ; } }
checkAddWithMethodSignature
函数通过put
元素到subscriberClassByMethodKey
来判断前面是否有已存在的方法,subscriberClassByMethodKey
也是HashMap
类型,键值对是Map<String, Class>
。其put
函数返回null
表明前面没有签名(methodName>paramTypeName
)相同的方法,或者存在且其值是当前订阅类的父类,父接口或者相同类。
因此checkAdd
函数也称为第二层检查EventBus
中类的合法注册。checkAdd
函数返回true
之后,将相关信息封装到SubscriberMethod
对象中,并添加到findState.subscriberMethods
列表中。
1 2 3 ThreadMode threadMode = subscribeAnnotation.threadMode(); findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
**到这里确定了如何注解一个合法的方法: **
注解方法必须是public
修饰且非编译器生成,只有一个参数,并且Subscribe
注解该方法
不存在相同签名(methodName>paramTypeName
)
若已存在的注册类则得是方法所在的类或者父类
moveToSuperclass
回到findUsingInfo
函数。调用了FindState
对象的moveToSuperclass
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 void moveToSuperclass () { if (skipSuperClasses) { clazz = null ; } else { clazz = clazz.getSuperclass(); String clazzName = clazz.getName(); if (clazzName.startsWith("java." ) || clazzName.startsWith("javax." ) || clazzName.startsWith("android." ) || clazzName.startsWith("androidx." )) { clazz = null ; } } }
注意到这里还在while
循环中,正常情况skipSuperClasses=true
,clazz
将赋值为当前订阅类的父类,然后重复寻找父类的订阅方法。
findUsingInfo
函数最后调用了getMethodsAndRelease
函数返回SubscriberMethod
列表,将FindState
对象进行回收。
1 2 3 4 5 6 7 8 9 10 11 12 13 private List<SubscriberMethod> getMethodsAndRelease (FindState findState) { List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods); findState.recycle(); synchronized (FIND_STATE_POOL) { for (int i = 0 ; i < POOL_SIZE; i++) { if (FIND_STATE_POOL[i] == null ) { FIND_STATE_POOL[i] = findState; break ; } } } return subscriberMethods; }
subscribe
回到register
函数。寻找到SubscriberMethod
列表后,会遍历每个SubscribeMethod
,并调用subscribe
函数,将订阅者与订阅方法绑定。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 private void subscribe (Object subscriber, SubscriberMethod subscriberMethod) { Class<?> eventType = subscriberMethod.eventType; Subscription newSubscription = new Subscription(subscriber, subscriberMethod); CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null ) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } int size = subscriptions.size(); for (int i = 0 ; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break ; } } List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null ) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); if (subscriberMethod.sticky) { if (eventInheritance) { Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }
subscribe
函数相对来说比较简单,已经在上面代码中注释一些信息。看下最后的checkPostStickyEventToSubscription
函数,如何发送黏性事件。
1 2 3 4 5 private void checkPostStickyEventToSubscription (Subscription newSubscription, Object stickyEvent) { if (stickyEvent != null ) { postToSubscription(newSubscription, stickyEvent, isMainThread()); } }
可见调用了postToSubscription
函数,发送了一次事件,可见,如果订阅的事黏性事件,会出发最后一次黏性消息事件。这里不继续分析,因为和后面的重叠。
unregister
在register
函数注册中,会把当前类添加到typesBySubscriber
中,而unregister
函数则是从typesBySubscriber
移除该对象,并调用unsubscribeByEventType
函数。
1 2 3 4 5 6 7 8 9 10 11 public synchronized void unregister (Object subscriber) { List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber); if (subscribedTypes != null ) { for (Class<?> eventType : subscribedTypes) { unsubscribeByEventType(subscriber, eventType); } typesBySubscriber.remove(subscriber); } else { logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } }
看看unsubscribeByEventType
函数,通过subscriptionsByEventType
获取注册事件eventType
的注册信息列表,遍历列表,判断每个订阅信息是否注册类是否与当前解绑注册相同,相同的话则从订阅信息列表移除该订阅信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void unsubscribeByEventType (Object subscriber, Class<?> eventType) { List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null ) { int size = subscriptions.size(); for (int i = 0 ; i < size; i++) { Subscription subscription = subscriptions.get(i); if (subscription.subscriber == subscriber) { subscription.active = false ; subscriptions.remove(i); i--; size--; } } } }
也就是说,解绑的过程是将订阅者和订阅信息分别从typesBySubscriber
和subscriptions
集合中移除。
Post
分析了注册与解注册,接下来分析发送事件的post
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void post (Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); if (!postingState.isPosting) { postingState.isMainThread = isMainThread(); postingState.isPosting = true ; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset" ); } try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0 ), postingState); } } finally { postingState.isPosting = false ; postingState.isMainThread = false ; } } }
currentPostingThreadState
是ThreadLocal
类型,缓存PostingThreadState
。post
函数的 主要功能是将事件添加到eventQueue
队列中,如果当前postingState.isPosting!=ture
,则调用postSingleEvent
发布该事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void postSingleEvent (Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false ; if (eventInheritance) { List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0 ; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { logger.log(Level.FINE, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this , event)); } } }
调用postSingleEventForEventType
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private boolean postSingleEventForEventType (Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this ) { subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null ; postingState.subscription = null ; postingState.canceled = false ; } if (aborted) { break ; } } return true ; } return false ; }
postSingleEventForEventType
函数遍历eventClass
对应的Subscription
列表,然后调用了postToSubscription
函数发送事件。前面分析到,当类订阅黏性事件也会调用该方法。
postToSubscription
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private void postToSubscription (Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break ; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break ; case MAIN_ORDERED: if (mainThreadPoster != null ) { mainThreadPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break ; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break ; case ASYNC: asyncPoster.enqueue(subscription, event); break ; default : throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
这里就涉及到EventBus
中线程的一些知识点,在注册类中,我们通常通过这样方式去订阅一个事件。
1 2 @Subscribe(threadMode = ThreadMode.MAIN) fun get (event:Int) {}
其中threadMode
表示当事件来临时,执行get
函数的方法体的线程。默认情况下是ThreadMode.POSTING
,表示与事件发布的线程一致,即发布线程,所以可能在主线程,也有可能在后台线程。而ThreadMode.MAIN
表示在Android
的主线程,如果发布线程也在主线程,那么方法体会被直接执行,所以可能会被主线程阻塞,可以通过MAIN_ORDERED
按序在主线程执行,这样就不会阻塞主线程。ThreadMode.MAIN_ORDERED
后台线程,即Android主线程外的其他线程。ThreadMode.ASYNC
即异步线程,即发布线程和订阅线程是独立线程,不会直接运行和等待订阅方法。
回到postToSubscription
函数。由于我们发布事件和订阅事件都在主线程,所以直接执行了invokeSubscriber
函数。也就是直接通过反射调用了我们的订阅方法。
1 2 3 4 5 6 7 8 9 void invokeSubscriber (Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception" , e); } }
假设我们在子线程发布了事件,此时调用的是mainThreadPoster.enqueue(subscription, event)
。mainThreadPoster
对象的类型MainThreadSupport
,在EventBus
对象创建的时候被创建。
MainThreadPoster
1 2 3 4 5 6 7 8 9 10 11 12 13 14 mainThreadSupport = builder.getMainThreadSupport(); mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this ) : null ; MainThreadSupport getMainThreadSupport () { if (mainThreadSupport != null ) { return mainThreadSupport; } else if (AndroidLogger.isAndroidLogAvailable()) { Object looperOrNull = getAndroidMainLooperOrNull(); return looperOrNull == null ? null : new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull); } else { return null ; } }
所以MainThreadSupport
其实很简单。直接创建了MainThreadSupport
的实现类AndroidHandlerMainThreadSupport
,内部持有主线程的Looper
对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public interface MainThreadSupport { boolean isMainThread () ; Poster createPoster (EventBus eventBus) ; class AndroidHandlerMainThreadSupport implements MainThreadSupport { private final Looper looper; public AndroidHandlerMainThreadSupport (Looper looper) { this .looper = looper; } @Override public boolean isMainThread () { return looper == Looper.myLooper(); } @Override public Poster createPoster (EventBus eventBus) { return new HandlerPoster(eventBus, looper, 10 ); } } }
而mainPoster
则是HandlerPoster
,HandlerPoster
是Handler
的子类,并且实现Poster
接口。查看其enqueue
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 public void enqueue (Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this ) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true ; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message" ); } } } }
将信息包装成PendingPost
对象,然后加入到PendingPostQueue
队列queue
中,如handler
未激活,则调用handle
的sendMessage
函数。
handleMessage
我们看看其handleMessage
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void handleMessage (Message msg) { boolean rescheduled = false ; try { long started = SystemClock.uptimeMillis(); while (true ) { PendingPost pendingPost = queue.poll(); if (pendingPost == null ) { synchronized (this ) { pendingPost = queue.poll(); if (pendingPost == null ) { handlerActive = false ; return ; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message" ); } rescheduled = true ; return ; } } } finally { handlerActive = rescheduled; } }
调用了invokeSubscriber
函数。可见最终调用了invokeSubscriber
的重载函数,并释放PendingPost
对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void invokeSubscriber (PendingPost pendingPost) { Object event = pendingPost.event; Subscription subscription = pendingPost.subscription; PendingPost.releasePendingPost(pendingPost); if (subscription.active) { invokeSubscriber(subscription, event); } } void invokeSubscriber (Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception" , e); } }
这样一分析,EventBus
的订阅与解订阅,简单事件发布流程就分析完毕了。
黏性事件
所谓的黏性事件,就是在订阅方法的Subscribe
注解,将其sticky
属性设置为ture
。
1 2 3 4 @Subscribe(threadMode = ThreadMode.MAIN,sticky = true) fun get (event:Int) {}
这样在类注册的时候,[subscribe
](# subscribe
)函数会发布上次黏性事件。如果没有黏性事件,则不会发布该事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private void subscribe (Object subscriber, SubscriberMethod subscriberMethod) { ... if (subscriberMethod.sticky) { if (eventInheritance) { Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }
我们通过postSticky
函数来发布黏性事件。它会将黏性事件保存到stickyEvents
中,这种后面订阅就可以取出上次黏性事件。然后再调用post
进行事件发布。
1 2 3 4 5 6 public void postSticky (Object event) { synchronized (stickyEvents) { stickyEvents.put(event.getClass(), event); } post(event); }
优先权
在订阅事件时候,我们也可以通过priority
属性设置同线程内的同事件不同订阅类接收该订阅事件的优先权。值越大表示优先权越高,会优先接收到订阅事件,这个通过[subscribe
](# subscribe
)函数得知。
1 2 3 4 @Subscribe(threadMode = ThreadMode.MAIN,sticky = true,priority = 1) fun get (event:Int) {}
线程切换
前面分析中,分析了主线程的切换[MainThreadPoster
](# MainThreadPoster
)。回到postToSubscription
函数,了解其他线程的切换。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private void postToSubscription (Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break ; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break ; case MAIN_ORDERED: if (mainThreadPoster != null ) { mainThreadPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break ; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break ; case ASYNC: asyncPoster.enqueue(subscription, event); break ; default : throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
这里主要看下backgroundPoster
和asyncPoster
的工作原理。
BackgroundPoster
BackgroundPoster
实现Runnable
和Poster
接口。其enqueue
函数。
1 2 3 4 5 6 7 8 9 10 public void enqueue (Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this ) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true ; eventBus.getExecutorService().execute(this ); } } }
先把订阅信息包装类PendingPost
添加到队列中,executorRunning=false
,则调用eventBus.getExecutorService().execute(this)
执行自身。这里的eventBus.getExecutorService()
返回的线程池是Executors.newCachedThreadPool()
对象,此时查看run
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void run () { try { try { while (true ) { PendingPost pendingPost = queue.poll(1000 ); if (pendingPost == null ) { synchronized (this ) { pendingPost = queue.poll(); if (pendingPost == null ) { executorRunning = false ; return ; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted" , e); } } finally { executorRunning = false ; } }
可见于MainPoster
的逻辑是一致的,只不过这里在线程池执行订阅方法,从而实现切换到后台线程,且在一条后台线程会处理完所有的事件的订阅方法,。
AsyncPoster
AsyncPoster
也实现Runnable
和Poster
接口。查看其enqueue
函数和run
函数。也就是说AsyncPoster
实现逻辑与BackgroundPoster
一致的。只不过AsyncPoster
对象中一条线程只执行一个事件的订阅方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void enqueue (Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this ); } @Override public void run () { PendingPost pendingPost = queue.poll(); if (pendingPost == null ) { throw new IllegalStateException("No pending post available" ); } eventBus.invokeSubscriber(pendingPost); }
【相关知识点连接 】
ThreadLocal