如何構(gòu)建以事件驅(qū)動(dòng)型實(shí)時(shí)信息系統(tǒng)
譯文【51CTO.com快譯】在如今競(jìng)爭(zhēng)激烈的商業(yè)競(jìng)爭(zhēng)環(huán)境中,數(shù)據(jù)的處理往往需要具有實(shí)時(shí)性。如果競(jìng)爭(zhēng)對(duì)手在數(shù)據(jù)管控上事先采取了行動(dòng),那么勢(shì)必會(huì)在業(yè)務(wù)上取得一定的優(yōu)勢(shì)。這也正是我們需要構(gòu)建能夠?qū)崟r(shí)處理信息(數(shù)據(jù))的企業(yè)級(jí)系統(tǒng)的重要原因。在實(shí)時(shí)處理數(shù)據(jù)時(shí),我們除了可以將系統(tǒng)設(shè)計(jì)為以異步的方式,對(duì)事件進(jìn)行操作之外,還可以使用同步的請(qǐng)求響應(yīng)消息,來構(gòu)建實(shí)時(shí)的系統(tǒng)。同時(shí),為了保持此類系統(tǒng)能夠高效地使用資源,我們需要了解如何基于事件驅(qū)動(dòng)的方法,來構(gòu)建實(shí)時(shí)的信息系統(tǒng)。
圖:實(shí)時(shí)信息系統(tǒng)解決方案架構(gòu)
如上圖所示,各類事件通常來源于包括移動(dòng)和Web應(yīng)用在內(nèi)的不同渠道。其中:
- 接收事件組件,負(fù)責(zé)在任何給定的時(shí)間內(nèi)處理大量的數(shù)據(jù),其速率從每秒上千個(gè)事件、到每秒數(shù)上百萬個(gè)事件不等。在接收到事件之后,該組件會(huì)將其傳遞給對(duì)應(yīng)的處理組件。
- 事件處理組件,負(fù)責(zé)對(duì)事件中帶有的數(shù)據(jù)進(jìn)行操作,包括:過濾,清理,轉(zhuǎn)換和匯總等。根據(jù)組件的行為,事件處理既可以獨(dú)立于事件接收組件、被單獨(dú)地執(zhí)行,也可以作為相關(guān)的操作來完成。事件處理組件可以將原始事件存儲(chǔ)在各種數(shù)據(jù)商店(data store)中,以便進(jìn)行審核(如果未被接收組件處理的話),并且可以根據(jù)用例,將計(jì)算結(jié)果存儲(chǔ)在那些單獨(dú)的數(shù)據(jù)商店之中。因此,大多數(shù)事件都能夠被實(shí)時(shí)處理,并發(fā)布到事件發(fā)布組件中,以傳遞給使用者(consumer)。
- 事件發(fā)布組件,負(fù)責(zé)將處理后的數(shù)據(jù)實(shí)時(shí)地推送給使用者。這些使用者既可以是移動(dòng)或Web應(yīng)用,又可以是對(duì)已處理事件起作用的其他系統(tǒng)。除了這些實(shí)時(shí)事件的發(fā)布之外,在某些用例中,我們還需要通過HTTP通道,以同步、或請(qǐng)求-響應(yīng)的方式,將處理后的摘要信息發(fā)布給移動(dòng)和Web應(yīng)用。
上述參考架構(gòu)可用于那些需要處置現(xiàn)場(chǎng)正在發(fā)生的事件,并將其發(fā)布到后臺(tái)應(yīng)用等多種用例中。例如:在出現(xiàn)緊急情況時(shí),現(xiàn)場(chǎng)人員可以實(shí)時(shí)地發(fā)送有關(guān)需求與狀況的細(xì)節(jié)信息,而后臺(tái)團(tuán)隊(duì)則能夠毫不拖延地進(jìn)行必要的物資調(diào)配與派送。此外,我們也可以使用此類架構(gòu)來構(gòu)建農(nóng)業(yè)的供應(yīng)鏈。例如:農(nóng)民們通過運(yùn)貨車輛將農(nóng)作物運(yùn)送到連鎖超市。農(nóng)民可以在農(nóng)作物準(zhǔn)備就緒時(shí),更新其詳細(xì)信息。而超市后臺(tái)團(tuán)隊(duì)則會(huì)實(shí)時(shí)地從各個(gè)位置獲取更新,并安排車輛及時(shí)地收集農(nóng)作物,以避免延遲。
使用WSO2和Kafka的參考架構(gòu)
消息代理是將消息發(fā)送者與接收者相分離的組件。目前,市場(chǎng)上有很多消息代理類產(chǎn)品,它們各有優(yōu)、缺點(diǎn)。其中最流行的當(dāng)屬Kafka、NATS和RabbitMQ。當(dāng)然,Kafka也可以作為NATS和RabbitMQ的最佳功能性代理。
在此,我們選擇Kafka作為事件消息的代理;選擇功能豐富、簡(jiǎn)單且開源的WSO2Streaming Integrator作為事件處理器;使用既支持流媒體、又支持REST風(fēng)格的WSO2 API Manager,作為事件發(fā)布者。當(dāng)然,這些組件也可以被市場(chǎng)上的其他類似工具所替換。下圖展示了構(gòu)成實(shí)時(shí)事件驅(qū)動(dòng)型信息系統(tǒng)的各個(gè)組件,及其相互連接。
圖:具有代理和WSO2平臺(tái)的實(shí)時(shí)事件驅(qū)動(dòng)型信息系統(tǒng)
在該架構(gòu)中,事件代理會(huì)接收來自移動(dòng)和Web應(yīng)用等源頭的事件負(fù)載。WSO2 Streaming Integrator會(huì)處理這些事件,然后將各種結(jié)果事件通過WebSocket連接,發(fā)布到WSO2 API Manager上。WSO2 API Manager擁有一個(gè)公布給網(wǎng)關(guān)的WebSocket API,諸如移動(dòng)和Web之類的consumer(消費(fèi)者)應(yīng)用會(huì)使用該API,實(shí)時(shí)地接收各種事件。同時(shí),WSO2 Streaming Integrator可以將原始事件和匯總的結(jié)果,通過標(biāo)準(zhǔn)的REST API,從WSO2 API Manager處公布給相關(guān)的consumer。下圖對(duì)上述架構(gòu)進(jìn)行了細(xì)化。
圖:具有Kafka和WSO2平臺(tái)詳細(xì)信息的實(shí)時(shí)事件驅(qū)動(dòng)型信息系統(tǒng)
如上圖所示,事件源通過Kafka客戶端,將事件發(fā)布到Kafka代理中那些可用的topic(主題)處。WSO2 Streaming Integrator不但可以訂閱這些topic,還能通過已配置的Kafka源,實(shí)時(shí)地使用來自Kafka的各種事件。由Siddhi語言編寫的各項(xiàng)操作將處理這些事件,并傳遞給諸如WebSocket之類的事件sink(接收器)。同時(shí),WSO2 SI會(huì)按需通過各種數(shù)據(jù)商店(data stores),將事件存儲(chǔ)到對(duì)應(yīng)的數(shù)據(jù)庫表中。
WSO2 API Manager通過WebSocket API來將WebSocket sink的詳細(xì)信息配置到API的端點(diǎn)上。據(jù)此,那些使用WebSocket API的客戶端應(yīng)用將能實(shí)時(shí)地接收到已經(jīng)處理的各種事件。
同時(shí),那些已處理的信息和原始事件會(huì)被存儲(chǔ)到一個(gè)通過WSO2 Enterprise Integrator公布了REST數(shù)據(jù)服務(wù)的數(shù)據(jù)庫中。此處的數(shù)據(jù)服務(wù)是通過將WSO2 API Manager作為受保護(hù)的REST API予以公布,并通過客戶端應(yīng)用實(shí)現(xiàn)同步通信的服務(wù)。此外,作為一種能夠支持大多數(shù)企業(yè)系統(tǒng)需求的成熟架構(gòu),我們可以通過擴(kuò)展,來支持諸如:混合集成需求、API管理平臺(tái)等多種企業(yè)用例。
只有WSO2平臺(tái),沒有Kafka的參考架構(gòu)
如果貴組織剛開始著手構(gòu)建實(shí)時(shí)的事件驅(qū)動(dòng)型信息系統(tǒng),而且數(shù)據(jù)負(fù)載量并不大的話,那么就可以僅使用WSO2平臺(tái),來構(gòu)建前文提到的精簡(jiǎn)版架構(gòu)。下圖展示了一種沒有消息代理的實(shí)現(xiàn)方式。
圖:具有WSO2平臺(tái)的實(shí)時(shí)事件驅(qū)動(dòng)型信息系統(tǒng)
該架構(gòu)與前文提到的架構(gòu)之間唯一的區(qū)別在于:雖然缺少事件代理,但是客戶端應(yīng)用能夠通過HTTP的調(diào)用,將事件直接發(fā)送到WSO2 Streaming Integrator處。當(dāng)然,由于該架構(gòu)沒有消息代理,因此WSO2 SI需要將原始事件存儲(chǔ)在數(shù)據(jù)庫中,以供各項(xiàng)審核。而它的其余功能則與前文的架構(gòu)相同。下圖展示了該架構(gòu)的詳細(xì)組成結(jié)構(gòu)。
圖:具有WSO2平臺(tái)詳細(xì)信息的實(shí)時(shí)事件驅(qū)動(dòng)型信息系統(tǒng)
如上圖所示,WSO2 SI被配置為通過HTTP接口來接收事件。而Siddhi應(yīng)用中的HTTP源則被配置為通過不同的操作,來處理各種事件,然后發(fā)布到WebSocket sink中。同時(shí),各種原始事件通過數(shù)據(jù)商店被存儲(chǔ)在數(shù)據(jù)庫中,并將各種聚合的結(jié)果通過不同的數(shù)據(jù)商店存儲(chǔ)到另一個(gè)數(shù)據(jù)表里。除此之外,該系統(tǒng)的其余功能與前文提到的基于代理的實(shí)現(xiàn)方式基本一致。
從Kafka到Websocket Siddhi應(yīng)用的示例代碼
下面我們將給出一個(gè)Siddhi的應(yīng)用示例。它能夠從Kafka的topic中讀取事件,并通過WebSocket服務(wù)器,將各種事件發(fā)布(或輸出)到某個(gè)日志sink處。當(dāng)然,在發(fā)布之前,它會(huì)對(duì)每個(gè)事件進(jìn)行簡(jiǎn)單地檢查(或篩選),以確保其數(shù)量小于500。具體代碼請(qǐng)參見--https://gist.github.com/chanakaudaya/efe8dfed2558811f0316a7839dbfef57。其中,您可以找到有關(guān)如何使用Streaming Integrator,來設(shè)置Kafka的詳細(xì)示例。同時(shí),您也可以通過文檔鏈接--https://ei.docs.wso2.com/zh_CN/latest/streaming-integrator/examples/working-with-kafka/,來試運(yùn)行該Siddhi應(yīng)用。
如何創(chuàng)建連接到WebSocket端點(diǎn)的WebSocket API
如下圖所示,您可以通過WSO2 API Manager的發(fā)布者(publisher)接口來創(chuàng)建WebSocket API,并使用WS服務(wù)器將這些事件發(fā)布到客戶端。
您可以在下圖的上部菜單中選擇“設(shè)計(jì)新的WebSocket API”(或“創(chuàng)建API”),然后在下一個(gè)窗口中提供詳細(xì)的信息。
接著,您可以選擇“創(chuàng)建并發(fā)布”選項(xiàng),將WebSocket API推送到開發(fā)人員的門戶(portal)處,以便用戶在其中使用有效的OAuth2令牌。
通過參考文檔鏈接-- https://apim.docs.wso2.com/zh-CN/latest/learn/tutorials/create-and-publish-websocket-api/#create-and-publish-a-websocket-api,您可以逐步了解到如何創(chuàng)建WebSocket API,并能夠試運(yùn)行其客戶端的示例。
作為拓展,您還可以從如下鏈接處,獲得有關(guān)WSO2的大量代碼示例:
- WSO2 Streaming Integrator教程--https://ei.docs.wso2.com/en/latest/streaming-integrator/guides/use-cases/
- WSO2 API Manager教程--https://apim.docs.wso2.com/en/latest/learn/design-api/create-api/create-a-rest-api/
- WSO2 Enterprise Integrator教程--https://ei.docs.wso2.com/en/latest/micro-integrator/use-cases/learn-overview/
原標(biāo)題:How To Build a Real-Time, Event-Driven Information System ,作者: Chanaka Fernando
【51CTO譯稿,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文譯者和出處為51CTO.com】