springboot集成flink-cdc
文章目錄
- 前文
- (1)什么是CDC
- (2)Flink-CDC是什么
- (3)Flink-CDC 特性
 
- CDC與Flink畢業版本
- Springboot項目整合Flink-CDC
- (1)說明
- (2)引入依賴
- (3)接入springboot項目
- 創建監聽類 實現 ApplicationRunner
- 自定義數據讀取解析器
- 變更對象
- 自定義sink 交由spring管理
 
 
前文
(1)什么是CDC
CDC:全稱是 Change Data Capture,即數據變更捕獲技術,具體的含義是 通過識別和捕獲對數據庫中的數據所做的更改(包括數據或數據表的插入、更新、刪除;數據庫結構的變更調整等),然后將這些更改按發生的順序完整記錄下來,并實時通過中間技術橋梁(消息中間件、TCP等等)將變更順序消息傳送到下游流程或系統的過程。
(2)Flink-CDC是什么
CDC Connectors for Apache Flink ?是一組用于Apache Flink ?的源連接器,使用變更數據捕獲 (CDC) 從不同數據庫獲取變更。用于 Apache Flink ?的 CDC 連接器將 Debezium 集成為捕獲數據更改的引擎。所以它可以充分發揮 Debezium 的能力。
白話的意思是,Flink-CDC 一個成型的cdc技術實現(Debezium)的包裝,我前面也使用過Debezium,并編寫了一個簡略的博客,感興趣的可以戳下方連接去看一下
springboot+debezium捕獲數據庫變更(mysql、sql-server、mongodb、oracle…)
(3)Flink-CDC 特性
支持讀取數據庫快照,即使發生故障也能繼續讀取binlog,一次處理。
DataStream API 的 CDC 連接器,用戶可以在單個作業中使用多個數據庫和表的更改,而無需部署 Debezium 和 Kafka。
Table/SQL API 的 CDC 連接器,用戶可以使用 SQL DDL 創建 CDC 源來監控單個表的更改。
CDC與Flink畢業版本
下表顯示了 Flink? CDC 連接器和 Flink? 之間的版本映射:
| 1.0.0 | 1.11.* | 
| 1.1.0 | 1.11.* | 
| 1.2.0 | 1.12.* | 
| 1.3.0 | 1.12.* | 
| 1.4.0 | 1.13.* | 
| 2.0.* | 1.13.* | 
| 2.1.* | 1.13.* | 
| 2.2.* | 1.13.* , 1.14.* | 
Springboot項目整合Flink-CDC
(1)說明
按常理來說,一個正常的flink-job 最終我們并不會集成到springboot項目中,我們會直接編寫一個maven項目,在發布時使用flink程序來啟動任務
比如官網示例:
本文即要使用flink-cdc進行數據變更捕獲 (可以視作為一個flink-job),但又要契合我們的springboot項目,使用spring的特性,因此,我們需要轉換一下思路,轉換成什么樣子呢?就是不要將這個flink-cdc作為一個job 使用flink程序進行發布提交,我們就當它在我們開發時一樣,作為一個本地項目,main方法啟動
(2)引入依賴
flink客戶端版本使用 1.13.6 cdc 版本使用 2.0.0
<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.13.6</flink.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><!--mysql -cdc--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.18</version></dependency></dependencies>(3)接入springboot項目
無法簡單的使用main方法來啟動cdc 作業,因為如果這樣的話,我們就無法與spring完美的契合
因此我們可以利用springboot的特性, 實現 ApplicationRunner 將flink-cdc 作為一個項目啟動時需要運行的分支子任務即可
創建監聽類 實現 ApplicationRunner
package com.leilei.mysql;import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.DebeziumSourceFunction; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component;/*** @author lei* @create 2022-08-25 13:42* @desc mysql變更監聽**/ @Component public class MysqlEventListener implements ApplicationRunner {private final DataChangeSink dataChangeSink;public MysqlEventListener(DataChangeSink dataChangeSink) {this.dataChangeSink = dataChangeSink;}@Overridepublic void run(ApplicationArguments args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();DataStream<DataChangeInfo> streamSource = env.addSource(dataChangeInfoMySqlSource, "mysql-source").setParallelism(1);streamSource.addSink(dataChangeSink);env.execute("mysql-stream-cdc");}/*** 構造變更數據源** @param* @return DebeziumSourceFunction<DataChangeInfo>* @author lei* @date 2022-08-25 15:29:38*/private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {return MySqlSource.<DataChangeInfo>builder().hostname("10.50.40.145").port(3306).databaseList("paas_common_db").tableList("paas_common_db.base_business_driver_score_*").username("root").password("cdwk-3g-145")/**initial初始化快照,即全量導入后增量導入(檢測更新數據寫入)* latest:只進行增量導入(不讀取歷史變化)* timestamp:指定時間戳進行數據導入(大于等于指定時間錯讀取數據)*/.startupOptions(StartupOptions.latest()).deserializer(new MysqlDeserialization()).serverTimeZone("GMT+8").build();} }自定義數據讀取解析器
我這里解析為一個數據變更對象
package com.leilei.mysql;import com.alibaba.fastjson.JSON; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import com.alibaba.fastjson.JSONObject;import java.util.List; import java.util.Optional;/*** @author lei* @create 2022-08-25 13:43* @desc mysql消息讀取自定義序列化**/ public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {public static final String TS_MS = "ts_ms";public static final String BIN_FILE = "file";public static final String POS = "pos";public static final String CREATE = "CREATE";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";public static final String UPDATE = "UPDATE";/**** 反序列化數據,轉為變更JSON對象* @param sourceRecord* @param collector* @return void* @author lei* @date 2022-08-25 14:44:31*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct struct = (Struct) sourceRecord.value();final Struct source = struct.getStruct(SOURCE);DataChangeInfo dataChangeInfo = new DataChangeInfo();dataChangeInfo.setBeforeData( getJsonObject(struct, BEFORE).toJSONString());dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());//5.獲取操作類型 CREATE UPDATE DELETEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);String type = operation.toString().toUpperCase();int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;dataChangeInfo.setEventType(eventType);dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x->Integer.parseInt(x.toString())).orElse(0));dataChangeInfo.setDatabase(database);dataChangeInfo.setTableName(tableName);dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));//7.輸出數據collector.collect(dataChangeInfo);}/**** 從袁術數據獲取出變更之前或之后的數據* @param value* @param fieldElement* @return JSONObject* @author lei* @date 2022-08-25 14:48:13*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<DataChangeInfo> getProducedType() {return TypeInformation.of(DataChangeInfo.class);} }變更對象
import lombok.Data;/*** @author lei* @create 2022-08-25 14:33* @desc 數據變更對象**/ @Data public class DataChangeInfo {/*** 變更前數據*/private String beforeData;/*** 變更后數據*/private String afterData;/*** 變更類型 1新增 2修改 3刪除*/private Integer eventType;/*** binlog文件名*/private String fileName;/*** binlog當前讀取點位*/private Integer filePos;/*** 數據庫名*/private String database;/*** 表名*/private String tableName;/*** 變更時間*/private Long changeTime;}自定義sink 交由spring管理
package com.leilei.mysql;import lombok.extern.log4j.Log4j2; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.springframework.stereotype.Component;/*** @author lei* @create 2022-08-25 14:01* @desc**/ @Component @Log4j2 public class DataChangeSink implements SinkFunction<DataChangeInfo> {@Overridepublic void invoke(DataChangeInfo value, Context context) {log.info("收到變更原始數據:{}", value);// todo 數據處理;因為此sink也是交由了spring管理,您想進行任何操作都非常簡單} }當然,以上僅僅只是整合思路,如果你想使用flink-cdc 進行數據同步或日志記錄等,結合您自身的需求進行調整接口,以上內容,大的架子是沒問題的
如果遇到問題,可以先從官網QA尋找:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)
項目源碼:springboot-flink-cdc
總結
以上是生活随笔為你收集整理的springboot集成flink-cdc的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Ubuntu 学习
- 下一篇: 监控系统服务器存储,监控系统中存储服务器
