ip integrator_使用Oracle Data Integrator(和Kafka / MapR流)完善Lambda体系结构
ip integrator
“ Lambda體系結(jié)構(gòu)是一種數(shù)據(jù)處理體系結(jié)構(gòu),旨在通過利用批處理和流處理方法來處理大量數(shù)據(jù)。 這種體系結(jié)構(gòu)方法嘗試通過使用批處理提供批處理數(shù)據(jù)的全面而準確的視圖,同時使用實時流處理提供在線數(shù)據(jù)的視圖來平衡延遲 , 吞吐量和容錯 。 在演示之前,可以將兩個視圖輸出合并。 lambda體系結(jié)構(gòu)的興起與大數(shù)據(jù)的增長,實時分析以及減輕地圖縮減延遲的驅(qū)動力有關(guān)。” –維基百科
以前,我已經(jīng)寫了一些博客,涉及許多用例,這些用例是使用Oracle Data Integrator(ODI)在MapR分發(fā)之上進行批處理,以及使用Oracle GoldenGate(OGG)將事務(wù)數(shù)據(jù)流式傳輸?shù)組apR Streams和其他Hadoop組件中。 最新的ODI(12.2.1.2.6)結(jié)合了這兩種產(chǎn)品以完全適合lambda架構(gòu),同時具有許多新的強大功能,包括能夠?qū)afka流作為ODI本身的源和目標進行處理。 通過簡化我們在一種產(chǎn)品下以相同邏輯設(shè)計處理和處理批處理和快速數(shù)據(jù)的方式,此功能對已經(jīng)擁有或計劃擁有l(wèi)ambda架構(gòu)的任何人都具有巨大的優(yōu)勢。 現(xiàn)在,如果我們將OGG流傳輸功能和ODI批處理/流傳輸功能結(jié)合在一起,則可能性是無限的。
在本博客中,我將向您展示如何使用Spark Streaming在Oracle Data Integrator上配置MapR流(又名Kafka)以創(chuàng)建真正的lambda體系結(jié)構(gòu):補充批處理和服務(wù)層的快速層。
在本文中,我將跳過ODI的“贊揚和稱贊”部分,但我只想強調(diào)一點:自從ODI首次發(fā)布以來,為該博客設(shè)計的映射就像您要設(shè)計的所有其他映射一樣,都是您可以立即使用Hadoop / Spark集群上的本機代碼運行100%的代碼,無需編寫零行代碼,也不必擔心如何以及在何處編碼。
我已經(jīng)在MapR上完成了此操作,因此我可以制作“兩只鳥一塊石頭”。 向您展示MapR Streams步驟和Kafka。 由于兩者在概念或API實現(xiàn)上并沒有太大差異,因此如果您使用的是Kafka,則可以輕松地應(yīng)用相同的步驟。
如果您不熟悉MapR Streams和/或Kafka概念,建議您花一些時間來閱讀它們。 以下內(nèi)容假定您知道什么是MapR Streams和Kafka(當然還有ODI)。 否則,您仍然會對可能的功能有個好主意。
準備工作
MapR Streams(aka Kafka)相關(guān)的準備工作
顯然,我們需要創(chuàng)建MapR Streams路徑和主題。 與Kafka不同,MapR通過“ maprcli”命令行實用程序使用自己的API來創(chuàng)建和定義主題。 因此,如果您使用商品Kafka,則此步驟將略有不同。 Web上有很多有關(guān)如何創(chuàng)建和配置Kafka主題和服務(wù)器的示例,因此您并不孤單。
為了進行此演示,我創(chuàng)建了一個路徑和該路徑下的兩個主題。 我們將讓ODI從其中一個主題(注冊)進行消費,并生成另一個主題(registrations2)。 這樣,您將看到它如何通過ODI起作用。
創(chuàng)建一個名為“ users-stream”的MapR Streams路徑和一個名為“ registrations”的主題:
在我之前定義的相同路徑上創(chuàng)建第二個主題“ registrations2”:
Hadoop相關(guān)準備
由于我正在使用已安裝并正在運行MapR的個人預(yù)配置VM,因此此處的準備工作不多。 但是,需要一些步驟才能成功完成ODI映射。 如果您想知道我是如何使ODI從事MapR發(fā)行的,那么您可以參考此博客文章 。
- Spark:我已經(jīng)在Spark 1.6.1上進行了測試,您也應(yīng)該這樣做。 至少不要轉(zhuǎn)到任何較低版本。 此外,您需要針對Spark構(gòu)建具有特定的標簽版本。 我從標簽1605(這是MapR發(fā)布約定)開始測試,但是我的工作失敗了。 究其原因,我發(fā)現(xiàn)PySpark庫不是MapR Streams API的最新版本。 他們可以使用商品Kafka,但不能使用MapR。 這是我使用過的RPM的鏈接 。
- Spark日志記錄:在spark路徑下,有一個“ config”文件夾,其中包含不同的配置文件。 如果需要的話,我們只對其中一項感興趣。 文件名為“ log4j.properties”。 您需要確保將“ rootCategory”參數(shù)設(shè)置為INFO,否則,當您運行提交到Spark的任何ODI映射時,都會出現(xiàn)異常:
- Hadoop憑證存儲:在提交的任何作業(yè)中需要特定密碼時,ODI將引用Hadoop憑證存儲。 這樣,我們就不會在參數(shù)/屬性文件或代碼本身中包含任何明確的密碼。 在此演示中,我們將在某個時候使用MySQL,因此我需要創(chuàng)建一個存儲并為MySQL密碼添加別名。 首先,您需要確保core-site.xml中存在憑證存儲的條目,然后實際上為密碼值創(chuàng)建別名:
上一張圖片是我的“ site-core.xml”的摘要,向您顯示了我添加的憑據(jù)存儲。 下一步將是驗證商店是否存在,然后為密碼值創(chuàng)建別名:
更改之后,即使在編輯core-site.xml之后,也無需重新啟動任何hadoop組件。
注意:如果遇到“操作系統(tǒng)異常”(例如137),請確保有足夠的可用內(nèi)存。
ODI相關(guān)準備
您將在ODI中進行的常規(guī)準備工作。 我將在此博客中顯示相關(guān)內(nèi)容。
Hadoop數(shù)據(jù)服務(wù)器
以下配置特定于MapR。 如果使用其他發(fā)行版,則需要輸入相關(guān)的端口號和路徑:
Spark-Python數(shù)據(jù)服務(wù)器
在此ODI版本12.2.1.2.6中,如果要使用Spark Streaming和常規(guī)Spark服務(wù)器/群集,則需要創(chuàng)建多個Spark數(shù)據(jù)服務(wù)器。 在此演示中,我僅創(chuàng)建了Spark Streaming服務(wù)器,并將其稱為Spark-Async。
您需要將“主群集”值更改為實際使用的值:yarn-client或yarn-cluster,然后選擇我們之前創(chuàng)建的Hadoop DataServer。
現(xiàn)在,這里配置的有趣部分是Spark-Async數(shù)據(jù)服務(wù)器的屬性:
我已經(jīng)強調(diào)了您需要注意的最重要的部分。 之所以使用ASYNC,是因為我們將使用Spark Streaming。 其余屬性與性能有關(guān)。
卡夫卡數(shù)據(jù)服務(wù)器
在這里,我們將定義MapR Streams數(shù)據(jù)服務(wù)器:
元數(shù)據(jù)代理具有一個“虛擬”地址,僅符合Kafka API。 MapR Streams客戶端將為您提供連接到MapR Streams所需的服務(wù)。 您不能在此處測試數(shù)據(jù)服務(wù)器,因為在MapR上沒有運行這樣的Kafka服務(wù)器。 因此,請安全地忽略此處的測試連接,因為它將失敗(這樣就可以了)。
對于屬性,您需要定義以下內(nèi)容:
您需要手動定義“ key.deserializer”和“ value.deserializer”。 MapR Streams都需要兩者,如果未定義作業(yè),作業(yè)將失敗。
ODI映射設(shè)計
我已經(jīng)在這里進行了測試,涵蓋了五個用例。 但是,我將僅介紹一個完整的內(nèi)容,并突出顯示其他內(nèi)容,以免您閱讀多余和常識性的步驟。
1)MapR Streams(Kafka)=> Spark Streaming => MapR Streams(Kafka):
在此映射中,我們將從先前創(chuàng)建的主題之一中讀取流數(shù)據(jù),應(yīng)用一些函數(shù)(簡單的函數(shù)),然后將結(jié)果生成到另一個主題。 這是映射的邏輯設(shè)計:
我通過復(fù)制已經(jīng)為MySQL反向工程設(shè)計的模型之一(結(jié)構(gòu)相同)定義了MapR_Streams_Registrations1模型,但是在這種情況下,當然選擇的技術(shù)是Kafka。 您將能夠選擇流數(shù)據(jù)的格式:Avro,JSON,Parquet或Delimited:
物理設(shè)計如下所示:
- SOURCE_GROUP:這是我們的MapR Streams主題“注冊”
- TRANS_GROUP:這是我們的Spark異步服務(wù)器
- TARGET_GROUP:這是我們的MapR Streams主題“ registrations2”
物理實現(xiàn)的屬性為:
您需要選擇暫存位置作為Spark Async并啟用“流式傳輸”。
要將主題(注冊)中的流數(shù)據(jù)加載到Spark Streaming,我們需要選擇合適的LKM,即LKM Kafka到Spark:
然后從Spark Streaming加載到MapR Stream目標主題registrations2,我們需要選擇LKM Spark到Kafka:
2)MapR-FS(HDFS)=> Spark Streaming => MapR Streams(Kafka):
除了使用的知識模塊之外,我在這里不會向您展示太多。 要將MapR-FS(HDFS)加載到Spark Streaming,我使用了LKM File來Spark:
為了從Spark Streaming加載到MapR Streams,我像以前的映射一樣使用LKM Spark到Kafka。
注意:LKM File to Spark將充當一個流,一個文件流(顯然)。 ODI將僅接收任何更新/新文件,而不是靜態(tài)文件。
3)MapR Streams(Kafka)=> Spark Streaming => MySQL:
要將MapR Streams(Kafka)加載到Spark Streaming,就像在第一個映射中一樣,我使用了LKM Kafka到Spark。 然后從Spark Streaming加載到MySQL,我使用了LKM Spark到SQL:
4)MapR流(Kafka)=> Spark流=> MapR-FS(HDFS)
為了從MapR流加載到Spark流,我像以前一樣使用LKM Kafka到Spark,然后從Spark流加載到MapR-FS(HDFS),我已經(jīng)使用LKM Spark到File:
5)MapR Streams(Kafka)和Oracle DB => Spark Streaming => MySQL
這是另一個有趣的用例,您實際上可以在現(xiàn)場將Kafka流與SQL源一起加入。 這僅(當前)適用于查找組件:
請注意,驅(qū)動程序源必須是Kafka(在我們的示例中為MapR流),而查找源必須是SQL數(shù)據(jù)庫。 我使用了與以前的映射幾乎相同的LKM:從LKM SQL到Spark,從LKM Kafka到Spark和從LKM Spark到SQL。
行刑
我將僅向您展示第一個用例的執(zhí)行步驟,即MapR Streams(Kafka)=> Spark Streaming => MapR Streams(Kafka)。 為了模擬這種情況,我創(chuàng)建了一個Kafka生產(chǎn)者控制臺和另一個Kafka消費者控制臺,以便可以監(jiān)視結(jié)果。 查看下面的生產(chǎn)者,我粘貼了一些記錄:
我已經(jīng)突出顯示了其中一個URL,以確保您注意到它是小寫的。 等待幾秒鐘,Spark將處理這些消息并將其發(fā)送到目標MapR Streams主題:
請注意,所有URL均大寫。 成功!
通過映射,結(jié)果與預(yù)期的一樣。 我不會為他們展示測試步驟,因為它們很簡單。 這里的想法是向您展示如何使用MapR Streams(Kafka)配置ODI。
最后的話
值得一提的是,在執(zhí)行任何映射時,您都可以深入查看日志并查看正在發(fā)生的事情(生成的代碼等)。 此外,您將獲得指向工作歷史URL的鏈接,以在Spark UI上進行訪問:
打開鏈接將帶我們到Spark UI:
如果要控制流作業(yè)可以生存多長時間,則需要增加Spark-Async數(shù)據(jù)服務(wù)器的“ spark.streaming.timeout”屬性,或從映射配置本身覆蓋它。 您可能還需要創(chuàng)建一個ODI程序包,該程序包具有一個循環(huán)和其他有用的組件來滿足您的業(yè)務(wù)需求。
結(jié)論
ODI可以處理lambda架構(gòu)中的兩個層:批處理層和快速層。 這不僅是ODI在其非常長的綜合功能列表中添加的一項重要功能,而且還將提高從一個統(tǒng)一,易于使用的界面設(shè)計數(shù)據(jù)管道的生產(chǎn)率和效率。 顯然,ODI可以像使用商品Kafka一樣輕松地與MapR Streams一起使用,這要感謝MapR的二進制文件與Kafka API兼容,以及ODI不需要依賴于一個框架。 這向您保證ODI是與眾不同的真正開放和模塊化的E-LT工具。
其他一些相關(guān)職位:
- Oracle Data Integrator和MapR融合數(shù)據(jù)平臺:請檢查!
- 使用Oracle GoldenGate將事務(wù)數(shù)據(jù)流式傳輸?shù)組apR流中
- 使用Oracle GoldenGate進行MapR-FS實時事務(wù)數(shù)據(jù)提取
- 帶有ODI的逆向工程師MapR-DB
免責聲明
這里表達的思想,實踐和觀點僅是作者的觀點,不一定反映Oracle的觀點。
翻譯自: https://www.javacodegeeks.com/2017/02/perfecting-lambda-architecture-oracle-data-integrator-kafka-mapr-streams.html
ip integrator
總結(jié)
以上是生活随笔為你收集整理的ip integrator_使用Oracle Data Integrator(和Kafka / MapR流)完善Lambda体系结构的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java单词按字典排序_最终Java日志
- 下一篇: 香港警察备案流程(香港警察备案)