storm 机器上日志查询_Storm原理与实践大数据技术栈14
回顧:大數據平臺技術棧?(ps:可點擊查看),今天就來說說其中的Storm!
來自:有米加瓦
一、Storm簡介
1. 引例
在介紹Storm之前,我們先看一個日志統計的例子:假如我們想要根據用戶的訪問日志統計使用斗魚客戶端的用戶的地域分布情況,一般情況下我們會分這幾步:
取出訪問日志中客戶端的IP
把IP轉換成對應地域
按照地域進行統計
Hadoop貌似就可以輕松搞定:
map做ip提取,轉換成地域
reduce以地域為key聚合,計數統計
從HDFS取出結果
如果有時效性要求呢?
小時級:還行,每小時跑一個MapReduce Job
10分鐘:還湊合能跑
5分鐘 :夠嗆了,等槽位可能要幾分鐘呢
1分鐘 :算了吧,啟動Job就要幾十秒呢
秒級 :… 要滿足秒級別的數據統計需求,需要
進程常駐運行;
數據在內存中
Storm正好適合這種需求。
2. 特性
Storm是一個分布式實時流式計算平臺。主要特性如下:
簡單的編程模型:類似于MapReduce降低了并行批處理復雜性,Storm降低了實時處理的復雜性,只需實現幾個接口即可(Spout實現ISpout接口,Bolt實現IBolt接口)。
支持多種語言:你可以在Storm之上使用各種編程語言。默認支持Clojure、Java、Ruby和Python。要增加對其他語言的支持,只需實現一個簡單的Storm通信協議即可。
容錯性:nimbus、supervisor都是無狀態的, 可以用kill -9來殺死Nimbus和Supervisor進程, 然后再重啟它們,任務照常進行; 當worker失敗后, supervisor會嘗試在本機重啟它。
分布式:計算是在多個線程、進程和服務器之間并行進行的。
持久性、可靠性:消息被持久化到本地磁盤,并且支持數據備份防止數據丟失。
可靠的消息處理:Storm保證每個消息至少能得到一次完整處理。任務失敗時,它會負責從消息源重試消息(ack機制)。
快速、實時:Storm保證每個消息能能得到快速的處理。
3. 與常用其他大數據計算平臺對比
Storm vs. MapReduce Storm的一個拓撲常駐內存運行,MR作業運行完了進行就被kill了;storm是流式處理,MR是批處理;Storm數據在內存中不寫磁盤,而MR會與磁盤進行交互;Storm的DAG(有向無環圖)模型可以組合多個階段,而MR只可以有MAP和REDUCE兩個階段。
Storm vs. Spark Streaming Storm處理的是每次傳入的一條數據,Spark Streaming實際處理的是微批量數據。
二、Storm的架構和運行時原理
1. 集群架構
如上圖所示,一個典型的storm集群包含一個主控節點Nimbus,負責資源分配和任務調度;還有若干個子節點Supervisor,負責接受nimbus分配的任務,啟動和停止屬于自己管理的worker進程;Nimbus和Supervisor之間的所有協調工作都是通過Zookeeper集群完成。
2. Storm的容錯(Fault Tolerance)機制
Nimbus和Supervisor進程被設計成快速失敗(fail fast)的(當遇到異常的情況,進程就會掛掉)并且是無狀態的(狀態都保存在Zookeeper或者在磁盤上)。
Nimbus與Supervisor本身也是無狀態的,狀態信息是由zookeeper存儲(實現了高可用,當nimbus掛掉,可以找另外一個節點啟動nimbus進程,狀態信息從zookeeper獲得)。
在Nimbus進程失敗后,可以快速重啟恢復正常工作,不需要很長的時間來進行初始化和狀態恢復。
當Nimbus從zookeeper得知有supervisor節點掛掉,可以將該節點的任務重新分配給其他子節點。
Nimbus在“某種程度”上屬于單點故障的。在實際中,即使Nimbus進程掛掉,也不會有災難性的事情發生 。
當Nimbus掛掉會怎樣?
已經存在的拓撲可以繼續正常運行,但是不能提交新拓撲;
正在運行的worker進程仍然可以繼續工作。而且當worker掛掉,Supervisor會一直重啟worker。
失敗的任務不會被分配到其他機器(是Nimbus的職責)上了
當一個Supervisor(slave節點)掛掉會怎樣?
分配到這臺機器的所有任務(task)會超時,Nimbus會把這些任務(task)重新分配給其他機器。當一個worker掛掉會怎么樣?
當一個worker掛掉,Supervisor會重啟它。如果啟動一直失敗那么此時worker也就不能和Nimbus保持心跳了,Nimbus會重新分配worker到其他機器
3. Storm的編程模型
Strom在運行中可分為spout與bolt兩個組件,其中,數據源從spout開始,數據以tuple的方式發送到bolt,多個bolt可以串連起來,一個bolt也可以接入多個spot/bolt。運行時Topology如下圖:
編程模型的一些基本概念:
元組
storm使用tuple(元組)來作為它的數據模型。每個tuple由一堆域(field)組成,每個域有一個值,并且每個值可以是任何類型。
一個tuple可以看作一個沒有方法的java對象。總體來看,storm支持所有的基本類型、字符串以及字節數組作為tuple的值類型。
Spout
i. BaseRichSpout是實現 IRichSpout接口的類,對上述必要的方法有默認的實現;
ii. 如果業務需要自定義ack()、fail() 等方法,選擇實現 IRichSpout接口;
iii. 如果業務沒有自定義需求,選擇繼承BaseRichSpout類,可以不實現并不一定需要用戶實現的方法,簡化開發。
i. open方法是初始化動作。允許你在該spout初始化時做一些動作,傳入了上下文,方便取上下文的一些數據。
ii. close方法在該spout關閉前執行。
iii. activate和deactivate :一個spout可以被暫時激活和關閉,這兩個方法分別在對應的時刻被調用。
iv. nextTuple 用來發射數據。Spout中最重要的方法。
v. ack(Object)傳入的Object其實是一個id,唯一表示一個tuple。該方法是這個id所對應的tuple被成功處理后執行。
vi. fail(Object)同ack,只不過是tuple處理失敗時執行。
Spout是在一個topology中產生源數據流的組件。通常情況下spout會從外部數據源中讀取數據,然后轉換為topology內部的源數據。Spout是一個主動的角色,其接口中有個nextTuple()函數,storm框架會不停地調用此函數,用戶只要在其中生成源數據即可。
實現Spout時,需要實現最頂層抽象ISpout接口里面的幾個方法
實現Spout時,還需要實現Icomponent接口,來聲明發射到下游bolt的字段名稱。
通常情況下,實現一個Spout,可以直接實現接口IRichSpout,如果不想寫多余的代碼,可以直接繼承BaseRichSpout。
Bolt
prepare方法是初始化動作。允許你在該Bolt初始化時做一些動作,傳入了上下文,方便取上下文的一些數據。
excute 用來處理數據。Bolt中最重要的方法。
cleanup在該Bolt關閉前執行.
在拓撲中所有的計算邏輯都是在Bolt中實現的。一個Bolt可以處理任意數量的輸入流,產生任意數量新的輸出流。Bolt可以做函數處理,過濾,流的合并,聚合,存儲到數據庫等操作。在Bolt中最主要的函數是execute函數,它使用一個新的元組當作輸入。Bolt使用OutputCollector對象來吐出新的元組。
實現Bolt時,需要實現IBolt接口,它聲明了Bolt的核心方法,負責Topology所有的計算邏輯:
實現Bolt時,還需要實現Icomponent接口,來聲明發射到下游bolt的字段名稱
通常情況下,實現一個Bolt ,可以直接實現接口IRichBolt/IBasicBolt,也可以直接繼承BaseRichBolt/BaseBasicBolt。IBasicBolt/BaseBasicBolt在emit數據的時候,會自動和輸入的tuple相關聯,而在execute方法結束的時候那個輸入tuple會被自動ack。使用IRichBolt/BaseRichBolt需要在emit數據的時候,顯示指定該數據的源tuple要加上第二個參數anchor tuple,以保持tracker鏈路,即collector.emit(oldTuple,newTuple);并且需要在execute執行成功后調用OutputCollector.ack(tuple), 當失敗處理時,執行OutputCollector.fail(tuple)。
Stream Groupings(流分組)
定義了一個流在Bolt任務間該如何被切分。
- 隨機分組(Shuffle grouping):隨機分發tuple到Bolt的任務,保證每個任務獲得相等數量的tuple。
- 字段分組(Fields grouping):根據指定字段分割數據流,并分組。例如,根據“user-id”字段,相同“user-id”的元組總是分發到同一個任務,不同“user-id”的元組可能分發到不同的任務。
- 全部分組(All grouping):tuple被復制到bolt的所有任務。這種類型需要謹慎使用。
- 全局分組(Global grouping):全部流都分配到bolt的同一個任務。明確地說,是分配給ID最小的那個task。
- 無分組(None grouping):你不需要關心流是如何分組。目前,無分組等效于隨機分組。
- 直接分組(Direct grouping):這是一個特別的分組類型。元組生產者決定tuple由哪個元組處理者任務接收。
4. Storm消息處理的可靠性機制
可靠性機制(Ack機制)指的是Storm可以保證從Spout發出的每個消息都能被完全處理。一條消息被“完整處理”,指一個從Spout發出的元組所觸發的消息樹中所有的消息都被Storm處理了。如果在指定的超時時間里,這個Spout元組觸發的消息樹中有任何一個消息沒有處理完,就認為這個Spout元組處理失敗了。這個超時時間是通過每個拓撲的Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS配置項來進行配置的,默認是30秒。
Storm 是這樣實現可靠性機制的:
Storm 的拓撲有一些特殊的稱為“acker”的任務,這些任務負責跟蹤每個 Spout 發出的 tuple 的 DAG。當一個 acker 發現一個 DAG 結束了,它就會給創建 spout tuple 的 Spout 任務發送一條消息,讓這個任務來應答這個消息。你可以使用Config.TOPOLOGY_ACKERS 來配置拓撲的 acker 數量。Storm 默認會將 acker 的數量設置為1,不過如果你有大量消息的處理需求,你可能需要增加這個數量。
acker任務跟蹤一個元組樹,只占用固定大小的空間(大約20字節)。若采用 Ack機制,每個處理的tuple, 必須被ack或者fail。因為storm追蹤每個tuple要占用內存。所以如果不ack/fail每一個tuple, 那么最終你會看到OutOfMemory錯誤。
編程實現(必要條件):acker數設置大于0;Spout發送元組時,指定messageId;bolt處理完元組時,一定要調用ack/fail方法。
5. Storm的并發機制
在一個 Storm 集群中,Storm 主要通過以下三個部件來運行拓撲:工作進程(worker processes)、執行器(executors)、任務(tasks)。三者的關系如下:
1個worker進程執行的是1個topology的子集(注:不會出現1個worker為多個topology服務)。1個worker進程會啟動1個或多個executor線程來執行1個topology的component(spout或bolt)。因此,1個運行中的topology就是由集群中多臺物理機上的多個worker進程組成的。
executor是1個被worker進程啟動的單獨線程。每個executor只會運行1個topology的1個component(spout或bolt)的task(注:task可以是1個或多個,storm默認是1個component只生成1個task,executor線程里會在每次循環里順序調用所有task實例)。
task是最終運行spout或bolt中代碼的單元(注:1個task即為spout或bolt的1個實例,executor線程在執行期間會調用該task的nextTuple或execute方法)。topology啟動后,1個component(spout或bolt)的task數目是固定不變的,但該component使用的executor線程數可以動態調整(例如:1個executor線程可以執行該component的1個或多個task實例)。這意味著,對于1個component存在這樣的條件:#threads<=#tasks(即:線程數小于等于task數目)。默認情況下task的數目等于executor線程數目,即1個executor線程只運行1個task。
三、構建基于Storm的實時數據分析平臺實戰經驗
構建基于Storm的實時數據分析平臺,第一步當然應該是搭建storm集群。這個網上的教程還有輪子實在是太多,我就不貼出來了。請大家Google或者Baidu之,然后一步步搭建集群就完了。
1. Storm使用的一些實戰經驗
在架構上,推薦 “消息中間件 + storm + 外部存儲” 3架馬車式架構
Storm從消息中間件中取出數據,計算出結果,存儲到外部存儲上
通常消息中間件推薦使用RocketMQ,Kafka
外部存儲推薦使用HBase,Redis
該架構,非常方便Storm程序進行重啟(如因為增加業務升級程序)
職責清晰化,減少和外部系統的交互,Storm將計算結果存儲到外部存儲后,用戶的查詢就無需訪問Storm中服務進程,查詢外部存儲即可。在實際計算中,常常發現需要做數據訂正,因此在設計整個項目時,需要考慮重跑功能 。在最終生成的結果中,數據最好帶時間戳 。
結合Storm UI查看topology各個組件的負載,合理配置各組件的并發度。
Spout和Bolt的構造函數只會在submit Topology時調一次,然后序列化起來,直接發給工作節點,工作節點里實例化時不會被調用里,所以復雜的成員變量記得都定義成transient,在open(),prepare()里初始化及連接數據庫等資源。
按照性能來說, 使用ack機制普通接口 < 關掉ack機制的普通接口, 因此,需要根據業務對數據處理的速率需求決定是否采用ack機制。
當使用fieldGrouping方式時,有可能造成有的task任務重,有的task任務輕,因此讓整個數據流變慢, 盡量讓task之間壓力均勻。
KafkaSpout的并發度最好設置成Kafka的分區數。消費Kafka時, 一個分區只能一個線程消費,因此有可能簡單的增加并發無法解決問題, 可以嘗試增加Kafka的分區數。
如果topology性能有問題, 可以嘗試關掉ack機制,查看性能如何,如果性能有大幅提升,則預示著瓶頸不在spout, 有可能是Acker的并發少了,或者業務處理邏輯慢了。
2. Storm編程實踐-WordCount
Spout
SpiltSentenceBolt
WordCountBolt
ReportBolt
Topology
Result
長按訂閱更多精彩▼
如有收獲,點個在看,誠摯感謝
總結
以上是生活随笔為你收集整理的storm 机器上日志查询_Storm原理与实践大数据技术栈14的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spss如何进行显著性差异分析
- 下一篇: 高内聚低耦合_高渗透环氧树脂灌浆料