(Repost) A Co-occurrence Matrix Implementation of an Item-Based Recommendation Algorithm with MapReduce (Part 1)
Reposted from: http://zengzhaozheng.blog.51cto.com/8219051/1557054
1. Overview
Over the past two months I have been working on our company's data-mining system, estimating user similarity from user tag data, which involves some recommendation-algorithm knowledge. During this time I went through 《推薦算法實踐》 and 《Mahout in Action》, and here I implement a distributed item-based recommendation algorithm based on ideas from those two books plus my own understanding. The work has two parts: the first derives a user's recommendations simply from the co-occurrence matrix, and the second implements the item-based algorithm with the traditional similarity-matrix approach. This post covers the first part, implemented with MapReduce; the next post will cover the second part and its implementation.
2. Algorithm Principles
Collaborative filtering is one of the most widely used families of recommendation algorithms. It comes in two main flavors: user-based collaborative filtering and item-based collaborative filtering.
Put simply, the item-based approach works like this: user A likes item X1 and user B likes item X2; if X1 and X2 are similar, then the items A liked before are recommended to B, and the items B liked before are recommended to A. The algorithm relies entirely on the users' historical item preferences. The user-based approach, equally simply put: user A likes item X1 and user B likes item X2; if users A and B are similar, then X1 is recommended to B and X2 is recommended to A. A simple diagram:
Which one to choose depends on your situation: if there are many more users than item types, use item-based collaborative filtering; if there are many more item types than users, use user-based collaborative filtering. The reason for choosing this way is to keep the item similarity matrix, user similarity matrix, or co-occurrence matrix as small as possible.
3. Data Modeling
The basic algorithm is sketched above. For any such algorithm, modeling the data so the algorithm can actually run on it is both the key point and the hard part. This section draws on my own project experience and some ideas from 《推薦引擎實踐》, and splits the discussion into two parts: one, recommending from the co-occurrence matrix; two, recommending from a similarity computation.
(1) The co-occurrence matrix approach:
Step 1: Convert the raw data into user vectors
1[102:0.1,103:0.2,104:0.3]: the items user 1 likes, with the corresponding preference scores.
2[101:0.1,102:0.7,105:0.9]: the items user 2 likes, with the corresponding preference scores.
3[102:0.1,103:0.7,104:0.2]: the items user 3 likes, with the corresponding preference scores.
Step 2: Compute the co-occurrence matrix
Simply put, entry (x1, x2) of the matrix is the number of users who like both item x1 and item x2.
Step 3: Build the user-to-item preference matrix
Step 4: Multiply the item co-occurrence matrix by the user-to-item preference matrix to get the recommendations
As an example, here is how user 1's recommendation list is computed:
User 1's total score for item 101:
1*0+1*0.1+0*0.2+0*0.3+1*0=0.1
User 1's total score for item 102:
1*0+3*0.1+1*0.2+2*0.3+2*0=1.1
User 1's total score for item 103:
0*0+1*0.1+1*0.2+1*0.3+0*0=0.6
User 1's total score for item 104:
0*0+2*0.1+1*0.2+2*0.3+1*0=1.0
User 1's total score for item 105:
1*0+2*0.1+0*0.2+1*0.3+2*0=0.5
This gives user 1's recommendation vector 1[101:0.1,102:1.1,103:0.6,104:1.0,105:0.5]; sorting it yields the final recommendation list 1[102:1.1,104:1.0,103:0.6,105:0.5,101:0.1].
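As a minimal plain-Java sketch of step 4 (my own illustration, separate from the MapReduce code later in the post), using user 1's preference vector [0, 0.1, 0.2, 0.3, 0] and the co-occurrence rows implied by the arithmetic above:

/**
 * Sketch of step 4: multiply the co-occurrence matrix by one user's preference vector.
 * The matrix values are the ones implied by the worked arithmetic above; items are ordered 101..105.
 */
public class CooccurrenceScoreSketch {
  public static void main(String[] args) {
    int[][] cooccurrence = {
        {1, 1, 0, 0, 1},   // row for item 101
        {1, 3, 1, 2, 2},   // row for item 102
        {0, 1, 1, 1, 0},   // row for item 103
        {0, 2, 1, 2, 1},   // row for item 104
        {1, 2, 0, 1, 2}    // row for item 105
    };
    double[] userPrefs = {0.0, 0.1, 0.2, 0.3, 0.0}; // user 1: 102:0.1, 103:0.2, 104:0.3
    int[] itemIds = {101, 102, 103, 104, 105};

    for (int i = 0; i < cooccurrence.length; i++) {
      double score = 0.0;
      for (int j = 0; j < userPrefs.length; j++) {
        score += cooccurrence[i][j] * userPrefs[j];
      }
      // prints 0.1, 1.1, 0.6, 1.0 and 0.5 for items 101..105, matching the list above
      System.out.printf("item %d -> %.1f%n", itemIds[i], score);
    }
  }
}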
(2) Computing the user recommendation vector from item similarities.
Computing the recommendation vector from item similarities works much like the co-occurrence approach above: the item similarity matrix simply takes the place of the co-occurrence matrix, it is multiplied by the user-to-item preference matrix, and the recommendation vector is then computed from the result.
Computing the similarity matrix:
Before computing it, let's look at the common ways of measuring item similarity.
There are many ways to compute item similarity, and the choice depends on your data model: Pearson correlation, Euclidean distance (essentially the distance between two points), cosine similarity, Spearman rank correlation, the Tanimoto coefficient, and the log-likelihood ratio. The Tanimoto coefficient and the log-likelihood ratio are mainly for data models that carry no explicit preference values, i.e. what Mahout calls the Boolean data model (a small Tanimoto sketch follows below). The two measures introduced here are Euclidean distance and cosine similarity.
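Since the MapReduce implementation later in this post uses exactly that Boolean model, here is a minimal plain-Java sketch of the Tanimoto coefficient (my own illustration, not part of the original implementation): the similarity of two items is the size of the intersection of their liker sets divided by the size of the union.

import java.util.HashSet;
import java.util.Set;

public class TanimotoSketch {

  // Tanimoto coefficient for the Boolean model: |A intersect B| / |A union B|,
  // where A and B are the sets of users who liked each of the two items.
  static double tanimoto(Set<Long> usersOfItemA, Set<Long> usersOfItemB) {
    Set<Long> intersection = new HashSet<Long>(usersOfItemA);
    intersection.retainAll(usersOfItemB);
    Set<Long> union = new HashSet<Long>(usersOfItemA);
    union.addAll(usersOfItemB);
    return union.isEmpty() ? 0.0 : (double) intersection.size() / union.size();
  }

  public static void main(String[] args) {
    Set<Long> likers102 = Set.of(1L, 2L, 3L); // users who liked item 102, from the user vectors above
    Set<Long> likers103 = Set.of(1L, 3L);     // users who liked item 103
    System.out.println(tanimoto(likers102, likers103)); // 2 shared likers out of 3 total -> 0.666...
  }
}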
Now the key question is how to turn the available data into a vector-space model that these measures can work with. Below I use Euclidean distance as the example and show how to build the model:
Step 1: Convert the user vectors into item vectors
User vectors:
1[102:0.1,103:0.2,104:0.3]
2[101:0.1,102:0.7,105:0.9]
3[102:0.1,103:0.7,104:0.2]
Converted into item vectors:
101[2:0.1]
102[1:0.1,2:0.7,3:0.1]
103[1:0.2,3:0.7]
104[1:0.3,3:0.2]
105[2:0.9]
Step 2: Compute the item-to-item similarities:
Step 3: The resulting item similarity matrix is as follows (the meaningless self-similarities are omitted):
Step 4: Multiply the item similarity matrix by the user-to-item preference matrix to get the recommendations:
As an example, here is how user 1's recommendation list is computed in the similarity-based approach:
User 1's total score for item 101:
1*0+1*0.6186429+0*0.6964322+0*0.7277142+1*0.55555556=1.174198
User 1's total score for item 102:
1*0.6186429+3*0+1*0.5188439+2*0.5764197+2*0.8032458=3.896818
User 1's total score for item 103:
0*0.6964322+1*0.5188439+1*0+1*0.662294+0*0.463481=1.181138
User 1's total score for item 104:
0*0.7277142+2*0.5764197+1*0.662294+2*0+1*0.5077338=2.322867
User 1's total score for item 105:
1*0.55555556+2*0.8032458+0*0.463481+1*0.5077338+2*0=2.669780
4. MapReduce Implementation of the Co-occurrence Matrix Approach
This section implements the co-occurrence-matrix recommendation approach with MapReduce, using some of Mahout's data types; the similarity-matrix approach will be covered in the next post. A Boolean data model is used here, i.e. users have not given explicit preference scores for the items they like, so the program defaults every preference to 1.
First, the data flow through the whole MapReduce pipeline:
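In outline, reading the key/value formats off the mappers and reducers that follow, the five jobs are:
1) SourceDataToItemPrefsJob: raw text lines "userID: itemID itemID ..." -> user vectors <userID, Vector<itemID, pref>>.
2) UserVectorToCooccurrenceJob: user vectors -> co-occurrence rows <itemID, Vector<coocItemID, coocCount>>.
3) userVecotrSplitJob: user vectors -> user vectors split by item, <itemID, (userID, pref)>.
4) combineUserVectorAndCoocMatrixJob: the outputs of jobs 2 and 3 -> <itemID, (co-occurrence row, userIDs, prefs)>.
5) caclPartialRecomUserVectorJob: partial score vectors summed per user -> top-N recommendation lists <userID, RecommendedItemsWritable>.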
The concrete implementation follows. First, the utility class used by all the jobs, HadoopUtil:
package com.util;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class HadoopUtil {

  private static final Logger log = LoggerFactory.getLogger(HadoopUtil.class);

  private HadoopUtil() {
  }

  // Prepare a map-only job.
  public static Job prepareJob(String jobName, String[] inputPath, String outputPath,
      Class<? extends InputFormat> inputFormat, Class<? extends Mapper> mapper,
      Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue,
      Class<? extends OutputFormat> outputFormat, Configuration conf) throws IOException {
    Job job = new Job(new Configuration(conf));
    job.setJobName(jobName);
    Configuration jobConf = job.getConfiguration();
    if (mapper.equals(Mapper.class)) {
      throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
    }
    job.setJarByClass(mapper);
    job.setInputFormatClass(inputFormat);
    StringBuilder inputPathsStringBuilder = new StringBuilder();
    for (String p : inputPath) {
      inputPathsStringBuilder.append(",").append(p);
    }
    inputPathsStringBuilder.deleteCharAt(0);
    jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());
    job.setMapperClass(mapper);
    job.setMapOutputKeyClass(mapperKey);
    job.setMapOutputValueClass(mapperValue);
    job.setOutputKeyClass(mapperKey);
    job.setOutputValueClass(mapperValue);
    jobConf.setBoolean("mapred.compress.map.output", true);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(outputFormat);
    jobConf.set("mapred.output.dir", outputPath);
    return job;
  }

  // Prepare a map + reduce job.
  public static Job prepareJob(String jobName, String[] inputPath, String outputPath,
      Class<? extends InputFormat> inputFormat, Class<? extends Mapper> mapper,
      Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue,
      Class<? extends Reducer> reducer, Class<? extends Writable> reducerKey,
      Class<? extends Writable> reducerValue, Class<? extends OutputFormat> outputFormat,
      Configuration conf) throws IOException {
    Job job = new Job(new Configuration(conf));
    job.setJobName(jobName);
    Configuration jobConf = job.getConfiguration();
    if (reducer.equals(Reducer.class)) {
      if (mapper.equals(Mapper.class)) {
        throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
      }
      job.setJarByClass(mapper);
    } else {
      job.setJarByClass(reducer);
    }
    job.setInputFormatClass(inputFormat);
    StringBuilder inputPathsStringBuilder = new StringBuilder();
    for (String p : inputPath) {
      inputPathsStringBuilder.append(",").append(p);
    }
    inputPathsStringBuilder.deleteCharAt(0);
    jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());
    job.setMapperClass(mapper);
    if (mapperKey != null) {
      job.setMapOutputKeyClass(mapperKey);
    }
    if (mapperValue != null) {
      job.setMapOutputValueClass(mapperValue);
    }
    jobConf.setBoolean("mapred.compress.map.output", true);
    job.setReducerClass(reducer);
    job.setOutputKeyClass(reducerKey);
    job.setOutputValueClass(reducerValue);
    job.setOutputFormatClass(outputFormat);
    jobConf.set("mapred.output.dir", outputPath);
    return job;
  }

  // Prepare a map + combiner + reduce job.
  public static Job prepareJob(String jobName, String[] inputPath, String outputPath,
      Class<? extends InputFormat> inputFormat, Class<? extends Mapper> mapper,
      Class<? extends Writable> mapperKey, Class<? extends Writable> mapperValue,
      Class<? extends Reducer> combiner, Class<? extends Reducer> reducer,
      Class<? extends Writable> reducerKey, Class<? extends Writable> reducerValue,
      Class<? extends OutputFormat> outputFormat, Configuration conf) throws IOException {
    Job job = new Job(new Configuration(conf));
    job.setJobName(jobName);
    Configuration jobConf = job.getConfiguration();
    if (reducer.equals(Reducer.class)) {
      if (mapper.equals(Mapper.class)) {
        throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer");
      }
      job.setJarByClass(mapper);
    } else {
      job.setJarByClass(reducer);
    }
    job.setInputFormatClass(inputFormat);
    StringBuilder inputPathsStringBuilder = new StringBuilder();
    for (String p : inputPath) {
      inputPathsStringBuilder.append(",").append(p);
    }
    inputPathsStringBuilder.deleteCharAt(0);
    jobConf.set("mapred.input.dir", inputPathsStringBuilder.toString());
    job.setMapperClass(mapper);
    if (mapperKey != null) {
      job.setMapOutputKeyClass(mapperKey);
    }
    if (mapperValue != null) {
      job.setMapOutputValueClass(mapperValue);
    }
    jobConf.setBoolean("mapred.compress.map.output", true);
    job.setCombinerClass(combiner);
    job.setReducerClass(reducer);
    job.setOutputKeyClass(reducerKey);
    job.setOutputValueClass(reducerValue);
    job.setOutputFormatClass(outputFormat);
    jobConf.set("mapred.output.dir", outputPath);
    return job;
  }

  public static String getCustomJobName(String className, JobContext job,
      Class<? extends Mapper> mapper, Class<? extends Reducer> reducer) {
    StringBuilder name = new StringBuilder(100);
    String customJobName = job.getJobName();
    if (customJobName == null || customJobName.trim().isEmpty()) {
      name.append(className);
    } else {
      name.append(customJobName);
    }
    name.append('-').append(mapper.getSimpleName());
    name.append('-').append(reducer.getSimpleName());
    return name.toString();
  }

  public static void delete(Configuration conf, Iterable<Path> paths) throws IOException {
    if (conf == null) {
      conf = new Configuration();
    }
    for (Path path : paths) {
      FileSystem fs = path.getFileSystem(conf);
      if (fs.exists(path)) {
        log.info("Deleting {}", path);
        fs.delete(path, true);
      }
    }
  }

  public static void delete(Configuration conf, Path... paths) throws IOException {
    delete(conf, Arrays.asList(paths));
  }

  public static long countRecords(Path path, Configuration conf) throws IOException {
    long count = 0;
    Iterator<?> iterator = new SequenceFileValueIterator<Writable>(path, true, conf);
    while (iterator.hasNext()) {
      iterator.next();
      count++;
    }
    return count;
  }

  public static long countRecords(Path path, PathType pt, PathFilter filter, Configuration conf) throws IOException {
    long count = 0;
    Iterator<?> iterator = new SequenceFileDirValueIterator<Writable>(path, pt, filter, null, true, conf);
    while (iterator.hasNext()) {
      iterator.next();
      count++;
    }
    return count;
  }
}
Step 1: Process the raw input data
Mapper of the SourceDataToItemPrefsJob job: SourceDataToItemPrefsMapper
package com.mapper;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;

/**
 * Mapper input format: userID: itemID1 itemID2 itemID3 ...
 * Mapper output format: <userID, itemID>
 * @author 曾昭正
 */
public class SourceDataToItemPrefsMapper extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {
  //private static final Logger logger = LoggerFactory.getLogger(SourceDataToItemPrefsMapper.class);
  private static final Pattern NUMBERS = Pattern.compile("(\\d+)");
  private String line = null;

  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    line = value.toString();
    if (line == null) {
      return;
    }
    // logger.info("line:" + line);
    Matcher matcher = NUMBERS.matcher(line);
    if (!matcher.find()) { // the first group is the userID; skip malformed lines
      return;
    }
    // VarLongWritable is Mahout's own writable type
    VarLongWritable userID = new VarLongWritable(Long.parseLong(matcher.group()));
    VarLongWritable itemID = new VarLongWritable();
    while (matcher.find()) { // every following group is an itemID
      itemID.set(Long.parseLong(matcher.group()));
      // logger.info(userID + " " + itemID);
      context.write(userID, itemID);
    }
  }
}

Reducer of the SourceDataToItemPrefsJob job: SourceDataToUserVectorReducer
package com.reducer;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reducer input: <userID, Iterable<itemID>>
 * Reducer output: <userID, VectorWritable<index=itemID, value=pref> ...>
 * @author 曾昭正
 */
public class SourceDataToUserVectorReducer extends Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {
  private static final Logger logger = LoggerFactory.getLogger(SourceDataToUserVectorReducer.class);

  @Override
  protected void reduce(VarLongWritable userID, Iterable<VarLongWritable> itemPrefs, Context context) throws IOException, InterruptedException {
    /*
     * DenseVector is backed by a double array and stores every dimension, so it suits dense vectors.
     * RandomAccessSparseVector is backed by an int-to-double HashMap, stores only the non-empty entries, and supports random access.
     * SequentialAccessVector is backed by parallel int and double arrays, also stores only the non-empty entries, but supports only sequential access.
     * Choose the implementation according to the algorithm's access pattern: for lots of random access use DenseVector or
     * RandomAccessSparseVector; for mostly sequential access SequentialAccessVector performs better.
     * With the vector implementations covered, the remaining question is how to model the existing data as vectors
     * ("vectorize" the data) so Mahout's algorithms can work on it efficiently.
     */
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
      // RandomAccessSparseVector.set(index, value); the preferences are Boolean, so every value defaults to 1.0f
      userVector.set((int) itemPref.get(), 1.0f);
    }
    logger.info(userID + " " + new VectorWritable(userVector));
    context.write(userID, new VectorWritable(userVector));
  }
}
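As a quick walkthrough with made-up data (my own example, not from the original post): an input line "1: 102 103 104" makes the mapper emit <1,102>, <1,103> and <1,104>, and the reducer then turns the group for user 1 into the user vector <1, {102:1.0, 103:1.0, 104:1.0}>, since the Boolean model fixes every preference at 1.0.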
Step 2: Combine the reduce output of SourceDataToItemPrefsJob into the co-occurrence matrix
Mapper of the UserVectorToCooccurrenceJob job: UserVectorToCooccurrenceMapper
package com.mapper;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;

/**
 * Mapper input: <userID, VectorWritable<index=itemID, value=pref> ...>
 * Mapper output: <itemID, itemID> (pairs of co-occurring item ids)
 * @author 曾昭正
 */
public class UserVectorToCooccurrenceMapper extends Mapper<VarLongWritable, VectorWritable, IntWritable, IntWritable> {
  @Override
  protected void map(VarLongWritable userID, VectorWritable userVector, Context context) throws IOException, InterruptedException {
    Iterator<Vector.Element> it = userVector.get().nonZeroes().iterator(); // iterate only over the non-zero elements
    while (it.hasNext()) {
      int index1 = it.next().index();
      Iterator<Vector.Element> it2 = userVector.get().nonZeroes().iterator();
      while (it2.hasNext()) {
        int index2 = it2.next().index();
        context.write(new IntWritable(index1), new IntWritable(index2));
      }
    }
  }
}

Reducer of the UserVectorToCooccurrenceJob job: UserVectorToCoocurrenceReducer
package com.reducer;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reducer input: <itemID, Iterable<itemIDs>>
 * Reducer output: <mainItemID, Vector<coocItemID, coocCount> ...>
 * @author 曾昭正
 */
public class UserVectorToCoocurrenceReducer extends Reducer<IntWritable, IntWritable, IntWritable, VectorOrPrefWritable> {
  private static final Logger logger = LoggerFactory.getLogger(UserVectorToCoocurrenceReducer.class);

  @Override
  protected void reduce(IntWritable mainItemID, Iterable<IntWritable> coocItemIDs, Context context) throws IOException, InterruptedException {
    Vector coocItemIDVectorRow = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (IntWritable coocItem : coocItemIDs) {
      int coocItemID = coocItem.get();
      // accumulate the co-occurrence count for this item pair
      coocItemIDVectorRow.set(coocItemID, coocItemIDVectorRow.get(coocItemID) + 1.0);
    }
    logger.info(mainItemID + " " + new VectorOrPrefWritable(coocItemIDVectorRow));
    // write out the complete co-occurrence row for mainItemID
    context.write(mainItemID, new VectorOrPrefWritable(coocItemIDVectorRow));
  }
}

Step 3: Split the reduce output of SourceDataToItemPrefsJob by item
Mapper of the userVecotrSplitJob job: UserVecotrSplitMapper
package com.mapper;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Splits the user vectors so they can later be joined with the item co-occurrence vectors.
 * Mapper input: <userID, Vector<itemIDIndex, preferenceValue> ...>
 * Mapper output: <itemID, Vector<userID, preferenceValue> ...>
 * @author 曾昭正
 */
public class UserVecotrSplitMapper extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {
  private static final Logger logger = LoggerFactory.getLogger(UserVecotrSplitMapper.class);

  @Override
  protected void map(VarLongWritable userIDWritable, VectorWritable value, Context context) throws IOException, InterruptedException {
    IntWritable itemIDIndex = new IntWritable();
    long userID = userIDWritable.get();
    Vector userVector = value.get();
    Iterator<Element> it = userVector.nonZeroes().iterator(); // only the non-zero entries of the user vector
    while (it.hasNext()) {
      Element e = it.next();
      int itemID = e.index();
      itemIDIndex.set(itemID);
      float preferenceValue = (float) e.get();
      logger.info(itemIDIndex + " " + new VectorOrPrefWritable(userID, preferenceValue));
      context.write(itemIDIndex, new VectorOrPrefWritable(userID, preferenceValue));
    }
  }
}

Step 4: Merge the outputs of userVecotrSplitJob and UserVectorToCooccurrenceJob
Mapper of the combineUserVectorAndCoocMatrixJob job: CombineUserVectorAndCoocMatrixMapper
package com.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

/**
 * Merges the co-occurrence matrix with the split user vectors so the partial recommendation vectors can be computed.
 * This mapper has no real processing logic; it just passes records through in the input format.
 * Note: its input is the two output directories produced by the co-occurrence job and the user-vector-split job.
 * Mapper input: <itemID, Vector<userID, preferenceValue>> or <itemID, Vector<coocItemID, coocCount>>
 * Mapper output: <itemID, Vector<userID, preferenceValue> / Vector<coocItemID, coocCount>>
 * @author 曾昭正
 */
public class CombineUserVectorAndCoocMatrixMapper extends Mapper<IntWritable, VectorOrPrefWritable, IntWritable, VectorOrPrefWritable> {
  @Override
  protected void map(IntWritable itemID, VectorOrPrefWritable value, Context context) throws IOException, InterruptedException {
    context.write(itemID, value);
  }
}

Reducer of the combineUserVectorAndCoocMatrixJob job: CombineUserVectorAndCoocMatrixReducer
package com.reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.Vector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges the co-occurrence matrix with the split user vectors so the partial recommendation vectors can be computed.
 * @author 曾昭正
 */
public class CombineUserVectorAndCoocMatrixReducer extends Reducer<IntWritable, VectorOrPrefWritable, IntWritable, VectorAndPrefsWritable> {
  private static final Logger logger = LoggerFactory.getLogger(CombineUserVectorAndCoocMatrixReducer.class);

  @Override
  protected void reduce(IntWritable itemID, Iterable<VectorOrPrefWritable> values, Context context) throws IOException, InterruptedException {
    VectorAndPrefsWritable vectorAndPrefsWritable = new VectorAndPrefsWritable();
    List<Long> userIDs = new ArrayList<Long>();
    List<Float> preferenceValues = new ArrayList<Float>();
    Vector coocVector = null;
    Vector coocVectorTemp = null;
    Iterator<VectorOrPrefWritable> it = values.iterator();
    while (it.hasNext()) {
      VectorOrPrefWritable e = it.next();
      coocVectorTemp = e.getVector();
      if (coocVectorTemp == null) {
        // a (userID, preference) pair coming from the split user vectors
        userIDs.add(e.getUserID());
        preferenceValues.add(e.getValue());
      } else {
        // the co-occurrence row for this item
        coocVector = coocVectorTemp;
      }
    }
    if (coocVector != null) {
      // Note: after the co-occurrence reduce step, each reduce group here contains exactly one co-occurrence
      // vector (one row/column of the matrix -- the matrix is symmetric, so row and column are the same).
      vectorAndPrefsWritable.set(coocVector, userIDs, preferenceValues);
      logger.info(itemID + " " + vectorAndPrefsWritable);
      context.write(itemID, vectorAndPrefsWritable);
    }
  }
}

Step 5: Generate the recommendation lists from the output of combineUserVectorAndCoocMatrixJob
Mapper of the caclPartialRecomUserVectorJob job: CaclPartialRecomUserVectorMapper
package com.mapper;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Computes the partial user recommendation vectors.
 * @author 曾昭正
 */
public class CaclPartialRecomUserVectorMapper extends Mapper<IntWritable, VectorAndPrefsWritable, VarLongWritable, VectorWritable> {
  private static final Logger logger = LoggerFactory.getLogger(CaclPartialRecomUserVectorMapper.class);

  @Override
  protected void map(IntWritable itemID, VectorAndPrefsWritable values, Context context) throws IOException, InterruptedException {
    Vector coocVectorColumn = values.getVector();
    List<Long> userIDs = values.getUserIDs();
    List<Float> preferenceValues = values.getValues();
    for (int i = 0; i < userIDs.size(); i++) {
      long userID = userIDs.get(i);
      float preferenceValue = preferenceValues.get(i);
      logger.info("userID:" + userID);
      logger.info("preferenceValue:" + preferenceValue);
      // multiply this item's co-occurrence column by the user's preference value,
      // producing that user's partial recommendation scores
      Vector preferenceParScores = coocVectorColumn.times(preferenceValue);
      context.write(new VarLongWritable(userID), new VectorWritable(preferenceParScores));
    }
  }
}

Combiner of the caclPartialRecomUserVectorJob job: ParRecomUserVectorCombiner
package com.reducer;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges the partial recommendation vectors by summing, per userID, the scores of the co-occurrence columns
 * (note: a combiner only merges the output of a single map task, so this is still a partial result).
 * @author 曾昭正
 */
public class ParRecomUserVectorCombiner extends Reducer<VarLongWritable, VectorWritable, VarLongWritable, VectorWritable> {
  private static final Logger logger = LoggerFactory.getLogger(ParRecomUserVectorCombiner.class);

  @Override
  protected void reduce(VarLongWritable userID, Iterable<VectorWritable> coocVectorColunms, Context context) throws IOException, InterruptedException {
    Vector vectorColunms = null;
    for (VectorWritable coocVectorColunm : coocVectorColunms) {
      vectorColunms = vectorColunms == null ? coocVectorColunm.get() : vectorColunms.plus(coocVectorColunm.get());
    }
    logger.info(userID + " " + new VectorWritable(vectorColunms));
    context.write(userID, new VectorWritable(vectorColunms));
  }
}

Reducer of the caclPartialRecomUserVectorJob job: MergeAndGenerateRecommendReducer
package com.reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.impl.recommender.ByValueRecommendedItemComparator;
import org.apache.mahout.cf.taste.impl.recommender.GenericRecommendedItem;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Merges all the scored partial vectors and generates the final recommendation lists.
 * @author 曾昭正
 */
public class MergeAndGenerateRecommendReducer extends Reducer<VarLongWritable, VectorWritable, VarLongWritable, RecommendedItemsWritable> {
  private static final Logger logger = LoggerFactory.getLogger(MergeAndGenerateRecommendReducer.class);
  private int recommendationsPerUser;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    recommendationsPerUser = context.getConfiguration().getInt("recomandItems.recommendationsPerUser", 5);
  }

  @Override
  protected void reduce(VarLongWritable userID, Iterable<VectorWritable> cooVectorColumn, Context context) throws IOException, InterruptedException {
    // sum up the partial score vectors
    Vector recommdVector = null;
    for (VectorWritable vector : cooVectorColumn) {
      recommdVector = recommdVector == null ? vector.get() : recommdVector.plus(vector.get());
    }
    // Sort the recommendation vector and keep the top M items per userID (5 by default); the queue is
    // ordered by item score. Note that the head of a PriorityQueue is its smallest element, and the
    // extra +1 capacity is temporary room used when a larger new element is inserted.
    Queue<RecommendedItem> topItems = new PriorityQueue<RecommendedItem>(recommendationsPerUser + 1, ByValueRecommendedItemComparator.getInstance());
    Iterator<Element> it = recommdVector.nonZeroes().iterator();
    while (it.hasNext()) {
      Element e = it.next();
      int itemID = e.index();
      float preValue = (float) e.get();
      if (topItems.size() < recommendationsPerUser) {
        // the queue is not full yet: just add the item and its score
        topItems.add(new GenericRecommendedItem(itemID, preValue));
      } else if (preValue > topItems.peek().getValue()) {
        // the current item scores higher than the smallest queued item:
        // add it, then evict the head (the smallest element)
        topItems.add(new GenericRecommendedItem(itemID, preValue));
        topItems.poll();
      }
    }
    // re-sort the queued items into a list
    List<RecommendedItem> recommdations = new ArrayList<RecommendedItem>(topItems.size());
    recommdations.addAll(topItems);
    Collections.sort(recommdations, ByValueRecommendedItemComparator.getInstance());
    // emit the recommendation vector
    logger.info(userID + " " + new RecommendedItemsWritable(recommdations));
    context.write(userID, new RecommendedItemsWritable(recommdations));
  }
}

Step 6: Wire the jobs together
PackageRecomendJob
package com.mapreduceMain;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;

import com.mapper.CaclPartialRecomUserVectorMapper;
import com.mapper.CombineUserVectorAndCoocMatrixMapper;
import com.mapper.UserVecotrSplitMapper;
import com.mapper.UserVectorToCooccurrenceMapper;
import com.mapper.SourceDataToItemPrefsMapper;
import com.reducer.CombineUserVectorAndCoocMatrixReducer;
import com.reducer.MergeAndGenerateRecommendReducer;
import com.reducer.ParRecomUserVectorCombiner;
import com.reducer.UserVectorToCoocurrenceReducer;
import com.reducer.SourceDataToUserVectorReducer;
import com.util.HadoopUtil;

/**
 * Assembles the individual jobs into the complete recommendation pipeline.
 * @author 曾昭正
 */
public class PackageRecomendJob extends Configured implements Tool {
  String[] dataSourceInputPath = {"/user/hadoop/z.zeng/distruteItemCF/dataSourceInput"};
  String[] uesrVectorOutput = {"/user/hadoop/z.zeng/distruteItemCF/uesrVectorOutput/"};
  String[] userVectorSpliltOutPut = {"/user/hadoop/z.zeng/distruteItemCF/userVectorSpliltOutPut"};
  String[] cooccurrenceMatrixOuptPath = {"/user/hadoop/z.zeng/distruteItemCF/CooccurrenceMatrixOuptPath"};
  String[] combineUserVectorAndCoocMatrixOutPutPath = {"/user/hadoop/z.zeng/distruteItemCF/combineUserVectorAndCoocMatrixOutPutPath"};
  String[] caclPartialRecomUserVectorOutPutPath = {"/user/hadoop/z.zeng/distruteItemCF/CaclPartialRecomUserVectorOutPutPath"};

  protected void setup(Configuration configuration) throws IOException, InterruptedException {
    FileSystem hdfs = FileSystem.get(URI.create("hdfs://cluster-master"), configuration);
    Path p1 = new Path(uesrVectorOutput[0]);
    Path p2 = new Path(userVectorSpliltOutPut[0]);
    Path p3 = new Path(cooccurrenceMatrixOuptPath[0]);
    Path p4 = new Path(combineUserVectorAndCoocMatrixOutPutPath[0]);
    Path p5 = new Path(caclPartialRecomUserVectorOutPutPath[0]);
    if (hdfs.exists(p1)) {
      hdfs.delete(p1, true);
    }
    if (hdfs.exists(p2)) {
      hdfs.delete(p2, true);
    }
    if (hdfs.exists(p3)) {
      hdfs.delete(p3, true);
    }
    if (hdfs.exists(p4)) {
      hdfs.delete(p4, true);
    }
    if (hdfs.exists(p5)) {
      hdfs.delete(p5, true);
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf(); // get the configuration object
    setup(conf);
    // DistributedCache.addArchiveToClassPath(new Path("/user/hadoop/z.zeng/distruteItemCF/lib"), conf);

    // job that builds the user vectors
    Job wikipediaToItemPrefsJob = HadoopUtil.prepareJob(
        "WikipediaToItemPrefsJob", dataSourceInputPath, uesrVectorOutput[0],
        TextInputFormat.class,
        SourceDataToItemPrefsMapper.class, VarLongWritable.class, VarLongWritable.class,
        SourceDataToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,
        SequenceFileOutputFormat.class, conf);

    // job that builds the co-occurrence vectors
    Job userVectorToCooccurrenceJob = HadoopUtil.prepareJob(
        "UserVectorToCooccurrenceJob", uesrVectorOutput, cooccurrenceMatrixOuptPath[0],
        SequenceFileInputFormat.class,
        UserVectorToCooccurrenceMapper.class, IntWritable.class, IntWritable.class,
        UserVectorToCoocurrenceReducer.class, IntWritable.class, VectorOrPrefWritable.class,
        SequenceFileOutputFormat.class, conf);

    // job that splits the user vectors
    Job userVecotrSplitJob = HadoopUtil.prepareJob(
        "userVecotrSplitJob", uesrVectorOutput, userVectorSpliltOutPut[0],
        SequenceFileInputFormat.class,
        UserVecotrSplitMapper.class, IntWritable.class, VectorOrPrefWritable.class,
        SequenceFileOutputFormat.class, conf);

    // job that merges the co-occurrence vectors with the split user vectors;
    // note that it takes the outputs of both previous jobs as its input
    String[] combineUserVectorAndCoocMatrixIutPutPath = {cooccurrenceMatrixOuptPath[0], userVectorSpliltOutPut[0]};
    Job combineUserVectorAndCoocMatrixJob = HadoopUtil.prepareJob(
        "combineUserVectorAndCoocMatrixJob", combineUserVectorAndCoocMatrixIutPutPath,
        combineUserVectorAndCoocMatrixOutPutPath[0],
        SequenceFileInputFormat.class,
        CombineUserVectorAndCoocMatrixMapper.class, IntWritable.class, VectorOrPrefWritable.class,
        CombineUserVectorAndCoocMatrixReducer.class, IntWritable.class, VectorAndPrefsWritable.class,
        SequenceFileOutputFormat.class, conf);

    // job that computes the user recommendation vectors
    Job caclPartialRecomUserVectorJob = HadoopUtil.prepareJob(
        "caclPartialRecomUserVectorJob", combineUserVectorAndCoocMatrixOutPutPath,
        caclPartialRecomUserVectorOutPutPath[0],
        SequenceFileInputFormat.class,
        CaclPartialRecomUserVectorMapper.class, VarLongWritable.class, VectorWritable.class,
        ParRecomUserVectorCombiner.class, // combiner on the map side to cut network IO
        MergeAndGenerateRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
        TextOutputFormat.class, conf);

    // chain the jobs together
    if (wikipediaToItemPrefsJob.waitForCompletion(true)) {
      if (userVectorToCooccurrenceJob.waitForCompletion(true)) {
        if (userVecotrSplitJob.waitForCompletion(true)) {
          if (combineUserVectorAndCoocMatrixJob.waitForCompletion(true)) {
            int rs = caclPartialRecomUserVectorJob.waitForCompletion(true) ? 1 : 0;
            return rs;
          } else {
            throw new Exception("the job merging the co-occurrence vectors and the split user vectors failed!");
          }
        } else {
          throw new Exception("the user-vector split job failed!");
        }
      } else {
        throw new Exception("the co-occurrence vector job failed!");
      }
    } else {
      throw new Exception("the user vector job failed!");
    }
  }

  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    try {
      int returnCode = ToolRunner.run(new PackageRecomendJob(), args);
      System.exit(returnCode);
    } catch (Exception e) {
      e.printStackTrace(); // the original swallowed the exception silently
    }
  }
}
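As a rough usage sketch (the jar name is just a placeholder, not from the original post), once the classes and their Mahout dependencies are packaged into a jar, the pipeline can be launched in the usual Hadoop way:

hadoop jar distruteItemCF.jar com.mapreduceMain.PackageRecomendJob

Because the driver goes through ToolRunner, generic options such as -libjars can be used to ship the Mahout jars if they are not already on the cluster classpath.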
5. Summary
This post covered the basic ideas of item-based recommendation and how to model existing data for it, and implemented the co-occurrence matrix approach with MapReduce and Mahout's built-in data types. Having written this post and finished the implementation, my impression is that although the MapReduce programming model is very simple, with only two phases (scattering data and then merging it), combining those two phases with custom composite data types lets you build quite complex functionality.
References: 《Mahout in Action》, 《推薦引擎實踐》
Source: <http://www.tuicool.com/articles/BZVBRz>
Reposted from: https://www.cnblogs.com/baixl/p/4165712.html