【原】StreamInsight 浅入浅出(四)—— 例子
對(duì)于StreamInsight這種不是很線性的架構(gòu),最好還是直接拿出來一個(gè)例子,簡(jiǎn)單但完整的把流程走過一遍,更能看清所謂“流”、“事件”、“適配器”之類到底是什么東西,有什么關(guān)系。
官方例子下載地址:http://go.microsoft.com/fwlink/?LinkId=180356,這里就理一遍其中最簡(jiǎn)單的例子:TrafficJoinQuery
場(chǎng)景描述
這個(gè)例子的場(chǎng)景可以描述為:有九個(gè)測(cè)速器,編號(hào)為1001~1009,分別放置在3個(gè)地點(diǎn)。每個(gè)測(cè)速器每20s會(huì)記錄下這20s內(nèi)通過的車輛數(shù)以及它們的平均速度。現(xiàn)在要統(tǒng)計(jì)出每個(gè)測(cè)速器記錄的一分鐘內(nèi)車輛數(shù)的平均數(shù):
比如1001號(hào)測(cè)速器,10:00:00~10:00:20記錄了20輛車,10:00:20~10:00:40記錄了15輛車,10:00:40~10:01:00記錄了25輛車,10:01:00~10:01:20記錄了5輛車,那么1001號(hào)測(cè)速器在10:00:00~10:01:00這一分鐘內(nèi)車輛數(shù)的平均數(shù)就是(20+15+25)/3=20,而在10:00:20~10:01:20這一分鐘內(nèi)車輛數(shù)的平均數(shù)就是(15+25+5)/3=15。
這里最重要的就是搞清每一次計(jì)數(shù)的時(shí)候,哪些數(shù)據(jù)是包括其中的。
提供的數(shù)據(jù)是兩個(gè)csv文件,一個(gè)是包含了時(shí)間、測(cè)速器編號(hào)、車數(shù)、車速的日志文件,另一個(gè)是測(cè)速器編號(hào)與所在地點(diǎn)(1,2,3)對(duì)應(yīng)的表。最終的結(jié)果在對(duì)第一張表的聚合計(jì)算的基礎(chǔ)上,再把這兩張表連接起來。
準(zhǔn)備工作
當(dāng)然要先安裝StreamInsight http://msdn.microsoft.com/zh-cn/library/ee378749.aspx 。然后注意把下載下來的例子里的
using (Server server = Server.Create("Default"))改成
using (Server server = Server.Create("XXXXX"))其中XXXXX就是你的StreamInsight的實(shí)例名。 如果想使用 Connect的方法的話,需要先開啟一個(gè) Host,提供一個(gè) EndPoint :
Server serverInsight = Server.Create("StreamInsight"); ServiceHost host = new ServiceHost(serverInsight.CreateManagementService()); WSHttpBinding binding = new WSHttpBinding(SecurityMode.Message); binding.HostNameComparisonMode = HostNameComparisonMode.Exact; host.AddServiceEndpoint(typeof(IManagementService),binding,"http://localhost:80/StreamInsight/StreamInsight"); host.Open();然后在程序中通過
using (Server server = Server.Connect(new System.ServiceModel.EndpointAddress(@http://localhost/StreamInsight/StreamInsight)))連接到EndPoint。
適配器
例子的Solution下包括三個(gè)項(xiàng)目,其中“SimpleTextFileReader”和“SimpleTextFileWriter”是兩個(gè)適配器項(xiàng)目,分別對(duì)應(yīng)輸出、輸入適配器。從例子中可以看出,推薦的做法是適配器項(xiàng)目與主程序項(xiàng)目獨(dú)立,這樣能很容易的切換適配器。
查看這兩個(gè)項(xiàng)目,可以看出輸入適配器與輸出適配器的結(jié)構(gòu)是類似的,都包含一個(gè)工廠 Factory 類,一個(gè)提供配置信息的 Config 類,三個(gè)分別對(duì)應(yīng)三種事件模型的適配器。
Factory
對(duì)于輸出適配器,Factory類要完成的就是用Create方法,根據(jù)輸入的事件模型(EventShape)來返回對(duì)應(yīng)的適配器。而輸入適配器的Factory類由于應(yīng)用了 IDeclareAdvanceTimeProperties 接口,還要額外實(shí)現(xiàn) DeclareAdvanceTimeProperties 方法來進(jìn)行一些配置,主要是CTI事件的生成頻率、延遲時(shí)長(zhǎng)以及超時(shí)事件的處理策略的配置。具體可參見代碼中的注釋和 AdvanceTimeGenerationSettings 以及 AdapterAdvanceTimeSettings 這兩個(gè)類的構(gòu)造函數(shù)在 MSDN 中的解釋。
Config
雖然一般 Config 類都帶有"Config"的后綴,但事實(shí)上 Config 類并沒有統(tǒng)一的基類或者接口。它的作用就是由外部傳遞一些配置信息給 Factory 并進(jìn)一步傳遞到適配器中。
一般來說 Config 類中不包含公開的方法,而是由一些基本類型的屬性構(gòu)成。
在這個(gè)例子中,TextFileReaderConfig 類中配置了輸入文件的名稱(InputFileName),列的分隔符(Delimiter),文件的文化屬性(CultureName),各列的順序(InputFieldOrders),它們的用處可以在適配器中看到。而 CtiFrequency 則指明了 CTI 事件的頻率,作用于 TextFileReaderFactory 。
Adapter
不同的事件模型對(duì)應(yīng)的適配器,其代碼往往是類似的。比照 SimpleTextFileReader 工程下的三個(gè)適配器類,我們會(huì)發(fā)現(xiàn)除了 CreateEventFromLine 方法內(nèi)部有不同,其他都是近似甚至一樣的。
這里關(guān)鍵的方法是 ProduceEvents,Start 方法和 Resume 方法都調(diào)用了這個(gè)方法:
/// <summary> /// Main driver to read events from the CSV file and enqueue them. /// </summary> private void ProduceEvents() {IntervalEvent currentEvent = default(IntervalEvent);try{// Keep reading lines from the file.while (true){if (AdapterState.Stopping == AdapterState){Stopped();return;}// Did we enqueue the previous line successfully?if (this.currentLine == null){this.currentLine = this.streamReader.ReadLine();if (this.currentLine == null){// Stop adapter (and hence the query) at the end of the file.Stopped();return;}}try{// Create and fill event structure with data from text file line.currentEvent = this.CreateEventFromLine(this.currentLine);// In case we just went into the stopping state.if (currentEvent == null){continue;}}catch (Exception e){// The line couldn't be transformed into an event.// Just ignore it, and release the event's memory.ReleaseEvent(ref currentEvent);this.consoleTracer.WriteLine(this.currentLine + " could not be read into a CEP event: " + e.Message);// Make sure we read a new line next time.this.currentLine = null;continue;}if (EnqueueOperationResult.Full == Enqueue(ref currentEvent)){// If the enqueue was not successful, we keep the event.// It is good practice to release the event right away and// not hold on to it.ReleaseEvent(ref currentEvent);// We are suspended now. Tell the engine we are ready to be resumed.Ready();// Leave thread to wait for call into Resume().return;}// Enqueue was successful, so we can read a new line again.this.currentLine = null;}}catch (AdapterException e){this.consoleTracer.WriteLine("ProduceEvents - " + e.Message + e.StackTrace);} }在 While 循環(huán)中每次從日志文件中讀取一行記錄,然后利用 CreateEventFromLine 方法將該行記錄轉(zhuǎn)化為相應(yīng)的事件 currentEvent,最后通過 Enqueue 方法,把新的事件插入隊(duì)列中。如果理解了上一篇文章中的適配器的狀態(tài)機(jī),就會(huì)注意在每次讀取日志前先判斷適配器的狀態(tài)是否為 Stopping ,并在日志讀取空行(日志讀完)后停止適配器運(yùn)行。
當(dāng) Enqueue 的結(jié)果為 Full 時(shí),說明隊(duì)列已滿,這次插入是失敗的,而且當(dāng)前的狀態(tài)是 Suspended(由輸出適配器或者其他的適配器導(dǎo)致)。所以一方面通過 Ready 方法將狀態(tài)重置為 Running 好進(jìn)行下一次的插入。同時(shí)為了節(jié)省內(nèi)存,釋放 currentEvent 。
這里要注意幾個(gè) return ,因?yàn)樵谶@里說明直接退出了方法,循環(huán)中止,日志讀取中止。直到再次調(diào)用 ProduceEvents 方法,也就是外部調(diào)用 Resume 方法(在整個(gè)Query過程中,Start 方法只會(huì)在初始時(shí)調(diào)用一次),才會(huì)再次啟動(dòng)循環(huán),讀取日志。
至于 CreateEventFromLine 方法,就是通過一行日志生成對(duì)應(yīng)的事件。對(duì)于非類型化的適配器,事件負(fù)載要通過 SetField 方法賦值,這里通過 Config 中的 InputFieldOrders,將 csv 日志的各列分別對(duì)應(yīng)到事件負(fù)載的各字段中。
主程序
主項(xiàng)目 TrafficJoinQuery 中的三個(gè)文件,在 EventTypes 中的兩個(gè)類對(duì)應(yīng)兩種事件負(fù)載——測(cè)量日志與地理信息。這就體現(xiàn)了非類型化的適配器的優(yōu)勢(shì)——對(duì)于兩種事件負(fù)載,只需要同一個(gè)適配器就可以了,負(fù)載字段在運(yùn)行時(shí)根據(jù)配置信息動(dòng)態(tài)確定。
查詢模板
Program中,最復(fù)雜的是 QueryTemplate 的創(chuàng)建。所謂 QueryTemplate,顧名思義,就是查詢模板,通過預(yù)先設(shè)定一套計(jì)算方法和規(guī)則,將輸入流轉(zhuǎn)化為輸出流。這里有兩段 Linq 代碼:
// Extend duration of each sensor reading, so that they fall in // a one-minute sliding window. Group by sensor ID and calculate the // average vehicular count per group within each window. // Include the grouping key in the aggregation result. var avgCount = from oneMinReading in sensorStream.AlterEventDuration(e => TimeSpan.FromMinutes(1))group oneMinReading by oneMinReading.SensorId into oneGroupfrom eventWindow in oneGroup.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)select new { avgCount = eventWindow.Avg(e => e.VehicularCount), SensorId = oneGroup.Key };// Join sensors and locations. Moreover, filter the count // result by a threshold, which is looked up based on the // sensor location through a user-defined function. var joined = from averageEvent in avgCountjoin locationData in locationStreamon averageEvent.SensorId equals locationData.SensorIdwhere averageEvent.avgCount > UserFunctions.LocationCountThreshold(locationData.LocationId)select new{SensorId = locationData.SensorId,LocationID = locationData.LocationId,VehicularCount = averageEvent.avgCount};在第一段中先利用 AlterEventDuration 方法將每條記錄的有效時(shí)間延續(xù)至一分鐘——因?yàn)槲覀円y(tǒng)計(jì)的是一分鐘的平均值。之后對(duì) SensorId 做聚合分組,最后用 SnapshotWindow 方法截取每組每個(gè)時(shí)間段的平均值。這里 SnapshotWindow 可以認(rèn)為是給事件流的橫切面拍了一個(gè)快照,獲取的是一個(gè)時(shí)間點(diǎn)上的數(shù)據(jù)。
而第二段就是將第一段獲得的事件流與地點(diǎn)數(shù)據(jù)做連接,而且還利用 UserFunctions 提供的 LocationCountThreshold 方法過濾了一部分?jǐn)?shù)據(jù)。最終我們得到的事件負(fù)載包含了 SensorId 、LocationID 、VehicularCount 三個(gè)字段。
關(guān)于聚合、連接、時(shí)間窗口以及其他的 Linq 語法,具體會(huì)在以后介紹。
查詢綁定
有了查詢模板,也只是打了一個(gè)空架子,只有連上輸入、輸出適配器,才能得到一個(gè)能實(shí)際運(yùn)作的系統(tǒng)。在 BindQuery 方法中就將兩個(gè)輸入適配器和一個(gè)輸出適配器與查詢模板綁定在了一起。
兩個(gè)輸入適配器一個(gè)是邊緣事件適配器,一個(gè)是時(shí)間段事件適配器。前者對(duì)應(yīng)的是地理數(shù)據(jù),因?yàn)檫吘壥录跊]有接收到結(jié)束邊緣事件時(shí),它的結(jié)束時(shí)間是無窮大,也就是在整個(gè)查詢過程中是有效的,正適合需要一直有效的地理數(shù)據(jù)。而時(shí)間段事件在生成時(shí)就明確了開始時(shí)間和結(jié)束時(shí)間,符合這里車數(shù)日志記錄的情況。
輸出適配器是點(diǎn)事件,說明我們要得到的結(jié)果是每個(gè)時(shí)間點(diǎn)意義上的值。
查詢啟動(dòng)、停止與診斷
// Start the query query.Start();// Wait for the query to be suspended - that is the state // it will be in as soon as the output adapter stops due to // the end of the stream. DiagnosticView dv = server.GetDiagnosticView(query.Name);while ((string)dv[DiagnosticViewProperty.QueryState] == "Running") {// Sleep for 1s and check againThread.Sleep(1000);dv = server.GetDiagnosticView(query.Name); }// Retrieve some diagnostic information from the CEP server // about the query. Console.WriteLine(string.Empty); RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/EventManager")), Console.Out); RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/PlanManager")), Console.Out); RetrieveDiagnostics(server.GetDiagnosticView(new Uri("cep:/Server/Application/TrafficJoinSample/Query/TrafficSensorQuery")), Console.Out);query.Stop();啟動(dòng)、停止不需細(xì)說。由于 query.Start() 后實(shí)際是適配器用另外的線程執(zhí)行相應(yīng)的方法(ProduceEvents),主線程需要等待適配器線程執(zhí)行結(jié)束。所以這里用 DiagnosticView 獲得當(dāng)前查詢的狀態(tài)。直到不為 Running,才輸出查詢的診斷報(bào)告。最后停止查詢。
這里的診斷報(bào)告會(huì)列出一些查詢數(shù)據(jù),比如總事件數(shù)、查詢時(shí)間等。但從中很難看出查詢的具體流程是怎樣的,即使你進(jìn)行調(diào)試,由于具體的查詢實(shí)際是在各個(gè)線程中執(zhí)行的,無法順序跟蹤事件的產(chǎn)生、計(jì)算、輸出。所以,StreamInsight 提供了一個(gè)圖形化的調(diào)試工具,StreamInsight Event Flow Debugger。關(guān)于這個(gè)工具的使用,會(huì)在下一篇文章詳細(xì)介紹。
轉(zhuǎn)載于:https://www.cnblogs.com/smjack/archive/2010/10/29/1864429.html
總結(jié)
以上是生活随笔為你收集整理的【原】StreamInsight 浅入浅出(四)—— 例子的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kernel部分数据结构列表三(inod
- 下一篇: 使用delphi 开发多层应用(十三)使