matlab cep,【CEP】重构和改进HelloInsightObservable
1、官方源碼的不足
HelloInsightObservable官方源碼利用toToPointStream方法將觀察者的實例轉化為點事件流,接著在點事件流中使用linq查詢e>50的輸入,并將其輸出
運行結果如下:
其不足之處在于代碼有點混亂,而且只有一個觀察者。
接下來本文就逐步修改,并且實現多個觀察者的情況。
program.cs中定義觀察者,將觀察者“訂閱”到目標對象的語句如下:
var outputObserver = new OutputObserver();
var outputObservable = query.ToObservable();//將事件流轉化為可觀察的輸出
outputObservable.Subscribe(outputObserver);//提供通知信息到outputObserver
那容易想到的思路是直接在program.cs中添加多個觀察者,再使用Subscribe方法訂閱多個觀察者。但是輸出每次都有變動,由于不同觀察者輸出一樣也看不出明顯規律,偶爾還會由于枚舉觀察者的過程中觀察者集合變動而產生異常
這是因為InputObservable.cs中模擬輸入流的GenerateInput是Timer的回調函數。每一個觀察者在運行之后都會將Timer設為停止狀態,別的觀察者在Timer已經啟動的情況下加入不是很恰當。令人奇怪的是官方源碼在InputObservable. cs的構造函數中啟動了Timer,既然沒打算添加多個觀察者,那在GenerateInput中遍歷觀察者集合Observers的語句有什么意義?
2、修改OutputObserver,添加name屬性
2.1、新建項目C#控制臺應用HelloInsight_edit
添加如下引用:
Microsoft.ComplexEventProcessing;
Microsoft.ComplexEventProcessing.Observable;
System.Reactive;
System.Reactive.Providers;
2.2、實現接口IObserver
namespace?HelloInsight_edit
{
public?class?OutputObserver:IObserver//實現IObserver接口
{
private?string?name;
public?OutputObserver(string?name){
this.name?=?name;
}
public?virtual?void?OnCompleted()
{
Console.WriteLine("Stopping?query...");
}
public?virtual?void?OnError(Exception?e)
{
Console.WriteLine("Unexpected?error?occured");
}
public?virtual?void?OnNext(int?value)
{
Console.WriteLine("{0}觀察到的value:?{1}",?this.name,value);
}
}
}
為簡單起見,IObserver的抽象類型都使用int型,以后Main方法創建事件流的時候也會相應修改。
3、修改事件源,實現IObservable接口
我們要刪掉構造方法中的timer.change(timeSpan,timeSpan),新建了update方法,用來調用這句話。這樣可以使得多個observer都添加到observers中之后再啟動Timer。
public?class?EventSource:IObservable
{
private?List>?observers?=?new?List>();
private?readonly?int?dataNumber;
private?int?generatedNumber;
private?Random?random;
private?readonly?Timer?timer;
private?readonly?int?timeSpan;
//add
private?int?_randomNumber;
public?EventSource(int?dataNumber)
{
Console.WriteLine("我是構造方法");
this.random?=?new?Random();
this.dataNumber?=?dataNumber;
this.generatedNumber?=?0;
this.timer?=?new?Timer(GenerateInput);//callback是一個委托,表示要執行的方法
this.timeSpan?=?100;//每個隨機數字產生的時間間隔?1000ms
//timer.Change(timeSpan,?timeSpan);//此語句控制數據
this._randomNumber?=?-1;//初始化隨機數字
}
public?int?RandomNumber
{
get?{?return?_randomNumber;?}
set?{?this._randomNumber?=?value;?}
}
public?void?Update()
{
timer.Change(timeSpan,?timeSpan);
}
private?void?GenerateInput(object?_)
{
foreach?(var?observer?in?observers)
{
_randomNumber=?random.Next(100);
Console.WriteLine("Random?generated?data?{0}?:?{1}",?generatedNumber,?_randomNumber);
observer.OnNext(_randomNumber);
generatedNumber++;
if?(generatedNumber?>=?dataNumber)
{
observer.OnCompleted();
timer.Change(Timeout.Infinite,?timeSpan);
return;
}
}
timer.Change(timeSpan,?timeSpan);
}
public?void?AddObserver(IObserver?observer)
{
observers.Add(observer);
}
public?void?RemoveObserver(IObserver?observer)
{
observers.Remove(observer);
}
//必須實現的方法
public?IDisposable?Subscribe(IObserver?observer)
{
if?(observer?!=?null?&&?!observers.Contains(observer))
{
observers.Add(observer);
}
Console.WriteLine("我是subscriber");
return?observer?as?IDisposable;
}
}
4、修改program.cs
將輸入源的實例es轉化為點事件流stream,query過濾得到stream中大于50的事件流,query2過濾得到stream大于70的事件流。建立了3個觀察者roger、luffy和nami,我們用luffy觀察query,用nami觀察query2。
修好program.cs之后就可以調試了噢耶……
using?System;
using?System.Collections.Generic;
using?System.Linq;
using?System.Text;
using?System.Threading.Tasks;
//add
using?Microsoft.ComplexEventProcessing;
using?Microsoft.ComplexEventProcessing.Linq;
namespace?HelloInsight_edit
{
class?Program
{
static?void?Main(string[]?args)
{
//將EventSource類作為CEP引擎的輸入。
EventSource?es?=?new?EventSource(10);
var?server?=?Server.Create("Default");
var?application?=?server.CreateApplication("Observable?Application");
//注意以下4行,這里與適配器方式的程序不同的是,沒有插入CTI事件。
var?stream?=?es.ToPointStream(application,
e?=>?PointEvent.CreateInsert(DateTime.Now,?e),
AdvanceTimeSettings.StrictlyIncreasingStartTime,
"Observable?Stream");
var?query?=?from?e?in?stream
where?e?>?50
select?e;
OutputObserver?roger?=?new?OutputObserver("roger");
OutputObserver?luffy?=?new?OutputObserver("luffy");
OutputObserver?nami?=?new?OutputObserver("nami");
Console.WriteLine("Starting?query...");
//直接對原始流添加觀察者
//es.AddObserver(roger);?es.AddObserver(luffy);?es.AddObserver(nami);
//對newStream添加觀察者
var?newStream?=?query.ToObservable();
newStream.Subscribe(luffy);?//newStream.Subscribe(nami);//添加多個訂閱者可能會有異常
//對newStream2添加觀察者
var?query2?=?from?e?in?stream
where?e?>?70
select?e;
var?newStream2?=?query2.ToObservable();
newStream2.Subscribe(nami);
//調用timer.change(定義callback的等待時間和時間間隔)
es.Update();
Console.ReadLine();
}
}
}
運行結果:
可以看出,Subscribe兩個觀察者的操作先執行。
在遍歷觀察者集合observers的過程中,每組顯示2個隨機數。luffy和nami依次觀察第一個和第二個。
{?個人理解為newStream.Subscribe(luffy);的功能類似于一個綁定了luffy的線程,遍歷結束之后全部用戶開始依次輸出。全局變量generatedNumber負責整體次數}
這不是我們要的功能。
對于流中每個事件,不同觀察者都觀察到才行。
4.1、重寫GenerateInput(object _)
將生成隨機數的語句放到遍歷操作foreach之前
private?void?GenerateInput(object?_)
{
_randomNumber?=?random.Next(100);
if?(generatedNumber?<=?dataNumber)
{
Console.WriteLine("Random?generated?data?{0}?:?{1}",?generatedNumber,?_randomNumber);
foreach?(var?observer?in?observers)?observer.OnNext(_randomNumber);//使用最大程度實現的OnNext
}
else
{
observers.ElementAt(0).OnCompleted();
timer.Change(Timeout.Infinite,?timeSpan);
}
generatedNumber++;
timer.Change(timeSpan,?timeSpan);
}
運行結果:
可以看出,對于流中每個事件,luffy檢測到了大于50的事件,nami檢測到了大于70的事件,實現了預定的目標。
{!接下來我們要將觀察者模式、點事件流檢測和WCF(Windows Communication Foundation)相結合,實現事件源和觀察者WCF通信,便于接下來部署到網絡中}
5、參考資料
[1]IObserver接口
[2]IDisposable接口
[3]virtual方法
總結
以上是生活随笔為你收集整理的matlab cep,【CEP】重构和改进HelloInsightObservable的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: matlab 交叉验证 代码,交叉验证(
- 下一篇: 频谱泄露 振动 matlab,关于MAT