Real-time data warehouse synchronization with Debezium
- Background
- Introduction to Debezium
- Architecture
- Basic concepts
- Example
- Router
- Current problems
Background
The data lake synchronizes source-database data into the ODS layer of the Hive data warehouse, or keeps it in Kafka for downstream computation. Source databases include MySQL, PostgreSQL, SQL Server, and Oracle, with MySQL making up the majority. The current approach is T+1 full or incremental extraction with Sqoop, which has poor timeliness, and deleted rows may not be handled correctly.
Reasons for choosing Debezium: it supports many data sources and the only required component is Kafka, so little development work is needed; Debezium runs on Kafka Connect, and since Kafka 2.3 the problem of the whole Kafka Connect cluster rebalancing whenever a task is added or modified has been mitigated; the binlog-style offsets are stored in Kafka, so no extra storage needs to be introduced and offsets need no manual management; and it guarantees at-least-once delivery.
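Because delivery is at-least-once, the same change event can be redelivered (for example after a connector restart), so downstream consumers should tolerate duplicates. A minimal sketch of one possible dedup step, keying on the record's primary key plus the `__source_ts_ms` field that the configuration below adds; the `dedupe` helper is hypothetical, not part of Debezium:

```python
# Hypothetical dedup step for an at-least-once stream: drop exact
# redeliveries identified by (primary key, source timestamp).

def dedupe(events):
    """Keep the first copy of each (id, __source_ts_ms) pair, preserving order."""
    seen = set()
    out = []
    for e in events:
        marker = (e["id"], e["__source_ts_ms"])
        if marker not in seen:
            seen.add(marker)
            out.append(e)
    return out

events = [
    {"id": 1004, "first_name": "10", "__source_ts_ms": 1596450910000},
    {"id": 1004, "first_name": "10", "__source_ts_ms": 1596450910000},  # redelivered duplicate
    {"id": 1004, "first_name": "11", "__source_ts_ms": 1596451124000},
]
print(len(dedupe(events)))  # 2
```

Note that two distinct updates landing in the same millisecond would collide under this key; a real pipeline would also compare the payload or use binlog positions.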
Introduction to Debezium
Architecture
Debezium is essentially a family of Kafka Connect source implementations, one per supported data source.
The captured data is stored in Kafka.
Basic concepts
kafka-connect: a service independent of Kafka itself; this project deploys it in distributed (cluster) mode, relying on Kafka for coordination.
connector: one connection instance that imitates a replica pulling the real-time binlog from the primary. Supports MySQL, PostgreSQL, Oracle, SQL Server, MongoDB, and other data sources. Connectors run inside Kafka Connect.
task: each source-syncing connector here uses only one task (other kinds of tasks can use several for high availability and higher concurrency); a task is the smallest unit of connector execution and runs as a thread inside Kafka Connect.
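Connector and task state can be checked through the Kafka Connect REST API (`GET ${connectIp}:8083/connectors/<name>/status`). A minimal sketch of parsing that status response to spot unhealthy tasks; the payload below is illustrative, not captured from a real cluster:

```python
# Sketch: inspect a Kafka Connect /connectors/<name>/status response
# and report which task ids are not RUNNING.

def failed_tasks(status):
    """Return the ids of tasks whose state is not RUNNING."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") != "RUNNING"]

# Illustrative response shape (one connector, two tasks for demonstration).
status = {
    "name": "mysql-connector-1",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "RUNNING"}, {"id": 1, "state": "FAILED"}],
}
print(failed_tasks(status))  # [1]
```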
Example
To sync the MySQL binlog, first check the source database's binlog settings:
```sql
-- Is binlog enabled?
SELECT variable_value AS "BINARY LOGGING STATUS (log-bin) ::"
  FROM information_schema.global_variables WHERE variable_name = 'log_bin';
-- Is row-based logging enabled? (expect ROW)
SELECT variable_value AS "binlog_format"
  FROM information_schema.global_variables WHERE variable_name = 'binlog_format';
-- Are full row images logged? (expect FULL)
SELECT variable_value AS "binlog_row_image"
  FROM information_schema.global_variables WHERE variable_name = 'binlog_row_image';
```

Then create a new MySQL sync task:

```shell
# ${connectIp}    Kafka Connect host
# ${name}         connector name
# ${ip}           MySQL instance host
# ${port}
# ${user}
# ${password}
# ${serverId}     server.id of the simulated replica
# ${serverName}   appears in topic names as ${serverName}.${schema}.${table}
# ${tableList}    e.g. db1.table1,db2.table1,db2.table2
# ${kafka}        ip:9092,ip:9092 (with many brokers, listing any three is enough)
# ${historyTopic} topic that stores DDL, used internally by Debezium
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" ${connectIp}:8083/connectors/ \
  -d '{
    "name": "${name}",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "${ip}",
      "database.port": "${port}",
      "database.user": "${user}",
      "database.password": "${password}",
      "database.server.id": "${serverId}",
      "database.server.name": "${serverName}",
      "table.whitelist": "${tableList}",
      "database.history.kafka.bootstrap.servers": "${kafka}",
      "snapshot.mode": "schema_only",
      "tombstones.on.delete": "false",
      "database.history.kafka.topic": "${historyTopic}",
      "database.history.skip.unparseable.ddl": "true",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "transforms.unwrap.delete.handling.mode": "rewrite",
      "transforms.unwrap.add.fields": "source.ts_ms"
    }
  }'
```

Main parameters (many other parameters are not listed):
```
"name": "${name}",
"config": {
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "tasks.max": "1",
  "database.hostname": "${ip}",
  "database.port": "${port}",
  "database.user": "${user}",
  "database.password": "${password}",
  "database.server.id": "${serverId}",
  "database.server.name": "${serverName}",
  "table.whitelist": "${tableList}",
  "database.history.kafka.bootstrap.servers": "${kafka}",
  "snapshot.mode": "schema_only",  // start from the current latest offset; cold data is not needed and is extracted by other means
  "tombstones.on.delete": "false",
  "database.history.kafka.topic": "${historyTopic}",
  "database.history.skip.unparseable.ddl": "true",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",  // simplify the record structure
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "rewrite",
  "transforms.unwrap.add.fields": "source.ts_ms"  // add a timestamp field
}
```

Data in Kafka:
```
{"id":1004,"first_name":"10","last_name":"1","email":"1","__source_ts_ms":1596450910000,"__deleted":"false"}   // key: {"id":1004}
{"id":1004,"first_name":"11","last_name":"1","email":"1","__source_ts_ms":1596451124000,"__deleted":"false"}   // key: {"id":1004}
{"id":1004,"first_name":"101","last_name":"1","email":"1","__source_ts_ms":1596606837000,"__deleted":"false"}  // key: {"id":1004}
{"id":1004,"first_name":"102","last_name":"1","email":"1","__source_ts_ms":1596606992000,"__deleted":"false"}  // key: {"id":1004}
```

__source_ts_ms and __deleted are fields produced by the configuration above.
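With `delete.handling.mode` set to `rewrite`, a delete arrives as a normal record whose `__deleted` field is `"true"`. A consumer loading the ODS layer can branch on that flag; a minimal sketch, where the `to_operation` helper is hypothetical:

```python
# Sketch: map a flattened Debezium record to a warehouse operation
# based on the "__deleted" marker produced by delete.handling.mode=rewrite.

def to_operation(record):
    """Return ("DELETE", pk) for rewritten deletes, else ("UPSERT", pk)."""
    op = "DELETE" if record.get("__deleted") == "true" else "UPSERT"
    return (op, record["id"])

rec = {"id": 1004, "first_name": "102", "__source_ts_ms": 1596606992000, "__deleted": "false"}
print(to_operation(rec))  # ('UPSERT', 1004)
```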
Router
Merging sharded tables: write the data of all shards into a single topic.

```
"transforms": "Reroute,unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "source.ts_ms",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "dbserver1\\..*",
"transforms.Reroute.topic.replacement": "dbserver1.all"
```

Result:
```
key:   {"id":1004,"__dbz__physicalTableIdentifier":"dbserver1.inventory.customers"}
value: {"id":1004,"first_name":"204","last_name":"1","email":"1","__source_ts_ms":1596773158000,"__deleted":"false"}
```

__dbz__physicalTableIdentifier is a key field added automatically to tell the source tables apart; the field name can be changed, or the table can instead be recovered by matching against the topic name.
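A consumer reading the merged topic can recover the physical source table from that key field; a minimal sketch, where the `source_table` helper is hypothetical:

```python
# Sketch: split the "__dbz__physicalTableIdentifier" added by
# ByLogicalTableRouter back into its schema and table parts.

def source_table(key):
    """Return (schema, table) from a merged-topic record key."""
    server, schema, table = key["__dbz__physicalTableIdentifier"].split(".")
    return (schema, table)

key = {"id": 1004, "__dbz__physicalTableIdentifier": "dbserver1.inventory.customers"}
print(source_table(key))  # ('inventory', 'customers')
```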
Current problems
- Many tables need to be synced, which means many Kafka topics and a noticeable performance impact.
- Newer Kafka versions are poorly supported by kafka-manager and similar tools, so monitoring and management of Kafka must be built in-house; the Connect cluster also needs custom monitoring.
- How best to land the synced data into the Hive warehouse: HBase, Kudu, Hudi, or directly on HDFS.