EventBus 源码分析

摘要

本文分析Android主流的事件框架EventBus,尤其在不同的界面触发函数的调用,避免层层嵌套回调。通过源码分析,了解EventBus如何查找订阅方法,切换线程,发布事件的原理。

前言

我们将根据下面的例子来分析EventBus的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class MainActivity : AppCompatActivity() {
private val TAG = "MainActivity"

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
EventBus.getDefault().register(this);
EventBus.getDefault().post(1)
}

@Subscribe(threadMode = ThreadMode.MAIN)
fun get(event:Int){

}

override fun onDestroy() {
super.onDestroy()
EventBus.getDefault().unregister(this)
}
}

EventBus.getDefault

1
2
3
4
5
6
7
8
9
10
11
12
public static EventBus getDefault() {
EventBus instance = defaultInstance;
if (instance == null) {
synchronized (EventBus.class) {
instance = EventBus.defaultInstance;
if (instance == null) {
instance = EventBus.defaultInstance = new EventBus();
}
}
}
return instance;
}

通过双重检查单例模式创建EventBus单例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public EventBus() {
this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();//通过事件类型获取所有订阅信息
typesBySubscriber = new HashMap<>();//通过订阅类获取该订阅类中所有的事件类型
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();//支持Android主线程
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;//Handler子类
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
//用于寻找订阅类中的订阅方法
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}

DEFAULT_BUILDEREventBusBuilder的实例,所有配置都采用默认值。这里涉及到几个主要对象的创建,例如HandlerPoster对象mainThreadPoster,其实是用来切换到主线程的。

register

接着我们看EventBus如何将一个类与订阅事件绑定。查看EventBusregister函数。

1
2
3
4
5
6
7
8
9
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

register函数通过SubscriberMethodFinder对象findSubscriberMethods函数,获取当前订阅类所有的订阅方法subscriberMethods。然后遍历所有订阅方法SubscriberMethod,然后调用subscribe函数,将当前订阅类和订阅方法进行绑定。

findSubscriberMethods

我们查看SubscriberMethodFinder对象如何通过findSubscriberMethod函数如何寻找当前订阅类中合法的订阅方法,并封装成SubscriberMethod实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//先通过缓存获取该订阅类是否解析过(例如Activity销毁后,重新进入)
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}

if (ignoreGeneratedIndex) {//默认false
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

这里的METHOD_CACHEConcurrentHashMap类型,键值对分别是Class<?>, List<SubscriberMethod>,也就是findSubscriberMethods函数会查找订阅方法,然后缓存到METHOD_CACHE中,下次优先通过通过缓存查找。初次注册,那肯定是没有缓存。ignoreGeneratedIndex 默认情况下是flase,所以会调用findUsingInfo(subscriberClass)查找类中的注册方法。

findUsingInfo

findUsingInfo函数会将相关的信息封装到FindState对象,最后获取到订阅方法SubscriberMethod列表后释放FindState对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

通过prepareFindState函数返回了FindState对象,该函数主要遍历大小为4的FIND_STATE_POOL数组,取第一个不为nullFindState对象,并将数组该位置引用置为null,如果数组不存在元素,则直接新建一个FindState对象。

1
2
3
4
5
6
7
8
9
10
11
12
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}

然后调用FindState对象的initForSubscriber函数。将当前订阅类类型赋值FindState对象的subscriberClassclazz变量。

1
2
3
4
5
void initForSubscriber(Class<?> subscriberClass) {
this.subscriberClass = clazz = subscriberClass;
skipSuperClasses = false;
subscriberInfo = null;
}

回到findUsingInfo函数,由于clazz被赋值为subscriberClass,所以while循环为ture。接着调用了getSubscriberInfo(findState)函数。

getSubscriberInfo

getSubscriberInfo(findState)函数中,由于第一次注册该类,所以FindState对象的subscriberInfo=null。而subscriberInfoIndexesSubscriberMethodFinder实例创建的时候传递进来。subscriberInfoIndexes是通过EventBusBuilder传递进来的,默认为null。所以getSubscriberInfo函数返回了null

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private SubscriberInfo getSubscriberInfo(FindState findState) {
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
return superclassInfo;
}
}
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}

findUsingReflectionInSingleClass

