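4 Debezium Extraction Deployment
Goal of this article
Debezium (dbz for short) poses as a MySQL replica. When data on the primary changes, the primary sends the change information to dbz just as it would to any replica, and dbz converts each change it receives into JSON and pushes it to Kafka.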
Install JDK 11
yum -y install java-11-openjdk-devel
Unpack and deploy
tar xfz debezium-server-dist-2.0.0.Final.tar.gz
Edit the configuration file
application.properties
[yinyx@localhost conf]$ cat application.properties
quarkus.http.port=8999
quarkus.log.level=INFO
quarkus.log.console.json=false

debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0

debezium.source.database.hostname=127.0.0.1
debezium.source.database.port=6306
debezium.source.database.user=test
debezium.source.database.password=test
debezium.source.database.server.id=2
debezium.source.database.include.list=test

debezium.source.topic.prefix=yyx
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter.schemas.enable=false
debezium.source.schema.history.internal.kafka.bootstrap.servers=127.0.0.1:9092
debezium.source.schema.history.internal.kafka.topic=schemahistory

debezium.source.decimal.handling.mode=string
debezium.source.lob.enabled=true
debezium.source.database.history.skip.unparseable.ddl=true
debezium.source.tombstones.on.delete=false

debezium.sink.type=kafka
debezium.sink.kafka.producer.bootstrap.servers=127.0.0.1:9092
debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer

debezium.format.key.schemas.enable=false
debezium.format.value.schemas.enable=false
[yinyx@localhost conf]$
Start
./run.sh
Note: start Kafka before starting Debezium.
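Because Kafka has to be up before Debezium starts, a small pre-flight check can save a failed launch. The following is only a sketch: it reads the sink's bootstrap servers from the properties text and tries a plain TCP connection to each. The helper names (parse_properties, is_reachable, kafka_ready) are made up for this illustration, and an open port only shows that something is listening, not that the broker is healthy.

```python
import socket


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def is_reachable(address, timeout=2.0):
    """Return True if a TCP connection to 'host:port' succeeds."""
    host, _, port = address.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except (OSError, ValueError):
        # OSError: refused, unreachable, or timed out. ValueError: bad port.
        return False


def kafka_ready(props):
    """Return True if at least one configured bootstrap server accepts connections."""
    servers = props.get("debezium.sink.kafka.producer.bootstrap.servers", "")
    addresses = [s.strip() for s in servers.split(",") if s.strip()]
    return any(is_reachable(a) for a in addresses)
```

One way to use it, assuming the file lives at conf/application.properties relative to the current directory: read the file, pass its text to parse_properties, and only run ./run.sh when kafka_ready returns True.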
Test
Check whether the topics have been created
[yinyx@localhost bin]$ ./kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
__consumer_offsets
schemahistory
yinyx
yyx
yyx.test.t1
Start a Kafka consumer
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic yyx.test.t1
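Update the data in table t1 in MySQL
Run any mix of INSERT, UPDATE, and DELETE statements.
Check the Kafka consumer; output similar to the following should appear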
{"before":{"f1":3,"f2":"cc|33","f3":1670056305000},"after":{"f1":3,"f2":"cc|333","f3":1670056305000},"source":{"version":"2.0.0.Final","connector":"mysql","name":"yyx","ts_ms":1670030109000,"snapshot":"false","db":"test","sequence":null,"table":"t1","server_id":1,"gtid":"7bdc8394-71cf-11ed-b2d5-000c293c9462:25","file":"mysql-bin.000002","pos":7907,"row":0,"thread":39,"query":null},"op":"u","ts_ms":1670031525419,"transaction":null}
{"before":{"f1":2,"f2":"bb|222","f3":1670002422000},"after":{"f1":2,"f2":"bb|222haha","f3":1670002422000},"source":{"version":"2.0.0.Final","connector":"mysql","name":"yyx","ts_ms":1670031487000,"snapshot":"false","db":"test","sequence":null,"table":"t1","server_id":1,"gtid":"7bdc8394-71cf-11ed-b2d5-000c293c9462:27","file":"mysql-bin.000002","pos":8561,"row":0,"thread":39,"query":null},"op":"u","ts_ms":1670031525422,"transaction":null}
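To make the event layout concrete, here is a minimal sketch that reads one such message and reports which columns changed. The function name summarize and the OPS table are introduced only for this example, and the sample string is a trimmed copy of the first event above (most of the source block is left out for brevity).

```python
import json

# Debezium operation codes: create, update, delete, snapshot read.
OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def summarize(raw):
    """Describe one change event given as a JSON string."""
    event = json.loads(raw)
    # "before" is null for inserts and "after" is null for deletes.
    before = event.get("before") or {}
    after = event.get("after") or {}
    # Columns whose value differs between the before and after images.
    changed = {
        col: (before.get(col), after.get(col))
        for col in sorted(set(before) | set(after))
        if before.get(col) != after.get(col)
    }
    source = event["source"]
    return {
        "table": f'{source["db"]}.{source["table"]}',
        "op": OPS.get(event["op"], event["op"]),
        "changed": changed,
    }


# Trimmed copy of the first sample event shown above.
sample = (
    '{"before":{"f1":3,"f2":"cc|33","f3":1670056305000},'
    '"after":{"f1":3,"f2":"cc|333","f3":1670056305000},'
    '"source":{"db":"test","table":"t1"},"op":"u"}'
)
print(summarize(sample))
# {'table': 'test.t1', 'op': 'update', 'changed': {'f2': ('cc|33', 'cc|333')}}
```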
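Summary
At this point, changes in MySQL are reflected in Kafka in real time as JSON messages. From here, you can write your own program to consume and process them from Kafka.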
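As one possible shape for such a program, the sketch below replays change events into an in-memory copy of the table. It assumes that f1 is the primary key of t1, which the article does not state, and the names apply_event and process_lines are hypothetical. It takes any iterable of JSON lines, so it does not depend on a particular Kafka client library.

```python
import json


def apply_event(rows, event, key="f1"):
    """Apply one change event to rows, a dict keyed by the assumed primary key."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # Insert, snapshot read, or update: the after image is the current row.
        after = event["after"]
        rows[after[key]] = after
    elif op == "d":
        # Delete: only the before image is populated.
        rows.pop(event["before"][key], None)
    return rows


def process_lines(lines, rows=None):
    """Replay an iterable of JSON event lines and return the resulting rows."""
    rows = {} if rows is None else rows
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if event is None:
            # A tombstone would appear as a literal null; nothing to apply.
            continue
        apply_event(rows, event)
    return rows
```

For a quick experiment, the output of kafka-console-consumer.sh for topic yyx.test.t1 could be piped into a script that calls process_lines(sys.stdin). A real consumer would more likely use a Kafka client library and persist the result somewhere durable.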