Big Data Analytics with Hadoop 3

MapReduce job types

MapReduce jobs can be written in several ways, depending on the desired outcome. The fundamental structure of a MapReduce job is a driver class that contains a mapper, a reducer, and a main method, as follows:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EnglishWordCounter {
    public static class WordMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        ...
    }

    public static class CountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        ...
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated Job constructor
        Job job = Job.getInstance(conf, "English Word Counter");
        job.setJarByClass(EnglishWordCounter.class);
        // Classes that perform the map, combine, and reduce steps
        job.setMapperClass(WordMapper.class);
        job.setCombinerClass(CountReducer.class);
        job.setReducerClass(CountReducer.class);
        // Types of the keys and values the job writes out
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // args[0] is the input path; args[1] is the output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait; exit with 0 on success, 1 on failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

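The purpose of the driver is to orchestrate the job. The main method first creates a Configuration and a Job object. It then sets up the job by telling it which classes to use for the map, combine, and reduce computations, which key and value types the job writes out, and which input and output paths to use. In this driver, the two paths are taken directly from the command-line arguments args[0] and args[1]. Finally, waitForCompletion submits the job and blocks until it finishes.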
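The driver above reads args[0] and args[1] without checking them first. As a minimal sketch in plain Java (no Hadoop dependency), the check could look like the following. The class DriverArgsSketch and the helper parsePaths are hypothetical names introduced here for illustration, and the usage message is an assumption, not part of the original listing:

```java
class DriverArgsSketch {
    // Hypothetical helper (not in the original driver): returns
    // {inputPath, outputPath}, or throws if the argument count is wrong.
    static String[] parsePaths(String[] args) {
        if (args.length != 2) {
            throw new IllegalArgumentException(
                "Usage: EnglishWordCounter <input path> <output path>");
        }
        return new String[] { args[0], args[1] };
    }

    public static void main(String[] args) {
        try {
            String[] paths = parsePaths(args);
            System.out.println("input:  " + paths[0]);
            System.out.println("output: " + paths[1]);
        } catch (IllegalArgumentException e) {
            // Report the problem before any job would be configured
            System.err.println(e.getMessage());
            System.exit(2);
        }
    }
}
```

In a real driver, a check of this kind would run before the Job object is configured, so that a missing path fails fast with a readable message.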

Let's look at the Mapper code, which simply tokenizes the input string and writes each word, paired with a count of 1, as the output of the mapper:

public static class WordMapper
        extends Mapper<Object, Text, Text, IntWritable> {
    // Reused across calls to avoid allocating new objects per token
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Convert the input line to a String so that it can be tokenized
        String txt = value.toString();
        StringTokenizer itr = new StringTokenizer(txt);
        while (itr.hasMoreTokens()) {
            // Emit (word, 1) for every whitespace-separated token
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
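To see what the mapper emits without running a cluster, the same tokenizing loop can be sketched in plain Java. This is only an illustration under stated assumptions: MapPhaseSketch and mapLine are hypothetical names, and a list of String/Integer pairs stands in for the Text/IntWritable pairs that the Hadoop Context would receive:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

class MapPhaseSketch {
    // Hypothetical helper: mimics what WordMapper.map emits for one input line.
    static List<Map.Entry<String, Integer>> mapLine(String line) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        // Default delimiters are whitespace, as in the mapper above
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            // Stand-in for context.write(word, one)
            emitted.add(new SimpleEntry<>(itr.nextToken(), 1));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Prints one "word<TAB>1" line per token, in input order
        for (Map.Entry<String, Integer> pair : mapLine("to be or not to be")) {
            System.out.println(pair.getKey() + "\t" + pair.getValue());
        }
    }
}
```

Note that StringTokenizer splits on whitespace only, so punctuation and letter case are preserved: "Hello," and "hello" are emitted as two different keys.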

Finally, there comes the reducer code, which is relatively simple. The reduce function is called once per key grouping; in this case, once per word. We iterate through the values, which are numbers, and keep a running sum. The final value of this running sum is the number of times the word occurs:

public static class CountReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Reused across calls to avoid allocating a new object per key
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // Add up all of the counts received for this word
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        // Emit (word, total count)
        result.set(sum);
        context.write(key, result);
    }
}
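The grouping and summing can also be sketched in plain Java. The following is an illustration only, assuming hypothetical names (ReducePhaseSketch, reduce, countWords) and using a sorted in-memory map as a stand-in for Hadoop's shuffle. It also shows why the driver can register CountReducer as the combiner: addition is associative and commutative, so summing partial sums gives the same total as summing the individual ones.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ReducePhaseSketch {
    // Same running sum as CountReducer.reduce, on plain Integers.
    static int reduce(Iterable<Integer> values) {
        int sum = 0;
        for (int val : values) {
            sum += val;
        }
        return sum;
    }

    // Hypothetical stand-in for the shuffle: collect a 1 under each word,
    // then call reduce once per key. TreeMap keeps the keys sorted.
    static Map<String, Integer> countWords(List<String> words) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String word : words) {
            grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
        }
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            counts.put(entry.getKey(), reduce(entry.getValue()));
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("to", "be", "or", "not", "to", "be");
        System.out.println(countWords(words)); // prints {be=2, not=1, or=1, to=2}

        // Combiner case: partial sums from two map tasks reduce to the same total
        int partialA = reduce(Arrays.asList(1, 1));
        int partialB = reduce(Arrays.asList(1));
        System.out.println(reduce(Arrays.asList(partialA, partialB))); // prints 3
    }
}
```

Because a combiner may or may not run, the values that reach the real reducer are not guaranteed to be all ones, which is why the reducer sums them instead of counting them.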

There are several basic types of MapReduce jobs, as described in the following points.