回到[findUsingInfo](# findUsingInfo)函数中,由于findState.subscriberInfo=null,将执行findUsingReflectionInSingleClass(findState)函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
try {
methods = findState.clazz.getMethods();
} catch (LinkageError error) {
...
}
findState.skipSuperClasses = true;//该标志位会在moveToSuperclass函数用到
}
for (Method method : methods) {
//获取方法修饰符
int modifiers = method.getModifiers();
//public修饰且非编译器生成
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//获取所有参数类型
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {//参数长度为1
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {//方法被Subscribe注解
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

getDeclaredMethods函数是获取订阅类自身声明的所有方法,包括publicprotectedprivate方法,而getMethods函数是类所有公共方法,自身、父类和接口的公共方法。如果前者抛出异常,将采用后者,并设置findState.skipSuperClasses = true

接着遍历所有获取的方法,访问方法的修饰符。判断条件(modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0表示当前方法是public修饰符且不是编译器生成的。然后通过method.getParameterTypes()函数,获取方法的参数类型列表,这里只是用来判断参数的数量,当参数数量为1才继续下一步。

然后通过method.getAnnotation(Subscribe.class)函数判断方法被Subscribe注解着。然后调用了FindState对象的checkAdd函数,将方法method和事件类型eventType(参数类型)作为参数传递 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
boolean checkAdd(Method method, Class<?> eventType) {
Object existing = anyMethodByEventType.put(eventType, method);
if (existing == null) {
return true;
} else {
if (existing instanceof Method) {
if (!checkAddWithMethodSignature((Method) existing, eventType)) {
throw new IllegalStateException();
}
anyMethodByEventType.put(eventType, this);
}
return checkAddWithMethodSignature(method, eventType);
}
}

anyMethodByEventTypeHashMap类型,键值对是<Class, Object>。将注解类型和方法putanyMethodByEventType,如果返回null,说明该类只有一个参数的类型为该eventType的方法,没有冲突,直接返回ture。如果返回了existing对象不为null,说明类中已经找到了相同参数类型的方法,进一步检查方法签名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean checkAddWithMethodSignature(Method method, Class<?> eventType) {
methodKeyBuilder.setLength(0);
methodKeyBuilder.append(method.getName());
methodKeyBuilder.append('>').append(eventType.getName());

String methodKey = methodKeyBuilder.toString();//methodName>paramTypeName
Class<?> methodClass = method.getDeclaringClass();//获取方法所在的类
Class<?> methodClassOld = subscriberClassByMethodKey.put(methodKey, methodClass);
if (methodClassOld == null || methodClassOld.isAssignableFrom(methodClass)) {
return true;
} else {
subscriberClassByMethodKey.put(methodKey, methodClassOld);
return false;
}
}

checkAddWithMethodSignature函数通过put元素到subscriberClassByMethodKey来判断前面是否有已存在的方法,subscriberClassByMethodKey也是HashMap类型,键值对是Map<String, Class>。其put函数返回null表明前面没有签名(methodName>paramTypeName)相同的方法,或者存在且其值是当前订阅类的父类,父接口或者相同类。

因此checkAdd函数也称为第二层检查EventBus中类的合法注册。checkAdd函数返回true之后,将相关信息封装到SubscriberMethod对象中,并添加到findState.subscriberMethods列表中。

1
2
3
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));

**到这里确定了如何注解一个合法的方法: **

  • 注解方法必须是public修饰且非编译器生成,只有一个参数,并且Subscribe注解该方法
  • 不存在相同签名(methodName>paramTypeName
  • 若已存在的注册类则得是方法所在的类或者父类

moveToSuperclass

回到findUsingInfo函数。调用了FindState对象的moveToSuperclass函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
void moveToSuperclass() {
if (skipSuperClasses) {//findUsingReflectionInsingleClass函数,默认情况getDeclaredMethods函数获取当前类自身声明方法,发生异常情况下调用getMethods函数获取所有方法,会将skipSuperClasses设为true
clazz = null;
} else {
clazz = clazz.getSuperclass();
String clazzName = clazz.getName();
//如果当前类是系统类,直接跳过
if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") ||
clazzName.startsWith("android.") || clazzName.startsWith("androidx.")) {
clazz = null;
}
}
}

注意到这里还在while循环中,正常情况skipSuperClasses=true,clazz将赋值为当前订阅类的父类,然后重复寻找父类的订阅方法。

findUsingInfo函数最后调用了getMethodsAndRelease函数返回SubscriberMethod列表,将FindState对象进行回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
findState.recycle();
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
return subscriberMethods;
}

subscribe

回到register函数。寻找到SubscriberMethod列表后,会遍历每个SubscribeMethod,并调用subscribe函数,将订阅者与订阅方法绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//通过参数类型获取已存在的订阅信息
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//将方法的订阅信息添加到subscriptions列表合适的位置,
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
//最后的位置或者根据优先级排序
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//typesBySubscriber保存当前类的订阅事件(方法的参数类型)
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//判断当前方法订阅的是黏性事件
if (subscriberMethod.sticky) {
if (eventInheritance) {//默认true,表示黏性事件可以从父类继承
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

subscribe函数相对来说比较简单,已经在上面代码中注释一些信息。看下最后的checkPostStickyEventToSubscription函数,如何发送黏性事件。

1
2
3
4
5
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}

可见调用了postToSubscription函数,发送了一次事件,可见,如果订阅的事黏性事件,会出发最后一次黏性消息事件。这里不继续分析,因为和后面的重叠。

unregister

register函数注册中,会把当前类添加到typesBySubscriber中,而unregister函数则是从typesBySubscriber移除该对象,并调用unsubscribeByEventType函数。

1
2
3
4
5
6
7
8
9
10
11
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {//遍历当前类注册的所有事件,在事件列表里移除本注册类对应的订阅信息
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

看看unsubscribeByEventType函数,通过subscriptionsByEventType获取注册事件eventType的注册信息列表,遍历列表,判断每个订阅信息是否注册类是否与当前解绑注册相同,相同的话则从订阅信息列表移除该订阅信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//订阅类型的所有订阅信息
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {//订阅信息中的订阅类与解绑订阅类一致
subscription.active = false;//该标志位在处理订阅方法会很有用
subscriptions.remove(i);
i--;
size--;
}
}
}
}

