全生命周期大数据处理系列
全生命周期大數據處理系列
任何一件復雜的事物,簡化它的方法就是分而治之,只是這個分法,萬變不離其宗,可能會因人因事而大同小異而已.我在車聯網大數據的處理實踐中不斷沉淀,在公司產品的迭代升級中逐步升華,深有感觸略有所得,分享出來拋磚引玉,希望能引起共鳴,共同進步.
我把數據的生命周期分為如下幾個階段,而這么劃分的標準是什么呢?我認為是"價值",數據的不斷處理,是價值的不斷提煉,其目的是為了獲取其潛在的價值,這個價值的體現可能是引導了決策走向,展現了數據全貌,給出了時間趨勢,挖掘出了不易發現的規律,預測了可能發生的事件等等.任何計算框架,無論什么工具產品,抑或是數據處理的策略和方法論,都是為了這個目標而產生的.
這個系列的講解也是為了達成這一目標,怎么從最初產生的大數據,應用一系列的方法和工具,最后萃取出有價值的高純度的小數據,我們可以稱這個過程為數據掘金,也可以說是數據挖掘.
第一階段: 數據采集
數據采集是由"米"到"炊"的最早階段,這個階段一般來說技術難度不大,但是有一定的規劃難度.
通常也稱這個階段為數據埋點,"埋點"是數據采集階段常用的術語,"埋"有設置和隱藏的含義.
以新能源電動汽車為例,司機在車輛使用甚至是停車的過程當中,有大量的數據產生,行駛過程中電池在放電,就會有電壓,電流,電池單體溫度,電阻,故障信號,車速,加速度等等,充電過程中電池在充電,就會有充電樁位置經緯度,充電電流,時長,室外溫度,是否過充信號等等,而且這些信號在以一定的頻率產生,短則毫秒級,長則秒級.這些信號數量數以千級萬級,很多信號還是二維數組甚至多維數組,同時采集頻率也很高,因為加速度的計算故障的分析尤其是碰撞事故發生時的診斷,對數據密度有較高的要求.這樣的數據事實要求我們,數據埋點要規劃好:
我們要采集哪些信號? 過多會產生無效傳輸和存儲,過少不能滿足分析需要.
采集頻率應該多大? 這個采集頻率是指車端的數據發送到服務器端的頻率,而不是車端的采集頻率,在車輛端采集頻率可以很高,信號基本是全部采集,但是往往只存儲一段時間,過后即會被清除.把這些數據發送到服務器端的頻率,除了數據分析的最低要求外,還要考慮到存儲和傳輸成本,所以往往存在兩個采集頻率,比如平時以1秒的頻率傳輸,在有信號故障發生時則以50毫秒的頻率傳輸.
采用什么協議? 協議即約定好的數據格式和標準,以便在發送端和接收端能做出一致的解釋.在新能源領域,有國家規定的標準即國標,它定義了必須采集和傳輸的數據信號及頻率,但其定義往往比較寬泛和基礎,不同的車企還有各自的數據分析需求,便產生了眾多的企業標準即企標.定義這個協議關鍵是定義數據結構,比如從哪個字節開始是溫度字段,是單值還是數組,數組的每個溫度值,應該采用幾個字節,值域范圍是多大,采用什么字節順序,物理單位是什么,信號字段往往只是一個邏輯值怎么才能節約存儲空間等等.
第一目標: 數據采集
車輛端采集的數據經網絡傳輸到服務器端,一般以分布式的消息隊列的存儲形式接收比如Kafka,用它的主要好處是可以彈性擴展,即可以隨時增減存儲節點資源以應對變化的采集數據體積和速度,不至于數據堆積也不至于存儲浪費,但是Kafka的數據往往不能直接用于計算,雖然它本身也是支持流數據計算的,通常的做法是把它當作緩沖的存儲,在我們的實際應用中,它的數據有3個應用方向,一方面用于實時的故障診斷計算,另一方面把軌跡及信號數據轉儲于HBase用于數據回溯,還有一方面把數據存為離線數據用于數據倉庫指標體系建設.
接收到的數據是二進制非結構化的,是不能用于常規的數據分析的,這個時候就要涉及到數據解析了,底層的數據解析是字節級別的操作,面對那么多的信號,各種數據格式,讓很多數據開發人員望而卻步,調試難易出錯,而且解析代碼在不同企標之間往往不能移植,即一次性代碼,這大大違背了程序的可重用的本質.所以相應的工具產品也就應運而生了,這就是Mouth.你可以通過文檔來深入了解它.
第二目標: 數據解析
怎么用 Mouth 來簡單地把二進制數據解析成結構化數據呢?可以說一個XML文件足矣,你從如下的文件結構中一眼即可看出端倪,它用簡單的幾個指令比如string_item,byte_item描述了二進制數據中的結構,有了這個描述文件,Mouth就知道該怎么來解析二進制數據了.
<!-- file: parser_vehicle.xml --> <parser xmlns="https://www.rocy-data.com/parser_ex"><string_item name="tsp_start" byte_count="2" fixed_value="##"/><!--first read string of length=2,and its name=tsp_start --><byte_item name="tsp_command"><!-- read a byte value to `tsp_command` --><value_map value="1" name="register"/> <!-- for value 1 of `tsp_command`,convert it to `register` --><value_map value="2" name="real info up"/></byte_item><byte_item name="tsp_ack"><value_map value="1" name="success"/><value_map value="2" name="error"/></byte_item><string_item name="vin" byte_count="17"/><dword_item name="flow_number"/><!-- read a long number and name it `flow_number` --><byte_item name="tsp_encrypt" is_temp="true"/><dword_item name="data.tsp_got_time"/><!-- read a long number and name it in dot struct, --><byte_item name="data.info_type"/><!-- make same group with above data because they have same prefix --><byte_item name="data.battery_number"/><!-- make same group with above data because they have same prefix --><switch_item switch="data.info_type"> <!-- generate different data structure based on the value of `data.info_type`,`switch` is expression returning boolean that con contains expression such as `data.info_type + 2 > data.battery_number` --><case value="2"><word_item name="dbt_probe_count"/><byte_item name="db_package_count"/><byte_item name="dbt_temperature_array_size"/><set_variable name="index" value="0"/> <!-- set variable used to control loop operation --><while_item condition="index < dbt_temperature_array_size"><byte_item name="dbt_temperature_array[index].db_package_high_temp"/> <!-- the `[]` symbol used to construct array --><byte_item name="dbt_temperature_array[index].db_package_low_temp"/><byte_item name="dbt_temperature_array[index].db_package_temperature_probe_count"/><array type="byte" length="dbt_probe_count" name="dbt_temperature_array[index].db_probe_temperatures"/> <!--for basic array type,its type and length is required--><set_variable name="index" value="index + 1"/><!-- set variable used to control loop operation --></while_item></case></switch_item> </parser>細心的讀者可能已經發現了,它還有while_item,switch_item,set_variable這些指令來幫助我們描述更為復雜的靈活的數據結構,而且不管是怎樣的企標數據,咱們用的都是這些指令,這便達到了可重用可移植的目的。
只需要一行代碼就可以根據上述XML解析文件將接收到的字節數組轉換為JSON數據,在這個步驟把數據結構化之后,就可以執行其他業務邏輯了。
val bytes = ...// get bytesval json = ParserTemplate.parseAsJson(bytes.map(_.toUnsigned), "parser_vehicle.xml") // above bytes and file輸出的JSON,大概是如下示例這個樣子的,其實不只如此,即便更為復雜的動態結構,也是能夠處理的,除了switch_item+case,if_item也可以根據其他信號的值構成的評估表達式來決定如何解析,這使得數據結構可以是動態的,即結構并不固定,這個能力在其他類似的產品中也是很難實現的:
{"flow_number": 123,"data": {"info_type": 2,"tsp_got_time": 123456789,"battery_number": 2},"tsp_start": "##","tsp_ack": "success","dbt_temperature_array": [{"db_package_high_temp": 38,"db_probe_temperatures": [ 31.0, 32.0, 33.0, 38.0, 31.0, 32.0, 33.0, 38.0 ],"db_package_low_temp": 31,"db_package_temperature_probe_count": 8},{"db_package_low_temp": 31,"db_probe_temperatures": [ 31.0, 32.0, 33.0, 38.0, 31.0, 32.0, 33.0, 37.0 ],"db_package_temperature_probe_count": 8,"db_package_high_temp": 37}],"dbt_temperature_array_size": 2,"tsp_command": "real info up","dbt_probe_count": 8,"vin": "ROCY0123456789123","db_package_count": 2 }第三目標: 數據回溯
數據回溯的常見需求是查詢某個ID(車聯網領域即是VIN)在某個時間段的數據,數據查詢往往要求實時響應,HBase正好能滿足這個需求,其RowKey可以這樣設計:
那么具體的數據怎么存儲呢?技術難度不大,但是通常的做法很麻煩,麻煩在于數據信號的數量是龐大的,數以千計,而且數據類型各種各樣,存儲的代碼如果對信號逐一處理是枯燥無味的,Pulse把這個過程簡化了:
如此而已,再不需要其他了,數據已經具備,如果需要圖表展示,前端頁面所需要做的是:篩選車輛,指定時間范圍,篩選關心的信號量,展示趨勢圖或是對比圖即可.
第二階段: 質量監控
前一階段使數據結構化,可理解,可分析,可計算,而數據質量是一切計算的基礎和保障,數據質量可從整體上反映了數據的質量情況,這一過程應該盡早完成,及時避免錯誤數據的向下傳播.在這個過程中,我們希望得到的答案或者說目標是:
第一目標 缺失情況
第二目標 數值錯誤
第三目標 頻度分布
第四目標 質量告警
仍然是數據字段很龐大的原因,使得我們按照常規的方法做這些質量檢查根本不可行.可以把這一過程簡化的根據是:數據類型是有限的,相同類型的數據其檢查方法是類似的,比如數值型做常規數理統計,枚舉型做頻度統計等等,Health正是這樣做的.
它的輸出是類似這樣的結果:
{"row_count":4,"null_stat":[{ "col1":1, "col2":1 }],"number_stat":[{ "field":"col1", "agg":"min", "agg_value":1.0 },{ "field":"col1", "agg":"max", "agg_value":3.0 },{ "field":"col1", "agg":"mean", "agg_value":2.0 },{ "field":"col1", "agg":"avg", "agg_value":2.0 },{ "field":"col1", "agg":"stddev", "agg_value":1.0 },{ "field":"col1", "agg":"skewness", "agg_value":0.0 },{ "field":"col1", "agg":"kurtosis", "agg_value":-1.5 },{ "field":"col1", "agg":"countDistinct", "agg_value":3.0 }],"arr_stat":[{ "arrName":"col3", "statType":"mean", "value":2.5 },{ "arrName":"col3", "statType":"avg", "value":2.5 },{ "arrName":"col3", "statType":"kurtosis", "value":-1.2000000000000006 },{ "arrName":"col3", "statType":"min", "value":1.0 },{ "arrName":"col3", "statType":"countDistinct", "value":3.0 },{ "arrName":"col3", "statType":"stddev", "value":1.2907110929399985 },{ "arrName":"col3", "statType":"max", "value":4.0 },{ "arrName":"col3", "statType":"skewness", "value":0.0 }],"enum_stat":[{ "name":"col2", "count":3, "freq":{ "b":1, "a":2, "NULL":1 } }] }輸出這樣詳細的結果,只需要我們做一個簡單的配置,那就是要做數據質量檢查的文件路徑.字段的類型是自動推斷的.
當然這只是一個最基本的需求,所以它必然允許我們個性化定制,通過配置文件的詳細配置,我們還可以:
如果有一定的Coding基礎,通過插件擴展,我們還可以:
第三階段: 實時診斷
上述的質量監控,是從整體上把握數據,這一階段的目標是要掌握數據的極端情況,極端數據并沒有數據質量問題,但從業務邏輯上卻屬于異常情況,是需要給予特別關注的.從診斷方法方面分類,可分為如下兩類目標.
第一目標 單記錄診斷
以數據實例來說,在如下的記錄中voltage_list是電池單體電壓數據,其數據類型是數組,元素個數代表單體個數,過高的均值或者方差都是不正常的,對應的現象可能是電池充電故障或者單體故障.
{"voltage_list":[3.2,3.4,3.41],"tsp_got_time":1674736665000,"vin":"A0","other":1} {"voltage_list":[4,3.6,3.7],"tsp_got_time":1674736675000,"vin":"A1","other":2}Pulse 處理這類異常,無論是單記錄還是多記錄都能很好地處理.我們要做的就是指定異常規則,其形式如下所示:
expression=array_min(voltage_list)>3.5,eng_name=test,chinese_name=電壓過高它是用來指定判定異常的邏輯表達式,在這里可以使用統計函數,也可以使用數組函數,不限于單個字段,也可以指定任意多個字段的復雜表達式
上述示例指定的rule,可以指定多個,它們之間是或的關系,即滿足任意一條rule,即把這條數據記錄視為異常.
在 Pulse 的實現中,數據讀自 Kafka,發現的異常數據存入Kafka. 以方便其他應用端能夠方便的訂閱這些異常數據.
第二目標 多記錄診斷
上述單記錄的實時監控,其能力還是有限的,車聯網或者其他物聯網的數據,往往都是實時數據,且數據的前后是存在關聯的,比如車輛的碰撞事故是跟速度及加速度存在很大關系的,而這兩個物理量的計算是要根據前后多條記錄才能計算出來.更復雜一點的,車輛在某個時間點發生故障比如熄火,其實在這一時間點之前的一段時間內比如1分鐘內或者1小時內已經有很多信號反映出異常,這在理論上是存在提前預測甚至避免故障的可能的.
下面以數據案例來舉例說明:
{"vin":"vin1","tsp_got_time":1674736665000,"esd_volt":48} {"vin":"vin1","tsp_got_time":1674736675000,"esd_volt":50} {"vin":"vin1","tsp_got_time":1674736685000,"esd_volt":55} {"vin":"vin1","tsp_got_time":1674736695000,"esd_volt":60} {"vin":"vin1","tsp_got_time":1674736705000,"esd_volt":58} {"vin":"vin1","tsp_got_time":1674736715000,"esd_volt":50} {"vin":"vin1","tsp_got_time":1674736725000,"esd_volt":45} {"vin":"vin2","tsp_got_time":1674736665000,"esd_volt":30} {"vin":"vin2","tsp_got_time":1674736675000,"esd_volt":50} {"vin":"vin2","tsp_got_time":1674736685000,"esd_volt":55} {"vin":"vin2","tsp_got_time":1674736695000,"esd_volt":60} {"vin":"vin2","tsp_got_time":1674736705000,"esd_volt":62} {"vin":"vin2","tsp_got_time":1674736715000,"esd_volt":48} {"vin":"vin3","tsp_got_time":1674736725000,"esd_volt":45}數據說明:
某個車輛的某個信號量或者是多個信號經過計算得到的信號量,發生并持續一段時間,是一個常見的監控需求,僅此一項就能發現很多異常,我們可以通過指定rule,來讓Pulse 幫我們把相應的數據找出來,這個rule可以這樣指定:
[{"partition_by": "vin","expression": "esd_volt>50","eng_name": "test","chinese_name": "test","duration_seconds": 90,"names_output": ["tsp_got_time","esd_volt"],"agg_outputs": [{"expression": "avg(esd_volt)","eng_name": "avg_esd_volt","chinese_name": "平均電壓"}] }]數據說明:
有了這樣的rule提交給Pulse,它會給我們輸出什么呢?
{"vin":"vin2","start_tsp_got_time":1674736685000,"start_esd_volt":55.0,"end_tsp_got_time":1674736705000,"end_esd_volt":62.0,"avg_esd_volt":59.0,"tag":"test"} {"vin":"vin1","start_tsp_got_time":1674736685000,"start_esd_volt":55.0,"end_tsp_got_time":1674736705000,"end_esd_volt":58.0,"avg_esd_volt":57.67,"tag":"test"}數據說明:
第三目標: 電子圍欄
上述的兩個實時診斷目標是從數據記錄方面劃分的,在實際的實時應用中,還有一類問題,對指定目標的在指定地理區域的監控,專業術語稱為電子圍欄.比如4S店對試駕車的監控,駕校對教練車的監控等等,都屬于這一類問題,電子圍欄的常見需求是:
由于這類問題比較普遍,需求也大致如此,所以它完全可以模塊化產品化,Pulse 已經能夠滿足這些需求.
第四階段: 數據倉庫
數據倉庫是大數據處理過程中非常重要的步驟,它的最重要的產出是指標體系,可用于展現為可視化易理解的圖表,也可用于挖掘模型產出更為復雜更為抽象的數據價值,這個階段的顯著特點是:
為滿足各種各樣的業務需求,往往要編寫數以百計千計的SQL任務,Hive任務或者是Spark任務,每個任務會產生幾個幾十個指標,根據指標之間的計算依賴關系或者業務邏輯關系,會產生指標層級和分類體系.維護這些指標是個很不小的工程.
不同類型的數據存儲有不同的應用場景,所以不可避免地要面對多種數據源,從關系型數據庫到文件型存儲HDFS再到文檔型數據庫MongoDB,從內存型數據庫Redis到鍵值型數據庫Hbase,從行式存儲到列式存儲,形式多樣,數據開發人員面臨的問題就是做好它們之間的連接,數據轉換和關聯計算.
當某個指標計算有誤,或者需要弄清楚這個指標的計算邏輯時,這個難度是很大的,文檔的維護不僅困難而且作用也是有限的,因為指標的計算深度可能很大,可能涉及多個數據開發人員,可能涉及多個數據表.
數據倉庫有離線數倉和實時數倉,前者一般采用Hive QL和Spark計算框架,后者一般采用Flink+Kafka,同時可能結合其他工具產品Impata,Kudu等等.
Bloods 正是要解決當前數據倉庫建設當中的種種難題,它很特別的亮點有兩個,1是零代碼,2是數據血緣. 前者帶來的好處是降低了數據倉庫建設難度,后者帶來的好處是無論數倉多么龐大,你仍然可以容易的掌握它的全貌它的細節,它的脈絡它的枝葉.
數據倉庫是個龐大的系統工程,但是無論什么樣的業務場景,其建設過程無非如下這么幾個關鍵步驟.
第一目標:數據標準化
什么是數據標準化呢,簡單地說就是要有一致的語義,一個字段的含義要有明確的定義,無論是名稱,值域,類型,計算邏輯,即便它們在不同的存儲介質中,在不同的參與人員看來,都應該是一致的,這在數據表較多,數據字段較多的數據環境中,更顯得尤為重要.
這個步驟也應該盡量在數據參與計算之前完成.Bloods 提供了一些簡單實用的手段.
如下的配置片段中,name_standardize 告知 Bloods,應該把名為product_id的字段更名為標準化的product_number,而product_number在另外一個字典文件中有定義,字典中定義了所有標準化的名稱,你在輸入product_number的時候,編輯器會給你智能提示,就像輸入法的智能提示一下,如果輸入錯誤,會給出錯誤提示.
如此這般把成百上千的數據字段都定義完成,即完成了名稱的標準化.
在車聯網領域,車機端的信號至少以千計,Nerve 整理了上千個信號并做了標準化定義,這使得車聯網領域的信號標準化有了可以參考的標準線.
如下的配置片段中,value_standardize告知 Bloods,應該把名為amount字段的值做數值變換,把值域為[0,100]的數值線性變換到[0,1],如果值為40則會變換為0.4,這種情況很常見,比如車速在車機端的表示可能為[0,300000]用來表示最高車速為300公里/小時,之所以用300000來表示,是為了表示精度能達到0.001公里/小時,在實際計算時就需要把它變換為易于理解的[0,300]范圍.
如下的配置片段中,boolean_convert告知 Bloods,包含on_(其為正則表達式形式)的所有字段,如果其值是yes,那么把它轉換為邏輯值true
上述的配置描述是接近自然語言的,容易編寫容易理解,Bloods會把它們自動轉換成大數據的代碼,而不需要我們編寫一行代碼.
第二目標:數據治理
上面的數據標準化過程,其實咱們也可以說它是數據治理,跟這里要說的數據治理有所區別的是,標準化強調的是統一和規范,而這里強調的是治,把一些相對異常的記錄甚至是數組,不是簡單的丟棄,而是盡量治成可用的數據.
以實際的數據案例說明,更容易理解.
假設有這樣的數據:
v1,v21,12,13,14,150,16,17,18,19,010,1然后用這樣的配置
<transform_data_governance id="id_transform_drop_if_exception" transform_ref="id_data_governance_csv"><data_governance_list><drop_if_exception column="v1" k="1.5"/></data_governance_list> </transform_data_governance>數據說明:
column 指定要對列v1做治理
k 用來描述異常的程度,值越大異常程度越大
drop_if_exception 表示如果指定列的某些值異常程度夠大,那么相對應的行記錄會被去除. 在這個數據案例中,50,1將會被刪除.
仍然是上面的示例數據,采用這樣的配置
數據說明:
expression 如果表達式的值評估為true,那么該記錄被刪除.這里的表達式可以涉及多個字段,可以包含常用的數學函數,所以可以表達很豐富的邏輯.
drop_if_expression 使更豐富的邏輯表達變得容易.
假設有這樣的數據要處理:
數據說明:
它代表的是車輛在一段時間內的數據,test是物理信號,可能是車速,電流,電壓什么的.
test在time=50的時刻出現異常
由于數據的連續性,time=50時刻的數據,其實是可以推斷出來的,而不適合丟棄處理.插值法替換處理可以這樣做:
<transform_data_governance id="id_transform_interpolation_governance" transform_ref="id_interpolation_governance_csv"><data_governance_list><interpolation_governance column="test" time_column="time" partition_by="vin" k="1.6"/></data_governance_list> </transform_data_governance>數據說明
time_column 指定時間列名, 是以毫秒為單位的記錄時間.
partition_by 指定分組列.不同的分組分別處理.
k 分位數異常檢測的參數,其值越大異常程度越大.
正如咱們預想的,Bloods 會把值為700的異常值替換為6,這是根據插值法得到的.
上面的數據格式是數組,數組項存在異常,可以采用這樣的配置處理:
<transform_data_governance id="id_transform_set_as_avg_if_exception_array" transform_ref="id_source_json_array"><data_governance_list><set_as_avg_if_exception_array column="v1,v2"/></data_governance_list> </transform_data_governance>數據說明:
column 指定要治理的列,可以同時指定多個,每1列應該是數值數組.
這個配置的處理結果,會把數據的異常項用數組的均值替換.
Bloods還提供了其他多種數據治理手段,這里就不再一一列舉了.
第三目標:數據血緣
按常理和時間先后看,先有指標體系后有數據血緣,這里先提出來,是為了對復雜的指標體系能有一個宏觀的理解,避免一下子陷入泥潭.另外,在構建指標體系之前,也確實要提前勾畫和設計一下數據血緣,但無論怎么設計,想提前勾畫清楚全貌基本是不可能的,這是因為很少有業務場景允許我們這么做,業務實踐往往是先完成一部分指標需求,再完成一批,逐步迭代完成的,所以指標體系也是逐漸豐富起來的.
在命令行Bloods 的控制臺中,在已經配置好指標體系的前提下,輸入如下命令可以輸出數據倉庫項目的整體血緣圖. 對一個比較大的數據倉庫項目,這個血緣圖可能會很大.
graph輸出的圖表大概是長這個樣子的:
從這樣的血緣圖表中我們得到很多信息:
- 哪些數據源或者中間表被引用的較少.
這可能在一定程度上說明,這些表沒有充分的利用,或者實際價值不大,甚至有必須刪除它或者把它整合到其他表中. - 哪些表被高頻引用.
和上述的情況相反,意味著它的價值很大,也同樣意味著對這個表的"讀"壓力很大,如果是只讀的文件還好,如果是數據庫表呢,就要考慮一下優化策略了,比如讀寫分隔,大表拆成小表,寬表窄化等等. - 哪些表單入單出.
這樣的表基本上起著臨時表的作用,如果它占用的存儲空間較大,就要考慮下是否有必要合并到上游表或者下游表中了. - 哪些表是孤立的.
這些表無入無出,如果是長期保持這樣,這樣的表基本上是沒有價值的,是該考慮騰出空間和相應的計算任務了. - 哪些表深度較大.
深度大意味著計算復雜度高,相應的指標計算步驟鏈條長,這樣越難于理解和維護,除非確實有必要,否則應該盡量減小深度,比如把多個計算步驟合并為一步,避免或減少中間表的存在等等. - 哪些表是葉子節點.
越是接近葉子節點,往往越接近于實際的業務需要,最能直接反映它的實際應用價值,如果對這個表的讀應用較少而相應的計算成本又很高,那這個節點就應該引起注意是否要優化掉了.
上述血緣圖是整體輪廓,當定位到具體的數據表,需要進一步分析時,我們就只關心某幾個節點的血緣圖了,在Bloods的控制臺,使用up 這個命令就可以得到某個數據表的上游血緣圖.
在這個輸出中,除了能得到整體血緣圖中也有的血緣關系外,它排除了無關血緣,而且給出了數據表的詳情信息,比如上下游的存儲周期,根據哪天的計算得到的血緣關系,數據的存儲位置是什么等等.
除了up,自然有down,相應的可得到數據表的下游血緣關系,根據這兩個命令就足以在血緣的脈絡中隨意暢游分析了.
整體血緣圖和上述的單表數據血緣,它們針對的是數據表,而不是指標,指標通常是數據表的某個字段,這個指標從何而來怎么計算而來?這在排查指標計算錯誤時,是極為有用的.up_index 這個指令正是做這個的.
假設我們有這樣的計算配置:
然后執行如下命令:
up_index id_sell_add輸出是這樣的:
{"amount_percent": {"exp": "amount_percent","bloods": {"id_sell_percent": {"amount_percent": {"exp": "amount * 0.8","bloods": {"id_sell_percent": ["amount"]}}}}},"money_add": {"exp": "amount_add * price","bloods": {"id_sell_percent": {"amount_percent": {"exp": "amount * 0.8","bloods": {"id_sell_percent": ["amount"]}},"price": {"exp": "price","bloods": {"id_sell_percent": ["price"]}}}}},"money": {"exp": "money","bloods": {"id_sell_percent": {"money": {"exp": "amount_percent * price","bloods": {"id_sell_percent": ["amount", "price"]}}}}},"amount_add": {"exp": "amount_percent + 10","bloods": {"id_sell_percent": {"amount_percent": {"exp": "amount * 0.8","bloods": {"id_sell_percent": ["amount"]}}}}} }從輸出中可見:
1.bloods: 表明了指標血緣,從結果中可看出,它是嵌套定義的,所以這個血緣可能是任意深度.它還輸出了血緣上的字段,以及計算所用的表達式.
2.從哪里來的: 指標 index1 來自哪些表.
3.怎么來的: 指標index1 是通過哪個表達式來計算的.
4.來源路線: 如果指標 index1 是通過兩個指標 index2 和 index3得到的,那么 index2和 index3又來自哪里.
有了這些工具做為分析利器,想定位指標的計算錯誤,想理清指標的計算步驟,就會很清晰了,這比用文檔描述更為直接更為清楚,也更容易產出相應的文檔.
第四目標:指標體系
無論指標體系多么復雜,大致都是通過如下幾個步驟完成的,而其中定義的數據源,變換和Sink的步驟基本上對應著數據處理常說的ETL,而其中的T又尤其關鍵,多數的工作和時間都是消耗在這,它也決定著最終成果的規模和好壞.
創建項目
在 Bloods Console,輸入如下命令:
create -t dw <dw project path>項目結構即創建完成,其中的-t參數指定了4種數據倉庫的類型之1,dw是最常見的一種,其他幾種是涉及數據智能和車聯網的.在創建 之后,<dw project path>下文件的改變即會被Bloods監控,以用于生成一些元數據,而這些元數據可用于智能提示配置文件的編寫.最經常使用的是各種ID和文件路徑的引用的智能提示.
定義數據源
數據源是數據的開始,示例中只是給出文本格式,不過常見的數據源它都是支持的,比如JDBC,parquet,csv等等.
<dw xmlns="https://www.rocy-data.com/dw/v1.0.0"xsi:schemaLocation="https://www.rocy-data.com/dw/v1.0.0 ../.dw/dw.xsd"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"dw_market="market1" dw_subject="subject1" owner="whq" dw_level="dws"><path_list><profile name="test"><path name="name_hello_world_path" path="dw/data/hello_world" pattern="none" parent="PROJECT_PATH"/></profile></path_list><sources><source_whole_text_files id="id_whole_text_files_source" path="name_hello_world_path" sort_columns="true"/></sources> </dw>數據說明:
source_whole_text_files 用于讀取整個文件內容,類似于 source_csv
path 建議集中定義在某個文件,以便做統一管理.
智能提示 當定義完name_hello_world_path這個路徑后,在定義id_whole_text_files_source通過path來引用這個路徑時,智能提示會幫助完成
pattern 可以定義成yyyyMMdd這樣的格式,以應對按日期\月\季度\年的存儲
parent 用于指定父級路徑定義
定義變換
<transforms><transform_transform id="id_word_count_stat_transform" transform_ref="id_whole_text_files_source"><computed_column_group><computed_column name="file_content_splits" expression="split(file_content,'[,\r\n ]+')"/></computed_column_group></transform_transform> </transforms>數據說明:
computed_column 用于添加計算列, 其中的expression 可以是任意的 hive 表達式,name 是新列名稱。
transform_transform 是基本的數據變換,類似的節點還有 transform_join,transform_union 等等。更多的transform可參考這里
transform_ref 應被設定為輸入數據的id,在這里是引用1個上游,而transform_join可引用2個上游,它使得這些action之間上下關聯,形成樹型結構.
定義Sink
<sinks><sink_show id="id_word_count_stat_sink" transform_ref="id_word_count_stat_transform"/> </sinks>數據說明:
sink: 通常對應的是存儲行為,也可以是建模輸出,而在這個示例中,是打印輸出結果,以用于調試配置的正確性
執行: 在生產環境下,只有sink可被執行,但在本地Debug模式下,不僅是sink,source 和 transform 也都是可以執行的,用于檢查任意節點的輸出是否正確。
transform_ref: sink 的 transform_ref 應設定為 source 或者 transform 的 id
sink_show: 它是是用于調試目的的sink,類似的節點還有 sink_parquet,sink_csv…
調試執行
項目創建好之后,就已經包含了一些示例,可以直接輸入命令sinks就能列出所有的任務,其中的id_word_count_stat_sink任務讀取位于目錄dw/data/hello_world的數據然后做單詞統計word count and word file count,執行如下命令即可獲取輸出,其中參數t用于指定任務ID列表:
run -t id_word_count_stat_sink項目部署
上述步驟均是在本地執行的,在確保邏輯正確性之后,即可進行集群部署。下述命令將生成部署腳本用于在集群上執行任務。
deploy生成的腳本中,若不指定日期,默認以T-1執行,若不指定任務列表,將會執行所有的sinks
插件擴展
如上提到的source,transform,sink,雖然Bloods已經內置了很多常用的,但是難免會存在滿足不了需要的場景,這個時候就需要做一下插件擴展了.實現起來也是力求簡單的,基本上就是兩步,第1步編寫一個實現類并繼承相應的接口,第2步把這個實現類打包并放置在Bloods的指定的插件路徑下.
如果有一批的source,transform,sink需要擴展,比如是一個獨特的領域銀行,IOT或是科研領域,有自己獨到的需求時,也是比較容易擴展實現的,這個稱為流擴展.上面創建項目時指定的項目類型dw,其實就是內置的1種流擴展,其他的3種是di(用于數據智能數據挖掘算法),iov_dw(用于車聯網指標體系建設),iov_di(用于車聯網數據智能故障診斷)
批量處理
通常情況下,我們需要加載一個日期段的數據,比如這一周,這個月,這個季度,這一年等等.Range Date 能處理這個場景. 假設我們有如下數據.
- 周-天的情況
我們設置range_date 為 this_week,它將加載這一周的數據.
<path_list><profile name="test"><path name="csv_path_day" path="${PROJECT_PATH}/dw/data/ods/orders_day" pattern="day"/></profile> </path_list> <transforms><transform_transform id="id_transform_this_week" transform_ref="id_source_csv_path_day"range_date="this_week"><measures><computed_column name="days" expression="count(distinct PARTITIONS)"/><computed_column name="min_day" expression="min(PARTITIONS)"/><computed_column name="max_day" expression="max(PARTITIONS)"/></measures></transform_transform> </transforms> <sources><source_csv id="id_source_csv_path_day" path="csv_path_day" range_date="day_one"/> </sources> #運行如下命令來檢查 run -t id_transform_this_week -d 20220105 輸出如下,請注意 星期一(20220103)是一周的第一天. +----+-------------+-------------+ |days|min_day |max_day | +----+-------------+-------------+ |3 |date=20220103|date=20220105| +----+-------------+-------------+除此之外,還有月級\季度\半年度\年度方面的處理能力,而且不只在輸入日期上可以批量處理,在輸出方面也具有同樣的能力,這里有比較詳細的文檔說明.
調度能力
上面的指標體系配置,如果你希望以小時為單位來調度任務,那么需要你自己做一些工作比如用crontab來調度,但是如果是以天單位的任務調度,Bloods內置了一些調度能力,使調度工作更為靈活.
在框架內部,Bloods 分析 節點之間的 DAG(有向無環圖)關系,自動調度和執行任務. 值得一提的是,更多的配置是基于以天為單位的.
- 周期性調度
這是常見的情況,sink 周期包括:
每天執行.
每周的 SUNDAY執行
每月的最后一天執行.
每個季度的最后一天即 0331, 0630, 0930, 1231執行
半年的最后一天即 0630, 1231執行
每年的最后一天即 1231執行
- 自定義調度
但是我們仍然可以個性化定義任務的調度,這由sink_cycle_pattern 表達式來做到. 在表達式中,我們可以應用數學及邏輯表達式來返回一個布爾值.
如果這個表達式評估為 false 那么這個任務今天(run date)就不會執行.
很多變量是內置支持的:
運行日期的當前年份,如果 run date 是20221001,它的值將為2022
運行日期的當前月份,如果 run date 是20221001,它的值將為10
運行日期的當天,如果 run date 是20221001,它的值將為1
運行日期的周幾,如果 run date 是20221001,它的值將為6
運行月份的最后一天,如果 run date 是20221001,它的值將為31
第五階段: 數據智能
若馨數據的Bloods成功之處在于,她讓一眾數據開發者從繁雜的數據處理中解放出來,實現了從編碼到配置(零代碼)的轉變,開發效率極大提升,數據維護起來更顯清晰,而它的技術本質是:數據開發人員用接近自然語言的XML像搭積木一樣描述處理邏輯,而Bloods把這些描述自動轉化成Spark代碼. 這樣的思路不僅限于上述的指標體系建設,同樣也適用于Bloods 數據智能,數據挖掘和AI,如果你是Spark ML的愛好者,如果你想快速融入到智能的模型算法中來,這是一個很好的起點,你可以查看這里得到更為詳細的內容.
第一目標:模型訓練
模型訓練最常見的工作是什么呢?參數調優!而參數調優就是配置,不同的配置即不同的模型不同的效果,這用Bloods DI來零代碼配置是很自然的事情,多數情況我們并不需要深入到算法中深入到代碼的細微控制中,如下的各類算法常見工作示例,足以讓人體會到模型訓練的簡潔而又清晰.
聚類
如下示例中,id_clustering_lda 讀取數據 source_sample_lda_libsvm_data 然后應用 LDA 算法訓練它并將模型存儲到 path_clustering_lda_model
<dw dw_level="tmp" dw_subject="subject1" dw_market="RX-DATA" owner="whq" xsi:schemaLocation="https://www.rocy-data.com/di/v1.0.0 ../.dw/di.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="https://www.rocy-data.com/di/v1.0.0"><sources><source_ml_model_invoke id="lda_model_invoke_describeTopics" path="path_clustering_lda_model" method_name="describeTopics" method_parameters="3"/><source_ml_model_invoke id="lda_model_invoke_logLikelihood" path="path_clustering_lda_model" method_name="logLikelihood" method_parameters="source_sample_lda_libsvm_data"/><source_ml_model_invoke id="lda_model_invoke_logPerplexity" path="path_clustering_lda_model" method_name="logPerplexity" method_parameters="source_sample_lda_libsvm_data"/></sources><transforms><transform_ml_estimator_clustering_km id="id_kmeans" transform_ref="source_sample_kmeans_data" k="2" seed="1"/><transform_ml_estimator_clustering_lda id="id_clustering_lda" transform_ref="source_sample_lda_libsvm_data" k="10" max_iter="10" model_path="path_clustering_lda_model"/></transforms><sinks><sink_ml_evaluator_clustering id="id_evaluator_clustering" transform_ref="id_kmeans"/></sinks> </dw>現在我們來使用source_ml_model_invoke來檢查一下模型.
run -t lda_model_invoke_describeTopics +-----+-----------+---------------------------------------------------------------+ |topic|termIndices|termWeights | +-----+-----------+---------------------------------------------------------------+ |0 |[2, 5, 7] |[0.10596582700827585, 0.10560579109860191, 0.10421656683012902]| |1 |[1, 6, 2] |[0.10177934362911985, 0.09812186737848058, 0.09628916613024666]| |2 |[1, 9, 4] |[0.10587329435318685, 0.09746396510036567, 0.09650800754627996]| |3 |[5, 4, 0] |[0.16140487918106045, 0.13157416711460962, 0.12125555977641359]| |4 |[9, 6, 4] |[0.10444172332018084, 0.1040635944390557, 0.10097465247362353] | |5 |[10, 6, 3] |[0.18500622562463037, 0.16489028958649327, 0.15527004414864845]| |6 |[3, 7, 4] |[0.11621765255437844, 0.0989645753475578, 0.09790795515141672] | |7 |[4, 0, 2] |[0.10844113271172434, 0.10326267091975808, 0.10028860890038724]| |8 |[0, 7, 8] |[0.10995536322709686, 0.09914310583037018, 0.09806206271783646]| |9 |[9, 6, 8] |[0.1009940937221744, 0.10007205188894182, 0.0976478953418414] | +-----+-----------+---------------------------------------------------------------+ run -t lda_model_invoke_logLikelihood +------------------+ |logLikelihood | +------------------+ |-788.3752801566864| +------------------+再評估一下模型的效果如何:
run -t id_evaluator_clustering輸出為 :
-RECORD 0------------------------------Evaluator Result | 0.9997530305375207協同過濾
現在讓我們在訓練數據上使用ALS算法構建推薦模型
<dw dw_level="tmp" dw_subject="subject1" dw_market="RX-DATA" owner="whq"xsi:schemaLocation="https://www.rocy-data.com/di/v1.0.0 ../.dw/di.xsd"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns="https://www.rocy-data.com/di/v1.0.0"><sources><source_ml_model_invoke id="id_als_invoke_recommendForAllUsers" path="path_als_model" method_name="recommendForAllUsers" method_parameters="10" setColdStartStrategy="drop"/><source_ml_model_invoke id="id_als_invoke_recommendForAllItems" path="path_als_model" method_name="recommendForAllItems" method_parameters="10" setColdStartStrategy="drop"/><source_ml_model_invoke id="id_als_invoke_recommendForUserSubset" path="path_als_model" method_name="recommendForUserSubset" method_parameters="sample_movielens_ratings,10" setColdStartStrategy="drop"/><source_ml_model_invoke id="id_als_invoke_recommendForItemSubset" path="path_als_model" method_name="recommendForItemSubset" method_parameters="sample_movielens_ratings,10" setColdStartStrategy="drop"/></sources><transforms><transform_ml_utility_random_split id="id_random_split_get_training" transform_ref="sample_movielens_ratings"random_radio="7,3" pick_index="0"/><transform_ml_utility_random_split id="id_random_split_get_test" transform_ref="sample_movielens_ratings"random_radio="7,3" pick_index="1"/><transform_ml_utility_random_split id="id_random_split_random_test" transform_ref="sample_movielens_ratings"random_radio="2,13" pick_index="1"/><transform_ml_estimator_recommendation_als id="id_als_training" model_path="path_als_model"transform_ref="id_random_split_get_training" max_iter="5"reg_param="0.01" user_col="userId" item_col="movieId"rating_col="rating"/><transform_ml_model_recommendation_als id="id_als_test" transform_ref="id_random_split_get_test"cold_start_strategy="drop"estimator_id="id_als_training"/><transform_ml_model_recommendation_als id="id_als_test_path" transform_ref="id_random_split_get_test"cold_start_strategy="drop"model_path="path_als_model"/></transforms><sinks><sink_ml_evaluator_regression id="id_evaluator_als" transform_ref="id_als_test" metric_name="rmse" label_col="rating" prediction_col="prediction"/></sinks> </dw>運行如上命令來訓練模型.
2. id_als_test 讀取數據 id_random_split_get_test 并用如上模型來做預測
輸出為:
-RECORD 0------------------------------Evaluator Result | 1.9722660132618175分類及回歸
如下XML配置中, 讀取數據training 然后應用 logistic_regression算法做訓練,然后將模塊存儲到 lr_model,各個參數的配置均有智能提示,配置體驗還是很不錯的.
<dw dw_level="tmp" dw_subject="subject1" dw_market="RX-DATA" owner="whq"xsi:schemaLocation="https://www.rocy-data.com/di/v1.0.0 ../.dw/di.xsd"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns="https://www.rocy-data.com/di/v1.0.0"><sources><source_ml_model_file id="id_model_check" path="lr_model" is_output_metadata="false"/></sources><transforms><transform_ml_estimator_classification_logistic_regression id="lr" transform_ref="training"max_iter="10" reg_param="0.3"elastic_net_param="0.8" model_path="lr_model"/></transforms> </dw>執行如下的命令即可訓練模型.
run -t lr執行如下的命令即可檢查一下模型的截距和系數
run -t id_model_check -v true抽取變換特征提取
<dw dw_level="tmp" dw_subject="subject1" dw_market="RX-DATA" owner="whq"xsi:schemaLocation="https://www.rocy-data.com/di/v1.0.0 ../.dw/di.xsd"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns="https://www.rocy-data.com/di/v1.0.0"><sources><source_sequence_constants id="sentenceData" quote_char="'"><sequence_constant column_name="label" column_values="0.0,0.0,0.1"/><sequence_constant column_name="sentence" column_values="'Hi I heard about Spark','I wish Java could use case classes','Logistic regression models are neat'"/></source_sequence_constants><source_sequence_constants id="dataFrame"><sequence_constant column_name="label" column_values="0,1,2"/><sequence_constant column_name="features" column_values="1.0 0.5 -1.0,2.0 1.0 1.0,4.0 10.0 5.0"/></source_sequence_constants></sources><transforms><transform_ml_transformer_feature_tokenizer id="id_feature_tokenizer" transform_ref="sentenceData" input_col="sentence" output_col="words"/><transform_ml_transformer_feature_hashing_tf id="hashingTF" transform_ref="id_feature_tokenizer" output_col="rawFeatures" input_col="words" num_features="20"/><transform_ml_estimator_feature_idf id="idf" transform_ref="hashingTF" input_col="rawFeatures" output_col="features" model_path="path_idf_model"/><transform_ml_transformer_feature_normalizer id="normalizer" transform_ref="dataFrame" input_col="features" output_col="normFeatures" p="1.0"/><transform_ml_transformer_feature_sqlt id="sqlTrans" transform_ref="sentenceData" statement="select label,sentence,concat(label,sentence) as concat from __THIS__"/></transforms> </dw>上述示例是這樣工作的:
第二目標:異常發現
以車聯網的實際案例為例,分析車輛故障其實是在分析車輛端收集的物理信號或者經過計算得到的一些指標,如下幾種分析均屬于異常發現領域:
這里以第1種情況說明一下:
假如我們有這樣的輸入數據,輸入字段分別表示循環充電次數\電池容量\模型及車輛ID
然后簡單地做如下配置:
<transform_ml_iov_exception_capacity_usage_time id="id_transform_iov_exception_capacity_usage_time"transform_ref="id_source_iov_exception_capacity_usage_time"acc_usage_time_col="acc_usage_time"capacity_attenuation_name_col="capacity_attenuation"/>即可得到異常結果:
[rocy@Nervous iov_di_samples]>run -t id_transform_iov_exception_capacity_usage_time Running Data Day:20210106 NO sinks to run running id_transform_iov_exception_capacity_usage_time/20210106 +--------------+--------------------+-----+---+------------+----------+ |acc_usage_time|capacity_attenuation|model|vin|features |prediction| +--------------+--------------------+-----+---+------------+----------+ |2000 |0.3 |m1 |v8 |[2000.0,0.3]|1 | +--------------+--------------------+-----+---+------------+----------+第三目標:模式發現
IOV T-BOX 數據有大量的報警信號,即便是領域專家也很難分析他們.但是不容置疑的是,這些報警信號之間是存在某種關聯的,比如某個報警信號A出現后大概率會出現B信號.這種模式可以通過如下辦法得到:
假如有如下的輸入數據:
我們做如下的配置:
<transform_iov_exception_pattern_extract id="id_transform_iov_exception_pattern_extract"transform_ref="id_source_iov_exception_pattern_extract"alarms_cols="alarm_common_brk,alarm_common_dcdc_st,alarm_common_dcdc_temp,alarm_common_driver_motor_temp,alarm_common_esd_charge_over"partition_col="vin"time_col="time"/>數據說明:
alarms_cols: 上述由alarms_cols指定的報警信號可以是任意多個.
partition_col: 用于指定分組字段.
time_col: 用于指定數據采集的時間.
transform_ref: 用于指定上游數據.
然后在Bloods控制臺輸入命令即可得到模式結果:
[rocy@Nervous iov_di_samples]>run -t id_transform_iov_exception_pattern_extract Running Data Day:20210106 NO sinks to run running id_transform_iov_exception_pattern_extract/20210106 +--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+------------------+-------------------+ |from |to |confidence |support | +--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+------------------+-------------------+ |[alarm_common_brk, alarm_common_dcdc_st] |[alarm_common_dcdc_temp, alarm_common_driver_motor_temp] |0.3333333333333333|0.46153846153846156| |[alarm_common_brk, alarm_common_dcdc_st] |[alarm_common_dcdc_temp] |0.3333333333333333|0.46153846153846156| |[alarm_common_brk, alarm_common_dcdc_st] |[alarm_common_dcdc_temp, alarm_common_driver_motor_temp, alarm_common_esd_charge_over]|0.3333333333333333|0.46153846153846156| |[alarm_common_dcdc_temp] |[alarm_common_brk, alarm_common_dcdc_st] |1.0 |0.15384615384615385| |[alarm_common_dcdc_temp, alarm_common_driver_motor_temp] |[alarm_common_esd_charge_over] |1.0 |0.15384615384615385| |[alarm_common_dcdc_temp, alarm_common_driver_motor_temp, alarm_common_esd_charge_over]|[alarm_common_brk, alarm_common_dcdc_st] |1.0 |0.15384615384615385| |[alarm_common_esd_charge_over] |[alarm_common_brk, alarm_common_dcdc_st] |1.0 |0.07692307692307693| +--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+------------------+-------------------+第四目標:事件預測
事件預測實際上是模型算法的一個實踐應用,電池健康度衰減預測就是實際應用之一,在車聯網領域,對電池健康的預測是比較困難的,因為它涉及的因素很多,但理論上它是可行的,只要相關的因素收集的足夠精準足夠多,那么預測結果也會更接近實際值.
假設我們已經具備這樣一些統計指標:
然后做如下的配置:
<transform_ml_iov_health_factor_analysistransform_ref="id_source_iov_health_factor_analysis"id="id_transform_iov_health_factor_analysis"health_col="health"med_charge_duration_col="med_charge_duration"med_charge_start_soc_col="med_charge_start_soc"med_charge_end_soc_col="med_charge_end_soc"sum_charge_soc_times_col="sum_charge_soc_times"med_charge_soc_col="med_charge_soc"/>我們就可以得到電池健康度跟其他輸入指標之間的關聯關系數學表達式:
[rocy@Nervous iov_di_samples]>run -t id_transform_iov_health_factor_analysis Running Data Day:20210106 NO sinks to run running id_transform_iov_health_factor_analysis/20210106 +---------------------+--------------------+---------------------+---------------------+---------------------+----------------------+-----------------+ |med_charge_duration |med_charge_start_soc|med_charge_end_soc |sum_charge_soc_times |sum_charge_soc_times |med_charge_soc |constant | +---------------------+--------------------+---------------------+---------------------+---------------------+----------------------+-----------------+ |-0.036789554905818964|0.14260919055819518 |-0.004554520647164639|-0.006634642511633963|-0.006634642511633934|-1.8557214490735148E-4|99.92485117304537| +---------------------+--------------------+---------------------+---------------------+---------------------+----------------------+-----------------+這樣當輸入新的一批指標時,我們就可以量化健康度了,如果指標是實時輸入的,健康度也就可以實時計算了.
更進一步,假如有了一批歷史上的健康度數值,那么通過回歸計算,我們就可以預測未來某個日期的健康度了,相對地,指定一個健康值,Bloods也可以告訴我們,未來的哪個日期車輛將會衰減到這個健康度,這就達到預測的效果了.
示例如下,輸入的數據很簡單:
配置也很簡單:
<transform_ml_iov_health_down_predict id="id_transform_iov_health_down_predict"transform_ref="id_source_iov_health_down_predict" date_col="date"health_col="health" health_to="20"/>預測結果也很清楚:
[rocy@Nervous iov_di_samples]>run -t id_transform_iov_health_down_predict Running Data Day:20210106 NO sinks to run running id_transform_iov_health_down_predict/20210106 +---------------------+--------+ |predict_date |predict | +---------------------+--------+ |1.6766845868672517E12|20230218| +---------------------+--------+第六階段: 數據部署
大數據的任務,在一個公司里或是在一個項目上,都會有大量的任務,怎么讓它們有序的執行,是任何數據開發人員都需要面對的問題.在資源方面的任務調度有Yarn,在一個項目一個JAR內的任務有上述的Bloods,而在一個團隊的多個數據開發人員之間或者是多個團隊之間的大數據任務該怎么調度呢,這些任務可能屬于不同的JAR包,可能屬于不同的數據開發人員,任務之間的依賴既有數據文件上的依賴,也可能是某個邏輯條件的依賴比如數據庫中的某個查詢是否有記錄等等,這樣的任務該如何編排呢?
市場上已經有很多相關的產品,各有利弊,不過市場上的多數產品都需要這樣一個工作,需要人工指定任務的依賴關系,指定任務的調度周期執行隊列所需要的資源等等,這些工作其實是很繁雜的,Brain任務調度,就是為了盡一步簡化這一工作,它的理論基礎很簡單,如果您是數據開發人員,會很容易理解:任務的依賴可以通過任務ID引用來表達,也可以通過邏輯表達式來表達,甚至也可以通過指定一個JDBC的查詢來表達,如果這些依賴能夠通過反射的形式來得到,不就可以實現任務依賴的自動解析了嘛,Brain正是這樣做的.
第一目標:任務調度
先體會[Brain任務調度]的安裝示例,便能馬上領略它的簡單和大致原理.
安裝示例中含有兩個JAR包,分別含有一些可以調度的任務. 執行如下步驟:
然后示例中的JAR即提交到Yarn執行,而JAR中的任務也按照自動分析得到的依賴關系執行起來,所以接下來的僅有的工作,就是把你的JAR包放到scheduler.jars指定的位置.
Brain架構
下圖是Brain的架構,需要重點關注的一些關鍵點如下:
-
提交任務的方法是提交一些jar,每個jar可以包含許多任務,它們的依賴關系是自動分析的。
-
可以隨時提交 jar。
-
在接受任何一個任務后選擇最佳隊列,因為這時集群中的剩余資源可能會發生改變。
-
開發人員可以調試任務。如果以前的jar包含一些錯誤,他(她)可以重新提交jar。
-
依賴關系可以跨jar。任務可以依賴于其他jar中的任務。
調度配置
下列配置位于文件 application.properties 中
- scheduler.jars
必贊項.逗號分隔的jar文件或目錄路徑,Brain將監視目錄中的這些jar,根據所有的jar生成DAG任務樹。這些jar處于熱加載模式,因此您不需要重新啟動Brain. - scheduler.boostrap_jars
任務中依賴的的所有jar都應該放在這里,包括Brain的jar,在運行時,這些jar將被上傳到集群。 - scheduler.priorities
配置示例:scheduler.priorities=com.rocy.data.bigdata.scheduler.test.spark.Spark01=1,com.rocy.data.bigdata.scheduler.test.spark.Spark02=2,逗號分隔多個任務,完整類名和優先級由=分隔,更小的數字有更高的優先級. - scheduler.ignore_tasks
任務有兩個辦法可以忽略執行.
實現接口 IgnoreSchedule.
逗號分隔的多個完整類名,它的好處是不需要數據開發人員更改代碼.
- scheduler.state_save_seconds
狀態管理器將在這個時間間隔后將運行狀態的快照保存到文件或數據庫中. - scheduler.queue_sync_seconds
用于yarn的隊列狀態同步,此信息用于為Spark任務選擇最佳隊列。 - scheduler.yarn_rest_url
用于獲取這些信息:- 獲取集群信息,例如,剩余內存和CPU Core等資源。
- 獲取用于選擇最佳隊列的隊列信息。
- 獲取應用程序的狀態。
- scheduler.task_count_parallel
同時運行任務的個數。 - default.state_manager
用于擴展狀態管理器. 你的實現應該繼承 com.rocy.data.schedule.run.TStateManager,內部的默認實現是 ParquetStateManager,這保存任務狀態到這個目錄rocy_scheduler/states. - default.queue_select
用于擴展隊列選擇算法. 你的實現應該繼承 com.rocy.data.schedule.run.TQueueSelect,內部的默認實現是選擇具有最多資源的隊列.
第二目標:任務血緣
Brain提供圖表來顯示計劃任務的當前運行狀態,它們可以幫助您分析錯誤的原因。
Brain根據依賴關系對調度的任務進行聚類,您可以在Graph表的右上角切換不同的聚類。
如果您有聚類1和聚類2,那么聚類1中的任何任務都與聚類2沒有關系。
在實際的大數據任務工作中,我們可以有很多聚類,有些聚類可以有依賴鏈路非常深的任務。
第三目標:任務監控
在狀態表中,有任務運行列表的詳細屬性。
- 任務甘特圖
在下面的任務甘特圖中,Y軸列出任務名稱,X軸是時間。
圖形中的矩形,左側對應運行時間,右側對應完成時間,文本顯示為時間跨度,以秒為單位。
如果您的任務聚類很龐大,或者任務的時間跨度更長,可以通過水平和垂直滾動條進行放大.
和數據開發者一起成長
如上這些就是大數據全生命周期中,我們最常見到的一些工作了,看起來內容較多,但在Bloods等一系列產品的精心組裝之下,每個工作做起來都要簡單很多,這讓數據開發變成了讓人愉悅的事情.當然,它們雖然收獲了不小的成就,但還在成長中,好在它們提供了相當豐富的開發接口,可以讓數據開發的實踐者參與進來,讓它更豐富更好用.
總結
以上是生活随笔為你收集整理的全生命周期大数据处理系列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 泛函分析基础-如何证明l^∞是完备的度量
- 下一篇: 如何利用Excel快速批量创建文件夹