使用storm 实时计算_使用Storm进行可扩展的实时状态更新
使用storm 實(shí)時(shí)計(jì)算
在本文中,我將說明如何借助Storm框架以可擴(kuò)展且無鎖定的方式在數(shù)據(jù)庫中維護(hù)實(shí)時(shí)事件驅(qū)動(dòng)流程的當(dāng)前狀態(tài)。
Storm是基于事件的數(shù)據(jù)處理引擎。 它的模型依賴于基本原語,例如事件轉(zhuǎn)換,過濾,聚合……,我們將它們組合成拓?fù)?。 拓?fù)涞膱?zhí)行通常分布在多個(gè)節(jié)點(diǎn)上,并且風(fēng)暴群集還可以并行執(zhí)行給定拓?fù)涞亩鄠€(gè)實(shí)例。 因此,在設(shè)計(jì)時(shí),必須牢記哪些Storm原語在分區(qū)范圍內(nèi)執(zhí)行,即在一個(gè)群集節(jié)點(diǎn)的級(jí)別上執(zhí)行,以及哪些在群集范圍內(nèi)執(zhí)行(又稱為重新分區(qū)操作) ,因?yàn)樗鼈兩婕皩⑹录闹幸瞥龅木W(wǎng)絡(luò)流量。分區(qū)到分區(qū))。 Storm Trident API文檔明確提到了哪些功能做什么,作用范圍如何。 Storm的分區(qū)概念與Kafka隊(duì)列的分區(qū)概念保持一致, Kafka隊(duì)列是入站事件的常見來源。
拓?fù)渫ǔP枰S護(hù)一些執(zhí)行的持續(xù)狀態(tài)。 例如,這可以是一些傳感器值的滑動(dòng)窗口平均值,從推文中提取的近期情緒,在不同位置出現(xiàn)的人數(shù)。……由于某些狀態(tài)更新操作具有分區(qū)范圍(例如partitionAggregate ),因此可伸縮性模型在這里尤為重要。其他則具有集群范圍(例如groupby + perstitentAggregate的組合)。 這篇文章中說明了這一點(diǎn)。
示例代碼在githup上可用 。 它基于Storm 0.8.2,Cassandra 1.2.5和JDK 1.7.0。 請(qǐng)注意,該示例未包含適當(dāng)?shù)腻e(cuò)誤處理:噴口或螺栓均不支持重試失敗的元組,我將在以后的文章中解決。 另外,我使用Java序列化將數(shù)據(jù)存儲(chǔ)在元組中,因此,即使Storm支持多種語言,我的示例也是特定于Java的。
實(shí)際示例:出席事件
我的示例是模擬一個(gè)跟蹤人們?cè)诮ㄖ飪?nèi)位置的系統(tǒng)。 每當(dāng)用戶進(jìn)入或離開房間時(shí),每個(gè)房間入口處的傳感器都會(huì)發(fā)出如下事件:
{"eventType": "ENTER", "userId": "John_5", "time": 1374922058918, "roomId": "Cafetaria", "id": "bf499c0bd09856e7e0f68271336103e0A", "corrId": "bf499c0bd09856e7e0f68271336103e0"} {"eventType": "ENTER", "userId": "Zoe_15", "time": 1374915978294, "roomId": "Conf1", "id": "3051649a933a5ca5aeff0d951aa44994A", "corrId": "3051649a933a5ca5aeff0d951aa44994"} {"eventType": "LEAVE", "userId": "Jenny_6", "time": 1374934783522, "roomId": "Conf1", "id": "6abb451d45061968d9ca01b984445ee8B", "corrId": "6abb451d45061968d9ca01b984445ee8"} {"eventType": "ENTER", "userId": "Zoe_12", "time": 1374921990623, "roomId": "Hall", "id": "86a691490fff3fd4d805dce39f832b31A", "corrId": "86a691490fff3fd4d805dce39f832b31"} {"eventType": "LEAVE", "userId": "Marie_11", "time": 1374927215277, "roomId": "Conf1", "id": "837e05916349b42bc4c5f65c0b2bca9dB", "corrId": "837e05916349b42bc4c5f65c0b2bca9d"} {"eventType": "ENTER", "userId": "Robert_8", "time": 1374911746598, "roomId": "Annex1", "id": "c461a50e236cb5b4d6b2f45d1de5cbb5A", "corrId": "c461a50e236cb5b4d6b2f45d1de5cbb5"}對(duì)(“ ENTER”和“ LEAVE”)對(duì)中的每個(gè)事件與一個(gè)房間內(nèi)一個(gè)用戶的一個(gè)占用時(shí)間段相對(duì)應(yīng)。 這可能對(duì)傳感器提出了很多要求,但是出于本示例的目的,這使我的生活更加輕松 。
為了使事情變得有趣,讓我們想象一下,不能保證到達(dá)我們服務(wù)器的事件遵循時(shí)間順序(請(qǐng)參見生成事件的python腳本中的shuffle()調(diào)用)。
我們將構(gòu)建一個(gè)Storm拓?fù)?#xff0c;該拓?fù)鋵?gòu)建每個(gè)房間的每分鐘每分鐘的占用時(shí)間線,如本文結(jié)尾處的時(shí)間圖所示。 在數(shù)據(jù)庫中,房間時(shí)間線被切成一個(gè)小時(shí)的時(shí)間段,這些時(shí)間段被獨(dú)立存儲(chǔ)和更新。 這是Cafetaria占用1小時(shí)的示例:
{"roomId":"Cafetaria","sliceStartMillis":1374926400000,"occupancies":[11,12,12,12,13,15,15,14,17,18,18,19,20,22,22,22,21,22,23,25,25,25,28,28,33,32,31,31,29,28,27,27,25, 22,22,21,20,19,19,19,17,17,16,16,15,15,16,15,14,13,13,12,11,10,9,11,10,9,11,10]}為了實(shí)現(xiàn)這一點(diǎn),我們的拓?fù)湫枰?#xff1a;
- 根據(jù)correlationID重新組合“ ENTER”和“ LEAVE”事件,并為此用戶在此房間中產(chǎn)生相應(yīng)的存在時(shí)間
- 將每個(gè)在場(chǎng)期間的影響應(yīng)用于房間入住時(shí)間表
順便說一句,Cassandra提供了Counter列 ,盡管我可以很好地替代它們,但我在這里不使用它們。 但是,我的目的是說明Storm功能,即使它會(huì)使方法有些虛構(gòu)。
分組依據(jù)/ persistentAggregate / iBackingMap說明
在查看示例代碼之前,讓我們澄清一下這些“三叉戟風(fēng)暴”原語如何協(xié)同工作。
想象一下,我們從上午9:47到上午10:34收到了兩個(gè)描述用戶在roomA中存在??的事件。 更新會(huì)議室的時(shí)間表需要:
- 從數(shù)據(jù)庫加載兩個(gè)受影響的時(shí)間軸切片:[9.00am,10:00 am]和[10.00am,11:00 am]
- 在這兩個(gè)時(shí)間軸切片中添加此用戶的狀態(tài)
- 將它們保存到數(shù)據(jù)庫
但是,像這樣天真地實(shí)現(xiàn)此目標(biāo)并不是最佳選擇,首先是因?yàn)樗總€(gè)事件使用兩個(gè)DB請(qǐng)求,其次是因?yàn)檫@種“讀取-更新-寫入”序列通常需要一種鎖定機(jī)制,這種鎖定機(jī)制通常無法很好地?cái)U(kuò)展。
為了解決第一點(diǎn),我們想為幾個(gè)事件重新組合數(shù)據(jù)庫操作。 在Storm中,事件(或元組 )被成批處理。 IBackingMap是一個(gè)我們可以實(shí)現(xiàn)的原語,它使我們可以立即查看整批元組。 我們將使用它在批處理的開始(multiget)和結(jié)束時(shí)的所有DB-write操作(multiput)重新分組。 但是,multiget不允許我們查看元組本身,而只能查看“查詢鍵”,這是根據(jù)元組內(nèi)容計(jì)算出來的,如下所述。
原因在于上面提到的關(guān)于天真的實(shí)現(xiàn)的第二點(diǎn):我們想并行執(zhí)行幾個(gè)[multiget +更新邏輯+ multiput]流,而不依賴鎖。 這是通過確保那些并行子流程更新不相交的數(shù)據(jù)集來實(shí)現(xiàn)的。 這就要求定義拆分成并行流的拓?fù)湓剡€控制每個(gè)流內(nèi)DB中要加載和更新的數(shù)據(jù)。 該元素是Storm groupBy原語:它通過按字段值對(duì)元組進(jìn)行分組來定義拆分,并且它通過將“ groupedBy”值作為對(duì)multiget的查詢關(guān)鍵字來控制每個(gè)并行流更新的數(shù)據(jù)。
下圖在房間占用示例中對(duì)此進(jìn)行了說明(簡(jiǎn)化為每個(gè)房間僅存儲(chǔ)一個(gè)時(shí)間線,而不是每個(gè)一小時(shí)的時(shí)間片一個(gè)時(shí)間線):
但是,并行性并沒有完全發(fā)生(例如,當(dāng)前的Storm實(shí)現(xiàn)在分組流中依次調(diào)用每個(gè)reducer / combiner),但這是設(shè)計(jì)拓?fù)鋾r(shí)要牢記的一個(gè)好模型。
有趣的是,在groupBy和multiget之間發(fā)生了一些Storm魔術(shù)。 回想一下,Storm旨在進(jìn)行大規(guī)模分布,這意味著每個(gè)流在多個(gè)節(jié)點(diǎn)上并行執(zhí)行,從諸如Hadoop HDFS或分布式Kafka隊(duì)列之類的分布式數(shù)據(jù)源獲取輸入數(shù)據(jù)。 這意味著groupBy()同時(shí)在多個(gè)節(jié)點(diǎn)上執(zhí)行,所有可能處理的事件都需要組合在一起。 groupBy是一個(gè)重新分區(qū)操作 ,可確保將所有需要分組的事件發(fā)送到同一節(jié)點(diǎn),并由IBackingMap +組合器或約簡(jiǎn)器的同一實(shí)例處理,因此不會(huì)發(fā)生爭(zhēng)用情況。
同樣,Storm要求我們將IBackingMap包裝到可用的Storm MapState原語(或我們自己的原語)之一中,通常用于處理失敗/重播的元組。 如上所述,我不在本文中討論這一方面。
使用這種方法,我們必須實(shí)現(xiàn)IBackingMap,以便它尊重以下屬性:
- 對(duì)于不同的鍵值,由multiget讀取和由IBackingMap的multiput操作寫入的數(shù)據(jù)庫行必須是不同的。
我想這就是他們將這些值稱為“關(guān)鍵”的原因 (盡管任何尊重此屬性的方法都可以)。
回到例子
讓我們看看這在實(shí)踐中是如何工作的。 該示例的主要拓?fù)湓诖颂幙捎?#xff1a;
// reading events .newStream("occupancy", new SimpleFileStringSpout("data/events.json", "rawOccupancyEvent")) .each(new Fields("rawOccupancyEvent"), new EventBuilder(), new Fields("occupancyEvent"))第一部分只是讀取JSON格式的輸入事件(我正在使用簡(jiǎn)單的文件輸出),對(duì)它們進(jìn)行反序列化,然后使用Java序列化將它們放入稱為“ occupancyEvent”的元組字段中。 這些元組中的每一個(gè)都描述了用戶在房間內(nèi)或房間外的“ ENTER”或“ LEAVE”事件。
// gathering "enter" and "leave" events into "presence periods" .each(new Fields("occupancyEvent"), new ExtractCorrelationId(), new Fields("correlationId")) .groupBy(new Fields("correlationId")) .persistentAggregate( PeriodBackingMap.FACTORY, new Fields("occupancyEvent"), new PeriodBuilder(), new Fields("presencePeriod")) .newValuesStream()當(dāng)我們遇到correlationId的不同值時(shí),groupBy原語會(huì)創(chuàng)建盡可能多的元組組(這可能意味著很多,因?yàn)橥ǔW疃鄡蓚€(gè)事件具有相同的correlationId)。 當(dāng)前批處理中具有相同相關(guān)ID的所有元組將重新組合在一起,并且一組或幾組元組將一起呈現(xiàn)給persistentAggregate中定義的元素。 PeriodBackingMap是IBackingMap的實(shí)現(xiàn),其中實(shí)現(xiàn)了multiget方法,該方法將接收下一步將要處理的元組組的所有相關(guān)ID(例如:{“ roomA”,“ roomB”,“ Hall ”},如上圖所示)。
public List<RoomPresencePeriod> multiGet(List<List<Object>> keys) {return CassandraDB.DB.getPresencePeriods(toCorrelationIdList(keys)); }該代碼只需要從數(shù)據(jù)庫中檢索每個(gè)相關(guān)ID的潛在存在期間即可。 因?yàn)槲覀儗?duì)一個(gè)元組字段進(jìn)行了groupBy,所以每個(gè)List在這里都包含一個(gè)單個(gè)String:correlationId。 請(qǐng)注意,我們返回的列表必須與鍵列表的大小完全相同,以便Storm知道哪個(gè)周期對(duì)應(yīng)于哪個(gè)鍵。 因此,對(duì)于數(shù)據(jù)庫中不存在的任何鍵,我們只需在結(jié)果列表中放置一個(gè)空值即可。
一旦加載,Storm就會(huì)將一個(gè)具有相同相關(guān)性ID的元組一個(gè)一個(gè)地呈現(xiàn)給我們的化簡(jiǎn)器PeriodBuilder 。 在我們的例子中,我們知道在此批次中,每個(gè)唯一的relativeId最多被調(diào)用兩次,但是一般來說可能更多,或者如果當(dāng)前批次中不存在其他ENTER / LEAVE事件,則僅被調(diào)用一次。 在對(duì)muliget()/ multiput()的調(diào)用與我們的reducer之間,借助我們選擇的MapState實(shí)現(xiàn),Storm讓我們可以插入適當(dāng)?shù)倪壿媮碇胤畔惹笆〉脑M。 在以后的文章中有更多的信息……
一旦我們減少了每個(gè)元組序列,Storm就會(huì)將結(jié)果傳遞給IBackingMap的mulitput(),在這里我們只是將所有內(nèi)容“追加”到數(shù)據(jù)庫:
public void multiPut(List<List<Object>> keys, List<RoomPresencePeriod> newOrUpdatedPeriods) {CassandraDB.DB.upsertPeriods(newOrUpdatedPeriods); }Storm persistenceAggregate使用我們的化簡(jiǎn)提供給multitput()的值,自動(dòng)將其發(fā)送到拓?fù)湓M的后續(xù)部分。 這意味著我們剛剛建立的在線狀態(tài)很容易作為元組字段使用,我們可以使用它們直接更新會(huì)議室時(shí)間線:
// building room timeline .each(new Fields("presencePeriod"), new IsPeriodComplete()) .each(new Fields("presencePeriod"), new BuildHourlyUpdateInfo(), new Fields("roomId", "roundStartTime")) .groupBy(new Fields("roomId", "roundStartTime")) .persistentAggregate( TimelineBackingMap.FACTORY, new Fields("presencePeriod","roomId", "roundStartTime"), new TimelineUpdater(), new Fields("hourlyTimeline"))第一行只是過濾掉尚未包含“ ENTER”和“ LEAVE”事件的任何期間。
然后, BuildHourlyUpdateInfo實(shí)現(xiàn)一對(duì)多的元組發(fā)射邏輯:對(duì)于每個(gè)占用期,它僅在“開始時(shí)間”內(nèi)發(fā)射一個(gè)元組。 例如,從9:47 am到10:34 am在roomA中的占用將在此處觸發(fā)針對(duì)RoomA的9.00am時(shí)間軸切片的元組的發(fā)射,以及另一個(gè)針對(duì)10.00am的元組的發(fā)射。
下一部分實(shí)現(xiàn)了與以前相同的groupBy / IBackingMap方法,只是這次使用了兩個(gè)分組鍵而不是一個(gè)(因此,mulitget中的List <Object>將包含兩個(gè)值:一個(gè)String和一個(gè)Long)。 由于我們存儲(chǔ)一個(gè)小時(shí)的時(shí)間軸塊,因此上述IBackingMap的必要屬性得到了尊重。 多重獲取為每個(gè)(“ roomId”,“開始時(shí)間”)對(duì)檢索時(shí)間線塊,然后TimelineUpdater (再次使用reducer)用與當(dāng)前批次中找到的該時(shí)間線片相對(duì)應(yīng)的每個(gè)存在時(shí)間更新時(shí)間線片(這就是BuildHourlyUpdateInfo的一對(duì)多元組發(fā)射邏輯)和multiput()僅保存結(jié)果。
導(dǎo)致咖啡廳占用
當(dāng)我們看著它時(shí),一切總是更加美麗,所以讓我們來繪制房間的占用情況 。 稍加一些R代碼 ,我們就可以一分鐘一分鐘地看到房間的占用情況(這并不意味著什么,因?yàn)樗袛?shù)據(jù)都是隨機(jī)的,但是……):
結(jié)論
希望本文能為維護(hù)Storm拓?fù)渲械臓顟B(tài)提供一種有用的方法。 我還嘗試說明了將處理邏輯實(shí)現(xiàn)為小型拓?fù)湓氐膶?shí)現(xiàn),將其彼此插入,而不是將一些“冗長(zhǎng)的螺栓”捆綁在冗長(zhǎng)而復(fù)雜的邏輯部分上。
Storm的一個(gè)重要方面是它的可擴(kuò)展性,很可能去插入它的子類或在任何地方插入它的子類來調(diào)整其行為。 春天有十年前的那種聰明而有趣的感覺(哦,該死,我現(xiàn)在有點(diǎn)老了……^ __ ^)
參考:來自Svend博客的 JCG合作伙伴 Svend Vanderveken 使用Storm進(jìn)行的可伸縮實(shí)時(shí)狀態(tài)更新 。翻譯自: https://www.javacodegeeks.com/2013/08/scalable-real-time-state-update-with-storm.html
使用storm 實(shí)時(shí)計(jì)算
總結(jié)
以上是生活随笔為你收集整理的使用storm 实时计算_使用Storm进行可扩展的实时状态更新的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 计算机网络dce是什么意思,DTE与DC
- 下一篇: Linux操作系统学习02