Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源
生活随笔
收集整理的這篇文章主要介紹了
Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
Flink程序加載數(shù)據(jù)源(3)自定義數(shù)據(jù)源(2)從Mysql 加載數(shù)據(jù)源
? 上文引出了Flink程序自定義數(shù)據(jù)源的方法,我們來再次回顧下。
? Flink還提供了數(shù)據(jù)源接口(抽象類),我們實現(xiàn)該接口(繼承抽象類)就可以實現(xiàn)自定義數(shù)據(jù)源,不同的接口(抽象類)功能的豐富性與范圍不同,分類如下:
? EX:
-
SourceFunction: 非并行數(shù)據(jù)源(并行度只能=1)
-
RichSourceFunction: 多功能非并行數(shù)據(jù)源(并行度只能=1)
-
ParallelSourceFunction: 并行數(shù)據(jù)源(并行度能夠>=1)
-
RichParallelSourceFunction: 多功能并行數(shù)據(jù)源(并行度能夠>=1)
代碼實現(xiàn)
如上所說,實現(xiàn)flink為我們提供的一些數(shù)據(jù)源接口,即能夠實現(xiàn)自定義數(shù)據(jù)源了!
env.addSource(自定義數(shù)據(jù)源類對象);下邊進行完整示例演示:
① 準備環(huán)境
//準備環(huán)境 env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);② 獲取數(shù)據(jù)源
env.addSource(自定義數(shù)據(jù)源類對象);③ 從Mysql中獲取數(shù)據(jù)源示例
數(shù)據(jù)對象
@Data @NoArgsConstructor @AllArgsConstructor public static class VehicleAlarm {private String id;private String licensePlate;private String plateColor;private Long deviceTime;private String zone; }自定義數(shù)據(jù)源類
public static class MysqlSource extends RichParallelSourceFunction<VehicleAlarm> {Connection conn = null;PreparedStatement ps = null;ResultSet result = null;private boolean flag = true;String url = "jdbc:mysql://xxx:3306/alarm-sc?useUnicode=true&characterEncoding=utf-8&useSSL=false";@Overridepublic void open(Configuration parameters) throws Exception {conn = DriverManager.getConnection(url, "root", "root");String sql = "select id,license_plate,plate_color,device_time,`zone` from vehicle_alarm_202103";ps = conn.prepareStatement(sql);super.open(parameters);}@Overridepublic void run(SourceContext<VehicleAlarm> ctx) throws Exception {while (flag) {result = ps.executeQuery();while (result.next()) {String id = result.getString("id");String licensePlate = result.getString("license_plate");String plateColor = result.getString("plate_color");Long deviceTime = result.getLong("device_time");String zone = result.getString("zone");VehicleAlarm vehicleAlarm = new VehicleAlarm(id, licensePlate, plateColor, deviceTime, zone);ctx.collect(vehicleAlarm);}Thread.sleep(2000);}}@Overridepublic void cancel() {flag = false;}@Overridepublic void close() throws Exception {if (conn != null) {conn.close();}if (ps != null) {ps.close();}if (result != null) {result.close();}} }結果展示:
方法以及特別屬性解釋說明:
- open():數(shù)據(jù)源最開始打開時執(zhí)行,整個數(shù)據(jù)源從加載到銷毀,只會執(zhí)行一次
- run(SourceContex):實現(xiàn)數(shù)據(jù)獲取邏輯,并可以通過傳入的參數(shù)ctx進行向下游節(jié)點的數(shù)據(jù)轉發(fā)。
- SourceContext:source函數(shù)用于發(fā)出元素和可能的watermark的接口,確定以及返回source生成的元素的類型。
- cancel():用來取消數(shù)據(jù)源,一般在run方法中,會存在一個循環(huán)來持續(xù)產(chǎn)生數(shù)據(jù),cancel方法則可以使該循環(huán)終止。
- close():數(shù)據(jù)源關閉時執(zhí)行,整個數(shù)據(jù)源從加載到銷毀,只會執(zhí)行一次
Flink完整流程代碼:
//準備環(huán)境 env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(2); //獲取數(shù)據(jù)源 source DataStreamSource<VehicleAlarm> streamSource = env.addSource(new MysqlSource()); //數(shù)據(jù)處理 todo streamSource.print(); //數(shù)據(jù)收集 sink //程序執(zhí)行 execute env.execute("mysql-source");總結
以上是生活随笔為你收集整理的Flink程序加载数据源(3)自定义数据源(2)从Mysql 加载数据源的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: (初学者)用python实现九九乘法表
- 下一篇: IOS学习-----课程体系-----坚