flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路
導讀: 最近在對 Flink 進行平臺化,基于 REST API 構建一個平臺實現通過純 SQL 化編寫和管理 Job。盡管 Flink官方希望用戶將所有的依賴和業務邏輯打成一個fat jar,這樣方便提交。但我們在開發的過程中想對用戶自定義 UDF Jar 進行管理,想將 UDF Jar 存儲管理在阿里云 OSS ,在 Job 中通過動態加載的方式將 UDF Jar 加載進來,取代之前將 UDF 和 Job 打成一個 fat jar 的方式。下面將從幾點展開討論:
- 將 UDF 寫到 Job 中并打成一個 fat jar 的實現方式
- 動態加載 UDF Jar 代碼調整
- 代碼調整后存在的問題
- 解決 UDF Jar URL 分發的思路
環境
- Flink 1.11.2
- 部署方式:Flink on Kubernetes
- 部署模式: Session Cluster
將 UDF 寫到 Job 中并打成一個 fat jar 的方式
下面是一個簡單采用 FlinkSQL 編寫 Job 的例子。使用 datagen 連接器作為 Source 生成數據, print 作為 Sink 將結果打印到控制臺。自定義的一個簡單 UDF自定義函數(returnSelf)。
public static void main(String[] args) throws Exception { //創建流運行時環境 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //采用BlinkPlanner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //創建StreamTable環境 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); bsEnv.setParallelism(1); bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'"); bsTableEnv.executeSql("CREATE TABLE sourceTable (" + " f_sequence INT," + " f_random INT," + " f_random_str STRING," + " ts AS localtimestamp," + " WATERMARK FOR ts AS ts" + " ) WITH (" + " 'connector' = 'datagen'," + " 'rows-per-second'='5'," + " 'fields.f_sequence.kind'='sequence'," + " 'fields.f_sequence.start'='1'," + " 'fields.f_sequence.end'='1000'," + " 'fields.f_random.min'='1'," + " 'fields.f_random.max'='1000'," + " 'fields.f_random_str.length'='10'" + ")"); bsTableEnv.executeSql("CREATE TABLE sinktable (" + " f_random_str STRING" + ") WITH (" + " 'connector' = 'print'" + ")"); bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable"); }要將該 Job 提交給遠程 Flink 集群時,我們需要將 Job(包括自定義 UDF) 打成一個 fat Jar。但這并不是我們期望的操作,由于打成 fat jar 會顯得比較臃腫,同時不方便管理 UDF Jar ,有些 UDF 具有通用性,可復用。所以我們希望將自定義的UDF Jar 獨立出來保存管理,并在 Job 中通過動態加載的方式使用,如下圖:
動態加載 UDF Jar 代碼調整
- 將 returnSelf 并獨立打成一個 UDF Jar 上傳到阿里云OSS。
- 在 Job 的 main() 方法中新增動態加載的代碼
修改后,我們將 UDF jar 存放到 OSS 中進行管理。當 Job 需要依賴某個 UDF 時,只需要通過動態加載就可以完成。動態加載使用 URLClassLoader 實現,使用被管理于 OSS 的 UDF Jar 的 URL 將 Jar 加載進 JVM 中,并取得 returnSelf 類。
代碼調整后存在的問題
運行結果:代碼調整后,在本地 IDEA 運行程序(即,啟動了 Mini Cluster集群)是可以成功運行的。但是當發布到遠程 Flink 集群上時(采用 Flink on K8S , Session Cluster 部署模式),會出現找不到 UDF 異常,如下:
Caused by: java.lang.ClassNotFoundException: flinksql.function.udf.ReturnSelf分析:這是由于 Flink 的部署方式有多種。在本地運行的啟動的是 MiniCluster,即 JobManager 和 TaskManager 在同一個JVM 進程中。而我們在遠程部署 Flink on Kubernetes 的 Session Cluster 集群 JobManager 和 TaskManager 是不同的 JVM 進程。
在 Session 模式下,客戶端在 main() 方法開始執行直到 env.execute() 方法之前需要完成以下三件事情
- 獲取作業所需的依賴項
- 通過執行環境分析并取得邏輯計劃,即StreamGraph→JobGraph
- 將依賴項和JobGraph上傳到集群中
只有在這些都完成之后,才會通過env.execute() 方法觸發 Flink 運行時真正地開始執行作業。所以在本地運行的 Mini Cluster,因為都處于同一個 JVM 進程,客戶端運行 main() 方法進行動態加載后將依賴項和 JobGraph 提交給 JobMananger 再由 TaskManager 執行 Job。
而當在遠程集群時,客戶端實現動態加載 Jar 后將依賴項和 JobGraph 提交給 JobMananger,但是由于 JobMananger 和 TaskMananger 是處于不同的 JVM進程中,且沒有對自定義 UDF Jar URL 進行分發,這會讓 TaskMananger 在運行任務時出現 Class Not Found 異常,這是因為 TaskMananger 沒有進行類加載,JVM 中沒有 returnSelf 類所導致。
解決 UDF Jar 分發的思路
基于以上問題我們查閱了一些相關資料及閱讀源碼,以以下三點為條件
- 基于采用 Session 模式部署
- 基于 REST API 提交 Job 而不采用命令行方式
- 不改動 Flink 源碼
分析:官網提供了一個 -C 參數,大致用法就是把用戶自定義 Jar 放到一個 JobMananger 和 TaskMananger 都能訪問到的存儲地方,然后通過命令行方式啟動 Job 時使用 -C 參數,后面加上自定義 Jar 的URLs 就可以實現分發。
但是我們平臺由于采用 REST API,而提交 Job 的 API 并沒有提供該參數,所以在不改變 Flink 源碼的前提下進行源碼研究,最后發現可以在 main 中將 UDF Jar 的 URL 加到配置項 pipeline.classpaths 中,也就是曲線救國實現了 -C 的效果。在 main 中增加以下代碼片段:
Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration"); configurationField.setAccessible(true); Configuration o = (Configuration)configurationField.get(bsEnv); Field confData = Configuration.class.getDeclaredField("confData"); confData.setAccessible(true); Map temp = (Map)confData.get(o); List jarList = new ArrayList<>(); jarList.add(funJarPath); temp.put("pipeline.classpaths",jarList);完整代碼
public static void main(String[] args) throws Exception { //創建流運行時環境 StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); //采用BlinkPlanner EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); //創建StreamTable環境 StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); bsEnv.setParallelism(1); // 動態加載 String funJarPath = "UDF jar 在 OSS 中所在的 URL 路徑"; loadJar(new URL(funJarPath)); Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration"); configurationField.setAccessible(true); Configuration o = (Configuration)configurationField.get(bsEnv); Field confData = Configuration.class.getDeclaredField("confData"); confData.setAccessible(true); Map temp = (Map)confData.get(o); List jarList = new ArrayList<>(); jarList.add(funJarPath); temp.put("pipeline.classpaths",jarList); bsTableEnv.executeSql("CREATE FUNCTION returnSelf AS 'flinksql.function.udf.ReturnSelf'"); bsTableEnv.executeSql("CREATE TABLE sourceTable (" + " f_sequence INT," + " f_random INT," + " f_random_str STRING," + " ts AS localtimestamp," + " WATERMARK FOR ts AS ts" + " ) WITH (" + " 'connector' = 'datagen'," + " 'rows-per-second'='5'," + " 'fields.f_sequence.kind'='sequence'," + " 'fields.f_sequence.start'='1'," + " 'fields.f_sequence.end'='1000'," + " 'fields.f_random.min'='1'," + " 'fields.f_random.max'='1000'," + " 'fields.f_random_str.length'='10'" + ")"); bsTableEnv.executeSql("CREATE TABLE sinktable (" + " f_random_str STRING" + ") WITH (" + " 'connector' = 'print'" + ")"); bsTableEnv.executeSql("insert into sinktable select returnSelf(f_random_str) from sourceTable"); } //動態加載Jar public static void loadJar(URL jarUrl) { //從URLClassLoader類加載器中獲取類的addURL方法 Method method = null; try { method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class); } catch (NoSuchMethodException | SecurityException e1) { e1.printStackTrace(); } // 獲取方法的訪問權限 boolean accessible = method.isAccessible(); try { //修改訪問權限為可寫 if (accessible == false) { method.setAccessible(true); } // 獲取系統類加載器 URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader(); //jar路徑加入到系統url路徑里 method.invoke(classLoader, jarUrl); } catch (Exception e) { e.printStackTrace(); } finally { method.setAccessible(accessible); } }最后
以上就是在 Flink on K8S 集群 Session 模式下, FlinkSQL 動態加載 Jar 的解決方案。由于 REST API 沒有提供 -C 效果,自定義 Jar URL 沒有分發到 TaskMananger,導致 TaskMananger 沒有進行類加載到其 JVM 中。通過在 Job 的 main 方法中增加動態加載方法及配置 pipeline.classpaths,可以達到不改動 Flink 源碼的情況下實現 -C 效果。以上方案剛實現不久,還不保證是否有其他未知的問題,如果有更好的解決方案或者該方案中存在錯誤或者疏漏也歡迎提出共同討論。
感謝您的閱讀,如果喜歡本文歡迎關注和轉發,本頭條號將堅持持續分享IT技術知識。對于文章內容有其他想法或意見建議等,歡迎提出共同討論共同進步。
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的flink sql udf jar包_FlinkSQL 动态加载 UDF 实现思路的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: python相关参考文献_python机
- 下一篇: 贝叶斯分类器的matlab实现_贝叶斯实
