Flume知识点全面总结教程
?目錄
1.前言
1.1什么是flume?
1.2Flume特性
2.Flume核心概念
2.1agent
2.2Event:flume內部數據傳輸的封裝形式
2.3Transaction:事務控制機制
2.4攔截器
3.Flume安裝部署
3.1參數配置
3.2啟動命令
4.Flume入門案例
4.1數據流
?4.2 組件選擇
?4.3 部署配置實現
5.Flume常用內置組件詳解
6.Flume常用組件詳解:Source
6.1netcat?source
6.1.1工作機制:
6.1.2配置文件:
6.2 exec?source
6.2.1工作機制:
?6.2.2參數詳解:
6.2.3配置文件:
6.3 spooldir?source
6.3.1工作機制:
6.3.2參數詳解:
6.3.3配置文件:
6.3.4啟動測試:
6.4 avro?source
6.4.1工作機制
6.4.2參數詳解
6.4.3配置文件
6.4.4啟動測試
6.4.5啟動測試利用avro?source和avro sink實現agent級聯
6.5 kafka source
6.5.1工作機制
6.5.2參數詳解
6.5.3配置文件
6.5.4啟動測試
6.6?taildir?source
6.6.1?工作機制
???????6.6.2?參數詳解
6.6.3配置文件
??????????????6.6.4啟動測試
7Flume常用組件詳解:Interceptor攔截器
7.1timestamp?攔截器
7.1.1作用
7.1.2參數
7.1.3配置示例
7.1.4??????????????測試
7.2static攔截器
7.2.1作用
???????7.2.2參數
???????7.2.3配置示例
???????7.2.4測試
7.3Host 攔截器
7.3.1作用
???????7.3.2參數
7.3.3配置示例
7.3.4測試
7.4?UUID 攔截器
7.4.1作用
7.4.2參數
7.4.3配置
??????????????7.4.4測試
8?Flume常用組件詳解:channel
8.1 memory channel
8.1.1特性
??????????????8.1.2參數
??????????????8.1.3配置示例
??????????????8.1.4測試
??????????????8.1.5擴展了解
???????????8.2 file channel
8.2.1特性
???????8.2.3參數
???????8.2.4配置示例
???????8.2.5??????????????測試
???????8.3kafka channel
?????????????????????8.3.1特性
???????????????????????????????????8.3.2參數
?????????????????????????????????????????????????8.3.3配置測試
9 Flume常用組件詳解:sink
9.1.1特性
??????????????9.1.2參數
??????????????9.1.3配置示例
9.1.4測試
9.2?kafka sink
9.2.1特性
9.2.2參數
9.3 avro?sink
9.3.1 特性
10 Flume常用組件詳解:Selector
10.1實踐一:replicating selector(復制選擇器)
????????????10.1.1目標場景
????????????????????????????10.1???????.2Flume agent配置
10.1???????.3?Collector1 配置
10.1.4?Collector2 配置
10.1.5?測試驗證
10.2?實踐二:multiplexing selector(多路選擇器)
10.2.1目標場景
?10.2.2?第一級1 / 2配置
????????10.2.3?第二級配置
??????????????????????10.2.4?測試驗證
11 Flume常用組件詳解:grouping processor
12Flume自定義擴展組件
12.1自定義Source
12.1.1需求場景
??????????????12.1.2實現思路
12.1.3代碼架構
?????12.1.4?具體實現
12.2?自定義攔截器
12.2.2實現思路
12.2.3?自定義攔截器的開發
13?綜合案例
13.1?案例場景
13.2?實現思路
13.4?配置文件
13.5?啟動測試
14?面試加強
14.1?flume事務機制
14.2flume agent內部機制
14.3?ganglia及flume監控
14.4?Flume調優
1.前言
flume是由cloudera軟件公司產出的可分布式日志收集系統,后與2009年被捐贈了apache軟件基金會,為hadoop相關組件之一。尤其近幾年隨著flume的不斷被完善以及升級版本的逐一推出,特別是flume-ng;同時flume內部的各種組件不斷豐富,用戶在開發的過程中使用的便利性得到很大的改善,現已成為apache top項目之一.
補充:cloudera公司的主打產品是CDH(hadoop的一個企業級商業發行版)
1.1什么是flume?
???Apache Flume 是一個從可以收集例如日志,事件等數據資源,并將這些數量龐大的數據從各項數據資源中集中起來存儲的工具/服務。flume具有高可用,分布式和豐富的配置工具,其結構如下圖所示:
Flume: 是一個數據采集工具;可以從各種各樣的數據源(服務器)上采集數據傳輸(匯聚)到大數據生態的各種存儲系統中(Hdfs、hbase、hive、kafka);
開箱即用!(安裝部署、修改配置文件)
1.2Flume特性
Flume是一個分布式、可靠、和高可用的海量日志采集、匯聚和傳輸的系統。
Flume可以采集文件,socket數據包(網絡端口)、文件夾、kafka、mysql數據庫等各種形式源數據,又可以將采集到的數據(下沉sink)輸出到HDFS、hbase、hive、kafka等眾多外部存儲系統中
一般的采集、傳輸需求,通過對flume的簡單配置即可實現;不用開發一行代碼!
Flume針對特殊場景也具備良好的自定義擴展能力,因此,flume可以適用于大部分的日常數據采集場景
2.Flume核心概念
2.1agent
Flume中最核心的角色是agent,flume采集系統就是由一個個agent連接起來所形成的一個或簡單或復雜的數據傳輸通道。
對于每一個Agent來說,它就是一個獨立的守護進程(JVM),它負責從數據源接收數據,并發往下一個目的地,如下圖所示:
每一個agent相當于一個數據(被封裝成Event對象)傳遞員,內部有三個組件:
Source:采集組件,用于跟數據源對接,以獲取數據;它有各種各樣的內置實現;
Sink:下沉組件,用于往下一級agent傳遞數據或者向最終存儲系統傳遞數據
Channel:傳輸通道組件,用于從source將數據傳遞到sink
單個agent采集數據
?多級agent之間串聯
2.2Event:flume內部數據傳輸的封裝形式
數據在Flum內部中數據以Event的封裝形式存在。
因此,Source組件在獲取到原始數據后,需要封裝成Event放入channel;
Sink組件從channel中取出Event后,需要根據配置要求,轉成其他形式的數據輸出。
Event封裝對象主要有兩部分組成: Headers和 ?Body
Header是一個集合 ?Map[String,String],用于攜帶一些KV形式的元數據(標志、描述等)
Boby: 就是一個字節數組;裝載具體的數據內容
|
2018-11-03 18:44:44,913 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 61 20 61 20 61 61 61 20 61 20 0D ???????????????a a aaa a . } |
2.3Transaction:事務控制機制
Flume的事務機制(類似數據庫的事務機制):
Flume使用兩個獨立的事務分別負責從Soucrce到Channel,以及從Channel到Sink的event傳遞。比如spooling directory source 為文件的每一個event?batch創建一個事務,一旦事務中所有的事件全部傳遞到Channel且提交成功,那么Soucrce就將event batch標記為完成。
同理,事務以類似的方式處理從Channel到Sink的傳遞過程,如果因為某種原因使得事件無法記錄,那么事務將會回滾,且所有的事件都會保持到Channel中,等待重新傳遞。
事務機制涉及到如下重要參數:
a1.sources.s1.batchSize?=100
a1.sinks.k1.batchSize = 200
a1.channels.c1.transactionCapacity?= 300 (應該大于source或者sink的批次大小)
<?transactionCapacity 是說,channel中保存的事務的個數>
跟channel的數據緩存空間容量區別開來:
a1.channels.c1.capacity = 10000
那么事務是如何保證數據的端到端完整性的呢?看下面有兩個agent的情況:
數據流程:
- source 1產生Event,通過“put”、“commit”操作將Event放到Channel 1中
- sink 1通過“take”操作從Channel 1中取出Event,并把它發送到Source 2中
- source 2通過“put”、“commit”操作將Event放到Channel 2中
- source 2向sink 1發送成功信號,sink 1“commit”步驟2中的“take”操作(其實就是刪除Channel 1中的Event)
說明:在任何時刻,Event至少在一個Channel中是完整有效的
2.4攔截器
攔截器工作在source組件之后,source產生的event會被傳入攔截器根據需要進行攔截處理
而且,攔截器可以組成攔截器鏈!
攔截器在flume中有一些內置的功能比較常用的攔截器
用戶也可以根據自己的數據處理需求,自己開發自定義攔截器!
這也是flume的一個可以用來自定義擴展的接口!
3.Flume安裝部署
3.1參數配置
Flume的安裝非常簡單,只需要解壓即可,當然,前提是已有hadoop環境
1.上傳安裝包到數據源所在節點上
然后解壓 ?tar -zxvf apache-flume-1.8.0-bin.tar.gz
2、根據數據采集的需求配置采集方案,描述在配置文件中(文件名可任意自定義)
3、指定采集方案配置文件,在相應的節點上啟動flume agent
3.2啟動命令
bin/flume-ng agent?-c?./conf ………….
|
commands: ??help ?????????????????????顯示本幫助信息 ??agent ????????????????????啟動一個agent進程 ??avro-client ????????????????啟動一個用于測試avro?source的客戶端(能夠發送avro序列化流) ??version ???????????????????顯示當前flume的版本信息 global options: ??全局通用選項 ??--conf,-c <conf> ?????????指定flume的系統配置文件所在目錄 ??--classpath,-C <cp> ???????添加額外的jar路徑 ??--dryrun,-d ??????????????不去真實啟動flume?agent,而是打印當前命令 ??--plugins-path <dirs> ??????指定插件(jar)所在路徑 ??-Dproperty=value ?????????傳入java環境參數 ??-Xproperty=value ?????????傳入所需的JVM配置參數 agent options: ??--name,-n <name> ?????????agent的別名(在用戶采集方案配置文件中) ??--conf-file,-f <file> ?????????指定用戶采集方案配置文件的路徑 ??--zkConnString,-z <str> ?????指定zookeeper的連接地址 ??--zkBasePath,-p <path> ?????指定用戶配置文件所在的zookeeper?path,比如:/flume/config ??--no-reload-conf ???????????關閉配置文件動態加載 ??--help,-h ??????????????????display help text avro-client options: ??--rpcProps,-P <file> ??RPC client properties file with server connection params ??--host,-H <host> ????avro序列化數據所要發往的目標主機(avro?source所在機器) ??--port,-p <port> ?????avro序列化數據所要發往的目標主機的端口號 ??--dirname <dir> ?????需要被序列化發走的數據所在目錄(提前準備好測試數據放在一個文件中) ??--filename,-F <file> ??需要被序列化發走的數據所在文件(default: std input) ??--headerFile,-R <file> ?存儲header?key-value的文件 ??--help,-h ????????????幫助信息 ??Either --rpcProps or both --host and --port must be specified. Note that if <conf> directory is specified, then it is always included first in the classpath. |
開啟內置監控功能
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
4.Flume入門案例
先用一個最簡單的例子來測試一下程序環境是否正常
4.1數據流
?4.2
組件選擇
- Source組件 NetCat:
- Channel組件:
Memory?Channel
capacity: 緩存的容量 ,可緩存的event的數量
transactionCapacity: 事務容量。支持出錯情況下的event回滾事件數量。
- Sink組件: logger Sink
?4.3
部署配置實現
- 創建部署配置文件
在flume的安裝目錄下,新建一個文件夾,myconf
#?cd?myconf
#?vi??netcat-logger.conf
|
# 定義這個agent中各組件的名字 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 描述和配置source組件:r1 a1.sources.r1.type =?netcat a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 9999 # source 和 channel關聯 a1.sources.r1.channels = c1?? # 描述和配置sink組件:k1 a1.sinks.k1.type = logger #?sink也要關聯channel a1.sinks.k1.channel = c1 # 描述和配置channel組件,此處使用是內存緩存的方式 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 |
flume-ng?命令的模式:
- ?啟動一個采集器:
|
[root@hdp-01 apache-flume-1.6.0-bin]# bin/flume-ng agent -n a1 -c conf -f myconf/netcat-logger.conf ???-Dflume.root.logger=INFO,console |
agent ???運行一個采集器
-n a1 ?指定我們這個agent的名字
-c conf ??指定flume自身的配置文件所在目錄
-f conf/netcat-logger.conf ?指定自定義的采集方案
在工作環境中的命令為:
|
nohup bin/flume-ng agent -n a1 -c conf -f myconf/netcat-logger.conf 1>/dev/null 2>&1 & |
- 測試
往agent的source所監聽的端口上發送數據,讓agent有數據可采。
通過telnet命令向端口發送消息:
[root@hdp-01 ~]#?telnet?hdp-01 9999
如果沒有telnet命令,用yum安裝一個即可: ?
yum?-y install?telnet
就可以通過日志查看:
注意: 注釋不能寫在配置的后面,只能單獨一行寫。
5.Flume常用內置組件詳解
Flume支持眾多的source和sink類型,詳細手冊可參考官方文檔
Flume 1.9.0 User Guide — Apache Flume
6.Flume常用組件詳解:Source
6.1netcat?source
6.1.1工作機制:
啟動一個socket服務,監聽一個端口;
將端口上收到的數據,轉成event寫入channel;
6.1.2配置文件:
|
a1.sources = s1 a1.sources.s1.type = netcat a1.sources.s1.bind = 0.0.0.0 a1.sources.s1.port = 44444 a1.sources.s1.channels = c1 |
6.2
exec?source
6.2.1工作機制:
啟動一個用戶所指定的linux?shell命令;
采集這個linux shell命令的標準輸出,作為收集到的數據,轉為event寫入channel;
?6.2.2參數詳解:
|
channels |
– |
本source要發往的channel |
|
type |
– |
本source的類別名稱:exec |
|
command |
– |
本source所要運行的linux命令,比如: tail?-F /path/file |
|
shell |
– |
指定運行上述命令所用shell |
|
restartThrottle |
10000 |
命令die了以后,重啟的時間間隔 |
|
restart |
false |
命令die了以后,是否要重啟 |
|
logStdErr |
false |
是否收集命令的錯誤輸出stderr |
|
batchSize |
20 |
提交的event批次大小 |
|
batchTimeout |
3000 |
發往下游沒完成前,等待的時間 |
|
selector.type |
replicating |
指定channel選擇器:replicating or multiplexing |
|
selector.* |
選擇器的具體參數 |
|
|
interceptors |
– |
指定攔截器 |
|
interceptors.* |
?指定的攔截器的具體參數 |
6.2.3配置文件:
|
a1.sources = s1 a1.sources.s1.channels = c1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /root/weblog/access.log a1.sources.s1.batchSize = 100 a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 200 a1.channels.c1.transactionCapacity = 100 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
啟動測試:
1.準備一個日志文件
2.寫一個腳本模擬往日志文件中持續寫入數據
|
for i in {1..10000}; do echo ${i}--------------------------- >> access.log ; sleep 0.5; done |
3.創建一個flume自定義配置文件
4.啟動flume采集
注意:通過人為破壞測試,發現這個exec?source,不會記錄宕機前所采集數據的偏移量位置,重啟后可能會造成數據丟失!
6.3
spooldir?source
6.3.1工作機制:
監視一個指定的文件夾,如果文件夾下有沒采集過的新文件,則將這些新文件中的數據采集,并轉成event寫入channel;
注意:spooling目錄中的文件必須是不可變的,而且是不能重名的!否則,source會loudly?fail!
6.3.2參數詳解:
|
Property Name |
Default |
Description |
|
channels |
– |
|
|
type |
– |
The component type name, needs to be?spooldir. |
|
spoolDir |
– |
The directory from which to read files from. |
|
fileSuffix |
.COMPLETED |
采集完成的文件,添加什么后綴名 |
|
deletePolicy |
never |
是否刪除采完的文件:?never?or?immediate |
|
fileHeader |
false |
是否將所采集文件的絕對路徑添加到header中 |
|
fileHeaderKey |
file |
上述header的key名稱 |
|
basenameHeader |
false |
是否將文件名添加到header |
|
basenameHeaderKey |
basename |
上述header的key名稱 |
|
includePattern |
^.*$ |
指定需要采集的文件名的正則表達式 |
|
ignorePattern |
^$ |
指定要排除的文件名的正則表達式 如果一個文件名即符合includePattern又匹配ignorePattern,則該文件不采 |
|
trackerDir |
.flumespool |
記錄元數據的目錄所在路徑,可以用絕對路徑也可以用相對路徑(相對于采集目錄) |
|
trackingPolicy |
rename |
采集進度跟蹤策略,有兩種:?“rename”和?“tracker_dir”. 本參數只在deletePolicy=never時才生效 ?“rename”- 采完的文件根據filesuffix重命名 ?“tracker_dir” - 采完的文件會在trackerDir目錄中生成一個同名的空文件 |
|
consumeOrder |
oldest |
采集順序:?oldest,?youngest?and?random. oldest和youngest情況下,可能會帶來一定效率的損失;(需要對文件夾中所有文件進行一次掃描以尋找最old或最young的) |
|
pollDelay |
500 |
Delay (in milliseconds) used when polling for new files. |
|
recursiveDirectorySearch |
false |
Whether to monitor sub directories for new files to read. |
|
maxBackoff |
4000 |
The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter. |
|
batchSize |
100 |
一次傳輸到channel的event條數(一批) |
|
inputCharset |
UTF-8 |
Character set used by deserializers that treat the input file as text. |
|
decodeErrorPolicy |
FAIL |
What to do when we see a non-decodable character in the input file.?FAIL: Throw an exception and fail to parse the file.?REPLACE: Replace the unparseable character with the “replacement character” char, typically Unicode U+FFFD.?IGNORE: Drop the unparseable character sequence. |
|
deserializer |
LINE |
Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implementEventDeserializer.Builder. |
|
deserializer.* |
Varies per event deserializer. |
|
|
bufferMaxLines |
– |
(Obselete) This option is now ignored. |
|
bufferMaxLineLength |
5000 |
(Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. |
|
selector.type |
replicating |
replicating or multiplexing |
|
selector.* |
Depends on the selector.type value |
|
|
interceptors |
– |
Space-separated list of interceptors |
|
interceptors.* |
6.3.3配置文件:
|
a1.sources = s1 a1.sources.s1.channels = c1 a1.sources.s1.type = spooldir a1.sources.s1.spoolDir = /root/weblog a1.sources.s1.batchSize = 200 a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 200 a1.channels.c1.transactionCapacity = 100 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
6.3.4啟動測試:
|
bin/flume-ng agent -n a1 -c conf -f myconf/spooldir-mem-logger.conf -Dflume.root.logger=DEBUG,console |
注意:spooldir?source?與exec?source不同,spooldir?source本身是可靠的!會記錄崩潰之前的采集位置!
6.4 avro?source
Avro?source 是通過監聽一個網絡端口來接受數據,而且接受的數據必須是使用avro序列化框架序列化后的數據;
Avro是一種序列化框架,跨語言的;
擴展:什么是序列化,什么是序列化框架?
序列化: 是將一個有復雜結構的數據塊(對象)變成扁平的(線性的)二進制序列
序列化框架: 一套現成的軟件,可以按照既定策略,將對象轉成二進制序列
比如: jdk就有: ObjectOutputStream
???????hadoop就有: Writable
???????跨平臺的序列化框架: avro
6.4.1工作機制
啟動一個網絡服務,監聽一個端口,收集端口上收到的avro序列化數據流!
該source中擁有avro的反序列化器,能夠將收到的二進制流進行正確反序列化,并裝入一個event寫入channel!??????
6.4.2參數詳解
|
Property Name |
Default |
Description |
|
channels |
– |
|
|
type |
– |
本source的別名:?avro |
|
bind |
– |
要綁定的地址 |
|
port |
– |
要綁定的端口號 |
|
threads |
– |
服務的最大線程數 |
|
selector.type |
||
|
selector.* |
||
|
interceptors |
– |
Space-separated list of interceptors |
|
interceptors.* |
||
|
compression-type |
none |
壓縮類型:跟發過來的數據是否壓縮要匹配:none | deflate |
|
ssl |
false |
Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see?SSL/TLS support?section). |
|
keystore |
– |
This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error). |
|
keystore-password |
– |
The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error). |
|
keystore-type |
JKS |
The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS). |
|
exclude-protocols |
SSLv3 |
Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
|
include-protocols |
– |
Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocols. |
|
exclude-cipher-suites |
– |
Space-separated list of cipher suites to exclude. |
|
include-cipher-suites |
– |
Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, it includes every supported cipher suites. |
|
ipFilter |
false |
Set this to true to enable ipFiltering for netty |
|
ipFilterRules |
– |
Define N netty ipFilter pattern rules with this config. |
6.4.3配置文件
|
a1.sources = r1 a1.sources.r1.type = avro a1.sources.r1.channels = c1 a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4141 a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 200 a1.channels.c1.transactionCapacity = 100 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
6.4.4啟動測試
啟動agent:
|
bin/flume-ng agent -c ./conf -f ./myconf/avro-mem-logger.conf -n a1 -Dflume.root.logger=DEBUG,consol |
用一個客戶端去給啟動好的source發送avro序列化數據:
|
bin/flume-ng avro-client --host c703 --port 4141 |
6.4.5啟動測試利用avro?source和avro sink實現agent級聯
6.4.5.1需求說明
6.4.5.2配置文件
- 上游配置文件
vi ?exec-m-avro.conf
|
a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.channels = c1 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /tmp/logs/access.log a1.sources.r1.batchSize = 100 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.trasactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sinks.k1.type = avro a1.sinks.k1.hostname = h3 a1.sinks.k1.port = 4455 |
- 下游配置文件
vi ?avro-m-log.conf
|
a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 4455 a1.channels.c1.type = memory a1.channels.c1.capacity = 10000 a1.channels.c1.trasactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sinks.k1.type = logger |
6.4.5.3啟動測試
- 先啟動下游:
bin/flume-ng agent -n a1 -c conf/ -f avro-m-log.conf -Dflume.root.logger=INFO,console
- 再啟動上游:
bin/flume-ng agent -n a1 -c conf/ -f exec-m-avro.conf
- 然后寫一個腳本在h1上模擬生成數據
|
while true do echo "hello " ?>> /tmp/logs/access.log sleep 0.1 done |
6.5 kafka source
6.5.1???????工作機制
Kafka source的工作機制:就是用kafka?consumer連接kafka,讀取數據,然后轉換成event,寫入channel
6.5.2??????????????參數詳解
|
Property Name |
Default |
Description |
|
channels |
– |
?數據發往的channel |
|
type |
– |
本source的名稱: org.apache.flume.source.kafka.KafkaSource |
|
kafka.bootstrap.servers |
– |
Kafka?broker服務器列表,逗號分隔 |
|
kafka.consumer.group.id |
flume |
Kafka消費者組id |
|
kafka.topics |
– |
Kafka消息主題列表,逗號隔開 |
|
kafka.topics.regex |
– |
用正則表達式來指定一批topic;本參數的優先級高于kafka.topics |
|
batchSize |
1000 |
寫入channel的event?批,最大消息條數 |
|
batchDurationMillis |
1000 |
批次寫入channel的最大時長 |
|
backoffSleepIncrement |
1000 |
Kafka?Topic?顯示為空時觸發的初始和增量等待時間。 |
|
maxBackoffSleep |
5000 |
Kafka?Topic?顯示為空時觸發的最長等待時間 |
|
useFlumeEventFormat |
false |
默認情況下,event 將從Kafka Topic 直接作為字節直接進入event 主體。設置為true以讀取event 作為Flume Avro二進制格式。與Kafka Sink上的相同屬性或Kafka Channel上的parseAsFlumeEvent屬性一起使用時,這將保留在生成端發送的任何Flume標頭。 |
|
setTopicHeader |
true |
是否要往header中加入一個kv:topic信息 |
|
topicHeader |
topic |
應上面開關的需求,加入kv:topic?=>topic名稱 |
|
kafka.consumer.security.protocol |
PLAINTEXT |
Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup. |
|
more consumer security props |
If using SASL_PLAINTEXT, SASL_SSL or SSL refer to?Kafka security?for additional properties that need to be set on consumer. |
|
|
Other Kafka Consumer Properties |
– |
本source,允許直接配置任意的kafka消費者參數,格式如下: For example:?kafka.consumer.auto.offset.reset (就是在消費者參數前加統一前綴:?kafka.consumer.) |
??????????????6.5.3配置文件
|
a1.sources = s1 a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource a1.sources.s1.channels = c1 a1.sources.s1.batchSize = 100 a1.sources.s1.batchDurationMillis = 2000 a1.sources.s1.kafka.bootstrap.servers = c701:9092,c702:9092,c703:9092 a1.sources.s1.kafka.topics = TAOGE a1.sources.s1.kafka.consumer.group.id = g1 a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 200 a1.channels.c1.transactionCapacity = 100 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
??????????????6.5.4啟動測試
1. 首先,操作kafka,準備好topic
#?查看當前kafka集群中的topic:
bin/kafka-topics.sh ?--list --zookeeper c701:2181
#?創建一個新的topic
bin/kafka-topics.sh ?--create --topic TAOGE --partitions 3 --replication-factor 2 --zookeeper c701:2181
#?查看topic的詳細信息
bin/kafka-topics.sh --describe --topic TAOGE --zookeeper c701:2181
#?控制臺生產者,向topic中寫入數據
bin/kafka-console-producer.sh --broker-list c701:9092,c702:9092,c703:9092 --topic TAOGE
2. 啟動flume?agent來采集kafka中的數據
bin/flume-ng agent -n a1 -c conf/ -f myconf/kfk-mem-logger.conf ?-Dflume.root.logger=INFO,console
注意:
Source往channel中寫入數據的批次大小 ?<= ?channel的事務控制容量大小
6.6?taildir?source
???????6.6.1?工作機制
監視指定目錄下的一批文件,只要某個文件中有新寫入的行,則會被tail到
它會記錄每一個文件所tail到的位置,記錄到一個指定的positionfile保存目錄中,格式為json(如果需要的時候,可以人為修改,就可以讓source從任意指定的位置開始讀取數據)
所以,這個source真的像官網所吹的,是可靠的reliable!
它對采集完成的文件,不會做任何修改(比如重命名,刪除…..)
taildir source會把讀到的數據成功寫入channel后,再更新記錄偏移量
這種機制,能保證數據不會漏采(丟失),但是有可能會產生數據重復!
?????????????????????6.6.2?參數詳解
|
Property Name |
Default |
Description |
|
channels |
– |
所要寫往的channel |
|
type |
– |
本source的別名:?TAILDIR. |
|
filegroups |
– |
空格分割的組名,每一組代表著一批文件 g1 g2 |
|
filegroups.<filegroupName> |
– |
每個文件組的絕路路徑,文件名可用正則表達式 |
|
positionFile |
~/.flume/taildir_position.json |
記錄偏移量位置的文件所在路徑 |
|
headers.<filegroupName>.<headerKey> |
– |
Header value which is the set with header key. Multiple headers can be specified for one file group. |
|
byteOffsetHeader |
false |
Whether to add the byte offset of a tailed line to a header called ‘byteoffset’. |
|
skipToEnd |
false |
Whether to skip the position to EOF in the case of files not written on the position file. |
|
idleTimeout |
120000 |
關閉非活動文件的時延。如果被關閉的這個文件又在某個時間有了新增行,會被此source檢測到,并重新打開 |
|
writePosInterval |
3000 |
3s 記錄一次偏移量到positionfile |
|
batchSize |
100 |
提交event到channel的批次最大條數 |
|
maxBatchCount |
Long.MAX_VALUE |
控制在一個文件上連續讀取的最大批次個數(如果某個文件正在被高速寫入,那就應該讓這個參數調為最大值,以讓source可以集中精力專采這個文件) |
|
backoffSleepIncrement |
1000 |
The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data. |
|
maxBackoffSleep |
5000 |
The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data. |
|
cachePatternMatching |
true |
Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity. |
|
fileHeader |
false |
Whether to add a header storing the absolute path filename. |
|
fileHeaderKey |
file |
Header key to use when appending absolute path filename to event header. |
??????????????6.6.3配置文件
|
a1.sources = r1 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /root/flumedata/taildir_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /root/weblog/access.log a1.sources.r1.fileHeader = true a1.sources.ri.maxBatchCount = 1000 a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 200 a1.channels.c1.transactionCapacity = 100 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
????????????????????????????6.6.4啟動測試
|
bin/flume-ng agent -n a1 -c conf/ -f myconf/taildir-mem-logger.conf -Dflume.root.logger=DEBUG,console |
經過人為破壞測試,發現, this source還是真正挺reliable的!
不會丟失數據,但在極端情況下可能會產生重復數據!
7Flume常用組件詳解:Interceptor攔截器
攔截器是什么?
就是工作在source之后,它可以從source獲得event,做一個邏輯處理,然后再返回處理之后的event
這樣一來,就可以讓用戶不需要改動source代碼的情況下,就可以插入一些數據處理邏輯;
Flume supports chaining of interceptors.
閱讀源碼,獲取的知識:
攔截器的調用順序:
SourceRunner
ExecSource
ChannelProcessor
SourceRunner -》 source 的start( )方法 --》讀到一批數據,調channelProcessor.processEventBatch(events) --> 調攔截器進行攔截處理 ?--> 調選擇器selector獲取要發送的channle --> 提交數據
7.1timestamp?攔截器
???????7.1.1作用
向event中,寫入一個kv到header里
k名稱可配置;v就是當前的時間戳(毫秒)
???????7.1.2參數
|
Property Name |
Default |
Description |
|
type |
– |
本攔截器的名稱:timestamp |
|
headerName |
timestamp |
要插入header的key名 |
|
preserveExisting |
false |
如果header中已存在同名key,是否要覆蓋 |
??????????????7.1.3配置示例
|
a1.sources = s1 a1.sources.s1.channels = c1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /root/weblog/access.log a1.sources.s1.batchSize = 100 a1.sources.s1.interceptors = i1 a1.sources.s1.interceptors.i1.type = timestamp a1.sources.s1.interceptors.i1.preserveExisting = false a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 200 a1.channels.c1.transactionCapacity = 100 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
??????????????7.1.4??????????????測試
|
2019-06-09 10:24:21,884 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{timestamp=1560047061012} body: 31 30 32 34 34 2E 2E 2E 2E 2E 2E 2E 2E 2E 2E 2E 10244........... } |
7.2static攔截器
???????7.2.1作用
讓用戶往event中添加一個自定義的header??key-value,當然,這個key-value是在配置文件中配死的;
?????????????????????7.2.2參數
|
Property Name |
Default |
Description |
|
type |
– |
別名:?static |
|
preserveExisting |
true |
是否覆蓋同名kv |
|
key |
key |
你要插入的key名 |
|
value |
value |
你要插入的value |
?????????????????????7.2.3配置示例
|
a1.sources = s1 a1.sources.s1.channels = c1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /root/weblog/access.log a1.sources.s1.batchSize = 100 a1.sources.s1.interceptors = i1 i2 i3 a1.sources.s1.interceptors.i1.type = timestamp a1.sources.s1.interceptors.i1.preserveExisting = false a1.sources.s1.interceptors.i2.type = host a1.sources.s1.interceptors.i2.preserveExisting = false a1.sources.s1.interceptors.i2.useIP = true a1.sources.r1.interceptors.i3.type = static a1.sources.r1.interceptors.i3.key = hero a1.sources.r1.interceptors.i3.value = TAOGE a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 200 a1.channels.c1.transactionCapacity = 100 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
?????????????????????7.2.4測試
7.3Host 攔截器
???????7.3.1作用
往event的header中插入主機名(ip)信息
?????????????????????7.3.2參數
|
Property Name |
Default |
Description |
|
type |
– |
本攔截器的別名:?host |
|
preserveExisting |
false |
是否覆蓋已存在的hader key-value |
|
useIP |
true |
插入ip還是主機名 |
|
hostHeader |
host |
要插入header的key名 |
7.3.3配置示例
|
a1.sources = s1 a1.sources.s1.channels = c1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /root/weblog/access.log a1.sources.s1.batchSize = 100 a1.sources.s1.interceptors = i1?i2 a1.sources.s1.interceptors.i1.type = timestamp a1.sources.s1.interceptors.i1.preserveExisting = false a1.sources.s1.interceptors.i2.type = host a1.sources.s1.interceptors.i2.preserveExisting = false a1.sources.s1.interceptors.i2.useIP = true a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 200 a1.channels.c1.transactionCapacity = 100 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
7.3.4測試
??????????????7.4?UUID 攔截器
7.4.1作用
生成uuid放入event的header中
??????????????7.4.2參數
|
Property Name |
Default |
Description |
|
type |
– |
全名:org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
|
headerName |
id |
Key名稱 |
|
preserveExisting |
true |
是否覆蓋同名key |
|
prefix |
"" |
Uuid前的前綴 |
??????????????7.4.3配置
|
a1.sources = s1 a1.sources.s1.channels = c1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /root/weblog/access.log a1.sources.s1.batchSize = 100 a1.sources.s1.interceptors = i1 i2 i3 i4 a1.sources.s1.interceptors.i1.type = timestamp a1.sources.s1.interceptors.i1.preserveExisting = false a1.sources.s1.interceptors.i2.type = host a1.sources.s1.interceptors.i2.preserveExisting = false a1.sources.s1.interceptors.i2.useIP = true a1.sources.s1.interceptors.i3.type = static a1.sources.s1.interceptors.i3.key = hero a1.sources.s1.interceptors.i3.value = TAOGE a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder a1.sources.s1.interceptors.i4.headName = duanzong a1.sources.s1.interceptors.i4.prefix = ?666_ a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 200 a1.channels.c1.transactionCapacity = 100 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
????????????????????????????7.4.4測試
8?Flume常用組件詳解:channel
channel是agent中用來緩存event的repository(池,倉庫)
source往channel中添加event
sink從channel中取并移除event
channel跟事務控制有極大關系;
channel?有容量大小、可靠性級別、事務容量等特性;
8.1 memory channel
??????????????8.1.1特性
事件被存儲在實現配置好容量的內存(隊列)中。
速度快,但可靠性較低,有可能會丟失數據
????????????????????????????8.1.2參數
|
Property Name |
Default |
Description |
|
type |
– |
別名:?memory |
|
capacity |
100 |
能存儲的最大事件event數 |
|
transactionCapacity |
100 |
最大事務控制容量 |
|
keep-alive |
3 |
添加或移除event的超時時間 |
|
byteCapacityBufferPercentage |
20 |
除了body以外的字節所能占用的容量百分比 |
|
byteCapacity |
see description |
channel中最大的總byte數(只計算body) |
????????????????????????????8.1.3配置示例
|
a1.channels?=?c1 a1.channels.c1.type?=?memory a1.channels.c1.capacity?=?10000 a1.channels.c1.transactionCapacity?=?10000 a1.channels.c1.byteCapacityBufferPercentage?=?20 a1.channels.c1.byteCapacity?=?800000 |
????????????????????????????8.1.4測試
????????????????????????????8.1.5擴展了解
Memory channel源碼閱讀
|
// lock to guard queue, mainly needed to keep it locked down during resizes// it should never be held through a blocking operation private?Object queueLock = new?Object(); //queue為Memory Channel中存放Event的地方,這里用了LinkedBlockingDeque來實現 @GuardedBy(value = "queueLock")p rivate?LinkedBlockingDeque<Event> queue; //下面的兩個信號量用來做同步操作,queueRemaining表示queue中的剩余空間,queueStored表示queue中的使用空間 // invariant that tracks the amount of space remaining in the queue(with all uncommitted takeLists deducted) // we maintain the remaining permits = queue.remaining - takeList.size()// this allows local threads waiting for space in the queue to commit without denying access to the // shared lock to threads that would make more space on the queue private?Semaphore queueRemaining; // used to make "reservations" to grab data from the queue. // by using this we can block for a while to get data without locking all other threads out // like we would if we tried to use a blocking call on queue private?Semaphore queueStored; //下面幾個變量為配置文件中Memory Channel的配置項 // 一個事務中Event的最大數目 private?volatile?Integer transCapacity; // 向queue中添加、移除Event的等待時間 private?volatile?int?keepAlive; // queue中,所有Event所能占用的最大空間 private?volatile?int?byteCapacity; private?volatile?int?lastByteCapacity; // queue中,所有Event的header所能占用的最大空間占byteCapacity的比例 private?volatile?int?byteCapacityBufferPercentage; // 用于標示byteCapacity中剩余空間的信號量 private?Semaphore bytesRemaining; // 用于記錄Memory Channel的一些指標,后面可以通過配置監控來觀察Flume的運行情況 private?ChannelCounter channelCounter; |
然后重點說下MemoryChannel里面的MemoryTransaction,它是Transaction類的子類,從其文檔來看,一個Transaction的使用模式都是類似的:
?Channel ch = ...Transaction tx = ch.getTransaction();try {tx.begin();...// ch.put(event) or ch.take()...tx.commit();} catch (ChannelException ex) {tx.rollback();...} finally {tx.close();}
可以看到一個Transaction主要有、put、take、commit、rollback這四個方法,我們在實現其子類時,主要也是實現著四個方法。
Flume官方為了方便開發者實現自己的Transaction,定義了BasicTransactionSemantics,這時開發者只需要繼承這個輔助類,并且實現其相應的、doPut、doTake、doCommit、doRollback方法即可,MemoryChannel就是繼承了這個輔助類。
private class MemoryTransaction extends BasicTransactionSemantics {//和MemoryChannel一樣,內部使用LinkedBlockingDeque來保存沒有commit的Eventprivate LinkedBlockingDeque<Event> takeList;private LinkedBlockingDeque<Event> putList;private final ChannelCounter channelCounter;//下面兩個變量用來表示put的Event的大小、take的Event的大小private int putByteCounter = 0;private int takeByteCounter = 0;public MemoryTransaction(int transCapacity, ChannelCounter counter) {//用transCapacity來初始化put、take的隊列putList = new LinkedBlockingDeque<Event>(transCapacity);takeList = new LinkedBlockingDeque<Event>(transCapacity);channelCounter = counter;}@Overrideprotected void doPut(Event event) throws InterruptedException {//doPut操作,先判斷putList中是否還有剩余空間,有則把Event插入到該隊列中,同時更新putByteCounter//沒有剩余空間的話,直接報ChannelExceptionchannelCounter.incrementEventPutAttemptCount();int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);if (!putList.offer(event)) {throw new ChannelException("Put queue for MemoryTransaction of capacity " +putList.size() + " full, consider committing more frequently, " +"increasing capacity or increasing thread count");}putByteCounter += eventByteSize;}@Overrideprotected Event doTake() throws InterruptedException {//doTake操作,首先判斷takeList中是否還有剩余空間channelCounter.incrementEventTakeAttemptCount();if(takeList.remainingCapacity() == 0) {throw new ChannelException("Take list for MemoryTransaction, capacity " +takeList.size() + " full, consider committing more frequently, " +"increasing capacity, or increasing thread count");}//然后判斷,該MemoryChannel中的queue中是否還有空間,這里通過信號量來判斷if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {return null;}Event event;//從MemoryChannel中的queue中取出一個eventsynchronized(queueLock) {event = queue.poll();}Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +"signalling existence of entry");//放到takeList中,然后更新takeByteCounter變量takeList.put(event);int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);takeByteCounter += eventByteSize;return event;}@Overrideprotected void doCommit() throws InterruptedException {//該對應一個事務的提交//首先判斷putList與takeList的相對大小int remainingChange = takeList.size() - putList.size();//如果takeList小,說明向該MemoryChannel放的數據比取的數據要多,所以需要判斷該MemoryChannel是否有空間來放if(remainingChange < 0) {// 1. 首先通過信號量來判斷是否還有剩余空間if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,TimeUnit.SECONDS)) {throw new ChannelException("Cannot commit transaction. Byte capacity " +"allocated to store event body " + byteCapacity * byteCapacitySlotSize +"reached. Please increase heap space/byte capacity allocated to " +"the channel as the sinks may not be keeping up with the sources");}// 2. 然后判斷,在給定的keepAlive時間內,能否獲取到充足的queue空間if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {bytesRemaining.release(putByteCounter);throw new ChannelFullException("Space for commit to queue couldn't be acquired." +" Sinks are likely not keeping up with sources, or the buffer size is too tight");}}int puts = putList.size();int takes = takeList.size();//如果上面的兩個判斷都過了,那么把putList中的Event放到該MemoryChannel中的queue中。synchronized(queueLock) {if(puts > 0 ) {while(!putList.isEmpty()) {if(!queue.offer(putList.removeFirst())) {throw new RuntimeException("Queue add failed, this shouldn't be able to happen");}}}//清空本次事務中用到的putList與takeList,釋放資源putList.clear();takeList.clear();}//更新控制queue大小的信號量bytesRemaining,因為把takeList清空了,所以直接把takeByteCounter加到bytesRemaining中。bytesRemaining.release(takeByteCounter);takeByteCounter = 0;putByteCounter = 0;//因為把putList中的Event放到了MemoryChannel中的queue,所以把puts加到queueStored中去。queueStored.release(puts);//如果takeList比putList大,說明該MemoryChannel中queue的數量應該是減少了,所以把(takeList-putList)的差值加到信號量queueRemainingif(remainingChange > 0) {queueRemaining.release(remainingChange);}if (puts > 0) {channelCounter.addToEventPutSuccessCount(puts);}if (takes > 0) {channelCounter.addToEventTakeSuccessCount(takes);}channelCounter.setChannelSize(queue.size());}@Overrideprotected void doRollback() {//當一個事務失敗時,會進行回滾,即調用本方法//首先把takeList中的Event放回到MemoryChannel中的queue中。int takes = takeList.size();synchronized(queueLock) {Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +"queue to rollback takes. This should never happen, please report");while(!takeList.isEmpty()) {queue.addFirst(takeList.removeLast());}//然后清空putListputList.clear();}//因為清空了putList,所以需要把putList所占用的空間大小添加到bytesRemaining中bytesRemaining.release(putByteCounter);putByteCounter = 0;takeByteCounter = 0;//因為把takeList中的Event回退到queue中去了,所以需要把takeList的大小添加到queueStored中queueStored.release(takes);channelCounter.setChannelSize(queue.size());}}
MemoryChannel的邏輯相對簡單,主要是通過MemoryTransaction中的putList、takeList與MemoryChannel中的queue打交道,這里的queue相當于持久化層,只不過放到了內存中,如果是FileChannel的話,會把這個queue放到本地文件中。下面表示了Event在一個使用了MemoryChannel的agent中數據流向:
source ---> putList ---> queue ---> takeList ---> sink
還需要注意的一點是,這里的事務可以嵌套使用,如下圖:
?當有兩個agent級連時,sink的事務中包含了一個source的事務,這也應證了前面所說的:
在任何時刻,Event至少在一個Channel中是完整有效的
?????????????????????????8.2 file channel
???????8.2.1特性
event被緩存在本地磁盤文件中
可靠性高,不會丟失
但在極端情況下可能會重復數據
?????????????????????8.2.3參數
|
Property Name Default |
Description |
? |
|
type |
– |
別名:?file. |
|
checkpointDir |
~/.flume/file-channel/checkpoint |
Checkpoint信息保存目錄 |
|
useDualCheckpoints |
false |
Checkpoint是否雙重checkpoint機制 |
|
backupCheckpointDir |
– |
備份checkpoint的保存目錄 |
|
dataDirs |
~/.flume/file-channel/data |
Event數據緩存目錄 |
|
transactionCapacity |
10000 |
事務管理容量 |
|
checkpointInterval |
30000 |
記錄checkpoint信息的時間間隔 |
|
maxFileSize |
2146435071 |
控制一個數據文件的大小規格 |
|
minimumRequiredSpace |
524288000 |
所需的最低磁盤空間,低于則停止接收新數據 |
|
capacity |
1000000 |
最大event緩存數 |
|
keep-alive |
3 |
等待添加數據的最大時間 |
?????????????????????8.2.4配置示例
|
a1.sources = r1 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /root/taildir_chkp/taildir_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /root/weblog/access.log a1.sources.r1.fileHeader = true a1.sources.ri.maxBatchCount = 1000 a1.channels = c1 a1.channels.c1.type = file a1.channels.c1.capacity = 1000000 a1.channels.c1.transactionCapacity = 100 a1.channels.c1.checkpointDir = /root/flume_chkp a1.channels.c1.dataDirs = /root/flume_data a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
?????????????????????8.2.5??????????????測試
在使用taildir?source??和 ?file?channel的情況下,經過反復各種人為破壞,發現,沒有數據丟失的現象發生;
但是,如果時間點掐的比較好(sink 取了一批數據寫出,但還沒來得及向channel提交事務),會產生數據重復的現象!
?????????????????????8.3kafka channel
???????????????????????????????????8.3.1特性
agent利用kafka作為channel數據緩存
kafka channel要跟 kafka?source、 kafka?sink區別開來
kafka?channel在應用時,可以沒有source?|??或者可以沒有sink
如果是需要把kafka作為最終采集存儲,那么就只要 ?source?+?kafka?channel
如果是把kafka作為數據源,要將kafka中的數據寫往hdfs,那么就只要 kafka?channel?+?hdfs?sink
?????????????????????????????????????????????????8.3.2參數
|
Property Name |
Default |
Description |
|
type |
– |
名字:?org.apache.flume.channel.kafka.KafkaChannel |
|
kafka.bootstrap.servers |
– |
Kafka服務器地址 |
|
kafka.topic |
flume-channel |
所使用的topic |
|
kafka.consumer.group.id |
flume |
消費者組id |
|
parseAsFlumeEvent |
true |
跟上、下游匹配,是否需要將數據解析為Flume的Event格式 |
|
pollTimeout |
500 |
從kafka取數據的超時時間 |
|
defaultPartitionId |
– |
默認指派的partitionid |
|
partitionIdHeader |
– |
將一個event指派到某個分區時所使用的header 的key |
|
kafka.consumer.auto.offset.reset |
latest |
初始化讀取偏移量的策略 |
???????????????????????????????????????????????????????????????8.3.3配置測試
配置示例1:用exec?soure讀文件,往kafka?channel中寫
不帶sink!自己用消費者去kafka集群中讀取采集到的數據!
|
a1.sources = r1 # 配置兩個channel,為了便于觀察 # c1 是kafkachannel ,c2是一個內存channel a1.channels = c1 c2 a1.sinks = k1 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /root/logs/a.log a1.sources.r1.channels = c1 c2 # kafka-channel具體配置,該channel沒有sink a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092 a1.channels.c1.parseAsFlumeEvent = false # 內存channel 配置,并對接一個logger sink來觀察 a1.channels.c2.type = memory a1.sinks.k1.type = logger a1.sinks.k1.channel = c2 |
運行測試:
1.準備好a.log文件
2.啟動好kafka
3.啟動agent
4.往a.log寫入數據
5.用kafka的控制臺消費者消費主題,看是否拿到數據
配置示例2:用logger?sink,從kafka?channel中取數據
|
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = exec a1.sources.r1.command = tail -F /root/logs/a.log a1.sources.r1.channels = c1 a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092 a1.channels.c1.parseAsFlumeEvent = false a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
運行測試:
2.啟動好kafka
3.啟動flume agent
4.用kafka的控制臺生產者向topic中寫入數據
5.在flume?agent的控制臺上觀察是否取到數據
9 Flume常用組件詳解:sink
sink是從channel中獲取、移除數據,并輸出到下游(可能是下一級agent,也可能是最終目標存儲系統)
9.1hdfs sink
???????9.1.1特性
數據被最終發往hdfs
可以生成text文件或 sequence?文件,而且支持壓縮;
支持生成文件的周期性roll機制:基于文件size,或者時間間隔,或者event數量;
目標路徑,可以使用動態通配符替換,比如用%D代表當前日期;
當然,它也能從event的header中,取到一些標記來作為通配符替換;
header:{type=acb}
/weblog/%{type}/%D/ ?就會被替換成: /weblog/abc/19-06-09/
????????????????????????????9.1.2參數
|
Name |
Default |
Description |
|
channel |
– |
從哪個channel取數據 |
|
type |
– |
別名:?hdfs |
|
hdfs.path |
– |
目標hdfs存儲路徑(URI) |
|
hdfs.filePrefix |
FlumeData |
指定生成的文件名前綴 |
|
hdfs.fileSuffix |
– |
后綴 |
|
hdfs.inUsePrefix |
– |
正在寫入的文件的前綴標識 |
|
hdfs.inUseSuffix |
.tmp |
正在寫入的文件的后綴標識 |
|
hdfs.rollInterval |
30 |
切換文件的條件:間隔時間;為0則不生效 |
|
hdfs.rollSize |
134217728 |
切換文件的條件:文件大小;為0則不生效 |
|
hdfs.rollCount |
10 |
切換文件的條件:event條數;為0則不生效 |
|
hdfs.idleTimeout |
0 |
不活躍文件的關閉超時時長;0則不自動關閉 |
|
hdfs.batchSize |
100 |
從channel中取一批數據的最大大小; |
|
hdfs.codeC |
– |
壓縮編碼: gzip, bzip2, lzo, lzop, snappy |
|
hdfs.fileType |
SequenceFile |
目標文件格式: SequenceFile,?DataStream?or?CompressedStream? 注意:DataStream?不能支持壓縮 CompressedStream?必須設置壓縮編碼 SequenceFile 可壓縮可不壓縮 |
|
hdfs.maxOpenFiles |
5000 |
允許同時最多打開的文件數;如果超出,則會關閉最早打開的 |
|
hdfs.minBlockReplicas |
– |
目標文件的block副本數 |
|
hdfs.writeFormat |
Writable |
指定sequence?file中的對象類型;支持Text和Writable 同時請使用Text,否則后續數據處理平臺可能無法解析 |
|
hdfs.threadsPoolSize |
10 |
操作HDFS時的線程池大小 |
|
hdfs.rollTimerPoolSize |
1 |
檢查文件是否需要被roll的線程數 |
|
hdfs.kerberosPrincipal |
– |
Kerberos user principal for accessing secure HDFS |
|
hdfs.kerberosKeytab |
– |
Kerberos keytab for accessing secure HDFS |
|
hdfs.proxyUser |
||
|
hdfs.round |
false |
目錄通配符切換是是否需要切掉尾數 |
|
hdfs.roundValue |
10 |
時間尾數切掉多少 |
|
hdfs.roundUnit |
minute |
時間尾數切掉大小的單位-?second,?minute?or?hour. |
|
hdfs.timeZone |
Local Time |
時間通配符所使用的時區 |
|
hdfs.useLocalTimeStamp |
false |
所用的時間是否要從agent?sink本地獲取 |
|
hdfs.closeTries |
0 |
重命名已完成文件的重試次數;0則一直嘗試重命名 |
|
hdfs.retryInterval |
180 |
關閉一個文件的重試時間間隔 |
|
serializer |
TEXT |
將channel中的event?body解析成什么格式:Text|?avro_event ; 也可以使用自定義的序列化器 |
|
serializer.* |
|
小提示:什么叫做URI |
????????????????????????????9.1.3配置示例
|
## 定義 a1.sources = r1 a1.sinks = k1 a1.channels = c1 ## source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /root/logs/a.log a1.sources.r1.channels = c1 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp ## channel a1.channels.c1.type = memory a1.channels.c1.capacity = 100000000 ## sink a1.sinks.k1.channel = c1 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://h1:8020/doitedu/%Y-%m-%d/%H-%M a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.filePrefix = doit_ a1.sinks.k1.hdfs.fileSuffix = .log.gz a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.rollSize = 102400 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = gzip a1.sinks.k1.hdfs.writeFormat = Text |
??????????????9.1.4測試
1. 啟動hdfs
2. 清除以前的taildirsource產生的偏移量記錄文件、filechannel緩存的數據目錄和checkpoint目錄
3. 啟動agent
4. 用for循環腳本往日志文件中不斷寫入新的數據
5. 到hdfs中觀察結果
???????9.2?kafka sink
有了kafka?channel后, ?kafka?sink的必要性就降低了。因為我們可以用kafka作為channel來接收source產生的數據!
9.2.1特性
9.2.2參數
|
Property Name |
Default |
Description |
|
type |
– |
名稱:?org.apache.flume.sink.kafka.KafkaSink |
|
kafka.bootstrap.servers |
– |
Kafka服務器列表 |
|
kafka.topic |
default-flume-topic |
Kafka的topic |
|
flumeBatchSize |
100 |
從channel中取event的批次大小 |
|
kafka.producer.acks |
1 |
Kafka生產者消息推送應答級別: 1?: Leader接收到即回應 0 :不等回應? -1:副本同步完成,再回應 |
|
useFlumeEventFormat |
false |
是否使用avro序列化 |
|
defaultPartitionId |
– |
指定所有event默認發往的分區id;不指定則按kafka生產者的分區器,均勻分發 |
|
partitionIdHeader |
– |
通過在header中指定分區id,來約束這個event發往的分區 |
|
allowTopicOverride |
true |
允許在event的header中指定要寫入的topic |
|
topicHeader |
topic |
如果上一條開關開啟,則在header中放入的key的名稱 |
|
Other Kafka Producer Properties |
– |
可以用kafka.producer.xxx來配置任何kafka?producer的參數 |
??????????????9.2.3配置示例
|
a1.sources = r1 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /root/taildir_chkp/taildir_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /root/weblog/access.log a1.sources.r1.fileHeader = true a1.sources.r1.maxBatchCount = 1000 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = timestamp a1.channels = c1 a1.channels.c1.type = file a1.channels.c1.capacity = 1000000 a1.channels.c1.transactionCapacity = 100 a1.channels.c1.checkpointDir = /root/flume_chkp a1.channels.c1.dataDirs = /root/flume_data a1.sinks = k1 a1.sinks.k1.channel = c1 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = mytopic a1.sinks.k1.kafka.bootstrap.servers = c701:9092,c702:9092,c703:9092 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sinks.k1.kafka.producer.linger.ms = 1 a1.sinks.k1.kafka.producer.compression.type = snappy |
??????????????9.2.4測試
1. 清掉 taildir的偏移量記錄文件;清掉filechannel的數據緩存和checkpoint記錄;
2. 準備一個日志文件,并不斷寫入數據
3. 啟動flume的agent
4. 用kafka的客戶端去觀察數據結果
9.3 avro?sink
9.3.1 特性
avro?sink用來向avro?source發送avro序列化數據,這樣就可以實現agent之間的級聯
??????????????9.3.2參數
|
Property Name |
Default |
Description |
|
channel |
– |
|
|
type |
– |
The component type name, needs to be?avro. |
|
hostname |
– |
目標avro source的主機 |
|
port |
– |
目標avro source的綁定端口 |
|
batch-size |
100 |
number of event to batch together for send. |
|
connect-timeout |
20000 |
連接超時時間 |
|
request-timeout |
20000 |
請求超時時間 |
|
reset-connection-interval |
none |
Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent. |
|
compression-type |
none |
This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource |
|
compression-level |
6 |
The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression |
|
ssl |
false |
Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password”, “truststore-type”, and specify whether to “trust-all-certs”. |
|
trust-all-certs |
false |
If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and “listen in” on the encrypted connection. |
|
truststore |
– |
The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source’s SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore not specified either, then the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used. |
|
truststore-password |
– |
The password for the truststore. If not specified, then the global keystore password will be used (if defined). |
|
truststore-type |
JKS |
The type of the Java truststore. This can be “JKS” or other supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the defautl is JKS). |
|
exclude-protocols |
SSLv3 |
Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified. |
|
maxIoWorkers |
2 * the number of available processors in the machine |
The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory. |
????????????????????????????9.3.3配置示例
級聯配置,需要至少兩個flume?agent來演示
在C703上,配置avro?sink?發送者
|
## c703 ## a1.sources = s1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /root/weblog/access.log a1.sources.s1.channels = c1 a1.channels = c1 a1.channels.c1.type = memory a1.sinks = k1 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = c701 a1.sinks.k1.port = 4545 |
在C701上,配置avro?source?接收者
|
## c701 ## a1.sources = s1 a1.sources.s1.type = avro a1.sources.s1.hostname = 0.0.0.0 a1.sources.s1.port = 4545 a1.sources.s1.channel = c1 a1.channels = c1 a1.channels.c1.type = memory a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
????????????????????????????9.3.4啟動測試
先在C701上啟動接受者avro?source(服務)
bin/flume-ng agent -n a1 -c conf/ -f myconf/avro-mem-logger.conf -Dflume.root.logger=INFO,console
再在C703上啟動發送者avro?sink(客戶端)
bin/flume-ng agent -n a1 -c conf/ -f myconf/tail-mem-avro.conf -Dflume.root.logger=INFO,console
10 Flume常用組件詳解:Selector
一個source可以對接多個channel
那么,source的數據如何在多個channel之間傳遞,就由selector來控制
配置應該掛載到source組件上
???????10.1實踐一:replicating selector(復制選擇器)
replicating selector就是默認的選擇器
官網配置參考
??????????????????????????10.1.1目標場景
selector將event復制,分發給所有下游節點
??????????????????????????????????????????10.1???????.2Flume agent配置
# Name the components on this agent ?a1.sources = r1 ?a1.sinks = k1 k2 ?a1.channels = c1 c2 ?# http source, with replicating selectora1.sources.r1.type = httpa1.sources.r1.port = 6666a1.sources.r1.bind = mastera1.sources.r1.selector.type = replicating ?# Describe the sink ?a1.sinks.k1.type = avro ?a1.sinks.k1.hostname = slave1 ?# bind to remote host,RPCa1.sinks.k1.port = 6666a1.sinks.k2.type = avro# bind to remote host,PRCa1.sinks.k2.hostname = slave2a1.sinks.k2.port = 6666# 2 channels in selector testa1.channels.c1.type = memory ?a1.channels.c1.capacity = 1000 ?a1.channels.c1.transactionCapacity = 100 ?a1.channels.c2.type = memory ?a1.channels.c2.capacity = 1000 ?a1.channels.c2.transactionCapacity = 100 ?# bind source ,sink to channelsa1.sources.r1.channels = c1 c2a1.sinks.k1.channel = c1 ?a1.sinks.k2.channel = c2
??????????????10.1???????.3?Collector1 配置
# 01 specify agent,source,sink,channela1.sources = r1a1.sinks = k1a1.channels = c1# 02 avro source,connect to local port 6666a1.sources.r1.type = avroa1.sources.r1.bind = slave1a1.sources.r1.port = 6666# 03 logger sinka1.sinks.k1.type = logger# 04 channel,memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# 05 bind source,sink to channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
???????10.1.4?Collector2 配置
# 01 specify agent,source,sink,channela1.sources = r1a1.sinks = k1a1.channels = c1# 02 avro source,connect to local port 6666a1.sources.r1.type = avroa1.sources.r1.bind = slave2a1.sources.r1.port = 6666# 03 logger sinka1.sinks.k1.type = logger# 04 channel,memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# 05 bind source,sink to channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
???????10.1.5?測試驗證
???????10.2?實踐二:multiplexing selector(多路選擇器)
multiplexing selector可以根據event中的一個指定key的value來決定這條消息會寫入哪個channel,具體在選擇時,需要配置一個映射關系,比如
a1.sources.r1.selector.mapping.CZ=c1 ?; 就意味著header中的value為CZ的話,這條消息就會被寫入c1這個channel
multiplexing selector官方配置參考
???????10.2.1目標場景
????????10.2.2?第一級1 / 2配置
a1.sources = r1a1.channels = c1a1.sinks = k1a1.sources.r1.type = execa1.sources.r1.command = tail -F /root/logs/a.loga1.sources.r1.channels = c1a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = statica1.sources.r1.interceptors.i1.key = flag# 第一臺value=1,另一臺value=2a1.sources.r1.interceptors.i1.value = 1a1.channels.c1.type = memorya1.sinks.k1.type = avroa1.sinks.k1.hostname = h3a1.sinks.k1.port = 44444a1.sinks.k1.channel = c1
??????????????????????10.2.3?第二級配置
a1.sources = r1a1.channels = c1 c2a1.sinks = k1 k2# source配置a1.sources.r1.channels = c1 c2a1.sources.r1.type = avroa1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 44444# source的選擇器配置a1.sources.r1.selector.type = multiplexinga1.sources.r1.selector.header = flaga1.sources.r1.selector.default = c2a1.sources.r1.selector.mapping.1 = c1a1.sources.r1.selector.mapping.2 = c2# channle 配置a1.channels.c1 = memorya1.channels.c2 = memory# 兩個sink分別對接兩個channel的配置a1.sinks.k1.type = loggera1.sinks.k1.channel = c1a1.sinks.k2.type = loggera1.sinks.k2.channel = c2
????????????????????????????????????10.2.4?測試驗證
11 Flume常用組件詳解:grouping processor
一個agent中,多個sink可以被組裝到一個組,而數據在組內多個sink之間發送,有兩種模式:
模式1: Failover Sink Processor??失敗切換
一組中只有優先級高的那個sink在工作,另一個是等待中
如果高優先級的sink發送數據失敗,則專用低優先級的sink去工作!并且,在配置時間penalty之后,還會嘗試用高優先級的去發送數據!
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
## 對兩個sink分配不同的優先級
a1.sinkgroups.g1.processor.priority.k1 = 200
a1.sinkgroups.g1.processor.priority.k2 = 100
## 主sink失敗后,停用懲罰時間
a1.sinkgroups.g1.processor.maxpenalty = 5000
模式2: Load balancing Sink Processor ?負載均衡
允許channel中的數據在一組sink中的多個sink之間進行輪轉,策略有:
round-robin(輪著發)
random(隨機挑)
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
12Flume自定義擴展組件
12.1自定義Source
??????????????12.1.1需求場景
什么情況下需要自定義source:
一般是某種數據源,用flume內置的source組件無法解析,比如XML文檔
而本教程中的例子:實現文本日志的采集,并能記住偏移量!
????????????????????????????12.1.2實現思路
首先,找到自定義source所要實現或繼承的父類/接口
然后,重寫方法(插入自己的需求邏輯)
然后,將代碼打成jar包,傳入flume的lib目錄
最后,寫配置文件調用自定義的source
??????????????12.1.3代碼架構
source是由 SourceRunner—》EventDrivenSourceRunner來調用
|
sourceRunner 拿到?source實例對象 然后調?source的start方法 source讀數據,然后將數據轉成event 然后將event傳給channelprocessor.processEvent(event) processEvent中第一個動作就是調用攔截器攔截這個event,然后再往channel中寫入 |
?????12.1.4?具體實現
- 線程池實現版:
|
package cn.doitedu.flume.custom; import org.apache.commons.io.FileUtils; import java.io.*; /** ????private String positionfilepath; ????private ExecutorService exec; ????/** ????????super.start(); ????????// 獲取歷史偏移量 ????????// 構造一個線程池 ????} ????/** ???????try{ ????/** ????????// 這是我們source用來記錄偏移量的文件路徑 ????????// 這是我們source要采集的日志文件的路徑 ????????// 這是用戶配置的采集事務批次最大值 ????????// 如果日志文件路徑沒有指定,則拋異常 ????} ????/** ????????long offset; ????????public HoldOffsetRunnable(long offset, String logfilepath, ChannelProcessor channelProcessor, int batchsize, String positionfilepath) { ????????public void run() { ????????????try { ????????????????// 循環讀數據 ????????????????// 記錄上一批提交的時間 ????????????????????// 將數據轉成event ????????????????????// 判斷批次大小是否滿 或者 時間到了沒有 ????????????????????????// 記錄提交時間 ????????????????????????// 記錄偏移量 ????????????????????????// 清空本批event ????????????????????} ????????????????????// 不滿足,繼續讀 ????????// 判斷是否批次間隔超時 |
- 單線程實現版
12.1.5 啟動測試
代碼打成jar包,上傳flume的lib
然后寫agent配置文件
|
a1.sources = s1 a1.channels = c1 a1.sinks = k1 # source 配置 a1.sources.s1.type = cn.doitedu.flume.custom.HoldOffesetSource a1.sources.s1.position_file_path= /root/myposition a1.sources.s1.data_file_path = /root/weblog/access.log a1.sources.s1.batchSize = 100 a1.sources.s1.batchTime = 5000 a1.sources.s1.channels = c1 a1.channels.c1.type = memory a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 |
然后啟動flume-agent即可
12.2?自定義攔截器
12.2.1需求場景
公司的點擊流日志數據在一個目錄中不斷生成: /var/log/click_streaming.log
日志文件會隨著文件的大小達到一定閾值(128M)而被重命名,比如:click_streaming.log.1,并且會生成一個新的click_streaming.log文件繼續寫入日志流;
日志文件中數據格式如下:
13888776677,click,/baoming,張飛,湖北省
13889976655,click,/xuexi,關羽,山西省
……..
現在需要用flume去日志服務器上采集數據寫入HDFS,并且要求,對數據中的手機號、姓名字段進行加密(MD5加密,并將加密結果變成BASE64編碼);
??????????????12.2.2實現思路
采集的目標目錄中,會不斷生成新文件
那么,我們的source組件可以選擇 ?taildir:
1.可以監控到新文件 ?
2.可以記錄采集的偏移量
channel,為了保證可靠性,可以選擇 ?filechannel?:
1.會在磁盤上緩存event??
2.會在磁盤上記錄事務狀態
目標存儲是HDFS,sink自然是選擇hdfs?sink;
加密需求的解決:
如果從source上解決,那只能修改 taildir組件的源碼;
如果從sink上解決,那只能修改hdfs?sink組件的源碼;
上述兩種,都需要修改源碼,不是最佳選擇!
最佳選擇:通過攔截器來實現對數據的加工!而flume中沒有現成的內置攔截器可以實現字段加密,我們可以自定義自己的攔截器;
??????????????12.2.3?自定義攔截器的開發
????????????????????????????12.2.3.1基本套路
框架中,自定義擴展接口的套路:
1. 要實現或者繼承框架中提供的接口或父類,實現、重寫其中的方法
2. 寫好的代碼要打成jar包,并放入flume的lib目錄
3. 要將自定義的類,寫入相關agent配置文件
??????????????????????????????????????????12.2.3.2攔截器設計
應對本場景使用自定義攔截器,還要考慮幾個參數的問題:
用戶要加密的字段,可能會變化,代碼的可配置性需要匹配
1. 可以在配置文件中設計一個參數,來指定要加密的字段:??
indices (要加密的字段索引)
以及索引的切割符idxSplitBy
--》 比如: ?a1.sources.interceptors.i1.indices = 0:3
a1.sources.interceptors.i1.idxSplitBy = :
2.為了能夠正確切分數據中的字段,還需要一個參數:字段的分隔符dataSplitBy ?
--》 比如: a1.sources.interceptors.i1.dataSplitBy= ,
??????????????????????????????????????????12.2.3.3flume中的攔截器接口規范
首先,引入flume的開發依賴
<dependency>
????<groupId>org.apache.flume</groupId>
????<artifactId>flume-ng-core</artifactId>
????<version>1.9.0</version>
????<scope>provided</scope>
</dependency>
自定義攔截器的工作機制,其實很簡單:
flume先調自定義攔截器中的一個內部Builder類的config()方法進行參數配置
flume再調Builder類的build()方法獲取自定義攔截器的實例對象(可以在構造過程中傳遞參數)
flume再反復調用攔截器對象的intercept(List<Event> events)方法來修改event
????????????????????????????????????????????????????????12.2.3.4?代碼實現
|
package cn.doitedu.flume.custom; import org.apache.commons.codec.binary.Base64; import java.util.ArrayList; public class EncryptInterceptor ?implements Interceptor { ????/** ????// 這個方法會被框架調用一次,用來做一些初始化工作 ????} ????// 攔截方法--對一個event進行處理 ????????byte[] body = event.getBody(); ????????// 數據的字段數組 ????????// 需要加密的索引的數組 ????????for (String s : idxArr) { ????????????int index = Integer.parseInt(s); ????????// 將加密過的字段重新拼接成一條數據,并使用原來的分隔符 ????????sb.deleteCharAt(sb.lastIndexOf(dataSplitBy)); ????????// 返回加密后的字段所封裝的event對象 ????// 攔截方法--對一批event進行處理 ????????ArrayList<Event> lst = new ArrayList<Event>(); ????????for (Event event : events) { ????????return lst; ????// agent退出前,會調一次該方法,進行需要的清理、關閉操作 ????} ????/** ????????// 構造一個攔截器實例 ????????????return new EncryptInterceptor(indices,idxSplitBy,dataSplitBy); ????????// 獲取配置文件中的攔截器參數 ????????} ????public static class Constants { |
??????????????????????????????????????????????????????????????????????12.2.3.5?運行測試
1. 先將代碼打成jar包
2. 上傳到flume安裝節點上,并放入flume的lib目錄
3. 寫采集方案配置文件
|
a1.sources = s1 a1.sources.s1.channels = c1 a1.sources.s1.type = exec a1.sources.s1.command = tail -F /root/weblog/access.log a1.sources.s1.batchSize = 100 a1.sources.s1.interceptors = i1 a1.sources.s1.interceptors.i1.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder a1.sources.s1.interceptors.i1.indices = 0:4 a1.sources.s1.interceptors.i1.idxSplitBy = : a1.sources.s1.interceptors.i1.dataSplitBy = , a1.channels = c1 a1.channels.c1.type = memory a1.channels.c1.capacity = 200 a1.channels.c1.transactionCapacity = 100 a1.sinks = k1 a1.sinks.k1.type = logger a1.sinks.k1.channel = c1 # 寫到kafka就用這一段sink配置 # a1.sinks = k1 # a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink # a1.sinks.k1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092 # a1.sinks.k1.channel = c1 |
4. 準備數據日志文件,與你的處理邏輯相符,如下所示
13888776677,click,/baoming,張飛,湖北省
13889976655,click,/xuexi,關羽,山西省
…….
5.運行
bin/flume-ng agent -n a1 -c conf/ -f agentconf/spooldir-myi-m-log.conf -Dflume.root.logger=INFO,console
13?綜合案例
???????13.1?案例場景
A、B等日志服務機器實時生產日志,日志分為多種類型:
log1/access.log
log2/nginx.log
log3/web.log
現在要求:
?把日志服務器中的各類日志采集匯總到一個中轉agent上,然后分類寫入hdfs中。
但是在hdfs中要求的目錄為:
/source/logs/access/20160101/**
/source/logs/nginx/20160101/**
/source/logs/web/20160101/**
并要求可以按指定的索引,將對應字段內容加密!
???????13.2?實現思路
1. 每臺日志服務器上部署一個flume?agent?-?-?-> level1,每個agent配置3個source對應3類數據
2. leve1_1級的agent在采集數據時,添加一個header,指定數據的類別
3. level_1級的agent要配置兩個avro sink,各自對接一個下級的agent
4. level_1還要配置sink?processoràfail?over??sink?processor,控制兩個sink中只有一個avro sink在工作,如果失敗再切換到另一個avro sink
5.level_1還要配置字段加密攔截器
6. level_2 級配置兩個flume?agent,使用avro?source接收數據
7. level_2 級的hdfs?sink,目錄配置使用動態通配符,取到event中的類別header,以便于將不同類別數據寫入不同hdfs?目錄!
??????????????13.4?配置文件
level_1級配置文件
|
a1.sources = r1 r2 r3 ## source 配置 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /root/chekp1/taildir_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /root/weblog/log1/access.log a1.sources.r1.maxBatchCount = 1000 a1.sources.r1.interceptors = i1 i2 i3 a1.sources.r1.interceptors.i1.type = timestamp a1.sources.r1.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder a1.sources.r1.interceptors.i2.indices = 0:4 a1.sources.r1.interceptors.i2.idxSplitBy = : a1.sources.r1.interceptors.i2.dataSplitBy = , a1.sources.r1.interceptors.i3.type = static a1.sources.r1.interceptors.i3.key = logtype a1.sources.r1.interceptors.i3.value = access a1.sources.r2.type = TAILDIR a1.sources.r2.channels = c1 a1.sources.r2.positionFile = /root/chekp2/taildir_position.json a1.sources.r2.filegroups = f1 a1.sources.r2.filegroups.f1 = /root/weblog/log2/nginx.log a1.sources.r2.maxBatchCount = 1000 a1.sources.r2.interceptors = i1 i2 i3 a1.sources.r2.interceptors.i1.type = timestamp a1.sources.r2.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder a1.sources.r2.interceptors.i2.indices = 0:4 a1.sources.r2.interceptors.i2.idxSplitBy = : a1.sources.r2.interceptors.i2.dataSplitBy = , a1.sources.r2.interceptors.i3.type = static a1.sources.r2.interceptors.i3.key = logtype a1.sources.r2.interceptors.i3.value = nginx a1.sources.r3.type = TAILDIR a1.sources.r3.channels = c1 a1.sources.r3.positionFile = /root/chekp3/taildir_position.json a1.sources.r3.filegroups = f1 a1.sources.r3.filegroups.f1 = /root/weblog/log3/weblog.log a1.sources.r3.maxBatchCount = 1000 a1.sources.r3.interceptors = i1 i2 i3 a1.sources.r3.interceptors.i1.type = timestamp a1.sources.r3.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder a1.sources.r3.interceptors.i2.indices = 0:4 a1.sources.r3.interceptors.i2.idxSplitBy = : a1.sources.r3.interceptors.i2.dataSplitBy = , a1.sources.r3.interceptors.i3.type = static a1.sources.r3.interceptors.i3.key = logtype a1.sources.r3.interceptors.i3.value = weblog ## channel 配置 a1.channels = c1 a1.channels.c1.type = file a1.channels.c1.capacity = 1000000 a1.channels.c1.transactionCapacity = 100 a1.channels.c1.checkpointDir = /root/channel_chkp a1.channels.c1.dataDirs = /root/channel_data ## sink 配置 a1.sinks = k1 k2 a1.sinks.k1.type = avro a1.sinks.k1.channel = c1 a1.sinks.k1.hostname = c704 a1.sinks.k1.port = 4545 a1.sinks.k2.type = avro a1.sinks.k2.channel = c1 a1.sinks.k2.hostname = c705 a1.sinks.k2.port = 4545 ## sink processor - fail over 失敗配置 a1.sinkgroups = g1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinkgroups.g1.processor.type = failover ## 對兩個sink分配不同的優先級 a1.sinkgroups.g1.processor.priority.k1 = 200 a1.sinkgroups.g1.processor.priority.k2 = 100 ## 主sink失敗后,停用懲罰時間 a1.sinkgroups.g1.processor.maxpenalty = 5000 |
level_2配置
|
a1.sources = s1 a1.channels = c1 a1.sinks = k1 ## source 配置 a1.sources.s1.type = avro a1.sources.s1.bind= 0.0.0.0 a1.sources.s1.port = 4545 a1.sources.s1.channels = c1 a1.sources.s1.interceptors = i1 a1.sources.s1.interceptors.i1.type = timestamp ## channel 配置 a1.channels.c1.type = file a1.channels.c1.capacity = 1000000 a1.channels.c1.transactionCapacity = 100 a1.channels.c1.checkpointDir = /root/lev2_channel_chkp a1.channels.c1.dataDirs = /root/lev2_channel_data ## sink配置 a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = hdfs://c701:8020/doitedu/%{logtype}/%Y-%m-%d/%H/ a1.sinks.k1.hdfs.filePrefix = doitedu- a1.sinks.k1.hdfs.fileSuffix = .log.gz a1.sinks.k1.hdfs.rollInterval = 0 a1.sinks.k1.hdfs.rollSize = 134217728 a1.sinks.k1.hdfs.rollCount = 0 ## 配置壓縮 a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.codeC = gzip ## 數據格式 a1.sinks.k1.hdfs.serializer = TEXT |
???????13.5?啟動測試
1. 先把自定義攔截器代碼jar包放入level_1級(C701/C702/C703)的所有flume的lib目錄中;
2. 將各臺機器上之前的一些checkpoint、緩存等目錄清除;
2. 啟動level_2級的兩個agent(C704、C705上);
3. 在level_1的所有機器上,創建日志數據目錄,并寫腳本模擬往3類日志中寫入日志:
4.在level_1的所有機器上啟動level_1級的flume?agent
5.到hdfs上觀察結果
6.嘗試kill掉2級的c704 的 agent,看是否能夠故障切換
14?面試加強
???????14.1?flume事務機制
二、Delivery 保證
認識 Flume 對事件投遞的可靠性保證是非常重要的,它往往是我們是否使用 Flume 來解決問題的決定因素之一。
消息投遞的可靠保證有三種:
- At-least-once
- At-most-once
- Exactly-once
基本上所有工具的使用用戶都希望工具框架能保證消息 Exactly-once ,這樣就不必在設計實現上考慮消息的丟失或者重復的處理場景。但是事實上很少有工具和框架能做到這一點,真正能做到這一點所付出的成本往往很大,或者帶來的額外影響反而讓你覺得不值得。假設 Flume 真的做到了 Exactly-once ,那勢必降低了穩定性和吞吐量,所以?Flume 選擇的策略是 At-least-once 。
當然這里的 At-least-once 需要加上引號,并不是說用上 Flume 的隨便哪個組件組成一個實例,運行過程中就能保存消息不會丟失。事實上 At-least-once 原則只是說的是 Source 、 Channel 和 Sink 三者之間上下投遞消息的保證。而當你選擇 MemoryChannel 時,實例如果異常掛了再重啟,在 channel 中的未被 sink 所消費的殘留數據也就丟失了,從而沒辦法保證整條鏈路的 At-least-once。
Flume 的 At-least-once 保證的實現基礎是建立了自身的 Transaction 機制。Flume 的 Transaction 有4個生命周期函數,分別是 start、 commit、rollback 和 close。
當 Source 往 Channel 批量投遞事件時首先調用 start 開啟事務,批量
put 完事件后通過 commit 來提交事務,如果 commit 異常則 rollback ,然后 close 事務,最后 Source 將剛才提交的一批消息事件向源服務 ack(比如 kafka 提交新的 offset )。Sink 消費 Channel 也是相同的模式,唯一的區別就是 Sink 需要在向目標源完成寫入之后才對事務進行 commit。兩個組件的相同做法都是只有向下游成功投遞了消息才會向上游 ack,從而保證了數據能 At-least-once 向下投遞。
???????14.2flume agent內部機制
組件:
1、ChannelSelector
ChannelSelector 的作用就是選出 Event 將要被發往哪個 Channel。其共有兩種類型,分別是 Replicating(復制)和 Multiplexing(多路復用)。 ReplicatingSelector 會將同一個 Event 發往所有的 Channel,Multiplexing 會根據相應的原則,將不同的 Event 發往不同的 Channel。
2、SinkProcessor
(1) SinkProcessor 共 有 三 種 類 型 , 分 別 是 DefaultSinkProcessor 、
LoadBalancingSinkProcessor 和 FailoverSinkProcessor。
(2) DefaultSinkProcessor 對應的是單個的 Sink,
LoadBalancingSinkProcessor 和 FailoverSinkProcessor 對應的是 Sink Group。
(3) LoadBalancingSinkProcessor 可以實現負載均衡的功能,FailoverSinkProcessor 可以實現故障轉移的功能。
???????14.3?ganglia及flume監控
開啟內置監控功能
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
將監控數據發往ganglia進行展現
-Dflume.monitoring.type=ganglia -Dflume.monitoring.port=34890
???????14.4?Flume調優
flume-ng agent包括source、channel、sink三個部分,這三部分都運行在JVM上,而JVM運行在linux操作系統之上。因此,對于flume的性能調優,就是對這三部分及影響因素調優。
1、source的配置
該項目中采用的是 taildir source,他的讀取速度能夠跟上命令行寫入日志的速度,故并未做特殊的處理。
2、channel的配置
可選的channel配置一般有兩種,一是memory channel,二是file channel。
建議在內存足夠的情況下,優先選擇memory channel。
嘗試過相同配置下使用file channel和memory channel,file channel明顯速度較慢,并且會生成log的文件,應該是用作緩存,當source已經接收但是還未寫入sink時的event都會存在這個文件中。這樣的好處是保證數據不會丟失,所以當對數據的丟失情況非常敏感且對實時性沒有太大要求的時候,還是使用file memory吧。。
一開始的memory channel配置用的是默認的,然后控制臺報出了如下警告:
The channel is full or unexpected failure. The source will try again after 1000 ms
這個是因為當前被采集的文件過大,可以通過增大keep-alive的值解決。深層的原因是文件采集的速度和sink的速度沒有匹配好。
所以memory channel有三個比較重要的參數需要配置:
#channel中最多緩存多少
a1.channels.c1.capacity = 5000
#channel一次最多吐給sink多少
a1.channels.c1.transactionCapacity = 2000
#event的活躍時間
a1.channels.c1.keep-alive = 10
3、sink的配置
可以通過壓縮來節省空間和網絡流量,但是會增加cpu的消耗。
batch:size越大性能越好,但是太大會影響時效性,一般batch size和源數據端的大小相同。
4、java內存的配置
export JAVA_OPTS="-Xms512m -Xmx2048m -Dcom.sun.management.jmxremote"
主要涉及Xms和Xmx兩個參數,可以根據實際的服務器的內存大小進行設計。
5、OS內核參數的配置
如果單臺服務器啟動的flume agent過多的話,默認的內核參數設置偏小,需要調整。(待補充,暫時還未涉及)。
總結
以上是生活随笔為你收集整理的Flume知识点全面总结教程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: titan
- 下一篇: 统一协同工作平台用户管理、单点登录以及任