Spark Streaming: Flume monitors a directory for file changes, Spark Streaming listens to Flume in real time, pulls the data from it, and computes the result
1. Install Flume.
2. Download the poll-style sink from the Spark Streaming documentation.
3. Put the sink jar into Flume's lib directory.
4. Start Flume first (on every agent node), then start the Streaming program.
Download the spark-streaming-flume sink:
http://spark.apache.org/documentation.html
Go to the Spark 1.6.2 documentation:
http://spark.apache.org/docs/1.6.2/
and search for "flume".
Finally, add commons-lang3-3.3.2.jar, scala-library-2.10.5.jar, and spark-streaming-flume-sink_2.10-1.6.1.jar to the lib directory of the installed Flume, so that it looks like this:
(Screenshot: Flume lib directory listing with the added jars.)
Sync the jars to the other Flume installations in the cluster:
[root@hadoop1 lib]# pwd
/home/tuzq/software/apache-flume-1.6.0-bin/lib
[root@hadoop1 lib]# scp -r * root@hadoop2:$PWD
[root@hadoop1 lib]# scp -r * root@hadoop3:$PWD
[root@hadoop1 lib]# scp -r * root@hadoop4:$PWD
[root@hadoop1 lib]# scp -r * root@hadoop5:$PWD

Write the Flume configuration file:
[root@hadoop1 agentconf]# pwd
/home/tuzq/software/apache-flume-1.6.0-bin/agentconf
[root@hadoop1 agentconf]# vim flume-poll.conf

The content of flume-poll.conf is as follows:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe the source: a spooling-directory source that watches for new files
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/tuzq/software/flumedata
a1.sources.r1.fileHeader = true

# Describe the sink: Spark Streaming will pull data from this sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = hadoop1
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start Flume. Note that the sink's hostname and port (hadoop1:8888) are the address the Spark Streaming program will poll later; also, the spooldir source renames each file it has ingested with a .COMPLETED suffix by default.
[root@hadoop1 apache-flume-1.6.0-bin]# cd /home/tuzq/software/apache-flume-1.6.0-bin
[root@hadoop1 apache-flume-1.6.0-bin]# bin/flume-ng agent -n a1 -c agentconf/ -f agentconf/flume-poll.conf -Dflume.root.logger=WARN,console

After it starts, the console looks like this:
(Screenshot: console output of the running Flume agent.)
Leave Flume running like this.
Next, write the program that reads the data from Flume.
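A note on the design choice: this walkthrough uses the pull-based integration, which is why the SparkSink jar had to be installed into Flume; Spark Streaming polls that sink instead of Flume pushing events into a Spark receiver. The sketch below contrasts the two FlumeUtils calls. It is a minimal sketch, not part of the original article: the object name is made up, the host/port values simply mirror the configuration above, and the push-based variant would additionally require reconfiguring the Flume sink as a plain avro sink.

package cn.toto.spark

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Sketch only: contrasts the two spark-streaming-flume integration styles.
object FlumeIntegrationStyles {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumeIntegrationStyles").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(15))

    // Pull-based (used in this article): Spark polls the SparkSink running inside the Flume agent,
    // so the address is the one configured on a1.sinks.k1 (hadoop1:8888).
    val pulled = FlumeUtils.createPollingStream(
      ssc, Array(new InetSocketAddress("hadoop1", 8888)), StorageLevel.MEMORY_AND_DISK)

    // Push-based alternative: Flume would use an avro sink that pushes events to a receiver
    // Spark starts on the given host/port (values here are illustrative, not from the article).
    // val pushed = FlumeUtils.createStream(ssc, "hadoop1", 8888, StorageLevel.MEMORY_AND_DISK)

    pulled.map(e => new String(e.event.getBody.array())).print()
    ssc.start()
    ssc.awaitTermination()
  }
}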
The pom file needs the Spark Streaming and Flume integration dependencies.
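The article's original pom listing is not reproduced here; a minimal sketch of the dependencies such a project would need is shown below. The versions are assumptions, chosen to match the Scala 2.10 / Spark 1.6.x jars added to Flume above (the article itself mixes 1.6.1 and 1.6.2).

<dependencies>
    <!-- Assumed versions, matching scala-library-2.10.5 and the spark-streaming-flume-sink_2.10 jar above -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_2.10</artifactId>
        <version>1.6.2</version>
    </dependency>
</dependencies>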
Write the code:
package cn.toto.spark

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by toto on 2017/7/13.
  */
object FlumeStreamingWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumeStreamingWordCount").setMaster("local[2]")
    // Create the StreamingContext and set the batch interval
    val ssc = new StreamingContext(conf, Seconds(15))
    // Create the input DStream by polling the Flume sink; multiple InetSocketAddress values can be passed
    val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(ssc, Array(new InetSocketAddress("hadoop1", 8888)), StorageLevel.MEMORY_AND_DISK)
    // Pull the data out of the Flume events and split it into words
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" "))
    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Print the result
    result.print()
    // Start the application
    ssc.start()
    // Wait for termination
    ssc.awaitTermination()
  }
}

Start the program, then place a file into the flumedata directory that Flume is monitoring, for example:
(Screenshot: a file, e.g. 1.txt, placed into /home/tuzq/software/flumedata.)
The content of 1.txt looks like this:
(Screenshot: the contents of 1.txt.)
Finally, observe the result in the IDEA console:
(Screenshot: word-count output in the IDEA console.)
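As an illustration of what the print() output looks like (the screenshots are not included here, so the words and counts below are hypothetical, assuming the file contained the line "hello spark hello flume"):

-------------------------------------------
Time: 1500000000000 ms
-------------------------------------------
(hello,2)
(spark,1)
(flume,1)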