SpringBoot 整合cana 实现数据同步
微服務(wù)多數(shù)據(jù)庫情況下可以使用canal替代觸發(fā)器,canal是應(yīng)阿里巴巴跨機(jī)房同步的業(yè)務(wù)需求而提出的,canal基于數(shù)據(jù)庫的日志解析,獲取變更進(jìn)行增量訂閱&消費(fèi)的業(yè)務(wù)。無論是canal實(shí)驗(yàn)需要還是為了增量備份、主從復(fù)制和恢復(fù),都是需要開啟mysql-binlog日志,數(shù)據(jù)目錄設(shè)置到不同的磁盤分區(qū)可以降低io等待。
官網(wǎng):https://github.com/alibaba/canal
canal 工作原理
canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協(xié)議
MySQL master 收到 dump 請(qǐng)求,開始推送 binary log 給 slave (即 canal )
canal 解析 binary log 對(duì)象(原始為 byte 流)
canal 搭建
搭建mysql環(huán)境
1,修改配置文件
[mysqld] log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復(fù);
重啟MySQL服務(wù)后,確認(rèn)是否開啟了binlog(注意一點(diǎn)是MySQL8.x默認(rèn)開啟binlog)SHOW VARIABLES LIKE '%bin%';:
2,授權(quán) canal 鏈接 MySQL 賬號(hào)具有作為 MySQL slave 的權(quán)限, 如果已有賬戶可直接 grant(省略第三步)
CREATE USER canal IDENTIFIED BY 'root'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' ; FLUSH PRIVILEGES;
3,新建一個(gè)用戶名canal密碼為QWqw12!@的新用戶,賦予REPLICATION SLAVE和REPLICATION CLIENT權(quán)限:
CREATE USER canal IDENTIFIED BY '123456!@'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES; ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456!@';
搭建canal環(huán)境
下載Linux最新穩(wěn)定版(canal.deployer-1.1.4.tar.gz):https://github.com/alibaba/canal/releases
解壓后修改/canal/conf/example下的instance.properties配置文件:
canal.instance.master.address,數(shù)據(jù)庫地址,這里指定為127.0.0.1:3306。 canal.instance.dbUsername,監(jiān)聽的數(shù)據(jù)庫用戶名。 canal.instance.dbPassword,監(jiān)聽的數(shù)據(jù)庫密碼。 新增canal.instance.defaultDatabaseName,默認(rèn)那個(gè)庫,這里指定為test(需要在MySQL中建立一個(gè)test庫)
啟動(dòng)
sh /canal/bin/startup.sh # 查看服務(wù)日志 tail -100f /canal/logs/canal/canal # 查看實(shí)例日志 -- 一般情況下,關(guān)注實(shí)例日志即可 tail -100f /canal/logs/example/example.log
到目前為止 canal的服務(wù)端我們已經(jīng)搭建好了 但是到目前 我們只是把數(shù)據(jù)庫的binlog 拉到canal中,我們還得編寫客戶端消費(fèi)數(shù)據(jù)
properties配置文件
properties配置分為兩部分:
canal.properties (系統(tǒng)根配置文件)
instance.properties (instance級(jí)別的配置文件,每個(gè)instance一份)
instance列表定義 (列出當(dāng)前server上有多少個(gè)instance,每個(gè)instance的加載方式是spring/manager等)
common參數(shù)定義,比如可以將instance.properties的公用參數(shù),抽取放置到這里,這樣每個(gè)instance啟動(dòng)的時(shí)候就可以共享. 【instance.properties配置定義優(yōu)先級(jí)高于canal.properties】
instance.properties介紹:
a. 在canal.properties定義了canal.destinations后,需要在canal.conf.dir對(duì)應(yīng)的目錄下建立同名的文件
比如:
canal.destinations = example1,example2 #spring客戶端注意指定的不同名字
這時(shí)需要?jiǎng)?chuàng)建example1和example2兩個(gè)目錄,每個(gè)目錄里各自有一份instance.properties.
兩種方式,官方提供的demo和springboot starter
1,官方提供的
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>
package com.example.demo.test;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalTest {
public static void main(String[] args) throws Exception {
//canal.ip = 192.168.56.104
//canal.port = 11111
//canal.destinations = example
//canal.user =
//canal.passwd =
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.56.104", 11111), "example", "", "");
try {
connector.connect();
//監(jiān)聽的表, 格式為數(shù)據(jù)庫.表名,數(shù)據(jù)庫.表名
connector.subscribe(".*\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100); // 獲取指定數(shù)量的數(shù)據(jù)
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
Thread.sleep(1000);
continue;
}
// System.out.println(message.getEntries());
printEntries(message.getEntries());
connector.ack(batchId);// 提交確認(rèn),消費(fèi)成功,通知server刪除數(shù)據(jù)
// connector.rollback(batchId);// 處理失敗, 回滾數(shù)據(jù),后續(xù)重新獲取數(shù)據(jù)
}
}catch (Exception e){
}finally {
connector.disconnect();
}
}
private static void printEntries(List<Entry> entries) throws Exception {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) {
continue;
}
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChange.getRowDatasList()) {
switch (rowChange.getEventType()) {
case INSERT:
System.out.println("INSERT ");
printColumns(rowData.getAfterColumnsList());
break;
case UPDATE:
System.out.println("UPDATE ");
printColumns(rowData.getAfterColumnsList());
break;
case DELETE:
System.out.println("DELETE ");
printColumns(rowData.getBeforeColumnsList());
break;
default:
break;
}
}
}
}
private static void printColumns(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
操作數(shù)據(jù)庫增刪改,控制臺(tái)則會(huì)打印
參考:https://github.com/gxmanito/canal-client
https://gitee.com/zhiqishao/canal-client/tree/master
2,springboot starter
https://github.com/NormanGyllenhaal/canal-client
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
package com.example.demo.test;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
@Slf4j
@Component
@CanalTable(value = "test")
public class UserHandler implements EntryHandler<Test> {
@Override
public void insert(Test user) {
log.info("insert message {}", user);
}
@Override
public void update(Test before, Test after) {
log.info("update before {} ", before);
log.info("update after {}", after);
}
@Override
public void delete(Test user) {
log.info("delete {}", user);
}
}
package com.example.demo.test;
import lombok.Data;
import java.io.Serializable;
/**
* @Description //TODO
* @Author GaoX
* @Date 2020/6/28 14:44
*/
@Data
//@Table(name = "test")
public class Test implements Serializable {
private Integer id;
private String name;
}
總結(jié)
以上是生活随笔為你收集整理的SpringBoot 整合cana 实现数据同步的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 华夏信用卡app手动还款
- 下一篇: 在lean trace mode下运行f