springboot集成canal,实现缓存实时刷新,驼峰问题
1.canal安裝
下載路徑:canal下載路徑
下載完安裝包,安裝完成后,需要修改conf\example路徑下配置文件instance.properties:設置position info和table meta tsdb info下面的屬性即可。
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
#canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/maruko?useUnicode=true&characterEncoding=utf-8&useSSL=false
#canal.instance.tsdb.dbUsername=root
#canal.instance.tsdb.dbPassword=maruko

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=maruko
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

需要注意的是,只需要配置canal.instance.master.address、canal.instance.dbUsername、canal.instance.dbPassword即可,不要去配置canal.instance.tsdb相關屬性,如果配置了啟動會報錯的。
2.依賴引入
引入依賴
<!--canal-->
<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

3.監聽配置
去實現EntryHandler接口,添加自己的業務邏輯,比如緩存的刪除更新插入,實現對增刪改查的邏輯重寫。
@CanalTable("kafka_test") @Component @Slf4j public class KafkaHandler implements EntryHandler<KafkaTest> {@Autowiredprivate RedisTemplate redisTemplate;@Overridepublic void insert(KafkaTest item) {log.info("canal插入insert," + item);// 寫數據到redisredisTemplate.opsForValue().set("kafka_test" + item.getId(), item);}@Overridepublic void update(KafkaTest before, KafkaTest after) {log.warn("更新前update before," + before);log.warn("更新后update after," + after);// 寫數據到redisredisTemplate.opsForValue().set("kafka_test" + after.getId(), after);}@Overridepublic void delete(KafkaTest item) {log.warn("刪除delete," + item);// 刪除數據到redisredisTemplate.delete("kafka_test" + item.getId());} }4.yml配置
默認destination就是example,如果修改了服務安裝里面的配置,這兒需要同步修改。
#canal配置
canal:
  server: localhost:11111 # 你canal的地址
  destination: example

如果不想讓控制臺一直打印某些信息,可以配置如下配置,屏蔽AbstractCanalClient類process()一直打印this.log.info("獲取消息 {}", message)。
logging:
  level:
    tracer: trace # 開啟trace級別日志,在開發時可以開啟此配置,則控制臺可以打印es全部請求信息及DSL語句;為了避免重復,開啟此項配置后,可以將EE的print-dsl設置為false
    #top.javatool.canal.client: warn # 禁止AbstractCanalClient打印常規日志 獲取消息 {}

5.第二種方案(解決數據庫存在下劃線,用上述方法,某些字段會為空)
上面的方式只適合數據庫字段和實體類字段、屬性完全一致的情況;當數據庫字段含有下劃線的時候,因為我們直接去監聽的binlog日志,里面的字段是數據庫字段,跟實體類字段不匹配,所以會出現字段為空的情況。這個時候需要去獲取列的字段,對字段進行屬性轉換,實現方法如下:
引入依賴
<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

創建監聽
@CanalEventListener @Slf4j public class KafkaListener {@Autowiredprivate RedisTemplate redisTemplate;/*** @param eventType 當前操作數據庫的類型* @param rowData 當前操作數據庫的數據*/@ListenPoint(schema = "maruko", table = "kafka_test")public void listenKafkaTest(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {KafkaTest kafkaTestBefore = new KafkaTest();KafkaTest kafkaTestAfter = new KafkaTest();//遍歷數據獲取k-vList<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();getEntity(beforeColumnsList, kafkaTestBefore);log.warn("獲取到提交前的對象為:" + kafkaTestBefore);getEntity(afterColumnsList, kafkaTestAfter);log.warn("獲取到提交后的對象為:" + kafkaTestAfter);//判斷是新增還是更新還是刪除switch (eventType.getNumber()) {case CanalEntry.EventType.INSERT_VALUE:case CanalEntry.EventType.UPDATE_VALUE:redisTemplate.opsForValue().set("kafka_test" + kafkaTestAfter.getId(), kafkaTestAfter);break;case CanalEntry.EventType.DELETE_VALUE:redisTemplate.delete("kafka_test" + kafkaTestBefore.getId());break;}}/*** 遍歷獲取屬性轉換為實體類** @param columnsList* @param kafkaTest*/private void getEntity(List<CanalEntry.Column> columnsList, KafkaTest kafkaTest) {SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");for (CanalEntry.Column column : columnsList) {String name = column.getName();String value = column.getValue();switch (name) {case KafkaTest.ID:if (StringUtils.hasLength(value)) {kafkaTest.setId(Integer.parseInt(value));}break;case KafkaTest.CONTENT:if (StringUtils.hasLength(value)) {kafkaTest.setContent(value);}break;case KafkaTest.PRODUCER_STATUS:if (StringUtils.hasLength(value)) {kafkaTest.setProducerStatus(Integer.parseInt(value));}break;case KafkaTest.CONSUMER_STATUS:if (StringUtils.hasLength(value)) {kafkaTest.setConsumerStatus(Integer.parseInt(value));}break;case KafkaTest.UPDATE_TIME:if (StringUtils.hasLength(value)) {try {kafkaTest.setUpdateTime(format.parse(value));} catch (ParseException p) 
{log.error(p.getMessage());}}break;case KafkaTest.TOPIC:if (StringUtils.hasLength(value)) {kafkaTest.setTopic(value);}break;case KafkaTest.CONSUMER_ID:if (StringUtils.hasLength(value)) {kafkaTest.setConsumerId(value);}break;case KafkaTest.GROUP_ID:if (StringUtils.hasLength(value)) {kafkaTest.setGroupId(value);}break;case KafkaTest.PARTITION_ID:if (StringUtils.hasLength(value)) {kafkaTest.setPartitionId(Integer.parseInt(value));}break;case KafkaTest.PRODUCER_OFFSET:if (StringUtils.hasLength(value)) {kafkaTest.setProducerOffset(Long.parseLong(value));}break;case KafkaTest.CONSUMER_OFFSET:if (StringUtils.hasLength(value)) {kafkaTest.setConsumerOffset(Long.parseLong(value));}break;case KafkaTest.TEST:if (StringUtils.hasLength(value)) {kafkaTest.setTest(value);}break;}}}}實體類
@Data @TableName("kafka_test") public class KafkaTest {public static final String ID = "id";public static final String CONTENT = "content";public static final String PRODUCER_STATUS = "producer_status";public static final String CONSUMER_STATUS = "consumer_status";public static final String UPDATE_TIME = "update_time";public static final String TOPIC = "topic";public static final String CONSUMER_ID = "consumer_id";public static final String GROUP_ID = "group_id";public static final String PARTITION_ID = "partition_id";public static final String PRODUCER_OFFSET = "consumer_offset";public static final String CONSUMER_OFFSET = "producer_offset";public static final String TEST = "test";@TableId(type = IdType.AUTO)private Integer id;@TableField("content")private String content;@TableField("producer_status")private Integer producerStatus;@TableField("consumer_status")private Integer consumerStatus;@TableField("update_time")private Date updateTime;@TableField("topic")private String topic;@TableField("consumer_id")private String consumerId;@TableField("group_id")private String groupId;@TableField("partition_id")private int partitionId;@TableField("consumer_offset")private Long consumerOffset;@TableField("producer_offset")private Long producerOffset;@TableField("test")private String test; }總結
以上是生活随笔為你收集整理的springboot集成canal,实现缓存实时刷新,驼峰问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: echarts中月份数据缺少怎么补齐呢?
- 下一篇: 深度:年收入超百亿元的恒源祥已成中老年服