/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.hadoop.examples;import java.io.IOException;
import java.math.BigDecimal;
import java.util.Iterator;import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * A Map-reduce program to estimate the value of Pi
 * using quasi-Monte Carlo method.
 *
 * Mapper:
 *   Generate points in a unit square
 *   and then count points inside/outside of the inscribed circle of the square.
 *
 * Reducer:
 *   Accumulate points inside/outside results from the mappers.
 *
 * Let numTotal = numInside + numOutside.
 * The fraction numInside/numTotal is a rational approximation of
 * the value (Area of the circle)/(Area of the square),
 * where the area of the inscribed circle is Pi/4
 * and the area of the unit square is 1.
 * Then, the value of Pi is estimated to be 4(numInside/numTotal).
 */
public class PiEstimator extends Configured implements Tool {/** tmp directory for input/output */static private final Path TMP_DIR = new Path(PiEstimator.class.getSimpleName() + "_TMP_3_141592654");/** 2-dimensional Halton sequence {H(i)},* where H(i) is a 2-dimensional point and i >= 1 is the index.* Halton sequence is used to generate sample points for Pi estimation. */private static class HaltonSequence {/** Bases */static final int[] P = {2, 3}; /** Maximum number of digits allowed */static final int[] K = {63, 40}; private long index;private double[] x;private double[][] q;private int[][] d;/** Initialize to H(startindex),* so the sequence begins with H(startindex+1).*/HaltonSequence(long startindex) {index = startindex;x = new double[K.length];q = new double[K.length][];d = new int[K.length][];for(int i = 0; i < K.length; i++) {q[i] = new double[K[i]];d[i] = new int[K[i]];}for(int i = 0; i < K.length; i++) {long k = index;x[i] = 0;for(int j = 0; j < K[i]; j++) {q[i][j] = (j == 0? 1.0: q[i][j-1])/P[i];d[i][j] = (int)(k % P[i]);k = (k - d[i][j])/P[i];x[i] += d[i][j] * q[i][j];}}}/** Compute next point.* Assume the current point is H(index).* Compute H(index+1).* * @return a 2-dimensional point with coordinates in [0,1)^2*/double[] nextPoint() {index++;for(int i = 0; i < K.length; i++) {for(int j = 0; j < K[i]; j++) {d[i][j]++;x[i] += q[i][j];if (d[i][j] < P[i]) {break;}d[i][j] = 0;x[i] -= (j == 0? 
1.0: q[i][j-1]);}}return x;}}/*** Mapper class for Pi estimation.* Generate points in a unit square* and then count points inside/outside of the inscribed circle of the square.*/public static class PiMapper extends MapReduceBaseimplements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {/** Map method.* @param offset samples starting from the (offset+1)th sample.* @param size the number of samples for this map* @param out output {ture->numInside, false->numOutside}* @param reporter*/public void map(LongWritable offset,LongWritable size,OutputCollector<BooleanWritable, LongWritable> out,Reporter reporter) throws IOException {final HaltonSequence haltonsequence = new HaltonSequence(offset.get());long numInside = 0L;long numOutside = 0L;for(long i = 0; i < size.get(); ) {//generate points in a unit squarefinal double[] point = haltonsequence.nextPoint();//count points inside/outside of the inscribed circle of the squarefinal double x = point[0] - 0.5;final double y = point[1] - 0.5;if (x*x + y*y > 0.25) {numOutside++;} else {numInside++;}//report statusi++;if (i % 1000 == 0) {reporter.setStatus("Generated " + i + " samples.");}}//output map resultsout.collect(new BooleanWritable(true), new LongWritable(numInside));out.collect(new BooleanWritable(false), new LongWritable(numOutside));}}/*** Reducer class for Pi estimation.* Accumulate points inside/outside results from the mappers.*/public static class PiReducer extends MapReduceBaseimplements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {private long numInside = 0;private long numOutside = 0;private JobConf conf; //configuration for accessing the file system/** Store job configuration. */@Overridepublic void configure(JobConf job) {conf = job;}/*** Accumulate number of points inside/outside results from the mappers.* @param isInside Is the points inside? 
* @param values An iterator to a list of point counts* @param output dummy, not used here.* @param reporter*/public void reduce(BooleanWritable isInside,Iterator<LongWritable> values,OutputCollector<WritableComparable<?>, Writable> output,Reporter reporter) throws IOException {if (isInside.get()) {for(; values.hasNext(); numInside += values.next().get());} else {for(; values.hasNext(); numOutside += values.next().get());}}/*** Reduce task done, write output to a file.*/@Overridepublic void close() throws IOException {//write output to a filePath outDir = new Path(TMP_DIR, "out");Path outFile = new Path(outDir, "reduce-out");FileSystem fileSys = FileSystem.get(conf);SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,outFile, LongWritable.class, LongWritable.class, CompressionType.NONE);writer.append(new LongWritable(numInside), new LongWritable(numOutside));writer.close();}}/*** Run a map/reduce job for estimating Pi.** @return the estimated value of Pi*/public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf) throws IOException {//setup job confjobConf.setJobName(PiEstimator.class.getSimpleName());jobConf.setInputFormat(SequenceFileInputFormat.class);jobConf.setOutputKeyClass(BooleanWritable.class);jobConf.setOutputValueClass(LongWritable.class);jobConf.setOutputFormat(SequenceFileOutputFormat.class);jobConf.setMapperClass(PiMapper.class);jobConf.setNumMapTasks(numMaps);jobConf.setReducerClass(PiReducer.class);jobConf.setNumReduceTasks(1);// turn off speculative execution, because DFS doesn't handle// multiple writers to the same file.jobConf.setSpeculativeExecution(false);//setup input/output directoriesfinal Path inDir = new Path(TMP_DIR, "in");final Path outDir = new Path(TMP_DIR, "out");FileInputFormat.setInputPaths(jobConf, inDir);FileOutputFormat.setOutputPath(jobConf, outDir);final FileSystem fs = FileSystem.get(jobConf);if (fs.exists(TMP_DIR)) {throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)+ " 
already exists. Please remove it first.");}if (!fs.mkdirs(inDir)) {throw new IOException("Cannot create input directory " + inDir);}try {//generate an input file for each map taskfor(int i=0; i < numMaps; ++i) {final Path file = new Path(inDir, "part"+i);final LongWritable offset = new LongWritable(i * numPoints);final LongWritable size = new LongWritable(numPoints);final SequenceFile.Writer writer = SequenceFile.createWriter(fs, jobConf, file,LongWritable.class, LongWritable.class, CompressionType.NONE);try {writer.append(offset, size);} finally {writer.close();}System.out.println("Wrote input for Map #"+i);}//start a map/reduce jobSystem.out.println("Starting Job");final long startTime = System.currentTimeMillis();JobClient.runJob(jobConf);final double duration = (System.currentTimeMillis() - startTime)/1000.0;System.out.println("Job Finished in " + duration + " seconds");//read outputsPath inFile = new Path(outDir, "reduce-out");LongWritable numInside = new LongWritable();LongWritable numOutside = new LongWritable();SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);try {reader.next(numInside, numOutside);} finally {reader.close();}//compute estimated valuereturn BigDecimal.valueOf(4).setScale(20).multiply(BigDecimal.valueOf(numInside.get())).divide(BigDecimal.valueOf(numMaps)).divide(BigDecimal.valueOf(numPoints));} finally {fs.delete(TMP_DIR, true);}}/*** Parse arguments and then runs a map/reduce job.* Print output in standard out.* * @return a non-zero if there is an error. Otherwise, return 0. 
*/public int run(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");ToolRunner.printGenericCommandUsage(System.err);return -1;}final int nMaps = Integer.parseInt(args[0]);final long nSamples = Long.parseLong(args[1]);System.out.println("Number of Maps = " + nMaps);System.out.println("Samples per Map = " + nSamples);final JobConf jobConf = new JobConf(getConf(), getClass());System.out.println("Estimated value of Pi is "+ estimate(nMaps, nSamples, jobConf));return 0;}/*** main method for running it as a stand alone command. */public static void main(String[] argv) throws Exception {System.exit(ToolRunner.run(null, new PiEstimator(), argv));}
}
/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.hadoop.examples;import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MultiFileInputFormat;
import org.apache.hadoop.mapred.MultiFileSplit;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MultiFileWordCount is an example to demonstrate the usage of
 * MultiFileInputFormat. This example counts the occurrences of
 * words in the text files under the given input directory.
 */
public class MultiFileWordCount extends Configured implements Tool {/*** This record keeps <filename,offset> pairs.*/public static class WordOffset implements WritableComparable {private long offset;private String fileName;public void readFields(DataInput in) throws IOException {this.offset = in.readLong();this.fileName = Text.readString(in);}public void write(DataOutput out) throws IOException {out.writeLong(offset);Text.writeString(out, fileName);}public int compareTo(Object o) {WordOffset that = (WordOffset)o;int f = this.fileName.compareTo(that.fileName);if(f == 0) {return (int)Math.signum((double)(this.offset - that.offset));}return f;}@Overridepublic boolean equals(Object obj) {if(obj instanceof WordOffset)return this.compareTo(obj) == 0;return false;}@Overridepublic int hashCode() {assert false : "hashCode not designed";return 42; //an arbitrary constant}}/*** To use {@link MultiFileInputFormat}, one should extend it, to return a * (custom) {@link RecordReader}. MultiFileInputFormat uses * {@link MultiFileSplit}s. */public static class MyInputFormat extends MultiFileInputFormat<WordOffset, Text> {@Overridepublic RecordReader<WordOffset,Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {return new MultiFileLineRecordReader(job, (MultiFileSplit)split);}}/*** RecordReader is responsible from extracting records from the InputSplit. 
* This record reader accepts a {@link MultiFileSplit}, which encapsulates several * files, and no file is divided.*/public static class MultiFileLineRecordReader implements RecordReader<WordOffset, Text> {private MultiFileSplit split;private long offset; //total offset read so far;private long totLength;private FileSystem fs;private int count = 0;private Path[] paths;private FSDataInputStream currentStream;private BufferedReader currentReader;public MultiFileLineRecordReader(Configuration conf, MultiFileSplit split)throws IOException {this.split = split;fs = FileSystem.get(conf);this.paths = split.getPaths();this.totLength = split.getLength();this.offset = 0;//open the first filePath file = paths[count];currentStream = fs.open(file);currentReader = new BufferedReader(new InputStreamReader(currentStream));}public void close() throws IOException { }public long getPos() throws IOException {long currentOffset = currentStream == null ? 0 : currentStream.getPos();return offset + currentOffset;}public float getProgress() throws IOException {return ((float)getPos()) / totLength;}public boolean next(WordOffset key, Text value) throws IOException {if(count >= split.getNumPaths())return false;/* Read from file, fill in key and value, if we reach the end of file,* then open the next file and continue from there until all files are* consumed. 
*/String line;do {line = currentReader.readLine();if(line == null) {//close the filecurrentReader.close();offset += split.getLength(count);if(++count >= split.getNumPaths()) //if we are donereturn false;//open a new filePath file = paths[count];currentStream = fs.open(file);currentReader=new BufferedReader(new InputStreamReader(currentStream));key.fileName = file.getName();}} while(line == null);//update the key and valuekey.offset = currentStream.getPos();value.set(line);return true;}public WordOffset createKey() {WordOffset wo = new WordOffset();wo.fileName = paths[0].toString(); //set as the first filereturn wo;}public Text createValue() {return new Text();}}/*** This Mapper is similar to the one in {@link WordCount.MapClass}.*/public static class MapClass extends MapReduceBaseimplements Mapper<WordOffset, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(WordOffset key, Text value,OutputCollector<Text, IntWritable> output, Reporter reporter)throws IOException {String line = value.toString();StringTokenizer itr = new StringTokenizer(line);while (itr.hasMoreTokens()) {word.set(itr.nextToken());output.collect(word, one);}}}private void printUsage() {System.out.println("Usage : multifilewc <input_dir> <output>" );}public int run(String[] args) throws Exception {if(args.length < 2) {printUsage();return 1;}JobConf job = new JobConf(getConf(), MultiFileWordCount.class);job.setJobName("MultiFileWordCount");//set the InputFormat of the job to our InputFormatjob.setInputFormat(MyInputFormat.class);// the keys are words (strings)job.setOutputKeyClass(Text.class);// the values are counts (ints)job.setOutputValueClass(IntWritable.class);//use the defined mapperjob.setMapperClass(MapClass.class);//use the WordCount Reducerjob.setCombinerClass(LongSumReducer.class);job.setReducerClass(LongSumReducer.class);FileInputFormat.addInputPaths(job, args[0]);FileOutputFormat.setOutputPath(job, new 
Path(args[1]));JobClient.runJob(job);return 0;}public static void main(String[] args) throws Exception {int ret = ToolRunner.run(new MultiFileWordCount(), args);System.exit(ret);}}
三、Python版WordCount.py
位置:E:\Hadoop\hadoop-0.20.1\src\examples\python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Jython word count example: the Hadoop classes below are Java classes
# imported through Jython.
from org.apache.hadoop.fs import Path
from org.apache.hadoop.io import *
from org.apache.hadoop.mapred import *

import sys
import getopt


class WordCountMap(Mapper, MapReduceBase):
    """Mapper that emits (word, 1) for every whitespace-separated word."""

    # Shared constant value; the same object is emitted for every word.
    one = IntWritable(1)

    def map(self, key, value, output, reporter):
        """Split the input value into words and collect each with a count of 1."""
        for w in value.toString().split():
            output.collect(Text(w), self.one)


class Summer(Reducer, MapReduceBase):
    """Reducer (also used as combiner) that sums the counts of a key."""

    def reduce(self, key, values, output, reporter):
        """Add up all values of the key and collect (key, total)."""
        total = 0
        while values.hasNext():
            total += values.next().get()
        output.collect(key, IntWritable(total))


def printUsage(code):
    """Print the command line usage and exit with the given status code."""
    print("wordcount [-m <maps>] [-r <reduces>] <input> <output>")
    sys.exit(code)


def main(args):
    """Parse the command line (args[0] is the program name) and run the job."""
    conf = JobConf(WordCountMap)
    conf.setJobName("wordcount")

    conf.setOutputKeyClass(Text)
    conf.setOutputValueClass(IntWritable)

    conf.setMapperClass(WordCountMap)
    conf.setCombinerClass(Summer)
    conf.setReducerClass(Summer)

    try:
        flags, other_args = getopt.getopt(args[1:], "m:r:")
    except getopt.GetoptError:
        printUsage(1)
    if len(other_args) != 2:
        printUsage(1)

    for f, v in flags:
        if f == "-m":
            conf.setNumMapTasks(int(v))
        elif f == "-r":
            conf.setNumReduceTasks(int(v))

    # Set the paths through FileInputFormat/FileOutputFormat, as the Java
    # examples do, instead of the old JobConf.setInputPath/setOutputPath.
    FileInputFormat.addInputPath(conf, Path(other_args[0]))
    FileOutputFormat.setOutputPath(conf, Path(other_args[1]))

    JobClient.runJob(conf)


if __name__ == "__main__":
    main(sys.argv)