利用Flume将MySQL表数据准实时抽取到HDFS
轉(zhuǎn)自:http://blog.csdn.net/wzy0623/article/details/73650053
?
一、為什么要用到Flume
? ? ? ? 在以前搭建HAWQ數(shù)據(jù)倉(cāng)庫(kù)實(shí)驗(yàn)環(huán)境時(shí),我使用Sqoop抽取從MySQL數(shù)據(jù)庫(kù)增量抽取數(shù)據(jù)到HDFS,然后用HAWQ的外部表進(jìn)行訪(fǎng)問(wèn)。這種方式只需要很少量的配置即可完成數(shù)據(jù)抽取任務(wù),但缺點(diǎn)同樣明顯,那就是實(shí)時(shí)性。Sqoop使用MapReduce讀寫(xiě)數(shù)據(jù),而MapReduce是為了批處理場(chǎng)景設(shè)計(jì)的,目標(biāo)是大吞吐量,并不太關(guān)心低延時(shí)問(wèn)題。就像實(shí)驗(yàn)中所做的,每天定時(shí)增量抽取數(shù)據(jù)一次。? ? ? ? Flume是一個(gè)海量日志采集、聚合和傳輸?shù)南到y(tǒng),支持在日志系統(tǒng)中定制各類(lèi)數(shù)據(jù)發(fā)送方,用于收集數(shù)據(jù)。同時(shí),Flume提供對(duì)數(shù)據(jù)進(jìn)行簡(jiǎn)單處理,并寫(xiě)到各種數(shù)據(jù)接受方的能力。Flume以流方式處理數(shù)據(jù),可作為代理持續(xù)運(yùn)行。當(dāng)新的數(shù)據(jù)可用時(shí),Flume能夠立即獲取數(shù)據(jù)并輸出至目標(biāo),這樣就可以在很大程度上解決實(shí)時(shí)性問(wèn)題。
? ? ? ? Flume是最初只是一個(gè)日志收集器,但隨著flume-ng-sql-source插件的出現(xiàn),使得Flume從關(guān)系數(shù)據(jù)庫(kù)采集數(shù)據(jù)成為可能。下面簡(jiǎn)單介紹Flume,并詳細(xì)說(shuō)明如何配置Flume將MySQL表數(shù)據(jù)準(zhǔn)實(shí)時(shí)抽取到HDFS。
二、Flume簡(jiǎn)介
1. Flume的概念
? ? ? ? Flume是分布式的日志收集系統(tǒng),它將各個(gè)服務(wù)器中的數(shù)據(jù)收集起來(lái)并送到指定的地方去,比如說(shuō)送到HDFS,簡(jiǎn)單來(lái)說(shuō)flume就是收集日志的,其架構(gòu)如圖1所示。圖1
2. Event的概念?
? ? ? ? 在這里有必要先介紹一下Flume中event的相關(guān)概念:Flume的核心是把數(shù)據(jù)從數(shù)據(jù)源(source)收集過(guò)來(lái),在將收集到的數(shù)據(jù)送到指定的目的地(sink)。為了保證輸送的過(guò)程一定成功,在送到目的地(sink)之前,會(huì)先緩存數(shù)據(jù)(channel),待數(shù)據(jù)真正到達(dá)目的地(sink)后,Flume再刪除自己緩存的數(shù)據(jù)。?? ? ? ?在整個(gè)數(shù)據(jù)的傳輸?shù)倪^(guò)程中,流動(dòng)的是event,即事務(wù)保證是在event級(jí)別進(jìn)行的。那么什么是event呢?Event將傳輸?shù)臄?shù)據(jù)進(jìn)行封裝,是Flume傳輸數(shù)據(jù)的基本單位,如果是文本文件,通常是一行記錄。Event也是事務(wù)的基本單位。Event從source,流向channel,再到sink,本身為一個(gè)字節(jié)數(shù)組,并可攜帶headers(頭信息)信息。Event代表著一個(gè)數(shù)據(jù)的最小完整單元,從外部數(shù)據(jù)源來(lái),向外部的目的地去。
3. Flume架構(gòu)介紹?
? ? ? ? Flume之所以這么神奇,是源于它自身的一個(gè)設(shè)計(jì),這個(gè)設(shè)計(jì)就是agent。Agent本身是一個(gè)Java進(jìn)程,運(yùn)行在日志收集節(jié)點(diǎn)——所謂日志收集節(jié)點(diǎn)就是服務(wù)器節(jié)點(diǎn)。 Agent里面包含3個(gè)核心的組件:source、channel和sink,類(lèi)似生產(chǎn)者、倉(cāng)庫(kù)、消費(fèi)者的架構(gòu)。?- Source:source組件是專(zhuān)門(mén)用來(lái)收集數(shù)據(jù)的,可以處理各種類(lèi)型、各種格式的日志數(shù)據(jù),包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定義。?
- Channel:source組件把數(shù)據(jù)收集來(lái)以后,臨時(shí)存放在channel中,即channel組件在agent中是專(zhuān)門(mén)用來(lái)存放臨時(shí)數(shù)據(jù)的——對(duì)采集到的數(shù)據(jù)進(jìn)行簡(jiǎn)單的緩存,可以存放在memory、jdbc、file等等。?
- Sink:sink組件是用于把數(shù)據(jù)發(fā)送到目的地的組件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、Hbase、solr、自定義。?
4. Flume的運(yùn)行機(jī)制?
? ? ? ? Flume的核心就是一個(gè)agent,這個(gè)agent對(duì)外有兩個(gè)進(jìn)行交互的地方,一個(gè)是接受數(shù)據(jù)輸入的source,一個(gè)是數(shù)據(jù)輸出的sink,sink負(fù)責(zé)將數(shù)據(jù)發(fā)送到外部指定的目的地。source接收到數(shù)據(jù)之后,將數(shù)據(jù)發(fā)送給channel,chanel作為一個(gè)數(shù)據(jù)緩沖區(qū)會(huì)臨時(shí)存放這些數(shù)據(jù),隨后sink會(huì)將channel中的數(shù)據(jù)發(fā)送到指定的地方,例如HDFS等。注意:只有在sink將channel中的數(shù)據(jù)成功發(fā)送出去之后,channel才會(huì)將臨時(shí)數(shù)據(jù)進(jìn)行刪除,這種機(jī)制保證了數(shù)據(jù)傳輸?shù)目煽啃耘c安全性。?三、安裝Hadoop和Flume
? ? ? ? 我的實(shí)驗(yàn)在HDP 2.5.0上進(jìn)行,HDP安裝中包含F(xiàn)lume,只要配置Flume服務(wù)即可。HDP的安裝步驟參見(jiàn)“HAWQ技術(shù)解析(二) —— 安裝部署”四、配置與測(cè)試
1. 建立MySQL數(shù)據(jù)庫(kù)表
? ? ? ? 建立測(cè)試表并添加數(shù)據(jù)。[sql]?view plaincopy
2. 建立相關(guān)目錄與文件
(1)創(chuàng)建本地狀態(tài)文件[plain]?view plaincopy
(2)建立HDFS目標(biāo)目錄
[plain]?view plaincopy
3. 準(zhǔn)備JAR包
? ? ? ? 從http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html下載flume-ng-sql-source-1.3.7.jar文件,并復(fù)制到Flume庫(kù)目錄。[plain]?view plaincopy
[plain]?view plaincopy
4. 建立HAWQ外部表
[sql]?view plaincopy5. 配置Flume
? ? ? ? 在A(yíng)mbari -> Flume -> Configs -> flume.conf中配置如下屬性:[plain]?view plaincopy
?
| 屬性 | 描述 |
| agent.channels.ch1.type | Agent的channel類(lèi)型 |
| agent.sources.sql-source.channels | Source對(duì)應(yīng)的channel名稱(chēng) |
| agent.channels | Channel名稱(chēng) |
| agent.sinks | Sink名稱(chēng) |
| agent.sources | Source名稱(chēng) |
| agent.sources.sql-source.type | Source類(lèi)型 |
| agent.sources.sql-source.connection.url | 數(shù)據(jù)庫(kù)URL |
| agent.sources.sql-source.user | 數(shù)據(jù)庫(kù)用戶(hù)名 |
| agent.sources.sql-source.password | 數(shù)據(jù)庫(kù)密碼 |
| agent.sources.sql-source.table | 數(shù)據(jù)庫(kù)表名 |
| agent.sources.sql-source.columns.to.select | 查詢(xún)的列 |
| agent.sources.sql-source.incremental.column.name | 增量列名 |
| agent.sources.sql-source.incremental.value | 增量初始值 |
| agent.sources.sql-source.run.query.delay | 發(fā)起查詢(xún)的時(shí)間間隔,單位是毫秒 |
| agent.sources.sql-source.status.file.path | 狀態(tài)文件路徑 |
| agent.sources.sql-source.status.file.name | 狀態(tài)文件名稱(chēng) |
| agent.sinks.HDFS.channel | Sink對(duì)應(yīng)的channel名稱(chēng) |
| agent.sinks.HDFS.type | Sink類(lèi)型 |
| agent.sinks.HDFS.hdfs.path | Sink路徑 |
| agent.sinks.HDFS.hdfs.fileType | 流數(shù)據(jù)的文件類(lèi)型 |
| agent.sinks.HDFS.hdfs.writeFormat | 數(shù)據(jù)寫(xiě)入格式 |
| agent.sinks.HDFS.hdfs.rollSize | 目標(biāo)文件輪轉(zhuǎn)大小,單位是字節(jié) |
| agent.sinks.HDFS.hdfs.rollInterval | hdfs sink間隔多長(zhǎng)將臨時(shí)文件滾動(dòng)成最終目標(biāo)文件,單位是秒;如果設(shè)置成0,則表示不根據(jù)時(shí)間來(lái)滾動(dòng)文件 |
| agent.sinks.HDFS.hdfs.rollCount | 當(dāng)events數(shù)據(jù)達(dá)到該數(shù)量時(shí)候,將臨時(shí)文件滾動(dòng)成目標(biāo)文件;如果設(shè)置成0,則表示不根據(jù)events數(shù)據(jù)來(lái)滾動(dòng)文件 |
?
表1
6. 運(yùn)行Flume代理
? ? ? ? 保存上一步的設(shè)置,然后重啟Flume服務(wù),如圖2所示。圖2
? ? ? ? 重啟后,狀態(tài)文件已經(jīng)記錄了將最新的id值7,如圖3所示。
圖3
? ? ? ? 查看目標(biāo)路徑,生成了一個(gè)臨時(shí)文件,其中有7條記錄,如圖4所示。
圖4
? ? ? ? 查詢(xún)HAWQ外部表,結(jié)果也有全部7條數(shù)據(jù),如圖5所示。
圖5
? ? ? ? 至此,初始數(shù)據(jù)抽取已經(jīng)完成。
7. 測(cè)試準(zhǔn)實(shí)時(shí)增量抽取
? ? ? ? 在源表中新增id為8、9、10的三條記錄。[sql]?view plaincopy
圖6
五、方案優(yōu)缺點(diǎn)
? ? ? ? 利用Flume采集關(guān)系數(shù)據(jù)庫(kù)表數(shù)據(jù)最大的優(yōu)點(diǎn)是配置簡(jiǎn)單,不用編程。相比tungsten-replicator的復(fù)雜性,Flume只要在flume.conf文件中配置source、channel及sink的相關(guān)屬性,已經(jīng)沒(méi)什么難度了。而與現(xiàn)在很火的canal比較,雖然不夠靈活,但畢竟一行代碼也不用寫(xiě)。再有該方案采用普通SQL輪詢(xún)的方式實(shí)現(xiàn),具有通用性,適用于所有關(guān)系庫(kù)數(shù)據(jù)源。? ? ? ? 這種方案的缺點(diǎn)與其優(yōu)點(diǎn)一樣突出,主要體現(xiàn)在以下幾方面。
- 在源庫(kù)上執(zhí)行了查詢(xún),具有入侵性。
- 通過(guò)輪詢(xún)的方式實(shí)現(xiàn)增量,只能做到準(zhǔn)實(shí)時(shí),而且輪詢(xún)間隔越短,對(duì)源庫(kù)的影響越大。
- 只能識(shí)別新增數(shù)據(jù),檢測(cè)不到刪除與更新。
- 要求源庫(kù)必須有用于表示增量的字段。
參考:
Flume架構(gòu)以及應(yīng)用介紹Streaming MySQL Database Table Data to HDFS with Flume
how to read data from oracle using FLUME to kafka broker
https://github.com/keedio/flume-ng-sql-source
?
轉(zhuǎn)載于:https://www.cnblogs.com/hark0623/p/7083278.html
總結(jié)
以上是生活随笔為你收集整理的利用Flume将MySQL表数据准实时抽取到HDFS的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 哈希表查找速度为什么那么快?快在哪里了?
- 下一篇: JavaEE配置工具