2019獨角獸企業重金招聘Python工程師標準>>>
快速入門step by step MaxCompute Studio 創建完成?MaxCompute Java Module后,即可以開始開發Graph了。
代碼示例 在examples目錄下有graph的一些代碼示例,可參考示例熟悉Graph程序的結構。
?
編寫Graph 在module的源碼目錄即src >main >java newMaxCompute Java 。 選擇GraphLoader/Vertex等類型,Name OK**,模板會自動填充框架代碼,可在此基礎上繼續修改。
本地調試Graph Graph開發好后,下一步就是要測試自己的代碼,看是否符合預期。我們支持本地運行Graph,具體的:
運行Graph: 在驅動類(有main函數且調用GraphJob.run方法)上右鍵,點擊運行 run configuration**對話框,配置Graph需要在哪個MaxCompute Project上運行即可。
點擊OK ,如果指定MaxCompute project的表數據未被下載到warehouse中,則首先下載數據;如果采用mock項目或已被下載則跳過。接下來,graph local run框架會讀取warehouse中指定表的數據作為輸入,開始本地運行Graph,用戶可以在控制臺看到日志輸出。每運行一次本地調試,都會在Intellij工程目錄下新建一個臨時目錄,見下圖:
說明 ?關于warehouse的詳細介紹請參考開發UDF中本地warehouse目錄部分。
生產運行Graph 本地調試通過后,接下來就可以把Graph發布到服務端,在MaxCompute分布式環境下運行了:
首先,將自己的Graph程序打成jar包,并發布到服務端。如何打包發布? 通過Studio無縫集成的MaxCompute Console(在Project Explorer Open in Console**),在Console命令行中輸入類似如下的?jar命令:試用 ` -libjars xxx.jar -classpath /Users/home/xxx.jar com.aliyun.odps.graph.examples.PageRank pagerank_in pagerank_out; 更詳細的Graph開發介紹請參見[編寫Graph](https://help.aliyun.com/document_detail/27813.html#concept-gzg-1c2-vdb)。
<a name="Eclipse"></a>
## Eclipse
創建MaxCompute項目后,用戶可以編寫自己的Graph程序,參照下文步驟操作完成本地調試。<br />在此示例中,我們選用插件提供的 PageRank.java來完成本地調試工作。選中 **examples**下的 PageRank.java文件,如下圖。 <br />[](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643218_zh-CN.png)<br /><br />右鍵單擊,選擇 ****Debug As** >ODPS MapReduce|Graph****,如下圖。 <br />[](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643220_zh-CN.png)<br /><br />單擊后出現對話框,作如下配置。 <br />[](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643221_zh-CN.png) <br /><br />查看作業運行結果,如下圖。 <br />[](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643222_zh-CN.png)<br /><br />可以查看在本地的計算結果,如下圖。 <br />[](http://static-aliyun-doc.oss-cn-hangzhou.aliyuncs.com/assets/img/12154/15450228643223_zh-CN.png)<br /><br />調試通過后,用戶可以將程序打包,并以Jar資源的形式上傳到MaxCompute,并提交Graph作業。<a name="b54d1384"></a>
# MaxCompute Graph的最佳實踐
<a name="cc20fa1e"></a>
## 基于MaxCompute Graph實現用戶聚類
<a name="95da745d"></a>
### 場景說明
在商品品牌預測中,提供了一份用戶行為數據,如下:| 字段 | 字段說明 | 提取說明 |
| --- | --- | --- |
| user_id | 用戶標識 | 抽樣&字段加密 |
| brand_id | 品牌ID | 抽了樣&字段加密 |
| type | 用戶對品牌的行為類型 | 點擊:0<br />購買:1<br />收藏:2<br />加入購物車:3 |
| visit_datetime | 行為時間 | 格式某月某日,如7月6日, 隱藏年份 |假設需求是希望基于用戶的購買行為對用戶聚類。當用戶瀏覽時,可以給TA推薦同一個聚類(興趣度相近)的其他用戶購買了什么。
<a name="094c47ac"></a>
### [](https://www.atatech.org/articles/32335#1)問題分析
在推薦領域,該問題屬于基于用戶的協同過濾范疇,它主要包括兩個步驟:一是找到和目標用戶興趣度相近的用戶集合;二是給目標用戶推薦該集合中其他用戶感興趣(而目標用戶沒聽過)的item。<br />對用戶聚類即構建興趣度相近的用戶集合,常見的一種方式是通過Kmeans算法來實現。假定要把樣本劃分為k個類別,Kmeans算法的計算過程如下:
* 選擇k個初始中心節點;
* 在每次迭代中,對每個樣本,計算其到中心節點的距離;
* 更新中心節點
* 如果中心節點不變(或小于閾值),迭代結束;否則繼續步驟2)、3)迭代Kmeans算法的優勢在于簡潔快速,其關鍵在于初始中心節點的選擇和距離公式。<br />在這個示例中,首先應該對數據進行預處理,構造用戶的特征向量。出于簡單,這里選擇10個最hot的品牌(構造次數最多),基于用戶對這10個品牌的購買次數,構造特征如下:<br />user_id, cnt1, …, cnt10,其中cnt表示對應品牌的購買次數。<br />然后通過Graph編程框架,通過KMeans算法實現聚類。
<a name="9195cc17"></a>
### [](https://www.atatech.org/articles/32335#2)數據準備
原始數據表為tmall_user_brand,數據準備主要包括生成特征和選擇初始節點。
<a name="1c72fb00"></a>
### [](https://www.atatech.org/articles/32335#3)生成特征
生成特征包括如下步驟:
1. 選擇top 10 brands,生成表b
1. 統計用戶購買每個品牌的次數,生成表t
1. 對表b和t進行聯接,統計用戶購買top 10品牌的次數,生成表ub假設ub表數據如下:
user_id brand_id count rank a b1 5 1 a b3 2 3 a b4 3 4 b b3 1 3 b b7 9 7
<br />需要生成的特征表如下<br /><br />
user_id, cnt1, … , cnt10 a 5 0 2 3 0 0 0 0 0 0 b 0 0 1 0 0 0 9 0 0 0
<br />這里為了代碼簡短,通過SQL來“補”數據,通過sum(case when…)方式實現。<br />完整的SQL語句如下:<br />
create table t_user_feature as select
user_id,
sum(case when rank=1 then cnt else 0 end) as cnt1,
sum(case when rank=2 then cnt else 0 end) as cnt2,
sum(case when rank=3 then cnt else 0 end) as cnt3,
sum(case when rank=4 then cnt else 0 end) as cnt4,
sum(case when rank=5 then cnt else 0 end) as cnt5,
sum(case when rank=6 then cnt else 0 end) as cnt6,
sum(case when rank=7 then cnt else 0 end) as cnt7,
sum(case when rank=8 then cnt else 0 end) as cnt8,
sum(case when rank=9 then cnt else 0 end) as cnt9,
sum(case when rank=10 then cnt else 0 end) as cnt10
from(
select /*+ MAPJOIN(b) */t.user_id, t.brand_id, t.cnt, b.rank
from(select user_id, brand_id, count(*) as cnt from tmall_user_brandwhere type='1'group by user_id, brand_id
)t
join(select brand_id, rankfrom(select brand_id,row_number() over (partition by 1 order by buy_cnt desc) as rankfrom(select brand_id, count(*) as buy_cntfrom tmall_user_brandwhere type='1'group by brand_id)t1 )t2 where t2.rank <=10
)b
on t.brand_id = b.brand_id
)ub group by user_id; alter table t_user_feature set lifecycle 7;
<a name="b28d718c"></a>
### 選擇初始節點
對于Kmeans算法,初始節點的選取對聚類結果很重要,有很多paper研究如何選擇初始節點。這里出于簡單,直接隨機選取3個節點,SQL如下:
drop table if exists t_kmeans_seed; create table t_kmeans_seed as select user_id,
cnt1,cnt2,cnt3,cnt4,cnt5,cnt6,cnt7,cnt8,cnt9,cnt10
from(
selectuser_id,cnt1,cnt2,cnt3,cnt4,cnt5,cnt6,cnt7,cnt8,cnt9,cnt10,cluster_sample(3) over (partition by 1) as flag
from t_user_feature
)t1 where flag = true; alter table t_kmeans_seed set lifecycle 7;
<a name="cecbfece"></a>
### [實現Kmeans聚類](https://www.atatech.org/articles/32335#5)
這里我們基于在線手冊Graph示例程序的“k-均值聚類算法”來實現。代碼如下:
package example.demo; public class KmeansDemo { private final static Logger LOG = Logger.getLogger(KmeansDemo.class); private static String RESOURCE_TABLE; public static class KmeansVertex extends
Vertex<Text, Tuple, NullWritable, NullWritable> {
@Override
public void compute(ComputeContext<Text, Tuple, NullWritable, NullWritable> context,Iterable<NullWritable> messages) throws IOException {context.aggregate(this.getValue());
}
} public static class KmeansVertexReader extends
GraphLoader<Text, Tuple, NullWritable, NullWritable> {
@Override
public void load(LongWritable recordNum, WritableRecord record,MutationContext<Text, Tuple, NullWritable, NullWritable> context)throws IOException {Tuple val = new Tuple();for(int i=1; i<record.size(); ++i) {val.append(record.get(i));}KmeansVertex vertex = new KmeansVertex();vertex.setId(new Text(String.valueOf(record.get(0))));vertex.setValue(val);context.addVertexRequest(vertex);
}
} public static class KmeansAggrValue implements Writable {
Tuple centers = new Tuple();
Tuple sums = new Tuple();
Tuple counts = new Tuple();
@Override
public void write(DataOutput out) throws IOException {centers.write(out);sums.write(out);counts.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {centers = new Tuple();centers.readFields(in);sums = new Tuple();sums.readFields(in);counts = new Tuple();counts.readFields(in);
}
@Override
public String toString() {return "centers " + centers.toString() + ", sums " + sums.toString()+ ", counts " + counts.toString();
}
} public static class KmeansAggregator extends Aggregator {
@Override
public KmeansAggrValue createStartupValue(WorkerContext context)throws IOException {KmeansAggrValue aggrVal = null;aggrVal = new KmeansAggrValue();aggrVal.centers = new Tuple();aggrVal.sums = new Tuple();aggrVal.counts = new Tuple();RESOURCE_TABLE = context.getConfiguration().get("RESOURCE_TABLE");Iterable<WritableRecord> iter = context.readResourceTable(RESOURCE_TABLE);for(WritableRecord record : iter) {Tuple center = new Tuple();Tuple sum = new Tuple();for (int i = 1; i < record.size(); ++i) {center.append(record.get(i));sum.append(new LongWritable(0L));}LongWritable count = new LongWritable(0L);aggrVal.sums.append(sum);aggrVal.counts.append(count);aggrVal.centers.append(center);}return aggrVal;
}
@Override
public KmeansAggrValue createInitialValue(WorkerContext context)throws IOException {return (KmeansAggrValue) context.getLastAggregatedValue(0);
}
@Override
public void aggregate(KmeansAggrValue value, Object item) {int min = 0;long mindist = Long.MAX_VALUE;Tuple point = (Tuple) item;for (int i = 0; i < value.centers.size(); i++) {Tuple center = (Tuple) value.centers.get(i);// use Euclidean Distance, no need to calculate sqrtlong dist = 0L;for (int j = 0; j < center.size(); j++) {long v = ((LongWritable) point.get(j)).get()- ((LongWritable) center.get(j)).get();dist += v * v;}if (dist < mindist) {mindist = dist;min = i;}}// update sum and countTuple sum = (Tuple) value.sums.get(min);for (int i = 0; i < point.size(); i++) {LongWritable s = (LongWritable) sum.get(i);s.set(s.get() + ((LongWritable) point.get(i)).get());}LongWritable count = (LongWritable) value.counts.get(min);count.set(count.get() + 1L);
}
@Override
public void merge(KmeansAggrValue value, KmeansAggrValue partial) {for (int i = 0; i < value.sums.size(); i++) {Tuple sum = (Tuple) value.sums.get(i);Tuple that = (Tuple) partial.sums.get(i);for (int j = 0; j < sum.size(); j++) {LongWritable s = (LongWritable) sum.get(j);s.set(s.get() + ((LongWritable) that.get(j)).get());}}for (int i = 0; i < value.counts.size(); i++) {LongWritable count = (LongWritable) value.counts.get(i);count.set(count.get() + ((LongWritable) partial.counts.get(i)).get());}
}
@SuppressWarnings("rawtypes")
@Override
public boolean terminate(WorkerContext context, KmeansAggrValue value)throws IOException {// compute new centersTuple newCenters = new Tuple(value.sums.size());for (int i = 0; i < value.sums.size(); i++) {Tuple sum = (Tuple) value.sums.get(i);Tuple newCenter = new Tuple(sum.size());LongWritable c = (LongWritable) value.counts.get(i);if(c.equals(0L)) {continue;}for (int j = 0; j < sum.size(); j++) {LongWritable s = (LongWritable) sum.get(j);newCenter.set(j, new LongWritable(new Double((double)s.get()/ c.get()+0.5).longValue()));// reset sum for next iterations.set(0L);}// reset count for next iterationc.set(0L);newCenters.set(i, newCenter);}// update centersTuple oldCenters = value.centers;value.centers = newCenters;LOG.info("old centers: " + oldCenters + ", new centers: " + newCenters);// compare new/old centersboolean converged = true;for (int i = 0; i < value.centers.size() && converged; i++) {Tuple oldCenter = (Tuple) oldCenters.get(i);Tuple newCenter = (Tuple) newCenters.get(i);long sum = 0L;for (int j = 0; j < newCenter.size(); j++) {long v = ((LongWritable) newCenter.get(j)).get()- ((LongWritable) oldCenter.get(j)).get();sum += v * v;}double dist = Math.sqrt(sum);LOG.info("old center: " + oldCenter + ", new center: " + newCenter+ ", dist: " + dist);// converge threshold for each center: 0.05converged = dist < 0.05d;}if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {// converged or reach max iteration, output centersfor (int i = 0; i < value.centers.size(); i++) {context.write(((Tuple) value.centers.get(i)).toArray());}// true means to terminate iterationreturn true;}// false means to continue iterationreturn false;
}
} private static void printUsage() {
System.out.println("Usage: <in> <out> <resource> [Max iterations (default 30)]");
System.exit(-1);
} public static void main(String[] args) throws IOException {
if (args.length < 3)printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(KmeansVertexReader.class);
job.setRuntimePartitioning(false);
job.setVertexClass(KmeansVertex.class);
job.setAggregatorClass(KmeansAggregator.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
job.set("RESOURCE_TABLE", args[2]);
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 4)job.setMaxIteration(Integer.parseInt(args[3]));
long start = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "+ (System.currentTimeMillis() - start) / 1000.0 + " seconds");
} }
<br />和MapReduce編程框架類似,在main函數,先實例化一個GraphJob,對job設置后,通過job.run()提交。<br />KmeansVertexReader類實現加載圖,定義圖節點。由于kmeans算法是計算節點距離,因此不需要定義邊;此外它需要對迭代結果進行匯總,所以通過KmeansAggregator繼承Aggregator,實現每一步迭代計算。<br /><br />
<a name="f1e68d34"></a>
### [](https://www.atatech.org/articles/32335#6)運行和輸出
準備結果表SQL如下:
create table t_kmeans_result(
cnt1 bigint,
cnt2 bigint,
cnt3 bigint,
cnt4 bigint,
cnt5 bigint,
cnt6 bigint,
cnt7 bigint,
cnt8 bigint,
cnt9 bigint,
cnt10 bigint) lifecycle 7;<br />在console中執行如下命令:<br /><br />
add jar /home/admin/duckrun/dev/open_graph_example/target/open_graph_example-0.1.jar -f; add table t_kmeans_seed -f; jar -resources open_graph_example-0.1.jar,t_kmeans_seed -classpath /home/admin/duckrun/dev/open_graph_examp
<a name="d41d8cd9"></a>
##
<a name="388ac3a4"></a>
## 基于MaxCompute Graph實現并行化層次聚類
<a name="8e1b944f"></a>
### 背景
圖聚類是常見的一種聚類場景。和基于向量的聚類不同,圖的每個節點只和有限個節點有距離,無法定義任意兩點之間的距離。因此,像k-means這類常規方法就不適合圖聚類。本文要介紹的是用層次聚類(hierarchical clustering)的方法做圖聚類,其中為簡單起見,圖是無向的。
<a name="90179ba4"></a>
### [](https://www.atatech.org/articles/25067#1)聚類過程
標準的自底向上的層次聚類過程是這樣的:每次選取距離最小的兩個點merge,直到最后只剩一個點(包含所有的原始點)為止。聚類過程涉及到點和點,以及簇和簇之間距離計算的不同方法;具體的可以參考[維基百科的解釋](http://en.wikipedia.org/wiki/Hierarchical_clustering)。<br />基于無向圖的層次聚類和標準層次聚類是類似的,用邊的權值來度量節點之間的距離,同時更新合并節點的鄰居節點之間的邊。用偽代碼描述過程如下:
圖加載; While(不滿足聚類停止條件) { 選取距離最小的邊edgeAB; 生產新的節點AB; 生產新的邊,AB和A,B的所有鄰居之間; 刪除A和A鄰居之間的邊,刪除B和B鄰居之間的邊; 刪除A,刪除B; }
<a name="85901219"></a>
### [](https://www.atatech.org/articles/25067#2)MaxCompute Graph實現細節
層次聚類實現的核心是通過Vertex的compute來實現的。定義Vertex的執行狀態,分別包括:選舉狀態(minedge_electing);等待選舉結果狀態(waiting_election);停止狀態(waiting_delete)。
Vertex.compute() { switch(current_state) { case minedge_electing: if(存在鄰居節點)
選取和鄰居節點之間最小的邊,發送給aggregator;
else
voteToHalt(); //沒有鄰居,停止計算退出;
break; case waiting_election:
從aggregator獲取全局選取的最小邊minEdge;
if(minEdge的距離值>閥值距離) voteToHalt(); //沒有可以再做聚合的簇了,停止計算并退出;
else if(minEdge不是本節點和某個鄰居節點之間的邊)轉換狀態到minedge_electing,準備下一輪選舉迭代;
else { //假設本節點為A, minEdge對應的鄰居為B addVertexRequest(AB); //mergeA和B新生產節點 for(Vertex neighbor: A’s neighbors) {
removeEdgeRequest(A->neighbor);
removeEdgeRequest(neighbor->A);
if(neighbor不是B) {addEdgeRequest(AB->neighbor);addEdgeRequest(neighbor->AB);
} } removeVertexRequest(A); 轉換狀態到waiting_delete; } break; case waiting_delete: voteToHalt(); break; } }
全局Aggregator定義:兩兩比較邊的距離值,選取最小的那個;<br />節點沖突Resolver定義:當A節點發現minEdge是edge(A B)的同時,B也同樣發現,其處理流程和A是對稱相等的,因此會出現沖突(重復增加新節點AB,重復增加和刪除邊edge(AB, C),當C和A,B都有連接的時候)。如下圖所示:C節點是A,B的共同鄰居,因此A,B合并為新的節點AB后,針對C節點就需要特別處理沖突的情況;而D,E的處理就相對簡單。<br />
<a name="dd76d17c"></a>
### [](https://www.atatech.org/articles/25067#3)并行近似優化
上述的聚類流程中,真正并行化執行只是在選舉最短距離的過程(單機版需要掃描所有的邊,graph分布式由節點把相鄰的最短距離report to aggregator),而merge僅僅只有兩個節點參與。由于graph框架本身的耗費,實際測試發現程序執行速度并不理想。<br />既然在節點merge的過程沒有并行化,那么就思考是否在這塊可以做并行化處理,答案是肯定的。例如下圖中,邊edgeAB可以merge的同時,是否可以考慮把edgeGH也merge。<br /><br />從圖上看出edgeAB和edgeGH之間路徑相對比較遠,同步merge G,H對全局結果的影響不大,按照標準的全局選舉流程,最終也會選擇G,H來merge。當然,理論上來說,有可能由于A,B合并了以后,導致和周圍節點邊更新,從而影響了后續的全局選舉結果。因此,并行化的merge節點最終是一個近似的結果。為了保證近似結果的可靠性,第一在于同時可merge節點之間的路徑要足夠的遠,相互影響的可能性就小。考慮一個極端的情況,就是路徑無窮大,實質是不連通的情況,那么同時merge就完全沒有風險了。第二,必須保證節點merge以后,生成新邊的權重要合理,以保證并行化merge順序和非并行化merge順序近似一致,有關這一點后續會細說。<br />修改選舉最短距離邊的實現,不用全局選舉的結果,而是在一定路徑范圍內選舉出最短距離邊,然后merge,這樣就同時會選舉出多個局部最短距離邊。可同步merge的邊必須滿足一個最短路徑閥值,如下圖所示:edgeAB和edgeDE是可以同步merge的,不會起沖突,因為對節點C而言,分別增加了兩個新的節點;如果edgeAB和edgeCD同步merge,那就會起沖突,因為兩個新生產的節點之間也需要產生鄰居關系。因此,必須保證同步merge的邊之間至少存在一個不變化的節點,這樣就避免了新節點之間的鄰居關系生成。<br /><br />在局部選舉的過程中,依然采用的是節點report自己所知道的最短距離,只是將report給aggregator,改為report給鄰居,并且通過多次迭代實現傳播功能。局部選舉的偽代碼如下:
Step1:? 選取和鄰居節點之間距離最小的邊,發送給所有鄰居節點以及本節點;進入step2; Step2: 從接受的消息中選取距離最小的邊(包括了在step1中鄰居以及本節點選取的結果),發送給所有鄰居節點以及本節點;進入step3; Step3: 從接受的消息中選取距離最小的邊(包括了在step2中鄰居以及本節點選取的結果),發送給所有鄰居節點以及本節點;進入step4; ………. StepN: 從接受的消息中選取距離最小的邊(包括了在stepN-1中鄰居以及本節點選取的結果),如果minEdge是本節點的一條邊,那么就進行merge,否則進入step1;
事實上,每一step就是不斷地選舉局部最短距離邊,并且把這個信息逐層擴散,這樣就確保了在一定的路徑范圍內永遠只選舉一個最短距離邊。N的設置可以配置,顯然,N越小,并行化程度就越高。當然,必須避免沖突,因此N的最小取值為3。
<a name="ce23477e"></a>
### [](https://www.atatech.org/articles/25067#4)邊權重更新
層次聚類過程中,簇和簇之間距離的計算可以參考[維基百科](http://en.wikipedia.org/wiki/Hierarchical_clustering) 提到的各種方法。本文參考的是[Ward方法](http://en.wikipedia.org/wiki/Ward%27s_method) 來計算節點merge以后和鄰居節點之間的邊權重。另外要說明的是有關邊距離的度量,由于本文提出的方法是針對淘寶商品[interest entity node](http://dthink.alibaba-inc.com/articles/commonalg/interestgraph.htm)聚類的實現,而輸入是node和node之間協同相似度(看了又看,買了又買);因此節點之間的距離度量是和相似度成反比的。相似度越大,等同于距離就越小。為簡單起見,就直接用相似度作為距離的度量。每次選舉局部距離最小的節點對,即是選舉相似度最大的節點對。<br />基于Ward的思想,把要merge的兩個簇的節點數量作為衡量的標準,同時考慮到降低邊權重減弱的速度,最終用以下的方法做更新:<br />假設要merge的兩個節點分別為A和B,節點nA,nB分別是A和B的鄰居;<br />nA,nB和新節點AB的相似度計算:
sim(nA, AB)=sim(A, nA) * alphaA; sim(nB, AB)=sim(B, nB) * alphaB; 當size(A) + size(B)=2的時候,alphaA=alphaB=0.9; 否則alphaA=sqrt(sizeA) / sqrt(sizeA + sizeB), alphaB=sqrt(sizeB) / sqrt(sizeA + sizeB)。 當nA和nB為同一個節點的時候,也即A,B共同鄰居,和新節點AB的相似度最終合并為:(sim(A, nA) + sim(B, nB)) * 0.618。
<a name="d41d8cd9-1"></a>
### [](https://www.atatech.org/articles/25067#5)<a name="992bf7f5"></a>
## 基于MaxCompute Graph實現大規模網絡的關系擴散
關系數據相關的實體有自然人、企業、媒介、賬號等,如何對由億級別的節點和邊組成的大規模網絡進行有效的圖計算是一個剛性需求。
<a name="2fe57705"></a>
### [](https://www.atatech.org/articles/104874#0)問題抽象
如果有一個億級別的大規模有向網絡(就假設為微博的用戶關注關系網絡好了,便于理解),如何進行關系擴散找到用戶可能想關注的其他用戶呢?打個比方,A用戶關注了B,B又關注了C,那么可能C就是A想要關注的潛在用戶,現在我們要做的事就是把所有的C找出來推薦給A,最好還要把A到C的關注鏈路也一并輸出,便于其他深入的分析。我們的目標定為四度關系擴散,A—>B—>C—>D—>E,找到E。
<a name="01ea6c5e"></a>
### [](https://www.atatech.org/articles/104874#1)暴力解法
最直接的想法就是對已有的一度關系表進行一次join得到兩度關系,進行兩次join得到三度關系,依次類推。假設網絡是均勻分布的,每個人關注的人數量級差不多,利用MaxCompute強大的計算能力,這種方法還有可能會計算出結果。然而現實的網絡結構往往會存在小部分的出邊和入邊遠大于平均水平的超點(微博大V),這些點在join的過程中極易造成數據傾斜,一次join還能勉勉強強接受,但兩次三次join最后99.9%會以計算失敗告終。<br />那么利用MaxCompute Graph的sendMessage機制能否解決這個問題呢?在每一迭代步里,每個節點都將自身的節點值添加到上游節點傳來的路徑后面,再將路徑當做message傳遞給下游節點,如下圖所示,計算過程如下:<br />[](http://ata2-img.cn-hangzhou.img-pub.aliyun-inc.com/98ed95e8939df0408db2fee4eb23c0b9.png)<br />第一步:每個節點的value設置為自身的id,并將value發送所有出邊的終點;<br />第二步:每個節點將收到的所有消息存儲為一個list,將自身id添加到list里面的每個元素后面,再將這個list發送給下游節點;<br />第三~五步:重復第二步。第五步輸出的長度為5的路徑即是我們想要的結果。<br />[](http://ata2-img.cn-hangzhou.img-pub.aliyun-inc.com/644dbbfd4438f4ada3e51e1a1b91d147.png)<br />但最終實踐證明,這種方法在到第二步以后就會內存不夠報錯,盡管已經將各項參數調到最大,還是不行。主要原因是發送消息采用數組的形式太占內存,每一步都將自身節點添加到所有路徑后面也會有重復存儲的問題,看來還有很多優化的空間。
<a name="ac500c12"></a>
### [](https://www.atatech.org/articles/104874#2)兩度關系
我們先從最簡單的兩度關系入手,由于MaxCompute Graph是以點為粒度進行輸出的,因此我們只需找到每個節點的頭和尾,相當于把兩度路徑的中間節點固定住,遍歷頭部節點和尾部節點,就可以輸出所有的兩度路徑了。實現很簡單,首先定義一個MyValue的class存儲所有的上游節點值和下游節點值以及自身節點值:
public static class MyValue implements Writable {
private Tuple downVertex; //下游節點
private Text selfId; //自身節點
private Tuple upVertex; //上游節點
public MyValue() {downVertex = new Tuple();selfId = new Text();upVertex = new Tuple();
}
public MyValue(Text id) {downVertex = new Tuple();selfId = new Text(id);upVertex = new Tuple();
}
public void setSelfId(Text id) {selfId = id;
}
public void setDownVertex(Tuple value) {downVertex = value;
}
public void setUpVertex(Tuple value) {upVertex = value;
}
public Tuple getDownVertex() {return downVertex;
}
public Text getSelfId() {return selfId;
}
public Tuple getUpVertex() {return upVertex;
}
public void addDownVertex(Writable value) {downVertex.append(value);
}
public void addUpVertex(Writable value) {upVertex.append(value);
}
@Override
public void write(DataOutput out) throws IOException {upVertex.write(out);selfId.write(out);downVertex.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {upVertex.readFields(in);selfId.readFields(in);downVertex.readFields(in);
}
}
然后進行簡單的5步迭代即可得到結果:
switch ((int) context.getSuperstep()) {
case 0: //設置自身節點值getValue().setSelfId(getId());break;
case 1: //發送自身id給下游節點if (hasEdges()) {context.sendMessageToNeighbors(this, new MyValue(getId()));}break;
case 2: //存儲收到的消息,存儲為上游節點列表 for (MyValue msg : messages) {getValue().addUpVertex(msg.getSelfId());}break;
case 3: //發送自身id給上游節點for (Writable id : getValue().getUpVertex().getAll()) {context.sendMessage((Text) id, new MyValue(getId()));}break;
case 4: //存儲收到的消息,存儲為下游節點列表for (MyValue msg : messages) {getValue().addDownVertex(msg.getSelfId());}break;
}
最后將結果輸出即可:
@Override public void cleanup(WorkerContext context)
throws IOException {
context.write(new Text(getValue().getUpVertex().toDelimitedString(',')),getId(),new Text(getValue().getDownVertex().toDelimitedString(',')));
}
輸出結果的第一列和第三列均為數組,第二列為當前的節點,利用trans_array函數即可將數組轉換為多行。這里有個坑需要注意,sql不能寫成下面的形式:
select trans_array(2, ',', node1, node2, node3) as (node1, node2, node3) from? (
select trans_array(2, ',', node2, node3, node1) as (node2, node3, node1)
from result_table
)t1
因為這樣會把那些出邊和入邊非常多的節點同時解析兩列trans_array的工作量分配到一個mapper上,造成嚴重的數據傾斜,寫成下面的形式即可進行兩次的資源分配,極大地降低數據傾斜的程度。
drop table if exists result_table_left; create table result_table_left lifecycle 7 as? select trans_array(2, ',', node2, node3, node1) as (node2, node3, node1) from result_table; drop table if exists result_table_right; create table result_table_right lifecycle 7 as? select trans_array(2, ',', node1, node2, node3) as (node1, node2, node3) from result_table_left;
<a name="b1b13f88"></a>
### [](https://www.atatech.org/articles/104874#3)三度關系
既然兩度關系可以利用MaxCompute Graph的特性固定住中間的節點,那么自然地,我們可以想到,三度關系可以固定住中間的兩個節點,變成以關系對的粒度(也就是邊的粒度)進行路徑頭和尾的遍歷。但是Graph的輸出是以點為粒度,想要實現邊的粒度還需要再多發送一次消息,如下所示:
switch ((int) context.getSuperstep()) {
case 0: //設置自身節點值getValue().setSelfId(getId());break;
case 1: //發送自身id給下游節點if (hasEdges()) {context.sendMessageToNeighbors(this, new MyValue(getId()));}break;
case 2: //存儲收到的消息,存儲為上游節點列表 for (MyValue msg : messages) {getValue().addUpVertex(msg.getSelfId());}break;
case 3: //發送自身id給上游節點for (Writable id : getValue().getUpVertex().getAll()) {context.sendMessage((Text) id, new MyValue(getId()));}break;
case 4: //存儲收到的消息,存儲為下游節點列表for (MyValue msg : messages) {getValue().addDownVertex(msg.getSelfId());}break;
case 5: //再將下游節點的值發送給上游for (Writable id : getValue().getUpVertex().getAll()) {context.sendMessage((Text) id, getValue());}break;
case 6: //結果輸出 [上游節點列表,本節點,當前消息所屬的下游節點,下游節點的下游節點列表]for (MyValue msg : messages) {context.write(new Text(getValue().getUpVertex().toDelimitedString(',')),getId(),msg.getSelfId(),new Text(msg.getDownVertex().toDelimitedString(',')));}break;
}
最后再像二度關系里面用兩次trans_array解析即可得到所有的三度關系路徑了。
<a name="3ad2b9ec"></a>
### [](https://www.atatech.org/articles/104874#4)四度關系
同樣按照之前的思路,四度關系相當于固定住中間的三個節點再進行頭部節點和尾部節點的遍歷。那么問題來了,固定住一個節點可以看做以點為粒度進行遍歷,固定住兩個節點可以看做是以邊為粒度進行遍歷,那么固定住三個節點相當于什么呢?問題好像不可解了。但是我們可以換個思路來看,如果我們把固定住三個節點轉換為固定住兩個節點呢?如下圖所示,我們已經通過兩度關系的輸出得到所有的三個節點的路徑,如A和C,那么我們在A和C上新加一條邊,將邊的值設置為中間節點B的節點值,這樣就可以變成兩個節點了!而原來的邊還保留,只是邊的值為空。<br /><br />[](http://ata2-img.cn-hangzhou.img-pub.aliyun-inc.com/10e374d629e8d1fb718255226111abf8.png)<br />因此,我們重新用一度關系邊和兩度關系的虛擬邊構建一個新的網絡,再在新的網絡上運用三度關系的迭代方法。注意,添加虛擬邊后會讓節點的上下游節點列表變大,因此,前四步構建上下游節點列表時需加一條判斷邊的值為空的條件,然后第五步和第六步輸出路徑時需要判斷邊不為空。
switch ((int) context.getSuperstep()) {
case 0: //設置自身節點值getValue().setSelfId(getId());break;
case 1: //發送自身id給下游節點if (hasEdges()) {for (Edge<Text, Text> e : getEdges()) {if (e.getValue().equals(new Text(""))) {context.sendMessage(e.getDestVertexId(), new MyValue(getId()));}}}break;
case 2: //存儲收到的消息,存儲為上游節點列表 for (MyValue msg : messages) {getValue().addUpVertex(msg.getSelfId());}break;
case 3: //發送自身id給上游節點for (Writable id : getValue().getUpVertex().getAll()) {context.sendMessage((Text) id, new MyValue(getId()));}break;
case 4: //存儲收到的消息,存儲為下游節點列表for (MyValue msg : messages) {getValue().addDownVertex(msg.getSelfId());}break;
case 5: //再將本節點的值和邊值發送給下游if (hasEdges()) {MyValue msg = new MyValue();msg.setDownVertex(getValue().getDownVertex());msg.setUpVertex(getValue().getUpVertex());for (Edge<Text, Text> e : getEdges()) {if (!e.getValue().equals(new Text(""))) {String id = getId().toString();String edge = e.getValue().toString();msg.setSelfId(new Text(id+"+"+edge); context.sendMessage(e.getDestVertexId(), msg);}}}break;
case 6: //結果輸出 [上游的上游節點列表,上游節點+中間節點,本節點,本節點的下游節點列表]for (MyValue msg : messages) {context.write(new Text(msg.getUpVertex().toDelimitedString(','))msg.getSelfId(),getId(),new Text(getValue().getDownVertex().toDelimitedString(',')));}break;
}
最后再用兩次trans_array和split解析即可得到所有的四度關系路徑了。
<a name="5a3530b9"></a>
### [](https://www.atatech.org/articles/104874#5)環路截斷
前面的討論沒有考慮環路的情況,實際中環路是很常見的,比如兩個人互相關注。有環路時,輸出的路徑需要截斷。<br />兩度關系輸出為3個節點,只需判斷頭尾不相同即可,頭尾相同將頭節點置為空,退化為一度關系。比如A—>B—>A截斷為B—>A;<br />三度關系輸出為4個節點,中間兩個節點肯定不相同,判斷第一個節點是否和第三、第四個節點相同,相同將第一個節點截斷,再判斷第二個節點和第四個節點是否相同,相同的話在第二個節點處截斷,即同時將第一個和第二個節點置為空。如A—>B—>C—>A截斷為B—>C—>A,B—>A—>C—>A截斷為C—>A;<br />四度關系同理,不再贅述。
<a name="433531fd"></a>
### [](https://www.atatech.org/articles/104874#6)結語
本例實踐了一張有1億節點,2億邊的有向圖,對其進行了關系擴散,最終的結果兩度關系有221億,三度關系有2180億,四度關系已經上萬億了,計算耗時兩度關系40分鐘,三度關系90分鐘左右,四度及以上整個過程的瓶頸已經不在計算了,而在MaxCompute Graph輸出上,輸出的耗時基本以小時為單位計算。
?
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
轉載于:https://my.oschina.net/u/3889140/blog/3038792
總結
以上是生活随笔 為你收集整理的MaxCompute 图计算开发指南 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。