谈谈对Canal( 增量数据订阅与消费 )的理解
概述
canal是阿里巴巴旗下的一款開源項目,純Java開發。基于數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。
起源:早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基于trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基于數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。
基于日志增量訂閱&消費支持的業務:
工作原理
mysql主備復制實現:
從上層來看,復制分成三步:
canal的工作原理
原理相對比較簡單:
架構設計
個人理解,數據增量訂閱與消費應當有如下幾個點:
可以參考下圖:
canal架構設計
說明:
- server代表一個canal運行實例,對應于一個jvm
- instance對應于一個數據隊列 (1個server對應1..n個instance)
instance模塊:
- eventParser (數據源接入,模擬slave協議和master進行交互,協議解析)
- eventSink (Parser和Store鏈接器,進行數據過濾,加工,分發的工作)
- eventStore (數據存儲)
- metaManager (增量訂閱&消費信息管理器)
EventParser
整個parser過程大致可分為幾部:
EventSink設計
說明:
- 數據過濾:支持通配符的過濾模式,表名,字段內容等
- 數據路由/分發:解決1:n (1個parser對應多個store的模式)
- 數據歸并:解決n:1 (多個parser對應1個store)
- 數據加工:在進入store之前進行額外的處理,比如join
1 數據1:n業務 :
為了合理的利用數據庫資源, 一般常見的業務都是按照schema進行隔離,然后在mysql上層或者dao這一層面上,進行一個數據源路由,屏蔽數據庫物理位置對開發的影響,阿里系主要是通過cobar/tddl來解決數據源路由問題。 所以,一般一個數據庫實例上,會部署多個schema,每個schema會有由1個或者多個業務方關注。
2 數據n:1業務:
同樣,當一個業務的數據規模達到一定的量級后,必然會涉及到水平拆分和垂直拆分的問題,針對這些拆分的數據需要處理時,就需要鏈接多個store進行處理,消費的位點就會變成多份,而且數據消費的進度無法得到盡可能有序的保證。 所以,在一定業務場景下,需要將拆分后的增量數據進行歸并處理,比如按照時間戳/全局id進行排序歸并.
EventStore設計
目前實現了Memory內存、本地file存儲以及持久化到zookeeper以保障數據集群共享。
Memory內存的RingBuffer設計:
定義了3個cursor
- Put : Sink模塊進行數據存儲的最后一次寫入位置
- Get : 數據訂閱獲取的最后一次提取位置
- Ack : 數據消費成功的最后一次消費位置
借鑒Disruptor的RingBuffer的實現,將RingBuffer拉直來看:
實現說明:
- Put/Get/Ack cursor用于遞增,采用long型存儲
- buffer的get操作,通過取余或者與操作。(與操作: cusor & (size – 1) , size需要為2的指數,效率比較高)
Instance設計
instance代表了一個實際運行的數據隊列,包括了EventPaser,EventSink,EventStore等組件。
抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:
1. manager方式: 和你自己的內部web console/manager系統進行對接。(alibaba內部使用方式)
2. spring方式:基于spring xml + properties進行定義,構建spring配置.
- spring/memory-instance.xml 所有的組件(parser , sink , store)都選擇了內存版模式,記錄位點的都選擇了memory模式,重啟后又會回到初始位點進行解析。特點:速度最快,依賴最少
- spring/file-instance.xml 所有的組件(parser , sink , store)都選擇了基于file持久化模式,注意,不支持HA機制.支持單機持久化
- spring/default-instance.xml 所有的組件(parser , sink , store)都選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證數據集群共享. 支持HA
- spring/group-instance.xml 主要針對需要進行多庫合并時,可以將多個物理instance合并為一個邏輯instance,提供客戶端訪問。場景:分庫業務。 比如產品數據拆分了4個庫,每個庫會有一個instance,如果不用group,業務上要消費數據時,需要啟動4個客戶端,分別鏈接4個instance實例。使用group后,可以在canal server上合并為一個邏輯instance,只需要啟動1個客戶端,鏈接這個邏輯instance即可.
Server設計
server代表了一個canal的運行實例,為了方便組件化使用,特意抽象了Embeded(嵌入式) / Netty(網絡訪問)的兩種實現:
- Embeded : 對latency和可用性都有比較高的要求,自己又能hold住分布式的相關技術(比如failover)
- Netty : 基于netty封裝了一層網絡協議,由canal server保證其可用性,采用的pull模型,當然latency會稍微打點折扣,不過這個也視情況而定。
增量訂閱/消費設計
具體的協議格式,可參見:CanalProtocol.proto
get/ack/rollback協議介紹:
- Message getWithoutAck(int batchSize),允許指定batchSize,一次可以獲取多條,每次返回的對象為Message,包含的內容為:
- a. batch id 唯一標識
- b. entries 具體的數據對象,對應的數據對象格式:EntryProtocol.proto
- void rollback(long batchId),顧命思議,回滾上次的get請求,重新獲取數據。基于get獲取的batchId進行提交,避免誤操作
- void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除數據。基于get獲取的batchId進行提交,避免誤操作
- canal的get/ack/rollback協議和常規的jms協議有所不同,允許get/ack異步處理,比如可以連續調用get多次,后續異步按順序提交ack/rollback,項目中稱之為流式api.
- 流式api設計的好處:
- get/ack異步化,減少因ack帶來的網絡延遲和操作成本 (99%的狀態都是處于正常狀態,異常的rollback屬于個別情況,沒必要為個別的case犧牲整個性能)
- get獲取數據后,業務消費存在瓶頸或者需要多進程/多線程消費時,可以不停的輪詢get數據,不停的往后發送任務,提高并行化. (作者在實際業務中的一個case:業務數據消費需要跨中美網絡,所以一次操作基本在200ms以上,為了減少延遲,所以需要實施并行化)
流式api設計:
- 每次get操作都會在meta中產生一個mark,mark標記會遞增,保證運行過程中mark的唯一性
- 每次的get操作,都會在上一次的mark操作記錄的cursor繼續往后取,如果mark不存在,則在last ack cursor繼續往后取
- 進行ack時,需要按照mark的順序進行數序ack,不能跳躍ack. ack會刪除當前的mark標記,并將對應的mark位置更新為last ack cusor
- 一旦出現異常情況,客戶端可發起rollback情況,重新置位:刪除所有的mark, 清理get請求位置,下次請求會從last ack cursor繼續往后取
數據格式
canal采用protobuff:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | Entry ????Header ????????logfileName [binlog文件名] ????????logfileOffset [binlog position] ????????executeTime [發生的變更] ????????schemaName ????????tableName ????????eventType [insert/update/delete類型] ????entryType?? [事務頭BEGIN/事務尾END/數據ROWDATA] ????storeValue? [byte數據,可展開,對應的類型為RowChange]??? RowChange ????isDdl?????? [是否是ddl變更操作,比如create table/drop table] ????sql???? [具體的ddl sql] ????rowDatas??? [具體insert/update/delete的變更數據,可為多條,1個binlog event事件可對應多條變更,比如批處理] ????????beforeColumns [Column類型的數組] ????????afterColumns [Column類型的數組]????? Column ????index?????? ????sqlType???? [jdbc type] ????name??????? [column name] ????isKey?????? [是否為主鍵] ????updated???? [是否發生過變更] ????isNull????? [值是否為null] ????value?????? [具體的內容,注意為文本] |
canal-message example:
比如數據庫中的表:
| 1 2 3 4 5 6 7 8 9 | mysql> select * from person; +----+------+------+------+ | id | name | age? | sex? | +----+------+------+------+ |? 1 | zzh? |?? 10 | m??? | |? 3 | zzh3 |?? 12 | f??? | |? 4 | zzh4 |??? 5 | m??? | +----+------+------+------+ 3 rows in set (0.00 sec) |
更新一條數據(update person set age=15 where id=4):
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | **************************************************** * Batch Id: [2] ,count : [3] , memsize : [165] , Time : 2016-09-07 15:54:18 * Start : [mysql-bin.000003:6354:1473234846000(2016-09-07 15:54:06)] * End : [mysql-bin.000003:6550:1473234846000(2016-09-07 15:54:06)] **************************************************** ================> binlog[mysql-bin.000003:6354] , executeTime : 1473234846000 , delay : 12225ms ?BEGIN ----> Thread id: 67 ----------------> binlog[mysql-bin.000003:6486] , name[canal_test,person] , eventType : UPDATE , executeTime : 1473234846000 , delay : 12225ms id : 4??? type=int(11) name : zzh4??? type=varchar(100) age : 15??? type=int(11)??? update=true sex : m??? type=char(1) ---------------- ?END ----> transaction id: 308 ================> binlog[mysql-bin.000003:6550] , executeTime : 1473234846000 , delay : 12240ms |
HA機制設計
canal的HA分為兩部分,canal server和canal client分別有對應的ha實現:
- canal server: 為了減少對mysql dump的請求,不同server上的instance要求同一時間只能有一個處于running,其他的處于standby狀態.
- canal client: 為了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操作,否則客戶端接收無法保證有序。
整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命周期綁定),可以看下我之前zookeeper的相關文章。
Canal Server:
大致步驟:
HA配置架構圖(舉例)如下所示:
canal其他鏈接方式
canal還有幾種連接方式:
1. 單連
2. 兩個client+兩個instance+1個mysql
當mysql變動時,兩個client都能獲取到變動
3. 一個server+兩個instance+兩個mysql+兩個client
4. instance的standby配置
整體架構
從整體架構上來說canal是這種架構的(canal中沒有包含一個運維的console web來對接,但要運用于分布式環境中肯定需要一個Manager來管理):
一個總體的manager system對應于n個Canal Server(物理上來說是一臺服務器), 那么一個Canal Server對應于n個Canal Instance(destinations). 大體上是三層結構,第二層也需要Manager統籌運維管理。
那么隨著Docker技術的興起,是否可以試一下下面的架構呢?
- 一個docker中跑一個instance服務,相當于略去server這一層的概念。
- Manager System中配置一個instance,直接調取一個docker發布這個instance,其中包括向這個instance發送配置信息,啟動instance服務.
- instance在運行過程中,定時刷新binlog filename+ binlog position的信息至zk。
- 如果一個instance出現故障,instance本身報錯或者zk感知此node消失,則根據相應的信息,比如上一步保存的binlog filename+binlog position重新開啟一個docker服務,當然這里可以適當的加一些重試機制。
- 當要更新時,類似AB test, 先關閉一個docker,然后開啟新的已更新的替換,循序漸進的進行。
- 當涉及到分表分庫時,多個物理表對應于一個邏輯表,可以將結果存于一個公共的模塊(比如MQ),或者單獨存取也可以,具體情況具體分析
- 存儲可以參考canal的多樣化:內存,文件,zk,或者加入至MQ中
- docker由此之外的工具管理,比如kubernetes
- 也可以進一步添加HA的功能,兩個docker對應一個mysql,互為主備,類似Canal的HA架構。如果時效性不是貼別強的場景,考慮到成本,此功能可以不采用。
總結
這里總結了一下Canal的一些點,僅供參考:
參考資料
總結
以上是生活随笔為你收集整理的谈谈对Canal( 增量数据订阅与消费 )的理解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据库事务的隔离级别
- 下一篇: 使用 Binlog 和 Canal 从