也就是说,解绑的过程是将订阅者和订阅信息分别从typesBySubscribersubscriptions集合中移除。

Post

分析了注册与解注册,接下来分析发送事件的post函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void post(Object event) {
//将事件添加到当前线程的事件队列中
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

if (!postingState.isPosting) {
//判断是否Android主线程
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);//此时队列只有我们一个事件
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

currentPostingThreadStateThreadLocal类型,缓存PostingThreadStatepost函数的 主要功能是将事件添加到eventQueue队列中,如果当前postingState.isPosting!=ture,则调用postSingleEvent发布该事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);//查询该EventType所有父类和接口
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

调用postSingleEventForEventType函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

postSingleEventForEventType函数遍历eventClass对应的Subscription列表,然后调用了postToSubscription函数发送事件。前面分析到,当类订阅黏性事件也会调用该方法。

postToSubscription

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

这里就涉及到EventBus中线程的一些知识点,在注册类中,我们通常通过这样方式去订阅一个事件。

1
2
@Subscribe(threadMode = ThreadMode.MAIN)
fun get(event:Int){}

其中threadMode表示当事件来临时,执行get函数的方法体的线程。默认情况下是ThreadMode.POSTING,表示与事件发布的线程一致,即发布线程,所以可能在主线程,也有可能在后台线程。而ThreadMode.MAIN表示在Android的主线程,如果发布线程也在主线程,那么方法体会被直接执行,所以可能会被主线程阻塞,可以通过MAIN_ORDERED按序在主线程执行,这样就不会阻塞主线程。ThreadMode.MAIN_ORDERED后台线程,即Android主线程外的其他线程。ThreadMode.ASYNC即异步线程,即发布线程和订阅线程是独立线程,不会直接运行和等待订阅方法。

回到postToSubscription函数。由于我们发布事件和订阅事件都在主线程,所以直接执行了invokeSubscriber函数。也就是直接通过反射调用了我们的订阅方法。

1
2
3
4
5
6
7
8
9
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

假设我们在子线程发布了事件,此时调用的是mainThreadPoster.enqueue(subscription, event)mainThreadPoster对象的类型MainThreadSupport,在EventBus对象创建的时候被创建。

