本地日志数据实时接入到hadoop集群的数据接入方案
1.?概述
本手冊主要介紹了,一個將傳統數據接入到Hadoop集群的數據接入方案和實施方法。供數據接入和集群運維人員參考。
1.1.??整體方案
Flume作為日志收集工具,監控一個文件目錄或者一個文件,當有新數據加入時,收集新數據發送給Kafka。Kafka用來做數據緩存和消息訂閱。Kafka里面的消息可以定時落地到HDFS上,也可以用Spark?Streaming來做實時處理,然后將處理后的數據落地到HDFS上。
1.2.?數據接入流程
本數據接入方案,分為以下幾個步驟:
l?安裝部署Flume:在每個數據采集節點上安裝數據采集工具Flume。詳見“2、安裝部署Flume”。
l?數據預處理:生成特定格式的數據,供Flume采集。詳見“3、數據預處理”。
l?Flume采集數據到Kafka:?Flume采集數據并發送到Kafka消息隊列。詳見“4、Flume采集數據到Kafka”。
l?Kafka數據落地:將Kafka數據落地到HDFS。詳見“5、Kafka數據落地”。
?
2.?安裝部署Flume
若要采集數據節點的本地數據,每個節點都需要安裝一個Flume工具,用來做數據采集。
2.2.1下載并安裝
到官網去下載最新版本的Flume
下載地址為:http://flume.apache.org/,目前最新版本為1.6.0,需要1.7及以上版本的JDK。
1、解壓
tar?-xzvf?apache-flume-1.6.0-bin.tar.gz??-C?/usr/local/
2、安裝JDK1.7
???如果節點上JDK版本低于1.7,需要安裝1.7或以上版本的JDK
JDK?1.7?下載地址:
http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html
在Flume目錄下創建一個java目錄,存放JDK
cd?/usr/local/apache-flume-1.6.0-bin
mkdir?java
cd?java
tar?-xzvf?jdk-7u79-linux-x64.tar.gz
?
2.2.2配置Flume系統參數
修改?flume-env.sh?配置文件,主要是JAVA_HOME變量設置
cd?/usr/local/apache-flume-1.6.0-bin/conf
cpflume-env.sh.templateflume-env.sh
在flume-env.sh里面設置FLUME_CLASSPATH變量和JAVA_HOME變量,
示例:
export?JAVA_HOME=/usr/local/apache-flume-1.6.0-bin/java/jdk1.7.0_79
FLUME_CLASSPATH="/usr/local/apache-flume-1.6.0-bin/"
變量具體內容根據實際修改
?
2.2.3添加Flume第三方依賴
添加第三方依賴包flume-plugins-1.0-SNAPSHOT.jar,此包實現了一個Flume攔截器,將Flume采集到的數據進行序列化、結構化等預處理,最后每條數據生成一條Event數據返回。
?
cd?/usr/local/apache-flume-1.6.0-bin
mkdir?plugins.d????--創建依賴目錄,目錄名必須為plugins.d
cd?plugins.d?
mkdir?flume-plugins??????????--項目目錄,目錄名隨意
cd?flume-plugins
mkdir?lib???????????--jar包目錄,目錄名必須為lib
將第三方jar包flume-plugins-1.0-SNAPSHOT.jar放在lib目錄下
2.2.4添加Hive配置文件
將hive-site.xml文件拷貝到/usr/local/apache-flume-1.6.0-bin/conf目錄下,并修改hive元數據地址與真實地址對應。如下所示:
?<property>
? <name>hive.metastore.uris</name>
? <value>thrift://m103:9083,thrift://m105:9083</value>
?</property>
?
2.2.5創建Flume?agent配置文件
創建flume啟動配置文件,指定source,channel,sink?3個組件內容。每個組件都有好幾種配置選項,具體配置請查看Flume官網。創建配置文件flume.conf,示例如下:
?
vim?flume.conf
a1.sources?=?x1
a1.sinks?=?y1
a1.channels?=?z1
#?Describe/configure?the?source
a1.sources.x1.type?=?exec
a1.sources.x1.channels?=?z1
a1.sources.x1.command?=?tail?-F?/home/xdf/exec.txt
#?Describe?the?sink
a1.sinks.y1.type?=?logger
#?Use?a?channel?which?buffers?events?in?memory
a1.channels.z1.type?=?memory
a1.channels.z1.capacity?=?1000
a1.channels.z1.transactionCapacity?=?100
#?Bind?the?source?and?sink?to?the?channel
a1.sources.x1.channels?=?z1
a1.sinks.y1.channel?=?z1
?
2.2.6啟動Flume?Agent
生產環境下,參數-Dflume.root.logger=INFO,console去掉console,此處只為方便查看測試結果,選擇將日志打印到控制臺。若Flume?agent正常啟動,說明Flume安裝成功。
?
?
cd?/usr/local/apache-flume-1.6.0-bin
./bin/flume-ng?agent?--conf?conf?--conf-file?flume.conf?--name?a3?-Dflume.root.logger=INFO,console
?
2.2.7?測試
上面配置的example.conf文件,實現的功能是監控文件/home/xdf/exec.txt,如果有新數據寫入時,Flume就會采集到新數據并打印在控制臺上。
測試用例:向/home/xdf/exec.txt文件中寫入內容“hello?flume”,查看控制臺是否打印出“hello?flume”。正常輸出如下:
?
echo?'hello?flume'?>>?/home/xdf/exec.txt
2015-06-30?16:01:52,910?(SinkRunner-PollingRunner-DefaultSinkProcessor)?[INFO?-?org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)]?Event:?{?headers:{}?body:?68?65?6C?6C?6F?20?66?6C?75?6D?65?hello?flume?}
?
至此,Flume安裝部署完畢。
3.?數據預處理
1、Flume采集數據都是按行分割的,一行代表一條記錄。如果原始數據不符合要求,需要對數據進行預處理。示例如下:
原始數據格式為:
out:?===?START?OF?INFORMATION?SECTION?===
out:?Vendor:???????????????TOSHIBA
out:?Product:??????????????MBF2300RC
out:?Revision:?????????????0109
out:?User?Capacity:????????300,000,000,000?bytes?[300?GB]
out:?Logical?block?size:???512?bytes
???經過預處理,我們將數據變為一條5個字段的記錄:
TOSHIBA;MBF2300RC;0109;300;512
?
2、如果要將上面數據接入到hive中,我們還需要下面幾個處理:
a.?創建一張hive表
create?table?test(Vendor?string,Product?string,Revision?string,User_Capacity?string,block?string);
b.?在Kafka節點上創建一個topic,名字與上面hive表名對應,格式為“hive-數據庫名-表名”。示例如下:
bin/kafka-topics?--create?--zookeeper?localhost:2181/kafka?????--topic?hive-xdf-test??--partitions?1?--replication-factor?1
c.?將第一步得到的記錄數據與topic整合成一條記錄,用“@@”分割。示例如下:
hive-xdf-test?@@TOSHIBA;MBF2300RC;0109;300;512
d.?Flume采集整合后的一條數據,通過topic獲取hive表的元數據,根據元數據對記錄數據進行結構化、序列化處理,然后經過Kafka存入到hive表中。具體操作參考下面具體步驟所示。
4.?Flume采集數據到Kafka
Flume如果要將采集到的數據發送到Kafka,需要指定配置文件(如下:flume_test.conf)的sink類型為KafkaSink,并且指定Kafka?的broker?list。配置文件示例如下,紅色標注的為KafkaSink配置項:
vim?flume_test.conf
a3.channels?=?c3
a3.sources?=?r3
a3.sinks?=?k3
?
a3.sources.r3.type?=?exec
a3.sources.r3.channels?=?c3
a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt
a3.sources.r3.fileHeader?=?false
a3.sources.r3.basenameHeader?=?false
a3.sources.r3.interceptors?=?i3
a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder
a3.sources.r3.interceptors.i3.separator?=?;
a3.sources.r3.decodeErrorPolicy=IGNORE
?
a3.channels.c3.type?=?memory
a3.channels.c3.capacity?=?10000
a3.channels.c3.transactionCapacity?=?1000
?
a3.sinks.k3.channel?=?c3
#?a3.sinks.k3.type?=?logger
#a3.sinks.k3.batchSize?=?10
a3.sinks.k3.type?=?org.apache.flume.sink.kafka.KafkaSink
a3.sinks.k3.brokerList?=?localhost:9092
?
?
注意:此處有一個攔截器插件的定義,它就是用來做結構化、序列化數據預處理的。此插件由上面配置的Flume第三方jar包中獲得。
a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder
?
5.?Kafka數據落地
我們提供了一個Camus工具,來定時將Kafka中的數據落地到hive表中。
Camus工具包含以下三個文件:
| 文件 | 說明 | 
| camus-example-0.1.0-cdh-SNAPSHOT-shaded.jar | 程序運行jar包 | 
| camus.properties | 配置文件 | 
| camusrun.sh | 運行腳本 | 
?
配置文件需要根據實際情況,修改以下兩個參數
kafka.whitelist.topics=hive-xdf-test?????????----數據對應的topic
kafka.brokers=m105:9092,m103:9092????????????----kafka?broker?lists
需要指定多個topic時,用逗號間隔,示例:
Kafka.whitelist.topics=topic1,topic2,topic3
修改完配置文件后,定時運行camusrun.sh腳本,就會將新生成的數據接入到topic所對應的hive表中了。
6.?具體案例
6.1?Smart數據接入
6.1.2?創建hive表
最終我們要將smart數據接入到hive表中,所以我們首先要創建一個滿足smart數據結構的hive表。
create?table?smart_data(serial_number?String?,update_time?string,smart_health_status?string?,current_drive_temperature?int,drive_trip_temperature?int,elements_in_grown_defect_list?int,manufactured_time?string?,cycle_count?int????,start_stop_cycles?int????,load_unload_count?int????,load_unload_cycles?int????,blocks_sent_to_initiator?bigint?,blocks_received_from_initiator?bigint?,blocks_read_from_cache?bigint?,num_commands_size_not_larger_than_segment_size?bigint?,num_commands_size_larger_than_segment_size?bigint?,num_hours_powered_up?string??????,num_minutes_next_test?int????,read_corrected_ecc_fast?bigint?,read_corrected_ecc_delayed?bigint?,read_corrected_re?bigint?,read_total_errors_corrected?bigint?,read_correction_algo_invocations?bigint?,read_gigabytes_processed?bigint?,read_total_uncorrected_errors?string?,write_corrected_ecc_fast?bigint?,write_corrected_ecc_delayed?bigint?,write_corrected_re?bigint?,write_total_errors_corrected?bigint?,write_correction_algo_invocations?bigint?,write_gigabytes_processed?bigint?,write_total_uncorrected_errors?string?,verify_corrected_ecc_fast?bigint?,verify_corrected_ecc_delayed?bigint?,verify_corrected_re?bigint?,verify_total_errors_corrected?bigint?,verify_correction_algo_invocations?bigint?,verify_gigabytes_processed?bigint?,verify_total_uncorrected_errors?bigint?,non_medium_error_count?bigint);
6.1.2?創建topic
Flume采集到的數據要生成一條條的event數據傳給kafka消息系統保存,kafka需要事先創建一個topic來生產和消費指定數據。為系統正常運行,我們統一定義topic的名字結構為“hive-數據庫名-表名”。需要在kafka集群節點上創建topic,示例如下:
bin/kafka-topics?--create?--zookeeper?localhost:2181/kafka?????--topic?hive-xdf-smart_data??--partitions?1?
--replication-factor?1
注意:此處的數據庫名、表名,必須為上一步創建的hive表,因為Flume會通過此topic名來獲取hive表的元數據信息,從而生成對應event數據。
6.1.2?配置Flume?agent啟動參數
生成參數文件smart_test.conf如下:
vim?smart_test.conf
a3.channels?=?c3
a3.sources?=?r3
a3.sinks?=?k3
?
a3.sources.r3.type?=?exec
a3.sources.r3.channels?=?c3
a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt
a3.sources.r3.fileHeader?=?false
a3.sources.r3.basenameHeader?=?false
a3.sources.r3.interceptors?=?i3
a3.sources.r3.interceptors.i3.type?=iie.flume.interceptor.CSVInterceptor$Builder
a3.sources.r3.interceptors.i3.separator?=?;
a3.sources.r3.decodeErrorPolicy=IGNORE
?
a3.channels.c3.type?=?memory
a3.channels.c3.capacity?=?10000
a3.channels.c3.transactionCapacity?=?1000
?
a3.sinks.k3.channel?=?c3
#?a3.sinks.k3.type?=?logger
#a3.sinks.k3.batchSize?=?10
a3.sinks.k3.type?=?org.apache.flume.sink.kafka.KafkaSink
a3.sinks.k3.brokerList?=?localhost:9092
?
注意:
1、此處數據源sources的類型為exec。具體命令為:
a3.sources.r3.command?=?tail?-F?/home/xdf/exec.txt
我們定時在每個節點運行一個腳本生成一條smart數據,將數據寫入/home/xdf/exec.txt文件。
?
flume用上面那個命令一直監控文件/home/xdf/exec.txt,如有新數據寫入,則采集傳輸到kafka里。
?
2、指定了一個自定義的第三方插件,Flume過濾器CSVInterceptor,將CSV格式的數據轉化成結構化,序列化的Event格式。
?
3、Sink為KafkaSink,數據會寫到kafka里面,特別注意:這里需要指定對應的brokerList,示例如下:
a3.sinks.k3.brokerList?=?m103:9092,m105:9092
6.1.3?開啟Flume?Agent
執行命令:
cd?/usr/local/apache-flume-1.6.0-bin
./bin/flume-ng?agent?--conf?conf?--conf-file?smart_test.conf?--name?a3?-Dflume.root.logger=INFO
6.1.4?生成Smart數據
在每個數據節點上運行createEvent.py腳本,生成一條結構化好的smart數據。
腳本有兩個參數smart_data.log,hive-xdf-smart_data,前者為smart命令輸出的原始信息文件,后者是topic名字,即上一步生成的topic名。
python?createEvent.py?smart_data.log?hive-xdf-smart_data?>?
/home/xdf/exec.txt
?
此腳本會解析smart原始信息,生成一條帶topic字段的結構化smart數據寫入到/home/xdf/exec.txt文件中,數據格式如下:
hive-xdf-smart_data@@EB00PC208HFC;2015-06-23?18:56:09;OK;28;65;0;week?08?of?year?2012;50000;21;200000;69;-1;-1;-1;-1;-1;-1;-1;0;0;0;0;0;0;300744.962;0;0;0;0;0;0;10841.446;0;-1;-1;-1;-1;-1;-1;-1
用符號“@@”將topic跟smart數據分開,smart數據每列間用逗號隔開。
6.1.5?測試時查看Kafka數據
查看數據是否成功生成到kafka中,可在kafka節點上,通過下面命令查看:
kafka-console-consumer?--zookeeper?localhost:2181/kafka?--topic?hive-xdf-smart_data?--from-beginning
結果展示:
6.1.6?Kafka數據落地到hive表中
打開camus.properties配置文件,修改以下兩個參數
kafka.whitelist.topics=hive-xdf-smart_data?????----smart數據對應topic
kafka.brokers=m105:9092,m103:9092???????????????----kafka?broker?lists
修改完配置文件后,定時運行camusrun.sh腳本,就會將新生成的smart數據接入到topic所對應的hive表中了。
至此,數據接入流程完畢。
轉載于:https://www.cnblogs.com/xiaodf/p/5027167.html
總結
以上是生活随笔為你收集整理的本地日志数据实时接入到hadoop集群的数据接入方案的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: daterangepicker 日期范围
- 下一篇: Entity Framework 使用注
