使用 Binlog 和 Canal 从 MySQL 抽取数据
數據抽取是 ETL 流程的第一步。我們會將數據從 RDBMS 或日志服務器等外部系統抽取至數據倉庫,進行清洗、轉換、聚合等操作。在現代網站技術棧中,MySQL 是最常見的數據庫管理系統,我們會從多個不同的 MySQL 實例中抽取數據,存入一個中心節點,或直接進入 Hive。市面上已有多種成熟的、基于 SQL 查詢的抽取軟件,如著名的開源項目?Apache Sqoop,然而這些工具并不支持實時的數據抽取。MySQL Binlog 則是一種實時的數據流,用于主從節點之間的數據復制,我們可以利用它來進行數據抽取。借助阿里巴巴開源的?Canal?項目,我們能夠非常便捷地將 MySQL 中的數據抽取到任意目標存儲中。
Canal 的組成部分
簡單來說,Canal 會將自己偽裝成 MySQL 從節點(Slave),并從主節點(Master)獲取 Binlog,解析和貯存后供下游消費端使用。Canal 包含兩個組成部分:服務端和客戶端。服務端負責連接至不同的 MySQL 實例,并為每個實例維護一個事件消息隊列;客戶端則可以訂閱這些隊列中的數據變更事件,處理并存儲到數據倉庫中。下面我們來看如何快速搭建起一個 Canal 服務。
配置 MySQL 主節點
MySQL 默認沒有開啟 Binlog,因此我們需要對?my.cnf?文件做以下修改:
server-id = 1 log_bin = /path/to/mysql-bin.log binlog_format = ROW- 1
- 2
- 3
注意?binlog_format?必須設置為?ROW, 因為在?STATEMENT?或?MIXED?模式下, Binlog 只會記錄和傳輸 SQL 語句(以減少日志大小),而不包含具體數據,我們也就無法保存了。
從節點通過一個專門的賬號連接主節點,這個賬號需要擁有全局的?REPLICATION?權限。我們可以使用?GRANT?命令創建這樣的賬號:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';- 1
- 2
啟動 Canal 服務端
從 GitHub 項目發布頁中下載 Canal 服務端代碼(鏈接),配置文件在?conf?文件夾下,有以下目錄結構:
canal.deployer/conf/canal.properties canal.deployer/conf/instanceA/instance.properties canal.deployer/conf/instanceB/instance.properties- 1
- 2
- 3
conf/canal.properties?是主配置文件,如其中的?canal.port?用以指定服務端監聽的端口。instanceA/instance.properties?則是各個實例的配置文件,主要的配置項有:
# slaveId 不能與 my.cnf 中的 server-id 項重復 canal.instance.mysql.slaveId = 1234 canal.instance.master.address = 127.0.0.1:3306 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.connectionCharset = UTF-8 # 訂閱實例中所有的數據庫和表 canal.instance.filter.regex = .*\\..*- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
執行?sh bin/startup.sh?命令開啟服務端,在日志文件?logs/example/example.log?中可以看到以下輸出:
Loading properties file from class path resource [canal.properties] Loading properties file from class path resource [example/instance.properties] start CannalInstance for 1-example [destination = example , address = /127.0.0.1:3306 , EventParser] prepare to find start position just show master status- 1
- 2
- 3
- 4
編寫 Canal 客戶端
從服務端消費變更消息時,我們需要創建一個 Canal 客戶端,指定需要訂閱的數據庫和表,并開啟輪詢。
首先,在項目中添加?com.alibaba.otter:canal.client?依賴項,構建?CanalConnector?實例:
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");connector.connect(); connector.subscribe(".*\\..*");while (true) {Message message = connector.getWithoutAck(100);long batchId = message.getId();if (batchId == -1 || message.getEntries().isEmpty()) {Thread.sleep(3000);} else {printEntries(message.getEntries());connector.ack(batchId);} }- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
這段代碼和連接消息系統很相似。變更事件會批量發送過來,待處理完畢后我們可以 ACK 這一批次,從而避免消息丟失。
// printEntries RowChange rowChange = RowChange.parseFrom(entry.getStoreValue()); for (RowData rowData : rowChange.getRowDatasList()) {if (rowChange.getEventType() == EventType.INSERT) {printColumns(rowData.getAfterCollumnList());} }- 1
- 2
- 3
- 4
- 5
- 6
- 7
每一個?Entry?代表一組具有相同變更類型的數據列表,如 INSERT 類型、UPDATE、DELETE 等。每一行數據我們都可以獲取到各個字段的信息:
// printColumns String line = columns.stream().map(column -> column.getName() + "=" + column.getValue()).collect(Collectors.joining(",")); System.out.println(line);- 1
- 2
- 3
- 4
- 5
完整代碼可以在 GitHub 中找到(鏈接)。
加載至數據倉庫
關系型數據庫與批量更新
若數據倉庫是基于關系型數據庫的,我們可以直接使用?REPLACE?語句將數據變更寫入目標表。其中需要注意的是寫入性能,在更新較頻繁的場景下,我們通常會緩存一段時間的數據,并批量更新至數據庫,如:
REPLACE INTO `user` (`id`, `name`, `age`, `updated`) VALUES (1, 'Jerry', 30, '2017-08-12 16:00:00'), (2, 'Mary', 28, '2017-08-12 17:00:00'), (3, 'Tom', 36, '2017-08-12 18:00:00');- 1
- 2
- 3
- 4
另一種方式是將數據變更寫入按分隔符分割的文本文件,并用?LOAD DATA?語句載入數據庫。這些文件也可以用在需要寫入 Hive 的場景中。不管使用哪一種方法,請一定注意要對字符串類型的字段進行轉義,避免導入時出錯。
基于 Hive 的數據倉庫
Hive 表保存在 HDFS 上,該文件系統不支持修改,因此我們需要一些額外工作來寫入數據變更。常用的方式包括:JOIN、Hive 事務、或改用 HBase。
數據可以歸類成基礎數據和增量數據。如昨日的?user?表是基礎數據,今日變更的行是增量數據。通過?FULL OUTER JOIN,我們可以將基礎和增量數據合并成一張最新的數據表,并作為明天的基礎數據:
SELECTCOALESCE(b.`id`, a.`id`) AS `id`,COALESCE(b.`name`, a.`name`) AS `name`,COALESCE(b.`age`, a.`age`) AS `age`,COALESCE(b.`updated`, a.`updated`) AS `updated` FROM dw_stage.`user` a FULL OUTER JOIN (-- 增量數據會包含重復數據,因此需要選擇最新的那一條SELECT `id`, `name`, `age`, `updated`FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY `id` ORDER BY `updated` DESC) AS `n`FROM dw_stage.`user_delta`) bWHERE `n` = 1 ) b ON a.`id` = b.`id`;- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
Hive 0.13 引入了事務和 ACID 表,0.14 開始支持?INSERT、UPDATE、DELETE?語句,Hive 2.0.0 則又新增了?Streaming Mutation API,用以通過編程的方式批量更新 Hive 表中的記錄。目前,ACID 表必須使用 ORC 文件格式進行存儲,且須按主鍵進行分桶(Bucket)。Hive 會將變更記錄保存在增量文件中,當?OrcInputFormat?讀取數據時會自動定位到最新的那條記錄。官方案例可以在這個鏈接中查看。
最后,我們可以使用 HBase 來實現表數據的更新,它是一種 KV 存儲系統,同樣基于 HDFS。HBase 的數據可以直接為 MapReduce 腳本使用,且 Hive 中可以創建外部映射表指向 HBase。更多信息請查看官方網站。
初始化數據
數據抽取通常是按需進行的,在新增一張表時,數據源中可能已經有大量原始記錄了。常見的做法是手工將這批數據全量導入至目標表中,但我們也可以復用 Canal 這套機制來實現歷史數據的抽取。
首先,我們在數據源庫中創建一張輔助表:
CREATE TABLE `retl_buffer` (id BIGINT AUTO_INCREMENT PRIMARY KEY,table_name VARCHAR(255),pk_value VARCHAR(255) );- 1
- 2
- 3
- 4
- 5
當需要全量抽取?user?表時,我們執行以下語句,將所有?user.id?寫入輔助表中:
INSERT INTO `retl_buffer` (`table_name`, `pk_value`) SELECT 'user', `id` FROM `user`;- 1
- 2
Canal 客戶端在處理到?retl_buffer?表的數據變更時,可以從中解析出表名和主鍵的值,直接反查數據源,將數據寫入目標表:
if ("retl_buffer".equals(entry.getHeader().getTableName())) {String tableName = rowData.getAfterColumns(1).getValue();String pkValue = rowData.getAfterColumns(2).getValue();System.out.println("SELECT * FROM " + tableName + " WHERE id = " + pkValue); }- 1
- 2
- 3
- 4
- 5
這一方法在阿里巴巴的另一個開源軟件?Otter?中使用。
Canal 高可用
- Canal 服務端中的實例可以配置一個備用 MySQL,從而能夠在雙 Master 場景下自動選擇正在工作的數據源。注意兩臺主庫都需要打開?log_slave_updates?選項。Canal 會使用自己的心跳機制(定期更新輔助表的記錄)來檢測主庫的存活。
- Canal 自身也有 HA 配置,配合 Zookeeper,我們可以開啟多個 Canal 服務端,當某臺服務器宕機時,客戶端可以從 ZK 中獲取新的服務端地址,繼續進行消費。更多信息可以參考?Canal AdminGuide。
參考資料
- https://github.com/alibaba/canal/wiki
- https://github.com/alibaba/otter/wiki
- https://www.phdata.io/4-strategies-for-updating-hive-tables/
- https://hortonworks.com/blog/four-step-strategy-incremental-updates-hive/
- https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions
總結
以上是生活随笔為你收集整理的使用 Binlog 和 Canal 从 MySQL 抽取数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 谈谈对Canal( 增量数据订阅与消费
- 下一篇: 《设计模式》一书PPT浏览及下载地址