OpenHarmony—EventHandler源碼解析
??想了解更多關(guān)于開源的內(nèi)容,請訪問:??
EventHandler是用于處理線程間通信的一種機制,可以通過EventRunner創(chuàng)建新線程,將耗時的操作放到新線程上執(zhí)行。這樣既不阻塞原來的線程,任務(wù)又可以得到合理的處理。比如:主線程使用EventHandler創(chuàng)建子線程,子線程做耗時的下載圖片操作,下載完成后,子線程通過EventHandler通知主線程,主線程再更新UI。
基本概念
EventRunner是一種事件循環(huán)器,循環(huán)處理從該EventRunner創(chuàng)建的新線程的事件隊列中獲取InnerEvent事件。InnerEvent是EventHandler投遞的事件。
EventHandler是一種用戶在當(dāng)前線程上投遞InnerEvent事件到異步線程上處理的機制。每一個EventHandler和指定的EventRunner所創(chuàng)建的新線程綁定,并且該新線程內(nèi)部有一個事件隊列。EventHandler可以投遞指定的InnerEvent事件到這個事件隊列。EventRunner從事件隊列里循環(huán)地取出事件,并在EventRunner所在線程執(zhí)行processEvent回調(diào)。一般,EventHandler有兩個主要作用:
- 在不同線程間分發(fā)和處理InnerEvent事件。
- 延遲處理InnerEvent事件。
運作機制
使用EventHandler實現(xiàn)線程間通信的主要流程:
- EventHandler投遞具體的InnerEvent事件到EventRunner所創(chuàng)建的線程的事件隊列。
- EventRunner循環(huán)從事件隊列中獲取InnerEvent事件。
- 新線程上處理該事件:觸發(fā)InnerEvent的回調(diào)方法并觸發(fā)EventHandler的處理方法。
接口說明:
ohos.events.emitter(Emitter)
InnerEvent
進程內(nèi)的事件。
EventData
發(fā)送事件時傳遞的數(shù)據(jù)。
EventPriority
用于表示事件被投遞的優(yōu)先級。
代碼目錄:
foundation/
└──foundation/appexecfwk/standard
├── interfaces
│ ├── innerkits
│ | └── libeventhandler # 內(nèi)部接口
│ └── napi
│ └── eventhandler # NAPI接口
└──— libs/libeventhandler # 機制實現(xiàn)代碼
類圖:
EventRunner:
EventRunner的create方法在創(chuàng)建Runner線程的同時會實例化一個EventQueue,并賦值給類內(nèi)部的成員變量queue_ 。 EventRunner的Runner線程輪詢queue_,取出隊列中的InnerEvent事件,并處理該事件。
EventHandler:
EventHandler的各Send方法通過調(diào)用EventQueue的Insert方法將InnerEvent插入EventQueue的subEventQueues_中 或 idleEvents__ 中。
當(dāng)調(diào)用Post方法,投遞某個回調(diào)函數(shù)時,該回調(diào)函數(shù)會賦值給InnerEvent的taskCallback_,并通過Send方法將將擁有回調(diào)函數(shù)的InnerEvent插入隊列。
其中,subEventQueues_是size為3的SubEventQueue的列表,3個SubEventQueue分別用于存儲三種不同優(yōu)先級(IMMEDIATE/HIGH/LOW)的InnerEvent。
idleEvents_用于存儲優(yōu)先級為IDLE的InnerEvent。
同理,Remove方法即從上述subEventQueue_ 或 idleEvent_ 中刪除某事件。
EventQueue
通過成員變量 ioWaiter_ 取出定時或延時的InnerEvent事件。同時實現(xiàn)對文件描述符監(jiān)聽事件的回調(diào)。
ioWaiter_ 成員的類型默認(rèn)使用NoneIoWaiter , 此時不支持文件描述符的監(jiān)聽。隊列中的延時或定時事件的取出,通過NoneIoWaiter調(diào)用 std::condition_variable機制實現(xiàn)。
當(dāng)EventHandler中的AddFileDescriptorListener被調(diào)用時,ioWaiter_成員的類型自動轉(zhuǎn)換成EpollIoWaiter類型。EpollIoWaiter支持文件描述符的監(jiān)聽事件。此時,隊列中的延時或定時事件的取出,通過系統(tǒng)的epoll機制實現(xiàn)。
關(guān)鍵代碼摘錄:
EventRunner
// Start event looper.
for (auto event = queue_->GetEvent(); event; event = queue_->GetEvent()) { //監(jiān)聽輪詢queue_
std::shared_ptr<EventHandler> handler = event->GetOwner();
// Make sure owner of the event exists.
if (handler) {
std::shared_ptr<Logger> logging = logger_;
std::stringstream address;
address << handler.get();
if (logging != nullptr) {
if (!event->HasTask()) {
logging->Log("Dispatching to handler event id = " + std::to_string(event->GetInnerEventId()));
} else {
logging->Log("Dispatching to handler event task name = " + event->GetTaskName());
}
}
handler->DistributeEvent(event); //執(zhí)行aInnerEvent事件
if (logging != nullptr) {
logging->Log("Finished to handler(0x" + address.str() + ")");
}
}
// Release event manually, otherwise event will be released until next event coming.
event.reset();
}
EventQueue
void EventQueue::Insert(InnerEvent::Pointer &event, Priority priority) //事件或任務(wù)的插入
{
if (!event) {
HILOGE("Insert: Could not insert an invalid event");
return;
}
std::lock_guard<std::mutex> lock(queueLock_);
bool needNotify = false;
switch (priority) { //根據(jù)Event的優(yōu)先級將該Event插入不同的事件隊列
case Priority::IMMEDIATE:
case Priority::HIGH:
case Priority::LOW: {
needNotify = (event->GetHandleTime() < wakeUpTime_);
InsertEventsLocked(subEventQueues_[static_cast<uint32_t>(priority)].queue, event);
break;
}
case Priority::IDLE: {
// Never wake up thread if insert an idle event.
InsertEventsLocked(idleEvents_, event);
break;
}
default:
break;
}
if (needNotify) {
ioWaiter_->NotifyOne();
}
}
InnerEvent::Pointer EventQueue::GetEvent() //事件或任務(wù)的取出
{
std::unique_lock<std::mutex> lock(queueLock_);
while (!finished_) {
InnerEvent::TimePoint nextWakeUpTime = InnerEvent::TimePoint::max();
InnerEvent::Pointer event = GetExpiredEventLocked(nextWakeUpTime);
if (event) {
return event;
}
WaitUntilLocked(nextWakeUpTime, lock);
}
HILOGD("GetEvent: Break out");
return InnerEvent::Pointer(nullptr, nullptr);
}
EventHandler
void EventHandler::DistributeEvent(const InnerEvent::Pointer &event) //執(zhí)行InnerEvent事件
{
if (!event) {
HILOGE("DistributeEvent: Could not distribute an invalid event");
return;
}
// Save old event handler.
std::weak_ptr<EventHandler> oldHandler = currentEventHandler;
// Save current event handler into thread local data.
currentEventHandler = shared_from_this();
auto spanId = event->GetTraceId();
auto traceId = HiTrace::GetId();
bool allowTraceOutPut = AllowHiTraceOutPut(spanId, event->HasWaiter());
if (allowTraceOutPut) {
HiTrace::SetId(*spanId);
HiTracePointerOutPut(spanId, event, "Receive", HiTraceTracepointType::HITRACE_TP_SR);
}
InnerEvent::TimePoint nowStart = InnerEvent::Clock::now();
DeliveryTimeAction(event, nowStart);
if (event->HasTask()) { //如果有回調(diào)函數(shù)
// Call task callback directly if contains a task.
(event->GetTaskCallback())(); //執(zhí)行回調(diào)函數(shù)
} else {
// Otherwise let developers to handle it.
ProcessEvent(event); //執(zhí)行ProcessEvent
}
DistributeTimeAction(event, nowStart);
if (allowTraceOutPut) {
HiTrace::Tracepoint(HiTraceTracepointType::HITRACE_TP_SS, *spanId, "Event Distribute over");
HiTrace::ClearId();
if (traceId.IsValid()) {
HiTrace::SetId(traceId);
}
}
// Restore current event handler.
if (oldHandler.expired()) {
currentEventHandler = nullptr;
} else {
currentEventHandler = oldHandler;
}
}