基于Apache Hudi和Debezium构建CDC入湖管道
從 Hudi v0.10.0 開始,我們很高興地宣布推出適用于 Deltastreamer 的 Debezium 源,它提供從 Postgres 和 MySQL 數據庫到數據湖的變更捕獲數據 (CDC) 的攝取。有關詳細信息請參閱原始 RFC
1. 背景
當想要對來自事務數據庫(如 Postgres 或 MySQL)的數據執行分析時,通常需要通過稱為更改數據捕獲 CDC的過程將此數據引入數據倉庫或數據湖等 OLAP 系統。 Debezium 是一種流行的工具,它使 CDC 變得簡單,其提供了一種通過讀取更改日志來捕獲數據庫中行級更改的方法,通過這種方式 Debezium 可以避免增加數據庫上的 CPU 負載,并確保捕獲包括刪除在內的所有變更。
現在 Apache Hudi 提供了 Debezium 源連接器,CDC 引入數據湖比以往任何時候都更容易,因為它具有一些獨特的差異化功能。 Hudi 可在數據湖上實現高效的更新、合并和刪除事務。 Hudi 獨特地提供了 Merge-On-Read 寫入器,與使用 Spark 或 Flink 的典型數據湖寫入器相比,該寫入器可以顯著降低攝取延遲。 最后,Apache Hudi 提供增量查詢,因此在從數據庫中捕獲更改后可以在所有后續 ETL 管道中以增量方式處理這些更改下游。
2. 總體設計
上面顯示了使用 Apache Hudi 的端到端 CDC 攝取流的架構,第一個組件是 Debezium 部署,它由 Kafka 集群、schema registry(Confluent 或 Apicurio)和 Debezium 連接器組成,Debezium 連接器不斷輪詢數據庫中的更改日志,并將每個數據庫行的更改寫入 AVRO 消息到每個表的專用 Kafka 主題。
第二個組件是 Hudi Deltastreamer,它為每個表從 Kafka 讀取和處理傳入的 Debezium 記錄,并在云存儲上的 Hudi 表中寫入(更新)相應的行。
為了近乎實時地將數據庫表中的數據提取到 Hudi 表中,我們實現了兩個可插拔的 Deltastreamer 類。首先我們實現了一個 Debezium 源。 Deltastreamer 在連續模式下運行,源源不斷地從給定表的 Kafka 主題中讀取和處理 Avro 格式的 Debezium 更改記錄,并將更新的記錄寫入目標 Hudi 表。 除了數據庫表中的列之外,我們還攝取了一些由 Debezium 添加到目標 Hudi 表中的元字段,元字段幫助我們正確地合并更新和刪除記錄,使用Schema Registry表中的最新模式讀取記錄。
其次我們實現了一個自定義的 Debezium Payload,它控制了在更新或刪除同一行時如何合并 Hudi 記錄,當接收到現有行的新 Hudi 記錄時,有效負載使用相應列的較高值(MySQL 中的 FILEID 和 POS 字段以及 Postgres 中的 LSN 字段)選擇最新記錄,在后一個事件是刪除記錄的情況下,有效負載實現確保從存儲中硬刪除記錄。 刪除記錄使用 op 字段標識,該字段的值 d 表示刪除。
3. Apache Hudi配置
在使用 Debezium 源連接器進行 CDC 攝取時,請務必考慮以下 Hudi 部署配置。
- 記錄鍵 - 表的 Hudi 記錄鍵應設置為上游數據庫中表的主鍵。這可確保正確應用更新,因為記錄鍵唯一地標識 Hudi 表中的一行。
- 源排序字段 - 對于更改日志記錄的重復數據刪除,源排序字段應設置為數據庫上發生的更改事件的實際位置。 例如我們分別使用 MySQL 中的 FILEID 和 POS 字段以及 Postgres 數據庫中的 LSN 字段來確保記錄在原始數據庫中以正確的出現順序進行處理。
- 分區字段 - 不要將 Hudi 表的分區與與上游數據庫相同的分區字段相匹配。當然也可以根據需要為 Hudi 表單獨設置分區字段。
3.1 引導現有表
一個重要的用例可能是必須對現有數據庫表進行 CDC 攝取。在流式傳輸更改之前我們可以通過兩種方式獲取現有數據庫數據:
- 默認情況下,Debezium 在初始化時執行數據庫的初始一致快照(由 config snapshot.mode 控制)。在初始快照之后它會繼續從正確的位置流式傳輸更新以避免數據丟失。
- 雖然第一種方法很簡單,但對于大型表,Debezium 引導初始快照可能需要很長時間。或者我們可以運行 Deltastreamer 作業,使用 JDBC 源直接從數據庫引導表,這為用戶定義和執行引導數據庫表所需的更優化的 SQL 查詢提供了更大的靈活性。引導作業成功完成后,將執行另一個 Deltastreamer 作業,處理來自 Debezium 的數據庫更改日志,用戶必須在 Deltastreamer 中使用檢查點來確保第二個作業從正確的位置開始處理變更日志,以避免數據丟失。
3.2 例子
以下描述了使用 AWS RDS 實例 Postgres、基于 Kubernetes 的 Debezium 部署和在 Spark 集群上運行的 Hudi Deltastreamer 實施端到端 CDC 管道的步驟。
3.3 數據庫
RDS 實例需要進行一些配置更改才能啟用邏輯復制。
SET rds.logical_replication to 1 (instead of 0)
psql --host=<aws_rds_instance> --port=5432 --username=postgres --password -d <database_name>;
CREATE PUBLICATION <publication_name> FOR TABLE schema1.table1, schema1.table2;
ALTER TABLE schema1.table1 REPLICA IDENTITY FULL;
3.4 Debezium 連接器
Strimzi 是在 Kubernetes 集群上部署和管理 Kafka 連接器的推薦選項,或者可以選擇使用 Confluent 托管的 Debezium 連接器。
kubectl create namespace kafka
kubectl create -f https://strimzi.io/install/latest?namespace=kafka -n kafka
kubectl -n kafka apply -f kafka-connector.yaml
kafka-connector.yaml 的示例如下所示:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: debezium-kafka-connect
annotations:
strimzi.io/use-connector-resources: "false"
spec:
image: debezium-kafka-connect:latest
replicas: 1
bootstrapServers: localhost:9092
config:
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1
可以使用以下包含 Postgres Debezium 連接器的 Dockerfile 構建 docker 映像 debezium-kafka-connect
FROM confluentinc/cp-kafka-connect:6.2.0 as cp
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:6.2.0
FROM strimzi/kafka:0.18.0-kafka-2.5.0
USER root:root
RUN yum -y update
RUN yum -y install git
RUN yum -y install wget
RUN wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.1.Final/debezium-connector-postgres-1.6.1.Final-plugin.tar.gz
RUN tar xzf debezium-connector-postgres-1.6.1.Final-plugin.tar.gz
RUN mkdir -p /opt/kafka/plugins/debezium && mkdir -p /opt/kafka/plugins/avro/
RUN mv debezium-connector-postgres /opt/kafka/plugins/debezium/
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/lib /opt/kafka/plugins/avro/
USER 1001
一旦部署了 Strimzi 運算符和 Kafka 連接器,我們就可以啟動 Debezium 連接器。
curl -X POST -H "Content-Type:application/json" -d @connect-source.json http://localhost:8083/connectors/
以下是設置 Debezium 連接器以生成兩個表 table1 和 table2 的更改日志的配置示例。
connect-source.json 的內容如下
{
"name": "postgres-debezium-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "database",
"plugin.name": "pgoutput",
"database.server.name": "postgres",
"table.include.list": "schema1.table1,schema1.table2",
"publication.autocreate.mode": "filtered",
"tombstones.on.delete":"false",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "<schema_registry_host>",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<schema_registry_host>",
"slot.name": "pgslot"
}
}
3.5 Hudi Deltastreamer
接下來我們使用 Spark 運行 Hudi Deltastreamer,它將從 kafka 攝取 Debezium 變更日志并將它們寫入 Hudi 表。 下面顯示了一個這樣的命令實例,它適用于 Postgres 數據庫。 幾個關鍵配置如下:
- 將源類設置為 PostgresDebeziumSource。
- 將有效負載類設置為 PostgresDebeziumAvroPayload。
- 為 Debezium Source 和 Kafka Source 配置模式注冊表 URL。
- 將記錄鍵設置為數據庫表的主鍵。
- 將源排序字段 (dedup) 設置為 _event_lsn
spark-submit \\
--jars "/home/hadoop/hudi-utilities-bundle_2.12-0.10.0.jar,/usr/lib/spark/external/lib/spark-avro.jar" \\
--master yarn --deploy-mode client \\
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /home/hadoop/hudi-packages/hudi-utilities-bundle_2.12-0.10.0-SNAPSHOT.jar \\
--table-type COPY_ON_WRITE --op UPSERT \\
--target-base-path s3://bucket_name/path/for/hudi_table1 \\
--target-table hudi_table1 --continuous \\
--min-sync-interval-seconds 60 \\
--source-class org.apache.hudi.utilities.sources.debezium.PostgresDebeziumSource \\
--source-ordering-field _event_lsn \\
--payload-class org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload \\
--hoodie-conf schema.registry.url=https://localhost:8081 \\
--hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://localhost:8081/subjects/postgres.schema1.table1-value/versions/latest \\
--hoodie-conf hoodie.deltastreamer.source.kafka.value.deserializer.class=io.confluent.kafka.serializers.KafkaAvroDeserializer \\
--hoodie-conf hoodie.deltastreamer.source.kafka.topic=postgres.schema1.table1 \\
--hoodie-conf auto.offset.reset=earliest \\
--hoodie-conf hoodie.datasource.write.recordkey.field=”database_primary_key” \\
--hoodie-conf hoodie.datasource.write.partitionpath.field=partition_key \\
--enable-hive-sync \\
--hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor \\
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \\
--hoodie-conf hoodie.datasource.hive_sync.database=default \\
--hoodie-conf hoodie.datasource.hive_sync.table=hudi_table1 \\
--hoodie-conf hoodie.datasource.hive_sync.partition_fields=partition_key
4. 總結
這篇文章介紹了用于 Hudi Deltastreamer 的 Debezium 源,以將 Debezium 更改日志提取到 Hudi 表中。 現在可以將數據庫數據提取到數據湖中,以提供一種經濟高效的方式來存儲和分析數據庫數據。
請關注此 JIRA 以了解有關此新功能的更多信息。
總結
以上是生活随笔為你收集整理的基于Apache Hudi和Debezium构建CDC入湖管道的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【五一qbxt】test1
- 下一篇: 关于C++拷贝控制