flink sql设置并行度_Flink集成Hivestream模式用例
01
背景
基于前面的文章
Flink集成hive?bath模式用例
knowfarhhy,公眾號:大數據摘文Flink 集成Hive,我們繼續介紹stream模式下的用例。
02
流模式讀取Hive
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();????????TableEnvironment?tableEnv?=?TableEnvironment.create(bsSettings); //增加hive支持 String name = "myhive"; String defaultDatabase = "dim";????????String?version?????????=?"1.2.1";? String hivecondir = System.getenv("HIVE_CONF_DIR");????????HiveCatalog?hive?=?new?HiveCatalog(name,?defaultDatabase,?hivecondir,?version); log.info("注冊catalog"); tableEnv.registerCatalog(name, hive); log.info("注冊catalog完成"); log.info("使用catalog"); tableEnv.useCatalog(name); log.info("注冊database");????????tableEnv.useDatabase(defaultDatabase);????????????????tableEnv.sqlQuery("SELECT?name,age,score,dt?FROM?myhive.dim.dim_flink_test").printSchema(); String[] fields = new String[4]; fields[0] = "name"; fields[1] = "age"; fields[2] = "score";????????fields[3]?=?"dt"; TypeInformation[] fieldType = new TypeInformation[4]; fieldType[0] = Types.STRING; fieldType[1] = Types.INT; fieldType[2] = Types.LONG;????????fieldType[3]?=?Types.STRING; PrintTableUpsertSink sink = new PrintTableUpsertSink(fields,fieldType,true); tableEnv.registerTableSink("inserttable",sink); tableEnv.sqlUpdate("INSERT INTO inserttable SELECT name,age,score,dt FROM myhive.dim.dim_flink_test");???????????????tableEnv.execute("stream_read_hive");03
運行拓撲
上圖展示了第二節中的測試用例任務的拓撲圖,我們會發現在流任務中出現了Finish這樣的最終狀態,而不是一個Running狀態,這個主要是目前1.10版本支持Hive的功能沒有那么完善,無法真正的實時讀取Hive數據,以及無法檢測Hive數據發生改變情況,只會在任務運行時候讀取一次表數據,然后Hive相關的算子任務便會結束。如果想要更好的使用Hive,建議大家還是用Flink 1.11之后,功能更加強大完善。
為了方便看流任務,有Finished狀態,提供另外一個流任務的拓撲圖,便于看到區別:
具體的流方式讀取Hive,即Hive Streaming,在Flink 1.11進行了相關的支持,這里提供幾篇參考文章:
相關Hive Streaming文章
Flink?1.11?新特性之?SQL?Hive?Streaming?簡單示例
LittleMagic,公眾號:Flink 中文社區Flink 1.11 新特性之 SQL Hive Streaming 簡單示例Flink?x?Zeppelin?,Hive?Streaming?實戰解析
狄杰@蘑菇街,公眾號:Flink 中文社區Flink x Zeppelin ,Hive Streaming 實戰解析Flink?SQL?FileSystem?Connector?分區提交與自定義小文件合并策略?
LittleMagic,公眾號:Flink 中文社區Flink SQL FileSystem Connector 分區提交與自定義小文件合并策略 ?Flink?1.11?SQL?使用攻略
李勁松,公眾號:Flink 中文社區Flink 1.11 SQL 使用攻略04
注意事項
任務運行環境:
(1)設置Job默認并行度為2
(2)基于K8s運行,申請了一個JobManager 以及一個 TaskManager
(3)TaskManager設置了8個Slot
上面的拓撲中,我們可以看到第一個算子的并行度是8,第二個算子是2,任務正常執行,是因為增加了其他設置才使得任務正常運行。
但是你可能會遇到下面情況:
第一個算子并行度是10,第二個算子并行度是2,因為集群只有8個Slot可用,就會導致資源不夠,任務一直處于created狀態,最終超時失敗。
問題分析:
下面展示了設置HiveTableSource的并行度:
???? int parallelism = conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); if (conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)) { int max = conf.getInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX); if (max < 1) { throw new IllegalConfigurationException( HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1"); } int splitNum; try { long nano1 = System.nanoTime(); splitNum = inputFormat.createInputSplits(0).length; long nano2 = System.nanoTime(); LOG.info( "Hive source({}}) createInputSplits use time: {} ms", tablePath, (nano2 - nano1) / 1_000_000); } catch (IOException e) { throw new FlinkHiveException(e); } parallelism = Math.min(splitNum, max); } parallelism = limit > 0 ? Math.min(parallelism, (int) limit / 1000) : parallelism; parallelism = Math.max(1, parallelism); source.setParallelism(parallelism);涉及的相關參數:
public class HiveOptions { public static final ConfigOption TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER = key("table.exec.hive.fallback-mapred-reader") .defaultValue(false) .withDescription( "If it is false, using flink native vectorized reader to read orc files; " + "If it is true, using hadoop mapred record reader to read orc files."); public static final ConfigOption TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM = key("table.exec.hive.infer-source-parallelism") .defaultValue(true) .withDescription( "If is false, parallelism of source are set by config.\n" + "If is true, source parallelism is inferred according to splits number.\n"); public static final ConfigOption TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX = key("table.exec.hive.infer-source-parallelism.max") .defaultValue(1000) .withDescription("Sets max infer parallelism for source operator.");}table.exec.hive.infer-source-parallelism - true : 并行度通過推導得到,依賴splits 數量 - false : 通過config獲得并行度table.exec.resource.default-parallelism 設置所有Operator(例如join agg filter等)的默認并行度table.exec.hive.infer-source-parallelism.max 設置HiveTableSource的最大并行度,默認值1000(1)首先從config中獲取所有算子的默認并行度
int parallelism = conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);(2)如果沒有開啟并行度自動推導,那么使用這個默認并行度
(3)如果開啟了并行度推導,會根據計算的split數量與設置的最大并行度取最小值:
parallelism = Math.min(splitNum, max);splitNum大小為下面方法返回數組的長度 public HiveTableInputSplit[] createInputSplits(int minNumSplits) throws IOException { List hiveSplits = new ArrayList<>(); int splitNum = 0; for (HiveTablePartition partition : partitions) { StorageDescriptor sd = partition.getStorageDescriptor(); InputFormat format; try { format = (InputFormat) Class.forName(sd.getInputFormat(), true, Thread.currentThread().getContextClassLoader()).newInstance(); } catch (Exception e) { throw new FlinkHiveException("Unable to instantiate the hadoop input format", e); } ReflectionUtils.setConf(format, jobConf); jobConf.set(INPUT_DIR, sd.getLocation()); //TODO: we should consider how to calculate the splits according to minNumSplits in the future. org.apache.hadoop.mapred.InputSplit[] splitArray = format.getSplits(jobConf, minNumSplits); for (org.apache.hadoop.mapred.InputSplit inputSplit : splitArray) { hiveSplits.add(new HiveTableInputSplit(splitNum++, inputSplit, jobConf, partition)); } } return hiveSplits.toArray(new HiveTableInputSplit[0]); }通過上面的參數配置 ,我們可以合理的控制HiveTableSource的并行度,不至于超過集群的資源配置,無法啟動任務。
?!關注不迷路~ 各種福利、資源定期分享!
總結
以上是生活随笔為你收集整理的flink sql设置并行度_Flink集成Hivestream模式用例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java怎么配置哨兵模式_redis 哨
- 下一篇: html5 烟雾,jQuery烟雾背景发