03 graphx 从 SSSP 来看 pregel
前言
呵呵 最近剛好有一些需要使用到 圖的相關計算?
然后 在其他文章中找到了一篇 關于最短路徑的graphx計算的代碼?spark graphx 最短路徑及中間節點?
呵呵 很久沒有用這些東西了, 雖然只是簡單的使用, 但是還是要 復習一下, 稍微理解一下 他的執行方式?
pregel 相關論文 : 留一個占位符?
?
本文主要是根據一個 SSSP 的最短路徑的測試代碼來進行開始, 大致了解一下 pregel 的執行模式, 調試一下 pregel 的執行, 以及 spark 本身提供的最短路徑的 api 理解一下?
?
環境如下 : spark2.4.5 + scala2.11 + jdk8
?
?
測試代碼?
為了便于調試, 只配置了1個executor節點?
在 vprog, sendMsg, mergeMsg 里面加了一些日志, 是為了查看執行過程?
package com.hx.testimport org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** Test19SSSP** @author Jerry.X.He <970655147@qq.com>* @version 1.0* @date 2020-05-25 15:06*/ object Test19SSSP {def main(args: Array[String]) {val conf = new SparkConf().setAppName("Pregel_SSSP").setMaster("local[1]")val sc = new SparkContext(conf)val sourceId: VertexId = 0 // The ultimate source// 創造一個邊的RDD, 包含各種關系val edges: RDD[Edge[Double]] = sc.parallelize(Array(Edge(3L, 7L, 1.0d),Edge(5L, 3L, 1.0d),Edge(2L, 5L, 1.0d),Edge(5L, 7L, 1.0d),Edge(0L, 3L, 1.0d),Edge(3L, 2L, 1.0d),Edge(7L, 9L, 1.0d),Edge(0L, 5L, 1.0d)))// 創造一個點的 RDD// 0L, 2L, 3L, 5L, 7L, 9Lval vertexes: RDD[(VertexId, (Double, List[VertexId]))] = edges.flatMap(edge => Array(edge.srcId, edge.dstId)).distinct().map(id =>if (id == sourceId) (id, (0, List[VertexId](sourceId)))else (id, (Double.PositiveInfinity, List[VertexId]())))val defaultVertex = (-1.0d, List[VertexId]())// A graph with edge attributes containing distancesval initialGraph: Graph[(Double, List[VertexId]), Double] = Graph(vertexes, edges, defaultVertex)println(" edges as follow : ")initialGraph.edges.foreach(println)// initialMsg, 會向每一個 vertex 發送 initialMsg, 然后使用 vprog 來計算, 更新頂點數據, 0, 1, ..., 9// 第一輪消息, 然后 各個頂點向鄰近的頂點發送消息, 0, 2, 3, 5, 7[根據邊]// 然后 各個收到消息的頂點, 執行 vprog, 3, 5// 第二輪消息, 第一輪收到消息的頂點, 向鄰近的頂點發送消息, 3, 5// 如果某個頂點收到多個消息, 進行 merge// 然后 各個收到消息的頂點, 執行 vprog 2, 7, 3// 第三輪消息, ...val sssp = initialGraph.pregel((Double.PositiveInfinity, List[VertexId]()), 2, EdgeDirection.Out)(// Vertex Program(id, dist, newDist) => {println(" vertex : " + id)if (dist._1 < newDist._1) dist else newDist},// Send Messagetriplet => {println(" sendMsg " + triplet.srcId + " -> " + triplet.dstId)if (triplet.srcAttr._1 < triplet.dstAttr._1 - triplet.attr) {Iterator((triplet.dstId, (triplet.srcAttr._1 + triplet.attr, triplet.srcAttr._2 :+ triplet.dstId)))} else {Iterator.empty}},//Merge Message(a, b) => {println(" merge : " + a + ", " + b)if (a._1 < b._1) a else b})println(" result as follow : ")println(sssp.vertices.collect.mkString("\n"))}}?
執行結果如下?
edges as follow : Edge(0,3,1.0) Edge(0,5,1.0) Edge(2,5,1.0) Edge(3,2,1.0) Edge(3,7,1.0) Edge(5,3,1.0) Edge(5,7,1.0) Edge(7,9,1.0)vertex : 0 vertex : 3 vertex : 7 vertex : 9 vertex : 5 vertex : 2sendMsg 0 -> 3 sendMsg 0 -> 5 sendMsg 2 -> 5 sendMsg 3 -> 2 sendMsg 3 -> 7 sendMsg 5 -> 3 sendMsg 5 -> 7 sendMsg 7 -> 9vertex : 3 vertex : 5sendMsg 3 -> 2 sendMsg 3 -> 7 sendMsg 5 -> 3 sendMsg 5 -> 7 merge : (2.0,List(0, 3, 7)), (2.0,List(0, 5, 7))vertex : 7 vertex : 2sendMsg 7 -> 9 sendMsg 2 -> 5result as follow : (0,(0.0,List(0))) (3,(1.0,List(0, 3))) (7,(2.0,List(0, 5, 7))) (9,(Infinity,List())) (5,(1.0,List(0, 5))) (2,(2.0,List(0, 3, 2)))?
邊的信息如下?
edges as follow : Edge(0,3,1.0) Edge(0,5,1.0) Edge(2,5,1.0) Edge(3,2,1.0) Edge(3,7,1.0) Edge(5,3,1.0) Edge(5,7,1.0) Edge(7,9,1.0)構造的圖如下?
各個頂點的屬性, 除了 0(開始節點) 是 (0, List(0)), 其他的都是?(Double.PositiveInfinity, List[VertexId]())
?
?
Test19SSSP 的執行過程
1. 初始化發送消息個各個節點, 然后執行 vprog?
執行之前如下圖?
執行之后如下圖?
?
對應于上面的日志?
vertex : 0 vertex : 3 vertex : 7 vertex : 9 vertex : 5 vertex : 2?
2. 收到消息的頂點執行 sendMsg?
根據 sendMsg 的邏輯?
執行之后??sendMsg 0 -> 3, sendMsg 0 -> 5 發送了消息?
?sendMsg 2 -> 5,? sendMsg 3 -> 2,? sendMsg 3 -> 7,? sendMsg 5 -> 3, sendMsg 5 -> 7,? sendMsg 7 -> 9 因為源節點, 目標節點的屬性均是 (PositiveInfinity, List()), 不滿足條件 "(triplet.srcAttr._1 < triplet.dstAttr._1 - triplet.attr)"(到當前節點的最短路徑 + 路徑的權重 < 到目標節點的最短路徑, 表示到目標節點的最短路徑可以更小, 發消息給目標節點)?
sendMsg 0 -> 3 : 給節點3發送了消息 (1, List(0, 3))
sendMsg 0 -> 5?: 給節點5發送了消息 (1, List(0, 5))?
?
對應于上面的日志?
sendMsg 0 -> 3 sendMsg 0 -> 5 sendMsg 2 -> 5 sendMsg 3 -> 2 sendMsg 3 -> 7 sendMsg 5 -> 3 sendMsg 5 -> 7 sendMsg 7 -> 9?
3. 第一輪迭代, 收到消息的節點執行 vprog?
上面 節點 3, 5 收到了消息, 之后執行 vprog?
執行之前如下圖
執行之后如下圖?
?
對應于上面的日志?
vertex : 3 vertex : 5?
4. 第一輪迭代, 收到消息的節點執行 sendMsg?
根據 sendMsg 的邏輯?
執行之后??sendMsg 3 -> 2, sendMsg 3 -> 7, sendMsg 5 -> 7 發送了消息?
sendMsg 5 -> 3 因為源節點的最短路徑 + 邊權重 不小于 到目標節點的最短路徑, 不發送消息?
sendMsg 3 -> 2 : 給節點2發送了消息 (2, List(0, 3, 2))
sendMsg 3 -> 7?: 給節點7發送了消息 (2, List(0, 3, 7))
sendMsg 5 -> 7?: 給節點7發送了消息 (2, List(0, 5, 7))?
?
對應于上面的日志?
sendMsg 3 -> 2 sendMsg 3 -> 7 sendMsg 5 -> 3 sendMsg 5 -> 7?
5. 第一輪迭代, 兩個節點對節點7發送消息?mergeMsg?
然后 由于兩個節點同時向 節點 7 發送了消息, 使用 mergeMsg 對消息進行 merge?
對應于上面的日志?
merge : (2.0,List(0, 3, 7)), (2.0,List(0, 5, 7))?
根據 mergeMsg 的邏輯?
merge 的結果為?(2.0,List(0, 5, 7)?
所以, 節點2 收到的消息為?(2, List(0, 3, 2))?
節點7?收到的消息為?(2.0,List(0, 5, 7)?
?
6. 第二輪迭代,?收到消息的節點執行 vprog?
上面 節點 2, 3, 7?收到了消息, 之后執行 vprog?
執行之前如下圖
執行之后如下圖
?
對應于上面的日志?
vertex : 7 vertex : 2?
7. 第二輪迭代,?收到消息的節點執行 sendMsg??
根據 sendMsg 的邏輯?
執行之后??sendMsg 7 -> 9 發送了消息?
sendMsg 2 -> 5 因為源節點的最短路徑 + 邊權重 不小于 到目標節點的最短路徑, 不發送消息?
sendMsg 7 -> 9 : 給節點2發送了消息 (3, List(0, 5, 7, 9))
?
對應于上面的日志?
sendMsg 7 -> 9 sendMsg 2 -> 5?
8. 第三輪迭代?
代碼中限定了最多兩輪迭代, 因此 整個迭代結束?
最后各個 頂點上面的信息如下,?對應于上面的日志?
result as follow : (0,(0.0,List(0))) (3,(1.0,List(0, 3))) (7,(2.0,List(0, 5, 7))) (9,(Infinity,List())) (5,(1.0,List(0, 5))) (2,(2.0,List(0, 3, 2)))?
?
從 Pregel 的代碼來看?Test19SSSP 的執行過程
pregel 的代碼如下, 紅框處大致如下?
1. 使用初始化消息?初始化各個頂點?
2. 各個頂點根據邊發送初始化消息?
3. 收到消息的頂點執行 vprog?
4. 收到消息的頂點根據邊發送消息(一個節點收到多個消息, 使用 mergeMsg 進行消息的合并)?
迭代 3, 4, 直到沒有頂點之間消息傳遞, 或者 迭代次數達到上限?
?
以我們這里?Test19SSSP 為例?
進入 pregrel 的時候 graph 的邊信息如下??
各個頂點的信息如下?
?
1. 初始化發送消息個各個節點, 然后執行 vprog
想各個頂點發送初始化消息, 以及各個頂點發送初始化消息 的情況如下?
初始化消息發送到各個頂點, 各個頂點執行 vprog 之后, 各個頂點的數據沒有變化(因為初始化消息的 _1 是 PositiveInfinitely)
?
2. 收到消息的頂點執行 sendMsg?
收到消息的頂點開始執行 sendMsg, 發送了兩個消息?
sendMsg 0 -> 3 : 給節點3發送了消息 (1, List(0, 3))
sendMsg 0 -> 5?: 給節點5發送了消息 (1, List(0, 5))?
?
3. 第一輪迭代, 收到消息的節點執行 vprog?
?
4. 第一輪迭代, 收到消息的節點執行 sendMsg?
5. 第一輪迭代, 兩個節點對節點7發送消息?mergeMsg?
?
6. 第二輪迭代,?收到消息的節點執行 vprog?
?
7. 第二輪迭代,?收到消息的節點執行 sendMsg??
?
8. 第三輪迭代?
代碼中限定了最多兩輪迭代, 因此 整個迭代結束?
迭代結束, 然后 走用例程序后面的 打印結果 相關代碼?
?
?
spark 官方的?SSSP 測試用例
樣例代碼如下?
package com.hx.testimport org.apache.spark.graphx.lib.ShortestPaths import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}/*** Test21SSSPOfficial** @author Jerry.X.He <970655147@qq.com>* @version 1.0* @date 2020-06-25 10:06*/ object Test21SSSPOfficial {def main(args: Array[String]) {val conf = new SparkConf().setAppName("Pregel_SSSP").setMaster("local[1]")val sc = new SparkContext(conf)val targetId: VertexId = 9 // The ultimate source// 創造一個邊的RDD, 包含各種關系val edges: RDD[Edge[Double]] = sc.parallelize(Array(Edge(3L, 7L, 1.0d),Edge(5L, 3L, 1.0d),Edge(2L, 5L, 1.0d),Edge(5L, 7L, 1.0d),Edge(0L, 3L, 1.0d),Edge(3L, 2L, 1.0d),Edge(7L, 9L, 1.0d),Edge(0L, 5L, 1.0d)))// 創造一個點的 RDD// 0L, 2L, 3L, 5L, 7L, 9Lval vertexes: RDD[(VertexId, Long)] = edges.flatMap(edge => Array(edge.srcId, edge.dstId)).distinct().map(id => (id, 1L))val defaultVertex = 1L// A graph with edge attributes containing distancesval graph: Graph[Long, Double] = Graph(vertexes, edges, defaultVertex)val landmarks = Seq(targetId).map(_.toLong)val vertices = ShortestPaths.run(graph, landmarks).vertices.collectval results = vertices.map {case (v, spMap) => (v, spMap.mapValues(i => i))}results.foreach(println)}}執行結果如下, 0 -> 9 最短路徑為 3, 3 -> 9 最短路徑為 2 以此類推?
(0,Map(9 -> 3)) (3,Map(9 -> 2)) (7,Map(9 -> 1)) (9,Map(9 -> 0)) (5,Map(9 -> 2)) (2,Map(9 -> 3))?
?
參考?
spark graphx 最短路徑及中間節點?
/spark-2.4.5/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPath.scala?
?
?
總結
以上是生活随笔為你收集整理的03 graphx 从 SSSP 来看 pregel的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 出现这十种症状,说明你不适合做程序员
- 下一篇: 悟空问答python反爬_Python写