FlinkX脏值处理
FlinkX臟值處理
在大量數據的傳輸過程中,必定會由于各種原因導致很多數據傳輸報錯(比如類型轉換錯誤),這種數據DataX認為就是臟數據。
? – by DataX
配置實例
"dirty": {"path": "/tmp","hadoopConfig": {"fs.default.name": "hdfs://flinkhadoop:8020","dfs.nameservices": "ns1","dfs.ha.namenodes.ns1": "flinkhadoop","dfs.namenode.rpc-address.ns1.nn1": "hdfs://flinkhadoop:8020","dfs.ha.automatic-failover.enabled": "true","dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider","fs.hdfs.impl.disable.cache": "true"}}我這里的hdfs是一個單機。
實現邏輯
FlinkX的臟值處理邏輯是放在寫入數據過程中的。按照臟值的定義,在讀取過程中,能讀取進來的都是正常的值,但是在寫入過程中,以目標源的標準,可能讀取的值是存在瑕疵的,所以是臟值。
在臟值處理的過程中,臟值處理器的作用肯定是首當其沖,我們先看一下DirtyManager的定義和初始化過程:
DirtyDataManager
全局視角:
創建
public DirtyDataManager(String path, Map<String, Object> configMap, String[] fieldNames, String jobId) {this.fieldNames = fieldNames;location = path + "/" + UUID.randomUUID() + ".txt";this.config = configMap;this.jobId = jobId;}初始化
public void open() {try {FileSystem fs = FileSystemUtil.getFileSystem(config, null);Path path = new Path(location);stream = fs.create(path, true);} catch (Exception e) {throw new RuntimeException("Open dirty manager error", e);}}// -------------------- FileSystem ---------------------------public static FileSystem getFileSystem(Map<String, Object> hadoopConfigMap, String defaultFs) throws Exception {if(isOpenKerberos(hadoopConfigMap)){return getFsWithKerberos(hadoopConfigMap, defaultFs);}Configuration conf = getConfiguration(hadoopConfigMap, defaultFs);setHadoopUserName(conf);return FileSystem.get(getConfiguration(hadoopConfigMap, defaultFs));}flinkX的臟值是存放在hadoop上面的。
臟值寫入
public String writeData(Row row, WriteRecordException ex) {String content = RowUtil.rowToJson(row, fieldNames);String errorType = retrieveCategory(ex);String line = StringUtils.join(new String[]{content,errorType, gson.toJson(ex.toString()), DateUtil.timestampToString(new Date()) }, FIELD_DELIMITER);try {// stream.write(line.getBytes(StandardCharsets.UTF_8));stream.write(LINE_DELIMITER.getBytes(StandardCharsets.UTF_8));DFSOutputStream dfsOutputStream = (DFSOutputStream) stream.getWrappedStream();dfsOutputStream.hsync(syncFlags);return errorType;} catch (IOException e) {throw new RuntimeException(e);}}private String retrieveCategory(WriteRecordException ex) {Throwable cause = ex.getCause();if(cause instanceof NullPointerException) {return ERR_NULL_POINTER;}for(String keyword : PRIMARY_CONFLICT_KEYWORDS) {if(cause.toString().toLowerCase().contains(keyword)) {return ERR_PRIMARY_CONFLICT;}}return ERR_FORMAT_TRANSFORM;}hsync的語義是:client端所有的數據都發送到副本的每個datanode上,并且datanode上的每個副本都完成了posix中fsync的調用,也就是說操作系統已經把數據刷到磁盤上(當然磁盤也可能緩沖數據);需要注意的是當調用fsync時只有當前的block會刷到磁盤中,要想每個block都刷到磁盤,必須在創建流時傳入Sync標示。
UPDATE_LENGTH: 同步到DataNodes時,還更新NameNode中的元數據(塊長度)。
臟值寫入時機
在寫入每一行數據writeSingleRecord的時候,進行臟值的捕獲
protected void writeSingleRecord(Row row) {if(errorLimiter != null) {errorLimiter.acquire();}try {writeSingleRecordInternal(row);if(!restoreConfig.isRestore() || isStreamButNoWriteCheckpoint()){numWriteCounter.add(1);snapshotWriteCounter.add(1);}} catch(WriteRecordException e) {// 寫入錯誤限流器saveErrorData(row, e);// 更新指標以及持久化存儲臟值updateStatisticsOfDirtyData(row, e);// 總記錄數加1numWriteCounter.add(1);snapshotWriteCounter.add(1);if(dirtyDataManager == null && errCounter.getLocalValue() % LOG_PRINT_INTERNAL == 0){LOG.error(e.getMessage());}if(DtLogger.isEnableTrace()){LOG.trace("write error row, row = {}, e = {}", row.toString(), ExceptionUtil.getErrorMessage(e));}}} private void updateStatisticsOfDirtyData(Row row, WriteRecordException e){if(dirtyDataManager != null) {String errorType = dirtyDataManager.writeData(row, e);if (ERR_NULL_POINTER.equals(errorType)){nullErrCounter.add(1);} else if(ERR_FORMAT_TRANSFORM.equals(errorType)){conversionErrCounter.add(1);} else if(ERR_PRIMARY_CONFLICT.equals(errorType)){duplicateErrCounter.add(1);} else {otherErrCounter.add(1);}}}這代碼邏輯確實和我的邏輯稍微有些區別,為什么會在這里進行存儲。。。。
應該邏輯應該分離的。將dirtyDataManager.writeData(row, e)放在上一個saveErrorData方法中可能更合適。
參考 https://github.com/DTStack/flinkx/issues/220
臟值實例測試
臟值文件實例
根據dirty配置,初始化hadoop的連接,并創建對應文件,如我們這里配置的path是:/tmp/flinkx/bond_info_mongodb_to_mysql,如我們配置的是4個處理器。在對應的hdfs上面,有四個文件:
感覺官方需要對這個作業存儲位置進行一些處理:
臟值模擬
我們模擬將mysql對應的表的string=>bigint,這樣肯定會在轉換中發生錯誤。
{"bond_name":"xxxx","bond_stop_time":"xxx","bond_time_limit":"xx","bond_type":"xxx","plan_issued_quantity":"xx","publish_expire_time":"xxx","publish_time":"xx","publisher_name":"xxx","real_issued_quantity":"14","start_cal_interest_time":"xx","inst_code":"x":"x","city_code":"x","area_code":"x","input_date":x,"input_time":x}conversion"com.dtstack.flinkx.exception.WriteRecordException: Incorrect integer value: 'xxxx' for column 'bond_type' at row 1\njava.sql.SQLException: Incorrect integer value: 'xxxx' for column 'bond_type' at row 1"2020-05-24 17:33:15可以看出來是類型轉換錯誤,它會把錯誤數據和錯誤原因都進行存儲,并且根據u0001進行分割。
總結
本文對臟值的定義,以及FlinkX的處理進行詳細的分解,并進行了相關的測試,與實例展示。從本文中可以了解到hdfs的hsync與hdfs的基本配置。
總結
以上是生活随笔為你收集整理的FlinkX脏值处理的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 魔力魔力哟接口文档
 - 下一篇: 架构师的 36 项修炼第02讲:架构核心