apache ignite_使用Apache Storm和Apache Ignite进行复杂事件处理(CEP)
apache ignite
 在本文中, “使用Apache Ignite進行高性能內存計算”一書的作者將討論使用Apache Strom和Apache Ignite進行復雜的事件處理。 本文的一部分摘自 
 書 。 
術語“復雜事件處理”或CEP沒有廣泛或高度接受的定義。 Wikipedia的以下引用可以簡要描述什么是復雜事件處理:
“復雜事件處理(CEP)主要是一個事件處理概念,用于處理多個事件,以識別事件云中有意義的事件為目標。 CEP采用的技術包括檢測許多事件的復雜模式,事件相關性和抽象性,事件層次結構以及事件之間的因果關系,成員關系和時間安排以及事件驅動過程。
為簡單起見,復雜事件處理(CEP)是一種用于在真實世界中永不停止或流式傳輸事件數據的低延遲過濾,聚合和計算的技術。 在IT環境中,原始基礎結構和業務事件的數量和速度均呈指數級增長。 此外,移動設備的爆炸式增長和高速連接的普遍性加劇了移動數據的爆炸式增長。 同時,對業務流程敏捷性和執行的需求僅在增長。 這兩個趨勢給組織施加了壓力,要求它們提高其能力以支持事件驅動的實施架構模式。 實時事件處理需要基礎架構和應用程序開發環境來執行事件處理要求。 這些要求通常包括從日常使用案例擴展到極高的速度或各種數據和事件吞吐量的需求,潛在的延遲時間以微秒為單位,而不是響應時間的秒數。
Apache Ignite允許在內存中以可伸縮且容錯的方式處理連續不斷的數據流,而不是在數據到達數據庫后對其進行分析。 這不僅使您能夠關聯關系并從大量數據中檢測有意義的模式,還可以更快,更高效地完成此操作。 事件歷史記錄可以在內存中保留任何時間長度(對于長時間運行的事件序列至關重要),也可以作為事務記錄在存儲的數據庫中。
Apache Ignite CEP可以在眾多行業中使用,以下是一些一流的用例:
還有更多的工業或功能領域,您可以在其中使用Apache Ignite處理流事件數據,例如保險,運輸和公共部門。 復雜事件處理或CEP包含其過程的三個主要部分:
如上圖所示,數據是從不同來源獲取的。 源可以是任何傳感器(IoT),Web應用程序或行業應用程序。 可以直接在Ignite群集上以收集方式并發處理流數據。 另外,數據可以從其他來源豐富或過濾掉。 計算數據后,可以將計算或匯總的數據導出到其他系統以進行可視化或采取措施。
Apache Ignite Storm Streamer模塊提供了通過Storm到Ignite緩存的流傳輸。 在開始使用Ignite流媒體之前,讓我們看一下Apache Storm,以獲取有關Apache Storm的一些基礎知識。
Apache Storm是一個分布式容錯實時計算系統。 在短時間內,Apache Storm成為分布式實時處理系統的標準,該系統使您可以處理大量數據。 Apache Storm項目是開源的,用Java和Clojure編寫。 它成為實時分析的首選。 Apache Ignite Storm流媒體模塊提供了一種方便的方法,可通過Storm將數據流傳輸到Ignite緩存。
關鍵概念:
Apache Storm從一端讀取??原始數據流,并將其通過一系列小型處理單元,然后在另一端輸出處理后的信息。 讓我們詳細了解Apache Storm的主要組件–
元組 –它是Storm的主要數據結構。 這是元素的有序列表。 通常,元組支持所有基本數據類型。
流 –這是一個無界且無序的元組序列。
嘴 -流的來源,簡單來說,壺嘴從拓撲中的源讀取數據。 噴嘴可以是可靠的或不可靠的。 噴口可以與隊列,Web日志,事件數據等對話。
螺栓 –螺栓是邏輯處理單元,它負責處理數據和創建新的流。 螺栓可以執行過濾,聚合,聯接,與文件/數據庫交互等操作。 螺栓從噴嘴接收數據,然后發射到一個或多個螺栓。
拓撲 –拓撲是“噴口和螺栓”的有向圖,該圖的每個節點都包含數據處理邏輯(螺栓),而連接邊定義數據(流)的流。
與Hadoop不同,Storm可使拓撲永久運行直到您將其殺死。 一個簡單的拓撲結構從噴口開始,從源頭發射流到螺栓以處理數據。 Apache Storm的主要工作是運行拓撲,并將在給定的時間運行任意數量的拓撲。
開箱即用的Ignite提供了Storm Bolt(StormStreamer)的實現,以將計算的數據流式傳輸到Ignite緩存中。 另一方面,您可以記下自定義的Strom Bolt,以將流數據提取到Ignite中。 要開發自定義的Storm Bolt,只需實現* BaseBasicBolt *或* IRichBolt * Storm接口。 但是,如果決定使用StormStreamer,則必須配置一些屬性才能正確運行Ignite Bolt。 所有必填屬性如下所示:
| 1個 | 快取名稱 | 將在其中存儲數據的Ignite緩存的緩存名稱。 | 
| 2 | IgniteTupleField | 命名“點燃元組”字段,通過它在拓撲中獲取元組數據。 默認情況下,該值為ignite。 | 
| 3 | IgniteConfigFile | 此屬性將設置Ignite彈簧配置 文件。 允許您向和發送消息和使用消息 從點燃主題。 | 
| 4 | 允許覆蓋 | 它將啟用覆蓋緩存中的現有值,默認值為false。 | 
| 5 | 自動刷新頻率 | 自動刷新頻率(以毫秒為單位)。 從本質上講,這是拖纜將在 嘗試將到目前為止添加的所有數據提交到遠程 節點。 默認值為10秒。 | 
掌握了基礎知識之后,我們來構建一些有用的工具來檢查Ignite StormStreamer的工作方式。 該應用程序的基本思想是設計噴嘴和螺栓的一種拓撲,該拓撲可以處理來自交通日志文件的大量數據,并在特定值超過預定義閾值時觸發警報。 使用拓撲,可以逐行讀取日志文件,并且該拓撲旨在監視傳入的數據。 在我們的例子中,日志文件將包含數據,例如車輛注冊號,速度和來自高速公路交通攝像頭的高速公路名稱。 如果車輛超過速度限制(例如120km / h),Storm拓撲會將數據發送到Ignite緩存。
接下來的清單將顯示我們將在示例中使用的CSV文件類型,其中包含車輛數據信息,例如車輛注冊號,車輛行駛的速度和高速公路的位置。
AB 123, 160, North city BC 123, 170, South city CD 234, 40, South city DE 123, 40, East city EF 123, 190, South city GH 123, 150, West city XY 123, 110, North city GF 123, 100, South city PO 234, 140, South city XX 123, 110, East city YY 123, 120, South city ZQ 123, 100, West city 以上示例的思想取自Dobbs博士的期刊。 由于本書不是為了研究Apache Storm,因此我將使示例盡可能簡單。 另外,我還添加了著名的Storm單詞計數示例,該示例通過StormStreamer模塊將單詞計數值提取到Ignite緩存中。 如果您對代碼感到好奇,可以通過以下網址獲得 
 Chapter-cep / storm 。 上面的CSV文件將成為Storm拓撲的來源。 
