cdc工具 postgresql_基于 Flink SQL CDC 的实时数据同步方案
作者:伍翀 (云邪)
整理:陳政羽(Flink 社區(qū)志愿者)
Flink 1.11 引入了 Flink SQL CDC,CDC 能給我們數(shù)據(jù)和業(yè)務間能帶來什么變化?本文由 Apache Flink PMC,阿里巴巴技術(shù)專家伍翀 (云邪)分享,內(nèi)容將從傳統(tǒng)的數(shù)據(jù)同步方案,基于 Flink CDC 同步的解決方案以及更多的應用場景和 CDC 未來開發(fā)規(guī)劃等方面進行介紹和演示。
1、傳統(tǒng)數(shù)據(jù)同步方案
2、基于 Flink SQL CDC 的數(shù)據(jù)同步方案(Demo)
3、Flink SQL CDC 的更多應用場景
4、Flink SQL CDC 的未來規(guī)劃
直播回顧:
https://www.bilibili.com/video/BV1zt4y1D7kt/
傳統(tǒng)的數(shù)據(jù)同步方案與 Flink SQL CDC 解決方案
業(yè)務系統(tǒng)經(jīng)常會遇到需要更新數(shù)據(jù)到多個存儲的需求。例如:一個訂單系統(tǒng)剛剛開始只需要寫入數(shù)據(jù)庫即可完成業(yè)務使用。某天 BI 團隊期望對數(shù)據(jù)庫做全文索引,于是我們同時要寫多一份數(shù)據(jù)到 ES 中,改造后一段時間,又有需求需要寫入到 Redis 緩存中。
很明顯這種模式是不可持續(xù)發(fā)展的,這種雙寫到各個數(shù)據(jù)存儲系統(tǒng)中可能導致不可維護和擴展,數(shù)據(jù)一致性問題等,需要引入分布式事務,成本和復雜度也隨之增加。我們可以通過 CDC(Change Data Capture)工具進行解除耦合,同步到下游需要同步的存儲系統(tǒng)。通過這種方式提高系統(tǒng)的穩(wěn)健性,也方便后續(xù)的維護。
Flink SQL CDC 數(shù)據(jù)同步與原理解析
CDC 全稱是 Change Data Capture ,它是一個比較廣義的概念,只要能捕獲變更的數(shù)據(jù),我們都可以稱為 CDC 。業(yè)界主要有基于查詢的 CDC 和基于日志的 CDC ,可以從下面表格對比他們功能和差異點。
經(jīng)過以上對比,我們可以發(fā)現(xiàn)基于日志 CDC 有以下這幾種優(yōu)勢:
· 能夠捕獲所有數(shù)據(jù)的變化,捕獲完整的變更記錄。在異地容災,數(shù)據(jù)備份等場景中得到廣泛應用,如果是基于查詢的 CDC 有可能導致兩次查詢的中間一部分數(shù)據(jù)丟失
· 每次 DML 操作均有記錄無需像查詢 CDC 這樣發(fā)起全表掃描進行過濾,擁有更高的效率和性能,具有低延遲,不增加數(shù)據(jù)庫負載的優(yōu)勢
· 無需入侵業(yè)務,業(yè)務解耦,無需更改業(yè)務模型
· 捕獲刪除事件和捕獲舊記錄的狀態(tài),在查詢 CDC 中,周期的查詢無法感知中間數(shù)據(jù)是否刪除
基于日志的 CDC 方案介紹
從 ETL 的角度進行分析,一般采集的都是業(yè)務庫數(shù)據(jù),這里使用 MySQL 作為需要采集的數(shù)據(jù)庫,通過 Debezium 把 MySQL Binlog 進行采集后發(fā)送至 Kafka 消息隊列,然后對接一些實時計算引擎或者 APP 進行消費后把數(shù)據(jù)傳輸入 OLAP 系統(tǒng)或者其他存儲介質(zhì)。
Flink 希望打通更多數(shù)據(jù)源,發(fā)揮完整的計算能力。我們生產(chǎn)中主要來源于業(yè)務日志和數(shù)據(jù)庫日志,Flink 在業(yè)務日志的支持上已經(jīng)非常完善,但是在數(shù)據(jù)庫日志支持方面在 Flink 1.11 前還屬于一片空白,這就是為什么要集成 CDC 的原因之一。
Flink SQL 內(nèi)部支持了完整的 changelog 機制,所以 Flink 對接 CDC 數(shù)據(jù)只需要把CDC 數(shù)據(jù)轉(zhuǎn)換成 Flink 認識的數(shù)據(jù),所以在 Flink 1.11 里面重構(gòu)了 TableSource 接口,以便更好支持和集成 CDC。
重構(gòu)后的 TableSource 輸出的都是 RowData 數(shù)據(jù)結(jié)構(gòu),代表了一行的數(shù)據(jù)。在RowData 上面會有一個元數(shù)據(jù)的信息,我們稱為 RowKind 。RowKind 里面包括了插入、更新前、更新后、刪除,這樣和數(shù)據(jù)庫里面的 binlog 概念十分類似。通過 Debezium 采集的 JSON 格式,包含了舊數(shù)據(jù)和新數(shù)據(jù)行以及原數(shù)據(jù)信息,op 的 u表示是 update 更新操作標識符,ts_ms 表示同步的時間戳。因此,對接 Debezium JSON 的數(shù)據(jù),其實就是將這種原始的 JSON 數(shù)據(jù)轉(zhuǎn)換成 Flink 認識的 RowData。
選擇 Flink 作為 ETL 工具
當選擇 Flink 作為 ETL 工具時,在數(shù)據(jù)同步場景,如下圖同步結(jié)構(gòu):
通過 Debezium 訂閱業(yè)務庫 MySQL 的 Binlog 傳輸至 Kafka ,Flink 通過創(chuàng)建 Kafka 表指定 format 格式為 debezium-json ,然后通過 Flink 進行計算后或者直接插入到其他外部數(shù)據(jù)存儲系統(tǒng),例如圖中的 Elasticsearch 和 PostgreSQL。
但是這個架構(gòu)有個缺點,我們可以看到采集端組件過多導致維護繁雜,這時候就會想是否可以用 Flink SQL 直接對接 MySQL 的 binlog 數(shù)據(jù)呢,有沒可以替代的方案呢?
答案是有的!經(jīng)過改進后結(jié)構(gòu)如下圖:
社區(qū)開發(fā)了 flink-cdc-connectors 組件,這是一個可以直接從 MySQL、PostgreSQL 等數(shù)據(jù)庫直接讀取全量數(shù)據(jù)和增量變更數(shù)據(jù)的 source 組件。目前也已開源,開源地址:
flink-cdc-connectors 可以用來替換 Debezium+Kafka 的數(shù)據(jù)采集模塊,從而實現(xiàn) Flink SQL 采集+計算+傳輸(ETL)一體化,這樣做的優(yōu)點有以下:
· 開箱即用,簡單易上手
· 減少維護的組件,簡化實時鏈路,減輕部署成本
· 減小端到端延遲
· Flink 自身支持 Exactly Once 的讀取和計算
· 數(shù)據(jù)不落地,減少存儲成本
· 支持全量和增量流式讀取
· binlog 采集位點可回溯*
基于 Flink SQL CDC 的數(shù)據(jù)同步方案實踐
下面給大家?guī)?3 個關(guān)于 Flink SQL + CDC 在實際場景中使用較多的案例。在完成實驗時候,你需要 Docker、MySQL、Elasticsearch 等組件,具體請參考每個案例參考文檔。
案例 1 : Flink SQL CDC + JDBC Connector
這個案例通過訂閱我們訂單表(事實表)數(shù)據(jù),通過 Debezium 將 MySQL Binlog 發(fā)送至 Kafka,通過維表 Join 和 ETL 操作把結(jié)果輸出至下游的 PG 數(shù)據(jù)庫。具體可以參考 Flink 公眾號文章:《Flink JDBC Connector:Flink 與數(shù)據(jù)庫集成最佳實踐》案例進行實踐操作。
案例 2 : CDC Streaming ETL
模擬電商公司的訂單表和物流表,需要對訂單數(shù)據(jù)進行統(tǒng)計分析,對于不同的信息需要進行關(guān)聯(lián)后續(xù)形成訂單的大寬表后,交給下游的業(yè)務方使用 ES 做數(shù)據(jù)分析,這個案例演示了如何只依賴 Flink 不依賴其他組件,借助 Flink 強大的計算能力實時把 Binlog 的數(shù)據(jù)流關(guān)聯(lián)一次并同步至 ES 。
例如如下的這段 Flink SQL 代碼就能完成實時同步 MySQL 中 orders 表的全量+增量數(shù)據(jù)的目的。
CREATE TABLE orders ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'mydb', 'table-name' = 'orders' ); SELECT * FROM orders
為了讓讀者更好地上手和理解,我們還提供了 docker-compose 的測試環(huán)境,更詳細的案例教程請參考下文的視頻鏈接和文檔鏈接。
案例 3 : Streaming Changes to Kafka
下面案例就是對 GMV 進行天級別的全站統(tǒng)計。包含插入/更新/刪除,只有付款的訂單才能計算進入 GMV ,觀察 GMV 值的變化。
Flink SQL CDC 的更多應用場景
Flink SQL CDC 不僅可以靈活地應用于實時數(shù)據(jù)同步場景中,還可以打通更多的場景提供給用戶選擇。
Flink 在數(shù)據(jù)同步場景中的靈活定位
· 如果你已經(jīng)有 Debezium/Canal + Kafka 的采集層 (E),可以使用 Flink 作為計算層 (T) 和傳輸層 (L)
· 也可以用 Flink 替代 Debezium/Canal ,由 Flink 直接同步變更數(shù)據(jù)到 Kafka,Flink 統(tǒng)一 ETL 流程
· 如果不需要 Kafka 數(shù)據(jù)緩存,可以由 Flink 直接同步變更數(shù)據(jù)到目的地,Flink 統(tǒng)一 ETL 流程
Flink SQL CDC : 打通更多場景
· 實時數(shù)據(jù)同步,數(shù)據(jù)備份,數(shù)據(jù)遷移,數(shù)倉構(gòu)建
優(yōu)勢:豐富的上下游(E & L),強大的計算(T),易用的 API(SQL),流式計算低延遲
· 數(shù)據(jù)庫之上的實時物化視圖、流式數(shù)據(jù)分析
· 索引構(gòu)建和實時維護
· 業(yè)務 cache 刷新
· 審計跟蹤
· 微服務的解耦,讀寫分離
· 基于 CDC 的維表關(guān)聯(lián)
下面介紹一下為何用 CDC 的維表關(guān)聯(lián)會比基于查詢的維表查詢快。
■ 基于查詢的維表關(guān)聯(lián)
目前維表查詢的方式主要是通過 Join 的方式,數(shù)據(jù)從消息隊列進來后通過向數(shù)據(jù)庫發(fā)起 IO 的請求,由數(shù)據(jù)庫把結(jié)果返回后合并再輸出到下游,但是這個過程無可避免的產(chǎn)生了 IO 和網(wǎng)絡通信的消耗,導致吞吐量無法進一步提升,就算使用一些緩存機制,但是因為緩存更新不及時可能會導致精確性也沒那么高。
■ 基于 CDC 的維表關(guān)聯(lián)
我們可以通過 CDC 把維表的數(shù)據(jù)導入到維表 Join 的狀態(tài)里面,在這個 State 里面因為它是一個分布式的 State ,里面保存了 Database 里面實時的數(shù)據(jù)庫維表鏡像,當消息隊列數(shù)據(jù)過來時候無需再次查詢遠程的數(shù)據(jù)庫了,直接查詢本地磁盤的 State ,避免了 IO 操作,實現(xiàn)了低延遲、高吞吐,更精準。
Tips:目前此功能在 1.12 版本的規(guī)劃中,具體進度請關(guān)注 FLIP-132 。
未來規(guī)劃
· FLIP-132 :Temporal Table DDL(基于 CDC 的維表關(guān)聯(lián))
· Upsert 數(shù)據(jù)輸出到 Kafka
· 更多的 CDC formats 支持(debezium-avro, OGG, Maxwell)
· 批模式支持處理 CDC 數(shù)據(jù)
· flink-cdc-connectors 支持更多數(shù)據(jù)庫
總結(jié)
本文通過對比傳統(tǒng)的數(shù)據(jù)同步方案與 Flink SQL CDC 方案分享了 Flink CDC 的優(yōu)勢,與此同時介紹了 CDC 分為日志型和查詢型各自的實現(xiàn)原理。后續(xù)案例也演示了關(guān)于 Debezium 訂閱 MySQL Binlog 的場景介紹,以及如何通過 flink-cdc-connectors 實現(xiàn)技術(shù)整合替代訂閱組件。除此之外,還詳細講解了 Flink CDC 在數(shù)據(jù)同步、物化視圖、多機房備份等的場景,并重點講解了社區(qū)未來規(guī)劃的基于 CDC 維表關(guān)聯(lián)對比傳統(tǒng)維表關(guān)聯(lián)的優(yōu)勢以及 CDC 組件工作。
希望通過這次分享,大家對 Flink SQL CDC 能有全新的認識和了解,在未來實際生產(chǎn)開發(fā)中,期望 Flink CDC 能帶來更多開發(fā)的便捷和更豐富的使用場景。
Q & A
1、GROUP BY 結(jié)果如何寫到 Kafka ?
因為 group by 的結(jié)果是一個更新的結(jié)果,目前無法寫入 append only 的消息隊列中里面去。更新的結(jié)果寫入 Kafka 中將在 1.12 版本中原生地支持。在 1.11 版本中,可以通過 flink-cdc-connectors 項目提供的 changelog-json format 來實現(xiàn)該功能,具體見文檔。
2、CDC 是否需要保證順序化消費?
是的,數(shù)據(jù)同步到 kafka ,首先需要 kafka 在分區(qū)中保證有序,同一個 key 的變更數(shù)據(jù)需要打入到同一個 kafka 的分區(qū)里面。這樣 flink 讀取的時候才能保證順序。
總結(jié)
以上是生活随笔為你收集整理的cdc工具 postgresql_基于 Flink SQL CDC 的实时数据同步方案的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 39.组合键(复合主键)
- 下一篇: Oem7F7 Win7激活 比SK PA