flume-source
1.1 Avro Source
監(jiān)聽Avro端口,從Avro client streams接收events。要求屬性是粗體字。利用Avro Source可以實現(xiàn)多級流動、扇出流、扇入流等效果。另外也可以接受通過flume提供的Avro客戶端發(fā)送的日志信息。
?
!channels??–??
!type??–???類型名稱,"AVRO"
!bind??–???需要監(jiān)聽的主機名或IP
!port??–???要監(jiān)聽的端口
threads????–???工作線程最大線程數(shù)
selector.type?????
selector.*?????
interceptors??–???空格分隔的攔截器列表
interceptors.*????????
compression-type??none???壓縮類型,可以是“none”或“default”,這個值必須和AvroSource的壓縮格式匹配
sslfalse??是否啟用ssl加密,如果啟用還需要配置一個“keystore”和一個“keystore-password”。
keystore???–???為SSL提供的java密鑰文件所在路徑。
keystore-password–???為SSL提供的java密鑰文件?密碼。
keystore-typeJKS密鑰庫類型可以是“JKS”或“PKCS12”。
exclude-protocolsSSLv3??空格分隔開的列表,用來指定在SSL / TLS協(xié)議中排除。SSLv3將總是被排除除了所指定的協(xié)議。
ipFilter???false??如果需要為netty開啟ip過濾,將此項設(shè)置為true
ipFilterRules–???定義netty的ip過濾設(shè)置表達式規(guī)則
agent a1例子:
ipFilterRules例子:
ipFilterRules=allow:ip:127.*,? allow:name:localhost,deny:ip:*?
編寫配置文件? 修改上面給出的配置文件,除了Source部分配置不同,其余部分都一樣。不同的地方如下:
#描述/配置Sourcea1.sources.r1.type = avroa1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 44444?
?啟動flume:
????./flume-ng agent --conf ../conf --conf-file ../conf/template2.conf --name a1 -Dflume.root.logger=INFO,console
?通過flume提供的avro客戶端向指定機器指定端口發(fā)送日志信息:
????./flume-ng avro-client --conf ../conf --host 0.0.0.0 --port 44444 --filename ../mydata/log1.txt
會發(fā)現(xiàn)確實收集到日志。
1.2 Thrift Source
監(jiān)聽Thrift端口和從外部Thrift client streams接收events。要求屬性為粗體字:
agent a1 例子:
1.3 Exec Source
?Exec Source在啟動時運行一個Unix命令行,并期望這過程在標準輸出上連續(xù)生產(chǎn)數(shù)據(jù)。要求屬性為粗體字:
agent a1例子:
'shell'配置被用來通過一個命令shell調(diào)用‘command’。
1.4 JMS Source
JMS Source從JMS目標(如隊列或者主題)讀取消息。JMS應(yīng)用程序應(yīng)該可以與任何JMS提供程序一起工作,但是只能使用ActiveMQ進行測試。要求屬性是粗體字。
?agent a1例子:
1.5 Spooling Directory Source
該source讓你通過放置被提取文件在磁盤”spooling“目錄下這一方式,提取數(shù)據(jù)。該source將會監(jiān)控指定目錄的新增文件,當新文件出現(xiàn)時解析event。event解析邏輯是可插入的。當一個給定文件被全部讀取進channel之后,它被重命名,以標識為已完成(或者可選擇deleted)。
要注意的是,放置到自動搜集目錄下的文件不能修改,如果修改,則flume會報錯。另外,也不能產(chǎn)生重名的文件,如果有重名的文件被放置進來,則flume會報錯。
!channels??–??
!type??–???類型,需要指定為"spooldir"
!spoolDir??–???讀取文件的路徑,即"搜集目錄"
fileSuffix.COMPLETED對處理完成的文件追加的后綴
agent-1例子:
案例:
編寫配置文件? 修改上面給出的配置文件,除了Source部分配置不同,其余部分都一樣。不同的地方如下:
#描述/配置Source a1.sources.r1.type = spooldir a1.sources.r1.spoolDir=/home/park/work/apache-flume-1.6.0-bin/mydata?
?啟動flume:
????./flume-ng agent --conf ../conf --conf-file ../conf/template4.conf --name a1 -Dflume.root.logger=INFO,console
?向指定目錄中傳輸文件,發(fā)現(xiàn)flume收集到了該文件,將文件中的每一行都作為日志來處理
1.6 Taildir Source
注意:該source不能用于windows。
agent a1例子:
1.7 Twitter 1% firehose Source(試驗)
?略
1.8 Kafka Source
?Kafka Source是Apache Kafka消費者,從Kfaka topics讀取消息。如果你有多個Kafka source在跑,你可以配置它們在相同的Consumer Group,以使它們每個讀取topics獨特的分區(qū)。
以逗號分隔的topic列表進行topic訂閱的例子:
通過正則表達式進行topic訂閱的例子:
安全和Kafka Source
Kafka 0.9.0支持SASL/GSSAPI 或者 SSL 協(xié)議。
設(shè)置?kafka.consumer.security.protocol的值:
①SASL_PLAINTEXT - Kerberos or plaintext authentication with no data encryption
②SASL_SSL -?Kerberos or plaintext authentication with data encryption
③SSL - TLS based encryption with optional authentication.
TLS和Kafka Source
帶有服務(wù)端認證和數(shù)據(jù)加密配置的例子:
注意:屬性ssl.endpoint.identification.algorithm沒有定義,因此沒有hostname驗證,為了是hostname驗證,可以設(shè)置屬性:
如果要求有客戶端認證,在Flume agent配置中添加下述配置。每個Flume agent必須有它的客戶端憑證,以便被Kafka brokers信任。
如果keystore和key使用不用的密碼保護,那么ssl.key.password屬性需要提供出來:
Kerberos和Kafka Soure
kerberos配置文件可以在flume-env.sh通過JAVA_OPTS指定:
使用SASL_PLAINTEST的安全配置示例:
使用SASL_SSL的安全配置示例:
JAAS文件實例(暫時沒看懂):
1.9 NetCat TCP Source
?netcat source監(jiān)聽一個給定的端口,然后把接收到的數(shù)據(jù)每一行轉(zhuǎn)換成一個event。要求屬性是粗體字。
!channels–??
!type–???類型名稱,需要被設(shè)置為"netcat"
!bind–???指定要綁定到的ip或主機名。
!port–???指定要綁定到的端口號
max-line-length???512單行最大字節(jié)數(shù)
agent a1示例:
?
1.10 NetCat UDP Source
??netcat source監(jiān)聽一個給定的端口,然后把text文件的每一行轉(zhuǎn)換成一個event。要求屬性是粗體字。
agent a1的示例:
1.11 Sequence Generator Source
一個簡單的序列生成器可以不斷生成events,帶有counter計數(shù)器,從0開始,以1遞增,在totalEvents停止。當不能發(fā)送events到channels時會不斷嘗試。
agent a1示例:
1.12 Syslog Sources
讀取系統(tǒng)日志,并生成Flume events。UDP source以整條消息作為一個簡單event。TCP source以新一行”n“分割的字符串作為一個新的event。
1.12.1 Syslog TCP Source?
原始的,可靠的Syslog TCP source。
agent a1的syslog TCP source示例:
1.12.2 Multiport Syslog TCP Source
這是一個新的,更快的,多端口的Syslog TCP source版本。注意ports配置替代port。
agent a1的multiport syslog TCP source示例:
1.12.3 Syslog UDP Source?
agent a1的syslog UDP source示例:
1.13 HTTP Source
HTTP Source接受HTTP的GET和POST請求作為Flume的事件,其中GET方式應(yīng)該只用于試驗。
該Source需要提供一個可插拔的"處理器"來將請求轉(zhuǎn)換為事件對象,這個處理器必須實現(xiàn)HTTPSourceHandler接口,該處理器接受一個HttpServletRequest對象,并返回一個Flume Envent對象集合。
從一個HTTP請求中得到的事件將在一個事務(wù)中提交到通道中。因此允許像文件通道那樣對通道提高效率。
如果處理器拋出一個異常,Source將會返回一個400的HTTP狀態(tài)碼。
如果通道已滿,無法再將Event加入Channel,則Source返回503的HTTP狀態(tài)碼,表示暫時不可用。
!type????類型,必須為"HTTP"
!port–???監(jiān)聽的端口
bind???0.0.0.0????監(jiān)聽的主機名或ip
handler??????org.apache.flume.source.http.JSONHandler處理器類,需要實現(xiàn)HTTPSourceHandler接口
handler.*??–???處理器的配置參數(shù)
selector.type
selector.*???
interceptors??–??
interceptors.*????????
enableSSL??false??是否開啟SSL,如果需要設(shè)置為true。注意,HTTP不支持SSLv3。
excludeProtocols??SSLv3??空格分隔的要排除的SSL/TLS協(xié)議。SSLv3總是被排除的。
keystore??????密鑰庫文件所在位置。
keystorePassword Keystore?密鑰庫密碼
agent a1的http source示例:
Handler屬性有兩種,一是JSONHandler,一是BlobHandler。
BlobHandler用于處理請求參數(shù)帶有比較大的對象(Binary Large Object),如PDF或者JPG。
案例2:
編寫配置文件? 修改上面給出的配置文件,除了Source部分配置不同,其余部分都一樣。不同的地方如下:
| 1 2 3 | #描述/配置Source ????a1.sources.r1.type??= http ????a1.sources.r1.port? = 66666 |
?啟動flume:
????./flume-ng agent --conf ../conf --conf-file ../conf/template6.conf --name a1 -Dflume.root.logger=INFO,console
?通過命令發(fā)送HTTP請求到指定端口:
????curl -X POST -d '[{ "headers" :{"a" : "a1","b" : "b1"},"body" : "hello~http~flume~"}]' http://0.0.0.0:6666
1.14 Stress Source
StressSource 是內(nèi)部負載生成source的實現(xiàn),這對于壓力測試是非常有用的。它允許用戶配置Event有效載荷的大小。
agent a1的示例:
1.15 Legacy Sources
?legacy sources允許Flume 1.x agent接收來自Flume 0.9.4 agents的events。
?legacy source 支持Avro和Thrift RPC 連接。為了使用兩個Flume 版本搭建的橋梁,你需要開始一個帶有avroLegacy或者thriftLegacy source的Flume 1.x agent。0.9.4agent應(yīng)該有agent Sink指向1.x agent的host/port。
1.15.1 Avro Legacy Source
agent a1的示例:
1.15.2 Thrift Legacy Source
agent a1的示例:
?
1.16 Custom Source(自定義Source)
自定義Source是你實現(xiàn)Source接口。當啟動Flume agent時,一個自定義source類和它依賴項必須在agent的classpath中。
agent a1的示例:
?
1.17 Scrible Source
Scribe是另一種類型的提取系統(tǒng)。采用現(xiàn)有的Scribe提取系統(tǒng),Flume應(yīng)該使用基于Thrift的兼容傳輸協(xié)議的ScribeSource。
agent a1示例:
轉(zhuǎn)載于:https://www.cnblogs.com/duanxz/p/9157465.html
總結(jié)
以上是生活随笔為你收集整理的flume-source的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数控g36是什么指令?
- 下一篇: 这个人属于不文明行为吗