Implementing MySQL Incremental Sync with Kafka
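Goal
This article reproduces and tidies up [1]: rows newly inserted into table person in MySQL database A are synced incrementally, through Kafka Connect's JDBC source and sink connectors, into table kafkaperson in database B.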
Environment
| Component | Version |
| --- | --- |
| Zookeeper | 3.6.0 |
| Kafka | 2.5.0 |
| MySQL | 8.0.21-0ubuntu0.20.04.4 |
Preparation
Create two databases, A and B, then create a table in each:
mysql> create database A;
Query OK, 1 row affected (0.12 sec)
mysql> create database B;
Query OK, 1 row affected (0.08 sec)
mysql> use A;
Database changed
mysql> CREATE TABLE `person` (
    ->   `pid` int(11) NOT NULL AUTO_INCREMENT,
    ->   `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
    ->   `age` int(11) DEFAULT NULL,
    ->   PRIMARY KEY (`pid`)
    -> ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Query OK, 0 rows affected, 3 warnings (0.82 sec)
mysql> use B;
Database changed
mysql> CREATE TABLE `kafkaperson` (
    ->   `pid` int(11) NOT NULL AUTO_INCREMENT,
    ->   `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
    ->   `age` int(11) DEFAULT NULL,
    ->   PRIMARY KEY (`pid`)
    -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
Query OK, 0 rows affected, 5 warnings (0.49 sec)
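Starting the cluster
Start Hadoop, Zookeeper, and Kafka. For data to flow, the two connectors configured in the appendix (quickstart-mysql.properties and quickstart-mysql-sink.properties) must also be running in a Kafka Connect worker.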
Test
① Producer: create the topic mysql-kafka-person
$KAFKA/bin/kafka-topics.sh --create --zookeeper Desktop:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-person
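The JDBC source connector publishes each table to a topic named topic.prefix followed by the table name, so with the appendix settings (topic.prefix=mysql-kafka-, table.whitelist=person) it writes to mysql-kafka-person, the topic created above, which is also the sink's topics value. Below is a minimal consistency check, assuming abridged copies of the two appendix files; parse_properties and source_topics are hypothetical helpers, and the parser handles only simple key=value lines, not full java.util.Properties syntax.

```python
from __future__ import annotations


def parse_properties(text: str) -> dict[str, str]:
    """Hypothetical, simplified reader: only 'key=value' lines and full-line '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def source_topics(source: dict[str, str]) -> list[str]:
    """Topic name = topic.prefix + table name, for each whitelisted table."""
    prefix = source.get("topic.prefix", "")
    tables = [t.strip() for t in source.get("table.whitelist", "").split(",") if t.strip()]
    return [prefix + table for table in tables]


# Abridged copies of the appendix files (connection URLs omitted).
SOURCE = """
name=mysql-a-source-person
mode=incrementing
incrementing.column.name=pid
table.whitelist=person
topic.prefix=mysql-kafka-
"""

SINK = """
name=mysql-a-sink-person
topics=mysql-kafka-person
pk.mode = record_value
pk.fields = pid
table.name.format=kafkaperson
"""

if __name__ == "__main__":
    produced = source_topics(parse_properties(SOURCE))
    consumed = [t.strip() for t in parse_properties(SINK)["topics"].split(",")]
    print("source writes to:", produced)
    print("sink reads from:", consumed)
    print("match:", set(consumed) <= set(produced))
```

If the sink's topics value does not appear in the source's derived list, the sink simply never receives the rows, which is an easy mistake to make when renaming the prefix or the table.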
② Consumer
③ Insert a row into table person in database A:
mysql> INSERT INTO person (pid,firstname,age) VALUES ( 1, 'zs',66);
Query OK, 1 row affected (0.07 sec)
mysql> select * from person;
+-----+-----------+------+
| pid | firstname | age  |
+-----+-----------+------+
|   1 | zs        |   66 |
+-----+-----------+------+
1 row in set (0.00 sec)
④ Check table kafkaperson in database B:
mysql> use B;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show tables;
+-------------+
| Tables_in_B |
+-------------+
| kafkaperson |
+-------------+
1 row in set (0.00 sec)
mysql> select * from kafkaperson
    -> ;
+-----+-----------+------+
| pid | firstname | age  |
+-----+-----------+------+
|   1 | zs        |   66 |
+-----+-----------+------+
1 row in set (0.00 sec)
The row inserted into A.person has reached B.kafkaperson through Kafka, and the corresponding message also shows up in the Kafka consumer terminal.
Appendix
quickstart-mysql.properties
name=mysql-a-source-person
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://Desktop:3306/A?user=appleyuchi&password=appleyuchi
# incrementing: detect new rows by an auto-increment column
mode=incrementing
# auto-increment column: pid
incrementing.column.name=pid
# whitelisted table: person
table.whitelist=person
# topic prefix: mysql-kafka-
topic.prefix=mysql-kafka-
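In mode=incrementing the source connector remembers the largest pid it has already published and, on each poll, asks only for rows whose pid is larger, ordered by pid. That is why the newly inserted row is picked up while existing rows are not re-sent, and also why updates or deletes of existing rows are not captured in this mode. The sketch below imitates one such poll, assuming Python's built-in sqlite3 as a stand-in for MySQL database A; poll_incrementing and the in-memory offset are illustrative, not the connector's code (the real connector keeps its offset in Kafka Connect's offset storage).

```python
import sqlite3


def make_source_db() -> sqlite3.Connection:
    """In-memory sqlite3 stand-in for MySQL database A, mirroring table person."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE person (pid INTEGER PRIMARY KEY AUTOINCREMENT,"
        " firstname TEXT, age INTEGER)"
    )
    return conn


def poll_incrementing(conn: sqlite3.Connection, last_pid: int):
    """One poll in the spirit of mode=incrementing with incrementing.column.name=pid.

    Returns the rows newer than last_pid and the updated offset
    (the largest pid seen so far).
    """
    rows = conn.execute(
        "SELECT pid, firstname, age FROM person WHERE pid > ? ORDER BY pid ASC",
        (last_pid,),
    ).fetchall()
    new_offset = rows[-1][0] if rows else last_pid
    return rows, new_offset


if __name__ == "__main__":
    db = make_source_db()
    offset = -1  # nothing published yet
    db.execute("INSERT INTO person (pid, firstname, age) VALUES (1, 'zs', 66)")
    rows, offset = poll_incrementing(db, offset)
    print(rows, offset)  # the first poll sees pid 1
    db.execute("UPDATE person SET age = 67 WHERE pid = 1")
    rows, offset = poll_incrementing(db, offset)
    print(rows, offset)  # the UPDATE to pid 1 is not picked up
```

Capturing updates as well would need a timestamp column (mode=timestamp+incrementing) or a binlog-based tool; that is outside what this configuration does.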
quickstart-mysql-sink.properties
name=mysql-a-sink-person
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# Kafka topic name
topics=mysql-kafka-person
# JDBC connection
connection.url=jdbc:mysql://Desktop:3306/B?user=appleyuchi&password=appleyuchi
# Do not create the table automatically; if true, the table is created automatically and named after the topic
auto.create=false
# upsert mode: insert new rows, update existing ones
insert.mode=upsert
# The next two parameters use pid (taken from the record value) as the primary key for updates
pk.mode=record_value
pk.fields=pid
# Target table name: kafkaperson
table.name.format=kafkaperson
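With insert.mode=upsert and pk.mode=record_value, pk.fields=pid, the sink takes pid from the record value as the key: a record with a new pid inserts a row, and a record with an existing pid overwrites that row instead of failing with a duplicate-key error. For MySQL the connector issues INSERT ... ON DUPLICATE KEY UPDATE; the sketch below reproduces the same behaviour with SQLite's INSERT ... ON CONFLICT(pid) DO UPDATE (SQLite 3.24 or later), assuming sqlite3 as a stand-in for database B. make_sink_db and upsert_person are hypothetical names.

```python
import sqlite3


def make_sink_db() -> sqlite3.Connection:
    """In-memory stand-in for database B; auto.create=false, so the table exists up front."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE kafkaperson (pid INTEGER PRIMARY KEY,"
        " firstname TEXT, age INTEGER)"
    )
    return conn


def upsert_person(conn: sqlite3.Connection, record: dict) -> None:
    """Write one record value keyed by its 'pid' field (pk.mode=record_value)."""
    conn.execute(
        "INSERT INTO kafkaperson (pid, firstname, age) VALUES (:pid, :firstname, :age)"
        " ON CONFLICT(pid) DO UPDATE SET firstname = excluded.firstname, age = excluded.age",
        record,
    )


if __name__ == "__main__":
    db = make_sink_db()
    upsert_person(db, {"pid": 1, "firstname": "zs", "age": 66})  # new key: insert
    upsert_person(db, {"pid": 1, "firstname": "zs", "age": 67})  # same key: update
    print(db.execute("SELECT * FROM kafkaperson").fetchall())  # → [(1, 'zs', 67)]
```

Because a replayed record with the same pid just rewrites the same row, upsert mode also makes the sink tolerant of records being delivered more than once.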
References:
[1] Kafka Connect: implementing MySQL incremental sync
[2] Building data ETL pipelines quickly with Kafka Connect
[3] Kafka Connect log configuration
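Summary
The above is the complete content of "Implementing MySQL Incremental Sync with Kafka", collected and compiled by 生活随笔; we hope it helps you solve the problem you ran into.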