import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Counts the number of occurrences of each word in the input.
 */
public class WordCount {

    /**
     * Mapper class. Emits (word, 1) for every token in each input line.
     */
    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        /**
         * Mapper output is the word (Text) and a count of one (IntWritable)
         * for each occurrence.
         */
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            // Split on '|', spaces, and tabs
            StringTokenizer tokenizer = new StringTokenizer(line, "| \t");
            while (tokenizer.hasMoreTokens()) {
                this.word.set(tokenizer.nextToken());
                output.collect(this.word, one);
            }
        }
    }

    /**
     * Reducer class. Sums the counts emitted for each word.
     */
    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    /**
     * Main method to submit the job to the Hadoop cluster after configuring
     * the job parameters.
     *
     * @param args
     *            args[0] - input path, args[1] - output path
     * @throws Exception
     *             if an error occurs
     */
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);    // Class containing the job
        conf.setJobName("wordcount");                   // Job name shown when executing with Hadoop

        conf.setOutputKeyClass(Text.class);             // Type of output key
        conf.setOutputValueClass(IntWritable.class);    // Type of output value

        conf.setMapperClass(Map.class);                 // Class that implements the mapper
        conf.setCombinerClass(Reduce.class);            // Performs local reducing after the map stage
        conf.setReducerClass(Reduce.class);             // Class that implements the reducer

        conf.setInputFormat(TextInputFormat.class);     // Input files are in text format
        conf.setOutputFormat(TextOutputFormat.class);   // Output files are in text format

        // The following line is required when executing remotely.
        conf.set("mapred.jar", "wordcount.jar");        // JAR that contains the class
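        // Sketch of the data flow configured above, on a hypothetical input line.
        // Using Reduce as the combiner is safe here because summation is
        // associative and commutative, so partial sums can be merged later:
        //   input:   "the|quick the"
        //   map:     (the,1) (quick,1) (the,1)
        //   combine: (the,2) (quick,1)        <- local pre-aggregation per map task
        //   reduce:  (the,2) (quick,1)        <- final sums across all map outputs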
        Configuration conf2 = new Configuration();

        // Default input and output directories, used if not provided as arguments
        String outputPath = "/user/rlawrenc/outputwc/";
        String inputPath = "/user/hduser/data/nation.tbl";
        if (args.length > 0)
            inputPath = args[0];
        if (args.length > 1)
            outputPath = args[1];

        // Delete the output path if it already exists; otherwise the job
        // fails with an error rather than overwrite an existing directory.
        Path pt = new Path(outputPath);
        FileSystem fs = FileSystem.get(conf2);
        fs.delete(pt, true);                            // Remove previous output directory

        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        FileOutputFormat.setOutputPath(conf, pt);

        JobClient.runJob(conf);                         // Submit the job and wait for completion

        // Show the output by iterating through all file parts
        // (one part file per reducer) and printing each line.
        FileStatus[] files = fs.listStatus(pt);
        int recordCount = 0;
        for (FileStatus file : files) {
            System.out.println("File: " + file.getPath());
            // Skip directories and non-part files (e.g. _SUCCESS)
            if (file.isDir() || !file.getPath().toString().contains("part"))
                continue;
            FSDataInputStream is = fs.open(file.getPath());
            BufferedReader br = new BufferedReader(new InputStreamReader(is));
            String s;
            while ((s = br.readLine()) != null) {
                System.out.println(s);
                recordCount++;
            }
            br.close();
        }
        fs.close();
        System.out.println("Total records printed: " + recordCount);
    }
}
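// Example invocation (hypothetical HDFS paths; assumes the class is packaged
// as wordcount.jar, matching the mapred.jar setting above):
//
//   hadoop jar wordcount.jar WordCount /user/hduser/data/nation.tbl /user/hduser/outputwc/
//
// With no arguments, the hard-coded default input and output paths are used.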