Greenplum 实时数据仓库实践(5)——实时数据同步
目錄
5.1 數據抽取方式
5.1.1 基于源數據的CDC
5.1.2 基于觸發器的CDC
5.1.3 基于快照的CDC
5.1.4 基于日志的CDC
5.2 MySQL數據復制
5.2.1 復制的用途
5.2.2 二進制日志
5.2.3 復制步驟
5.3 使用Kafka
5.3.1 Kafka基本概念
1. 消息和批次
2. 主題與分區
3. 生產者和消費者
4. broker和集群
5.3.2 Kafka消費者與分區
5.4 選擇主題分區數?
5.4.1 使用單分區
5.4.2 如何選定分區數量
5.5 maxwell + Kafka + bireme
5.5.1 總體架構
5.5.2 maxwell安裝配置
5.5.3 bireme安裝配置
5.5.4 實時CDC
1. 全量同步
2. 增量同步
5.6 Canal + Kafka + ClientAdapter
5.6.1 總體架構
5.6.2 Canal Server安裝配置
5.6.3 Canal Adapter安裝配置
5.6.4 HA模式配置
1. 配置Canal Server
2. 配置Canal Adapter
3. 自動切換
5.6.5 實時CDC
5.6.6 消費延遲監控
小結
? ? ? ? 構建實時數據倉庫最大的挑戰在于從操作型數據源實時抽取數據,即ETL過程中的Extract部分。我們要以全量加增量的方式,實時捕獲源系統中所需的所有數據及其變化,而這一切都要在不影響對業務數據庫正常操作的前提下進行,目標是要滿足高負載、低延遲,難點正在于此,所以需要完全不同于批處理的技術加以實現。當操作型數據進入數據倉庫過渡區或ODS以后,就可以利用數據倉庫系統軟件提供的功能特性進行后續處理,不論是Greenplum、Hive或是其他軟件,這些處理往往只需要使用其中一種,相對來說簡單一些。
? ? ? ? Greenplum作為數據倉庫的計算引擎,其數據來源多是業務數據,其中以MySQL為主。本篇將介紹兩種主要的從MySQL實時同步數據到Greenplum的解決方案,一是maxwell + Kafka + bireme、二是Canal + Kafka + ClientAdapter,這兩個方案的共同點是都使用開源組件,不需要編寫代碼,只要進行適當配置便可運行。總體來說,兩種方案都是基于MySQL binlog捕獲數據變化,然后將binlog以數據流的形式傳入Kafka消息隊列,再以消費的方式將數據變化應用到Greenplum。但是,兩者在實現上區別很大,尤其是消費端的不同實現方式使數據載入Greenplum的性能差別巨大。由于主要的MySQL變化數據捕獲技術都是基于其復制協議,并以消息系統作為中間組件,所以先會介紹作為基礎的MySQL數據復制和Kafka。
5.1 數據抽取方式
? ? ? ? 抽取數據是ETL處理過程的第一個步驟,也是數據倉庫中最重要和最具有挑戰性的部分,適當的數據抽取是成功建立數據倉庫的關鍵。
? ? ? ? 從源抽取數據導入數據倉庫或過渡區有兩種方式,可以從源把數據抓取出來(拉),也可以請求源把數據發送(推)到數據倉庫。影響選擇數據抽取方式的一個重要因素是操作型系統的可用性和數據量,這是抽取整個數據集還是僅僅抽取自最后一次抽取以來的變化數據的基礎。我們考慮以下兩個問題:
- 需要抽取哪部分源數據加載到數據倉庫?有兩種可選方式,完全抽取和變化數據捕獲。
- 數據抽取的方向是什么?有兩種方式,拉模式,即數據倉庫主動去源系統拉取數據;推模式,由源系統將自己的數據推送給數據倉庫。
? ? ? ? 對于第二個問題來說,通常要改變或增加操作型業務系統的功能是非常困難的,這種困難不僅體現在技術上,還有來自于業務系統用戶及其開發者的阻力。理論上講,數據倉庫不應該要求對源系統做任何改造,實際上也很少由源系統推數據給數據倉庫。因此對這個問題的答案比較明確,大都采用拉數據模式。下面我們著重討論第一個問題。
? ? ? ? 如果數據量很小并且易處理,一般來說采取完全源數據抽取,就是將所有的文件記錄或所有的數據庫表數據抽取至數據倉庫。這種方式適合基礎編碼類型的源數據,比如郵政編碼、學歷、民族等。基礎編碼型源數據通常是維度表的數據來源。如果源數據量很大,抽取全部數據是不可行的,那么只能抽取變化的源數據,即最后一次抽取以來發生了變化的數據。這種數據抽取模式稱為變化數據捕獲,簡稱CDC(Change Data Capture),常被用于抽取操作型系統的事務數據,比如銷售訂單、用戶注冊,或各種類型的應用日志記錄等。
? ? ? ? CDC大體可以分為兩種,一種是侵入式的,另一種是非侵入式的。所謂侵入式的是指CDC操作會給源系統帶來性能的影響。只要CDC操作以任何一種方式對源庫執行了SQL語句,就可以認為是侵入式的CDC。常用的四種CDC方法是:基于時間戳的CDC、基于觸發器的CDC、基于快照的CDC、基于日志的CDC,其中前三種是侵入性的。表5-1總結了四種CDC方案的特點。
| 時間戳 | 觸發器 | 快照 | 日志 | |
| 能區分插入/更新 | 否 | 是 | 是 | 是 |
| 周期內,檢測到多次更新 | 否 | 是 | 否 | 是 |
| 能檢測到刪除 | 否 | 是 | 是 | 是 |
| 不具有侵入性 | 否 | 否 | 否 | 是 |
| 支持實時 | 否 | 是 | 否 | 是 |
| 不依賴數據庫 | 是 | 否 | 是 | 否 |
表5-1 四種CDC方案比較
5.1.1 基于源數據的CDC
? ? ? ? 基于源數據的CDC要求源數據里有相關的屬性列,抽取過程可以利用這些屬性列來判斷哪些數據是增量數據。最常見的屬性列有以下兩種。
- 時間戳:這種方法至少需要一個更新時間戳,但最好有兩個,一個插入時間戳,表示記錄何時創建,一個更新時間戳,表示記錄最后一次更新的時間。
- 序列:大多數數據庫系統都提供自增功能。如果數據庫表列被定義成自增的,就可以很容易地根據該列識別出新插入的數據。
? ? ? ? 這種方法的實現較為簡單,假設表t1中有一個時間戳字段last_inserted,t2表中有一個自增序列字段id,則下面SQL語句的查詢結果就是新增的數據,其中{last_load_time}和{last_load_id}分別表示ETL系統中記錄的最后一次數據裝載時間和最大自增序列號。
select * from t1 where last_inserted > {last_load_time}; select * from t2 where id > {last_load_id};? ? ? ? 通常需要建立一個額外的數據庫表存儲上一次更新時間或上一次抽取的最后一個序列號。在實踐中,一般是在一個獨立的模式下或在數據過渡區里創建這個參數表。基于時間戳和自增序列的方法是CDC最簡單的實現方式,也是最常用的方法,但它的缺點也很明顯:
- 不能區分插入和更新操作。只有當源系統包含了插入時間戳和更新時間戳兩個字段,才能區別插入和更新,否則不能區分。
- 不能記錄刪除數據的操作。不能捕獲到刪除操作,除非是邏輯刪除,即記錄沒有被真的刪除,只是做了邏輯上的刪除標志。
- 無法識別多次更新。如果在一次同步周期內,數據被更新了多次,只能同步最后一次更新操作,中間的更行操作都丟失了。
- 不具有實時能力。時間戳和基于序列的數據抽取一般適用于批量操作,不適合于實時場景下的數據抽取。
? ? ? ? 這種方法是具有侵入性的,如果操作型系統中沒有時間戳或時間戳信息是不可用的,那么不得不通過修改源系統把時間戳包含進去,首先要求修改操作型系統的表包含一個新的時間戳列,然后建立一個觸發器,在修改一行時更新時間戳列的值。在實施這些操作前必須被源系統的擁有者所接受,并且要仔細評估對源系統產生的影響。
? ? ? ? 有些方案通過高頻率掃描遞增列的方式實現準實時數據抽取。例如Flume的flume-ng-sql-source插件,缺省每5秒查詢一次源表的主鍵以捕獲新增數據,“利用Flume將MySQL表數據準實時抽取到HDFS”展示了一個具體示例。
5.1.2 基于觸發器的CDC
? ? ? ? 當執行INSERT、UPDATE、DELETE這些SQL語句時,可以激活數據庫里的觸發器,并執行一些動作,就是說觸發器可以用來捕獲變更的數據并把數據保存到中間臨時表里。然后這些變更的數據再從臨時表中取出,被抽取到數據倉庫的過渡區里。但在大多數場合下,不允許向操作型數據庫里添加觸發器(業務數據庫的變動通常都異常慎重),而且這種方法會降低系統的性能,所以此方法用的并不是很多。
? ? ? ? 作為直接在源數據庫上建立觸發器的替代方案,可以使用源數據庫的復制功能,把源數據庫上的數據復制到從庫上,在從庫上建立觸發器以提供CDC功能。盡管這種方法看上去過程冗余,且需要額外的存儲空間,但實際上這種方法非常有效,而且沒有侵入性。復制是大部分數據庫系統的標準功能,如MySQL、Oracle和SQL Server等都有各自的數據復制方案。
5.1.3 基于快照的CDC
? ? ? ? 如果沒有時間戳,也不允許使用觸發器,就要使用快照表了。可以通過比較源表和快照表來獲得數據變化。快照就是一次性抽取源系統中的全部數據,把這些數據裝載到數據倉庫的過渡區中。下次需要同步時,再從源系統中抽取全部數據,并把全部數據也放到數據倉庫的過渡區中,作為這個表的第二個版本,然后再比較這兩個版本的數據,從而找到變化。
? ? ? ? 有多個方法可以獲得這兩個版本數據的差異。假設表有兩個列id和name,id是主鍵列。該表的第一、二個版本的快照表名為snapshot_1、snapshot_2。下面的SQL語句在主鍵id列上做全外鏈接,并根據主鍵比較的結果增加一個標志字段,I表示新增,U表示更新,D代表刪除,N代表沒有變化。外層查詢過濾掉沒有變化的記錄。
select * from? (select case when t2.id is null then 'D'when t1.id is null then 'I'when t1.name <> t2.name then 'U'else 'N'end as flag,case when t2.id is null then t1.id else t2.id end as id,t2.namefrom snapshot_1 t1 full outer join snapshot_2 t2 on t1.id = t2.id) awhere flag <> 'N';? ? ? ? 當然,這樣的SQL語句需要數據庫支持全外鏈接,對于MySQL這樣不支持全外鏈接的數據庫,可以使用類似下面的SQL語句:
select 'U' as flag, t2.id as id, t2.name as namefrom snapshot_1 t1 inner join snapshot_2 t2 on t1.id = t2.idwhere t1.name != t2.nameunion all? select 'D' as flag, t1.id as id, t1.name as namefrom snapshot_1 t1 left join snapshot_2 t2 on t1.id = t2.idwhere t2.id is nullunion all? select 'I' as flag, t2.id as id, t2.name as namefrom snapshot_2 t2 left join snapshot_1 t1 on t2.id = t1.idwhere t1.id is null;? ? ? ? 基于快照的CDC可以檢測到插入、更新和刪除的數據,這是相對于基于時間戳的CDC方案的優點。它的缺點是需要大量的存儲空間來保存快照。另外,當表很大時,這種查詢會有比較嚴重的性能問題。
5.1.4 基于日志的CDC
? ? ? ? 最復雜的和最沒有侵入性的CDC方法是基于日志的方式。數據庫會把每個插入、更新、刪除操作記錄到日志里。如使用MySQL數據庫,只要在數據庫服務器中啟用二進制日志binlog(設置log_bin服務器系統變量),之后就可以實時從數據庫日志中讀取到所有數據庫寫操作,并使用這些操作來更新數據倉庫中的數據。這種方式需要把二進制日志轉為可以理解的格式,然后再把里面的操作按照順序讀取出來。
? ? ? ? MySQL提供了一個叫做mysqlbinlog的日志讀取工具。這個工具可以把二進制的日志格式轉換為可讀的格式,然后就可以把這種格式的輸出保存到文本文件里,或者直接把這種格式的日志應用到MySQL客戶端用于數據還原操作。mysqlbinlog工具有很多命令行參數,其中最重要的一組參數可以設置開始/截止時間戳,這樣能夠只從日志里截取一段時間的日志。另外,日志里的每個日志項都有一個序列號,也可以用來做偏移操作。MySQL的日志提供了上述兩種方式來防止CDC過程發生重復或丟失數據的情況。下面是使用mysqlbinlog的兩個例子。第一條命令將jbms_binlog.000002文件中從120偏移量以后的操作應用到一個MySQL數據庫中。第二條命令將jbms_binlog.000002文件中一段時間的操作格式化輸出到一個文本文件中。
mysqlbinlog --start-position=120 jbms_binlog.000002 | mysql -u root -p123456 mysqlbinlog --start-date="2011-02-27 13:10:12" --stop-date="2011-02-27 13:47:21" jbms_binlog.000002 > temp/002.txt? ? ? ? 使用基于數據庫的日志工具也有缺陷,即只能用來處理一種特定的數據庫,如果要在異構的數據庫環境下使用基于日志的CDC方法,就要使用GoldenGate之類的軟件。本篇介紹的兩種實時數據同步方案都是使用開源組件完成類似功能。
5.2 MySQL數據復制
? ? ? ? Maxwell、Canal都可以實時讀取MySQL二進制日志,本質上都是將自身偽裝成一個從庫,利用MySQL原生的主從復制協議獲取并處理二進制日志。了解MySQL復制的基本原理有助于理解和使用這些組件。
? ? ? ? 簡單說,復制就是將來自一個MySQL數據庫服務器(主庫)的數據復制到一個或多個MySQL數據庫服務器(從庫)。傳統的MySQL復制提供了一種簡單的Primary-Secondary復制方法,默認情況下,復制是單向異步的。MySQL支持兩種復制方式:基于行的復制和基于語句的復制。這兩種方式都是通過在主庫上記錄二進制日志(binlog)、在從庫重放中繼日志(relylog)的方式來實現異步的數據復制。二進制日志或中繼日志中的記錄被稱為事件。所謂異步包含兩層含義,一是主庫的二進制日志寫入與將其發送到從庫是異步進行的,二是從庫獲取與重放日志事件是異步進行的。這意味著,在同一時間點從庫上的數據更新可能落后于主庫,并且無法保證主從之間的延遲間隔。
? ? ? ? 復制給主庫增加的開銷主要體現在啟用二進制日志帶來的I/O,但是增加并不大,MySQL官方文檔中稱開啟二進制日志會產生1%的性能損耗。出于對歷史事務備份以及從介質失敗中恢復的目的,這點開銷是非常必要的。除此之外,每個從庫也會對主庫產生一些負載,例如網絡和I/O。當從庫讀取主庫的二進制日志時,也會造成一定的I/O開銷。如果從一個主庫復制到多個從庫,喚醒多個復制線程發送二進制日志內容的開銷將會累加。但所有這些復制帶來的額外開銷相對于應用對MySQL服務器造成的高負載來說都微不足道。
5.2.1 復制的用途
? ? ? ? 復制的用途主要體現在以下五個方面:
1. 橫向擴展
? ? ? ? 通過復制可以將讀操作指向從庫來獲得更好的讀擴展。所有寫入和更新都在主庫上進行,但讀取可能發生在一個或多個從庫上。在這種讀寫分離模型中,主庫專用于更新,顯然比同時進行讀寫操作會有更好的寫性能。需要注意的是,對于寫操作并不適合通過復制來擴展。在一主多從架構中,寫操作會被執行多次,正如“木桶效應”,這時整個系統的寫性能取決于寫入最慢的那部分。
2. 負載均衡
? ? ? ? 通過MySQL復制可以將讀操作分布到多個服務器上,實現對讀密集型應用的優化。對于小規模的應用,可以簡單地對機器名做硬編碼或者使用DNS輪詢(將一個機器名指向多個IP地址)。當然也可以使用復雜的方法,例如使用LVS網絡負載均衡器等,能夠很好地將負載分配到不同的MySQL服務器上。
3. 提高數據安全性
? ? ? ? 提高數據安全性可以從兩方面來理解。其一,因為數據被復制到從庫,并且從庫可以暫停復制過程,所以可以在從庫上執行備份操作而不會影響對應的主庫。其二,當主庫出現問題時,還有從庫的數據可以被訪問。但是,對備份來說,復制僅是一項有意義的技術補充,它既不是備份也不能夠取代備份。例如,當用戶誤刪除一個表,而且此操作已經在從庫上被復制執行,這種情況下只能用備份來恢復。
4. 提高高可用性
? ? ? ? 復制可以幫助應用程序避免MySQL單點失敗,一個包含復制的設計良好的故障切換系統能夠顯著縮短宕機時間。
5. 滾動升級
? ? ? ? 比較普遍的做法是,使用一個高版本MySQL作為從庫,保證在升級全部實例前,查詢能夠在從庫上按照預期執行。測試沒有問題后,將高版本的MySQL切換為主庫,并將應用連接至該主庫,然后重新搭建高版本的從庫。
? ? ? ? 后面介紹Maxwell和Canal方案時會看到,其架構正是利用了橫向擴展中的級聯主從拓撲結構,以及從庫可以安全暫停復制的特點才得以實現。
5.2.2 二進制日志
? ? ? ? MySQL復制依賴二進制日志(binlog),所以要理解復制如何工作,先要了解MySQL的二進制日志。
? ? ? ? 二進制日志包含描述數據庫更改的事件,如建表操作或對表數據的更改等。開啟二進制日志有兩個重要目的:
- 用于復制。主庫上的二進制日志提供要發送到從庫的數據更改記錄。主庫將其二進制日志中包含的事件發送到從庫,從庫執行這些事件以對其本地數據進行相同的更改。
- 用于恢復。當出現介質錯誤,如磁盤故障時,數據恢復操作需要使用二進制日志。還原備份后,重新執行備份后記錄的二進制日志中的事件,最大限度減少數據丟失。
? ? ? ? 不難看出,MySQL二進制日志所起的作用與Oracle的歸檔日志類似。二進制日志只記錄更新數據的事件,不記錄SELECT或SHOW等語句。通過設置log-bin系統變量開啟二進制日志,不同版本MySQL的缺省配置可能不同,如MySQL 5.6的缺省為不開啟,MySQL 8中缺省是開啟的。
? ? ? ? 二進制日志有STATEMENT、ROW、MIXED三種格式,通過binlog-format系統變量設置:
- STATMENT格式,基于SQL語句的復制(statement-based replication,SBR)。每一條會修改數據的SQL語句會被記錄到binlog中。這種格式的優點是不需要記錄每行的數據變化,這樣二進制日志會比較少,減少磁盤I/O,提高性能。缺點是在某些情況下會導致主庫與從庫中的數據不一致,例如last_insert_id()、now()等非確定性函數,以及用戶自定義函數(user-defined functions,udf)等易出現問題。
- ROW格式,基于行的復制(row-based replication,RBR)。該格式不記錄SQL語句的上下文信息,僅記錄哪條數據被修改了,修改成了什么樣子,能清楚記錄每一行數據的修改細節。其優點是不會出現某些特定情況下的存儲過程、函數或觸發器的調用和觸發無法被正確復制的問題。缺點是通常會產生大量的日志,尤其像大表上執行alter table操作時會讓日志暴漲。
- MIXED格式,混合復制(mixed-based replication,MBR)。它是語句和行兩種格式的混合體,默認使用STATEMENT模式保存二進制日志,對于STATEMENT模式無法正確復制的操作,會自動切換到基于行的格式,MySQL會根據執行的SQL語句選擇日志保存方式。
? ? ? ? 不同版本MySQL的binlog-format參數的缺省值可能不同,如MySQL 5.6的缺省值為STATEMENT,MySQL 8缺省使用ROW格式。二進制日志的存放位置最好設置到與MySQL數據目錄不同的磁盤分區,以降低磁盤I/O的競爭,提升性能,并且在數據磁盤故障的時候還可以利用備份和二進制日志恢復數據。
5.2.3 復制步驟
? ? ? ? 總的來說,MySQL復制有五個步驟:
圖5-1更詳細地描述了復制的細節。
圖5-1 復制如何工作
? ? ? ? 第一步是在主庫上記錄二進制日志。每次準備提交事務完成數據更新前,主庫將數據更新的事件記錄到二進制日志中。MySQL會按事務提交的順序而非每條語句的執行順序來記錄二進制日志。在記錄二進制日志后,主庫會告訴存儲引擎可以提交事務了。
? ? ? ? 下一步,從庫將主庫的二進制日志復制到其本地的中繼日志中。首先,從庫會啟動一個工作線程,稱為I/O線程。I/O線程跟主庫建立一個普通的客戶端連接,然后在主庫上啟動一個特殊的二進制日志轉儲(binlog dump)線程,它會讀取主庫上二進制日志中的事件,但不會對事件進行輪詢。如果該線程追趕上了主庫,它將進入睡眠狀態,直到主庫發送信號通知其有新的事件時才會被喚醒,從庫I/O線程會將接收到的事件記錄到中繼日志中。
? ? ? ? 從庫的SQL線程執行最后一步,該線程從中繼日志中讀取事件并在從庫上執行,從而實現從庫數據的更新。當SQL線程追趕I/O線程時,中繼日志通常已經在系統緩存中,所以讀取中繼日志的開銷很低。SQL線程執行的事件也可以通過log_slave_updates系統變量來決定是否寫入其自己的二進制日志中,這可以用于級聯復制的場景。
? ? ? ? 這種復制架構實現了獲取事件和重放事件的解耦,允許這兩個過程異步進行。也就是說I/O線程能夠獨立于SQL線程之外工作。但這種架構也限制了復制的過程,其中最重要的一點是在主庫上并發更新的查詢在從庫上通常只能串行化執行,因為缺省只有一個SQL線程來重放中繼日志中的事件。在MySQL 5.6以后已經可以通過配置slave_parallel_workers等系統變量進行并行復制,相關細節參見“組提交與多線程復制”。
5.3 使用Kafka
? ? ? ? 從MySQL復制中從庫的角度看,實際上是實現了一個消息隊列的功能。消息就是二進制日志中的事件,持久化存儲在中繼日志文件里。I/O線程是消息的生產者,向中繼日志寫數據,SQL線程是消息的消費者,從中繼日志讀取數據并在目標庫上重放。隊列是一種先進先出的數據結構,這個簡單定義就決定了隊列中的數據一定是有序的。在數據復制場景中這種有序性極為重要,如果不能保證事件重放與產生同序,主從庫的數據將會不一致,也就失去了數據復制的意義。
? ? ? ? 中繼日志、I/O線程、SQL線程是MySQL內部的實現。在本專題討論的異構環境中,源是MySQL,目標是Greenplum。作為一種不嚴格的類比,Maxwell、Canal實現的是類似I/O線程的功能,bireme、Canal的ClientAdapter組件實現的是類似SQL線程的功能。那中繼日志呢,是Kafka登場的時候了,當然Kafka比中繼日志或消息隊列要復雜得多,它是一個完整的消息系統。嚴格來說,在本實時數據同步場景中,Kafka并不是必須的。比如Canal的TCP服務器模式,就是直接將網絡數據包直接發送給消費者進行消費,消息數據只存在于內存中,并不做持久化。這種實現方式用于生產環境很不合適,既有丟失數據的風險,也缺乏必要的管理和監控,引入Kafka正好可以物盡其用。
5.3.1 Kafka基本概念
? ? ? ? Kafka是一款基于發布與訂閱的分布式消息系統,其數據按照一定順序持久化保存,并可以按需讀取。此外,Kafka的數據分布在整個集群中,具備數據故障保護和性能伸縮能力。
1. 消息和批次
? ? ? ? Kafka的數據單元被稱為消息,它可以被看作是數據庫里的一條記錄。消息由字節數組組成,所以對于Kafka來說,消息里的數據沒有特別的格式或含義。消息可以有一個可選的元數據,也就是鍵。與消息一樣,鍵也是一個字節數組,對于Kafka來說也沒有特殊的含義。當消息以一種可控的方式寫入不同分區時會用到鍵。最簡單的例子就是為鍵生成一個一致性哈希值,然后使用哈希值對主題分區進行取模,為消息選取分區。這樣可以保證具有相同鍵的消息總是被寫到相同的分區上。對數據庫來說,通常將表的主鍵作為消息的鍵,這是Kafka保證消費順序的關鍵所在,后面將詳細說明。
? ? ? ? 為了提高效率,消息被分批次寫入Kafka。批次就是一組消息,這些消息屬于同一個主題和分區。把消息分批次傳輸可以減少網絡開銷。不過,這要在延遲時間和吞吐量之間做出權衡:批次越大,單位時間處理的消息就越多,單個消息的傳輸時間就越長。批次數據會被壓縮,這樣可以提升數據的傳輸和存儲能力,但要做更多的計算處理。
2. 主題與分區
? ? ? ? Kafka的消息通過主題(topic)進行分類。主題就好比數據庫的表,或者文件系統的目錄。主題可以被分為若干個分區(partition),一個分區就是一個提交日志。消息以追加的方式寫入分區,然后以先進先出的順序讀取。注意,由于一個主題一般包含幾個分區,因此無法在整個主題范圍內保證消息的順序,但可以保證消息在單個分區內的順序。如果需要所有消息都是有序的,那么最好只用一個分區。圖5-2所示的主題有4個分區,消息被追加寫入每個分區的尾部。Kafka通過分區來實現數據冗余和伸縮性。分區可以分布在不同的服務器上,也就是說,一個主題可以橫跨多個服務器,以此來提供更強大的性能。
?圖5-2 包含多個分區的主題
3. 生產者和消費者
? ? ? ? Kafka的客戶端就是Kafka系統的用戶,它們被分為兩種基本類型:生產者和消費者。除此之外,還有其他兩個客戶端API——用于數據集成的Kafka Connect API和用于流式處理的Kafka Streams。這些客戶端API使用生產者和消費者作為內部組件,提供了高級的功能。
? ? ? ? 生產者(producer)創建消息。一般情況下,一個消息會被發布到一個特定的主題上。生產者在默認情況下把消息均勻分布到主題的所有分區上,而并不關心特定消息會被寫到哪個分區。在某些情況下,生產者會把消息直接寫到指定的分區。這通常是通過消息鍵和分區器來實現的,分區器為鍵生成一個哈希值,并將其映射到指定分區。這樣可以保證包含同一個鍵的消息會被寫到同一個分區上。生產者也可以使用自定義的分區器,根據不同業務規則將消息映射到分區。
? ? ? ? 消費者(consumer)讀取數據。消費者訂閱一個或多個主題,并按消息生成的順序讀取它們。消費者通過檢查消息的偏移量來區分已經讀取的消息。偏移量(offset)是另一種元數據,它是一個不斷遞增的整數值,在消息創建時,Kafka會把它添加到消息里。在給定分區里,每個消息的偏移量都是唯一的。消費者把每個分區最后讀取的消息偏移量保存在Zookeeper或Kafka中,如果消費者關閉或重啟,它的讀取狀態不會丟失。
? ? ? ? 消費者是消費者組(consumer group)的一部分,也就是說,會有一個或多個消費者共同讀取一個主題。組保證每個分區只能被同組中的一個消費者使用。圖5-3所示的組中,有3個消費者同時讀取一個主題。其中兩個消費者各自讀取一個分區,另一個消費者讀取其他兩個分區。消費者和分區之間的映射通常被稱為消費者對分區的所有權關系(ownership)。
? ? ? ? 通過這種方式,消費者可以消費包含大量消息的主題。而且如果一個消費者失效,組里的其他消費者可以接管失效消費者的工作。
圖5-3 消費者組從主題讀取消息
4. broker和集群
? ? ? ? 一個獨立的Kafka服務器被稱為broker。broker接收來自生產者的消息,為消息設置偏移量,并提交消息到磁盤保存。broker為消費者提供服務,對讀取分區的請求作出響應,返回已經提交到磁盤上的消息。根據特定的硬件機器性能特征,單個broker可以輕松處理數千個分區以及每秒百萬級的消息量。
? ? ? ? broker是集群的組成部分,每個集群都有一個broker同時充當了集群控制器(controller)的角色,它被自動從集群的活躍成員中選舉出來,負責將分區分配給broker和監控broker等管理工作。在集群中,一個分區從屬于一個broker,該broker被稱為分區的首領(leader)。一個分區可以分配給多個broker,這個時候發生分區復制,如圖5-4所示。這種復制機制為分區提供了消息冗余,如果有一個broker失效,其他broker可以接管領導權。不過,相關的消費者和生產者都要重新連接到新的首領。
?圖5-4 集群里的分區復制
? ? ? ? 消息保存期限(retention)是Kafka的一個重要特性。Kafka broker默認的消息保留策略是這樣的:要么保留一段時間(比如7天),要么保留到消息到達一定大小的字節數(比如1GB)。當消息數量達到這些上限時,舊消息就會過期并被刪除,所以在任何時刻,可用消息的總量都不會超過配置參數所指定的大小。主題可以配置自己的保留策略,能將消息保留到不再使用它們為止。例如,用于跟蹤用戶活動的數據可能需要保留幾天,而應用程序的度量指標可能只需要保留幾小時。可以通過配置把主題當做緊湊型日志(log compacted),只有最后一個帶有特定鍵的消息會被保留下來。這種情況對于變更日志類型的數據比較適用,因為人們只關心最后時刻發生的那個變更。
5.3.2 Kafka消費者與分區
? ? ? ? 通常消息的生成速度比消費速度快,顯然此時有必要對消費者進行橫向擴展。就像多個生產者可以向相同的主題寫入消息一樣,我們也可以使用多個消費者從同一主題讀取消息,對消息進行分流。
? ? ? ? Kafka消費者從屬于消費者組,一個組里的消費者訂閱的是同一個主題,每個消費者接收主題一部分分區的消息。假設主題T1有4個分區,我們創建了消費者C1,它是組G1里唯一的消費者,我們用它訂閱主題T1。消費者C1將收到主題T1全部4個分區的消息,如圖5-5所示。
圖5-5 1個消費者接收4個分區的消息
? ? ? ? 如果在組G1里新增一個消費者C2,那么每個消費者將分別從兩個分區接收消息。我們假設消費者C1接收分區0和分區2的消息,消費者C2接收分區1和分區3的消息,如圖5-6所示。
圖5-6 2個消費者接收4個分區的消息
? ? ? ? 如果組G1有4個消費者,那么每個消費者可以分配到一個分區,如圖5-7所示。
圖5-7 4個消費者接收4個分區的消息
? ? ? ? 如果我們往組里添加更多的消費者,超過主題的分區數量,那么有一部分消費者就會被閑置,不會接收任何消息,如圖5-8所示。
圖5-8 5個消費者接收4個分區的消息
? ? ? ? 往群組里增加消費者是橫向擴展消費能力的主要方式。Kafka消費者經常會做一些高延遲的操作,比如把數據寫到數據庫或HDFS,或者使用數據進行比較耗時的計算。在這些情況下,單個消費者無法跟上數據生成的速度,所以可以增加更多的消費者,讓它們分擔負載,每個消費者只處理部分分區的消息,這就是橫向擴展的主要手段。我們有必要為主題創建大量的分區,在負載增長時可以加入更多的消費者。不過要注意,不要讓消費者數量超過主題分區的數量,多余的消費者只會被閑置。
? ? ? ? 除了通過增加消費者來橫向擴展單個應用程序外,還經常出現多個應用程序從同一個主題讀取數據的情況。實際上,Kafka設計的主要目標之一,就是要讓主題里的數據能夠滿足企業各種應用場景的需求。在這些場景里,每個應用程序可以獲取到所有的消息,而不只是其中的一部分。只要保證每個應用程序有自己的消費者組,就可以讓它們獲取到主題的所有消息。橫向擴展Kafka消費者或消費者組并不會對性能造成負面影響。
? ? ? ? 在上面的例子里,如果新增一個只包含一個消費者的組G2,那么這個消費者將從主題T1上接收所有消息,與組G1之間互不影響。組G2可以增加更多的消費者,每個消費者可以消費若干個分區,就像組G1那樣,如圖5-9所示。總的來說,組G2還是會接收所有消息,不管有沒有其他組存在。
? ? ? ? 簡而言之,為每一個需要獲取一個或多個主題全部消息的應用程序創建一個消費者組,然后往組里添加消費者來擴展讀取能力和擴展能力,組里的每個消費者只處理一部分消息。
圖5-9 兩個消費者組對應一個主題
5.4 選擇主題分區數?
5.4.1 使用單分區
? ? ? ? 上一節提到,Kafka只能保證單個分區中消息的順序,因此如果要求與數據庫保持強一致性,最好只使用一個分區。那么,單分區的吞吐量能否滿足負載需求呢?下面就在現有環境上做一個測試,以得出有根據的量化的結論。
1. 測量MySQL binlog日志量
? ? ? ? 測試方法為使用tpcc-mysql工具,執行一段時間的壓測,然后查看這段時間產生的binlog文件大小,得出binlog吞吐量。TPC-C是專門針對聯機交易處理系統(OLTP系統)的規范,而tpcc-mysql則是percona公司基于TPC-C衍生出來的產品,專用于MySQL基準測試,下載地址為https://github.com/Percona-Lab/tpcc-mysql。關于tpcc-mysql的安裝和使用,參見“測試規劃”。
(1)從庫重置binlog
reset master; show master status;? ? ? ? 初始binlog文件名和偏移量分別是mysql-bin.000001和120。
(2)主庫執行tpcc測試
# 10倉庫,32并發線程,預熱10秒,執行300秒 ~/tpcc-mysql-master/tpcc_start -h172.16.1.126 -d tpcc_test -u root -p "123456" -w 10 -c 32 -r 10 -l 300? ? ? ? 得到的每分鐘事務數為:5543.600 TpmC
(3)壓測執行結束后,在從庫查詢binlog日志量
show binary logs;? ? ? ? 此時binlog文件名和偏移量分別是mysql-bin.000001和406396209。預熱10秒,執行300秒,binlog產生速度為:(406396209-120)/1024/1024/310 ≈ 1.25MB/S。
2. 測量kafka單分區生產者吞吐量
(1)創建topic
? ? ? ? 創建了一個單分區三副本的topic:
Topic: test?? ?Partition: 0?? ?Leader: 339?? ?Replicas: 339,330,340?? ?Isr: 339,330,340(2)執行測試
kafka-producer-perf-test.sh --topic test --num-records 500000 --record-size 2048 --throughput -1 --producer-props bootstrap.servers=172.16.1.124:9092 acks=1? ? ? ? kafka-producer-perf-test.sh是Kafka提供的生產者性能測試命令行工具,這里所使用的選項說明:
- num-records:指定發送的消息總數。
- record-size:指定每條消息的字節數,這里假設約為一個binlog event的大小。在MySQL中可用show binlog events命令查看每個event的大小。
- throughput指定每秒發送的消息數,-1為不限制。
- acks:指定生產者的應答方式,有效值為0、1、all。0表示生產者在成功寫入消息之前不會等待任何來自服務器的響應,吞吐量最高,但最可能丟失消息。1表示只要首領節點收到消息,生產者就會收到一個來自服務器的成功響應。all表示只有所有參與復制的節點全部收到消息時,生產者才會收到一個來自服務器的成功響應,最安全但延遲最高。
? ? ? ? 測試結果為:
500000 records sent, 10989.010989 records/sec (21.46 MB/sec), 1267.54 ms avg latency, 1714.00 ms max latency, 1388 ms 50th, 1475 ms 95th, 1496 ms 99th, 1693 ms 99.9th.? ? ? ? 可以看到單分區平均吞吐量約21.46 MB/S,平均每秒發送10989條2KB的消息。兩相比較,Kafka單分區生產者的消息吞吐量大約是壓測binlog吞吐量的17倍。實際生產環境的硬件配置會比本實驗環境高得多,單分區吞吐量通常可達100 MB/S。通過這個粗略測試得出的結論是單分區可以承載一般的生產數據庫負載。
3. 測量kafka單分區消費者吞吐量
? ? ? ? 單分區只能有一個消費者(一個消費組中),但可以利用多個線程提高消費性能。
? ? ? ? --threads指定消費線程數,1、3、6、12時的測試結果如下:
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec # 1線程 2021-12-09 10:57:19:198, 2021-12-09 10:57:28:921, 976.6543, 100.4478, 500047, 51429.2914, 3034, 6689, 146.0090, 74756.6153 # 3線程 2021-12-09 10:57:52:134, 2021-12-09 10:58:00:280, 976.6543, 119.8937, 500047, 61385.5880, 3039, 5107, 191.2384, 97914.0396 # 6線程 2021-12-09 10:58:58:345, 2021-12-09 10:59:06:495, 976.6543, 119.8349, 500047, 61355.4601, 3031, 5119, 190.7901, 97684.5087 # 12線程 2021-12-09 10:59:16:028, 2021-12-09 10:59:24:093, 976.6543, 121.0979, 500047, 62002.1079, 3031, 5034, 194.0116, 99333.92935.4.2 如何選定分區數量
? ? ? ? 嚴格說只要涉及多分區,一定會有消費順序問題。在非強一致性場景中,可以通過選擇表的主鍵作為分區鍵,以適當避免消費亂序帶來的數據一致性問題,同時利用多分區保持Kafka的擴展性。在選擇分區數量時,需要考慮如下幾個因素。
- 主題需要達到多大的吞吐量?例如是每秒寫入100KB還是1GB?
- 從單個分區讀取數據的最大吞吐量是多少?每個分區一般都會有一個消費者,如果知道消費者寫入數據庫的速度不會超過每秒50MB,那么從一個分區讀取數據的吞吐量也不需要超過每秒50MB。
- 可以通過類似的方法估算生產者向單個分區寫入數據的吞吐量,不過生產者的速度一般比消費者快得多,所以最好為生產者多估算一些吞吐量。
- 每個broker包含的分區個數、可用的磁盤空間和網絡帶寬。
- 如果消息是按不同鍵寫入分區的,那么為已有主題新增分區會很困難。
- 單個broker對分區個數是有限制的,因為分區越多,占用內存越多,完成首領選舉需要的時間也越長。
? ? ? ? 如果估算出主題的吞吐量和消費者吞吐量,可以用主題吞吐量除以消費者吞吐量算出分區個數。如果不知道這些信息,根據經驗,把分區大小限制在25GB以內可以得到比較理想的效果。
5.5 maxwell + Kafka + bireme
? ? ? ? 本節介紹的方法是采用 maxwell + Kafka + bireme,將MySQL數據實時同步至Greenplum。maxwell實時解析MySQL的binlog,并將輸出的JSON格式數據發送到Kafka,Kafka在此方案中主要用于消息中轉,bireme負責讀取Kafka的消息,并應用于Greenplum數據庫以增量同步數據。方法實施的主要流程為如下三步:
5.5.1 總體架構
? ? ? ? 本方案的總體架構如圖5-10所示。
圖5-10 maxwell + Kafka + bireme 架構
? ? ? ? 圖中的maswell從MySQL復制的從庫中級聯獲取binlog,這樣做的原因將在5.5.4小節“實時CDC”中詳細說明。maxwell是一個能實時讀取MySQL二進制日志binlog,并生成JSON 格式的消息,作為生產者發送給Kafka、Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其他平臺的應用程序,其中Kafka是maxwell支持最完善的一個消息系統。它的常見應用場景有ETL、維護緩存、收集表級別的dml指標、增量到搜索引擎、數據分區遷移、切庫binlog回滾方案等。maswell在GitHub上具有較高的活躍度,官網地址為地址為https://github.com/zendesk/maxwell。
? ? ? ? maxwell主要提供了下列功能:
- 支持 SELECT * FROM table 方式進行全量數據初始化。
- 支持GTID,當MySQL發生failover后,自動恢復binlog位置。
- 可以對數據進行分區,解決數據傾斜問題,發送到Kafka的數據支持database、table、column等級別的數據分區。
- 工作方式是偽裝為MySQL Slave,在主庫上創建dump線程連接,接收binlog事件,然后根據schemas信息拼裝成JSON字符串,可以接受ddl、xid、row等各種事件。
? ? ? ? bireme是一個Greenplum數據倉庫的增量同步工具,目前支持MySQL、PostgreSQL和MongoDB數據源,maxwell + Kafka 是一種支持的數據源類型。bireme作為Kafka的消費者,采用 DELETE + COPY 的方式,將數據源的修改記錄同步到Greenplum,相較于INSERT、UPDATE、DELETE方式,COPY方式速度更快,性能更優。bireme的主要特性是采用小批量加載方式(默認加載延遲時間為10秒鐘)提升數據同步的性能,但要求所有同步表在源和目標數據庫中都必須有主鍵。bireme官網地址為https://github.com/HashDataInc/bireme/。
? ? ? ? Kafka在本架構中作為消息中間件將maxwell和bireme橋接在一起,上下游組件的實現都依賴于它。正如本節開頭所述,搭建Kafka服務是實施本方案的第一步。為了簡便,在此實驗環境中使用CDH中的Kafka服務,基本信息如下:
- Kafka集群由三臺虛機組成,實例為:172.16.1.124:9092(controller)、172.16.1.125:9092、172.16.1.126:9092。
- 三臺虛機的基本硬件配置同為:4核CPU、8GB內存、千兆網卡。
- Kafka版本為kafka_2.11-2.2.1-cdh6.3.1,即編譯Kafka源代碼的Scala編譯器版本號為2.11,Kafka版本號為2.2.1,平臺為CDH 6.3.1。可執行下面的命令查看Kafka版本信息: ps -ef|grep '/libs/kafka.\{2,40\}.jar'
- 除default.replication.factor默認副本數配置為3,其他Kafka配置參數均采用缺省值。
? ? ? ? default.replication.factor參數指定broker級別的復制系數,CDH中的Kafka缺省值為1。這里將該設置改為3,也就是說每個分區總共會被3個不同的broker復制3次。默認情況下,Kafka會確保分區的每個副本被放在不同的broker上。
? ? ? ? 如果復制系數為N,那么在N-1個broker失效的情況下,仍然能夠從主題讀取數據或向主題寫入數據。所以,更高的復制系數會帶來更高的可用性。另一方面,復制系數N需要至少N個broker,而且N個數據副本會占用N倍的磁盤空間。通常要在可用性和存儲硬件之間做出權衡。
? ? ? ? 如果因broker重啟導致的主題不可用是可接受的(這在集群里屬正常行為),那么把復制系數設為1即可。復制系數為2意味著可以容忍1個broker失效。但是要知道,有時候1個broker發生失效會導致集群不穩定,迫使重啟另一個broker——集群控制器,也就是說,如果將復制系數設置為2,就有可能因為重啟等問題導致集群暫時不可用。基于以上原因,建議在要求高可用的場景里把復制系數設置為3,大多數情況下這已經足夠安全。
? ? ? ? 下面在Kafka中創建一個topic,在后面配置maxwell時將使用該topic:
# 設置Kafka可執行文件路徑 export PATH=$PATH:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/; # 創建一個三分區三副本的topic,主要用于演示數據在分區間的均勻分布 kafka-topics.sh --create --topic mytopic --bootstrap-server 172.16.1.124:9092 --partitions 3 --replication-factor 3 # 查看topic kafka-topics.sh --list --bootstrap-server 172.16.1.124:9092 # 查看partition kafka-topics.sh --describe --topic mytopic --bootstrap-server 172.16.1.124:9092 # 查看每個分區的大小 kafka-log-dirs.sh --describe --topic-list mytopic --bootstrap-server 172.16.1.124:9092? ? ? ? mytopic的分區如下:
Topic: mytopic?? ?Partition: 0?? ?Leader: 340?? ?Replicas: 340,339,330?? ?Isr: 340,339,330 Topic: mytopic?? ?Partition: 1?? ?Leader: 330?? ?Replicas: 330,340,339?? ?Isr: 330,340,339 Topic: mytopic?? ?Partition: 2?? ?Leader: 339?? ?Replicas: 339,330,340?? ?Isr: 339,330,340? ? ? ? ISR(In Sync Replica)是Kafka的副本同步機制,leader會維持一個與其保持同步的副本集合,該集合就是ISR,每個分區都有一個ISR,由leader動態維護。我們要保證Kafka不丟消息,就要保證ISR這組集合中至少有一個存活,并且消息成功提交。
? ? ? ? 本實驗環境部署的其他角色如下:
- MySQL主庫:172.16.1.126:3306
- MySQL從庫:172.16.1.127:3306
- Greenplum Master:114.112.77.198:5432。
? ? ? ? Greenplum集群主機的操作系統版本為CentOS Linux release 7.9.2009 (Core),其他所有主機操作系統版本為CentOS Linux release 7.2.1511 (Core)。MySQL已經開啟主從復制,相關配置如下:
log-bin=mysql-bin ? ?# 開啟 binlog binlog-format=ROW ? ?# 選擇 ROW 格式 server_id=126 ? ? ? ?# 主庫server_id,從庫為127 log_slave_updates ? ?# 開啟級聯binlog? ? ? ? 我們還事先在MySQL中創建了maxwell用于連接數據庫的用戶,并授予了相關權限。
-- 在126主庫執行 create user 'maxwell'@'%' identified by '123456'; grant all on maxwell.* to 'maxwell'@'%'; grant select, replication client, replication slave on *.* to 'maxwell'@'%';?? ? ? ? MySQL主從復制相關配置參見“配置異步復制”,Greenplum安裝部署參見本專題上一篇“Greenplum 實時數據倉庫實踐(4)——Greenplum安裝部署”。
5.5.2 maxwell安裝配置
? ? ? ? 我們在172.16.1.126上搭建maxwell服務。
1. 下載并解壓
wget https://github.com/zendesk/maxwell/releases/download/v1.34.1/maxwell-1.34.1.tar.gz tar -zxvf maxwell-1.34.1.tar.gz2. 修改配置文件
(1)備份示例配置文件
(2)編輯config.properties文件內容如下
log_level=info metrics_type=http http_port=9090producer=kafka# Kafka配置 kafka_topic=mytopic kafka.bootstrap.servers=172.16.1.124:9092,172.16.1.125:9092,172.16.1.126:9092 kafka.compression.type=snappy kafka.retries=0 kafka.acks=1 kafka.batch.size=16384 kafka_partition_hash=murmur3 producer_partition_by=primary_key# MySQL配置 host=172.16.1.127 port=3306 user=maxwell password=123456filter=exclude: *.*, include: tpcc_test.*, include: source.*, include: test.*? ? ? ? 配置項說明:
- log_level:日志級別,有效值debug、info、warn、error,默認info。
- metrics_type:監控報告類型,有效值slf4j、jmx、http、datadog。
- http_port:metrics_type為http時使用的端口號。
- producer:生產者,有效值為stdout、file、kafka、kinesis、pubsub、sqs、rabbitmq、redis,默認stdout(控制臺輸出)。
- kafka_topic:Kafka主題名,maxwell向該主題寫數據,默認值為maxwell。除了指定為靜態topic,還可以動態傳參,如namespace_%{database}_%{table},啟動maxwell時%{database}和%{table}將被具體的庫名和表名替換,并在Kafka中自動創建這些topic。namespace命名空間用于限制topic名稱,可以省略。
- kafka.bootstrap.servers:Kafka broker列表。
- kafka.compression.type:消息壓縮類型,默認不壓縮。使用壓縮可以降低網絡傳輸和存儲開銷,而這往往是向Kafka發送消息的瓶頸所在。
- kafka.retries:生產者從服務器收到錯誤時重發消息的次數,如果到達該值,生產者將放棄重試并返回錯誤。
- kafka.acks:生產者響應方式。
- kafka.batch.size:一個批次使用的內存字節數,默認16KB。生產者會把發送到同一個分區的消息放到一個批次里,然后按批次發送消息。如果該值設置得太小,因為生產者需要更頻繁地發送消息,會增加一些額外開銷。
- kafka_partition_hash:為消息選擇Kafka分區時使用的hash算法,有效值default、murmur3,默認default。
- producer_partition_by:輸入到Kafka的分區函數,有效值database、table、primary_key、transaction_id、column、random,默認database。在很多業務系統中,不同數據庫的活躍度差異很大,主體業務的數據庫操作頻繁,產生的binlog也就很多,而maxwell默認使用數據庫名作為key進行hash,那么顯而易見,binlog的操作經常都被分到同一個分區里,造成數據傾斜。這里選擇了主鍵作為分區key,同一主鍵被分到同一分區,同時選用murmurhash3哈希算法,以獲得更好的效率和分布。用主鍵作為分區key還可以使得對同一主鍵行的更新將保持與數據庫同序。
- filter:過濾規則,通過 exclude 排除,通過 include 包含,值可以為具體的數據庫、表、列,甚至用 Javascript 來定義復雜的過濾規則,可以用正則表達式描述。這里配置為接收MySQL源端tpcc_test、source、test三個庫里所有表的binlog。
- host、port、user、password:連接MySQL實例所用的IP、端口、用戶名、密碼。
? ? ? ? maxwell完整的配置參數說明參見Reference - Maxwell's Daemon。
3. 啟動maxwell
? ? ? ? maxwell用Java語言開發,啟動maxwell 1.34.1需要JDK 11運行環境,JDK 8報錯:
因此先安裝JDK 11:
yum -y install jsvc rpm -ivh jdk-11.0.12_linux-x64_bin.rpm然后啟動maxwell:
export JAVA_HOME=/usr/java/jdk-11.0.12 cd ~/maxwell-1.34.1 bin/maxwell --config config.properties --daemon? ? ? ? --config選項指定配置文件,--daemon選項指定maxwell實例作為守護進程到后臺運行。maxwell啟動時,會在它所連接的MySQL實例中創建一個maxwell數據庫,其中包含如下7個表,保存maxwell的元數據。
- bootstrap:用于數據初始化,5.5.4小節會介紹maxwell的bootstrap功能。
- columns:記錄所有的列信息。
- databases:記錄所有的數據庫信息。
- heartbeats:記錄心跳信息。
- positions:記錄binlog讀取位置,包括binlog文件及其偏移量。
- schemas:記錄DDL的binlog信息。
- tables:記錄所有的表信息。
? ? ? ? maxwell成功啟動后,將在日志文件中看到類似下面的信息:
[mysql@node2~/maxwell-1.34.1]$tail /home/mysql/maxwell-1.34.1/bin/../logs/MaxwellDaemon.out 15:22:54,297 INFO ?BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000001:406400351 15:22:54,302 INFO ?MaxwellHTTPServer - Maxwell http server starting 15:22:54,306 INFO ?MaxwellHTTPServer - Maxwell http server started on port 9090 15:22:54,326 INFO ?BinaryLogClient - Connected to 172.16.1.127:3306 at mysql-bin.000001/406400351 (sid:6379, cid:5179862) 15:22:54,326 INFO ?BinlogConnectorReplicator - Binlog connected. 15:22:54,339 INFO ?log - Logging initialized @4344ms to org.eclipse.jetty.util.log.Slf4jLog 15:22:54,595 INFO ?Server - jetty-9.4.41.v20210516; built: 2021-05-16T23:56:28.993Z; git: 98607f93c7833e7dc59489b13f3cb0a114fb9f4c; jvm 11.0.12+8-LTS-237 15:22:54,710 INFO ?ContextHandler - Started o.e.j.s.ServletContextHandler@4ceb3f9a{/,null,AVAILABLE} 15:22:54,746 INFO ?AbstractConnector - Started ServerConnector@202c0dbd{HTTP/1.1, (http/1.1)}{0.0.0.0:9090} 15:22:54,747 INFO ?Server - Started @4755ms [mysql@node2~/maxwell-1.34.1]$? ? ? ? 從http://172.16.1.126:9090/可以獲取所有監控指標,指標說明參見Reference - Maxwell's Daemon。
5.5.3 bireme安裝配置
? ? ? ? 我們在172.16.1.126上搭建bireme服務。
1. 下載并解壓
wget https://github.com/HashDataInc/bireme/releases/download/v2.0.0-alpha-1/bireme-2.0.0-alpha-1.tar.gz tar -zxvf bireme-2.0.0-alpha-1.tar.gz2. 修改配置文件
(1)備份示例配置文件
(2)編輯config.properties文件內容如下
target.url = jdbc:postgresql://114.112.77.198:5432/dw target.user = dwtest target.passwd = 123456data_source = mysqlmysql.type = maxwell mysql.kafka.server = 172.16.1.124:9092,172.16.1.125:9092,172.16.1.126:9092 mysql.kafka.topic = mytopicpipeline.thread_pool.size = 3state.server.addr = 172.16.1.126 state.server.port = 9091? ? ? ? 配置項說明:
- target.url、target.user、target.passwd:目標Greenplum的URL、用戶名、密碼,bireme使用這些信息連接Greenplum數據庫并寫入數據。
- data_source:指定數據源<source_name>,多個數據源用逗號分隔開,忽略空白字符。
- <source_name>.type:指定數據源的類型。
- <source_name>.kafka.server:數據源的Kafka地址。
- <source_name>.kafka.topic:數據源在Kafka中對應的topic。
- pipeline.thread_pool.size:pipeline線程數。每個數據源可以有多個pipeline,對于maxwell,每個Kafka topic分區對應一個pipeline。
- state.server.addr、state.server.port:監控服務器的IP、端口。bireme啟動一個輕量級的HTTP服務器方便用戶獲取當前的數據裝載狀態。
? ? ? ? bireme完整的配置參數說明參見https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md。
(3)編輯mysql.properties文件內容如下
test.t1 = public.t1? ? ? ? 在config.properties文件中指定數據源為mysql,因此需要新建文件~/bireme-2.0.0-alpha-1/etc/mysql.properties,在其中加入源表到目標表的映射。這里是將MySQL中test.t1表的數據同步到Greenplum的public.t1表中。(4)在源和目標庫創建表,注意都要有主鍵
-- MySQL,126主庫執行 use test; create table t1 (a int primary key);-- Greenplum set search_path=public; create table t1 (a int primary key);? ? ? ? 在MySQL主庫上建表的DDL語句會寫到binlog中,并在從庫上重放。同樣,因為我們建表前已經啟動了maxwell,該建表語句也會隨binlog傳遞到maxwell。maxwell可以通過啟用output_ddl支持DDL事件捕獲,該參數是boolean類型,默認為false。默認配置時不會將DLL的binlog事件發送到Kafka,只會記錄到日志文件和maxwell.schemas表中。
? ? ? ? 如果output_ddl設置為true,除了日志文件和maxwell.schemas表,DDL事件還會被寫到由ddl_kafka_topic參數指定的Kafka topic中,默認為kafka_topic。Kafka中的DDL消息需要由消費者實現消費邏輯,bireme不處理DDL。我們使用默認配置,DDL只記錄信息,不寫入消息,因此只要在目標Greenplum庫手工執行同構的DDL語句,使源和目標保持相同的表結構,MySQL中執行的DDL語句就不會影響后面的數據同步。
3. 啟動bireme
? ? ? ? bireme用Java語言開發,啟動bireme 2.0.0需要JDK 8運行環境,用JDK 11報錯:
因此先安裝JDK 8:
yum -y install java-1.8.0-openjdk.x86_64然后啟動bireme,我這里使用的是安裝CDH時自帶的JDK 8:
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera cd ~/bireme-2.0.0-alpha-1 bin/bireme startbireme成功啟動后,將在控制臺看到類似下面的信息:
Bireme JMX enabled by default Starting the bireme service... The bireme service has started.從http://172.16.1.126:9091/?pretty可以獲取bireme狀態:
{"source_name": "mysql","type": "MAXWELL","pipelines": [{"name": "mytopic-1","latest": "1970-01-01T08:00:00.000Z","delay": 0.0,"state": "NORMAL"},{"name": "mytopic-2","latest": "1970-01-01T08:00:00.000Z","delay": 0.0,"state": "NORMAL"},{"name": "mytopic-3","latest": "1970-01-01T08:00:00.000Z","delay": 0.0,"state": "NORMAL"}] }- source_name:數據源名稱。
- type:數據源類型。
- pipelines:包含了一組pipeline的同步狀態,每一個數據源可能用多個pipeline同時工作。
- name:pipeline名稱。
- latest:最新的數據產生時間。
- delay:從數據進入bireme到成功加載并返回的時間間隔。
- state是pipeline的狀態。
? ? ? ? 現在所有服務都已正常,可以進行一些簡單的測試:
-- 在MySQL主庫執行一些數據修改 use test; insert into t1 values (1); insert into t1 values (2); insert into t1 values (3); update t1 set a=10 where a=1; delete from t1 where a=2; commit;-- 查詢Greenplum dw=> select * from public.t1;a ? ----310 (2 rows)? ? ? ? MySQL中的數據變化被實時同步到Greenplum中。
4. 如何保證數據的順序消費
? ? ? ? bireme實現中的一個pipeline就是Kafka中的一個消費者。我們建的topic有三個分區,maxwell在寫的時候指定一個主鍵作為hash key,那么同一主鍵的相關數據,一定會被分發到同一個分區中去,而單個分區中的數據一定是有序的。消費者從分區中取出數據的時候,也一定是有序的,到這里順序沒有錯亂。接著,我們在消費者里可能設置多個線程來并發處理消息(如配置pipeline.thread_pool.size=3),因為如果消費者是單線程,而處理又比較耗時的話,吞吐量太低。一個消費者多線程并發處理就可能出現亂序問題,如圖5-11所示。
圖5-11 多線程消費造成亂序
?
? ? ? ? 解決該問題的方式是寫N個內存阻塞隊列,具有相同主鍵的數據都到同一個隊列,然后對于N個線程,每個線程分別消費一個隊列即可,這樣就能保證順序性,如圖5-12所示。bireme就是使用這個模型實現的。
圖5-12 用內存阻塞隊列解決多線程消費亂序問題
?
5.5.4 實時CDC
? ? ? ? 大多數情況下,數據同步被要求在不影響線上業務的情況下聯機執行,而且還要求對線上庫的影響越小越好。例如,同步過程中對主庫加鎖會影響對主庫的訪問,因此通常是不被允許的。本節演示如何在保持對線上庫正常讀寫的前提下,通過全量加增量的方式,完成MySQL到Greenplum的實時數據同步。
? ? ? ? 為展示完整過程,先做一些清理工作,然后對主庫執行tpcc-mysql壓測,模擬正在使用的線上業務數據庫,在壓測執行期間做全部九個測試用表的全量和增量數據同步。
- maxwell、bireme清理:
- bireme要求所有表都具有主鍵。tpcc-mysql測試中的history表沒有主鍵,因此在主庫為該表添加主鍵,構成主鍵的字段為表全部八個字段的聯合。
- 主庫執行壓測模擬業務庫。后面的數據同步操作均在壓測期間執行。
1. 全量同步
(1)在Greenplum中創建模式
? ? ? ? 模式(schema)是一個有趣的概念,不同數據庫系統中的模式代表完全不同的東西。如Oracle中,默認在創建用戶的時候,就建立了一個和用戶同名的模式,并且互相綁定,因此很多情況下Oracle的用戶和模式可以通用。MySQL中的schema是database的同義詞。而Greenplum中的模式是從PostgreSQL繼承來的,其概念與SQL Server的模式更為類似,是數據庫中的邏輯對象。
? ? ? ? Greenplum的模式是數據庫中對象和數據的邏輯組織。模式允許在一個數據庫中存在多個同名的對象,如果對象屬于不同的模式,同名對象之間不會沖突。使用schema有如下好處:
- 方便管理多個用戶共享一個數據庫,但是又可以互相獨立。
- 方便管理眾多對象,更有邏輯性。
- 方便兼容某些第三方應用程序,如果創建對象時是帶schema的。
? ? ? ? 比如要設計一個復雜系統,由眾多模塊構成,有時候模塊間又需要具有獨立性。各模塊存放單獨的數據庫顯然是不合適的。此時就可使用schema來劃分各模塊間的對象,再對用戶進行適當的權限控制,這樣邏輯也非常清晰。執行以下操作在Greenplum中創建schema。
# 連接master psql -d dw -U dwtest -h 127.0.0.1 # 創建schema create schema tpcc_test; # 修改用戶搜索路徑 alter database dw set search_path to public,pg_catalog,tpcc_test;(2)在tpcc_test模式中創建tpcc-mysql測試用表
? ? ? ? tpcc-mysql安裝目錄下的create_table.sql文件中包含MySQL里的建表腳本。將該SQL腳本改為Greenplum版:
- 去掉Engine=InnoDB,這是MySQL用的。
- 將tinyint改為smallint,Greenplum沒有tinyint數據類型。
- 將datetime改為timestamp,Greenplum沒有datetime數據類型。
- 為history表添加主鍵,構成主鍵的字段為該表全部八個字段的聯合。
? ? ? ? Greenplum是分布式數據庫,一般為提高查詢性能需要在建表時通過distributed by子句指定分布鍵。如果表有主鍵,同時沒有指定分布鍵,則Greenplum自動使用主鍵作為表的分布鍵,我們出于簡便使用這種方式。關于選擇分布鍵的最佳實踐,將在下一篇的建立示例數據倉庫環境中加以說明。執行以下操作在tpcc_test模式中建表。
-- 設置當前模式 set search_path to tpcc_test;-- 創建tpcc-mysql測試用的9個表 ...?create table history ( h_c_id int,? h_c_d_id smallint,? h_c_w_id smallint, h_d_id smallint, h_w_id smallint, h_date timestamp, h_amount decimal(6,2),? h_data varchar(24), primary key (h_c_id,h_c_d_id,h_c_w_id,h_d_id,h_w_id,h_date,h_amount,h_data) );...?(3)修改bireme表映射配置
? ? ? ? 編輯~/bireme-2.0.0-alpha-1/etc/mysql.properties內容如下:
? ? ? ? 等號左邊是MySQL庫表名,右邊為對應的Greenplum模式及表名。bireme雖然提供了表映射配置文件,但實際只支持Greenplum中的public模式,如果映射其他模式,bireme啟動時會報錯:
Greenplum table and MySQL table size are inconsistent? ? ? ? 通過查看GetPrimaryKeys.java的源代碼,發現它在查詢Greenplum表的元數據時,使用的是硬編碼:
String tableList = sb.toString().substring(0, sb.toString().length() - 1) + ")"; String tableSql = "select tablename from pg_tables where schemaname='public' and tablename in "+ tableList + ""; String prSql = "SELECT NULL AS TABLE_CAT, "+ "n.nspname ?AS TABLE_SCHEM, "+ "ct.relname AS TABLE_NAME, "+ "a.attname ?AS COLUMN_NAME, "+ "(i.keys).n AS KEY_SEQ, "+ "ci.relname AS PK_NAME "+ "FROM pg_catalog.pg_class ct JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) "+ "JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) "+ "JOIN ( SELECT i.indexrelid, i.indrelid, i.indisprimary, information_schema._pg_expandarray(i.indkey) AS KEYS FROM pg_catalog.pg_index i) i ON (a.attnum = (i.keys).x AND a.attrelid = i.indrelid) "+ "JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) WHERE TRUE AND n.nspname = 'public' AND ct.relname in "+ tableList + " AND i.indisprimary ORDER BY TABLE_NAME, pk_name, key_seq";? ? ? ? 可以簡單修改源碼解決此問題,但還有另一個表映射問題。在表映射配置文件中,目標端Greenplum只能使用一個模式而不能自由配置。該問題與bireme的整體實現架構有關,它使用二維數組存儲配置項,代碼倒是簡化了,可邏輯完全不對。這個問題可不像public那么好改,估計得重構才行。
(4)停止127從庫復制
-- 在127從庫執行 stop slave;? ? ? ? 這么簡單的一句SQL卻是實現全量數據同步的關鍵所在。從庫停止復制,不影響主庫的正常使用,也就不會影響業務。此時從庫的數據處于靜止狀態,不會產生變化,這使得獲取全量數據變得輕而易舉。
(5)執行全量數據同步
? ? ? ? maxwell提供了一個命令工具 maxwell-bootstrap 幫助我們完成數據初始化,它基于 SELECT * FROM table 的方式進行全量數據讀取,不會產生多余的binlog。啟動maxwell時,如果使用--bootstrapper=sync,則初始化引導和binlog接收使用同一線程,這意味著所有binlog事件都將被阻止,直到引導完成。如果使用--bootstrapper=async(默認配置),maxwell將產生一個用于引導的單獨線程。在這種異步模式下,非引導表將由主線程正常復制,而引導表的binlog事件將排隊,并在引導過程結束時發送到復制流。
? ? ? ? 如果maxwell在下次引導時崩潰,它將完全重新引導全量數據,不管之前的進度如何。如果不需要此行為,則需要手動更新bootstrap表。具體來說,是將未完成的引導程序行標記為“完成”(is_complete=1)或刪除該行。
? ? ? ? 雖然maxwell考慮到了全量數據初始化問題,但bireme卻處理不了全量數據消費,會報類似下面的錯誤:
cn.hashdata.bireme.BiremeException: Not found. Record does not have a field named "w_id"? ? ? ? 設置producer=stdout,從控制臺可以看到bootstrap和正常insert會生產不同的JSON輸出。執行maxwell-bootstrap:
export JAVA_HOME=/usr/java/jdk-11.0.12 cd ~/maxwell-1.34.1 bin/maxwell-bootstrap --config config.properties --database test --table t1 --client_id maxwell控制臺輸出:
{"database":"maxwell","table":"bootstrap","type":"insert","ts":1638778749,"xid":39429616,"commit":true,"data":{"id":1,"database_name":"tpcc_test","table_name":"t1","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":1,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell","comment":null}} {"database":"tpcc_test","table":"t1","type":"bootstrap-start","ts":1638778770,"data":{}} {"database":"tpcc_test","table":"t1","type":"bootstrap-insert","ts":1638778770,"data":{"a":1}} {"database":"tpcc_test","table":"t1","type":"bootstrap-complete","ts":1638778770,"data":{}}普通insert控制臺輸出:
{"database":"tpcc_test","table":"t1","type":"insert","ts":1638778816,"xid":39429828,"commit":true,"data":{"a":2}}? ? ? ? bireme不處理bootstrap相關類型,因此這里無法使用maxwell-bootstrap進行全量數據同步。我們執行以下操作,手工將源表的全量數據復制到目標表。
- 在127從庫將源表數據導出成文本文件
- 復制到198目標服務器
- 在198上將文本文件導入目標表
2. 增量同步
? ? ? ? maxwell是從從庫接收binlog,停止復制使得從庫的binlog不再發生變化,從而給maxwell提供了一個增量數據同步的初始binlog位點。只要此時啟動maxwell與bireme服務,然后開啟從庫的復制,增量數據就會自動執行同步。
(1)啟動maxwell
export JAVA_HOME=/usr/java/jdk-11.0.12 cd ~/maxwell-1.34.1 bin/maxwell --config config.properties --daemon(2)啟動bireme
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera cd ~/bireme-2.0.0-alpha-1 bin/bireme start(3)啟動MySQL從庫的數據復制
-- 在127從庫執行 start slave;(4)查看Kafka消費情況
export PATH=$PATH:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/; kafka-consumer-groups.sh --bootstrap-server 172.16.1.124:9092 --describe --group bireme? ? ? ? 壓測結束時,三個消費者的LAG在幾秒后都變為0,說明此時源和目標已經實時同步,而且bireme方式的消費延遲很小,幾乎是同時完成數據同步。可以對比mysql主庫、從庫和Greenplum的表數據以確認三者的數據一致性。
? ? ? ? 聰明如你也許已經想到,如果數據量太大,導致全量同步執行時間過長,以至于MySQL從庫的復制停滯太久,在重新啟動復制后會不會延遲越拉越久,而永遠不能追上主庫呢?理論上可能發生這種情況,但實際上不太可能出現。首先,業務庫不可能永遠滿載工作,總有波峰波谷。其次,數據倉庫通常只需要同步部分業務數據,而不會應用全部binlog。最后,我們還能采取各種手段加快MySQL主從復制,使從庫在一個可接受的時間范圍內追上主庫,包括:
- 在數據完整性允許的情況下,設置innodb_flush_log_at_trx_commit和sync_binlog雙0。
- 使用MySQL 5.6以后版本的組提交和多線程復制。
- 使用MySQL 5.7.22及其以后版本的基于WriteSet的多線程復制。
? ? ? ? maxwell + Kafka + bireme 方案的有點主要體現在兩點:一是容易上手,只需配置無需編程即可使用;二是消費速度快,這得益于bireme采用的 DELETE + COPY 方法,通過小批次準實時進行數據裝載的方式。這種方案的缺點也很明顯,bireme的實現比較糟糕。前面已經看到了幾處,如不支持DDL,不支持maxwell-bootstrap,不支持源表和目標表的自由映射等。而且,當pipeline.thread_pool.size值設置小于Kafka 分區數時,極易出現Consumer group 'xxx' is rebalancing問題,該問題還不能自行恢復,我極度懷疑這是由實現代碼的瑕疵所造成。也難怪,bireme這個個人作品在github上已經四年沒更新了。下面介紹更為流行,也是我們所采用的基于Canal的解決方案。
5.6 Canal + Kafka + ClientAdapter
? ? ? ? 本節介紹的方法和上節類似,只是將Kafka的生產者與消費者換成了Canal Server和Canal Adapter。
5.6.1 總體架構
? ? ? ? 本方案的總體架構如圖5-13所示。
圖5-13 Canal + Kafka + ClientAdapter 架構
? ? ? ? Canal是阿里開源的一個的組件,無論功能還是實現上都與maxwell類似。其主要用途是基于MySQL數據庫增量日志解析,提供增量數據訂閱和消費,工作原理相對比較簡單:
? ? ? ? 圖5-14顯示了Canal服務器的構成模塊。Server代表一個Canal運行實例,對應于一個jvm。Instance對應于一個數據隊列,1個Server對應1..n個Instance。Instance模塊中,EventParser完成數據源接入,模擬slave與master進行交互并解析協議。EventSink是Parser和Store的連接器,進行數據過濾、加工與分發。EventStore負責存儲數據。MetaManager是增量訂閱與消費信息管理器。
圖5-14 Canal服務器構成模塊
? ? ? ? Canal 1.1.1版本之后默認支持將Canal Server接收到的binlog數據直接投遞到消息隊列,目前默認支持的消息系統有Kafka和RocketMQ。早期的Canal僅提供Client API,需要用戶自己編寫客戶端程序實現消費邏輯。Canal 1.1.1版本之后增加了client-adapter,提供客戶端數據落地的適配及啟動功能。
? ? ? ? 下面演示安裝配置Canal Server和Canal Adapter實現MySQL到Greenplum的實時數據同步。這里使用的環境與5.5.1的相同,MySQL已經配置好主從復制,CDH的Kafka服務正常。我們還事先在MySQL中創建了Canal用于連接數據庫的用戶,并授予了相關權限。
-- 在126主庫執行 create user canal identified by 'canal'; ? grant select, replication slave, replication client on *.* to 'canal'@'%';? ? ? ? 下面在Kafka中創建一個topic,在后面配置Canal時將使用該topic:
kafka-topics.sh --create --topic example --bootstrap-server 172.16.1.124:9092 --partitions 3 --replication-factor 3example的分區如下:
Topic: example?? ?Partition: 0?? ?Leader: 340?? ?Replicas: 340,339,330?? ?Isr: 340,339,330 Topic: example?? ?Partition: 1?? ?Leader: 339?? ?Replicas: 339,330,340?? ?Isr: 339,330,340 Topic: example?? ?Partition: 2?? ?Leader: 330?? ?Replicas: 330,340,339?? ?Isr: 330,340,339? ? ? ? Client-Adapter在1.14版本為了解決對MySQL關鍵字的兼容問題引入了一個BUG,使它只能兼容MySQL,在向Greenplum插入數據時會報錯:
canal 1.1.5 bug 2021-10-08 16:51:09.347 [pool-2-thread-1] ERROR com.alibaba.otter.canal.client.adapter.support.Util - ERROR: syntax error at or near "`"Position: 15 org.postgresql.util.PSQLException: ERROR: syntax error at or near "`"Position: 15該問題說明詳見https://github.com/alibaba/canal/pull/3020,直到Adapter 1.1.5依然沒有解決。1.1.3版本還沒有出現此問題,因此我們選擇使用Canal 1.1.3,要求JDK 1.8以上。
5.6.2 Canal Server安裝配置
? ? ? ? 我們在172.16.1.126上運行Canal Server。
1. 下載并解壓
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz tar -zxvf canal.deployer-1.1.3.tar.gz -C ~/canal_113/deployer/2. 修改配置文件
(1)編輯Canal配置文件/home/mysql/canal_113/deployer/conf/canal.properties,修改以下配置項。
? ? ? ? canal.properties是Canal服務器配置文件,其中包括三部分定義:
- common argument:通用參數定義,可以將instance.properties的公用參數,抽取放置到這里,這樣每個實例啟動的時候就可以共享配置。instance.properties配置定義優先級高于canal.properties
- destinations: Canal實例列表定義,列出當前服務器上有多少個實例,每個實例的加載方式是spring/manager等。
- MQ:消息隊列相關配置。
(2)編輯instance配置文件/home/mysql/canal_113/deployer/conf/example/instance.properties,修改以下配置項。
# Canal實例對應的MySQL master canal.instance.master.address=172.16.1.127:3306 # 注釋canal.mq.partition配置項 # canal.mq.partition=0 # Kafka topic分區數 canal.mq.partitionsNum=3 # 哈希分區規則,指定所有正則匹配的表對應的哈希字段為表主鍵 canal.mq.partitionHash=.*\\..*:$pk$? ? ? ? instance.properties是Canal實例配置文件,在canal.properties定義了canal.destinations后,需要在canal.conf.dir對應的目錄下建立同名目錄。例如缺省配置:
canal.destinations = example canal.conf.dir = ../conf這時需要在canal.properties所在目錄中存在(或創建)example目錄,example目錄里有一個instance.properties文件。
? ? ? ? 與上節介紹的bireme類似,Canal同樣存在消息隊列的順序性問題。Canal目前選擇支持的Kafka/rocketmq,本質上都是基于本地文件的方式來支持分區級的順序消息能力,也就是binlog寫入消息隊列可以有一些順序性保障,這取決于用戶的參數選擇。
? ? ? ? Canal支持消息隊列數據的幾種路由方式:單topic單分區,單topic多分區、多topic單分區、多topic多分區。canal.mq.dynamicTopic參數主要控制是單topic還是多topic,針對命中條件的表可以發到表名對應的topic、庫名對應的topic,或默認topic。canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分區以及分區的partition的路由計算,針對命中條件的可以做到按表級做分區、主鍵級做分區等。
? ? ? ? Canal的消費順序性,主要取決于路由選擇,例如:
- 單topic單分區,可以嚴格保證和binlog一樣的順序性,缺點就是性能比較慢,單分區的性能寫入大概在2~3k的TPS。
- 多topic單分區,可以保證表級別的順序性,一張表或者一個庫的所有數據都寫入到一個topic的單分區中,可以保證有序性,針對熱點表也存在寫入分區的性能問題。
- 單topic、多topic的多分區,如果用戶選擇的是指定table的方式,保障的是表級別的順序性(存在熱點表寫入分區的性能問題),如果用戶選擇的是指定pk hash的方式,那只能保障的是一個主鍵的多次binlog順序性。pk hash的方式性能最好,但需要權衡業務,如果業務上有主鍵變更或者對多主鍵數據有順序性依賴,就會產生業務處理錯亂的情況。如果有主鍵變更,主鍵變更前和變更后的值會落在不同的分區里,業務消費就會有先后順序的問題,需要注意。
? ? ? ? 如果事先創建好topic,canal.mq.partitionsNum參數值不能大于該topic的分區數。如果選擇在開始向Kafka發送消息時自動創建topic,則canal.mq.partitionsNum值不能大于Kafka的 num.partitions參數值。否則在Canal Server啟動時報錯:
ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - Invalid partition given with record: 1 is not in the range [0...1).3. 啟動Canal Server
? ? ? ? 執行下面的命令啟動Canal Server:
Canal Server成功啟動后,將在日志文件/home/mysql/canal_113/deployer/logs/canal/canal.log中看到類似下面的信息:
2021-12-14 16:29:42.724 [destination = example , address = /172.16.1.127:3306 , EventParser] WARN ?c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=333,serverId=127,gtid=,timestamp=1639466655000] cost : 688ms , the next step is binlog dump從MySQL可以看到canal用戶創建的dump線程:
*************************** 4. row ***************************Id: 5739453User: canalHost: 172.16.1.126:22423db: NULL Command: Binlog DumpTime: 123State: Master has sent all binlog to slave; waiting for binlog to be updatedInfo: NULL5.6.3 Canal Adapter安裝配置
? ? ? ? 我們在172.16.1.126上運行Canal Adapter。
1. 下載并解壓
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz tar -zxvf canal.adapter-1.1.3.tar.gz -C ~/canal_113/adapter/2. 修改配置文件
(1)編輯啟動器配置文件/home/mysql/canal_113/adapter/conf/application.yml,內容如下。
? ? ? ? 1.1.3版本的ClientAdapter支持如下功能:
- 客戶端啟動器
- 同步管理REST接口
- 日志適配器
- 關系型數據庫的表對表數據同步
- HBase的表對表數據同步
- ElasticSearch多表數據同步
? ? ? ? 適配器將會自動加載 conf/rdb 下的所有.yml結尾的表映射配置文件。
(2)編輯RDB表映射文件/home/mysql/canal_113/adapter/conf/rdb/t1.yml,內容如下。
dataSourceKey: defaultDS?? ? # 源數據源的key destination: example?? ??? ? # cannal的instance或者MQ的topic groupId: g1?? ??? ??? ??? ???# 對應MQ模式下的groupId,只會同步對應groupId的數據?? ? outerAdapterKey: Greenplum?? # adapter key, 對應上面配置outAdapters中的key concurrent: true?? ??? ??? ? # 是否按主鍵hash并行同步,并行同步的表必須保證主鍵不會更改,及不存在依賴該主鍵的其他同步表上的外鍵約束。 dbMapping:database: test?? ??? ??? ? # 源數據源的database/shcematable: t1?? ??? ??? ??? ???# 源數據源表名targetTable: public.t1?? ? # 目標數據源的模式名.表名targetPk:?? ??? ??? ??? ???# 主鍵映射a: a?? ??? ??? ??? ??? ? # 如果是復合主鍵可以換行映射多個 # ?mapAll: true?? ??? ??? ???# 是否整表映射,要求源表和目標表字段名一模一樣。如果targetColumns也配置了映射,則以targetColumns配置為準。targetColumns:?? ??? ??? ? # 字段映射,格式: 目標表字段: 源表字段,如果字段名一樣源表字段名可不填。a: acommitBatch: 30000 ?? ??? ?# 批量提交的大小? ? ? ? RDB adapter 用于適配MySQL到關系型數據庫(需支持jdbc)的數據同步及導入。
3. 啟動Canal Adapter
? ? ? ? 執行下面的命令啟動Canal Adapter:
Canal Adapter成功啟動后,將在日志文件/home/mysql/canal_113/adapter/logs/adapter/adapter.log中看到類似下面的信息:
2021-12-14 17:40:37.206 [main] INFO ?c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed ... 2021-12-14 17:40:37.384 [Thread-5] INFO ?c.a.o.c.adapter.launcher.loader.CanalAdapterKafkaWorker - =============> Start to subscribe topic: example <============= 2021-12-14 17:40:37.385 [Thread-5] INFO ?c.a.o.c.adapter.launcher.loader.CanalAdapterKafkaWorker - =============> Subscribe topic: example succeed <============= ...? ? ? ? 現在所有服務都已正常,可以進行一些簡單的測試:
-- 在MySQL主庫執行一些數據修改 use test; insert into t1 values (4),(5),(6); update t1 set a=30 where a=3; delete from t1 where a=10; commit;-- 查詢Greenplum dw=> select * from public.t1;a ? ----30654 (4 rows)? ? ? ? MySQL中的數據變化被實時同步到Greenplum中。
5.6.4 HA模式配置
? ? ? ? Canal的高可用不是服務器級別,而是基于實例的,一個實例對應一個MySQL實例。Canal通過將增量訂閱&消費的關系信息持久化存儲在Zookeeper中,保證數據集群共享,以支持HA模式。我們在172.16.1.127上再安裝一個Canal Server,然后對126、127上的Canal Server進行HA模式配置。配置中所使用的Zookeeper是Kafka同一CDH集群中的Zookeeper服務。
1. 配置Canal Server
(1)在127上安裝Canal Server
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz tar -zxvf canal.deployer-1.1.3.tar.gz -C ~/canal_113/deployer/(2)修改canal.properties文件,加上zookeeper配置。
# 兩個Canal Server的配置相同 canal.zkServers = 172.16.1.125:2181,172.16.1.126:2181,172.16.1.127:2181 canal.instance.global.spring.xml = classpath:spring/default-instance.xml # 注釋下面行 # canal.instance.global.spring.xml = classpath:spring/file-instance.xml(3)修改example/instance.properties文件
canal.instance.mysql.slaveId=1126?? ?# 另一臺機器改成1127,保證slaveId不重復即可? ? ? ? 注意,兩臺機器上的instance目錄的名字需要保證完全一致,HA模式是依賴于instance name進行管理,同時必須都選擇default-instance.xml配置。
(4)啟動兩臺機器的Canal Server
~/canal_113/deployer/bin/startup.sh? ? ? ? 啟動后可以查看logs/example/example.log,只會看到一臺機器上出現了啟動成功的日志,如這里啟動成功的是126:
2021-12-15 11:23:46.593 [main] INFO ?c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....zookeeper中記錄了集群信息:
[zk: localhost:2181(CONNECTED) 6] ls /otter/canal/destinations/example/cluster [172.16.1.127:11111, 172.16.1.126:11111]2. 配置Canal Adapter
(1)修改Adapter啟動器配置文件application.yml,加上zookeeper配置。
# 注釋下面一行 # canalServerHost: 127.0.0.1:11111 zookeeperHosts: 172.16.1.125:2181,172.16.1.126:2181,172.16.1.127:2181(2)重啟Adapter
~/canal_113/adapter/bin/stop.sh ~/canal_113/adapter/bin/startup.sh? ? ? ? Adapter會自動從zookeeper中的running節點獲取當前服務的工作節點,然后與其建立連接。連接成功后,Canal Server會記錄當前正在工作的服務器信息:
[zk: localhost:2181(CONNECTED) 9] get /otter/canal/destinations/example/running {"active":true,"address":"172.16.1.126:11111","cid":1}? ? ? ? 現在進行一些簡單的測試驗證功能是否正常:
-- 在MySQL主庫執行一些數據修改 use test; insert into t1 values (7),(8),(9); update t1 set a=40 where a=4; delete from t1 where a in (5,6,7); commit;-- 查詢Greenplum dw=> select * from public.t1;a ? ----308940 (4 rows)? ? ? ? MySQL中的數據變化被實時同步到Greenplum中,所有組件工作正常。
? ? ? ? 數據消費成功后,Canal Server會在zookeeper中記錄下當前最后一次消費成功的binlog位點,下次重啟客戶端時時,會從這最后一個位點繼續進行消費。
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/1001/cursor {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"node3","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":4270,"serverId":126,"timestamp":1639541811000}}3. 自動切換
? ? ? ? 停止正在工作的126的Canal Server:
~/canal_113/deployer/bin/stop.sh這時127會立馬啟動example instance,提供新的數據服務:
[zk: localhost:2181(CONNECTED) 20] get /otter/canal/destinations/example/running {"active":true,"address":"172.16.1.127:11111","cid":1}驗證功能是否正常:
-- 在MySQL主庫執行一些數據修改 use test; insert into t1 values (1),(2); update t1 set a=3 where a=30; delete from t1 where a in (8,9,40); commit;-- 查詢Greenplum dw=> select * from public.t1;a? ---132 (3 rows)啟動126的Canal Server,它將再次被添加到集群中:
[zk: localhost:2181(CONNECTED) 22] ls /otter/canal/destinations/example/cluster [172.16.1.127:11111, 172.16.1.126:11111]5.6.5 實時CDC
? ? ? ? 我們依然可以使用上節介紹的方法,進行全量加增量的實時數據同步。工作原理和操作步驟別無二致,只是實現的組件變了,maxwell替換為Canal Server,bireme替換為Canal Adapter,而這對于數據倉庫用戶來說完全透明。
? ? ? ? 也許在初始化數據同步之前需要進行必要的清理工作:
~/canal_113/deployer/bin/stop.sh ~/canal_113/adapter/bin/stop.sh rm ~/canal_113/deployer/conf/example/meta.dat rm ~/canal_113/deployer/conf/example/h2.mv.db cat /dev/null > ~/canal_113/adapter/logs/adapter/adapter.log cat /dev/null > ~/canal_113/deployer/logs/canal/canal.log? cat /dev/null > ~/canal_113/deployer/logs/example/example.log?? ? ? ? meta.dat文件的內容是個json串,存儲實例最后獲取的binlog位點信息,例如:
{"clientDatas":[{"clientIdentity":{"clientId":1001,"destination":"example","filter":""},"cursor":{"identity":{"slaveId":-1,"sourceAddress":{"address":"node3","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mysql-bin.000001","position":3707,"serverId":126,"timestamp":1639531036000}}}],"destination":"example"}Canal重啟時,從meta.dat文件中journalName的起始位置開始同步,如果此時獲取到的binlog信息在MySQL中已被清除,啟動將會失敗。通常在MySQL執行了reset master、purge binary logs等操作,或修改完Canal的instance.properties配置后,重啟Canal Server前需要刪除meta.dat文件。
? ? ? ? h2.mv.db是Canal的表元數據存儲數據庫。當MySQL修改了表結構,根據binlog的DDL語句,將該時刻表結構元數據信息在h2.mv.db的meta_snapshot、meta_history等表中。改設計主要為解決MySQL某一時刻發生DDL變更,如果回溯時間跨越DDL變更的時刻,產生解析字段不一致的問題。目前Canal Adapter不支持DDL。
? ? ? ? 清理后的Canal處于一個全新的初始狀態,此時可以在不影響業務數據庫正常訪問的前提下進行實時數據同步,主要操作步驟歸納如下:
5.6.6 消費延遲監控
? ? ? ? 有別于bireme的DELETE + COPY,Canal Adapter在Greenplum上逐條執行INSERT、UPDATE、DELETE語句。從日志中清晰可見:
2021-12-15 12:45:37.597 [pool-24-thread-1] INFO ?c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"a":3}],"database":"test","destination":"example","es":1639543537000,"groupId":"g1","isDdl":false,"old":[{"a":30}],"pkNames":null,"sql":"","table":"t1","ts":1639543512399,"type":"UPDATE"} 2021-12-15 12:45:38.308 [pool-24-thread-1] INFO ?c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"a":1},{"a":2}],"database":"test","destination":"example","es":1639543537000,"groupId":"g1","isDdl":false,"old":null,"pkNames":null,"sql":"","table":"t1","ts":1639543512393,"type":"INSERT"} 2021-12-15 12:46:02.934 [pool-24-thread-1] INFO ?c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"a":8},{"a":40}],"database":"test","destination":"example","es":1639543562000,"groupId":"g1","isDdl":false,"old":null,"pkNames":null,"sql":"","table":"t1","ts":1639543537814,"type":"DELETE"}? ? ? ? 正如上節所見,壓測中bireme的批次方式與MySQL的執行速度大差不差,而Canal Adapter的這種方式在Greenplum中的執行速度會比MySQL中慢得多。下面我們還是使用tpcc-mysql壓測制造MySQL負載,然后執行腳本監控消費延遲。Greenplum中已經創建好tpcc-mysql測試所用的9張表,需要配置這些表的映射關系,為每張表生成一個yml文件:
[mysql@node2~]$ls -l /home/mysql/canal_113/adapter/conf/rdb total 40 -rw-r--r-- 1 mysql mysql 825 Oct 11 11:13 customer.yml -rw-r--r-- 1 mysql mysql 538 Oct 11 11:18 district.yml -rw-r--r-- 1 mysql mysql 603 Oct 22 15:38 history.yml -rw-r--r-- 1 mysql mysql 379 Oct 11 11:23 item.yml -rw-r--r-- 1 mysql mysql 407 Oct 11 11:26 new_orders.yml -rw-r--r-- 1 mysql mysql 631 Oct 11 11:30 order_line.yml -rw-r--r-- 1 mysql mysql 506 Oct 11 11:33 orders.yml -rw-r--r-- 1 mysql mysql 720 Oct 11 11:44 stock.yml -rw-r--r-- 1 mysql mysql 275 Dec 14 17:37 t1.yml -rw-r--r-- 1 mysql mysql 473 Oct 11 11:50 warehouse.yml [mysql@node2~]$? ? ? ? history.yml文件內容如下,其他表映射配置文件類似:
dataSourceKey: defaultDS destination: example groupId: g1 outerAdapterKey: Greenplum concurrent: true dbMapping:database: tpcc_testtable: historytargetTable: tpcc_test.historytargetPk:h_c_id: h_c_idh_c_d_id: h_c_d_idh_c_w_id: h_c_w_idh_d_id: h_d_idh_w_id: h_w_idh_date: h_dateh_amount: h_amounth_data: h_data # ?mapAll: truetargetColumns:h_c_id: h_c_idh_c_d_id: h_c_d_idh_c_w_id: h_c_w_idh_d_id: h_d_idh_w_id: h_w_idh_date: h_dateh_amount: h_amounth_data: h_datacommitBatch: 30000? ? ? ? 消費延遲監控腳本lag.sh內容如下:
#!/bin/bash export PATH=$PATH:/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/;~/tpcc-mysql-master/tpcc_start -h172.16.1.126 -d tpcc_test -u root -p "123456" -w 10 -c 32 -r 60 -l 540 > tpcc_test.log 2>&1rate1=0 for ((i=1; i<=10; i++)) dostartTime=`date +%Y%m%d-%H:%M:%S`startTime_s=`date +%s`lag=$(kafka-consumer-groups.sh --bootstrap-server 172.16.1.124:9092 --describe --group $1 |sed -n "3, 5p" | awk '{lag+=$5}END{print lag}')lag1=$lagsleep 60lag=$(kafka-consumer-groups.sh --bootstrap-server 172.16.1.124:9092 --describe --group $1 |sed -n "3, 5p" | awk '{lag+=$5}END{print lag}')endTime=`date +%Y%m%d-%H:%M:%S`endTime_s=`date +%s`lag2=$laglag=$(($lag1 - $lag2))sumTime=$[ $endTime_s - $startTime_s ]rate=`expr $lag / $sumTime`rate1=$(($rate1 + $rate))echo "$startTime ---> $endTime" "Total:$sumTime seconds, consume $rate messages per second." doneecho; avg_rate=`expr $rate1 / 10`; left=`expr $lag2 / $avg_rate` echo "It will take about $left seconds to complete the consumption."? ? ? ? 說明:
- 首先執行10分鐘的tpcc壓測制造MySQL負載。
- 壓測結束后查看消費延遲,以位移(OFFSET)差作為度量。查看10次,每次相隔1分鐘。
- 每次計算每秒消費的消息數,取10次的平均值估算還需要多長時間完成消費。
- example topic中有三個分區,消費時會創建三個消費者,每個消費者用多線程(threads參數指定)進行消費,因此需要累加三個消費者的延遲。
? ? ? ? 在數據同步使用的所有組件工作正常的情況下執行腳本,命令行參數是消費組名稱(groupId參數指定):
./lag.sh g1? ? ? ? 輸出結果如下:
20211216-11:33:05 ---> 20211216-11:34:11 Total:66 seconds, consume 1509 messages per second. 20211216-11:34:11 ---> 20211216-11:35:17 Total:66 seconds, consume 1488 messages per second. 20211216-11:35:17 ---> 20211216-11:36:23 Total:66 seconds, consume 1508 messages per second. 20211216-11:36:23 ---> 20211216-11:37:29 Total:66 seconds, consume 1511 messages per second. 20211216-11:37:29 ---> 20211216-11:38:35 Total:66 seconds, consume 1465 messages per second. 20211216-11:38:35 ---> 20211216-11:39:41 Total:66 seconds, consume 1510 messages per second. 20211216-11:39:41 ---> 20211216-11:40:48 Total:67 seconds, consume 1444 messages per second. 20211216-11:40:48 ---> 20211216-11:41:54 Total:66 seconds, consume 1511 messages per second. 20211216-11:41:54 ---> 20211216-11:43:00 Total:66 seconds, consume 1465 messages per second. 20211216-11:43:00 ---> 20211216-11:44:06 Total:66 seconds, consume 1465 messages per second.It will take about 387 seconds to complete the consumption.? ? ? ? 在本實驗環境中,MySQL執行10分鐘的壓測負載,Greenplum大約需要執行約27分半(kafka-consumer-groups.sh命令本身需要約6秒的執行時間),兩者的QPS相差2.75倍。由此也可以看出,Greenplum作為分布式數據庫,專為分析型數據倉庫場景所設計,單條DML的執行效率遠沒有MySQL這種主機型數據庫高,并不適合高并發小事務的OLTP型應用。
? ? ? ? 雖然性能上比bireme的微批處理慢不少,但Canal Adapter的INSERT、UPDATE、DELETE處理方式,使得用類似于數據庫觸發器的功能實現自動實時ETL成為可能,這也是下一篇所要討論的主題。
小結
- 時間戳、觸發器、快照表、日志是常用的四種變化數據捕獲方法。使用日志不會侵入數據庫,適合做實時CDC。
- Maxwell和Canal本質都是MySQL binlog解析器,工作方式是把自己偽裝成Slave,實現MySQL復制協議。
- Maxwell和Canal可作為生產者,將binlog解析結果以消息形式輸出到Kafka中。
- Kafka在數據同步方案中用于消息中轉和持久化。使用Kafka時要注意多分區的消息順序問題,通常可以將表主鍵作為哈希分區鍵,保證主鍵行的更新與源同序。
- bireme是一個Greenplum數據倉庫的增量同步工具,支持將maxwell + Kafka 作為數據源,特點是采用DELETE + COPY方式,數據同步速度快。
- Canal Adapter提供客戶端數據落地的適配及啟動功能,可將MySQL binlog事件在目標數據庫中回放,使用的是INSERT、UPDATE、DELETE方式。
總結
以上是生活随笔為你收集整理的Greenplum 实时数据仓库实践(5)——实时数据同步的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据恢复工具(minitool powe
- 下一篇: Spring tool suite修改主