canal 监听不到数据变化_数据的异构实战(二)手写迷你版同步工程
點擊上方“Java知音”,選擇“置頂公眾號”
技術(shù)文章第一時間送達(dá)!
上一期講到了通過canal訂閱mysql的binlog日志并且轉(zhuǎn)換為對象,那么這一次我們將訂閱來的對象通過RocketMQ發(fā)送消息,接收方接受消息之后同時存儲到其他類型的數(shù)據(jù)源當(dāng)中,完成一個簡單的數(shù)據(jù)異構(gòu)的過程。
什么是Java消息服務(wù)?
兩個應(yīng)用程序之間進(jìn)行異步通信的API,它為標(biāo)準(zhǔn)消息協(xié)議和消息服務(wù)提供了一組通用接口,包括創(chuàng)建、發(fā)送、讀取消息等,用于支持JAVA應(yīng)用程序開發(fā)。
在J2EE中,當(dāng)兩個應(yīng)用程序使用JMS進(jìn)行通信時,它們之間并不是直接相連的,而是通過一個共同的消息收發(fā)服務(wù)連接起來,可以達(dá)到解耦的效果,我們將會在接下來的教程中詳細(xì)介紹。
jms的消息傳送模型
常見的消息傳送模型有以下兩種:
點對點消息傳送模型
在點對點消息傳送模型中,應(yīng)用程序由消息隊列,發(fā)送者,接收者組成。每一個消息發(fā)送給一個特殊的消息隊列,該隊列保存了所有發(fā)送給它的消息(除了被接收者消費掉的和過期的消息)。如下圖所示:
發(fā)布訂閱消息傳送模型
在發(fā)布訂閱模型中,消費者需要訂閱相關(guān)的topic才能接收到生產(chǎn)者的信息。生產(chǎn)者會將信息傳輸?shù)絫opic中,然后消費者只需要從topic中獲取數(shù)據(jù)即可。如下圖所示:
RocketMQ消息隊列使用
這次使用的消息中間件為RocketMQ的使用場景。RocketMQ是阿里巴巴在2012年開源的分布式消息中間件,目前已經(jīng)捐贈給Apache基金會,并于2016年11月成為 Apache 孵化項目。
RocketMQ在使用之前,需要我們引入相關(guān)的依賴配置:
????????????<dependency>
????????????<groupId>org.apache.rocketmqgroupId>
????????????<artifactId>rocketmq-clientartifactId>
????????????<version>${rocketmq.version}version>
????????dependency>
關(guān)于RocketMQ的安裝在這里就不做過多的講解了。
通過mq的方式來進(jìn)行數(shù)據(jù)異構(gòu)通常是比較簡單的方案,首先我們需要在項目里面獨立一個模塊專門用于監(jiān)聽mysql的binlog日志,這個模塊我暫且稱之為datahandle-core模塊
整個工程采用了springboot的結(jié)構(gòu)來構(gòu)建,主要的核心也是在core工程中。
首先是監(jiān)聽canal的日志狀態(tài)模塊了,采用了上一節(jié)中講解到的客戶端代碼進(jìn)行數(shù)據(jù)監(jiān)聽,并且將其轉(zhuǎn)換為對象然后發(fā)送往mq中:
package?com.sise.datahandle.core;import?com.alibaba.otter.canal.client.CanalConnector;
import?com.alibaba.otter.canal.client.CanalConnectors;
import?com.alibaba.otter.canal.protocol.Message;
import?lombok.extern.slf4j.Slf4j;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.CommandLineRunner;
import?org.springframework.stereotype.Component;
import?java.net.InetSocketAddress;
import?static?com.sise.datahandle.constants.CanalConstants.*;
/**
?*?@author?idea
?*?@date?2019/10/20
?*/
@Component
@Slf4j
public?class?CanalListener?implements?CommandLineRunner?{
????@Autowired
????private?CanalClient?canalClient;
????@Override
????public?void?run(String...?args)?throws?Exception?{
??????log.info("=============canal監(jiān)聽器開啟===============");
????????CanalConnector?canalConnector?=?CanalConnectors.newSingleConnector(
????????????????new?InetSocketAddress(SERVER_ADDRESS,?PORT),?DESTINATION,?USERNAME,?PASSWORD);
????????canalConnector.connect();
????????canalConnector.subscribe(".*\\..*");
????????canalConnector.rollback();
????????for?(;?;?)?{
????????????Message?message?=?canalConnector.getWithoutAck(100);
????????????long?batchId?=?message.getId();
????????????if?(batchId?!=?-1)?{
????????????????canalClient.entityHandle(message.getEntries());
????????????}
????????}
????}
}
ps:這里面的CanalClient代碼主要來自上一篇的canal客戶端代碼,文末會有完整項目代碼鏈接,需要的讀者可以前往查看。
在CanalClient里面,有一個函數(shù)是專門用于處理將訂閱的數(shù)據(jù)發(fā)送到mq消息隊列中:
package?com.sise.datahandle.core;import?com.alibaba.fastjson.JSON;
import?com.alibaba.otter.canal.protocol.CanalEntry;
import?com.google.protobuf.InvalidProtocolBufferException;
import?com.sise.datahandle.handler.CanalDataHandler;
import?com.sise.datahandle.model.TypeDTO;
import?lombok.extern.slf4j.Slf4j;
import?org.apache.rocketmq.client.exception.MQBrokerException;
import?org.apache.rocketmq.client.exception.MQClientException;
import?org.apache.rocketmq.client.producer.DefaultMQProducer;
import?org.apache.rocketmq.client.producer.SendResult;
import?org.apache.rocketmq.remoting.exception.RemotingException;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Service;
import?java.util.List;
/**
?*?canal監(jiān)聽客戶端變化
?*
?*?@author?idea
?*?@date?2019/10/12
?*/
@Slf4j
@Service
public?class?CanalClient?{
????@Autowired
????private?DefaultMQProducer?rocketMqProducer;
????/**
?????*?處理binlog日志的監(jiān)聽
?????*
?????*?@param?entries
?????*/
????public?void?entityHandle(List?entries)?{
????????for?(CanalEntry.Entry?entry?:?entries)?{
????????????if?(entry.getEntryType()?!=?CanalEntry.EntryType.ROWDATA)?{
????????????????continue;
????????????}
????????????try?{
????????????????CanalEntry.RowChange?rowChange?=?CanalEntry.RowChange.parseFrom(entry.getStoreValue());
????????????????for?(CanalEntry.RowData?rowData?:?rowChange.getRowDatasList())?{
????????????????????switch?(rowChange.getEventType())?{
????????????????????????case?INSERT:
????????????????????????????String?tableName?=?entry.getHeader().getTableName();
????????????????????????????//測試選用t_type這張表進(jìn)行映射處理
????????????????????????????if?("t_type".equals(tableName))?{
????????????????????????????????TypeDTO?typeDTO?=?CanalDataHandler.convertToBean(rowData.getAfterColumnsList(),?TypeDTO.class);
????????????????????????????????org.apache.rocketmq.common.message.Message?message?=?new?org.apache.rocketmq.common.message.Message();
????????????????????????????????message.setTopic("canal-test-topic");
????????????????????????????????message.setTags("canal-test-tag");
????????????????????????????????String?json?=?JSON.toJSONString(typeDTO);
????????????????????????????????message.setBody(json.getBytes());
????????????????????????????????SendResult?sendResult?=?rocketMqProducer.send(message);
????????????????????????????????log.info("[mq消息發(fā)送結(jié)果]----"?+?sendResult);
????????????????????????????}
????????????????????????????break;
????????????????????????default:
????????????????????????????break;
????????????????????}
????????????????}
????????????}?catch?(InvalidProtocolBufferException?e)?{
????????????????log.error("[CanalClient]監(jiān)聽數(shù)據(jù)過程出現(xiàn)異常,異常信息為{}",?e);
????????????}?catch?(InterruptedException?|?RemotingException?|?MQClientException?|?MQBrokerException?e)?{
????????????????log.error("[CanalClient]?mq發(fā)送信息出現(xiàn)異常:{}",?e);
????????????}
????????}
????}
}
這里面主要是監(jiān)聽binlog記錄為插入數(shù)據(jù)事件的時候做發(fā)送mq操作。
接下來便是常見的mq配置了,本工程主要是一個模擬的簡單案例,因此我將consumer和producer都放在了一起方便測試。
通過springboot自身的properties文件對mq進(jìn)行參數(shù)初始化配置之后便可以構(gòu)建一個基本的consumer和producer了。這里我們拿一個TypeDto類來進(jìn)行樹異構(gòu)的測試,consumer端的核心代碼為:
package?com.sise.datahandle.mq.rocketmq.consumer;import?com.sise.datahandle.model.TypeDTO;
import?com.sise.datahandle.mq.rocketmq.producer.RocketMqMsgHandle;
import?com.sise.datahandle.redis.RedisService;
import?lombok.extern.slf4j.Slf4j;
import?org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import?org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import?org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import?org.apache.rocketmq.common.message.MessageExt;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.stereotype.Component;
import?org.springframework.util.CollectionUtils;
import?java.util.List;
/**
?*?@author?idea
?*?@date?2019/10/20
?*/
@Component
@Slf4j
public?class?RocketMqConsumeMsgListenerProcessor?implements?MessageListenerConcurrently?{
????@Autowired
????private?RedisService?redisService;
????@Override
????public?ConsumeConcurrentlyStatus?consumeMessage(List?msgs,?ConsumeConcurrentlyContext?context)?{if(CollectionUtils.isEmpty(msgs)){
????????????log.info("接受到的消息為空,不處理,直接返回成功");return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????}
????????MessageExt?messageExt?=?msgs.get(0);
????????System.out.println("接受到的消息為:"+messageExt.toString());if("canal-test-topic".equals(messageExt.getTopic())){if("canal-test-tag".equals(messageExt.getTags())){
????????????????int?reconsume?=?messageExt.getReconsumeTimes();if(reconsume?==3){//消息已經(jīng)重試了3次,如果不需要再次消費,則返回成功return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????????????}
????????????????TypeDTO?typeDTO?=?RocketMqMsgHandle.parseMessage(messageExt,TypeDTO.class);//存儲進(jìn)入redis中
????????????????redisService.setObject("typeDTO-"+System.currentTimeMillis(),typeDTO);
????????????}
????????}//?如果沒有return?success?,consumer會重新消費該消息,直到return?successreturn?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????}
}
通過訂閱mq的信息,讀取相關(guān)的數(shù)據(jù)再次寫入到redis里面,完成一個簡單過程的數(shù)據(jù)異構(gòu)。
整個迷你工程寫下來,比較核心的地方就在于對binlog日志的解析器部分,如何將日志訂閱之后轉(zhuǎn)換為相應(yīng)的對象進(jìn)行處理。
通常采用mq的方式進(jìn)行數(shù)據(jù)異構(gòu)會相對簡單,實際上是在監(jiān)聽binlog為寫DB的同時去寫一次MQ,但是這種方式不能夠保證數(shù)據(jù)一致性,就是不能保證跨資源的事務(wù)。注:調(diào)用第三方遠(yuǎn)程RPC的操作一定不要放到事務(wù)中。
完整案例的代碼鏈接如下(點擊閱讀原文直達(dá)):
https://gitee.com/IdeaHome_admin/wfw
推薦閱讀(點擊即可跳轉(zhuǎn)閱讀)
1.SpringBoot內(nèi)容聚合
2.面試題內(nèi)容聚合
3.設(shè)計模式內(nèi)容聚合
4.Mybatis內(nèi)容聚合
5.多線程內(nèi)容聚合
覺得不錯?歡迎轉(zhuǎn)發(fā)分享給更多人
我知道你 “在看”
總結(jié)
以上是生活随笔為你收集整理的canal 监听不到数据变化_数据的异构实战(二)手写迷你版同步工程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 训练不出结果_工业设计师如何训练自己的设
- 下一篇: 移动app测试的多样性_app移动端接口