如何将自定义数据源集成到Apache Spark中
如今,流數據是一個熱門話題,而Apache Spark是出色的流框架。 在此博客文章中,我將向您展示如何將自定義數據源集成到Spark中。
Spark Streaming使我們能夠從各種來源進行流傳輸,同時使用相同的簡潔API訪問數據流,執行SQL查詢或創建機器學習算法。 這些功能使Spark成為流式(或任何類型的工作流)應用程序的首選框架,因為我們可以使用框架的所有方面。
面臨的挑戰是弄清楚如何將自定義數據源集成到Spark中,以便我們能夠利用其強大功能而無需更改為更多標準源。 更改似乎是合乎邏輯的,但是在某些情況下,這樣做是不可能或不方便的。
流式自定義接收器
Spark提供了不同的擴展點,正如我們在此處擴展Data Source API以便將自定義數據存儲集成到Spark SQL中所看到的那樣。
在此示例中,我們將做同樣的事情,但是我們還將擴展流API,以便我們可以從任何地方流。
為了實現我們的自定義接收器,我們需要擴展Receiver [A]類。 請注意,它具有類型注釋,因此我們可以從流客戶端的角度對DStream實施類型安全。
我們將使用此自定義接收器來流式傳輸我們的應用程序之一通過套接字發送的訂單。
通過網絡傳輸的數據的結構如下所示:
1 5 1 1 2 2 1 1 2 1 1 4 1 1 2 2 1 2 2我們首先接收訂單ID和訂單總金額,然后接收訂單的行項目。 第一個值是商品ID,第二個是訂單ID(與訂單ID值匹配),然后是商品成本。 在此示例中,我們有兩個訂單。 第一個只有四個項目,第二個只有一個項目。
這個想法是將所有這些隱藏在我們的Spark應用程序中,因此它在DStream上收到的是在流上定義的完整順序,如下所示:
val orderStream: DStream[Order] = .....val orderStream: DStream[Order] = .....同時,我們還使用接收器來流式傳輸我們的自定義流式源。 即使它通過套接字發送數據,使用來自Spark的標準套接字流也將非常復雜,因為我們將無法控制數據的輸入方式,并且會遇到在應用程序上遵循順序的問題本身。 這可能非常復雜,因為一旦進入應用程序空間,我們便會并行運行,并且很難同步所有這些傳入數據。 但是,在接收方空間中,很容易從原始輸入文本創建訂單。
讓我們看看我們的初始實現是什么樣的。
case class Order(id: Int, total: Int, items: List[Item] = null) case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit = {println("starting...")val thread = new Thread("Receiver") {override def run() {receive() }}thread.start()}override def onStop(): Unit = stop("I am done")def receive() = .... }case class Order(id: Int, total: Int, items: List[Item] = null) case class Item(id: Int, cost: Int)class OrderReceiver(host: String, port: Int) extends Receiver[Order](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit = {println("starting...")val thread = new Thread("Receiver") {override def run() {receive() }}thread.start()}override def onStop(): Unit = stop("I am done")def receive() = .... }我們的OrderReceiver擴展了Receiver [Order],它使我們可以在Spark內部存儲Order(帶注釋的類型)。 我們還需要實現onStart()和onStop()方法。 請注意,onStart()創建一個線程,因此它是非阻塞的,這對于正確的行為非常重要。
現在,讓我們看一下接收方法,真正發生魔術的地方。
def receive() = {val socket = new Socket(host, port)var currentOrder: Order = nullvar currentItems: List[Item] = nullval reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))while (!isStopped()) {var userInput = reader.readLine()if (userInput == null) stop("Stream has ended")else {val parts = userInput.split(" ")if (parts.length == 2) {if (currentOrder != null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder = Order(parts(0).toInt, parts(1).toInt)currentItems = List[Item]()}else {currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}}def receive() = {val socket = new Socket(host, port)var currentOrder: Order = nullvar currentItems: List[Item] = nullval reader = new BufferedReader(new InputStreamReader (socket.getInputStream(), "UTF-8"))while (!isStopped()) {var userInput = reader.readLine()if (userInput == null) stop("Stream has ended")else {val parts = userInput.split(" ")if (parts.length == 2) {if (currentOrder != null) {store(Order(currentOrder.id, currentOrder.total, currentItems))}currentOrder = Order(parts(0).toInt, parts(1).toInt)currentItems = List[Item]()}else {currentItems = Item(parts(0).toInt, parts(1).toInt) :: currentItems}}}}在這里,我們創建一個套接字并將其指向源,然后我們就可以簡單地開始讀取它,直到調度了stop命令,或者套接字上沒有更多數據為止。 請注意,我們正在讀取與之前定義的結構相同的結構(如何發送數據)。 完全閱讀訂單后,我們將調用store(…),以便將其保存到Spark中。
除了在我們的應用程序中使用我們的接收器外,這里別無所要做:
val config = new SparkConf().setAppName("streaming") val sc = new SparkContext(config) val ssc = new StreamingContext(sc, Seconds(5))val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))val config = new SparkConf().setAppName("streaming") val sc = new SparkContext(config) val ssc = new StreamingContext(sc, Seconds(5))val stream: DStream[Order] = ssc.receiverStream(new OrderReceiver(port))請注意我們是如何使用自定義OrderReceiver創建流的(僅為了清楚起見,對val流進行了注釋,但這不是必需的)。 從現在開始,我們將流(DString [Order])用作我們在任何其他應用程序中使用的任何其他流。
stream.foreachRDD { rdd =>rdd.foreach(order => {println(order.id)) order.items.foreach(println)}}stream.foreachRDD { rdd =>rdd.foreach(order => {println(order.id)) order.items.foreach(println)}}摘要
當處理生成無盡數據的源時,Spark Streaming非常方便。 您可以使用與Spark SQL和系統中其他組件相同的API,但它也足夠靈活,可以擴展以滿足您的特定需求。
翻譯自: https://www.javacodegeeks.com/2016/05/integrate-custom-data-sources-apache-spark.html
總結
以上是生活随笔為你收集整理的如何将自定义数据源集成到Apache Spark中的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: eclipse pmd使用_使用您自己的
- 下一篇: 超市价签需要备案吗(超市价签备案)