Kettle构建Hadoop ETL实践(五):数据抽取
目錄
一、Kettle數據抽取概覽
1. 文件抽取
(1)處理文本文件
(2)處理XML文件
2. 數據庫抽取
二、變化數據捕獲
1. 基于源數據的CDC
2. 基于觸發器的CDC
3. 基于快照的CDC
4. 基于日志的CDC
三、使用Sqoop抽取數據
1. Sqoop簡介
2. 使用Sqoop抽取數據
3. Sqoop優化
(1)調整Sqoop命令行參數
(2)調整數據庫
四、小結
? ? ? ? 本篇介紹如何利用Kettle提供的轉換步驟和作業項實現Hadoop數據倉庫的數據抽取,即ETL過程中的Extract部分。首先簡述Kettle中幾種抽取數據的組件,然后講述變化數據捕獲(Change Data Capture,CDC),以及Kettle如何支持不同的CDC技術。Hadoop生態圈中的Sqoop工具可以直接在關系數據庫和HDFS或Hive之間互導數據,而Kettle支持Sqoop輸入、輸出作業項。最后我們使用Kettle里的Sqoop作業項以及基于時間戳的CDC轉換實現銷售訂單示例的數據抽取過程,將MySQL中的源數據抽取到Hive的rds數據庫中。
? ? ? ? 數據抽取是一個艱難的工作,因為數據源是多樣和復雜的。在傳統數據倉庫環境下,數據通常來源于事務類應用系統,大部分這類系統都是把數據存儲在MySQL、Oracle或SQL Server等關系數據庫中。一般要從業務角度進行抽取,這也是一個挑戰,從技術上來看,最好能使用JDBC直連數據庫。如果數據庫不是關系型的或者沒有可用的驅動,一般就需要使用具有固定分隔符的文本文件來獲取數據。還有一種情況是數據屬于外部系統,不能直連,使用文本文件交換數據是唯一選擇。除此之外,Kettle提供了幾種方法來訪問互聯網數據,如通過RSS或者Salesforce.com網站直連,或者通過Web Service等。
一、Kettle數據抽取概覽
? ? ? ? Kettle大部分數據抽取類的步驟都放在“輸入”類別下。輸入類的步驟,顧名思義就是從外部數據源抽取數據,把數據輸入到Kettle的數據流中。Kettle 8.3的輸入類下有37個步驟,其中最常用的是“文本文件輸入”和“表輸入”。一般來說準備要讀取的數據(尤其是文件類數據)的功能往往在作業里完成,實際讀取數據才在轉換這一層。各個步驟和作業項的功能選項,大都能直接從選項名稱了解其含義。詳細說明可使用Kettle在線幫助文檔。在菜單條上選擇“幫助” -> “顯示歡迎屏幕” -> “Documentation”打開在線幫助文檔。
1. 文件抽取
? ? ? ? Kettle在轉換里提供了文件基本的讀寫操作,對于文件的其它操作(移動、復制、創建、刪除、比較、壓縮、解壓縮等)都在“文件管理”作業項中。在使用“文本文件輸出”步驟前,不必先創建一個文件。如果文件不存在,該步驟會自動創建一個。下面介紹兩種最常用的處理場景,即從文本文件與XML文件抽取數據。
(1)處理文本文件
? ? ? ? 文本文件可能是使用ETL工具處理的最簡單的一種數據源,讀寫文本文件沒有太多技巧。文本文件易于交換,壓縮比較高,任何文本編輯器都可以用于打開文本文件。總體說有以下兩類文本文件:
- 固定分隔符文件:這種文件里,每列都由特定字符分隔。通常這類文件也稱為CSV(逗號分隔值)文件或TSV(制表符分隔值)文件。
- 固定寬度文件:每列都有指定的長度。盡管固定寬度文件的格式非常明確,但也需要一些時間來定義。Kettle在“固定寬度文件輸入”的“獲取字段”選項里提供了一些輔助工具,但如果要在分隔符文件和固定寬度文件之間選擇,最好還是選擇分隔符文件。
? ? ? ? 對于這兩種文件,都可以選擇文件編碼。UTF-8是通常情況下的標準編碼格式,但其它編碼格式,如ANSI或UTF-8-BOM也在廣泛使用。為了正常讀取文件內容,必須要設置正確的文件編碼。文件編輯軟件能夠查看文件編碼,如使用Notepad++打開文件,選擇“編碼”菜單即可查看或修改當前文件的編碼。
? ? ? ? “CSV文件輸入”是基本的文本文件輸入步驟,CSV文件是一種用具有固定列分隔符的文本文件。在處理這種文件之前要確定分隔符和字段。“CSV文件輸入”步驟和與之相似的“固定寬度文件輸入”步驟都不太適合一次處理多個文件,這兩個步驟其實都是“文本文件輸入”步驟的簡化版。“文本文件輸入”步驟是一個功能強大的步驟,也是處理文本文件的首選步驟。其主要功能如下:
- 從前一個步驟讀取文件名。
- 一次運行讀取多個文件。
- 從.zip或.gzip壓縮文件中讀取文件。
- 不用指定文件結構就可以顯示文件內容。注意需要指定文件格式(DOS、UNIX或Mixed),因為Kettle需要知道文件的換行符。在DOS用\r\n代表換行,UNIX只用\n代表換行。
- 指定轉義字符,用來讀取字段數據里包含分隔符的字段。通常的轉義字符是反斜線(\)。
- 錯誤處理。
- 過濾。
- 指定本地化的日期格式。
? ? ? ? 當然使用這些功能是有代價的,“文本文件輸入”步驟比“CSV文件輸入”步驟和“固定寬度文件輸入”步驟需要占用更多內存和CPU處理能力。
? ? ? ? 下面看一個Kettle處理的常見場景。假設有一組zip壓縮文件,每個zip文件中包含若干文本文件,所有文本文件具有相同的格式。需求是將文本文件中的記錄抽取到數據庫表中,并且標明每條記錄所屬的文本文件和zip文件。在“Kettle構建Hadoop ETL實踐(一):ETL與Kettle”里介紹Kettle虛擬文件系統時,我們知道了Kettle使用Apache的通用VFS作為文件處理接口,能夠直接讀取zip壓縮包中的多個文件,本例將使用這一特性。
? ? ? ? 我們用的例子文件是a.zip和b.zip,a.zip中包含1.txt和2.txt兩個文件,b.zip中包含3.txt和4.txt兩個文件。文本文件具有三個字段,以逗號作為列分隔符。4個文本文件的內容如下,反斜杠是轉義字符:
# 1.txt 11,1a\,aa,2020-01-01 01:01:01 12,1b\,bb,2020-01-01 02:02:02 13,1c\,cc,2020-01-01 03:03:03# 2.txt 21,2a\,aa,2020-02-02 01:01:01 22,2b\,bb,2020-02-02 02:02:02 23,2c\,cc,2020-02-02 03:03:03# 3.txt 31,3a\,aa,2020-03-03 01:01:01 32,3b\,bb,2020-03-03 02:02:02 33,3c\,cc,2020-03-03 03:03:03# 4.txt 41,4a\,aa,2020-04-04 01:01:01 42,4b\,bb,2020-04-04 02:02:02 43,4c\,cc,2020-04-04 03:03:03? ? ? ? 創建的目標表如下,c1、c2、c3三個字段對應分別對應文本文件中的三列,c4字段存儲記錄所屬的文件名:
create table t_txt (c1 int(11) default null,c2 varchar(20) default null,c3 datetime default null,c4 varchar(100) default null);? ? ? ? 創建的Kettle轉換如圖5-1所示,包含“自定義常量數據”、“獲取文件名”、“文本文件輸入”、“表輸出”四個步驟。
圖5-1 從文本文件抽取數據? ? ? ? “自定義常量數據”步驟用于定義zip和txt的文件名。當然也可以直接在“獲取文件名”步驟中的“文件或目錄”寫死所要讀取的文件名。這里使用“自定義常量數據”步驟的目的是想使輸入的文件名參數化,當需要從不同的文件抽取時,只需修改這個步驟,而后面的步驟都不用變更。
? ? ? ? 在“自定義常量數據”步驟里的“元數據”標簽頁中創建兩個字符串類型的字段zip和txt,然后在“數據”標簽頁中給這兩個字段賦值如圖5-2所示。注意兩個字段值的寫法。zip字段以zip協議開頭,后面是zip文件的絕對路徑,以‘!/’結尾。txt字段值為正則表達式,表示zip包中所有‘.txt’后綴的文件。
圖5-2 在“自定義常量數據”步驟中設置文件名? ? ? ? “獲取文件名”步驟的設置如圖5-3所示。選中“文件名定義在字段里”選項,“從字段獲取文件名”選擇“zip”,“從字段獲取通配符”選擇“txt”。這兩個字段的值從前一步驟傳遞過來。
圖5-3 “獲取文件名”步驟? ? ? ? 下一步驟是“文本文件輸入”步驟。首先要確定文件的結構,打開“文本文件輸入”步驟設置對話框,在“文件”標簽頁中點擊“瀏覽”按鈕,找到其中一個zip文件,然后點擊“增加”按鈕把這個文件添加到“選中的文件”列表中,如“zip:/root/kettle_hadoop/5/a.zip!/”。現在可以點擊“文件”標簽頁中的“顯示文件內容”按鈕打開這個文件,可以看到這個文件的列分隔符、是否帶有表頭和封閉符等信息。我們可以使用這些信息來設置“內容”標簽頁里的選項,本例具體如圖5-4所示。
圖5-4 “內容”標簽頁定義文本文件格式? ? ? ? 定義完文件格式后,再選擇“字段”標簽頁并點擊“獲取字段”按鈕。Kettle會盡量判斷出每個字段的數據類型,本例中如圖5-5所示。
圖5-5 自動獲取文本文件字段? ? ? ? 為了驗證設置的正確性,點擊“預覽記錄”按鈕,如果出現預覽的數據,說明設置正確。下一步需要把“獲取文件名”步驟和“文本文件輸入”步驟連接起來。回到“文本文件輸入”步驟的“文件”標簽頁,選中“從以前的步驟接受文件名”和“從以前的步驟接受字段名”,并選中“獲取文件名”步驟作為文件名的來源,選中filename字段作為文件名的字段,該字段由“獲取文件名”步驟所生成。注意現在不能再使用“預覽記錄”選項,只能在該步驟上選擇轉換里的預覽。
? ? ? ? 我們注意到在“文本文件輸入”步驟里也有路徑和文件名正則表達式選項,但最好把選擇文件的過程單獨放在“獲取文件名”步驟里。因為“獲取文件名”步驟可以從前面的步驟獲得路徑名和文件名的正則表達式,這樣比較靈活。而且“文本文件輸入”步驟本身不能獲取到文件名。? ? ? ? 最后一個步驟是“表輸出”,將文件內容裝載到數據庫表中。在該步驟中勾選“指定數據庫字段”選項,然后在“數據庫字段”標簽頁點擊“獲取字段”按鈕,在“插入的字段”列表中將會出現前面步驟數據流中的所有字段。只需要選擇表字段及其對應的流字段,本例中為:
c1?? ?Field_000 c2?? ?Field_001 c3?? ?Field_002 c4?? ?filename? ? ? ? 保存并執行該轉換后,t_txt表中數據如下:
mysql> select * from t_txt; +------+-------+---------------------+-----------------------------------------------+ | c1 ? | c2 ? ?| c3 ? ? ? ? ? ? ? ? ?| c4 ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?| +------+-------+---------------------+-----------------------------------------------+ | ? 11 | 1a,aa | 2020-01-01 01:01:01 | zip:file:///root/kettle_hadoop/5/a.zip!/1.txt | | ? 12 | 1b,bb | 2020-01-01 02:02:02 | zip:file:///root/kettle_hadoop/5/a.zip!/1.txt | | ? 13 | 1c,cc | 2020-01-01 03:03:03 | zip:file:///root/kettle_hadoop/5/a.zip!/1.txt | | ? 21 | 2a,aa | 2020-02-02 01:01:01 | zip:file:///root/kettle_hadoop/5/a.zip!/2.txt | | ? 22 | 2b,bb | 2020-02-02 02:02:02 | zip:file:///root/kettle_hadoop/5/a.zip!/2.txt | | ? 23 | 2c,cc | 2020-02-02 03:03:03 | zip:file:///root/kettle_hadoop/5/a.zip!/2.txt | | ? 31 | 3a,aa | 2020-03-03 01:01:01 | zip:file:///root/kettle_hadoop/5/b.zip!/3.txt | | ? 32 | 3b,bb | 2020-03-03 02:02:02 | zip:file:///root/kettle_hadoop/5/b.zip!/3.txt | | ? 33 | 3c,cc | 2020-03-03 03:03:03 | zip:file:///root/kettle_hadoop/5/b.zip!/3.txt | | ? 41 | 4a,aa | 2020-04-04 01:01:01 | zip:file:///root/kettle_hadoop/5/b.zip!/4.txt | | ? 42 | 4b,bb | 2020-04-04 02:02:02 | zip:file:///root/kettle_hadoop/5/b.zip!/4.txt | | ? 43 | 4c,cc | 2020-04-04 03:03:03 | zip:file:///root/kettle_hadoop/5/b.zip!/4.txt | +------+-------+---------------------+-----------------------------------------------+ 12 rows in set (0.00 sec)(2)處理XML文件
? ? ? ? XML是擴展標識語言(eXtensible Markup Language)的縮寫,是一種在平面文件中定義數據結構和內容的開放標準。XML格式非常流行,很多系統都使用這種格式交換數據。XML實際是一種遵照規范的結構化的文本文件,可以使用文本編輯器打開。Kettle里有四種驗證XML數據是否有效的方法。
- 驗證XML文件是否有效:只驗證XML是否有完整的開始和結束標簽,及各層嵌套的結構是否完整。
- DTD驗證:檢查XML文件的結構是否符合DTD(Data Type Definition)文件的要求。DTD可以是一個獨立的文件,也可以包含在XML文件中。
- XSD驗證(作業):檢查XML文件的結構是否符合XML Schema定義文件的要求。
- XSD驗證(轉換):和上面相同,但XML是在數據流的字段里(如數據庫的列里包含XML格式數據)。
? ? ? ? 可以使用“Get data from XML”步驟讀取XML文件。讀取XML文件的主要障礙就是分析嵌套的文件結構。從這個步驟輸出的數據流是平面的沒有嵌套的數據結構,可以存儲在關系數據庫中。與之相反,“Add XML”步驟用來把平面數據構造成嵌套形式的XML格式數據。
? ? ? ? 如果想把XML轉成其它格式,如另一種格式的XML文件、平面文件或HTML文件,要使用“XSL transformation”步驟。XSL是擴展樣式語言(eXtensible Stylesheet Language)的縮寫,這是一種用來轉換XML文檔的XML語言。轉換里的“XSD validator”步驟驗證數據流里的XML格式的數據,作業里的“XSD validator”作業項用于驗證一個完整的XML文件。? ? ? ? XML是一種非常靈活的格式,可以用來表達很多種數據結構,下面看一個簡單的示例。首先準備一個XML文檔,然后創建一個轉換,從該文檔抽取數據,并把數據保存在一個MySQL表中。最后再創建一個功能相反的轉換,從MySQL表中抽取數據并保存成XML文件。
? ? ? ? 示例XML文檔sample.xml的內容如下:
<rows><info><infodata user="user1"><data>data1</data></infodata><infodata user="user2"><data>data2</data></infodata></info><row><parameter><user>user1</user><password>pass1</password></parameter><parameter><user>user2</user><password>pass2</password></parameter><parameter><user>user3</user><password>pass3</password></parameter>?? ?</row> </rows><rows>節點下包括了一個<info>節點和一個<row>節點。這兩個節點分別又包含一組<infodata>節點和一組<parameter>節點。<infodata>節點具有屬性user。<parameter>節點下的<user>節點包括了某個<infodata>節點的user屬性值。
? ? ? ? 對應MySQL表t_xml結構如下:
mysql> desc t_xml; +----------+-------------+------+-----+---------+-------+ | Field ? ?| Type ? ? ? ?| Null | Key | Default | Extra | +----------+-------------+------+-----+---------+-------+ | rn ? ? ? | int(11) ? ? | YES ?| ? ? | NULL ? ?| ? ? ? | | username | varchar(20) | YES ?| ? ? | NULL ? ?| ? ? ? | | pass ? ? | varchar(20) | YES ?| ? ? | NULL ? ?| ? ? ? | | info ? ? | varchar(20) | YES ?| ? ? | NULL ? ?| ? ? ? | | xmlfile ?| varchar(50) | YES ?| ? ? | NULL ? ?| ? ? ? | +----------+-------------+------+-----+---------+-------+ 5 rows in set (0.01 sec)rn存儲記錄行號,username和pass字段分別存儲XML文檔中<user>和<password>節點的值,info字段保存<data>節點的值,xmlfile保存XML文件名。
? ? ? ? 如圖5-6所示的轉換從sample.xml文件抽取數據并轉載到數據庫表中。
圖5-6 抽取XML文件數據? ? ? ? 這個轉換只有“Get data from XML”和“表輸出”兩個步驟。“Get data from XML”步驟從靜態XML文件讀取數據,并輸出XML節點值,本質上是將一個層次結構平面化展開的過程。
? ? ? ? 在該步驟的“文件”標簽頁選擇要讀取的XML文件。點擊“瀏覽”按鈕選擇本地的sample.xml文件,然后點擊“增加”按鈕,/root/kettle_hadoop/5/sample.xml將出現在“選中的文件和目錄”列表中。“內容”標簽頁定義XML文件格式,如圖5-7所示。
圖5-7 定義XML文件格式? ? ? ? 標簽頁里最重要的屬性是“循環讀取路徑”。這里需要設置一個XPath表達式。XPath表達式將從XML文檔中過濾出一個節點集,就是XML節點的一個集合。集合里的每一個節點都將被解析為一行記錄,并放到輸出流中。本例中設置為/rows/row/parameter。如果已經在“文件”標簽頁中指定了一個XML文件,可以點擊“獲取XML文檔的所有路徑”按鈕幫助設置XPath屬性。這個按鈕獲取了XML文檔里的全部路徑,如圖5-8所示。
圖5-8 獲取全部路徑的選擇列表? ? ? ? “內容”標簽頁里還包括以下屬性。
- 編碼:用來定義XML文檔的編碼。如果XML文檔本身沒有指定編碼,就要用到這個選項。通常情況下,XML文檔的編碼在文件頭定義,例如:<?xml version="1.0" encoding="UTF-8"?>
- 考慮命名空間:如果文檔使用了命名空間,需要選中該選項。
- 忽略注釋:通常情況下,XML注釋也被看作是節點,如果要忽略注釋節點,要選中該選項。
- 驗證XML:如果想在抽取數據前使用DTD驗證,要選中該選項。
- 使用標記:該選項用于“字段”標簽頁的設置,在后面討論。
- 忽略空文件:如果指定的文件是空,不會拋出異常。
- 如果沒有文件不要報告錯誤:如果指定的文件不存在,不會拋出異常。
- 限制:限制生成的最大記錄行數,默認值為0,意味著對每一個抽取到的XML節點都生成一條記錄。
- 用于截取數據的XML路徑(大文件):一般情況下,XML文檔一次性讀如內存,讀取路徑XPath表達式可以應用于整個文檔。但如果XML文檔非常大,XPath表達式匹配到的所有XML節點不能一次放入內存中,此時就需要指定另一個XPath表達式把XML文檔分成多塊,就是這里的XML截取路徑。這個用于把XML文檔分塊的XPath路徑不支持全部的XPath語法,只能使用斜線分隔的節點名這種語法格式,不支持命名空間和謂詞表達式。另外截取路徑XPath必須是讀取路徑的上一級或同級目錄。
- 輸出中包括文件名/文件名字段:如果使用XML文件作為源,該選項可以在輸出流中增加一個字段保存XML文件名。“文件名字段”選項設置新增字段的字段名。
- 輸出中包括行號/行數字段:該選項可以為每一個數據行生成一個序列號。“行數字段”選項設置行號字段的字段名。
- 將文件增加到結果文件中:如果使用了XML文件,選中該選項把文件添加到結果文件列表中。在父作業中就可以再處理這個文件。
? ? ? ? 在“內容”標簽頁中已經使用XPath表達式匹配了XML節點集。“字段”標簽頁用來從XML節點抽取字段,如圖5-9所示。
圖5-9 定義抽取的字段? ? ? ? 列表中的前兩行是點擊“獲取字段”自動得到的。“名稱”列用來設置要抽取的字段名。“XML路徑”列使用XPath表達式指定從哪里獲得字段的值。XPath表達式用來匹配XML數據行里的字段。下面詳細說一下第三行data字段獲取。
? ? ? ? “字段”標簽頁里的XPath表達式支持一種非標準化的稱為token的擴展形式。token用來參數化XPath表達式,它可以把字段值綁定到XPath表達式里。本例中data字段的XPath是../../info/infodata[@user=@_user-]/data。../代表的返回上一層,當前路徑是/rows/row/parameter,因此對應的絕對路經為/rows/info。infodata[@user=@_user-]這一段指的是infodata目錄下滿足條件的用戶,也就是token的作用所在。@user所引用的是infodata節點的user屬性值,表達式@_user-就是token,這個token包括一個@符號,一條下劃線,然后是字段名user,最后是一個短橫線。可以看到token的功能和數據庫中表join相似,user1的用戶名密碼等屬性沒有和data數據在一個讀取路徑下,那么通過token我們就可以像表一樣給它們連接起來,得到user1的數值data1。
? ? ? ? 如果要使用token,需要選中“內容”標簽頁里的“使用標記”復選框。另外使用token有以下幾個限制:
- XML文檔中被引用的節點(<infodata>)必須出現在引用它的節點(<user>)之前。
- token里使用的字段(本例的user)必須出現在使用token的字段(本例的data)之前。
- token語法只對“字段”標簽頁中的XPath表達式有效,不能用于“內容”標簽頁中的XPath表達式。
? ? ? ? 本例中的第二個步驟是“表輸出”,只要連接到目標數據庫表,勾選“指定數據庫字段”選項,然后在“數據庫字段”標簽頁定義表字段與流字段的關系如下:
username?? ?user pass?? ??? ?password info?? ??? ?data xmlfile?? ??filename rn?? ??? ???rn? ? ? ? 保存并成功執行轉換后,表t_xml數據如下:
mysql> select * from test.t_xml; +------+----------+-------+-------+----------------------------------+ | rn ? | username | pass ?| info ?| xmlfile ? ? ? ? ? ? ? ? ? ? ? ? ?| +------+----------+-------+-------+----------------------------------+ | ? ?1 | user1 ? ?| pass1 | data1 | /root/kettle_hadoop/5/sample.xml | | ? ?2 | user2 ? ?| pass2 | data2 | /root/kettle_hadoop/5/sample.xml | | ? ?3 | user3 ? ?| pass3 | NULL ?| /root/kettle_hadoop/5/sample.xml | +------+----------+-------+-------+----------------------------------+ 3 rows in set (0.00 sec)? ? ? ? 圖5-10所示的轉換執行一個反向的過程,讀取數據庫表數據,然后用數據生成XML節點。“表輸入”步驟連接的數據庫表就是上個轉換所裝載的t_xml。
圖5-10 生成XML節點? ? ? ? “Add XML”步驟用于生成XML節點。對輸入流里的每一行,該步驟會添加一個包含XML字符串的新字段,并把這一行發送到下一個步驟中。在配置對話框里有“內容”和“字段”兩個標簽頁,可以設置生成的XML節點的名稱、屬性、內容等。本例中的內容標簽頁各選項值如下:
- 編碼:UTF-8
- Output Value:xmlvaluename
- 根XML元素:ROW
- Omit XML header:勾選
- Omit null values from XML result:勾掉
? ? ? ? “內容”這個標簽名字有一點令人迷惑,它實際用于設置生成的XML節點的屬性,而不是它的內容。“編碼”下拉列表用來指定一個編碼(默認UTF-8)。“Output Value”屬性設置保存XML節點的字段名。“根XML元素”屬性設置XML節點的名稱。注意,節點名稱目前是一個字符串常量,不能指定一個字段來動態設置節點名稱。“Omit XML header”復選框用來只生成XML片段,以后合并到其它XML文檔中。對于最外層的節點來說,一定要清除這個選項,以便生成帶有XML定義的XML文檔。“Omit null values from XML result”復選框可以用來控制對NULL的展現,是對文檔內容的設置。
? ? ? ? “字段”標簽頁用來控制如何使用輸入流字段生成XML文檔的內容或屬性。可以通過點擊“獲取字段”按鈕,自動得到從前面的步驟輸出的字段,本例中為表t_xml的 rn、username、pass、info四個字段,如圖5-11所示。
圖5-11 “Add XML”步驟的“字段”標簽頁? ? ? ? 輸入流字段可以通過四種方式來構成XML文檔。
- 生成“根XML元素”的子節點,把字段內容作為子節點的內容。表格中的“Element name”用來設置節點名。
- 生成“根XML元素”的屬性,把字段內容作為屬性的內容。這種方式需要把表格里的“屬性”列設置為Y,并把“Attribute parent name”列留空。
- 把字段內容作為“根XML元素”的文本內容。這種方式的配置和上面的第一種方式的配置非常類似。唯一的不同之處是必須使用“根XML元素”的名字作為節點的名字。盡管配置變化不大,最后效果相差卻很大:不會生成子節點,字段的值作為“根XML元素”節點的內容。
- 生成“根XML元素”的子節點,把字段內容作為子節點的屬性。這種方式的配置和第二種方式類似。不同之處就是需要在“Attribute parent name”列中輸入要設置的節點的名字。
? ? ? ? 如果字段中有NULL值,默認情況下會產生一個空節點或屬性值。可以選中“內容”標簽頁中的“Omit null values from XML result”選項來忽略這樣的節點或屬性值。
? ? ? ? 執行轉換后,xmlvaluename字段的值如下,可以點擊“Add XML”步驟右鍵菜單中的Preview菜單項來查看。
<Row><rn>1</rn><username>user1</username><pass>pass1</pass><info>data1</info></Row> <Row><rn>2</rn><username>user2</username><pass>pass2</pass><info>data2</info></Row> <Row><rn>3</rn><username>user3</username><pass>pass3</pass><info></info></Row>2. 數據庫抽取
? ? ? ? 本節討論如何從傳統關系型數據庫抽取數據,從“表輸入”步驟開始,用示例解釋這個步驟里的參數和變量如何工作。源數據表就用處理文本文件時創建的t_txt表。“表輸入”步驟的功能實際上是向所連接的數據庫發送select查詢語句,并將查詢結果返回到輸出流中。
? ? ? ? 可以有兩種參數化的查詢方法:使用參數和使用變量替換。使用參數的方法需要在“表輸入”步驟前面有一個步驟,用來給“表輸入”步驟提供一個或多個參數,這些參數替換“表輸入”步驟的SQL語句里的問號。這種方法的配置窗口如圖5-12所示。
圖5-12 參數化查詢? ? ? ? 這個例子中的“自定義常量數據”步驟定義了兩個常量a和b,數據類型分別是String和Date,兩個常量的數據這就是后面“表輸入”步驟查詢語句中替換兩個問號的數據。例如在“自定義常量數據”步驟的“數據”標簽頁中給常量a和b分別賦值‘a’和‘2020-02-02’,則轉換執行時,“表輸入”步驟的查詢語句實際為:
SELECTc1 , c2 , c3 , c4 FROM t_txt where c2 like concat('%','a','%') and c3 >='2020-02-02';? ? ? ? 點擊“表輸入”步驟右鍵的Preview菜單項預覽數據,顯示如下:
21?? ?2a,aa?? ?2020/02/02 01:01:01.000000000?? ?zip:file:///root/kettle_hadoop/5/a.zip!/2.txt 31?? ?3a,aa?? ?2020/03/03 01:01:01.000000000?? ?zip:file:///root/kettle_hadoop/5/b.zip!/3.txt 41?? ?4a,aa?? ?2020/04/04 01:01:01.000000000?? ?zip:file:///root/kettle_hadoop/5/b.zip!/4.txt? ? ? ? “表輸入”步驟中的主要選項含義如下。
- 允許簡易轉換:選中此選項后,在可能情況下避免轉換進行不必要的數據類型轉換,可以顯著提高性能。
- 替換SQL語句里的變量:選擇此選項可替換腳本中的變量。此特性提供了使用變量替換的測試功能。
- 從步驟插入數據:選擇提供替換SQL語句中問號參數數據的步驟。
- 執行每一行:選擇此選項可對每一輸入行執行查詢。
- 記錄數量限制:指定要從數據庫中讀取的行數,缺省值0表示讀取所有行。
? ? ? ? 本例的“自定義常量數據”步驟只用來演示,實際使用中,最好用其它步驟替換它。在本篇后面的CDC部分能看到一個類似的例子。
? ? ? ? 第二種參數化查詢方法是使用變量,變量要在使用變量的轉換之前的轉換中進行設置。設置變量的轉換如圖5-13所示,設置變量的轉換往往是作業里的第一個轉換。
圖5-13 設置變量的轉換? ? ? ? 兩個變量var_c2和var_c3的值來自前面的“自定義常量數據”步驟里a和b定義的值。在后面轉換的“表輸入”步驟中可以使用這些變量,查詢里的變量名被變量的值替換。使用變量的表輸入步驟如圖5-14所示。
圖5-14 使用變量的表輸入步驟? ? ? ? 為了查看轉換的執行結果,使用“文本文件輸出”步驟將表輸入步驟的查詢結果寫入一個文本文件。上面兩個轉換都在一個作業里,圖5-15顯示了這兩個轉換,第一個轉換時設置變量,第二個轉換使用變量作為表輸入步驟的參數。
圖5-15 使用變量的作業? ? ? ? 本例中常量a和b的值分別為‘b’和‘2020-01-01’。執行作業后,生成的文本文件內容如下:
[root@localhost 5]# more file.txt? 12?? ?1b,bb?? ?2020-01-01 02:02:02?? ?zip:file:///root/kettle_hadoop/5/a.zip!/1.txt 22?? ?2b,bb?? ?2020-02-02 02:02:02?? ?zip:file:///root/kettle_hadoop/5/a.zip!/2.txt 32?? ?3b,bb?? ?2020-03-03 02:02:02?? ?zip:file:///root/kettle_hadoop/5/b.zip!/3.txt 42?? ?4b,bb?? ?2020-04-04 02:02:02?? ?zip:file:///root/kettle_hadoop/5/b.zip!/4.txt二、變化數據捕獲
? ? ? ? 抽取數據是ETL處理過程的第一個步驟,也是數據倉庫中最重要和最具有挑戰性的部分,適當的數據抽取是成功建立數據倉庫的關鍵。從源抽取數據導入數據倉庫或過渡區有兩種方式,可以從源把數據抓取出來(拉),也可以請求源把數據發送(推)到數據倉庫。影響選擇數據抽取方式的一個重要因素是操作型系統的可用性和數據量,這是抽取整個數據還是僅僅抽取自最后一次抽取以來的變化數據的基礎。我們考慮以下兩個問題:
- 需要抽取哪部分源數據加載到數據倉庫?有兩種可選方式,完全抽取和變化數據捕獲。
- 數據抽取的方向是什么?有兩種方式,拉模式,即數據倉庫主動去源系統拉取數據;推模式,由源系統將自己的數據推送給數據倉庫。
? ? ? ? 對于第二個問題來說,通常要改變或增加操作型業務系統的功能是非常困難的,這種困難不僅是技術上的,還有來自于業務系統用戶及其開發者的阻力。理論上講,數據倉庫不應該要求對源系統做任何改造,實際上也很少由源系統推數據給數據倉庫。因此對這個問題的答案比較明確,大都采用拉數據模式。下面我們著重討論第一個問題。
? ? ? ? 如果數據量很小并且易處理,一般來說采取完全源數據抽取,就是將所有的文件記錄或所有的數據庫表數據抽取至數據倉庫。這種方式適合基礎編碼類型的源數據,比如郵政編碼、學歷、民族等。基礎編碼型源數據通常是維度表的數據來源。如果源數據量很大,抽取全部數據是不可行的,那么只能抽取變化的源數據,即最后一次抽取以來發生了變化的數據。這種數據抽取模式稱為變化數據捕獲,簡稱CDC,常被用于抽取操作型系統的事務數據,比如銷售訂單、用戶注冊,或各種類型的應用日志記錄等。
? ? ? ? CDC大體可以分為兩種,一種是侵入式的,另一種是非侵入式的。所謂侵入式的是指CDC操作會給源系統帶來性能的影響。只要CDC操作以任何一種方式對源庫執行了SQL語句,就可以認為是侵入式的CDC。常用的四種CDC方法是:基于源數據的CDC、基于觸發器的CDC、基于快照的CDC、基于日志的CDC,其中前三種是侵入性的。表5-1總結了四種CDC方案的特點。
| ? | 源數據 | 觸發器 | 快照 | 日志 |
| 能區分插入/更新 | 否 | 是 | 是 | 是 |
| 周期內,檢測到多次更新 | 否 | 是 | 否 | 是 |
| 能檢測到刪除 | 否 | 是 | 是 | 是 |
| 不具有侵入性 | 否 | 否 | 否 | 是 |
| 支持實時 | 否 | 是 | 否 | 是 |
| 不依賴數據庫 | 是 | 否 | 是 | 否 |
表5-1 四種CDC方案比較
1. 基于源數據的CDC
? ? ? ? 基于源數據的CDC要求源數據里有相關的屬性字段,抽取過程可以利用這些屬性字段來判斷哪些數據是增量數據。最常見的屬性字段有以下兩種。
- 時間戳:這種方法至少需要一個更新時間戳,但最好有兩個,一個插入時間戳,表示記錄何時創建,一個更新時間戳,表示記錄最后一次更新的時間。
- 序列:大多數數據庫系統都提供自增功能。如果數據庫表列被定義成自增的,就可以很容易地根據該列識別出新插入的數據。
? ? ? ? 這種方法的實現較為簡單,假設表t1中有一個時間戳字段last_inserted,t2表中有一個自增序列字段id,則下面SQL語句的查詢結果就是新增的數據,其中{last_load_time}和{last_load_id}分別表示ETL系統中記錄的最后一次數據裝載時間和最大自增序列號。
select * from t1 where last_inserted > {last_load_time}; select * from t2 where id > {last_load_id};? ? ? ? 通常需要建立一個額外的數據庫表存儲上一次更新時間或上一次抽取的最后一個序列號。實踐中,一般是在一個獨立的模式下或在數據過渡區里創建這個參數表。下面來看Kettle里使用時間戳方式CDC的例子。前一篇建立的ETL示例模型中,source.sales_order表的entry_date字段表示訂單錄入的時間。我們還需要把上一次裝載時間存儲在屬性文件或參數表里。先使用下面的腳本在hive里的rds庫中建立一個名為cdc_time的時間戳表,并設置初始數據。
use rds; ?drop table if exists cdc_time ; ? create table cdc_time ? ( id int, last_load date, current_load date) ? clustered by (id) into 1 buckets ? ?? stored as orc tblproperties ('transactional'='true');insert into table cdc_time select 1, '1971-01-01', '1971-01-01' ;? ? ? ? 后面的Kettle轉換中需要對cdc_time執行行級更新,因此該表必須分桶、使用ORC格式、設置支持事務。id字段用于分桶,不做更新操作。時間戳有last_load和current_load兩個字段。之所以需要兩個字段,是因為抽取到的數據可能會多于本次需要處理的數據。比如,兩點執行ETL過程,則零點到兩點這兩個小時的數據不會在本次處理。為了確定這個截至時間點,需要給時間戳設定一個上限條件,即這里的current_load字段值。本示例的時間粒度為每天,時間戳只要保留日期部分即可,因此數據類型選為date。最開始這個兩個時間戳都設置成一個早于所有業務數據的時間,當開始裝載時,current_load時間戳設置為當前時間。
? ? ? ? 該表的邏輯描述如下。
1. 裝載作業開始后,作業要先把current_load設置成作業的開始日期,可以通過如圖5-16的“設置系統日期”轉換實現。
? ? ? ? 在“獲取系統信息”步驟里創建一個當前日期的字段cur_date,以及一個前一天的日期pre_date字段,然后將兩個字段的數據復制分發到“插入/更新”步驟和“字段選擇”步驟。“插入/更新”步驟的“更新字段”部分,用流里的字段“cur_date”去更新表里的字段“current_load”。另外還要設置“用來查詢的關鍵字”部分,把表的“current_load”條件設置為“IS NOT NULL”即可。其含義是當“current_load”為空時執行插入,否則執行更新操作。
? ? ? ? 在“字段選擇”步驟的“元數據”標簽頁中,修改pre_date字段的類型為“Date”,格式為“yyyy-MM-dd”。格式化的前一天日期值傳遞給“設置變量”步驟,該步驟將pre_date字段值定義為一個變量PRE_DATE,用于將日期拼接到上傳至HDFS的文件名中。變量活動類型(作用域)為“Valid in the root job”,即調用該轉換的所有作業均可使用該變量。
2. 從sales_order表里抽取數據的查詢使用開始日期和結束日期,如圖5-17所示的“裝載銷售訂單表”轉換所示。
圖5-17 “裝載銷售訂單表”轉換? ? ? ? ?“表輸入”步驟獲取到cdc_time表的last_load和current_load日期。“數據庫連接步驟”用前一步驟獲得的last_load和current_load值替換查詢語句中問號標識的參數。通過比較表字段entry_date的值判斷新增的數據。這里假設源系統中銷售訂單記錄一旦入庫就不再改變,或者可以忽略改變。也就是說銷售訂單是一個隨時間變化單向追加數據的表。sales_order表中有兩個關于時間的字段,order_date表示訂單時間,entry_date表示訂單數據實際插入表里的時間,在后面第九篇“(九)事實表技術”討論“遲到的事實”時就會看到兩個時間可能不同。那么用哪個字段作為CDC的時間戳呢?設想這樣的情況,一個銷售訂單的訂單時間是2020年1月1日,實際插入表里的時間是2020年1月2日,ETL每天0點執行,抽取前一天的數據。如果按order_date抽取數據,條件為where order_date >= '2020-01-02' AND order_date < '2020-01-03',則2020年1月3日0點執行的ETL不會捕獲到這個新增的訂單數據,所以應該以entry_date作為CDC的時間戳。
? ? ? ? 最后將新增數據通過“Hadoop file output”步驟上傳到rds.sales_order表對應的HDFS目錄下。“文件”標簽頁中的“Hadoop Cluster”選擇CDH631,“Folder/File”輸入“/user/hive/warehouse/rds.db/sales_order/sales_order_${PRE_DATE}”,其中${PRE_DATE}引用的就是圖5-16中“設置變量”步驟定義的變量。“內容”標簽頁指定分隔符為逗號,格式選擇Unix,編碼為UTF-8。“字段”標簽頁選擇sales_order表中全部六個字段。
3. 如果轉換中沒有發生任何錯誤,要把current_load字段里的值復制到last_load字段里,用如圖5-18所示的“SQL”作業項實現。如果轉換中發生了錯誤,時間戳需要保持不變,以便后面再次執行。
圖5-18 更新last_load的“SQL”作業項? ? ? ? 將上述轉換和作業項放到一個作業中,如圖5-19所示。
圖5-19 基于時間戳的CDC作業? ? ? ? 首次作業成功執行后,hive表sales_order所對應的HDFS目錄下生成了一個帶有前一天日期的文件:
[root@manager~]#hdfs dfs -ls /user/hive/warehouse/rds.db/sales_order/ Found 1 items -rw-r--r-- ? 3 root hive ? ? ? 5892 2020-09-28 13:38 /user/hive/warehouse/rds.db/sales_order/sales_order_2020-09-24.txt [root@manager~]#rds.sales_order裝載全部100條銷售訂單記錄,rds.cdc_time的last_load和current_load均更新為當前日期:
hive> use rds; OK hive> select * from sales_order; OK 1?? ?6?? ?2?? ?2020-03-01 20:13:34?? ?2020-03-01 20:13:34?? ?3777.00 2?? ?4?? ?2?? ?2020-03-03 19:07:07?? ?2020-03-03 19:07:07?? ?9227.00 ... 99?? ?3?? ?1?? ?2020-08-29 01:20:11?? ?2020-08-29 01:20:11?? ?9058.00 100?? ?1?? ?2?? ?2020-08-31 09:43:38?? ?2020-08-31 09:43:38?? ?5607.00 Time taken: 2.41 seconds, Fetched: 100 row(s) hive> select * from cdc_time; OK 1?? ?2020-09-25?? ?2020-09-25 hive>? ? ? ? 基于時間戳和自增序列的方法是CDC最簡單的實現方式,也是最常用的方法,但它的缺點也很明顯,主要如下:
- 不能區分插入和更新操作。只有當源系統包含了插入時間戳和更新時間戳兩個字段,才能區別插入和更新,否則不能區分。
- 不能記錄刪除記錄的操作。不能捕獲到刪除操作,除非是邏輯刪除,即記錄沒有被真的刪除,只是做了邏輯上的刪除標志。
- 無法識別多次更新。如果在一次同步周期內,數據被更新了多次,只能同步最后一次更新操作,中間的更行操作將會丟失。
- 不具有實時能力。時間戳和基于序列的數據抽取一般適用于批量操作,不適合于實時場景下的數據抽取。
? ? ? ? 這種方法是具有侵入性的,如果操作型系統中沒有時間戳或時間戳信息是不可用的,那么不得不通過修改源系統把時間戳包含進去,要求修改操作型系統的表包含一個新的時間戳字段。有些數據庫系統可以自動維護timestamp類型的值。如在MySQL中只要如下定義,當執行insert或update操作時,所影響數據行的ts字段將會自動更新為當前時間:
alter table t1 add column ts timestamp default current_timestamp on update current_timestamp;而有些數據庫系統,需要建立一個觸發器,在修改一行時更新時間戳字段的值。下面是一個Oracle數據庫的例子。當t1表上執行了insert或update操作時,觸發器會將last_updated字段更新為當前系統時間。
alter table t1 add last_updated date;create or replace trigger trigger_on_t1_changebefore insert or updateon t1for each row begin:new.last_updated := sysdate; end; /? ? ? ? 在實施這些操作前必須被源系統的擁有者所接受,并且要仔細評估對源系統產生的影響。
2. 基于觸發器的CDC
? ? ? ? 當執行INSERT、UPDATE、DELETE這些SQL語句時,可以激活數據庫里的觸發器,并執行一些動作,就是說觸發器可以用來捕獲變更的數據并把數據保存到中間臨時表里。然后這些變更的數據再從臨時表中取出,被抽取到數據倉庫的過渡區里。大多數場合下,不允許向操作型數據庫里添加觸發器(業務數據庫的變動通常都異常慎重),而且這種方法會降低系統的性能,所以此方法用的并不是很多。
? ? ? ? 作為直接在源數據庫上建立觸發器的替代方案,可以使用源數據庫的復制功能,把源數據庫上的數據復制到備庫上,在備庫上建立觸發器以提供CDC功能。盡管這種方法看上去過程冗余,且需要額外的存儲空間,但實際上這種方法非常有效,而且沒有侵入性。復制是大部分數據庫系統的標準功能,如MySQL、Oracle和SQL Server等都有各自的數據復制方案。
? ? ? ? 一個類似于內部觸發器的例子是Oracle的物化視圖日志。這種日志被物化視圖用來識別改變的數據,并且這種日志對象能夠被最終用戶訪問。一個物化視圖日志可以建立在每一個需要捕獲變化數據的源表上。之后任何時間在源表上對任何數據行做修改時,都有一條記錄插入到物化視圖日志中表示這一行被修改了。如果想使用基于觸發器的CDC機制,并且源數據庫是Oracle,這種物化視圖日志方案是很方便的。物化視圖日志依賴于觸發器,但是它們提供了一個益處是,建立和維護這個變化數據捕獲系統已經由Oracle自動管理了。我們甚至可以在物化視圖上建立自己的觸發器,每次物化視圖刷新時,觸發器基于刷新時間點的物化視圖日志歸并結果,在一些場景下(只要記錄兩次刷新時間點數據的差異,不需要記錄兩次刷新之間的歷史變化)可以簡化應用處理。下面是一個Oracle物化視圖的例子。每條數據的變化可以查詢物化視圖日志表mlog$_tbl1,兩個刷新時間點之間的數據差異,可以查詢mv_tbl1_tri表。
-- 建立mv測試表 ? create table tbl1(a number,b varchar2 (20)); ? create unique index tbl1_pk on tbl1 (a); ? alter table tbl1 add (constraint tbl1_pl primary key(a)); ? -- 建立mv日志,單一表聚合視圖的快速刷新需要指定including new values子句 ? create materialized view log on tbl1 including new values; ? -- 建立mv ? create materialized view mv_tbl1 build immediate refresh fast ? start with to_date('2013-06-01 08:00:00','yyyy-mm-dd hh24:mi:ss') ? next sysdate + 1/24 ? as select * from tbl1; ? -- 建立trigger測試表 ? create table mv_tbl1_tri (a number,b varchar (20),c varchar (20)); ? -- 建立trigger ? create or replace trigger tri_mv ?after delete or insert or update ?on mv_tbl1 ?referencing new as new old as old ?for each row ? begin ?case ?when inserting then ?insert into mv_tbl1_tri values (:new.a, :new.b, 'insert'); ?when updating then ?insert into mv_tbl1_tri values (:new.a, :new.b, 'update'); ?when deleting then ?insert into mv_tbl1_tri values (:old.a, :old.b, 'delete'); ?end case; ? exception ?when others then ?raise; ? end tri_mv; ? / ? -- 對表tbl1進行一系列增刪改操作 -- ...-- 手工刷新mv ? exec dbms_mview.refresh('mv_tbl1'); ? -- 查看物化視圖日志 select * from mlog$_tbl1; ? -- 檢查trigger測試表 ? select * from mv_tbl1_tri;?3. 基于快照的CDC
? ? ? ? 如果沒有時間戳,也不允許使用觸發器,就要使用快照表了。可以通過比較源表和快照表來獲得數據變化。快照就是一次性抽取源系統中的全部數據,把這些數據裝載到數據倉庫的過渡區中。下次需要同步時,再從源系統中抽取全部數據,并把全部數據也放到數據倉庫的過渡區中,作為這個表的第二個版本,然后再比較這兩個版本的數據,從而找到變化。
? ? ? ? 有多個方法可以獲得這兩個版本數據的差異。假設表有兩個列id和name,id是主鍵列。該表的第一、二個版本的快照表名為snapshot_1、snapshot_2。下面的SQL語句在主鍵id列上做全外鏈接,并根據主鍵比較的結果增加一個標志字段,I表示新增,U表示更新,D代表刪除,N代表沒有變化。外層查詢過濾掉沒有變化的記錄。
select * from? (select case when t2.id is null then 'D'when t1.id is null then 'I'when t1.name <> t2.name then 'U'else 'N'end as flag,case when t2.id is null then t1.id else t2.id end as id, t2.namefrom snapshot_1 t1 full outer join snapshot_2 t2 on t1.id = t2.id) awhere flag <> 'N';當然,這樣的SQL語句需要數據庫支持全外鏈接,對于MySQL這樣不支持全外鏈接的數據庫,可以使用類似下面的SQL語句:
select 'U' as flag, t2.id as id, t2.name as namefrom snapshot_1 t1 inner join snapshot_2 t2 on t1.id = t2.idwhere t1.name != t2.nameunion all? select 'D' as flag, t1.id as id, t1.name as namefrom snapshot_1 t1 left join snapshot_2 t2 on t1.id = t2.idwhere t2.id is nullunion all? select 'I' as flag, t2.id as id, t2.name as namefrom snapshot_2 t2 left join snapshot_1 t1 on t2.id = t1.idwhere t1.id is null;? ? ? ? Kettle里的“合并記錄”步驟能夠比較兩個表的差異。該步驟讀取兩個使用關鍵字排序的輸入數據流,并基于數據流里的關鍵字比較其它字段。可以選擇要比較的字段,并設置一個標志字段,作為比較結果輸出字段。我們用示例模型里的source.sales_order表做個例子。
1. 先把source.sales_order表復制到另一個數據庫中。
create table test.sales_order select * from source.sales_order;2. 創建一個用于快照CDC的轉換,如圖5-20所示。
圖5-20 用于快照CDC的轉換? ? ? ? 創建兩個“表輸入”步驟,一個連接source.sales_order,另一個連接test.sales_order,SQL查詢語句如下:
SELECTorder_number , customer_number , product_code , order_date , entry_date , order_amount FROM sales_order order by order_number;然后添加一個“合并記錄”步驟,如圖5-21所示。
圖5-21 “合并記錄”步驟設置把兩個表輸入步驟都連接到“合并記錄”步驟,在步驟中在選擇新舊數據源,設置標志字段名,該字段的值為new、changed、deleted或identical,分別標識新增、修改、刪除和沒有變化的記錄。另外設置主鍵字段和需要比較的字段。
? ? ? ? 為了過濾沒有發生變化的數據,在后面加一個“過濾記錄”步驟,過濾條件是“flagfield=identical”,把所有沒有變化的數據發送到“空操作”步驟,把新增、修改、刪除的數據發送到“數據同步”步驟,該步驟可以根據標志字段自動進行新增、修改、刪除等操作。“一般”和“高級”標簽頁的配置如圖5-22所示。
圖5-22 “數據同步”步驟設置根據數據流中flagfield字段的值決定要執行的插入、更新或刪除操作。當目標表test.sales_order中的order_number與數據流order_number相同時,更新目標表的全部六個字段。
3. 驗證轉換
? ? ? ? 初始source.sales_order和test.sales_order兩個表數據相同:
對source.sales_order數據做一些修改:
insert into source.sales_order values (101,1,1,now(),now(),100); delete from source.sales_order where order_number=99; update source.sales_order set order_amount=5606 where order_number=100;預覽“合并記錄”步驟的數據:
order_number?? ?customer_number?? ?product_code?? ?order_date?? ?entry_date?? ?order_amount?? ?flagfield ... 98?? ?2?? ?1?? ?2020/08/27 14:02:35.000000000?? ?2020/08/27 14:02:35.000000000?? ?8144.0?? ?identical 99?? ?3?? ?1?? ?2020/08/29 01:20:11.000000000?? ?2020/08/29 01:20:11.000000000?? ?9058.0?? ?deleted 100?? ?1?? ?2?? ?2020/08/31 09:43:38.000000000?? ?2020/08/31 09:43:38.000000000?? ?5606.0?? ?changed 101?? ?1?? ?1?? ?2020/09/24 16:53:56.000000000?? ?2020/09/24 16:53:56.000000000?? ?100.0?? ?new成功執行轉換后,test.sales_order的數據已經和source.sales_order同步:
+--------------+-----------------+--------------+---------------------+---------------------+--------------+ | order_number | customer_number | product_code | order_date ? ? ? ? ?| entry_date ? ? ? ? ?| order_amount | +--------------+-----------------+--------------+---------------------+---------------------+--------------+ ... | ? ? ? ? ? 98 | ? ? ? ? ? ? ? 2 | ? ? ? ? ? ?1 | 2020-08-27 14:02:35 | 2020-08-27 14:02:35 | ? ? ?8144.00 | | ? ? ? ? ?100 | ? ? ? ? ? ? ? 1 | ? ? ? ? ? ?2 | 2020-08-31 09:43:38 | 2020-08-31 09:43:38 | ? ? ?5606.00 | | ? ? ? ? ?101 | ? ? ? ? ? ? ? 1 | ? ? ? ? ? ?1 | 2020-09-24 16:53:56 | 2020-09-24 16:53:56 | ? ? ? 100.00 | +--------------+-----------------+--------------+---------------------+---------------------+--------------+ 100 rows in set (0.00 sec)4. 恢復原數據
insert into source.sales_order values (99,3,1,'2020-08-29 01:20:11','2020-08-29 01:20:11',9058); update source.sales_order set order_amount=5607 where order_number=100; delete from source.sales_order where order_number=101;? ? ? ? 基于快照的CDC可以檢測到插入、更新和刪除的數據,這是相對于基于時間戳的CDC方案的優點。它的缺點是需要大量的存儲空間來保存快照,因為比較的是兩個全量數據集合。同樣的原因,當表很大時,這種查詢會有比較嚴重的性能問題。
4. 基于日志的CDC
? ? ? ? 最復雜的和最沒有侵入性的CDC方法是基于日志的方式。數據庫會把每個插入、更新、刪除操作記錄到日志里。如使用MySQL數據庫,只要在數據庫服務器中啟用二進制日志(設置log_bin服務器系統變量),之后就可以實時從數據庫日志中讀取到所有數據庫寫操作,并使用這些操作來更新數據倉庫中的數據。現在十分流行的canal就是基于這個原理,將自己模擬成一個從庫,接收主庫的二進制日志,從而捕獲數據變化。
? ? ? ? 也可以手工解析二進制日志,將其轉為可以理解的格式,然后再把里面的操作按照順序讀取出來。MySQL提供了一個叫做mysqlbinlog的日志讀取工具。這個工具可以把二進制的日志格式轉換為可讀的格式,然后就可以把這種格式的輸出保存到文本文件里,或者直接把這種格式的日志應用到MySQL客戶端用于數據還原操作。mysqlbinlog工具有很多命令行參數,其中最重要的一組參數可以設置開始/截止時間戳,這樣能夠只從日志里截取一段時間的日志。另外,日志里的每個日志項都有一個序列號,也可以用來做偏移操作。MySQL的日志提供了上述兩種方式來防止CDC過程發生重復或丟失數據的情況。下面是使用mysqlbinlog的兩個例子。
mysqlbinlog --start-position=120 jbms_binlog.000002 | mysql -u root -p123456 mysqlbinlog --start-date="2011-02-27 13:10:12" --stop-date="2011-02-27 13:47:21" jbms_binlog.000002 > temp/002.txt第一條命令將jbms_binlog.000002文件中從120偏移量以后的操作應用到一個MySQL數據庫中。第二條命令將jbms_binlog.000002文件中一段時間的操作格式化輸出到一個文本文件中。
? ? ? ? 其它數據庫也有類似的方法,下面再來看一個使用Oracle日志分析的實例。有個項目提出的需求是這樣的:部署兩個相同的Oracle數據庫A、B,兩個庫之間沒有網絡連接,要定期把A庫里的數據復制到B庫。要求:1. 應用程序不做修改。2. 實現增量數據更新,并且不允許重復數據導入。
? ? ? ? Oracle提供了DBMS_LOGMNR系統包可以分析歸檔日志。我們只要將A庫的歸檔日志文件通過離線介質拷貝到B庫中,再在B庫上使用DBMS_LOGMNR解析歸檔日志,最后將格式化后的輸出應用于B庫。使用DBMS_LOGMNR分析歸檔日志并redo變化的方案如下:
? ? ? ? 因為網不通,手工拷貝文件的工作不可避免,所以可以認為上述步驟均為手工操作。第1步為上線前的數據庫準備,是一次性工作;第2、3步為周期性工作。對于第3步,可以用PL/SQL腳本實現。首先在B庫機器上上規劃好目錄,這里D:\logmine為主目錄,D:\logmine\redo_log存放從A庫拷貝來的歸檔日志文件。然后在B庫上執行一次初始化對象腳本,建立一個外部表,存儲歸檔日志文件名稱。
create or replace directory logfilename_dir as 'D:\logmine\'; ? grant read, write on directory logfilename_dir to u1; ?conn user1/password1 ?begin ?excute immediate 'create table logname_ext (logfile_name varchar2(300)) organization external (type oracle_loader default directory data_dir logfilename_dir location (''log_file_name.txt''))'; ? exception when others then ?if sqlcode = -955 then -- 名稱已由現有對象使用 ?null; ?else ?raise; ?end if; ?? end; ? /? ? ? ? 每次數據同步時要做的工作是:
create_ext_table.bat腳本文件內容如下:
echo off ? dir /a-d /b /s D:\logmine\redo_log\*.log > D:\logmine\log_file_name.txt ? sqlplus user1/password1 @D:\logmine\create_ext_table.sqlcreate_ext_table.sql腳本文件的內容如下:
begin ?for x in (select logfile_name from logname_ext) loop ?dbms_logmnr.add_logfile(x.logfile_name); ?end loop; ? end; ? / ?execute dbms_logmnr.start_logmnr(options => dbms_logmnr.committed_data_only); ?begin ?for x in (select sql_redo ??from v$logmnr_contents?-- 只應用U1用戶模式的數據變化,一定要按提交的SCN排序?where table_space != 'SYSTEM' and instr(sql_redo,'"U1".') > 0 ?order by commit_scn) ?loop ?execute immediate x.sql_redo; ?end loop; ? end; ? / ?exit;? ? ? ? 使用基于數據庫的日志工具也有缺陷,即只能用來處理一種特定的數據庫,如果要在異構的數據庫環境下使用基于日志的CDC方法,就要使用Oracle GoldenGate之類的商業軟件。
三、使用Sqoop抽取數據
? ? ? ? 有了前面的討論和實驗,我們現在已經可以處理從源系統獲取數據的各種情況。回想上一篇建立的銷售訂單示例,源系統的MySQL數據庫中已經添加好測試數據,Hive中建立了rds數據庫作為過渡區,dw庫存儲維度表和事實表。這里我們將使用一種新的工具將MySQL數據抽取到Hive的rds庫中,它就是Sqoop。
1. Sqoop簡介
? ? ? ? Sqoop是一個在Hadoop與結構化數據存儲(如關系數據庫)之間高效傳輸大批量數據的工具。它在2012年3月被成功孵化,現在已是Apache的頂級項目。Sqoop有Sqoop1和Sqoop2兩代,Sqoop1最后的穩定版本是1.4.7,Sqoop2最后版本是1.99.6。需要注意的是,1.99.6與1.4.7并不兼容,而且截止目前為止,1.99.6并不完善,不推薦在生產環境中部署。
? ? ? ? 第一代Sqoop的設計目標很簡單:
- 在企業級數據倉庫、關系數據庫、文檔系統和HDFS、HBase或Hive之間導入導出數據。
- 基于客戶端的模型。
- 連接器使用廠商提供的驅動。
- 沒有集中的元數據存儲。
- 只有Map任務,沒有Reduce任務,數據傳輸和轉化都由Mappers提供。
- 可以使用Oozie調度和管理Sqoop作業。
? ? ? ? Sqoop1是用Java開發的,完全客戶端驅動,嚴重依賴于JDBC,可以使用簡單的命令行命令導入導出數據。例如:
# 把MySQL中testdb.PERSON表的數據導入HDFS sqoop import --connect jdbc:mysql://localhost/testdb --table PERSON --username test --password 123456上面這條命令形成一系列任務:
- 生成MySQL的SQL代碼。
- 執行MySQL的SQL代碼。
- 生成Map作業。
- 執行Map作業。
- 數據傳輸到HDFS。
上面這條命令形成一系列任務:
- 生成Map作業。
- 執行Map作業。
- 從HDFS的/user/localadmin/CLIENTS路徑傳輸數據。
- 生成MySQL的SQL代碼。
- 向MySQL的testdb.CLIENTS_INTG表插入數據
? ? ? ? Sqoop1有許多簡單易用的特性,如可以在命令行指定直接導入至Hive或HDFS。連接器可以連接大部分流行的數據庫:Oracle、SQLServer、MySQL、Teradata、PostgreSQL等。Sqoop1的主要問題包括:繁多的命令行參數;不安全的連接方式,如直接在命令行寫密碼等;沒有元數據存儲,只能本地配置和管理,使復用受到限制。
? ? ? ? Sqoop2體系結構比Sqoop1復雜得多,它被設計用來解決Sqoop1的問題,主要體現在易用性、可擴展性和安全性三個方面。
易用性
? ? ? ? Sqoop1需要客戶端的安裝和配置,而Sqoop2是在服務器端安裝和配置。這意味著連接器只在一個地方統一配置,由管理員角色管理,操作員角色使用。類似地,只需要在一臺服務器上配置JDBC驅動和數據庫連接。Sqoop2還有一個基于Web的服務:前端是命令行接口(CLI)和瀏覽器,后端是一個元數據知識庫。用戶可以通過交互式的Web接口進行導入導出,避免了錯誤選項和繁冗步驟。Sqoop2還在服務器端整合了Hive和HBase。Oozie通過REST API管理Sqoop任務,這樣當安裝一個新的Sqoop連接器后,無需在Oozie中安裝它。
可擴展性
? ? ? ? 在Sqoop2中,連接器不再受限于JDBC的SQL語法,如不必指定database、table等,甚至可以定義自己使用的SQL方言。例如,Couchbase不需要指定表名,只需在充填或卸載操作時重載它。通用的功能將從連接器中抽取出來,使之只負責數據傳輸。在Reduce階段實現通用功能,確保連接器可以從將來的功能性開發中受益。連接器不再需要提供與其它系統整合等下游功能,因此,連接器的開發者不再需要了解所有Sqoop支持的特性。
安全性
? ? ? ? Sqoop1用戶是通過執行sqoop命令運行Sqoop。Sqoop作業的安全性主要由是否對執行Sqoop的用戶信任所決定。Sqoop2將作為基于應用的服務,通過按不同角色連接對象,支持對外部系統的安全訪問。為了進一步安全,Sqoop2不再允許生成代碼、請求直接訪問Hive或HBase,也不對運行的作業開放訪問所有客戶端的權限。Sqoop2將連接作為一級對象,包含證書的連接一旦生成,可以被不同的導入導出作業多次使用。連接由管理員生成,被操作員使用,因此避免了最終用戶的權限泛濫。此外,連接可以被限制只能進行某些基本操作,如導入導出,還可通過限制同一時間打開連接的總數和一個禁止連接的選項來管理資源。
? ? ? ? 當前的Sqoop2還缺少Sqoop1的某些特性,因此Cloudera的建議是,只有當Sqoop2完全滿足需要的特性時才使用它,否則繼續使用Sqoop1。CDH 6.3.1中只包含Sqoop1,版本為1.4.7。
2. 使用Sqoop抽取數據
? ? ? ? 在銷售訂單示例中使用Sqoop1進行數據抽取。表5-2匯總了示例中維度表和事實表用到的源數據表及其抽取模式。
| 源數據表 | rds庫中的表 | dw庫中的表 | 抽取模式 |
| customer | customer | customer_dim | 整體、拉取 |
| product | product | product_dim | 整體、拉取 |
| sales_order | sales_order | order_dim、sales_order_fact | 基于時間戳的CDC、拉取 |
表5-2 銷售訂單抽取模式
? ? ? ? 對于customer、product這兩個表采用整體拉取的方式抽數據。ETL通常是按一個固定的時間間隔,周期性定時執行的,因此對于整體拉取的方式而言,每次導入的數據需要覆蓋上次導入的數據。Kettle作業中的“Sqoop import”作業項,可以調用Sqoop命令,從關系數據庫抽取數據到HDFS或hive表。我們使用該作業項將源庫中的customer、product兩表數據全量覆蓋導入hive表所對應的HDFS目錄,而調用圖5-19所示的作業,實現對sales_order表的增量數據導入。整體作業如圖5-23所示。
圖5-23 將數據從source庫抽取到rds庫的作業“Sqoop import customer”作業項選項設置如圖5-24所示。
圖5-24 “Sqoop import customer”作業項設置? ? ? ? 源庫表為MySQL的customer表,目標為CDH631集群中,hive庫表rds.customer所對應的HDFS目錄/user/hive/warehouse/rds.db/customer。點擊“Advanced Options”,將顯示所有Sqoop所支持的命令行參數。通過點擊“List View”或“Command Line View”圖標,參數將分別以列表或命令行形式展現。這里只需設置“delete-target-dir”參數的值為true。Sqoop import要求目標HDFS的目錄為空,為了能夠冪等執行作業,需要設置delete-target-dir參數。所謂冪等操作指的是其執行任意多次所產生的影響均與一次執行的影響相同。這樣就能在導入失敗或修復bug后可以再次執行該操作,而不用擔心重復執行會對系統造成數據混亂。定義好的作業項等價于以下sqoop命令:
sqoop import --connect jdbc:mysql://node3:3306/source --delete-target-dir --password 123456 --table customer --target-dir /user/hive/warehouse/rds.db/customer --username root? ? ? ? “Sqoop import product”作業項只是將源和目標表換成了product,其它都與“Sqoop import customer”相同。“load_sales_order”子作業調用圖5-19所示的基于時間戳的CDC作業,向rds.sales_order表增量裝載數據。
? ? ? ? 下面測試增量導入。前面介紹基于時間戳的CDC時,我們已經首次執行過裝載sales_order表的作業,cdc_time表的日期為'2020-09-25'。現在向MySQL源庫增加兩條數據:
use source; set @customer_number := floor(1 + rand() * 6); ? ? set @product_code := floor(1 + rand() * 2); ? ? set @order_date := from_unixtime(unix_timestamp('2020-09-26') + rand() * (unix_timestamp('2020-09-27') - unix_timestamp('2020-09-26'))); ? set @amount := floor(1000 + rand() * 9000); ?insert into sales_order ?? values (101,@customer_number,@product_code,@order_date,@order_date,@amount); ?set @customer_number := floor(1 + rand() * 6); ? set @product_code := floor(1 + rand() * 2); ? set @order_date := from_unixtime(unix_timestamp('2020-09-27') + rand() * (unix_timestamp('2020-09-28') - unix_timestamp('2020-09-27'))); ? set @amount := floor(1000 + rand() * 9000); ?insert into sales_order ?? values (102,@customer_number,@product_code,@order_date,@order_date,@amount); ?commit;? ? ? ? 上面的語句向sales_order插入了兩條記錄,一條是9月26日的,另一條是9月27日的:
+--------------+-----------------+--------------+---------------------+---------------------+--------------+ | order_number | customer_number | product_code | order_date ? ? ? ? ?| entry_date ? ? ? ? ?| order_amount | +--------------+-----------------+--------------+---------------------+---------------------+--------------+ ... | ? ? ? ? ?101 | ? ? ? ? ? ? ? 4 | ? ? ? ? ? ?1 | 2020-09-26 21:51:18 | 2020-09-26 21:51:18 | ? ? ?3402.00 | | ? ? ? ? ?102 | ? ? ? ? ? ? ? 4 | ? ? ? ? ? ?1 | 2020-09-27 06:15:43 | 2020-09-27 06:15:43 | ? ? ?6963.00 | +--------------+-----------------+--------------+---------------------+---------------------+--------------+ 102 rows in set (0.01 sec)? ? ? ? 下面執行圖5-23所示的Kettle作業。customer、product重新全量覆蓋裝載數據,sales_order表只裝載最新的兩條數據。作業成功執行后,HDFS目錄/user/hive/warehouse/rds.db/sales_order/下有兩個文件:
[root@manager~]#hdfs dfs -ls /user/hive/warehouse/rds.db/sales_order/ Found 2 items -rw-r--r-- ? 3 root hive ? ? ? 5892 2020-09-28 13:38 /user/hive/warehouse/rds.db/sales_order/sales_order_2020-09-24.txt -rw-r--r-- ? 3 root hive ? ? ? ?120 2020-09-28 15:32 /user/hive/warehouse/rds.db/sales_order/sales_order_2020-09-27.txt [root@manager~]#rds.sales_order表數據如下:
hive> select * from rds.sales_order; OK 1?? ?6?? ?2?? ?2020-03-01 20:13:34?? ?2020-03-01 20:13:34?? ?3777.00 2?? ?4?? ?2?? ?2020-03-03 19:07:07?? ?2020-03-03 19:07:07?? ?9227.00 ... 101?? ?4?? ?1?? ?2020-09-26 21:51:18?? ?2020-09-26 21:51:18?? ?3402.00 102?? ?4?? ?1?? ?2020-09-27 06:15:43?? ?2020-09-27 06:15:43?? ?6963.00 Time taken: 3.168 seconds, Fetched: 102 row(s) hive>時間戳表rds.cdc_time數據也已經更新為當前日期:
hive> select * from rds.cdc_time; OK 1?? ?2020-09-28?? ?2020-09-28 Time taken: 1.369 seconds, Fetched: 1 row(s) hive>作業的執行結果符合預期。
3. Sqoop優化
? ? ? ? 當使用Sqoop在關系數據庫和HDFS之間傳輸數據時,有多個因素影響其性能。可以通過調整Sqoop命令行參數或數據庫參數優化Sqoop的性能。本節簡要描述這兩種優化方法。
(1)調整Sqoop命令行參數
? ? ? ? 可以調整下面的Sqoop參數優化性能。
- batch:該參數的語法是--batch,指示使用批處理模式執行底層的SQL語句。在導出數據時,該參數能夠將相關的SQL語句組合在一起批量執行。也可以使用有效的API在JDBC接口中配置批處理參數。
- boundary-query:指定導入數據的范圍值。當僅使用split-by參數指定的分隔列不是最優時,可以使用boundary-query參數指定任意返回兩個數字列的查詢。它的語法如下:--boundary-query select min(id), max(id) from <tablename>。在配置boundary-query參數時,查詢語句中必須連同表名一起指定min(id)和max(id)。如果沒有配置該參數,缺省時Sqoop使用select min(<split-by>), max(<split-by>) from <tablename>查詢找出分隔列的邊界值。
- direct:該參數的語法是--direct,指示在導入數據時使用關系數據庫自帶的工具(如果存在的話),如MySQL的mysqlimport。這樣可以比jdbc連接的方式更為高效地將數據導入到關系數據庫中。
- Dsqoop.export.records.per.statement:在導出數據時,可以將Dsqoop.export.records.per.statement參數與批處理參數結合在一起使用。該參數指示在一條insert語句插入的行數。當指定了這個參數時,Sqoop運行下面的插入語句:INSERT INTO table VALUES (...), (...), (...),...;某些情況下這可以提升近一倍的性能。
- fetch-size:導入數據時,指示每次從數據庫讀取的記錄數。使用下面的語法:--fetch-size=<n>,其中<n>表示Sqoop每次必須取回的記錄數,缺省值為1000。可以基于讀取的數據量、可用的內存和帶寬大小適當增加fetch-size的值。某些情況下這可以提25%的性能。
- num-mappers:該參數的語法為--num-mappers <number of map tasks>,用于指定并行數據導入的map任務數,缺省值為4。應該將該值設置成低于數據庫所支持的最大連接數。
- split-by:該參數的語法為--split-by <column name>,指定用于Sqoop分隔工作單元的列名,不能與--autoreset-to-one-mapper選項一起使用。如果不指定列名,Sqoop基于主鍵列分隔工作單元。
(2)調整數據庫
? ? ? ? 為了優化關系數據庫的性能,可執行下面的任務:
- 為精確調整查詢,分析數據庫統計信息。
- 將不同的表空間存儲到不同的物理硬盤。
- 預判數據庫的增長。
- 使用explain plan類似的語句調整查詢語句。
- 導入導出數據時禁用外鍵約束。
- 導入數據前刪除索引,導入完成后再重建。
- 優化JDBC URL連接參數。
- 確定使用最好的連接接口。
四、小結
? ? ? ? 本篇中用我們介紹了如何使用Kettle完成數據抽取任務。包括兩種最常用的從文件抽取數據的場景,即把文本文件或XML文件作為輸入。我們還說明了兩種參數化數據庫查詢的方法,即使用參數和變量。變化數據捕獲(CDC)是一項就有挑戰性的工作,時間戳、觸發器、快照表、日志是常用的四種常用變化數據捕獲方法。Sqoop是一個在Hadoop與結構化數據存儲,如關系數據庫之間高效傳輸大批量數據的工具,支持全量和增量數據抽取。Kettle中包含了Sqoop import和Sqoop export作業項,用于從Kettle執行Sqoop命令。
?
總結
以上是生活随笔為你收集整理的Kettle构建Hadoop ETL实践(五):数据抽取的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用手机摄像头做网络ip摄像头用open
- 下一篇: 数据库错误码大全