使用Java将数据流式传输到HPCC
高性能計算集群(HPCC)是類似于Hadoop的分布式處理框架,除了它運行以自己的稱為企業(yè)控制語言(ECL)的特定領(lǐng)域語言(DSL)編寫的程序外。 ECL很棒,但是偶爾您會想用其他語言來執(zhí)行繁重的任務。 例如,您可能想利用Java編寫的NLP庫。
此外,HPCC通常針對類似于HDFS的文件系統(tǒng)上的數(shù)據(jù)進行操作。 就像使用HDFS一樣,一旦您超越了日志文件處理和靜態(tài)數(shù)據(jù)快照的范圍,您就會Swift產(chǎn)生對數(shù)據(jù)庫后端的渴望。
實際上,我想說這是一個普遍的行業(yè)趨勢:HDFS-> HBase,S3-> Redshift等。最終,您希望減少分析的延遲(接近零)。 為此,您需要設(shè)置某種分布式數(shù)據(jù)庫,該數(shù)據(jù)庫能夠支持批處理以及數(shù)據(jù)流/微分批處理。 而且,您采用了一種不變/遞增的數(shù)據(jù)存儲方法,使您可以折疊基礎(chǔ)結(jié)構(gòu),并在分析數(shù)據(jù)時將數(shù)據(jù)流傳輸?shù)较到y(tǒng)中(簡化了處理過程)
但是我離題了,作為朝這個方向邁出的一步……
我們可以利用HPCC中的Java集成功能來支持Java中的用戶定義函數(shù)。 同樣,我們可以利用相同的功能來添加其他后端存儲機制(例如Cassandra)。 更具體地說,讓我們看一下HPCC / Java集成的流功能,以從外部源獲取數(shù)據(jù)。
讓我們首先看一下原始Java集成。
如果您具有HPCC環(huán)境設(shè)置,則Java集成將從/ opt / HPCCSystems / classes路徑開始。 您可以將類和jar文件拖放到該位置,并且可以從ECL中使用這些功能。 請按照此頁面上的說明進行操作 。
如果遇到問題,請參閱該頁面上的故障排除指南。 最困難的部分是讓HPCC查找您的班級。 對我來說,我遇到了一個討厭的JDK版本問題。 默認情況下,HPCC在我的Ubuntu計算機上選擇了舊的JDK版本。 由于它使用的是舊版本,因此HPCC找不到使用“新” JDK(1.7)編譯的類,這導致了一條模糊的消息:“無法解析類名”。 如果遇到此問題,請拉出我提交的針對Ubuntu修復的補丁 。
完成該工作后,您將可以使用以下語法從ECL調(diào)用Java:
IMPORT java; integer add1(integer val) := IMPORT(java, 'JavaCat.add1:(I)I'); output(add1(10));這非常好用,而且正如文檔所建議的,如果數(shù)據(jù)復雜,則可以從Java方法返回XML。 但是,如果您擁有大量的數(shù)據(jù),而不是駐留在內(nèi)存中,該怎么辦? 好吧,那么您需要將Java流傳輸?shù)紿PCC。 ;)
而不是從導入的方法返回實際數(shù)據(jù),我們返回一個Java Iterator。 然后,HPCC使用Iterator構(gòu)造數(shù)據(jù)集。 以下是一個示例Iterator。
import java.util.ArrayList; import java.util.Iterator; import java.util.List;public class DataStream implements Iterator {private int position = 0;private int size = 5;public static Iterator stream(String foo, String bar){return new DataStream();}@Overridepublic boolean hasNext() {position++;return (position < size);}@Overridepublic Row next() {return new Row("row");}@Overridepublic void remove() {}}這是一個標準的Iterator,但請注意它返回一個Row對象,其定義如下:
import java.util.ArrayList; import java.util.Iterator; import java.util.List;public class Row {private String value;public Row(String value){this.value = value;} }該對象是一個Java bean。 HPCC將在映射到DATASET時設(shè)置成員變量的值。 要確切了解這種情況如何發(fā)生,讓我們看一下ECL代碼:
IMPORT java;rowrec := recordstring value; end;DATASET(rowrec) stream() := IMPORT(java, 'DataStream.stream:(Ljava/lang/String;Ljava/lang/String;)Ljava/util/Iterator;');output(stream());在import語句之后,我們定義了一種稱為rowrec的記錄類型。 在以下行中,我們導入UDF,然后將結(jié)果鍵入為包含rowrecs的DATASET。 rowrec中的字段名稱必須與java bean上成員變量的名稱匹配。 HPCC將使用迭代器,并使用next()方法的返回值填充數(shù)據(jù)集。 ECL的最后一行輸出返回的結(jié)果。
我已將以上所有代碼提交給github存儲庫 ,其中包含一些有關(guān)使其運行的說明。 玩得開心。
敬請期待更多…
想象一下,將這里概述的java流功能與將數(shù)據(jù)流出Cassandra的能力結(jié)合在一起,就像我之前的文章中所詳細描述的那樣 。 結(jié)果是一種強大的方法,可以使用Thor對存儲在Cassandra中的數(shù)據(jù)(具有數(shù)據(jù)局部性!)運行批處理分析(可能對通過實時實時事件流獲取的數(shù)據(jù)啟用ECL作業(yè)!=)
翻譯自: https://www.javacodegeeks.com/2015/05/streaming-data-into-hpcc-using-java.html
總結(jié)
以上是生活随笔為你收集整理的使用Java将数据流式传输到HPCC的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring RESTful错误处理
- 下一篇: 家里的路由器如何设置密码路由器如何设置安