一篇帶給你EventBus原理解析
EventBus基于觀察者模式,原理是大體也是將觀察者與要觀察的消息注冊(cè)到一個(gè)map,當(dāng)有消息發(fā)布時(shí),從map中找到觀察者,并調(diào)用觀察者對(duì)應(yīng)的方法,下面我們基于源碼進(jìn)行解析來看看是不是這樣的原理
不廢話,沒有賣課,直接來
1、EventBus訂閱
(1)訂閱整體流程
EventBus的定義始于register方法:
public void register(Object subscriber) {
//獲取訂閱者/觀察者的class對(duì)象
Class<?> subscriberClass = subscriber.getClass();
//從訂閱者class對(duì)象中找出所有訂閱消息的方法列表
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//變量訂閱消息的方法列表
//定義消息
subscribe(subscriber, subscriberMethod);
}
}
}
register方法主要是從訂閱者class對(duì)象中找出所有訂閱消息的方法列表,然后對(duì)每個(gè)訂閱方法調(diào)用subscribe進(jìn)行訂閱:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//方法訂閱的消息類型,即訂閱方法的參數(shù)類型class
Class<?> eventType = subscriberMethod.eventType;
//創(chuàng)建訂閱者/觀察者
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//從事件/訂閱者map中,找出訂閱事件的訂閱者列表
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
//事件/訂閱者map中沒有事件相關(guān)的訂閱者,則:
//新建訂閱事件的訂閱者列表
subscriptions = new CopyOnWriteArrayList<>();
//并將事件的class作為key,事件訂閱者列表作為value存入事件/訂閱者map
subscriptionsByEventType.put(eventType, subscriptions);
} else {
//
if (subscriptions.contains(newSubscription)) {
//已經(jīng)訂閱過,則拋出異常
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
//按優(yōu)先級(jí)將訂閱者加入對(duì)應(yīng)的位置
subscriptions.add(i, newSubscription);
break;
}
}
//訂閱者/事件map中找到訂閱事件列表
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
//如果訂閱者/事件map沒有訂閱事件列表,則:
//新建訂閱事件列表,并存入訂閱者/事件map中,key是訂閱者,value是要訂閱的事件列表
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
//如果訂閱事件是粘性事件,則:
if (eventInheritance) {
//事件允許發(fā)布到其父類或者父接口,則:
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
//遍歷粘性事件map
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
//事件是粘性事件的父類、父接口,則:
Object stickyEvent = entry.getValue();
//發(fā)布粘性事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
//發(fā)布粘性事件
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
- subscribe訂閱事件主要是將訂閱者與要訂閱的事件存入事件/訂閱者map中,等待發(fā)布消息時(shí)從map中找到訂閱者并調(diào)用對(duì)應(yīng)的訂閱方法消耗處理事件。
- subscribe訂閱時(shí)有兩個(gè)map,一個(gè)是subscriptionsByEventType(事件/訂閱者map,key是事件,value是訂閱者列表),一個(gè)是typesBySubscriber(訂閱者/事件列表map,key是訂閱者,value是要訂閱的事件列表),subscriptionsByEventType用于發(fā)布消息時(shí)從map中快速找到訂閱者列表并調(diào)用相應(yīng)的訂閱方法,typesBySubscriber用于取消訂閱時(shí),從map中快速找到事件列表,根據(jù)此事件列表快速?gòu)膕ubscriptionsByEventType取消訂閱。
- 訂閱消息時(shí)確實(shí)也是將訂閱者與定要訂閱的事件存入事件/訂閱者map中。
register方法中的subscriberMethodFinder.findSubscriberMethods(subscriberClass),它從訂閱者class對(duì)象中找到要所有要訂閱的事件,我們看看它是怎么實(shí)現(xiàn)的:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//從緩存中獲取訂閱者的所有事件方法列表
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
//緩存匯總沒有,則:
if (ignoreGeneratedIndex) {
//如果忽略從編譯器期自動(dòng)生成訂閱者/事件關(guān)系文件中查找,則:
//通過反射訂閱者class找到訂閱事件方法列表
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//如果從編譯器期自動(dòng)生成訂閱者/事件關(guān)系文件中查找,則:
//從訂閱者/事件關(guān)系文件中查找訂閱事件方法列表
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//將訂閱者的訂閱事件列表進(jìn)行緩存
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
- findSubscriberMethods有兩個(gè)策略。
- 一是通過反射訂閱者class對(duì)象找到所有的訂閱事件方法列表。
- 一是從編譯期自動(dòng)生成的訂閱者/事件關(guān)系文件中查找,默認(rèn)是從訂閱者/事件關(guān)系文件中查找。
(2)反射查找訂閱事件方法列表
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
//從查找狀態(tài)對(duì)象復(fù)用池中獲取/創(chuàng)建一個(gè)新的查找狀態(tài)對(duì)象
FindState findState = prepareFindState();
//初始化查找狀態(tài)對(duì)象,設(shè)置訂閱者class
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//通過反射查找查找狀態(tài)對(duì)象當(dāng)前class,最開始從訂閱者class開始查找,接著遞歸其父類或者父接口查找
findUsingReflectionInSingleClass(findState);
//繼續(xù)從其父類/父接口中查找
findState.moveToSuperclass();
}
//返回訂閱事件方法列表,并將查找狀態(tài)對(duì)象緩存到復(fù)用池中
return getMethodsAndRelease(findState);
}
- findUsingReflection會(huì)從當(dāng)前訂閱者的class開始查找所有的訂閱事件方法列表;
- 接著遞歸其父類或者父接口繼續(xù)查找,真正查找一個(gè)class的訂閱事件方法列表是findUsingReflectionInSingleClass方法:
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
//反射獲取當(dāng)前class定義的方法列表
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
//出現(xiàn)異常,則獲取所有方法(包含所有父類/父接口)
methods = findState.clazz.getMethods();
//并標(biāo)記不需要從父類/父接口匯總查找
findState.skipSuperClasses = true;
}
for (Method method : methods) {
//遍歷方法列表
//獲取方法訪問性
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//方法是public、靜態(tài)、抽象,則:
//反射獲取方法的參數(shù)類型列表
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
//方法參數(shù)只有一個(gè)參數(shù),則:
//反射獲取方法的Subscribe注解對(duì)象
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
//方法被Subscribe注解修飾,則:
//獲取參數(shù)類型class
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
//獲取事件方法的線程模型
ThreadMode threadMode = subscribeAnnotation.threadMode();
//收集注解信息:線程模型、優(yōu)先級(jí)、粘性,創(chuàng)建事件方法,并加入訂閱者的訂閱事件方法列表中
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
//如果方法嚴(yán)格校驗(yàn)并且被Subscribe注解修飾,則拋出方法參數(shù)多于1個(gè)的異常
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
//如果方法嚴(yán)格校驗(yàn)并且被Subscribe注解修飾,則拋出方法非public、非靜態(tài)、非抽象的異常
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
findUsingReflectionInSingleClass通過反射從class中獲取方法列表,并按Subscribe注解規(guī)則(規(guī)則是:方法被Subscribe修飾、是public/static/abstract、只有一個(gè)參數(shù))進(jìn)行過濾,找到匹配的事件方法,并加入訂閱事件方法列表
(3)編譯期注解處理器,從編譯期生成的訂閱者/事件關(guān)系文件中查找訂閱方法列表
- 從編譯期生成的訂閱者/事件關(guān)系文件中查找訂閱方法列表,涉及編譯期注解處理器解析注解生成訂閱者/事件方法列表關(guān)系文件,然后運(yùn)行時(shí)從這些文件中根據(jù)訂閱者class直接找到事件方法列表。
- EventBus的編譯期注解處理器是獨(dú)立的模塊,依賴使用它可以在編譯期自動(dòng)解析生成訂閱者/事件方法列表關(guān)系文件,避免運(yùn)行時(shí)通過反射查找事件方法列表的性能開銷.
EventBusAnnotationProcessor 是EventBus的編譯期注解處理器實(shí)現(xiàn)類,其核心源碼如下:
//訂閱者/事件方法列表關(guān)系列表
private final ListMap<TypeElement, ExecutableElement> methodsByClass = new ListMap<>();
//被忽略的類
private final Set<TypeElement> classesToSkip = new HashSet<>();
@Override
public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment env) {
Messager messager = processingEnv.getMessager();
try {
//讀取模塊定義的訂閱者/事件索引類名(含包名+類名)
String index = processingEnv.getOptions().get(OPTION_EVENT_BUS_INDEX);
if (index == null) {
messager.printMessage(Diagnostic.Kind.ERROR, "No option " + OPTION_EVENT_BUS_INDEX +
" passed to annotation processor");
return false;
}
verbose = Boolean.parseBoolean(processingEnv.getOptions().get(OPTION_VERBOSE));
int lastPeriod = index.lastIndexOf('.');
//訂閱者/事件索引類包名
String indexPackage = lastPeriod != -1 ? index.substring(0, lastPeriod) : null;
//......
//通過注解收集訂閱者/事件方法列表關(guān)系列表
collectSubscribers(annotations, env, messager);
//根據(jù)訂閱語法規(guī)則,生成忽略被忽略的觀察者列表
checkForSubscribersToSkip(messager, indexPackage);
if (!methodsByClass.isEmpty()) {
//生成訂閱者/事件索引類文件
createInfoIndexFile(index);
} else {
messager.printMessage(Diagnostic.Kind.WARNING, "No @Subscribe annotations found");
}
writerRoundDone = true;
} catch (RuntimeException e) {
// IntelliJ does not handle exceptions nicely, so log and print a message
e.printStackTrace();
messager.printMessage(Diagnostic.Kind.ERROR, "Unexpected error in EventBusAnnotationProcessor: " + e);
}
return true;
}
//通過注解收集訂閱者/事件方法列表關(guān)系列表
private void collectSubscribers(Set<? extends TypeElement> annotations, RoundEnvironment env, Messager messager) {
for (TypeElement annotation : annotations) {
//遍歷注解類型列表
//從注解類型元信息中獲取匹配的注解元信息列表
Set<? extends Element> elements = env.getElementsAnnotatedWith(annotation);
for (Element element : elements) {
//遍歷注解元信息列表
if (element instanceof ExecutableElement) {
//如果是方法元信息,則:
ExecutableElement method = (ExecutableElement) element;
if (checkHasNoErrors(method, messager)) {
//校驗(yàn)方法元信息,如果方法是public且只有一個(gè)參數(shù),則:
//獲取方法所屬的類類型元信息,此類類型就是對(duì)應(yīng)的訂閱者,方法是消耗處理事件
TypeElement classElement = (TypeElement) method.getEnclosingElement();
//將訂閱者/事件方法加入訂閱者/事件方法列表map中,key是訂閱者,value是事件方法列表
methodsByClass.putElement(classElement, method);
}
} else {
messager.printMessage(Diagnostic.Kind.ERROR, "@Subscribe is only valid for methods", element);
}
}
}
}
private void checkForSubscribersToSkip(Messager messager, String myPackage) {
for (TypeElement skipCandidate : methodsByClass.keySet()) {
//遍歷訂閱者/事件方法列表map
//訂閱者class類類型元信息
TypeElement subscriberClass = skipCandidate;
while (subscriberClass != null) {
//循環(huán):訂閱者class不為空,則:
if (!isVisible(myPackage, subscriberClass)) {
//如果訂閱者class不是public或者與要生成的索引類不是同包,則忽略此訂閱則,將其加入忽略列表
boolean added = classesToSkip.add(skipCandidate);
//.......
break;
}
//取出訂閱者要訂閱的事件方法列表
List<ExecutableElement> methods = methodsByClass.get(subscriberClass);
if (methods != null) {
for (ExecutableElement method : methods) {
//遍歷訂閱的事件方法列表
String skipReason = null;
VariableElement param = method.getParameters().get(0);
TypeMirror typeMirror = getParamTypeMirror(param, messager);
if (!(typeMirror instanceof DeclaredType) ||
!(((DeclaredType) typeMirror).asElement() instanceof TypeElement)) {
//參數(shù)類型無法處理(不是基類性、類/接口類型、數(shù)組類型、類型變量、空類型),則標(biāo)間忽略原因,一般不會(huì)出現(xiàn)此情況
skipReason = "event type cannot be processed";
}
if (skipReason == null) {
TypeElement eventTypeElement = (TypeElement) ((DeclaredType) typeMirror).asElement();
if (!isVisible(myPackage, eventTypeElement)) {
//如果參數(shù)類型對(duì)應(yīng)索引類不可訪問:如不是public或者不在同包名等,則標(biāo)間忽略原因
skipReason = "event type is not public";
}
}
if (skipReason != null) {
//如果被標(biāo)記被忽略,則:
//加入忽略列表
boolean added = classesToSkip.add(skipCandidate);
//.......
break;
}
}
}
//獲取訂閱類的父類/父接口,繼續(xù)遞歸過濾
subscriberClass = getSuperclass(subscriberClass);
}
}
}
//生成訂閱者/事件方法列表索引類文件
private void createInfoIndexFile(String index) {
BufferedWriter writer = null;
try {
//生成訂閱者/事件方法列表索引類文件,文件為索引類全限定名
JavaFileObject sourceFile = processingEnv.getFiler().createSourceFile(index);
int period = index.lastIndexOf('.');
//索引類包名
String myPackage = period > 0 ? index.substring(0, period) : null;
//索引類名
String clazz = index.substring(period + 1);
//創(chuàng)建文件讀寫器
writer = new BufferedWriter(sourceFile.openWriter());
if (myPackage != null) {
//文件寫入類包名
writer.write("package " + myPackage + ";\n\n");
}
//文件寫入引用的類
writer.write("import org.greenrobot.eventbus.meta.SimpleSubscriberInfo;\n");
writer.write("import org.greenrobot.eventbus.meta.SubscriberMethodInfo;\n");
writer.write("import org.greenrobot.eventbus.meta.SubscriberInfo;\n");
writer.write("import org.greenrobot.eventbus.meta.SubscriberInfoIndex;\n\n");
writer.write("import org.greenrobot.eventbus.ThreadMode;\n\n");
writer.write("import java.util.HashMap;\n");
writer.write("import java.util.Map;\n\n");
writer.write("/** This class is generated by EventBus, do not edit. */\n");
//文件寫入定義類并實(shí)現(xiàn)SubscriberInfoIndex 接口
writer.write("public class " + clazz + " implements SubscriberInfoIndex {\n");
//文件寫入聲明訂閱者/事件方法列表map,key是訂閱者class,value是SubscriberInfo訂閱信息對(duì)象
writer.write(" private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;\n\n");
//文件寫入靜態(tài)塊
writer.write(" static {\n");
//文件寫入創(chuàng)建訂閱者/事件方法列表map
writer.write(" SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();\n\n");
//調(diào)用writeIndexLines完成將訂閱者/事件方法列表寫入map中
writeIndexLines(writer, myPackage);
writer.write(" }\n\n");
//文件寫入putIndex方法,方法用于將訂閱者要訂閱的信息存入map中
writer.write(" private static void putIndex(SubscriberInfo info) {\n");
writer.write(" SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);\n");
writer.write(" }\n\n");
//文件寫入實(shí)現(xiàn)SubscriberInfoIndex 接口的getSubscriberInfo方法,方法是根據(jù)訂閱者class從map中取訂閱者信息并返回
writer.write(" @Override\n");
writer.write(" public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {\n");
writer.write(" SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);\n");
writer.write(" if (info != null) {\n");
writer.write(" return info;\n");
writer.write(" } else {\n");
writer.write(" return null;\n");
writer.write(" }\n");
writer.write(" }\n");
writer.write("}\n");
} catch (IOException e) {
throw new RuntimeException("Could not write source for " + index, e);
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
//Silent
}
}
}
}
//完成將訂閱者/事件方法列表寫入map中
private void writeIndexLines(BufferedWriter writer, String myPackage) throws IOException {
for (TypeElement subscriberTypeElement : methodsByClass.keySet()) {
//遍歷收集到的訂閱者/事件方法列表
if (classesToSkip.contains(subscriberTypeElement)) {
//訂閱者被忽略
continue;
}
//獲取訂閱者class
String subscriberClass = getClassString(subscriberTypeElement, myPackage);
if (isVisible(myPackage, subscriberTypeElement)) {
//調(diào)用索引類的putIndex方法,創(chuàng)建訂閱信息類SimpleSubscriberInfo,作為參數(shù),其作用是寫入如下語句:
putIndex(new SimpleSubscriberInfo(subscriberClass, true, SubscriberMethodInfo[]))
writeLine(writer, 2,
"putIndex(new SimpleSubscriberInfo(" + subscriberClass + ".class,",
"true,", "new SubscriberMethodInfo[] {");
List<ExecutableElement> methods = methodsByClass.get(subscriberTypeElement);
writeCreateSubscriberMethods(writer, methods, "new SubscriberMethodInfo", myPackage);
writer.write(" }));\n\n");
} else {
writer.write(" // Subscriber not visible to index: " + subscriberClass + "\n");
}
}
}
//創(chuàng)建SubscriberMethodInfo對(duì)象并寫入索引文件
private void writeCreateSubscriberMethods(BufferedWriter writer, List<ExecutableElement> methods,
String callPrefix, String myPackage) throws IOException {
for (ExecutableElement method : methods) {
//遍歷事件方法列表
//獲取方法參數(shù)列表
List<? extends VariableElement> parameters = method.getParameters();
//獲取第一個(gè)參數(shù)類型對(duì)象
TypeMirror paramType = getParamTypeMirror(parameters.get(0), null);
//參數(shù)類型元對(duì)象
TypeElement paramElement = (TypeElement) processingEnv.getTypeUtils().asElement(paramType);
//獲取方法名稱
String methodName = method.getSimpleName().toString();
//獲取參數(shù)類型類型,即事件類型
String eventClass = getClassString(paramElement, myPackage) + ".class";
//獲取方法Subscribe注解對(duì)象
Subscribe subscribe = method.getAnnotation(Subscribe.class);
List<String> parts = new ArrayList<>();
//創(chuàng)建SubscriberMethodInfo
parts.add(callPrefix + "(\"" + methodName + "\",");
String lineEnd = "),";
if (subscribe.priority() == 0 && !subscribe.sticky()) {
//如果注解信息中優(yōu)先級(jí)為0并且非粘性事件,則
if (subscribe.threadMode() == ThreadMode.POSTING) {
//如果線程模式為POSTING,則:
//創(chuàng)建:new SubscriberMethodInfo(methodName, eventClass)
parts.add(eventClass + lineEnd);
} else {
//線程模式不是POSTING,則:
//創(chuàng)建:new SubscriberMethodInfo(methodName, eventClass, ThreadMode.xxx)
parts.add(eventClass + ",");
parts.add("ThreadMode." + subscribe.threadMode().name() + lineEnd);
}
} else {
//如果注解信息中優(yōu)先級(jí)不是0或者是粘性事件,則
//創(chuàng)建:new SubscriberMethodInfo(methodName, eventClass, ThreadMode.xxx, priority, sticky)
parts.add(eventClass + ",");
parts.add("ThreadMode." + subscribe.threadMode().name() + ",");
parts.add(subscribe.priority() + ",");
parts.add(subscribe.sticky() + lineEnd);
}
writeLine(writer, 3, parts.toArray(new String[parts.size()]));
if (verbose) {
processingEnv.getMessager().printMessage(Diagnostic.Kind.NOTE, "Indexed @Subscribe at " +
method.getEnclosingElement().getSimpleName() + "." + methodName +
"(" + paramElement.getSimpleName() + ")");
}
}
}
編譯器注解處理器其實(shí)做的是跟運(yùn)行時(shí)反射獲取訂閱者class的訂閱事件方法列表有點(diǎn)類似,都是從訂閱者class收集subscribe注解修飾的訂閱事件方法列表。
編譯器注解處理器主要做:
- 收集本模塊中被subscribe注解修飾的方法及其所屬的類,并生成訂閱者/事件方法列表map。
- 一個(gè)模塊一個(gè)索引類,生成本模塊的索引類。
- 將訂閱者/事件方法列表map寫入索引類中的訂閱者/訂閱信息map中,供運(yùn)行時(shí)根據(jù)訂閱者class快速獲取訂閱信息SubscriberInfo。
(4)從編譯期生成的訂閱者/事件關(guān)系文件中查找訂閱方法列表
從編譯期生成的訂閱者/事件索引類文件中查找訂閱方法列表的入口是findUsingInfo:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
//從查找狀態(tài)對(duì)象復(fù)用池中獲取/創(chuàng)建一個(gè)新的查找狀態(tài)對(duì)象
FindState findState = prepareFindState();
//初始化查找狀態(tài)對(duì)象,設(shè)置訂閱者class
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//從索引類中查找訂閱者對(duì)應(yīng)的定義信息對(duì)象
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
//如果訂閱者訂閱的訂閱信息對(duì)象不為空,則:
//從訂閱信息對(duì)象中獲取訂閱事件方法數(shù)組
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
//將數(shù)組轉(zhuǎn)為list
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//如果索引類中沒有匹配的,則通過運(yùn)行時(shí)注解反射獲取訂閱事件方法列表
findUsingReflectionInSingleClass(findState);
}
//遞歸從父類/父接口繼續(xù)查找
findState.moveToSuperclass();
}
//返回訂閱事件方法列表
return getMethodsAndRelease(findState);
}
findUsingInfo方法先從索引類中找到訂閱者class對(duì)應(yīng)的訂閱事件方法列表,如果沒有則轉(zhuǎn)為運(yùn)行時(shí)注解通過反射查找
從索引類中找到訂閱者class對(duì)應(yīng)的訂閱信息對(duì)象的源碼如下:
private SubscriberInfo getSubscriberInfo(FindState findState) {
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
//訂閱信息對(duì)象不為空,且有父級(jí)訂閱信息對(duì)象,則:
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
//如果訂閱者class與父級(jí)訂閱信息對(duì)象的class一樣,則返回父級(jí)訂閱信息對(duì)象
return superclassInfo;
}
}
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
//遍歷所有的索引類,即遍歷所有模塊的索引類(一個(gè)模塊一個(gè)索引類)
//從索引類訂閱者/訂閱信息map中查找訂閱信息對(duì)象
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
//找到則返回
return info;
}
}
}
return null;
}
- 從編譯期生成的訂閱者/事件關(guān)系文件中查找訂閱方法列表性能上比運(yùn)行時(shí)注解反射獲取事件方法列表更好。
- 因?yàn)樯倭朔瓷洳檎遥幾g器每個(gè)模塊都會(huì)生成一個(gè)索引文件,該索引文件記錄的是訂閱者/訂閱信息map。
- 查找事件方法列表時(shí)時(shí)間在索引文件進(jìn)行查找并轉(zhuǎn)換為事件方法列表。
2、EventBus發(fā)布消息
EventBus基于觀察者模式,所以其發(fā)布消息的流程大致為從觀察者列表中找到匹配的觀察者,并將消息傳遞給匹配的觀察者處理。
EventBus是不是這樣的處理邏輯呢,我們來看看其發(fā)布源碼,EventBus的發(fā)布消息入口是post方法:
public void post(Object event) {
//從ThreadLocal獲取當(dāng)前線程的PostingThreadState對(duì)象
PostingThreadState postingState = currentPostingThreadState.get();
//發(fā)布消息隊(duì)列
List<Object> eventQueue = postingState.eventQueue;
//將消息加入隊(duì)列中
eventQueue.add(event);
if (!postingState.isPosting) {
//如果發(fā)布隊(duì)列沒有運(yùn)行,則:
//標(biāo)記發(fā)布狀態(tài)所處的線程是否是主線程
postingState.isMainThread = isMainThread();
//標(biāo)記正在發(fā)布消息
postingState.isPosting = true;
if (postingState.canceled) {
//已取消發(fā)布,則拋出異常
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
//遍歷消息隊(duì)列
//每次從隊(duì)列刪除首個(gè)消息,并進(jìn)行發(fā)布
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
post主要做的是將消息加入隊(duì)列中,并遍歷隊(duì)列對(duì)每個(gè)消息進(jìn)行一一發(fā)布,我們繼續(xù)看下postSingleEvent:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
//如果支持發(fā)布的消息的父級(jí)類型消息,則:
//從當(dāng)前消息class找出其所有的父類/父接口類型列表,含本身
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
//遍歷消息類型列表
Class<?> clazz = eventTypes.get(h);
//對(duì)每一種消息類型進(jìn)行發(fā)布消息
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//如果不支持發(fā)布的消息的父級(jí)類型消息,則:
//直接發(fā)布當(dāng)前消息
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//......
}
postSingleEvent中分為兩種情況,一種支持遍歷當(dāng)前消息的父級(jí)類型及本身作為消息,然后按照類型一一發(fā)布消息。
一種僅支持消息本身類型,則直接發(fā)布當(dāng)前消息,我們繼續(xù)往下看postSingleEventForEventType:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//根據(jù)消息類型class從消息/訂閱者列表map中找到訂閱此消息的訂閱者列表
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
//遍歷此消息的訂閱者列表
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//調(diào)用postToSubscription將消息傳遞給訂閱者進(jìn)行處理
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
postSingleEventForEventType中根據(jù)消息類型class從消息/訂閱者列表map中找到訂閱者列表,然后遍歷訂閱者列表,將消息傳遞給每個(gè)訂閱者進(jìn)行消息的處理。
我們繼續(xù)往下看postToSubscription:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
//如果訂閱方法要求線程模式是POSTING,即在當(dāng)前線程進(jìn)行處理,則:
//直接調(diào)用invokeSubscriber方法回調(diào)訂閱者的訂閱方法處理消息
invokeSubscriber(subscription, event);
break;
case MAIN:
//如果訂閱方法要求線程模式是MAIN,即要求在主線程進(jìn)行處理,則:
if (isMainThread) {
//當(dāng)前已是主線程,則直接調(diào)用invokeSubscriber方法回調(diào)訂閱者的訂閱方法處理消息
invokeSubscriber(subscription, event);
} else {
//當(dāng)前不是主線程,則將使用handler轉(zhuǎn)入主線程中處理
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
//如果訂閱方法要求線程模式是MAIN_ORDERED,即要求在主線程進(jìn)行串行處理,則:
if (mainThreadPoster != null) {
//如果設(shè)置了主線程handler,則使用handler轉(zhuǎn)入主線程中處理
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
//如果沒有設(shè)置主線程handler,則直接在當(dāng)前線程處理
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
//如果訂閱方法要求線程模式是BACKGROUND,即要求在子線程進(jìn)行處理,則:
if (isMainThread) {
//當(dāng)前是主線程,則將消息轉(zhuǎn)入子線程進(jìn)行處理
backgroundPoster.enqueue(subscription, event);
} else {
//當(dāng)前已子線程,則直接在當(dāng)前線程處理
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
//如果訂閱方法要求線程模式是ASYNC,即要求在異步子線程進(jìn)行處理,則:
//將消息轉(zhuǎn)入子線程進(jìn)行處理
asyncPoster.enqueue(subscription, event);
break;
default:
//無效的線程模型,拋出異常
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
postToSubscription中主要是對(duì)線程模型進(jìn)行處理:
- POSTING:直接在當(dāng)前線程處理。
- MAIN:如果當(dāng)前已是主線程,則直接處理消息;如果當(dāng)前不是主線程,則通過handler轉(zhuǎn)入主線程處理。
- MAIN_ORDERED:主線程中串行處理,如果已經(jīng)設(shè)置主線程Handler,則將消息加入消息隊(duì)列等待loop進(jìn)行調(diào)度;如果沒有設(shè)置主線程Handler則直接在當(dāng)前線程處理。
- BACKGROUND:在子線程中處理,如果當(dāng)前是主線程,則將消息轉(zhuǎn)入子線程中再進(jìn)行處理;如果當(dāng)前已是子線程,則直接在當(dāng)前線程處理。
- *ASYNC:異步處理消息,不管當(dāng)前是主線程還是子線程,都需要加入新的子線程中再進(jìn)行處理。
直接處理消息的是通過invokeSubscriber方法,看名稱就是表示反射調(diào)用訂閱者的訂閱方法處理消息,那是不是呢?我們看看其源碼實(shí)現(xiàn):
void invokeSubscriber(Subscription subscription, Object event) {
try {
//通過反射調(diào)用訂閱者的訂閱方法處理消息
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
就是通過反射調(diào)用訂閱者的訂閱方法,將消息傳遞給訂閱方法處理的;
(1)切換到主線程(MAIN)/主線程串行(MAIN_ORDERED)處理消息:
不管MAIN還是MAIN_ORDERED都是通過mainThreadPoster,將消息放入mainThreadPoster的隊(duì)列中,mainThreadPoster其實(shí)就是主線程中的Handler:
//使用mainThreadSupport創(chuàng)建主線程消息發(fā)布器
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
//創(chuàng)建HandlerPoster作為mainThreadPoster
return new HandlerPoster(eventBus, looper, 10);
}
}
}
mainThreadPoster默認(rèn)是通過MainThreadSupport.AndroidHandlerMainThreadSupport.createPoster創(chuàng)建的,也就是mainThreadPoster默認(rèn)是HandlerPoster的實(shí)例對(duì)象,我們看看HandlerPoster:
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
//將消息進(jìn)行包裝
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//將消息加入等待隊(duì)列中
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
//發(fā)送一個(gè)空消息
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//死循環(huán)
//從等待隊(duì)列中取出消息
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
//沒有等待處理的消息,則退出循環(huán)
return;
}
}
}
//回調(diào)EventBus的invokeSubscriber處理消息
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
//沒超過10毫秒重新發(fā)送空消息重新調(diào)度
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
- 切換到主線程主要使用的是主線程的Handler實(shí)現(xiàn)。
- 每次將消息加入等待隊(duì)列,然后Handler發(fā)送一個(gè)空消息。
- 在Handler的handleMessage中遍歷等待隊(duì)列消息出隊(duì)并進(jìn)行處理。
(2)切換到子線程(BACKGROUND)處理消息:
切換到子線程處理消息是通過backgroundPoster的enqueue方法,backgroundPoster是BackgroundPoster的實(shí)現(xiàn)類,我們看看其內(nèi)部實(shí)現(xiàn):
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
//將消息進(jìn)行包裝
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//將消息加入等待隊(duì)列中
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
//線程加入使用線程池中,等待回調(diào)run方法
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
//死循環(huán)
//從等待隊(duì)列中獲取等待處理的消息,1000表示如果隊(duì)列為空等待1000毫秒
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
//如果消息為空,則退出循環(huán)
return;
}
}
}
//調(diào)用EventBus反射調(diào)用訂閱者的訂閱方法處理消息
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
- 切換到子線程處理消息主要是將消息加入等待隊(duì)列,backgroundPoster本身是一個(gè)線程實(shí)現(xiàn),將backgroundPoster本身加入線程池中運(yùn)行。
- backgroundPoster線程執(zhí)行時(shí)遍歷等待隊(duì)列,從隊(duì)列中取出等待處理的消息,通過eventBus.invokeSubscriber處理消息。
其中默認(rèn)使用的線程池是緩存線程池:
executorService = builder.executorService;
public class EventBusBuilder {
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
(3)異步消息(ASYNC):
異步消息是通過asyncPoster的enqueue方法實(shí)現(xiàn),asyncPoster是AsyncPoster的實(shí)例對(duì)象,它本身就是一個(gè)線程,我們看看其實(shí)現(xiàn):
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
//將消息加入隊(duì)列中
queue.enqueue(pendingPost);
//線程加入線程池中
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
//每次取出一個(gè)消息
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
//調(diào)用EventBus反射調(diào)用訂閱者的訂閱方法處理消息
eventBus.invokeSubscriber(pendingPost);
}
}
異步消息的實(shí)現(xiàn)主要是將消息加入等待隊(duì)列,然后將線程加入隊(duì)列,線程執(zhí)行是每次去等待隊(duì)列的一個(gè)消息(隊(duì)尾,先進(jìn)先出原則),最后調(diào)用EventBus反射調(diào)用訂閱者的訂閱方法處理消息。
異步消息使用的線程池跟BACKGROUND模式使用的線程池是同一個(gè),都是緩存線程池,與BACKGROUND的區(qū)別是:
- 異步消息每次都將線程加入線程池中。
- 而BACKGROUND則不需要每次。
- 異步消息線程執(zhí)行每次只去隊(duì)尾消息進(jìn)行處理,而BACKGROUND線程執(zhí)行時(shí)會(huì)處理隊(duì)列中的所有消息。
總結(jié):
- EventBus基于觀察者模式,其首先需要先將訂閱者與消息類型進(jìn)行注冊(cè)到一個(gè)消息/訂閱者列表map中,發(fā)布消息時(shí)從map找找到對(duì)應(yīng)的訂閱者并調(diào)用相應(yīng)的訂閱方法進(jìn)行處理消息。
- 注冊(cè)時(shí)通過訂閱者class通過反射收集subscribe注解修飾的方法列表及注解信息,最終注冊(cè)到消息/訂閱者列表map中,這個(gè)運(yùn)行時(shí)注解是先對(duì)耗時(shí)的;所以EventBus提供了編譯期注解處理器來做個(gè)收集的工作,降低性能損耗。
- 發(fā)布消息時(shí)根據(jù)消息類型class從消息/訂閱者列表map中找到對(duì)應(yīng)的訂閱者列表并進(jìn)行一一處理,同時(shí)根據(jù)注解中的線程模式進(jìn)行線程切換處理。