MainThreadPoster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
//EventBusBuilder
MainThreadSupport getMainThreadSupport() {
if (mainThreadSupport != null) {
return mainThreadSupport;
} else if (AndroidLogger.isAndroidLogAvailable()) {// Class.forName("android.util.Log") != null
Object looperOrNull = getAndroidMainLooperOrNull();//Looper.getMainLooper()
return looperOrNull == null ? null :
new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
} else {
return null;
}
}

所以MainThreadSupport其实很简单。直接创建了MainThreadSupport的实现类AndroidHandlerMainThreadSupport,内部持有主线程的Looper对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface MainThreadSupport {

boolean isMainThread();

Poster createPoster(EventBus eventBus);

class AndroidHandlerMainThreadSupport implements MainThreadSupport {

private final Looper looper;

public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}

@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}

@Override
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
}

}

mainPoster则是HandlerPoster,HandlerPosterHandler的子类,并且实现Poster接口。查看其enqueue函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void enqueue(Subscription subscription, Object event) {
//利用旧的或直接新建PendingPost对象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);//加入队列
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}

将信息包装成PendingPost对象,然后加入到PendingPostQueue队列queue中,如handler未激活,则调用handlesendMessage函数。

handleMessage

我们看看其handleMessage函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//从队列取PendingPost对象
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}

调用了invokeSubscriber函数。可见最终调用了invokeSubscriber的重载函数,并释放PendingPost对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {//调用unregister,该变量=false。
invokeSubscriber(subscription, event);
}
}

void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

这样一分析,EventBus的订阅与解订阅,简单事件发布流程就分析完毕了。

黏性事件

所谓的黏性事件,就是在订阅方法的Subscribe注解,将其sticky属性设置为ture

1
2
3
4
@Subscribe(threadMode = ThreadMode.MAIN,sticky = true)
fun get(event:Int){

}

这样在类注册的时候,[subscribe](# subscribe)函数会发布上次黏性事件。如果没有黏性事件,则不会发布该事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
...
//判断当前方法订阅的是黏性事件
if (subscriberMethod.sticky) {
if (eventInheritance) {//默认true,表示黏性事件可以从父类继承
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

我们通过postSticky函数来发布黏性事件。它会将黏性事件保存到stickyEvents中,这种后面订阅就可以取出上次黏性事件。然后再调用post进行事件发布。

1
2
3
4
5
6
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
post(event);
}

优先权

在订阅事件时候,我们也可以通过priority属性设置同线程内的同事件不同订阅类接收该订阅事件的优先权。值越大表示优先权越高,会优先接收到订阅事件,这个通过[subscribe](# subscribe)函数得知。

1
2
3
4
@Subscribe(threadMode = ThreadMode.MAIN,sticky = true,priority = 1)
fun get(event:Int){

}

线程切换

前面分析中,分析了主线程的切换[MainThreadPoster](# MainThreadPoster)。回到postToSubscription函数,了解其他线程的切换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING://订阅线程为POSTING,表示与发布线程一致,不需要切换,直接调用订阅方法。
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {//先判断发布线程是否在主线程,是的话,直接调用
invokeSubscriber(subscription, event);
} else {//否则,通过mainThreadPoster切换到主线程
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {//mainThreadPoster进队列按序发布事件
mainThreadPoster.enqueue(subscription, event);
} else {
//技术错误,直接调用
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {//后台线程队列,在线程池执行
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

这里主要看下backgroundPosterasyncPoster的工作原理。

BackgroundPoster

BackgroundPoster实现RunnablePoster接口。其enqueue函数。

1
2
3
4
5
6
7
8
9
10
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}

先把订阅信息包装类PendingPost添加到队列中,executorRunning=false,则调用eventBus.getExecutorService().execute(this)执行自身。这里的eventBus.getExecutorService()返回的线程池是Executors.newCachedThreadPool()对象,此时查看run函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}

可见于MainPoster的逻辑是一致的,只不过这里在线程池执行订阅方法,从而实现切换到后台线程,且在一条后台线程会处理完所有的事件的订阅方法,。

AsyncPoster

AsyncPoster也实现RunnablePoster接口。查看其enqueue函数和run函数。也就是说AsyncPoster实现逻辑与BackgroundPoster一致的。只不过AsyncPoster对象中一条线程只执行一个事件的订阅方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}

@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}

相关知识点连接

ThreadLocal