如上圖所示, FileSourceSpout接受輸入的CSV日志文件,逐行讀取數據,并將數據發送到SpeedLimitBolt以進行進一步的閾值處理。 處理完成后,如果發現有任何超過速度限制的汽車,則將數據發送到Ignite StormStreamer螺栓,然后將其提取到緩存中。 讓我們深入了解Storm拓撲。
第1步:
因為這是一個Storm拓撲,所以必須在maven項目中添加Storm和Ignite StormStreamer依賴項。
<dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-storm</artifactId><version>1.6.0</version> </dependency> <dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-core</artifactId><version>1.6.0</version> </dependency> <dependency><groupId>org.apache.ignite</groupId><artifactId>ignite-spring</artifactId><version>1.6.0</version> </dependency> <dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>0.10.0</version><exclusions><exclusion><groupId>log4j</groupId><artifactId>log4j</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>log4j-over-slf4j</artifactId></exclusion><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions> </dependency>在撰寫本書時,僅支持Apache Storm版本0.10.0。 請注意,按照Ignite文檔中的描述,您不需要任何Kafka模塊即可運行或執行此示例。
第2步:
創建的Ignite配置文件(見例如,ignite.xml文件/chapter-cep/storm/src/resources/example-ignite.xml ),并確保它是可以從類路徑。 Ignite配置的內容與本章的上一部分相同。
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:util="http://www.springframework.org/schema/util"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/utilhttp://www.springframework.org/schema/util/spring-util.xsd"><bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"><!-- Enable client mode. --><property name="clientMode" value="true"/><!-- Cache accessed from IgniteSink. --><property name="cacheConfiguration"><list><!-- Partitioned cache example configuration with configurations adjusted to server nodes'. --><bean class="org.apache.ignite.configuration.CacheConfiguration"><property name="atomicityMode" value="ATOMIC"/><property name="name" value="testCache"/></bean></list></property><!-- Enable cache events. --><property name="includeEventTypes"><list><!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). --><util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/></list></property><!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. --><property name="discoverySpi"><bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"><property name="ipFinder"><bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"><property name="addresses"><list><value>127.0.0.1:47500</value></list></property></bean></property></bean></property></bean> </beans>第三步:
創建一個ignite-storm.properties文件,以添加緩存名稱,元組名稱和Ignite配置的名稱,如下所示。
cache.name=testCache tuple.name=ignite ignite.spring.xml=example-ignite.xml第4步:
接下來,創建FileSourceSpout Java類,如下所示,
public class FileSourceSpout extends BaseRichSpout {private static final Logger LOGGER = LogManager.getLogger(FileSourceSpout.class);private SpoutOutputCollector outputCollector;@Overridepublic void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.outputCollector = spoutOutputCollector;} @Overridepublic void nextTuple() {try {Path filePath = Paths.get(this.getClass().getClassLoader().getResource("source.csv").toURI());try(Stream<String> lines = Files.lines(filePath)){lines.forEach(line ->{outputCollector.emit(new Values(line));});} catch(IOException e){LOGGER.error(e.getMessage());}} catch (URISyntaxException e) {LOGGER.error(e.getMessage());}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("trafficLog"));} }FileSourceSpout代碼具有三種重要方法
- open():此方法將在spout的開頭被調用,并為您提供上下文信息。
- nextTuple():此方法將允許您一次將一個元組傳遞給Storm拓撲以進行處理,在這種方法中,我逐行讀取CSV文件,并將該行作為元組發出給螺栓。
- defineOutputFields():此方法聲明輸出元組的名稱,在本例中,名稱應為trafficLog。
步驟5:
現在創建實現BaseBasicBolt接口的SpeedLimitBolt.java類。
public class SpeedLimitBolt extends BaseBasicBolt {private static final String IGNITE_FIELD = "ignite";private static final int SPEED_THRESHOLD = 120;private static final Logger LOGGER = LogManager.getLogger(SpeedLimitBolt.class);@Overridepublic void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {String line = (String)tuple.getValue(0);if(!line.isEmpty()){String[] elements = line.split(",");// we are interested in speed and the car registration numberint speed = Integer.valueOf((elements[1]).trim());String car = elements[0];if(speed > SPEED_THRESHOLD){TreeMap<String, Integer> carValue = new TreeMap<String, Integer>();carValue.put(car, speed);basicOutputCollector.emit(new Values(carValue));LOGGER.info("Speed violation found:"+ car + " speed:" + speed);}}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields(IGNITE_FIELD));} }讓我們再次逐行進行。
- execute():這是實現螺栓的業務邏輯的方法,在這種情況下,我用逗號分隔行并檢查汽車的速度限制。 如果給定汽車的速度限制高于閾值,則我們將從該元組創建新的樹圖數據類型,并將該元組發送到下一個螺栓,在本例中,下一個螺栓將是StormStreamer。
- defineOutputFields():此方法類似于FileSourceSpout中的clarifyOutputFields()方法,它聲明將返回Ignite元組以進行進一步處理。
請注意,元組名稱IGNITE在這里很重要, StormStreamer將僅處理名稱為Ignite的元組。
步驟6:
現在是時候創建我們的拓撲來運行我們的示例了。 拓撲將噴口和螺栓綁在一張圖中,該圖定義了數據如何在組件之間流動。 它還提供了Storm在創建集群中組件實例時使用的并行提示。 要實現拓撲,請在src \ main \ java \ com \ blu \ imdg \ storm \ topology目錄中創建一個名為SpeedViolationTopology.java的新文件。 使用以下內容作為文件的內容:
public class SpeedViolationTopology {private static final int STORM_EXECUTORS = 2;public static void main(String[] args) throws Exception {if (getProperties() == null || getProperties().isEmpty()) {System.out.println("Property file <ignite-storm.property> is not found or empty");return;}// Ignite Stream Iboltfinal StormStreamer<String, String> stormStreamer = new StormStreamer<>();stormStreamer.setAutoFlushFrequency(10L);stormStreamer.setAllowOverwrite(true);stormStreamer.setCacheName(getProperties().getProperty("cache.name"));stormStreamer.setIgniteTupleField(getProperties().getProperty("tuple.name"));stormStreamer.setIgniteConfigFile(getProperties().getProperty("ignite.spring.xml"));TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new FileSourceSpout(), 1);builder.setBolt("limit", new SpeedLimitBolt(), 1).fieldsGrouping("spout", new Fields("trafficLog"));// set ignite boltbuilder.setBolt("ignite-bolt", stormStreamer, STORM_EXECUTORS).shuffleGrouping("limit");Config conf = new Config();conf.setDebug(false);conf.setMaxTaskParallelism(1);LocalCluster cluster = new LocalCluster();cluster.submitTopology("speed-violation", conf, builder.createTopology());Thread.sleep(10000);cluster.shutdown();}private static Properties getProperties() {Properties properties = new Properties();InputStream ins = SpeedViolationTopology.class.getClassLoader().getResourceAsStream("ignite-storm.properties");try {properties.load(ins);} catch (IOException e) {e.printStackTrace();properties = null;}return properties;} }讓我們再次逐行進行。 首先,我們閱讀ignite-strom.properties文件以獲取所有必要的參數,然后再配置StormStreamer螺栓。 風暴拓撲基本上是一個Thrift結構。 TopologyBuilder類提供了一種簡單而優雅的方法來構建復雜的Storm拓撲。 TopologyBuilder類具有setSpout和setBolt的方法。 接下來,我們使用“拓撲”構建器構建Storm拓撲,并添加了帶有名稱spout和1執行程序的并行提示的spout。
我們還將SpeedLimitBolt定義為具有1個執行程序的并行提示的拓撲。 接下來,我們使用shufflegrouping設置StormStreamer螺栓, shufflegrouping訂閱該螺栓,并在StormStreamer螺栓的各個實例之間平均分配元組(限制)。
出于開發目的,我們使用LocalCluster實例創建本地集群,并使用SubmitTopology方法提交拓撲。 將拓撲提交到集群后,我們將等待10秒鐘,等待集群計算提交的拓撲,然后使用LocalCluster的 shutdown方法關閉集群。
步驟7:
接下來,首先運行Apache Ignite或集群的本地節點。 構建maven項目后,使用以下命令在本地運行拓撲。
mvn compile exec:java -Dstorm.topology=com.blu.imdg.storm.topology.SpeedViolationTopology該應用程序將產生很多系統日志,如下所示。
現在,如果我們通過ignitevisior驗證了Ignite緩存,我們應該將以下輸出輸出到控制臺中。
輸出顯示結果,這是我們期望的。 從我們的source.csv日志文件中,只有五輛車超過了120 km / h的速度限制。
這幾乎是對Ignite Storm Streamer的實用概述的總結。 如果您對Ignite Camel或Ignite Flume Streamer感到好奇,請參閱“使用Apache Ignite進行高性能內存計算”一書 。 您也可以與作者聯系以獲取該書的免費副本,該書可以免費分發給學生和教師。
翻譯自: https://www.javacodegeeks.com/2016/10/complex-event-processing-cep-apache-storm-apache-ignite.html
apache ignite
總結
以上是生活随笔為你收集整理的apache ignite_使用Apache Storm和Apache Ignite进行复杂事件处理(CEP)的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: kata_小规模流处理kata。 第2部
- 下一篇: 创建文件夹Linux命令(创建文件夹 l
