spark 集群单词统计_最近Kafka这么火,聊一聊Kafka:Kafka与Spark的集成
Spark 編程模型
在Spark 中, 我們通過對分布式數(shù)據(jù)集的操作來表達(dá)計(jì)算意圖 ,這些計(jì)算會自動(dòng)在集群上
井行執(zhí)行 這樣的數(shù)據(jù)集被稱為彈性分布式數(shù)據(jù)集 Resilient Distributed Dataset ),簡稱 RDD
RDD 是Spark 分布式數(shù)據(jù)和計(jì)算的基本抽象。在 Spark 中,對數(shù)據(jù)的所有操作不外乎創(chuàng)建
RDD 、轉(zhuǎn)換己有 RDD 以及調(diào)用 RDD 操作進(jìn)行求值 rdd 和wordrnap 都是 MapPartition RDD 類型 RD ,而 wordreduce ShuffiedRDD 類型的 RDD
RDD 支持2種 類型的操作 轉(zhuǎn)換操作( Transformation Operation )和行動(dòng)操作 Action
Operation )。有些資料還會細(xì)分為創(chuàng)建操作、轉(zhuǎn)換操作、控制操作和行動(dòng)操作4 類型。轉(zhuǎn)換
操作會由一個(gè) RDD 生成一個(gè)新的 RDD 行動(dòng)操作會對 RDD 算出 個(gè)結(jié)果,并把結(jié)果返回驅(qū)
動(dòng)器程序,或者把結(jié)果存儲到外部存儲系統(tǒng)中 轉(zhuǎn)換操作和行動(dòng)操作的區(qū)別在于 Spark 計(jì)算 RDD
的方式不同 雖然可以在任何時(shí)候定義新的 RDD ,但 Spark 只會惰性計(jì)算這些 RDD 。它們只有
第一次在 個(gè)行動(dòng)操作中用到時(shí)才會 正計(jì)算。
轉(zhuǎn)換操作和行動(dòng)操作的對比
通過轉(zhuǎn)換操作,從己有的 RDD 中派生出新的 RDD Spark 會使用譜系圖( Lineage Graph,
很多資料也會翻譯為“血統(tǒng)”)來記錄這些不同 RDD 間的依賴關(guān)系 Spark 要用這些信息
來按需計(jì)算每個(gè) RDD ,也可以依賴譜系圖在持久化的 RD 丟失部分?jǐn)?shù)據(jù)時(shí)恢復(fù)丟失的數(shù)據(jù)。
行動(dòng)操作會把最終求得的結(jié)果返回驅(qū)動(dòng)器程序,或者寫入外部存儲系統(tǒng)。由于行動(dòng)操作需要生
產(chǎn)實(shí)際的輸出,所以它們會強(qiáng)制執(zhí)行那些求值必須用到的 RDD 轉(zhuǎn)換操作。
Spark 中RDD 計(jì)算是以分區(qū)( Part ion )為單位的,將 RDD 分為很 個(gè)分區(qū)分布到集群
的節(jié)點(diǎn)中,分區(qū)的多少涉及對這個(gè) RD 進(jìn)行并行計(jì)算的粒度。如圖 12-2 所示 實(shí)線方框 A、B、C、D、E、F、G陰影背景的矩形 表示分區(qū)。 A、B、C、D、E、F、G之間的依賴關(guān)系構(gòu)成整個(gè)應(yīng)用的譜系圖。
依賴關(guān)系還可以分為窄依賴和寬依賴。窄依賴 Narrow ependen cie )是指每個(gè)父 RDD 的
分區(qū)都至多被一個(gè)RDD 的分區(qū)使用, 而寬依賴( Wide Dependencies )是指多個(gè)子 RDD 的分區(qū)依賴一個(gè)父 RDD 分區(qū)。圖 12-2 中,C和D 之間是窄依賴,而 A和B之間是寬依賴。 RDD中行動(dòng)操作的執(zhí)行會以寬依賴為分界來構(gòu)建各個(gè)調(diào)度階段,各個(gè)調(diào)度階段 內(nèi)部的窄依賴、前后鏈接構(gòu)成流水線。圖中的 個(gè)虛線方框分別代表了 個(gè)不同的調(diào)度階段。 對于執(zhí)行失敗的任 ,只 要它對應(yīng)的調(diào)度階段的父類信息仍然可用,那么該 務(wù)就會分散
到其他節(jié)點(diǎn)重新執(zhí)行。如果某些調(diào) 階段不可用,則重新提交相應(yīng)的任務(wù),并以并行方式計(jì)算
丟失的地方。在整個(gè)作業(yè)中,如果某個(gè)任務(wù)執(zhí)行緩慢, 則系統(tǒng)會在其他節(jié)點(diǎn)上執(zhí)行該任務(wù)的副 本,并取最先得到的結(jié)果作為最終的結(jié)果。
下面就以 12 節(jié)中相同的單詞統(tǒng)計(jì)程序?yàn)槔齺矸治?Spark 的編程模型,與 12.1 節(jié)中所不
同的是, 這里是 個(gè)完整的 Scala 程序,程序?qū)?yīng)的 Maven 依賴如下
單詞統(tǒng)計(jì)程序如代碼清單 12-1 示。
代碼清單 12-1 單詞統(tǒng)計(jì)程序
main() 方法主體的第①和第②行中首先創(chuàng)建一個(gè) SparkConf 對象來配置應(yīng)用程序,然后基于這個(gè) SparkConf 建了一個(gè) SparkContext 象。一旦有了 SparkContext ,就可 以用它創(chuàng)建RDD 第③行代碼中調(diào)用 sc textFile ()來創(chuàng) 建一個(gè)代表文 件中各行文本的 RDD 第④行中
rdd flatMap(_.split(””)) .map(x=>幟, ))這一段內(nèi)容的依賴關(guān)系是窄依賴,而reduceByKey(_+ _)操 作對單詞進(jìn)行計(jì)數(shù)時(shí)屬于寬依賴。第⑤行中將排序后的結(jié)果存儲起來。最后第⑦行中使用 top()
方法來關(guān)閉應(yīng)用。
在SPARK_HOME/bin 目錄中還有一個(gè) spark-submit 腳本,用于將應(yīng)用 快速部署到Spark 集
群。 比如這里的 WordCount 程序 當(dāng)我 希望通過 park-submit 進(jìn)行部署 ,只需要將應(yīng)用打
包成 jar 包(即下面示例中的 wordcount. )井上傳到 Spark 集群 然后通 spark-submit 進(jìn)行
部署即 ,示例如下
[root@no del spark)# bin/spark- submit --class scala.spark.demo . WordCount wordcount . jar --executor- memory lG --master spark : //localhost : 7077 2018 - 08 - 06 15:39 : 54 WARN NativeCodeLoader:62 - Unable to load native hadoop library for your platform . .. using builtin- ] ava classes where applicable 2018-08 - 06 15 : 39 : 55 INFO SparkContext : 54 - Running Spark vers on 2 . 3 . 1 2018 - 08 - 06 15 : 39 : 55 INFO SparkContext : 54 - Submitted applicat on WordCount 2018 - 08 - 06 15:39 : 55 INFO SecurityManager : 54 - Chang ng view acls to : root 2018 - 08 - 06 1 5 : 39 : 55 I NFO SecurityManager 54- Chang ng modify acls to : root ( .... ;占略若干)2018 - 08 - 07 12 : 25 : 47 INFO AbstractConnector : 318 - Stopped Spark@62 99e2cl {HTTP /1 . 1 , [http/ 1 . l) } { 0. 0 . 0. 0: 4 04 0} 2018 - 08 - 07 12 : 25 : 47 INFO SparkUI : 54 - Stopped Spark web UI at http: // 10 . 199 . 172 . 111 : 4040 2018 - 08-07 12 : 25 : 47 INFO MapOutp tTrackerMasterEndpo nt 54 - MapOutputTrackerMasterEndpoint stop ped' 2018 - 08-07 12 : 25:47 INFO MemoryStore : 54 - MemoryStore cleared 2018 - 08-07 12 : 25 : 47 INFO BlockManager:54 - BlockManager stopped 2018 - 08 - 07 12 : 25 : 47 INFO BlockManagerMaster : 54 - BlockManagerMaster stopped 2018 - 08 - 07 12 : 25 : 47 INFO OutputCommitCoordinator OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped ! 2018 08-06 15 : 46 : 57 INFO SparkCo text 54 - Successfully stopped SparkContext 20 1 8 - 08-06 1 5 : 46:57 INFO ShutdownHookManager : 54 Shutdown hook called 2018 - 08 - 06 15 : 46 : 57 INFO ShutdownHoo kManager : 54 - Delet ng directory /tmp/spark- fa955139-270c-4899 - 82b7 - 4959983alcb0 2018 - 08 - 06 15 : 46 : 57 INFO ShutdownHookManager : 54 - Deleting directory /tmp/spark-3f359966- 2167 - 4bb9 - 863a - 2d8a8d5e8fbe實(shí)例中的--class 用來指定應(yīng)用程序 主類,這里為 eal ark.demo.WordCount;
execu or memory 用來指定執(zhí)行器節(jié)點(diǎn)的內(nèi)容,這里設(shè)置為 lG 。最后得到的輸出結(jié)果如
下所示。
[root@node l spark) # ls /tmp/spar k part 00000 SUCCESS [root@nodel spark)# cat /tmp/spark/part-00000 (, 91) (# ' 37) (the , 19) (in, 7) (to , 7) (for, 6) (if, 5) (then, 5) (under, 4) (stty, 4) (not, 4)Spark 的運(yùn)行結(jié)構(gòu)
在分布式環(huán)境下 Spark 集群采用的是主從架構(gòu)。如圖 12-3 示,在一個(gè)Spark 集群中,
有一個(gè)節(jié)點(diǎn)負(fù)責(zé)中央?yún)f(xié)調(diào),調(diào)度各個(gè)分布式工作節(jié)點(diǎn),這個(gè)中央?yún)f(xié)調(diào)節(jié)點(diǎn)被稱為驅(qū)動(dòng)器( Driver
節(jié)點(diǎn) 與之對應(yīng)的工作節(jié)點(diǎn)被稱為執(zhí)行器( Executor )節(jié)點(diǎn)。驅(qū)動(dòng)器節(jié)點(diǎn)可以和大量的執(zhí)行器節(jié)
點(diǎn)進(jìn)行通信 它們都作為獨(dú)立的進(jìn)程運(yùn)行。驅(qū)動(dòng)器節(jié)點(diǎn)和所有的執(zhí)行器節(jié)點(diǎn)一起被稱為 Spark
應(yīng)用( Application)。
Spark 應(yīng)用通過一個(gè)叫作集群管理器( luster Manager )的外部服務(wù)在集群中的機(jī)器上啟動(dòng)。 Spark 自帶的集群管理器被稱為獨(dú)立集群管理器 Spark 也能運(yùn)行 YARN Mesos Kubemetes 這類開源集群管理器上Spark 驅(qū)動(dòng)器節(jié)點(diǎn)是執(zhí)行程序中的 main()方法的進(jìn)程。它執(zhí)行用戶編寫的用來創(chuàng)建 SparkContext RDD ,以及進(jìn)行 RDD 轉(zhuǎn)換操作和行動(dòng)操作的代碼。其實(shí),當(dāng)啟動(dòng) park-shell 時(shí),就啟動(dòng)了一個(gè) park 驅(qū)動(dòng)程序。驅(qū)動(dòng)程序一旦停止 Spark 應(yīng)用也就結(jié)束了
Kafka與Spark trea ing 的整合
采用 Spark Stre ming 流式處理 fka 中的數(shù)據(jù),首先需要把數(shù)據(jù)從 Kafka 中接收過 ,然
后轉(zhuǎn)換為 Spark Streaming 中的 DStrea 。接收數(shù)據(jù)的方式一共有兩種:利用接收器Receiver 的方式接收數(shù)據(jù)和直接Kafka中讀取數(shù)據(jù) 。
Receiver 方式通過KafkaUtils. creates trea ()方法來創(chuàng)建一個(gè)DS tream 對象 ,它不關(guān)注消費(fèi)的位移的處理,Receive方式的結(jié)構(gòu)如圖 12-9所示 但這種方式在 Spark 任務(wù)執(zhí)行異常 導(dǎo)致 數(shù)據(jù)丟失,如果要保證數(shù)據(jù)的可靠性,則需要開啟預(yù)寫式日志,簡稱 AL (Write Ahead Logs) , 只有收到的數(shù)據(jù)被持久化到 WAL 之后才會更新 Kafka 中的消費(fèi)位移。收 的數(shù)據(jù) WAL儲存
位置信息被可靠地存儲,如果期間出現(xiàn)故障,那么這些信息被用來從錯(cuò)誤中恢復(fù),并繼續(xù)處理
數(shù)據(jù)。
WAL 的方式可以保證從 Kafka 中接收的數(shù)據(jù)不被丟失 但是在某些異常情況下,一些數(shù)據(jù)
被可靠地保存到了 WAL 中,但是還沒有來得及更新消費(fèi)位移,這樣會造成 Kafka 中的數(shù)據(jù)被
Spark 拉取 了不止一次。同時(shí)在 Receiver 方式中 Spark的RDD 分區(qū) Kafka 的分區(qū)并不是相
關(guān)的,因此增加 Kafk 中主題的分區(qū)數(shù)并不能增加 Spark處理的并行度,僅僅增加了接收器接
收數(shù)據(jù)的并行度
Direct 方式是從 Spark 1.3 開始引入的,它通過 KafkaUtil s.createDire ctStream() 方法創(chuàng)建一個(gè)
DStream 象, Direct 方式的結(jié)構(gòu)如圖 12-10 所示。該方式中 Kafka 的一個(gè)分區(qū)與 SparkRDD對應(yīng),通過定期掃描所訂閱的 afka 每個(gè)主題的每個(gè)分區(qū)的最新偏移量以確定當(dāng)前批處理數(shù)據(jù)偏 移范圍。與 Rec iver 方式相比, irect 方式不需要維護(hù)一份WAL 數(shù)據(jù),由 park Streaming 程序自控制位移的處理,通常通過檢查點(diǎn)機(jī)制處理消費(fèi)位移,這樣可以保證 Kafka 中的數(shù)據(jù)只 會被 Spark 拉取一次。
下面使用一個(gè)簡單的例子來演示 Spark Streaming和Kafka 的集成。在該示例中,每秒往
Kafka寫入一個(gè)0到9之間的隨機(jī)數(shù),通過 Spark Streaming從Kafka 中獲取數(shù)據(jù)并實(shí) 計(jì)算批次間隔內(nèi)的數(shù)據(jù)的數(shù)值之和
往Kafk 中寫入隨 數(shù)的主要代碼如下:
Random random = new Random( ); wh le (true) { String msg = String.val ueOf( r andom.nextint(lO) ); ProducerRecord, String> message = new ProducerRecord<>(topic , msg ); producer.send(message) . get() ; TimeUnit.SECONDS . sleep(l) ;Kafka與Spark Streaming的集成示例如代碼清單 12-3所示,代碼中的批次間隔設(shè)置為 2s
示例中的主題 topic spark 包含4個(gè)分區(qū)。
代碼清單12-3 Kafka與Spa Streaming的集成示例
其實(shí),kafka的設(shè)計(jì)實(shí)現(xiàn),涉及到太多的底層技術(shù),為了能夠把它吃透,需要花大量的時(shí)間和精力。
在這里,送大家一張 Kafka 學(xué)習(xí)框架,分為 Kafka 入門、Kafka 的基本使用、客戶端詳解、Kafka 原理介紹、Kafka 運(yùn)維與監(jiān)控以及高級 Kafka 應(yīng)用。
需要這份的kafka朋友們轉(zhuǎn)發(fā)收藏+關(guān)注私信“資料”立即獲取
總結(jié)
以上是生活随笔為你收集整理的spark 集群单词统计_最近Kafka这么火,聊一聊Kafka:Kafka与Spark的集成的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python引入同目录文件_Python
- 下一篇: mysql和mariadb对比_MySQ