获取mongodb数据变更_支持mysql、MongoDB数据变更订阅/监听分发
1 概述
mysql、MongoDB數據變動監聽分發 本項目意在簡化監聽mysql、MongoDB數據庫的不同表的各種數據變動 項目依賴redis,mysql 使用場景:刷新緩存、異構系統...
2 使用方式
從bin-log-distributor-app到client數據分發方式的默認實現為redis,如果要開發基于其他的比如mq,只需要分別實現bin-log-distributor-pub下的DataPublisher接口與bin-log-distributor-client下的即可
2.1 服務端
服務端是項目中bin-log-distributor-app模塊,在mysql-binlog-connector-java基礎上提供了監聽mysql數據庫二進制日志并進行分發的功能
2.1.1 參考配置
# redis地址
spring.redisson.address=redis://192.168.1.204:6379
# mysql日志同步賬戶,
binaryLog.host = 192.168.1.204
binaryLog.port = 3306
binaryLog.username = aa
binaryLog.password = aa
binaryLog.serverId = 1
# 讀取列名,進行映射
spring.datasource.url = jdbc:mysql://${binaryLog.host}:${binaryLog.port}/mysql?autoReconnect=true&useUnicode=true&characterEncoding=utf-8
spring.datasource.username = ${binaryLog.username}
spring.datasource.password = ${binaryLog.password}
spring.datasource.driverClassName = com.mysql.jdbc.Driver
# rabbitmq連接相關信息(如果只有redis客戶端可以不需要使用)
spring.rabbit.host = 192.168.1.204
spring.rabbit.port = 5672
spring.rabbit.username = aa
spring.rabbit.password = aa
spring.rabbit.virtualHost = /binlog
2.1.2 啟動方式
編譯打包項目,直接通過(java -jar)啟動bin-log-distributor-app-${version}-SNAPSHOT.jar,可參考spring boot手冊
2.2 客戶端
2.2.1 redis客戶端
2.2.1.1 引入依賴包
cn.keking.project
bin-log-distributor-client-redis
${version}
2.2.1.2 添加客戶端配置
#自動注冊客戶端(2.1中服務端的地址)
databaseEventServerUrl=http://localhost:8885/client/addAll
#本應用命名
appName=lbt-service-ext-redis
# redis地址
spring.redisson.address=redis://192.168.1.204:6379
2.2.1.3 寫handler,實現 DatabaseEventHandler 接口,并加上注解 @HandleDatabaseEven
/**
* LockLevel為保持順序的級別,None為默認
* TABLE -> 同表按順序執行
* COLUMN -> 某列值一致的按順序執行
* NONE -> 無序
*/
@Service
@HandleDatabaseEvent(database = "Rana_G", table = "PTP_PROJ_REP_HIS", events = {DatabaseEvent.UPDATE_ROWS, DatabaseEvent.DELETE_ROWS},lockLevel = LockLevel.TABLE)
public class ProjRepHisDatabaseEventHandler implements DatabaseEventHandler {
private static final Logger logger = LoggerFactory.getLogger(ProjRepHisDatabaseEventHandler.class);
@Override
public void handle(EventBaseDTO eventBaseDTO) {
logger.info(JSON.toJSONString(eventBaseDTO));
// todo 在這里寫相關邏輯
}
}
2.2.1.4 啟動監聽,當2里的handler是由容器管理時需要通過registerHandler(projRepHisDatabaseEventHandler)手動注冊,如果不需要容器管理,可以直接通過autoScanHandler()自動掃描添加
@Component
public class DatabaseEventListener {
private static Logger logger = LoggerFactory.getLogger(DatabaseEventListener.class);
@Autowired
private RedissonClient redissonClient;
@Value("#{env.databaseEventServerUrl}")
private String serverUrl;
@Value("#{env.appName}")
private String appName;
//將第3步里的service注入
@Autowired
private ProjRepHisDatabaseEventHandler projRepHisDatabaseEventHandler;
@PostConstruct
public void start() {
//初始化訂閱的實現
DataSubscriber dataSubscriber = new DataSubscriberRedisImpl(redissonClient);
new BinLogDistributorClient(appName, dataSubscriber)
//在binlog中注冊handler
.setQueueType(ClientInfo.QUEUE_TYPE_REDIS)
.registerHandler(projRepHisDatabaseEventHandler)
.setServerUrl(serverUrl).autoRegisterClient().start();
}
}
2.2.2 rabbitmq客戶端
2.2.2.1 引入依賴包
cn.keking.project
bin-log-distributor-client-rabbitmq
${version}
2.2.2.2 添加客戶端配置
#自動注冊客戶端(2.1中服務端的地址)
databaseEventServerUrl=http://localhost:8885/client/addAll
#本應用命名
appName=lbt-service-ext-rabbit
# redis地址(rabbitmq實現也要使用redis作為分布式鎖)
spring.redisson.address=redis://192.168.1.204:6379
# RabbitMQ配置
spring.rabbit.host=192.168.1.204
spring.rabbit.port=5672
spring.rabbit.username=aa
spring.rabbit.password=aa
spring.rabbit.virtualHost=/binlog
spring.rabbit.apiUrl=http://192.168.1.204:15672/api/
2.2.2.3 寫handler,實現 DatabaseEventHandler 接口,并加上注解 @HandleDatabaseEven
/**
* LockLevel為保持順序的級別,None為默認
* TABLE -> 同表按順序執行
* COLUMN -> 某列值一致的按順序執行
* NONE -> 無序
*/
@Service
@HandleDatabaseEvent(database = "Rana_G", table = "PTP_PROJ_REP_HIS", events = {DatabaseEvent.UPDATE_ROWS, DatabaseEvent.DELETE_ROWS},lockLevel = LockLevel.TABLE)
public class ProjRepHisDatabaseEventHandler implements DatabaseEventHandler {
private static final Logger logger = LoggerFactory.getLogger(ProjRepHisDatabaseEventHandler.class);
@Override
public void handle(EventBaseDTO eventBaseDTO) {
logger.info(JSON.toJSONString(eventBaseDTO));
// todo 在這里寫相關邏輯
}
}
2.2.2.4 啟動監聽,當2里的handler是由容器管理時需要通過registerHandler(projRepHisDatabaseEventHandler)手動注冊,如果不需要容器管理,可以直接通過autoScanHandler()自動掃描添加
@Component
public class DatabaseEventListener {
private static Logger logger = LoggerFactory.getLogger(DatabaseEventListener.class);
@Autowired
private RedissonClient redissonClient;
@Value("#{env.databaseEventServerUrl}")
private String serverUrl;
@Value("#{env.appName}")
private String appName;
//將第3步里的service注入
@Autowired
private ProjRepHisDatabaseEventHandler projRepHisDatabaseEventHandler;
//org.springframework.amqp.rabbit.connection.ConnectionFactory 自行創建spring bean
@Autowired
private ConnectionFactory connectionFactory;
//com.rabbitmq.http.client.Client 自行創建spring bean
@Autowired
private Client rabbitHttpClient;
@PostConstruct
public void start() {
//初始化訂閱的實現
DataSubscriber dataSubscriber = new DataSubscriberRabbitMQImpl(connectionFactory, rabbitHttpClient, redissonClient);
new BinLogDistributorClient(appName, dataSubscriber)
//默認為redis實現,使用mq實現這里一定要指定QueueType
.setQueueType(ClientInfo.QUEUE_TYPE_RABBIT)
.registerHandler(projRepHisDatabaseEventHandler)
.setServerUrl(serverUrl).autoRegisterClient().start();
}
}
2.3 前端管理模塊
前端管理服務模塊是基于vue的管理各個應用監聽狀況的管理界面
新增MySQL數據源
數據源管理
日志進度
分發隊列監控
新增數據訂閱
3 其他
總結
以上是生活随笔為你收集整理的获取mongodb数据变更_支持mysql、MongoDB数据变更订阅/监听分发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 歌尔股份是做什么的 今日暴力涨停
- 下一篇: 缩减qe对股市的影响 中短期会受影响但长