Debezium MySQL源连接器
Debezium MySQL源連接器
該Debezium的MySQL連接是源連接器,可以得到現(xiàn)有數(shù)據(jù)的快照,記錄所有的MySQL服務(wù)器/群集在數(shù)據(jù)庫中的行級變化。第一次連接到MySQL服務(wù)器時,它將讀取所有數(shù)據(jù)庫的一致快照。該快照完成后,連接器將連續(xù)讀取提交給MySQL的更改,并生成相應(yīng)的插入,更新和刪除事件。每個表的所有事件都記錄在單獨的Kafka主題中,應(yīng)用程序和服務(wù)可以輕松使用它們。
- Confluent支持MySQL連接器0.9.3版和更高版本。
- Confluent支持將此連接器與MySQL 5.6或更高版本一起使用。
安裝MySQL連接器
您可以使用Confluent Hub客戶端(推薦)安裝此連接器,也可以手動下載ZIP文件。
confluent-hub install debezium/debezium-connector-mysql:latest您可以通過替換latest版本號來安裝特定版本。例如:
confluent-hub install debezium/debezium-connector-mysql:0.9.4許可證
Debezium MySQL連接器是一個開源連接器,不需要Confluent企業(yè)許可證。
在MySQL服務(wù)器上啟用二進制日志
必須將MySQL服務(wù)器配置為使用行級二進制日志,有關(guān)詳細信息,請參見MySQL文檔。MySQL的二進制日志或二進制日志以數(shù)據(jù)庫執(zhí)行的相同順序記錄所有操作,包括對表模式的更改或?qū)Ρ碇写鎯Φ臄?shù)據(jù)的更改。MySQL使用其binlog進行復(fù)制和恢復(fù)。
Debezium的MySQL連接器讀取MySQL的二進制日志,以了解數(shù)據(jù)更改的內(nèi)容和順序。然后,它為binlog中的每個行級插入,更新和刪除操作生成一個change事件,并在單獨的Kafka主題中記錄每個表的所有change事件。
這通常是在MySQL服務(wù)器配置文件中完成的,看起來類似于以下片段:
server-id = 223344 log_bin = mysql-bin binlog_format = row binlog_row_image = full expire_logs_days = 10屬性解釋:
- 對于MySQL集群中的每個服務(wù)器和復(fù)制客戶端,server-id的值必須唯一。設(shè)置連接器時,還為連接器分配了唯一的服務(wù)器ID。
- log_bin的值是二進制日志文件序列的基本名稱。
- binlog_format的值必須設(shè)置為row或ROW。
- binlog_row_image的值必須設(shè)置為full或FULL。
- expire_log_days的值是自動二進制日志文件刪除的天數(shù)。默認值為0,表示“不自動刪除”,因此請確保設(shè)置適合您的環(huán)境的值。
mysql8下棄用expire_log_days,替換為:binlog_expire_logs_seconds,默認值為30天,建議配置如下:
# ---------------------------------------------- # 自定義參數(shù) # 設(shè)置日志清理周期為7天 binlog_expire_logs_seconds=604800 # 設(shè)置默認時區(qū) default-time-zone='+8:00' # ----------------------------------------------快速開始
Debezium的MySQL連接器是一個源連接器,可以在單獨的Kafka主題中記錄每個表的事件,應(yīng)用程序和服務(wù)可以輕松地使用它們。
安裝連接器
如果要使用Docker映像來設(shè)置Kafka,ZooKeeper和Kafka Connect,請參考Debezium教程。對于以下教程,您需要在本地安裝Confluent Platform。
導(dǎo)航到您的Confluent Platform安裝目錄,然后運行以下命令來安裝連接器:
confluent-hub install debezium/debezium-connector-mysql:0.9.4添加新的連接器插件需要重新啟動Connect。使用Confluent CLI重新啟動Connect。
提示
Confluent CLI開發(fā)命令的命令語法在5.3.0中已更改。這些命令已移至。例如,的語法為now 。有關(guān)更多信息,請參見confluent local。confluent local confluent start confluent local start
confluent local stop connect && confluent local start connect Using CONFLUENT_CURRENT: /Users/username/Sandbox/confluent-snapshots/var/confluent.NuZHxXfq Starting zookeeper zookeeper is [UP] Starting kafka kafka is [UP] Starting schema-registry schema-registry is [UP] Starting kafka-rest kafka-rest is [UP] Starting connect connect is [UP]檢查MySQL插件是否已正確安裝并由插件加載器加載:
curl -sS localhost:8083/connector-plugins | jq .[].class | grep mysql "io.debezium.connector.mysql.MySqlConnector"使用Docker設(shè)置MySQL(可選)
如果沒有本機安裝的MySQL,則可以使用以下命令來啟動一個新容器,該容器運行預(yù)先配置了清單數(shù)據(jù)庫的MySQL數(shù)據(jù)庫服務(wù)器:
#Run docker container docker run -it --rm --name mysql \ -p 3306:3306 \ -e MYSQL_ROOT_PASSWORD=debezium \ -e MYSQL_USER=mysqluser \ -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9啟動一個MySQL命令行客戶端。
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'啟動Debezium SQL Server連接器
創(chuàng)建文件register-mysql.json以存儲以下連接器配置:
{"name": "inventory-connector","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","tasks.max": "1","database.hostname": "mysql","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.whitelist": "inventory","database.history.kafka.bootstrap.servers": "localhost:9092","database.history.kafka.topic": "schema-changes.inventory"}}啟動連接器。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json開始您的Kafka消費者
在新的終端會話中啟動使用者。
confluent local consume dbserver1.inventory.customers -- --from-beginning當您在MySQL bash中輸入SQL查詢以添加或修改數(shù)據(jù)庫中的記錄時,將填充消息,并在您的用戶終端上顯示這些消息,其中顯示已添加或已修改的記錄。
# Explore the sample inventory database already populated in your MySQL client running in Docker use inventory; SELECT * FROM customers; # Type these queries to see change events in the consumer terminal UPDATE customers SET first_name='Anne Marie' WHERE id=1004; DELETE FROM customers WHERE id=1004;清理資源
刪除連接器并停止Confluent服務(wù)。
curl -X DELETE localhost:8083/connectors/inventory-connector confluent local stop停止SQL Server容器。
docker stop mysqlterm mysql注意
此處提供的部分信息來自Debezium社區(qū)最初提供的文檔。Debezium生產(chǎn)的作品已獲得知識共享3.0的許可。
dbz-connecor-mysql與jdbc-connector-mysql對比
- 1、dbz-connector 支持數(shù)據(jù)插入、更新、刪除的CDC,但jdbc-connector 僅支持插入、更新;
- 2、jdbc-connect提供了mysql的源端、池端連接器,但是dbz僅提供為源端連接器,池端連接器,需要自行實現(xiàn)。
- 3、dbz-connector-mysql 與 jdbc-connect的消息格式不兼容;
原文
https://docs.confluent.io/current/connect/debezium-connect-mysql/index.html
總結(jié)
以上是生活随笔為你收集整理的Debezium MySQL源连接器的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: unity2018转微信小游戏不显示图片
- 下一篇: c语言试题及答案解析,C语言期末考试题(