【阿里云 MVP 月度分享】宋亚奇——应用MaxCompute实现电力设备监测数据的批量特征分析...
1 背景知識
電力設備在線監測指在不停電的情況下,對電力設備狀況進行連續或周期性地自動監視檢測,使用的技術包括:傳感器技術、廣域通信技術和信息處理技術。電力設備在線監測是實現電力設備狀態運行檢修管理、提升生產運行管理精益化水平的重要手段,對提升電網智能化水平、實現電力設備狀態運行管理具有積極而深遠的意義。
隨著智能電網建設的推進,電力設備在線監測得到了較大發展并成為趨勢,監測數據變得日益龐大,逐漸構成電力設備監測大數據,這給電力設備在線監測系統在數據存儲和處理方面帶來非常大的技術挑戰。
電力設備監測大數據具有體量大、類型多、價值密度低和處理速度快的特點。電網公司監測系統目前過于依賴集中式SAN存儲,并基于SOA進行數據集成,主要采用“企業級關系型數據庫”,受容量、擴展性以及訪問速度的制約,目前只存儲二次加工的“熟數據”,而所擅長的關聯查詢、事務處理在數據分析時又無用武之地,迫切需要新的大數據存儲和處理技術來應對。
變壓器的局部放電數據是一種典型的電力設備監測數據。局部放電相位分析(phase resolved partial discharge, PRPD)包含了從特征提取到模式識別的過程。本文將全面介紹利用MaxCompute實現局部放電監測數據特征提取的過程。
PD信號分析主要包括三個子過程:(1)基本參數n-q-φ的提取。掃描PD信號,統計信號中的放電峰值和相應的放電相位。(2)譜圖構造和統計特征計算。劃分相窗,統計平均放電量和放電次數的分布,計算平均放電量相位分布譜圖qave-φ和放電次數相位分布譜圖n-φ?;趒ave-φ和n-φ,以φi為隨機變量,計算譜圖的偏斜度Sk、陡峭度Ku等統計特征,形成放電特征向量。(3)放電類型識別。本文將介紹,使用MapReduce實現第一個子過程的方法。
MaxCompute(原ODPS) 是阿里云提供的海量數據處理平臺。主要服務于批量結構化數據的存儲和計算,數據規模達EB級別。MaxCompute目前已在大型互聯網企業的數據倉庫和BI分析、網站的日志分析、電子商務網站的交易分析等領域得到大規模應用。
另外,本文還將使用odpscmd作為客戶端完成對MaxCompute的各種操作。odpscmd是一個Java程序,可以以命令方式訪問MaxCompute。應用該客戶端,可以完成包括數據查詢、數據上傳、下載等各種任務。需要JRE環境才能運行,請下載并安裝JRE 1.6+版本。
本文將使用MapReduce編程來完成特征分析的計算任務。MapReduce最早是由Google提出的分布式數據處理模型,隨后受到了業內的廣泛關注,并被大量應用到各種商業場景中。比如搜索、Web訪問日志分析、文本統計分析、海量數據挖掘、機器學習、自然語言處理、廣告推薦等。
2 分析過程
2.1 創建項目、建表和數據上傳
(1)創建MaxCompute項目
打開阿里云官網:https://www.aliyun.com/
使用已有阿里云賬號登錄。
進入阿里云管理控制臺,并從左側導航欄選擇“大數據(數加)à大數據計算服務”,進入MaxCompute管理控制臺。
點擊下方的“創建項目”,創建一個新的MaxCompute項目。
選擇“I/O后付費”,填入項目名稱和項目描述,并“確定”。
創建完成后,可以在項目列表中看到剛剛創建的項目。
(2)安裝配置odpscmd
在本地準備好JRE環境,請下載并安裝JRE 1.6+版本。
從阿里云官網下載odpscmd工具:http://repo.aliyun.com/download/odpscmd/latest/odpscmd_public.zip?spm=5176.doc27804.2.3.o2o8Rw&file=odpscmd_public.zip
解壓縮,并配置/conf/odps_config.ini
project_name=[project_name] access_id=****************** access_key=********************* end_point=http://service.odps.aliyun.com/api tunnel_endpoint=http://dt.odps.aliyun.com log_view_host=http://logview.odps.aliyun.comhttps_check=trueaccess_id和access_key請從阿里云管理控制臺獲取。project_name配置為創建好的MaxCompute項目即可。
配置完成后,運行/bin/odpscmd,進入交互模式。會出現項目名稱作為提示符。
(3)建表并添加分區
1)? ? 創建ODS_PD表,用于存放原始的變壓器局部放電監測數據。
在odpscmd中,執行下面的SQL語句,建表。
create table if not exists ODS_PD(Time string,Phase bigint,Value bigint) partitioned by (DeviceID string, Date string);當出現“ok”,表示建表成功,可以使用“ls tables;”命令查看已經創建的表。
為ODS_PD表添加分區。
可以使用“show partitions ODS_PD;”驗證添加的分區是否成功。
2) 創建目標特征表DW_NQF
在odpscmd中,執行下面的SQL語句,建表。
create table if not exists DW_NQF(Time string,Phase bigint,MaxV bigint) partitioned by (DeviceID string, Date string);
為DW_NQF表添加分區。
(4)使用Tunnel進行數據上傳
在odpscmd中運行tunnel命令,將本地數據文件monitor_data.csv上傳至ODS_PD表。下面的命令中的路徑,請在執行時根據實際路徑進行修改。monitor_data.csv請從附件中下載。
2.2 MapReduce程序開發、本地調試和運行
(1)本地開發環境準備
本文使用Eclipse作為開發環境,請提前下載并安裝。
官網導航中找到并下載 ODPS for eclipse 插件,并將插件解壓并復制到Eclipse安裝目錄下的plugins子目錄下。啟動Eclipse,檢查Wizard選項里面是否有ODPS的目錄。
ODPS for eclipse 插件下載地址:
https://docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/cn/odps/0.0.90/assets/download/odps-eclipse-plugin-bundle-0.16.0.zip?spm=5176.doc27981.2.3.cCapmQ&file=odps-eclipse-plugin-bundle-0.16.0.zip
當可以創建ODPS類型的項目時,表示本地開發環境已經準備好了。
(2)MapReduce程序開發
在Eclipse中創建ODPS項目,命名為NQF。為了讓Eclipse能正確訪問MaxCompute,需要在創建項目的時候正確配置odpscmd的本地路徑。
依次添加Mapper類、Reducer類、MapReduce Driver類和R類。
FSMapper.java代碼如下:
FSReducer.java代碼如下:
import java.io.IOException; import java.util.Iterator; import com.aliyun.odps.data.Record; import com.aliyun.odps.mapred.ReducerBase; import com.aliyun.odps.mapred.Reducer.TaskContext;public class FSReducer extends ReducerBase {private Record result=null;private R left;private R middle;private R right;@Overridepublic void setup(TaskContext context) throws IOException {result=context.createOutputRecord();;}@Overridepublic void reduce(Record key, Iterator<Record> values, TaskContext context)throws IOException {left=new R();middle=new R(); right=new R();Record temp;if (values.hasNext()){temp=values.next();left.phase=temp.getBigint("phase");left.val=temp.getBigint("value");} else left=null;if(values.hasNext()){temp=values.next();middle.phase=temp.getBigint("phase");middle.val=temp.getBigint("value");}else middle=null;if(values.hasNext()){temp=values.next();right.phase=temp.getBigint("phase");right.val=temp.getBigint("value");} else right=null;if(left!=null&&middle!=null&&right!=null){if(Math.abs(middle.val)>Math.abs(right.val)&&Math.abs(middle.val)>Math.abs(left.val)){result.set("time",key.get("time").toString());result.set("phase",middle.phase);result.set("maxv",middle.val);context.write(result); }}while(values.hasNext()){left.val=middle.val;left.phase=middle.phase;middle.val=right.val;middle.phase=right.phase;temp=values.next();right.phase=temp.getBigint("phase");right.val=temp.getBigint("value");if(left!=null&&middle!=null&&right!=null){if(Math.abs(middle.val)>Math.abs(right.val)&&Math.abs(middle.val)>Math.abs(left.val)){result.set("time",key.get("time").toString());result.set("phase",middle.phase);result.set("maxv",middle.val);context.write(result); }}}}@Overridepublic void cleanup(TaskContext context) throws IOException {}}TJDriver.java代碼如下:
import com.aliyun.odps.OdpsException; import com.aliyun.odps.data.TableInfo; import com.aliyun.odps.mapred.JobClient; import com.aliyun.odps.mapred.RunningJob; import com.aliyun.odps.mapred.conf.JobConf; import com.aliyun.odps.mapred.utils.InputUtils; import com.aliyun.odps.mapred.utils.OutputUtils; import com.aliyun.odps.mapred.utils.SchemaUtils; import java.util.*;public class FSDriver {public static void main(String[] args) throws OdpsException {if (args.length != 2) {System.err.println("Usage: WordCount <in_table> <out_table>");System.exit(2);}JobConf job = new JobConf();job.setMapperClass(FSMapper.class); job.setReducerClass(FSReducer.class);//二次排序job.setMapOutputKeySchema(SchemaUtils.fromString("time:string,phase:bigint")); job.setMapOutputValueSchema(SchemaUtils.fromString("phase:bigint,value:bigint"));job.setPartitionColumns(new String[] { "time" });job.setOutputGroupingColumns(new String[] { "time" });job.setOutputKeySortColumns(new String[] { "time","phase" });//分區輸入 InputUtils.addTable(TableInfo.builder().tableName(args[0]).partSpec("deviceid=001/date=20171116").build(), job);//分區輸出 OutputUtils.addTable(TableInfo.builder().tableName(args[1]).partSpec("deviceid=001/date=20171116").build(), job);try {JobClient.runJob(job);} catch (OdpsException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}R.java的代碼如下:
public class R {public long phase;public long val;public R(){phase=0;val=0;}}(3)本地測試
打開FSDriver.java,右擊“Run as-àRun Configurations”
在ODPS Config選項卡,選擇正確的ODPS項目。
在Arguments選項卡中,輸入運行參數:ods_pd dw_nqf,并點擊“Run”,執行本地測試運行。
在第一次運行時,Eclipse會從MaxCompute中下載少量的測試數據用于測試。運行完成后,可以在Warehouse中看到測試用的輸入數據和產生的結果數據。
(4)打包并上傳資源
在本地測試結果正確之后,就可以導出jar包了。在Eclipse下執行“FileàExport”,選擇導出“JAR File”,導出至本地。
在odpscmd下,執行添加資源的命令,將jar上傳至MaxCompute。
add jar d:/jar/NQF.jar;
(5)MaxCompute上執行程序
在odpscmd下,執行jar命令,運行程序。(請自行調整文件路徑)
jar -resources NQF.jar -classpath d:\jar\NQF.jar FSDriver ods_pd dw_nqf;
總結
以上是生活随笔為你收集整理的【阿里云 MVP 月度分享】宋亚奇——应用MaxCompute实现电力设备监测数据的批量特征分析...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 在 Kubernetes 集群中使用 M
- 下一篇: 走进统信软件,读懂国产操作系统新生态建设