KMeans算法的Mapreduce实现
?
Hive數據分析... 4
一、數據處理.... 4
1.1處理不符合規范的數據。... 4
1.2訪問時間分段。... 5
二、基本統計信息.... 6
三、數據屬性基礎分析.... 6
3.1用戶ID分析... 6
3.1.1UID的查詢次數。... 6
3.1.2UID頻度排名分析。... 7
3.2搜索關鍵詞分析... 8
3.2.1熱詞分析... 8
3.2.2使用幾個單詞還是一個句子作為關鍵詞。... 9
3.2.3使用文字描述還是域名一部分作為關鍵詞。... 10
3.3URL分析... 10
3.3.1熱門搜索分析。... 10
3.3.2URL流量分析。... 11
3.4 Rank分析... 12
3.5 Order分析... 13
3.6訪問時間分析... 14
四、數據深入特色分析.... 15
4.1.某一用戶分析... 15
4.1.1該UID背后是否是爬蟲程序?. 15
4.1.2該UID背后是瀏覽器代理程序嗎?... 16
4.2.某一網站分析... 18
4.2.1關鍵字分析... 18
4.2.2訪問量與時間... 19
KMeans算法的MapReduce實現... 20
一、制定距離衡量標準.... 20
1.1曼哈頓距離衡量時間、Order、Rank之間的距離。... 20
1.2萊文斯坦距離衡量關鍵字之間的距離。... 21
1.3三種距離的計算。... 22
1.3.1兩個記錄之間的距離。... 22
1.3.2一條記錄與類簇中心點集合的距離。... 22
1.3.3新類簇中心點集合與舊類簇中心點之間的距離。... 22
二、設計定制的Writeable集合與實現功能函數.... 23
2.1定制的Writable集合:dataCell類... 23
2.1.1構造函數:dataCell()、dataCell(String time, String uid, String keyword, int rank, int order, String url)。... 23
2.1.2序列化與反序列化:write(DataOutput out)、readFields(DataInput in)。... 23
2.1.3比較大小:compareTo(dataCell o) 23
2.2功能函數。... 24
2.2.1從一組元素中計算一個類簇中心:caculateCenter(List<List<Object>> A) 24
2.2.2從文件中獲取所有類簇中心集合:getCenters(String inputpath) 24
2.2.3從Hdfs獲取程序迭代類簇中心結果及分類結果到本地:getCenterResult( String localPath)、getClassfiyResult( String localPath) 24
2.2.4使用新類簇中心集合替換舊類簇中心集合:replaceOldCenter(String oldpath, String newpath) 25
2.2.5判斷新舊兩組類簇中心的距離是否已經達到迭代停止條件:isFinished(String oldpath, String newpath, int max) 25
三、生成初始類簇中心點.... 26
四、第一次MapReduce:迭代聚類中心點.... 27
4.1MapReduce設計。... 27
4.2 Mapper實現。... 27
4.3 Reducer實現。... 28
4.4 JobDriver實現。... 29
五、第二次MapReduce:數據分類.... 29
5.1 MapReduce設計。... 29
5.2 Mapper實現。... 30
5.3 Recuce實現。... 30
5.4 JobDriver實現。... 31
六、衡量分類效果.... 31
七、運行與分析.... 32
7.1?????? 一次完整的程序運行... 32
7.1.1產生初始聚類中心。... 32
7.1.2迭代聚類中心。... 33
7.1.3數據分類。... 34
7.1.4衡量分類效果。... 34
7.2?????? 尋找最佳類簇個數
Mapreduce附錄.... 44
(1)??? dataCell類... 44
(2)??? Help類... 47
(3)??? HelpTest 55
(4)??? KMeansDriver 56
(5)??? KmeansMapperForCenter類... 59
(6) ?KMeansReducerForCenter類... 60
(7) ?KMeansMapperForClassify類... 61
(8)? KMeansReducerForClassify類??? 62KMeans算法的MapReduce實現
在本文中我使用KMeans算法實現搜狗搜索數據集上的MapReduce程序。K-Means算法輸入聚類個數k,以及源數據,并將源數據分為k類輸出。在分類后的數據中,同一聚類中的對象相似度較高;而不同聚類中的對象相似度較小。眾所周知,KMeans算法在初始中心點選取及聚類個數方面存在一定不足,在本文中我將在實現算法之余對于這兩點嘗試做出一些改進。此外,想要順利的實現算法清晰的思路必不可少,在程序實現方面,我將按照制定距離衡量標準、生成初始聚類中心、迭代聚類中心、數據分類、衡量分類效果的步驟進行,最后還將對使用不同個數聚類中心,程序所展示出的效果進行分析。
一、制定距離衡量標準
搜狗數據集每行一條記錄,每條記錄由六個屬性構成:時間、用戶ID、搜索關鍵字、Order、Rank和URL。因為數據集沒有分類標志,所以不能使用有監督算法對其進行分類,只能使用無監督算法。在六個屬性中,用戶ID是一串瀏覽器生成的字符,并不能衡量兩個ID之間的距離,所以這里我們不將其考慮到算法中;URL的命名規則很隨意,也很難衡量兩個URL之間的距離,則算法中也不考慮URL屬性。除此之外,我們將在算法中,依據時間、搜索關鍵字、Order、Rank對數據之間的距離進行衡量,并分類。
時間、搜索關鍵字、Order、Rank這四個屬性擁有不同的特征,其中時間、Order、Rank是整數,可以執行數字運算;而搜索關鍵字是字符串無法執行數字運算,從而這兩類屬性需要使用不同的方法衡量距離。這里我們使用曼哈頓距離衡量時間、Order、Rank之間的距離,使用萊文斯坦距離衡量搜索關鍵字之間的距離:
1.1曼哈頓距離衡量時間、Order、Rank之間的距離。
在數據集中,時間是連續變化的其范圍是:2011年12月30日至2011年12月31日,數據格式為“20111230000005”,其中第7,8位數字表示小時,為了不使計算過于麻煩,我們以小時(即時間屬性字符串的7和8位數字)作為該條數據時間屬性的值,每天有24個小時,這里我們對其進行歸一化,設at為記錄A的時間屬性值,bt為記錄B的時間屬性值,則記錄A與記錄B之間時間屬性的距離如公式(1)所示:
Dt=abs(at-bt)/24??? ????????????????????????公式(1)
Order是該條記錄在網頁展示時的排序,這是較為重要的一個屬性。在數據集中Order值的范圍在1~40之間,設ao為記錄A的Order數值,bo為記錄B的Order數值,則記錄A與記錄B之間的Order屬性的的距離如公式(2)所示:
Do=abs(ao-bo) ???????????????????????????公式(2)
Rank記錄用戶點擊的次序,也是一個很重要的屬性。這里設ar為記錄A的Rank值,br為記錄B的Rank至,則記錄A與記錄B之間的Order屬性的距離如公式(3)所示:
Dr=abs(ar-br)?? ??????????????????????????公式(3)
1.2萊文斯坦距離衡量關鍵字之間的距離。
本數據集中的記錄中的搜索關鍵字屬性是用戶在使用搜狗瀏覽器輸入的搜索內容,因其是文本,不能使用簡單的算術運算衡量其距離,所以這里選擇編輯距離——萊文斯坦距離衡量兩個關鍵字之間的距離。在信息論和計算機科學中,萊文斯坦距離是一種兩個字符串序列的距離度量。形式化地說,兩個單詞的萊文斯坦距離是一個單詞變成另一個單詞要求的最少單個字符編輯數量(如:刪除、插入和替換)。萊文斯坦距離也被稱做編輯距離,盡管它只是編輯距離的一種,與成對字符串比對緊密相關。其定義為,兩個字符串a,b的萊文斯坦距離記為,其計算公式為公式(11):
? ?????公式(11)
這里,???????????è?¨?¤o?-??|??2a?b?é??o|???????ˉ????????????o1???|?????o0??¤o?§???°?è???·??是?a的前?i個字符和b的前?j?個字符之間的距離。
這里我們采用向量存儲的方式實現萊文斯坦距離的計算,使用函levenshteinTwoRows(String string1, int s_len, String string2, int t_len) 來實現,該函數的執行過程如流程圖1所示,具體實現代碼見代碼(1)。設ak為記錄A的關鍵字,設bk為記錄B的關鍵字,則記錄A與記錄B之間的Keyword屬性的距離如公式(4)所示:
Dk=levenshteinTwoRows(ak,len(ak),bk,len(bk)) ??????????公式(4)
1.3三種距離的計算。
綜上所述,數據集中任意兩條記錄:記錄A與記錄B之間的距離可以使用公式(5)來計算。結合程序需求,我們需要計算三種情況的距離:1??¤??aè?°???1é′?è·?|???2????è?°?????±??°??-???1é??è·?|???3新類簇中心點集合與舊類簇中心點之間的距離。
D=Dt+Do+Dr+Dk????????????????????????? 公式(5)
1.3.1兩個記錄之間的距離。
該功能使用函數caculateDistance0(List<Object> A,List<Object> B)實現,其實現邏輯為:程序使用公式(5)計算兩個參數的距離,并返回該距離。函數的實現代碼見附錄Help類,函數的測試函數為caculateDistance0Test(),代碼內容見附錄HelpTest類。
1.3.2一條記錄與類簇中心點集合的距離。
該功能使用函數caculateDistance1(List<Object> A,List< List<Object>> B)實現,其實現邏輯為:程序依次讀取B中的元素,并使用公式(5)計算該元素與A的距離,記錄每次的距離,最終返回最小距離所對應的元素。函數的實現代碼見附錄Help類,函數的測試函數為caculateDistance1Test(),代碼內容見附錄HelpTest類。
1.3.3新類簇中心點集合與舊類簇中心點之間的距離。
該功能使用函數caculateDistance2(List< List<Object>> A,List< List<Object>> B),其實現邏輯為:程序依次讀取A的第K個元素與B的第K個元素(其中K∈(0,len(A))),并使用公式(5)計算距離,將每次得到的距離累加得到D,返回D/len(A)。
二、設計定制的Writeable集合與實現功能函數
2.1定制的Writable集合:dataCell類
?????? Hadoop有一套非常有用的Writable實現可以滿足大部分需求,但是在本文的情況下,我們需要設計構造一個新的實現,從而完全控制二進制的表示和排序順序,這將有助于后續的MapReduce算法實現。
?????? 我們使用類dataCell實現對于一條記錄的存儲與表示。每條記錄有六個字段,則dataCell需為這六個字段創建對應的屬性,分別是: String time;String uid;String keyword;int rank;int order;String url。此外,我們為其這些屬性提供getter和setter方法。為了讓dataCell類能夠用于MapReduce過程的數據傳輸中,我們需要讓dataCell類可序列化、可比較大小,這里我們通過讓類dataCell實現接口WritableComparable<dataCell>實現這些功能。dataCell的代碼實現見附錄dataCell類。
2.1.1構造函數:dataCell()、dataCell(String time, String uid, String keyword, int rank, int order, String url)。
在dataCell類中我們提供兩個構造函數,其中無參構造函數用于反序列化時的反射;擁有六個參數的構造函數用于實例化一個dataCell對象,函數體內六個形參依次對類的六個屬性賦值。
2.1.2序列化與反序列化:write(DataOutput out)、readFields(DataInput in)。
本類中序列化與反序列化的功能通過實現函數write與readFields實現。write函數實現序列化,本函數將六個屬性依次寫入輸出流out,這里要注意的是寫出String類型的屬性時需要使用寫出UTF的形式。readFields函數實現反序列化,該函數對應于write寫出屬性的格式與順序將屬性從輸入流in中讀取出來。
2.1.3比較大小:compareTo(dataCell o)
MapReduce的suffer過程中需要將輸出的鍵值對進行排序,所以dataCell有必要實現比較大小的功能。這里我們將參數列表傳入的參數o與類屬性通過上文所提到函數caculateDistance0進行比較(注:這里不取絕對值),若結果大于0,返回1;結果小于0,返回-1。
2.2功能函數。
?????? 為了讓MapReduce程序結構更清晰,讓程序的可用性更高,這里我們將一些復雜的邏輯函提取出來放到Help類中,具體實現代碼見附錄Help類,對應測試代碼見附錄HelpTest類。
2.2.1從一組元素中計算一個類簇中心:caculateCenter(List<List<Object>> A)
本函數適用于迭代類簇的Reduce程序中。函數接收一組記錄,首先遍歷記錄計算出這組記錄的平均值,然后再次遍歷記錄從記錄中找到與平均值距離最近的那條記錄,作為新的類簇中心返回。這里要注意的是:不能直接返回這組記錄的平均值作為新的類簇中心,否則會造成類簇中心集合元素缺失的問題。
2.2.2從文件中獲取所有類簇中心集合:getCenters(String inputpath)
該函數的主要邏輯為從參數列表中獲的類簇中心集合的路徑,然后通過HDFS的API接口逐行讀取類簇中心文件,并將每行數據封裝成為一個List<Object>,最后返回類簇中心列表List< List<Object>>。
2.2.3從Hdfs獲取程序迭代類簇中心結果及分類結果到本地:getCenterResult( String localPath)、getClassfiyResult( String localPath)
MapReduce程序執行完畢后會在輸出目錄下產生運行結果,getCenterResult與getClassfiyResult分別將類簇中心結果與分類結果拷貝到本地。這兩個函數邏輯大致相同,使用HDFS的API接口從集群上取得對應的文件,然后將該文件放入參數localPath路徑中。
2.2.4使用新類簇中心集合替換舊類簇中心集合:replaceOldCenter(String oldpath, String newpath)
由于HDFS的API中并沒有提供集群中移動文件的方法,在這里我們通過首先將新類簇中心文件下載到本地文件,然后再舊類簇中心文件刪除,最后再將本地文件上傳到舊類簇中心文件中的方法實現該功能。參數oldpath為舊類簇中心文件的路徑,newpath的新類簇中心文件的路徑,該函數由isFinished函數調用。
2.2.5判斷新舊兩組類簇中心的距離是否已經達到迭代停止條件:isFinished(String oldpath, String newpath, int max)
該函數首先使用函數getCenters()分別從參數oldpath和參數newpath所對應的路徑中獲取舊類簇中心集合與新類簇中心集合,然后使用函數caculateDistance2()計算兩組類簇的距離,如果距離小于max,則滿足停止迭代條件,返回false;若距離大于max,則不滿足迭代條件,使用函數replaceOldCenter將舊類簇中心文件替換為新類簇中心文件,返回true。使用流程圖表示如圖1所示
圖1 isFinished函數流程圖
三、生成初始類簇中心點
初始聚類中心的選擇對于KMeans算法來說十分重要,初始類簇中心的好壞直接影響到聚類的效果。這里我使用“選擇批次距離盡可能遠的K個點”的方法,具體操作步驟為,首先隨機選擇一個點作為作為初始類簇中心點,然后選擇距離該店最遠的那個點作為第二個初始聚類中心點,然后再選擇距離前兩個點的最近距離最大的點作為第三個初始類簇的中心點,以此類推,直至選擇出K個初始類簇中心點。
基于以上思想,在程序中實現該算法時,可以按照圖1中流程執行。該算法使用函數ProdeceCenter(String inputpath,int k,int initRank,int initOrder)實現,其中參數inputpath為源數據的路徑,k為要生成的初始類簇集合元素的個數,initRank為隨機生成的初始類簇中心。函數的實現代碼見附錄Help類。
圖1 生成初始類簇中心點
四、第一次MapReduce:迭代聚類中心點
在KMean算法中,迭代聚類中心是使用初始類簇作為集合做初始分類,然后再每個分類中尋找中心點作為新的類簇中心點,如此迭代,直到迭代次數足夠多或者新舊兩組類簇的類簇距離足夠小。下面,將按照MapReduce設計、Mapper實現、Reducer實現、JobDriver實現三部分進行闡述。
4.1MapReduce設計。
該部分的MapReduce讀取源數據,讀取初始類簇集合,產生聚類中心集合。Map部分逐行讀入搜狗搜索數據,并找到類簇集合中距離該行數據最近的類簇,然后將最近的類簇的序號作為這一行數據的標簽,最終將標簽作為Key,改行數據作為Value作為數據寫出;Reduce部分負責接收Map產生的數據,并在標簽相同的數據中找到中心點,將中心點作為新的類簇輸出;JobDriver部分負責一些配置工作,并負責計算新舊兩組類簇集合的距離、統計迭代的次數,其中類簇集合的距離與迭代的次數均可以控制整個MapReduce過程的停止。其中,Map部分與Reduce部分的輸入輸出格式如表1所示。
表1 Map與Reduce的輸入輸出格式
| ? | 輸入 | 輸出 |
| Map | (字節偏移量,一行數據內容) | (類簇中心標志,一行數據內容) |
| Reduce | (類簇中心標志,多行數據內容) | (NULLWriteable,新的類簇中心) |
4.2 Mapper實現。
本文中我們使用類KmeansMapperForCenter實現迭代聚類中心的Mapper,該類的實現代碼見附錄KmeansMapperForCenter類。該類繼承Mapper<LongWritable,Text,IntWritable,Text>類,并實現了Mapper類的抽象方法map。在map函數中實現了Mapper部分的主要邏輯,其流程如圖1所示。
圖1 map函數流程圖
4.3 Reducer實現。
本文中我們使用類KMeansReducerForCenter實現迭代聚類中心的Reducer,該類的實現代碼見附錄KMeansReducerForCenter類。該類繼承Reducer<IntWritable, Text, NullWritable, Text>類,并實現了Reducer類的抽象方法reduce。在reduce函數中實現了Reducer部分的主要邏輯,其流程如圖2所示。
圖2 reduce函數流程圖
4.4 JobDriver實現。
?????? JobDriver部分驅動MapReduce的執行,這里我們在類KMeansDriver中的getCenter()函數中實現該功能。getCenter()需要為MapReduce流程設置六個變量:輸入路徑、輸出路徑、舊類簇中心文件、新類簇中心內文件、類簇個數、聚類停止條件,并且該函數還設置了Map過程使用類,Reduce過程使用類等。我們在這個函數中控制迭代類簇中心的迭代次數,該函數的流程如圖1所示,實現代碼見附錄KMeansDriver類。
圖2 getCenter()函數流程圖
五、第二次MapReduce:數據分類
在KMeans算法中,數據分類一部分比較簡單,該部分為每一個源數據中的元素在類簇中心集合中尋找一個距離最近的類簇中心,并將該元素標記為該類簇中心類即可。下面,將按照MapReduce設計、Mapper實現、Reducer實現、JobDriver實現三部分進行闡述。
5.1 MapReduce設計。
該部分的MapReduce讀取源數據,讀取初始類簇中心集合,給每個源數據中元素分類并輸出。Map部分負責逐行讀入搜狗搜索數據,并找到類簇中心集合中距離該行數據最近的類簇中心,然后將最近的類簇中心的序號作為這一行數據的標簽,最終將標簽作為Key,改行數據作為Value作為數據寫出;Reduce部分負責將Map傳輸過來的數據逐行輸出到結果集中;JobDriver部分負責程序的配置工作,以及提交任務。其中,Map部分與Reduce部分的輸入輸出格式如表2所示。
表2 Map與Reduce的輸入輸出格式
| ? | 輸入 | 輸出 |
| Map | (字節偏移量,一行數據內容) | (類簇中心標志,一行數據內容) |
| Reduce | (類簇中心標志,多行數據內容) | n(類簇中心標志,一行數據內容) |
5.2 Mapper實現。
本文中我們使用類KMeansMapperForClassify實現迭代聚類中心的Mapper,該類的實現代碼見附錄KMeansMapperForClassify類。該類繼承Mapper<LongWritable,Text,IntWritable,dataCell>類,并實現了Mapper類的抽象方法map。在map函數中實現了Mapper部分的主要邏輯,其流程如圖1所示。
圖1 map函數流程圖
5.3 Recuce實現。
本文中我們使用類KMeansReducerForCenter實現迭代聚類中心的Reducer,該類的實現代碼見附錄KMeansReducerForCenter類。該類繼承Reducer<IntWritable, Text, NullWritable, Text>類,并實現了Reducer類的抽象方法reduce。在reduce函數中實現了Reducer部分的主要邏輯,該部分比較簡單,直接將迭代器中的dataCell對象寫出到文件中即可。
5.4 JobDriver實現。
JobDriver部分驅動MapReduce的執行,這里我們在類KMeansDriver中的forClssify()函數中實現該功能。forClssify()需要為MapReduce流程設置四個變量:輸入路徑、輸出路徑、類簇中心文件、類簇個數,并且該函數還設置了Map過程使用類,Reduce過程使用類等。該函數過于簡單,只是對Job做了一些簡單的配置,在這里不予展示,實現代碼見附錄KMeansDriver類。
六、衡量分類效果
KMeans算法將數據分為幾類,如何度量分類效果是值得考慮的問題。聚類的任務是將目標樣本分為若干簇,并且保證每個簇之間樣本盡可能接近,并且不同簇的樣本距離盡可能遠。基于此,聚類的效果好壞又分為兩類指標衡量,一類是外部聚類效果,一類是內部聚類效果。這里我們僅使用內部聚類效果來衡量聚類的效果,且由于作業時間太緊,我們僅僅衡量聚類的緊湊度一項指標。
這里我們使用類簇中所有樣本到類簇中心距離的累加和作為衡量緊湊度的標準,其中,數據集合相同的情況下,累加和越小,緊湊度越高;累加和越大,緊湊度越低。我們使用函數measureResult(String inputPath ,String centerPath,int k)衡量聚類效果,該函數有三個參數:inputPath為分類后結果數據集的路徑,centerPath是迭代后類簇中心點的坐標,k是類簇的個數。程序調用getCenter()函數得到類簇中心點的集合,然后逐行讀取inputPath中的數據并計算其與對應類簇中心的距離,并將距離累加,最終打印累加距離的值。該函數的實現代碼見附錄中Help類,其實現的流程如圖1所示。
圖1 measureResult函數流程圖
七、運行與分析
由于電腦配置跟不上,而KMeans算法有需要較多的迭代次數,所以這里我僅使用了10000條數據運行程序。
7.1?? 一次完整的程序運行
在執行程序之前,首先要做一些配置:(1)創建迭代類簇中心點輸出文件夾,創建分類結果輸出文件夾;(2)將源數據提交到集群上;(3)將代碼打包上傳到Linux系統上。程序的運行步驟為:產生初始聚類中心、迭代聚類中心、數據分類、衡量分類效果。
7.1.1產生初始聚類中心。
我們首先產生20個不重復的類簇中心,以time=00,Rank=1,Order=1,key=“火影忍者”為隨機初始類簇中心,運行函數Help.ProdeceCenter(),可以得到初始類簇中心,這里僅展示前10個,如表1所示。
?
表1 初始類簇中心
| 時間 | 搜索關鍵詞 | Rank | Order |
| 20111230001328 | 火影忍者 | 2 | 2 |
| 20111230001600 | 蹲墻誘相公 | 10 | 10 |
| 20111230004356 | 家園守衛戰羅德港防守攻略 | 1 | 1 |
| 20111230003246 | Gay 性騷擾 圖 | 10 | 9 |
| 20111230002353 | 汕頭市金平區八年級第一學期數學試卷 | 3 | 1 |
| 20111230000219 | 人體藝術 | 9 | 9 |
| 20111230004156 | 廣州渥格服裝輔料有限公司 | 1 | 2 |
| 20111230001037 | 快播 中文字幕 主婦42 | 10 | 8 |
| 20111230003830 | WWW、RRMMM、COM | 1 | 9 |
| 20111230001230 | 海南師范大學美術系校園照片 | 10 | 1 |
7.1.2迭代聚類中心。
?????? 根據上文產生的初始類簇中心,我們選取前六個初始類簇中心點迭代聚類中心。本此迭代共計六輪,最終迭代后的類簇中心如表2所示,程序運行截圖如圖1所示。
表2 迭代后的類簇中心
| 時間 | 搜索關鍵詞 | Rank | Order |
| 20111230000249 | 天與地 | 2 | 1 |
| 20111230004246 | HTCG10手機系統自帶軟件怎么刪除? | 1 | 2 |
| 20111230004356 | 家園守衛戰羅德港防守攻略 | 1 | 1 |
| 20111230000158 | 北京市西城區2008英語抽樣測試答案 | 4 | 1 |
| 20111230002353 | 汕頭市金平區八年級第一學期數學試卷 | 3 | 1 |
| 20111230001418 | 環衛工人業務知識競賽搶答題 | 4 | 2 |
圖1 程序運行截圖
7.1.3數據分類。
?????? 根據以上迭代產生的類簇中心點集合,我們執行數據分類操作,運行函數forClssify(),可以對數據集進行分類,部分分類結果如表1所示。從表1中我們可以看到分類效果還是不錯的。
表1 數據分類結果
| 類簇 | 時間 | 用戶ID | 關鍵詞 | Rank | Order | URL |
| 0 | 20111230002225 | 5794763849288f418c58789492cd1f2e | 左耳 | 2 | 1 | http://www.tudou.com/programs/view/q96O7olHT-Q/ |
| 0 | 20111230000942 | b54b6c1e8039276b87c8002be3e8583f | 遵義宅 快遞電話 | 2 | 2 | http://zhidao.baidu.com/question/235623645 |
| 0 | 20111230001418 | 1b4fc71d2a068a638e66db462a93f89f | 最終幻想 | 2 | 1 | http://www.163dyy.com/detail/1678.html |
| 0 | 20111230003938 | fa936e397a0994997f234681a65549b2 | 最新移動手機充值q幣 | 2 | 1 | http://service.qq.com/info/25295.html |
| 1 | 20111230003905 | 3c21686be709b847009680976d6a2b4c | 百度一下 | 1 | 2 | http://www.baidu.com/ |
| 1 | 20111230004234 | 6056710d9eafa569ddc800fe24643051 | 百度一下 | 1 | 2 | http://www.baidu.com/ |
| 1 | 20111230000701 | c71267c05b21e2a8f6a3e6b812fabc1f | 百度ady | 1 | 2 | http://zhidao.baidu.com/question/188644177 |
7.1.4衡量分類效果。
經過以上三個步驟,我們已經基本完成了KMeans算法的基本過程,最后對算法的分類效果進行衡量。運行函數measureResult()可以得到類簇的累加距離,運行結果為10528。該數據需要有多組分類數據時進行比較才有意義,所以接下來我們尋找本數據集的最佳類簇個數。
7.2?? 尋找最佳類簇個數
我們使用產生聚類中心小節中產生的初始聚類中心,分別取前1個、前2個、前3個、前4個、前5個、前6個、前7個初始類簇中心對數據集進行聚類,并最終使用函數measureResult()計算累加距離,結果如表1所示。將表中數據用折線圖表示如圖1所示,從圖中我們可以清楚看到,在類簇個數為3時,圖中曲線出現了很大的轉折:在類簇個數小于3時,每增加一個類簇,累加距離下降頻度很大;在類簇個數大于3時,每增加一個類簇,累加距離下降頻度較小。由此,當類簇個數為3時,既可以保證較好的分類效果,又可以避免分類過于細致的麻煩。
表1 1~7個類簇的分類效果
| 類簇個數 | 距離 |
| 1 | 25805 |
| 2 | 17503 |
| 3 | 12348 |
| 4 | 11798 |
| 5 | 11003 |
| 6 | 10528 |
| 7 | 10288 |
?
圖1 1~7個類簇的分類效果
Mapreduce附錄
(1)?????? dataCell類
package myKMeans;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
?* 自定義的可以作為MR傳輸對象的類
?* @author zheng
?*
?*/
public class dataCell implements WritableComparable<dataCell>{
?????? private String time;
?????? private String uid;
?????? private String keyword;
?????? private int rank;
?????? private int order;
?????? private String url;
/**************************get set 方法************************/
?????? public String getTime() {
????????????? return time;
?????? }
?????? public void setTime(String time) {
????????????? this.time = time;
?????? }
?????? public String getUid() {
????????????? return uid;
?????? }
?????? public void setUid(String uid) {
????????????? this.uid = uid;
?????? }
?????? public String getKeyword() {
????????????? return keyword;
?????? }
?????? public void setKeyword(String keyword) {
????????????? this.keyword = keyword;
?????? }
?????? public int getRank() {
????????????? return rank;
?????? }
?????? public void setRank(int rank) {
????????????? this.rank = rank;
?????? }
?????? public int getOrder() {
????????????? return order;
?????? }
?????? public void setOrder(int order) {
????????????? this.order = order;
?????? }
?????? public String getUrl() {
????????????? return url;
?????? }
?????? public void setUrl(String url) {
????????????? this.url = url;
?????? }
/*************************構造函數*****************************/
?????? /**
?????? ?* 構造函數
?????? ?* @param time
?????? ?* @param uid
?????? ?* @param keyword
?????? ?* @param rank
?????? ?* @param order
?????? ?* @param url
?????? ?*/
?????? public dataCell(String time, String uid, String keyword, int rank, int order, String url) {
????????????? super();
????????????? this.time = time;
????????????? this.uid = uid;
????????????? this.keyword = keyword;
????????????? this.rank = rank;
????????????? this.order = order;
????????????? this.url = url;
?????? }
?????? /**
?????? ?* 無參構造函數
?????? ?* 空構造函數用于反射 反序列化
?????? ?*/
?????? public dataCell() {
????????????? super();
?????? }
/**********************實現接口函數*****************************/
?????? /**
?????? ?*
?????? ?* 反序列化的方法,反序列化是,從流中讀取到各個字段的順序應該與序列化時些出去的順序保持一致
?????? ?*/
?????? public void readFields(DataInput in) throws IOException {
????????????? // TODO Auto-generated method stub
????????????? time=in.readUTF();
????????????? uid=in.readUTF();
????????????? keyword=in.readUTF();
????????????? rank=in.readInt();
????????????? order=in.readInt();
????????????? url=in.readUTF();
?????? }
?????? /**
?????? ?* 序列化的方法
?????? ?*/
?????? public void write(DataOutput out) throws IOException {
????????????? // TODO Auto-generated method stub
????????????? out.writeUTF(time);
????????????? out.writeUTF(uid);
????????????? out.writeUTF(keyword);
????????????? out.writeInt(rank);
????????????? out.writeInt(order);
????????????? out.writeUTF(url);
?????? }
?????? /**
?????? ?* 比較排序
?????? ?*/
?????? public int compareTo(dataCell o) {
????????????? // TODO Auto-generated method stub
????????????? //正序排列
????????????? if(this.rank>o.rank){
???????????????????? return 1;
????????????? }
????????????? else if (this.order>o.order){
???????????????????? return 1;
????????????? }
????????????? else{
???????????????????? return -1;
????????????? }
?????? }
?????? /**
?????? ?* 字符串輸出時的方法
?????? ?*/
?????? public String toString(){
????????????? return time+"\t"+uid+"\t"+keyword+"\t"+rank+"\t"+order+"\t"+url;
?????? }
}
(2)?????? Help類
package myKMeans;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;
import org.apache.xerces.util.URI;
public class Help {
?????? /**
?????? ?* 從hdfs文件中獲取中心點,返回中心點列表的List
?????? ?* @param inputpath
?????? ?* @return
?????? ?*/
?????? public static ArrayList<ArrayList<Integer>> getCenters(String inputpath) {
????????????? // TODO Auto-generated method stub
????????????? ArrayList<ArrayList<Integer>> result = new ArrayList<ArrayList<Integer>>();?
??????? Configuration conf = new Configuration();
????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");
??????? try {?
??????????? FileSystem hdfs = FileSystem.get(conf);
??????????? Path in = new Path(inputpath);?
??????????? FSDataInputStream fsIn = hdfs.open(in);?
??????????? LineReader lineIn = new LineReader(fsIn, conf);?
??????????? Text line = new Text();?
??????????? while (lineIn.readLine(line) > 0){?
????? ??????????String record = line.toString();?
??????????????? /**
???????????????? *? 因為Hadoop輸出鍵值對時會在鍵跟值之間添加制表符, 所以用空格代替之。
???????????????? */?
???????????????
??????????????? String[] fields = record.split("\t");?
??????????????? List<Integer> tmplist = new ArrayList<Integer>();?
??????????????? for (int i = 0; i < fields.length; ++i){?
??????????????????? tmplist.add(Integer.parseInt(fields[i]));?
??????????????? }?
??????????????? result.add((ArrayList<Integer>) tmplist);?
??????????? }?
????? ??????fsIn.close();?
??????? } catch (IOException e){?
??????????? e.printStackTrace();?
??????? }?
??????? return result;? ???? ?
?????? }
?????? /**
?????? ?* 計算兩個點之間的距離,返回兩個點的距離
?????? ?* @param data
?????? ?* @param arrayList
?????? ?* @return
?????? ?*/
?????? public static int caculateDistance0(ArrayList<Integer> data, ArrayList<Integer> arrayList) {
????????????? // TODO Auto-generated method stub
????????????? //曼哈頓距離
????????????? int x1=data.get(0);
????????????? int y1=data.get(1);
????????????? int x2=arrayList.get(0);
????????????? int y2=arrayList.get(1);
????????????? int distance=Math.abs(x1-x2)+Math.abs(y1-y2);
????????????? return distance;
?????? }
?????? /**
?????? ?* 計算oldcenter隊列與newcenter隊列之間的距離,返回old隊列中中心點與new隊列中對應中心點的距離之和
?????? ?* @param oldCenter
?????? ?* @param newCenter
?????? ?* @param k
?????? ?* @return
?????? ?*/
?????? public static int caculateDistance2(List<ArrayList<Integer>> oldCenter,
???????????????????? List<ArrayList<Integer>> newCenter) {
????????????? // TODO Auto-generated method stub
????????????? //曼哈頓距離
????????????? int distance=0;
????????????? //System.out.println(oldCenter.size());
????????????? //System.out.println(newCenter.size());
????????????? for(int i=0;i<oldCenter.size()&&i<newCenter.size();i++){
???????????????????? distance+=Math.abs(oldCenter.get(i).get(0)-newCenter.get(i).get(0))
???????????????????? +Math.abs(oldCenter.get(i).get(1)-newCenter.get(i).get(1));
????????????? }
????????????? return distance;
?????? }
?????? /**
?????? ?* 計算一個點與中心點隊列的距離,返回該點與隊列中所有中心的距離之和
?????? ?* @param node
?????? ?* @param centerList
?????? ?* @return
?????? ?*/
?????? public static int caculateDistance1( List<Integer> data,
???????????????????? List<ArrayList<Integer>> centerList) {
????????????? // TODO Auto-generated method stub
????????????? //曼哈頓距離
????????????? int distance=0;
????????????? for(int i=0;i<centerList.size();i++){
???????????????????? int temp=Integer.MIN_VALUE;
???????????????????? temp=Math.abs(data.get(0)-centerList.get(i).get(0))
?????????????????????????????????? +Math.abs(data.get(1)-centerList.get(i).get(1));
???????????????????? if(temp!=0){
??????????????????????????? distance+=temp;
???????????????????? }
???????????????????? else{
??????????????????????????? distance=Integer.MIN_VALUE;
??????????????????????????? return distance;
???????????????????? }
????????????? }
????????????? return distance;
?????? }
?????? /**
?????? ?* 計算中心點
?????? ?* 在一堆node里面找中間的那一個
?????? ?* @param helpList
?????? ?* @return
?????? ?*/
?????? public static Text caculateCenter(List<ArrayList<Integer>> helpList) {
????????????? // TODO Auto-generated method stub
????????????? float rankTotal=0.0f;
????????????? float orderTotal=0.0f;
????????????? int totalDistance=Integer.MAX_VALUE;
????????????? int rankRusult=Integer.MAX_VALUE;
????????????? int orderResult=Integer.MAX_VALUE;
????????????? int i=0;
????????????? for(ArrayList<Integer> list:helpList){
???????????????????? rankTotal+=list.get(0);
???????????????????? orderTotal+=list.get(1);
???????????????????? i++;
????????????? }
????????????? System.out.println("$$$$$$$$$$"+i);
????????????? int rank=0;
????????????? int order=0;
????????????? if(i!=0){
???????????????????? rank =Math.round(rankTotal/i);
???????????????????? order=Math.round(orderTotal/i);
???????????????????? for(ArrayList<Integer> list:helpList){
??????????????????????????? int temp=list.get(0)-rank+list.get(1)-order;
??????????????????????????? if(temp<totalDistance){
?????????????????????????????????? rankRusult=list.get(0);
?????????????????????????????????? orderResult=list.get(1);
?????????????????????????????????? totalDistance=temp;
??????????????????????????? }
???????????????????? }
????????????? }
????????????? //System.out.println(rank);
????????????? //System.out.println(order);
????????????? Text result=new Text(rankRusult+"\t"+orderResult);
????????????? System.out.println(rankRusult+"\t"+orderResult);
????????????? return result;
?????? }
?????? /**
?????? ?* 判斷當前中心點是否已經到達停止條件
?????? ?* @param oldpath
?????? ?* @param newpath
?????? ?* @param k
?????? ?* @param max
?????? ?* @return
?????? ?* @throws IOException
?????? ?*/
?????? public static boolean isFinished(String oldpath, String newpath, int max) throws IOException {
????????????? // TODO Auto-generated method stub
????????????? //<oldcenters> <newcenters> <k> <threshold>
????????????? //構建oldcenters,newcenters數組
????????????? List<ArrayList<Integer>> oldcenters = Help.getCenters(oldpath);
??????? List<ArrayList<Integer>> newcenters = Help.getCenters(newpath);
??????? //計算距離
??????? int distance=Help.caculateDistance2(oldcenters, newcenters);
??????? System.out.println(distance);
??????? if (distance<max){
??????? ?????? //停止迭代
??????? ?????? System.out.println("false");
??????? ?????? return false;
??????? }
??????? else{
??????? ?????? //繼續迭代
??????? ?????? //使用新中心替換舊中心
??????? ?????? boolean flag=Help.replaceOldCenter(oldpath,newpath);
??????? ?????? System.out.println(flag);
??????? ?????? System.out.println("true");
??????? ?????? return true;
??????? }
?????? }
?????? /**
?????? ?* 使用新中心點替代舊的中心點
?????? ?* @param oldpath
?????? ?* @param newpath
?????? ?* @return
?????? ?* @throws IOException
?????? ?*/
?????? public static boolean replaceOldCenter(String oldpath, String newpath) throws IOException {
????????????? // TODO Auto-generated method stub
????????????? ArrayList<ArrayList<Integer>> result = new ArrayList<ArrayList<Integer>>();?
??????? Configuration conf = new Configuration();
????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");
??????? FileSystem fs = FileSystem.get(conf);
??????? Path newFile = new Path(newpath);
??????? Path oldFile=new Path(oldpath);
??????? //Path temp=new Path("/root/testForHelp1.txt");
??????? Path temp=new Path("C:\\Users\\zheng\\Desktop\\testForHelp1.txt");
??????? //"/root/testForHelp1.txt"
????????????? fs.copyToLocalFile(newFile, temp);
????????????? fs.copyFromLocalFile(temp, oldFile);
????????????? return true;???
?????? }
?????? public static boolean getClassfiyResult( String localPath) throws IOException {
????????????? // TODO Auto-generated method stub
????????????? ArrayList<ArrayList<Integer>> result = new ArrayList<ArrayList<Integer>>();?
??????? Configuration conf = new Configuration();
????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");
??????? FileSystem fs = FileSystem.get(conf);
??????? Path resultFile = new Path("/outForClassify/part-r-00000");
??????? Path localFile=new Path(localPath);
????????????? fs.copyToLocalFile(resultFile, localFile);
????????????? System.out.println("successful copy");
????????????? return true;???
?????? }
?????? public static boolean getCenterResult( String localPath) throws IOException {
????????????? // TODO Auto-generated method stub
????????????? ArrayList<ArrayList<Integer>> result = new ArrayList<ArrayList<Integer>>();?
??????? Configuration conf = new Configuration();
????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");
??????? FileSystem fs = FileSystem.get(conf);
??????? Path resultFile = new Path("/out/part-r-00000");
??????? Path localFile=new Path(localPath);
????????????? fs.copyToLocalFile(resultFile, localFile);
????????????? System.out.println("successful copy");
????????????? return true;???
?????? }
?????? /**
?????? ?* 產生中心點
?????? ?* @param inputpath? 元數據集
?????? ?* @param k 要產生幾個聚類中心
?????? ?* @param initRank? 初始的rank
?????? ?* @param initOrder? 初始的order
?????? ?*/
?????? public static? void ProdeceCenter(String inputpath,int k,int initRank,int initOrder){
?????? ??? Configuration conf = new Configuration();
????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");
????????????? //保存所有的center點的隊列
????????????? List<ArrayList<Integer>> centerList=new ArrayList<ArrayList<Integer>>();
????????????? //保存最開始的中心點
????????????? ArrayList<Integer> firstCenter=new ArrayList<Integer>();
????????????? firstCenter.add(initRank);
????????????? firstCenter.add(initOrder);
????????????? //將最開始的中心點加入隊列
????????????? centerList.add(firstCenter);
????????????? //保存臨時最大距離
????????????? int maxDistance=0;
????????????? //保存待選中心點
????????????? ArrayList<Integer> tmpCenter=new ArrayList<Integer>();
????????????? for(int i=0;i<k-1;i++){
???????????????????? try {?
??????????????????????????? //打開目標數據文件
??????????????????????????? FileSystem hdfs = FileSystem.get(conf);
??????????????????????????? Path in = new Path(inputpath);?
??????????????????????????? FSDataInputStream fsIn = hdfs.open(in);?
??????????????????????????? LineReader lineIn = new LineReader(fsIn, conf);?
??????????????????????????? Text line = new Text();?
??????????????????????????? //對數據文件中的每一行都進行處理
??????????? ?????? while (lineIn.readLine(line) > 0){
??????????? ????????????? //從取出的記錄中拿到Rank Order對,進行比較
??????????? ????????????? String record = line.toString();???????????????
??????????? ????????????? String[] fields = record.split("\t");?
??????????? ????????????? ArrayList<Integer> data = new ArrayList<Integer>();?
??????????? ????????????? data.add(Integer.parseInt(fields[3]));
??????????? ????????????? data.add(Integer.parseInt(fields[4]));
??????????? ????????????? //比較list,將距離最遠的放在tmpCenter里面
??????????? ????????????? int tmpDistance=Help.caculateDistance1(data, centerList);
??????????? ????????????? if(tmpDistance>maxDistance){
??????????? ???????????????????? boolean flag=true;
??????????? ???????????????????? for(ArrayList<Integer> c:centerList){
??????????? ???????????????????? if(Integer.parseInt(fields[3])==c.get(0)&&Integer.parseInt(fields[4])==c.get(1)){
??????????? ?????????????????????????????????? flag=false;
??????????? ??????????????????????????? }
??????????? ???????????????????? }
??????????? ???????????????????? if(flag){
??????????? ???????????????????? tmpCenter=data;
??????????????? ????????????? maxDistance=tmpDistance;
??????????? ???????????????????? }
??????????? ????????????? }
??????????? ?????? }
??????????? ?????? centerList.add(tmpCenter);
??????????? ?????? System.out.println(tmpCenter.get(0)+"? "+tmpCenter.get(1));
??????????? ?????? fsIn.close();?
???????????????????? } catch (IOException e){?
??????????????????????????? e.printStackTrace();?
???????????????????? }?
????????????? ?}
????????????? for(ArrayList<Integer> c:centerList){
???????????????????? System.out.print(c.get(0)+" "+c.get(1)+";");
????????????? }
?????? }
?????? /**
?????? ?* 衡量聚類的結果? 返回質心距離的累加和,這里使用曼哈頓距離
?????? ?* @param inputPath? 聚類的結果及
?????? ?* @param centerPth? 聚類中心
?????? ?* @param k? 聚類中心的個數
?????? ?* @return
?????? ?*/
?????? public static int measureResult(String inputPath ,String centerPth,int k){
????????????? Configuration conf = new Configuration();
????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");
????????????? int [] distanceList=new int[k];
????????????? try {?
???????????????????? //打開目標數據文件
???????????????????? FileSystem hdfs = FileSystem.get(conf);
???????????????????? Path in = new Path(inputPath);?
???????????????????? FSDataInputStream fsIn = hdfs.open(in);?
???????????????????? LineReader lineIn = new LineReader(fsIn, conf);?
???????????????????? Text line = new Text();?
???????????????????? //對數據文件中的每一行都進行處理
???????????????????? ArrayList<ArrayList<Integer>> centers =Help.getCenters(centerPth);
??????? ?????? while (lineIn.readLine(line) > 0){
??????? ????????????? String record = line.toString();???????????????
??????? ????????????? String[] fields = record.split("\t");
??????? ????????????? int centerNum=Integer.parseInt(fields[0]);
??????? ????????????? int rank=Integer.parseInt(fields[4]);
??????? ????????????? int order=Integer.parseInt(fields[5]);
??????? ????????????? ArrayList<Integer> data = new ArrayList<Integer>();?
??????? ????????????? data.add(rank);
??????? ????????????? data.add(order);
???? distanceList[centerNum]+=Help.caculateDistance0(centers.get(centerNum), data);
??????? ?????? }
????????????? } catch (IOException e){?
????????????? e.printStackTrace();?
????????????? }
????????????? int distanceTotal=0;
?????? for(int i=0;i<k;i++){
????????????? //System.out.println(distanceList[i]);
????????????? distanceTotal+=distanceList[i];
?????? }
?????? System.out.println(distanceTotal);
?????? return distanceTotal;
?????? }
}
(3)?? HelpTest
package myKMeans;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.junit.Test;
public class Helptest {
?????? @Test
?????? public void getCentersTest() {
????????????? String path="/testForHelp.txt" ;
????????????? ArrayList<ArrayList<Integer>> result =Help.getCenters(path);
????????????? for(ArrayList<Integer>? re :result){
???????????????????? System.out.println(re.get(0));
???????????????????? System.out.println(re.get(1));
????????????? }
?????? }
?????? @Test
?????? public void caculateDistanceTest(){
????????????? ArrayList<Integer> data=new ArrayList<Integer>();
????????????? data.add(1);
????????????? data.add(2);
????????????? ArrayList<Integer> arrayList=new ArrayList<Integer>();
????????????? arrayList.add(7);
????????????? arrayList.add(2);
????????????? int distance=Help.caculateDistance0(data, arrayList);
?????? System.out.println(distance);?????
?????? }
?????? @Test
?????? public void caculateCenterTest(){
????????????? List<ArrayList<Integer>> list=new LinkedList<ArrayList<Integer>>();
????????????? ArrayList<Integer> a=new ArrayList<Integer>();
????????????? a.add(1);
????????????? a.add(2);
????????????? ArrayList<Integer> b=new ArrayList<Integer>();
????????????? b.add(2);
????????????? b.add(3);
????????????? ArrayList<Integer> c=new ArrayList<Integer>();
????????????? c.add(3);
????????????? c.add(4);
????????????? list.add(a);
????????????? list.add(b);
????????????? list.add(c);
????????????? Text t=Help.caculateCenter(list);
????????????? System.out.println(t.getLength());
?????? }
?????? @Test
?????? public void replaceOldCenterTest() throws IOException{
????????????? String oldpath = "/testForHelp.txt";
????????????? String newpath = "/out/part-r-00000";
????????????? Help.replaceOldCenter(oldpath,newpath); }
?????? @Test
?????? public void caculateDistance1Test(){
????????????? List<Integer> data=new ArrayList<Integer>();
????????????? List<ArrayList<Integer>> centerList=new ArrayList<ArrayList<Integer>>();
????????????? data.add(10);
????????????? data.add(6);
????????????? ArrayList<Integer> data1=new ArrayList<Integer>();
????????????? data1.add(1);
????????????? data1.add(1);
????????????? ArrayList<Integer> data2=new ArrayList<Integer>();
????????????? data2.add(10);
????????????? data2.add(10);
????????????? centerList.add( data1);
????????????? centerList.add( data2);
????????????? int a=Help.caculateDistance1(data, centerList);
????????????? System.out.println(a);
?????? }
}
(4)?? KMeansDriver
package myKMeans;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class KMeansDriver {
?????? public final static int K=5;
?????? public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException{
????????????? ?//Help.ProdeceCenter("/in/ssaa", 20, 1, 1);
????????????? ?//getCenter();
????????????? ?forClssify();
//Help.getClassfiyResult("C:\\Users\\zheng\\Desktop\\mapreduce\\5M\\"+K+"\\result\\result.txt");
????????????? ?//Help.getCenterResult("C:\\Users\\zheng\\Desktop\\mapreduce\\5M\\"+K+"\\result\\center.txt");
????????????? ?//Help.measureResult("/outForClassify/part-r-00000", "/out/part-r-00000", K);
?? }?
?????? public? static void getCenter() throws IOException, ClassNotFoundException, InterruptedException{
????????????? int repeated=0;
????????????? ?String[] otherArgs=new String[]{"/in","/out","/oldCenterSet","/out/part-r-00000",K+"","1"
????????????? };
????????????? do{
???????????????????? Configuration conf = new Configuration();?
???????????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");
?????????? //String[] otherArgs? = new GenericOptionsParser(conf, args).getRemainingArgs();?
?????????? if (otherArgs.length != 6){?
?????????????? System.err.println("Usage: <in> <out> <oldcenters> <newcenters> <k> <threshold>");?
??? ???????????System.exit(2);?
?????????? }?
????????
?????????? conf.set("centerpath", otherArgs[2]);?
?????????? conf.set("kpath", otherArgs[4]);?
?????????? Job job = new Job(conf, "KMeansCluster");??
?????????? job.setJarByClass(KMeansDriver.class);?
?????????? Path in = new Path(otherArgs[0]);?
?????????? Path out = new Path(otherArgs[1]);?
?????????? FileInputFormat.addInputPath(job, in);?
?????????? FileSystem fs = FileSystem.get(conf);?
?????????? if (fs.exists(out)){
?????????????? fs.delete(out, true);?
?????????? }?
?????????? FileOutputFormat.setOutputPath(job, out);?
?????????? job.setMapperClass(KmeansMapperForCenter.class);?
?????????? job.setReducerClass(KMeansReducerForCenter.class);
?????????? job.setOutputKeyClass(IntWritable.class);?
?????????? job.setOutputValueClass(Text.class);?
?????????? job.waitForCompletion(true);
?????????? ++repeated;?
?????????? System.out.println("We have repeated " + repeated + " times.");?
??????? } while (repeated < 9
?????? ???????? ?&& (Help.isFinished(otherArgs[2], otherArgs[3], Integer.parseInt(otherArgs[5]))));?
//&& (Help.isFinished(args[2], args[3], Integer.parseInt(args[4]), Float.parseFloat(args[5])) == false)
?????
?????? }
?????? public? static void forClssify() throws IOException, ClassNotFoundException, InterruptedException{
????????????? ?String[] otherArgs=new String[]{"/in","/outForClassify","/oldCenterSet","/out/part-r-00000",K+"","2"
????????????? };
???????????????????? Configuration conf = new Configuration();?
???????????????????? conf.set("fs.defaultFS", "hdfs://192.168.79.111:9000");
???????? ?//String[] otherArgs? = new GenericOptionsParser(conf, args).getRemainingArgs();?
????????? if (otherArgs.length != 6){?
????????????? System.err.println("Usage: <in> <out> <oldcenters> <newcenters> <k> <threshold>");?
????????????? System.exit(2);?
????????? }?
????????? conf.set("centerpath", otherArgs[2]);?
????????? conf.set("kpath", otherArgs[4]);?
????????? Job job = new Job(conf, "KMeansCluster");?
????????? job.setJarByClass(KMeansDriver.class);?
????????? Path in = new Path(otherArgs[0]);?
????????? Path out = new Path(otherArgs[1]);?
????????? FileInputFormat.addInputPath(job, in);
????????? FileSystem fs = FileSystem.get(conf);?
????????? if (fs.exists(out)){?
????????????? fs.delete(out, true);?
????????? }?
????????? FileOutputFormat.setOutputPath(job, out);?
????????? job.setMapperClass(KMeansMapperForClassify.class);/
????????? job.setReducerClass(KMeansReducerForClassify.class);
????????? job.setOutputKeyClass(IntWritable.class);
????????? job.setOutputValueClass(dataCell.class);
????????? job.setMapOutputKeyClass(IntWritable.class);
????????? job.setMapOutputValueClass(dataCell.class);
????????? job.waitForCompletion(true);
?????? }
}
(5)?? ?KmeansMapperForCenter類
package myKMeans;
import org.apache.hadoop.io.IntWritable;?
import org.apache.hadoop.io.LongWritable;?
import org.apache.hadoop.io.Text;?
import org.apache.hadoop.mapreduce.Mapper;?
?
import java.io.IOException;?
import java.util.ArrayList;?
import java.util.List;?
/***
?* 未獲得中心點的mapper?
?* @author zheng
?*
?*/
public class KmeansMapperForCenter extends Mapper<LongWritable,Text,IntWritable,Text>{
?????? public void map(LongWritable key,Text value,Context context)
???????????????????? throws IOException,InterruptedException{
????????????? String line =value.toString();
????????????? String[] fields=line.split("\t");
????????????? int rank=Integer.parseInt(fields[3]);
????????????? int order=Integer.parseInt(fields[4]);
????????????? ArrayList<Integer> data =new ArrayList<Integer>();
????????????? data.add(rank);
????????????? data.add(order);
?????????????
????????????? //獲取中心點列表
????????????? List<ArrayList<Integer>> centers = Help.getCenters(context.getConfiguration().get("centerpath"));
????????????? //有幾個聚類中心
????????????? int k = Integer.parseInt(context.getConfiguration().get("kpath"));?
????????????? //當前數據與中心點的最小距離
????????????? int minDist = Integer.MAX_VALUE;?
????????????? //中心點索引
????????????? int centerIndex = k;
????????????? //計算樣本點到各個中心的距離,并把樣本聚類到距離最近的中心點所屬的類
????????????? for(int i=0;i<k;i++){
???????????????????? int currentDist=0;
???????????????????? currentDist=Help.caculateDistance0(data,centers.get(i));
???????????????????? if(minDist>currentDist){
??????????????????????????? minDist=currentDist;
??????????????????????????? centerIndex=i;
???????????????????? }
????????????? }
????????????? Text centerdata=new Text(rank+"\t"+order);
????????????? context.write(new IntWritable(centerIndex), centerdata);
?????? }
}
(6) KMeansReducerForCenter類
package myKMeans;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;?
import org.apache.hadoop.mapreduce.Reducer;?
import java.io.IOException;?
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;?
/**
?* 為獲得中心點的reducer
?* @author zheng
?*
?*/
public class KMeansReducerForCenter extends Reducer<IntWritable, Text, NullWritable, Text> {
?????? public void reduce(IntWritable key,Iterable<Text> value,Context context)
???????????????????? throws IOException,InterruptedException {
????????????? System.out.println("#######################");
????????????? List<ArrayList<Integer>>? helpList=new LinkedList<ArrayList<Integer>> ();
????????????? String tempResult="";
????????????? for(Text val:value){
???????????????????? String line =val.toString();
???????????????????? String[] fields=line.split("\t");
???????????????????? ArrayList<Integer> tempList=new ArrayList<Integer>();
???????????????????? for( String f:fields){
??????????????????????????? tempList.add(Integer.parseInt(f));
???????????????????? }
???????????????????? helpList.add(tempList);
????????????? }
????????????? //計算新的聚類中心
????????????? ?Text result= Help.caculateCenter(helpList);
????????????? ?context.write(NullWritable.get(), result);
?????? }
}
(7) KMeansMapperForClassify類
package myKMeans;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
/**
?* 為數據集分類的mapper
?* @author zheng
?*
?*/
public class KMeansMapperForClassify extends
Mapper<LongWritable,Text,IntWritable,dataCell>{
?????? public void map(LongWritable key,Text value,Context context)
???????????????????? throws IOException,InterruptedException{
????????????? String line =value.toString();
????????????? String[] fields=line.split("\t");
????????????? dataCell cell=new dataCell(fields[0],fields[1],fields[2]
?????????????????????????? ,Integer.parseInt(fields[3]),Integer.parseInt(fields[4]),fields[5]);
????????????? int rank=cell.getRank();
????????????? int order=cell.getOrder();
????????????? ArrayList<Integer> data =new ArrayList<Integer>();
????????????? data.add(rank);
????????????? data.add(order);
?????? ?????? //獲取中心點列表
????????????? List<ArrayList<Integer>> centers = Help.getCenters(context.getConfiguration().get("centerpath"));
????????????? //有幾個聚類中心
????????????? int k = Integer.parseInt(context.getConfiguration().get("kpath"));?
????????????? //當前數據與中心點的最小距離
????????????? int minDist = Integer.MAX_VALUE;?
????????????? //中心點索引
????????????? int centerIndex = k;
????????????? //計算樣本點到各個中心的距離,并把樣本聚類到距離最近的中心點所屬的類
????????????? for(int i=0;i<k;i++){
???????????????????? int currentDist=0;
???????????????????? currentDist=Help.caculateDistance0(data,centers.get(i));
???????????????????? if(minDist>currentDist){
??????????????????????????? minDist=currentDist;
??????????????????????????? centerIndex=i;
???????????????????? }
????????????? }
????????????? Text centerdata=new Text(rank+"\t"+order);
????????????? context.write(new IntWritable(centerIndex), cell);
?????? }
}
(8)? KMeansReducerForClassify類
package myKMeans;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
/**
?* 為數據集分類的Reducer
?* @author zheng
?*
?*/
public class KMeansReducerForClassify extends Reducer<IntWritable, dataCell, IntWritable, dataCell>{
?????? public void reduce(IntWritable key,Iterable<dataCell> value,Context context)
???????????????????? throws IOException,InterruptedException {
????????????? System.out.println("#######################");
????????????? for(dataCell val:value){
???????????????????? context.write(key, val);
?????? ?????? }
?????? }
}
?
?
總結
以上是生活随笔為你收集整理的KMeans算法的Mapreduce实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: iPhone 12易掉漆、边框太锋利还割
- 下一篇: Halcon 3D点云和深度图的相互转化