Flume-ng 高可用搭建-与测试
生活随笔
收集整理的這篇文章主要介紹了
Flume-ng 高可用搭建-与测试
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
前提:
1)五臺虛擬機(三臺也可以)
2)flume單節(jié)點測試并學(xué)會
3)hadoop集群搭建完成
Flume NG集群,架構(gòu)圖
Flume的存儲可以支持多種,這里只列舉了HDFS
角色分配
| Agent1 | chun1 | Web Server |
| Agent2 | chun2 | Web Server |
| Agent3 | chun3 | Web Server |
| Collector1 | chun4 | AgentMstr1 |
| Collector2 | chun5 | AgentMstr1 |
表中所示,Agent1,Agent2,Agent3數(shù)據(jù)分別流入到Collector1和Collector2,Flume NG本身提供了Failover機制,可以自動切換和恢復(fù)。在上圖中,有3個產(chǎn)生日志服務(wù)器分布在不同的機房,要把所有的日志都收集到一個集群中存儲。下 面我們開發(fā)配置Flume NG集群
配置
在單點Flume中(這里介紹了單點的配置),基本配置都完成了,我們只需要新添加兩個配置文件,它們是agent.properties和collector.properties,其配置內(nèi)容如下所示:
agent配置
(根據(jù)自己需求把source讀的路徑(r1.command )和要配置的collector的主機名修改也就是k1和k2的hostname)
[root@chun1 flume-1.9.0-bin]# vi conf/agent.properties#agent1 name agent1.channels = c1 agent1.sources = r1 agent1.sinks = k1 k2#set gruop agent1.sinkgroups = g1#set channel agent1.channels.c1.type = memory agent1.channels.c1.capacity = 1000 agent1.channels.c1.transactionCapacity = 100agent1.sources.r1.channels = c1 agent1.sources.r1.type = exec agent1.sources.r1.command = tail -F /usr/local/flume-1.9.0/job/log/test.log agent1.sources.r1.interceptors = i1 i2 agent1.sources.r1.interceptors.i1.type = static agent1.sources.r1.interceptors.i1.key = Type agent1.sources.r1.interceptors.i1.value = LOGIN agent1.sources.r1.interceptors.i2.type = timestamp# set sink1 agent1.sinks.k1.channel = c1 agent1.sinks.k1.type = avro agent1.sinks.k1.hostname = chun4 agent1.sinks.k1.port = 52020# set sink2 agent1.sinks.k2.channel = c1 agent1.sinks.k2.type = avro agent1.sinks.k2.hostname = chun5 agent1.sinks.k2.port = 52020#set sink group agent1.sinkgroups.g1.sinks = k1 k2#set failover agent1.sinkgroups.g1.processor.type = failover agent1.sinkgroups.g1.processor.priority.k1 = 10 agent1.sinkgroups.g1.processor.priority.k2 = 1 agent1.sinkgroups.g1.processor.maxpenalty = 10000修改后把flume發(fā)送給chun1,chun2,chun3,chun4,chun5( 發(fā)送后chun1,chun2,chun3不需要修改)
(chun4,chun5把剛才創(chuàng)建的agent.properties刪除,添加一個collector.properties 并加入以下內(nèi)容)
collector配置
記得把主機名改掉
[root@chun4 conf]# vi collector.properties #set Agent name a1.sources = r1 a1.channels = c1 a1.sinks = k1#set channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100# other node,nna to nns a1.sources.r1.type = avro a1.sources.r1.bind = chun4 //chun5的此處要改 a1.sources.r1.port = 52020 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = static a1.sources.r1.interceptors.i1.key = Collector a1.sources.r1.interceptors.i1.value = chun4 //chun5的此處要改 a1.sources.r1.channels = c1#set sink to hdfs a1.sinks.k1.type=hdfs a1.sinks.k1.hdfs.path=/home/hdfs/flume/logdfs a1.sinks.k1.hdfs.fileType=DataStream a1.sinks.k1.hdfs.writeFormat=TEXT a1.sinks.k1.hdfs.rollInterval=10 a1.sinks.k1.channel=c1 a1.sinks.k1.hdfs.filePrefix=%Y-%m-%d運行 (先啟動兩個collector然后在啟動三個agent)
在4,5上運行
cd /usr/local/flume-1.9.0bin/flume-ng agent -n a1 -c conf -f conf/collector.properties -Dflume.root.logger=DEBUG,console在1,2,3上運行
cd /usr/local/flume-1.9.0bin/flume-ng agent -n agent1 -c conf -f conf/agent.properties -Dflume.root.logger=DEBUG,console插入數(shù)據(jù)
往test.txt里插入數(shù)據(jù)
代碼意思:沒0.5秒循環(huán)插入chun-chun-chun
while true > do > echo 'chun-chun-chun' >> /usr/local/flume-1.9.0/job/log/test.log > sleep 0.5 > done查看 (hdfs的web端查看)
這時你會發(fā)現(xiàn)只有flume-ng1下有數(shù)據(jù):說明是先往chun4上傳
然后把chun4的進程殺死,就會發(fā)現(xiàn)數(shù)據(jù)開始往chun5傳
然后再次打開(再次啟動報錯請看)數(shù)據(jù)又到chun4了(數(shù)據(jù)會先往權(quán)重高的傳輸)
(配置文件里有設(shè)置權(quán)重
agent1.sinkgroups.g1.processor.priority.k1 = 10 agent1.sinkgroups.g1.processor.priority.k2 = 1)可以根據(jù)自己需求設(shè)置
總結(jié)
以上是生活随笔為你收集整理的Flume-ng 高可用搭建-与测试的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Python CheckiO 题解】T
- 下一篇: 用sqoop把hdfs数据存储到mysq