利用Microsoft StreamInsight控制較大數據流詳解
StreamInsight 流處理體系結構和熟悉的基于.NET 的開發(fā)平臺使開發(fā)人員可以快速實現強大而高效的事件處理應用程序。本文我們主要就介紹一下利用Microsoft StreamInsight控制較大數據流的具體方法,接下來就讓我們一起來了解一下這部分內容吧。
利用Microsoft StreamInsight 控制較大數據流
生產線的產量下降后,將容易出現用戶媒體流跳過這些流程,或者您的一個產品成為了“必需產品”的情況。真正的竅門是在這些情況發(fā)生時進行識別,或根據以往趨勢對其做出預測。
成功預測這些情況需要使用近乎實時的方法。在對相關數據進行提取、轉換并加載到 SQL Server Analysis Services (SSAS) 等傳統(tǒng)商業(yè)智能 (BI) 解決方案中時,情況早已發(fā)生改變。同樣,一些系統(tǒng)依靠請求-響應模式來從事務性數據存儲(如 SQL Server Reporting Services 或 SSRS、報告)中請求已更新的數據,像這樣的系統(tǒng)總是在接近請求-輪詢間隔結束時運行陳舊數據。輪詢間隔通常是固定的,因此即使突然發(fā)生有趣的活動,消耗系統(tǒng)也不會知道,直到進入下一個間隔。相反,消耗系統(tǒng)應該在滿足趣味條件時連續(xù)收到通知。
在檢測新興趨勢時,時間間隔至關重要 - 在過去的五分鐘內,一個特定項目發(fā)生了 100 次購買,顯而易見,這比過去五個月間的持續(xù)購買更能指示新興趨勢。SSAS 和 SSRS 等傳統(tǒng)系統(tǒng)需要開發(fā)人員通過事務性存儲中多維數據集或時間戳列中的單獨維度來自行跟蹤數據的及時性。理論上,用于識別新興情況的工具可能具有時間內置的概念,并能提供使用該工具所需的豐富 API。
最后,對未來的準確指示來源于對過去的分析。實際上,這就是傳統(tǒng) BI 的所有功能 - 對大量的歷史數據進行匯總和分析,從而識別趨勢。遺憾的是,與更多的事務性系統(tǒng)相比,在使用這些系統(tǒng)時需要不同的工具和查詢語言。成功識別新興情況需要實現過去數據和當前數據的無縫關聯。只有當對這兩種數據使用相同的工具和查詢語言時,才可能實現這種緊密集成。
對于生產線監(jiān)視等特定情況,可通過存在的針對性極強的自定義工具來執(zhí)行這些功能,但是這些工具通常比較昂貴且用途并不廣泛。
為了防止生產線產量下降或確保您的產品定價合適,關鍵在于具有足夠的響應能力,能夠根據情況的更改而進行識別與調整。若要輕松快速地識別這些情況,歷史查詢和實時查詢應使用相同的開發(fā)人員友好的工具集和查詢語言,系統(tǒng)應該以近乎實時的方式來處理大量的數據(大約為每秒成百上千個事件),同時引擎應該足夠靈活,能夠處理跨越多個問題域的情況。
幸運的是,存在這樣的工具。它稱為 Microsoft StreamInsight。
StreamInsight 體系結構概述
StreamInsight 是一種復雜事件處理引擎,它每秒能夠處理成百上千的事件,且延遲極低。它可以由任何進程(如 Windows 服務)托管,也可以直接嵌入任何應用程序。StreamInsight 具有簡單的適配器模型,用于輸入和輸出數據,并且實時數據和歷史數據的查詢像任何其他來自任何 Microsoft .NET Framework 語言的程序集一樣使用獲取的相同 LINQ 語法。其作為 SQL Server 2008 R2 的一部分授予許可。
StreamInsight 的高級體系結構非常簡單:通過輸入適配器從各種源收集事件。這些事件均通過查詢進行分析和轉換,并且查詢結果通過輸出適配器分發(fā)給其他系統(tǒng)和人?! D 1 顯示了這一簡單結構。
圖 1 Microsoft StreamInsight 高級體系結構
就像面向服務的體系結構關注消息,而數據庫系統(tǒng)關注行一樣,StreamInsight 等復雜事件處理系統(tǒng)按照事件進行組織。事件是簡單的數據段以及與該數據相關的時間 - 與一天中特定時間的傳感器讀數或股票行情價格相似。事件所攜帶的數據稱為它的負載。
StreamInsight 支持三種類型的事件。點事件是即時且不持續(xù)的事件。間隔事件是其負載與特定時間段相關的事件。邊緣事件與間隔事件相似,但當邊緣事件到達時,其持續(xù)時間未知。而系統(tǒng)設置了開始時間,且事件實際上具有無限持續(xù)時間,直到另一個邊緣事件到達才會為這一事件設置結束時間。例如,速度計讀數可能為點事件,因為它不斷更改,但是超市的牛奶價格可能為邊緣事件,因為其關聯時間較長。當牛奶的零售價格更改時(比如,由于分銷商定價發(fā)生更改),新價格的持續(xù)時間未知,因此,與間隔事件相比,邊緣事件要更為合適。稍后,當分銷商再次更新其定價時,新的邊緣事件將覆蓋先前定價更改的持續(xù)時間,而另一個邊緣事件將設置新的價格以便繼續(xù)。
StreamInsight 中的輸入適配器和輸出適配器是適配器設計模式的抽象示例。StreamInsight 引擎在其自有的事件表示上運行,但是這些事件的實際來源可能有較大差異,范圍從專有接口到硬件傳感器到由企業(yè)的應用程序生成的狀態(tài)消息。輸入適配器將源事件轉換為引擎能夠理解的事件流。
來自 StreamInsight 查詢的結果表示特定商業(yè)知識,且能夠高度專業(yè)化。將這些結果路由至最合適的地點,這點至關重要。輸出適配器可用于將事件的內部表示轉換為打印到控制臺的文本、通過 Windows Communication Foundation (WCF) 發(fā)送到另一個系統(tǒng)以供處理的消息,甚至 Windows Presentation Foundation 應用程序中圖表上的點。有關使用文本文件、WCF 和 SQL 等的示例適配器可從 streaminsight.codeplex.com 獲得。
StreamInsight Queries by Example
乍一看,StreamInsight 查詢似乎與從數據庫中查詢行相似,但是兩者之間存在重大差異。查詢數據庫時,系統(tǒng)會構造并執(zhí)行查詢,同時返回結果。如果基礎數據發(fā)生更改,輸出并不會因為已運行查詢而受影響。數據庫查詢結果表示某一時刻的快照,可以通過請求-響應模式使用。
StreamInsight 查詢?yōu)楝F有查詢。隨著新輸入事件的到達,查詢不斷響應,并且根據需要創(chuàng)建新的輸出事件。
本文中的查詢示例來自可供下載的示例解決方案。這些示例開始較簡單,但隨著查詢語言新功能的引入,功能變得更加強大。所有查詢都使用同一負載類。以下是一個簡單類的定義,該類具有 Region 屬性和 Value 屬性:
- public class EventPayload
- {
- public string Region { get; set; }
- public double Value { get; set; }
- public override string ToString()
- {
- return string.Format("{0}\t{1:F4}", Region, Value);
- }
- }
示例應用程序中的查詢使用一臺輸入適配器和一臺輸出適配器來進行,輸入適配器可隨機生成數據,輸出適配器只需將各事件寫入控制臺。為清晰起見,對示例應用程序中的適配器進行了簡化。
若要運行每個查詢,請在示例解決方案中取消注釋 Program.cs 文件中的行,該示例解決方案可將查詢分配給稱為“template”的本地變量。
以下是一個基本查詢,它通過 Value 屬性來篩選事件:
- var filtered =
- from i in inputStream
- where i.Value > 0.5
- select i;
具有使用 LINQ 經驗的任何開發(fā)人員應該非常熟悉此查詢。因為 StreamInsight 使用 LINQ 作為它的查詢語言,因此此查詢與 LINQ to SQL 查詢類似,訪問數據庫或對 IList 進行內存中篩選。當事件從輸入適配器到達時,其負載將受到檢查,并且如果 Value 屬性的值大于 0.5,事件將被傳遞到輸出適配器,并在此將其打印到控制臺。
應用程序運行時,可以看到事件不斷到達輸出中。這實際上是一個推模型。當事件到達時,StreamInsight 會計算來自輸入的新輸出事件,這與數據庫等拉模型不同,在拉模型中,應用程序必須定期輪詢數據源,以查看新數據是否已經到達。這能與 Microsoft .NET Framework 4 中可用的 IObservable 支持完美結合,我們將在后續(xù)章節(jié)中對此進行介紹。
使用推模型代替輪詢來處理連續(xù)數據是個非常好的主意,但是 StreamInsight 的真正功能體現在查詢時間相關的屬性上。當事件通過輸入適配器到達時,它們獲得了一個時間戳。該時間戳可能來自數據源本身(假設事件表示歷史數據,且?guī)в杏糜诖鎯r間的顯示列),或者可以設置為事件到達的時間。實際上,時間是 StreamInsight 查詢語言中的第一個類。
查詢通常與標準數據庫查詢類似,標準數據庫查詢在尾部粘貼有時間限制符,如“每五秒”或“五秒的時間跨度上每三秒”。例如,以下是一個簡單查詢,它每五秒查詢一次 Value 屬性的平均值:
- var aggregated =
- from i in inputStream.TumblingWindow(TimeSpan.FromSeconds(5),
- HoppingWindowOutputPolicy.ClipToWindowEnd)
- select new { Avg = i.Avg(p => p.Value)};
數據窗口
因為時間概念是復雜事件處理系統(tǒng)的基礎必需概念,因此應以簡單的方式來使用系統(tǒng)中查詢邏輯的時間組件,這點非常重要。StreamInsight 使用窗口概念來表示按時間分組。之前的查詢使用翻轉窗口。應用程序運行時,查詢將每五秒生成單個輸出事件(窗口的大?。?。輸出事件表示前五秒的平均值。像 LINQ to SQL 或 LINQ to Object 一樣,聚合方法(如 Sum 和 Average)能夠將按時間分組的事件匯總為單個值,或可以使用 Select 將輸出投影成不同格式。
翻轉窗口只是另一種窗口類型的特例:跳躍窗口。跳躍窗口也有大小,但是它們也具有不等于其窗口大小的跳躍大小。這表示跳躍窗口可以互相重疊。
例如,窗口大小為五秒、跳躍大小為三秒的跳躍窗口將每三秒生成輸出(跳躍大小),提供前五秒的平均值(窗口大小)。它一次向前跳躍三秒,且持續(xù)五秒。圖 2 顯示分組為翻轉窗口和跳躍窗口的事件流。
圖 2 翻轉窗口和跳躍窗口
請注意,翻轉窗口并不重疊,但是對于跳躍窗口,如果跳躍大小小于窗口大小,則可以重疊。如果窗口重疊,事件將可能在多個窗口中結束,如同時存在于窗口 1 和窗口 2 中的第三個事件。邊緣事件(具有持續(xù)時間)也可能在窗口邊緣重疊,并在多個窗口中結束,如翻轉窗口中的倒數第二個事件。
另一種常見窗口類型為計數窗口。計數窗口包含特定數量的事件,而不是某一時間點或時間段內的事件。要查詢最后三個到達的事件的平均數,可能需要使用計數窗口。計數窗口當前的一個限制是不支持 Sum 和 Average 等內置聚合方法。您必須創(chuàng)建用戶定義的聚合。下文會對這一簡單流程進行介紹。
最后一種窗口類型為快照窗口。在邊緣事件的環(huán)境下,快照窗口最容易理解。每次事件的開始或結束即表示當前窗口的完成和新窗口的開始。圖 3 顯示如何將邊緣事件分組為快照窗口。請注意每個事件邊界觸發(fā)窗口邊界的方式。E1 開始,w1 也開始。當 E2 開始時,w1 完成,而 w2 開始。下個邊緣是 E1 結束,使得 w2 完成,而 w3 開始。結果為三個窗口:包含 E1 的 w1,包含 E1 和 E2 的 w2 以及包含 E3 的 w3。事件分組為窗口后,它們會受到拉伸,從而使事件的開始與結束時間與窗口的相同。
圖 3 快照窗口
更多復雜查詢
在提供可用窗口與基本查詢方法(如地點、分組依據和排序依據)的情況下,可以進行多種查詢。以下是一個查詢,其將輸入事件按地區(qū)分組,然后使用跳躍窗口來輸出最后一分鐘各個 Region 的負載 Value 的總和:
- var payloadByRegion =
- from i in inputStream
- group i by i.Region into byRegion
- from c in byRegion.HoppingWindow(
- TimeSpan.FromMinutes(1),
- TimeSpan.FromSeconds(2),
- HoppingWindowOutputPolicy.ClipToWindowEnd)
- select new {
- Region = byRegion.Key,
- Sum = c.Sum(p => p.Value)};
這些窗口使用兩秒的跳躍大小,因此引擎每兩秒發(fā)送輸出事件。
因為查詢運算符是在 IQueryable 接口中定義的,因此可以撰寫查詢。以下代碼使用上一個查詢,其按地區(qū)查找總和,并計算總和最高的地區(qū)??煺沾翱谠试S事件流按總和分類,因此可以使用 Take 方法獲取總和最高的地區(qū):
- var highestRegion =
- // Uses groupBy query
- (from i in payloadByRegion.SnapshotWindow(
- SnapshotWindowOutputPolicy.Clip)
- from sumByRegion in i
- orderby sumByRegion.Sum descending
- select sumByRegion).Take(1);
一般情況是有關快速移動事件(如傳感器中的讀數)到慢速移動或靜態(tài)參考數據(如傳感器的固定位置)流的查詢。查詢使用聯接來實現此目的。
StreamInsight 聯接語法與任何其他 LINQ 聯接相同,但有一點需要注意:當事件的持續(xù)時間重疊時,它們才會聯接在一起。如果傳感器 1 在時間 t1 報告了一個值,但是有關傳感器 1 位置的參考數據僅對時間 t2 到 t3 有效,那么聯接將不匹配。持續(xù)時間的聯接條件并沒有明確寫入查詢定義中;這是 StreamInsight 引擎的基本屬性。使用靜態(tài)數據時,通常情況下,輸入適配器實際上將數據處理為帶有無限持續(xù)時間的邊緣事件。這樣將能成功完成到快速移動事件流的所有聯接。
通過聯接來關聯多個事件流是一個非常強大的概念。裝配線、石油生產設施或高容量網站通常不會因為隔離的事件而發(fā)生故障。一個用于觸發(fā)溫度警報的設備部件通常不會導致生產線癱瘓;生產線癱瘓可能由于多個原因造成,如溫度在某一持續(xù)時間段內過高,同時某一工具使用過多,而操作員正在換班。
如果沒有聯接,隔離事件將不會有這么多的商業(yè)價值。通過對歷史數據使用聯接和 StreamInsight 查詢,用戶可以將隔離流與非常具體的監(jiān)控條件相關聯,然后進行實時監(jiān)控?,F有查詢能夠查找可能導致故障的情況,并自動生成可路由至系統(tǒng)的輸出事件,該系統(tǒng)知道如何使過熱的設備部件脫機,而不是等到該部件造成整條生產線停產。
在零售情況中,有關某段時間按項目劃分的銷售量的事件可以輸入到定價系統(tǒng)和客戶訂單歷史記錄中,從而確保每個項目具有最佳的定價,或決定在用戶結賬前向其推薦的項目。由于查詢易于創(chuàng)建、修改和撰寫,因此您可以從簡單的情況開始,并隨時間的流逝進行優(yōu)化,從而增加業(yè)務價值。
用戶定義的聚合
StreamInsight 附帶最常見的聚合函數,包括 Count、Sum 和 Average。當這些函數不夠時(或您需要在前文提到的計數窗口進行聚合),StreamInsight 支持用戶定義的聚合函數。
要創(chuàng)建用戶定義的聚合,其流程包括兩個步驟:編寫實際聚合方法,然后通過擴展方法將該方法公布到 LINQ。
進行第一步時,如果聚合與時間無關,則從 CepAggregate<TInput, TOutput> 繼承,如果聚合與時間有關,則從 CepTimeSensitiveAggregate<TInput,TOutput> 繼承。這些抽象類具有單獨的實現方法,稱為 GenerateOutput。
- public class EveryOtherSum :
- CepAggregate<double, double> {
- public override double GenerateOutput(
- IEnumerable<double> payloads) {
- var sum = default(double);
- var include = true;
- foreach (var d in payloads) {
- if (include) sum += d;
- include = !include;
- }
- return sum;
- }
- }
進行第二步時,需要在 CepWindow<TPayload> 上創(chuàng)建擴展方法,以便可以在查詢中使用您的聚合。CepUserDefinedAggregateAttribute 適用于擴展方法,以便通知 StreamInsight 在哪里可以找到聚合的實現(在這種情況下,類是在第一步中創(chuàng)建的)。在可下載的示例應用程序中,本流程兩個步驟的代碼均可在 EveryOtherSum.cs 文件中找到。
關于利用Microsoft StreamInsight控制較大數據流的相關知識就介紹到這里了,希望本次的介紹能夠對您有所收獲!
【編輯推薦】