Hadoop MapReduce – Example, Algorithm, Step by Step Tutorial

Hadoop MapReduce is a framework for parallel, batch-mode processing of large data sets stored on a fault-tolerant cluster. The programming model was originally introduced by Google for running sets of functions over very large data sets in batch mode.

The input data set, which may be a terabyte-sized file, is broken into chunks of 64 MB by default, and these chunks are the input to the mapper function. The mapper function then filters and sorts these data chunks on the Hadoop cluster's data nodes according to the business requirement.

After the distributed computation is completed, the output of the mapper function is passed to the reducer function, which combines all the intermediate elements back together to produce the resulting output.

A common example of Hadoop MapReduce usage is the “word count” algorithm, written in raw Java using the classes provided by the Hadoop libraries. The task is to count how many times each word, such as “are”, “Hole”, or “the”, occurs in the document supplied as the input file.

To begin, consider the figure below, which breaks the word-count process into steps.

Figure: Hadoop MapReduce word count process

The building blocks of Hadoop MapReduce programs fall broadly into two phases: map and reduce.

Both phases take their input as (key, value) pairs and produce their output as (key, value) pairs. The mapper program runs in parallel on the data nodes in the cluster. Once the map phase is over, the reducers run in parallel on the data nodes.

The input file is split into 64 MB chunks that are spread over the data nodes of the cluster. The mapper program runs on each data node of the cluster and generates (K1, V1) key-value pairs.

The sort-and-shuffle stage creates an iterator of values for each key, e.g. (are, [1, 1, 1]), which is passed to the reduce function; the reducer sums up the values for each key to generate (K2, V2) as output. This is illustrated in the figure above (word count MapReduce process).
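To make the shuffle-and-sort idea concrete, here is a minimal plain-Java sketch (not Hadoop code; the class and variable names are illustrative) that groups simulated mapper output by key and then sums the values for each key the way a reducer would:

import java.util.*;

public class ShuffleSortSketch {
    public static void main(String[] args) {
        // Simulated mapper output: one (word, 1) pair per occurrence
        List<Map.Entry<String, Integer>> mapperOutput = List.of(
                Map.entry("are", 1), Map.entry("how", 1),
                Map.entry("are", 1), Map.entry("you", 1));

        // Shuffle and sort: group the values for each key, e.g. (are, [1, 1])
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapperOutput) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }

        // Reduce: sum the grouped values for each key, e.g. (are, 2)
        grouped.forEach((word, counts) ->
                System.out.println(word + "\t" + counts.stream().mapToInt(Integer::intValue).sum()));
    }
}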

Hadoop MapReduce Algorithm for Word Count Program

  1. Take one line at a time
  2. Split the line into individual words, one by one (tokenize)
  3. Take each word
  4. Note the frequency count (tabulation) for each word
  5. Report the list of words and their frequency tabulation (see the plain-Java sketch after this list)
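The same five steps, stripped of Hadoop, can be sketched in a few lines of ordinary sequential Java. This is only an illustration of the algorithm, not the MapReduce program; the input file name is hypothetical:

import java.io.IOException;
import java.nio.file.*;
import java.util.*;

public class WordCountSketch {
    public static void main(String[] args) throws IOException {
        Map<String, Integer> counts = new TreeMap<>();
        // Steps 1-4: read each line, tokenize it, and tally every word
        for (String line : Files.readAllLines(Paths.get("input.txt"))) {   // hypothetical input file
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                counts.merge(tokenizer.nextToken(), 1, Integer::sum);
            }
        }
        // Step 5: report each word with its frequency
        counts.forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}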

Hadoop MapReduce Code Structure for Word Count Program

Step 1

Import the Hadoop libraries for Path, configuration, I/O, MapReduce, and utilities.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.*;

Step 2

The summary of the classes defined in the “word count” MapReduce program is as below:

public class WordCount {

  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    // map() logic goes here (see Step 3)
  }

  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    // reduce() logic goes here (see Step 4)
  }

  public static void main(String[] args) throws Exception {
    // job configuration goes here (see Step 5)
  }
}

We have created a package in Eclipse and defined a class named “WordCount”. The “WordCount” class contains two nested classes and one main method. “Mapper” and “Reducer” are not keywords we invent; they are base classes provided by the Hadoop MapReduce library.

Their source code is written by the Hadoop developers. We extend the “Mapper” and “Reducer” classes with our own “Map” and “Reduce” classes respectively, using inheritance.

Let us understand what LongWritable, Text, and IntWritable are. To do that, we first need to understand serialization and de-serialization in Java.

Object serialization is a mechanism where an object can be represented as a sequence of bytes that includes the object’s data as well as information about the object’s type and the types of data stored in the object.

The serialized object is written in a file and then de-serialized to recreate the object back into memory.

For example, the word “Hai” might be serialized to a byte sequence such as “0010110”; once that is written to a file, it can be de-serialized back into “Hai”.

In the Hadoop MapReduce framework, the mapper output is fed in as the reducer input. These intermediate values are always in serialized form.

Hadoop MapReduce does not use plain Java serialization; instead, serialization and de-serialization are expressed through Hadoop's Writable interface. Therefore, the Hadoop developers have provided Writable (serializable) counterparts for the common Java data types: int in Java becomes IntWritable in the MapReduce framework, String in Java becomes Text, and so on.
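As a quick illustration of how Writable types serialize and de-serialize, the standalone snippet below (a sketch, assuming the Hadoop client library is on the classpath; the class name is made up) writes a Text and an IntWritable to a byte stream and reads them back:

import java.io.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class WritableRoundTrip {
  public static void main(String[] args) throws IOException {
    // Serialize: each Writable knows how to write its own bytes
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    new Text("Hai").write(out);
    new IntWritable(42).write(out);

    // De-serialize: read the same fields back, in the same order
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    Text word = new Text();
    IntWritable number = new IntWritable();
    word.readFields(in);
    number.readFields(in);
    System.out.println(word + " " + number.get());   // prints: Hai 42
  }
}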

The input and output of the mapper and reducer are in (key, value) format. For a plain text input file, the framework itself supplies the key: it is the byte offset at which each line starts within the file, and the value is the text of that line. For example, if the first line of the file is “(1, aaa)”, the mapper receives the pair (0, “(1, aaa)”), because that line starts at byte offset 0.

Similarly, if the next line of the file is “(2, bbb)”, its key is the offset at which that line begins. With an 8-character first line plus a newline, the second pair is (9, “(2, bbb)”).
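The following plain-Java sketch (illustrative only; it is not part of the MapReduce program) shows how those byte-offset keys arise for a small two-line file:

import java.util.List;

public class ByteOffsetKeys {
  public static void main(String[] args) {
    // Two lines of a hypothetical input file
    List<String> lines = List.of("(1, aaa)", "(2, bbb)");

    long offset = 0;
    for (String line : lines) {
      // The framework hands the mapper (byte offset of the line, line text)
      System.out.println("(" + offset + ", " + line + ")");
      offset += line.length() + 1;   // +1 for the newline character
    }
  }
}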

Now, let us try to understand the declaration below with an example:

Mapper<LongWritable, Text, Text, IntWritable>

Consider that the first line in the file is “Hi! How are you”.

The mapper input key-value pair is (0, “Hi! How are you”): the key is the byte offset of the line within the file, which is why the first type parameter of the mapper is “LongWritable”, and the value is the line of text itself, which is why the second type parameter is “Text”.

The mapper output is the word together with a count of 1 for each occurrence, i.e. (Hi!, 1), (How, 1), (are, 1), (you, 1).

If the word “are” were repeated twice in the sentence, the mapper would emit (are, 1) twice, and the shuffle stage would group these into (are, [1, 1]). Hence the key of the mapper output is “Text” (the third type parameter), while the value is “IntWritable” (the fourth).

This mapper output is fed as the input to the reducer. Therefore, if the reducer input is (are, [1, 1]), the output of the reducer will be (are, 2). The reducer output data types are again “Text” for the key and “IntWritable” for the value.

Step 3

Define the Map class. The key and value input pair have to be serializable by the framework and hence need to implement the Writable interface.

Output pairs do not need to be of the same types as the input pairs. Output pairs are collected with calls to context.write().

Inside the static class “Map” we declare an IntWritable object named “one”, which holds the constant count of 1 emitted for each occurrence of a word, and the word itself is stored in a Text variable named “word”.

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();                        // the full line of text
    StringTokenizer tokenizer = new StringTokenizer(line); // break the line into words
    while (tokenizer.hasMoreTokens()) {
      word.set(tokenizer.nextToken());
      context.write(word, one);                            // emit (word, 1)
    }
  }
}

The above piece of code takes each line as input and stores it in the variable “line”. StringTokenizer allows an application to break a string into tokens. For example:

StringTokenizer st = new StringTokenizer("my name is kumar", " ");

The output of the above line will be the four tokens: my, name, is, kumar.

As long as the “tokenizer” has more tokens, the while loop keeps running. For every token, the mapper writes the word and a count of 1 to the context; the framework feeds the mapper the file line by line and collects the emitted pairs. For example, if a line contains “hai hai hai”, the mapper emits (hai, 1) three times, and the shuffle stage later groups them into (hai, [1, 1, 1]), as sketched below.
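A tiny standalone sketch (illustrative only; it prints the pairs instead of calling context.write) of what the map logic emits for that line:

import java.util.StringTokenizer;

public class MapEmitSketch {
  public static void main(String[] args) {
    StringTokenizer tokenizer = new StringTokenizer("hai hai hai");
    // The mapper emits one (word, 1) pair per token; the framework later
    // groups these into (hai, [1, 1, 1]) for the reducer.
    while (tokenizer.hasMoreTokens()) {
      System.out.println("(" + tokenizer.nextToken() + ", 1)");
    }
  }
}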

Step 4

The Reduce class accepts the shuffled key-value pairs as input. The code then totals the values for the key-value pairs that share the same key and outputs the totalled pair, e.g. (word, 3).

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {          // add up every count emitted for this key
      sum += val.get();
    }
    context.write(key, new IntWritable(sum)); // emit (word, total count)
  }
}

Step 5

The main method sets up the MapReduce job configuration by defining the type of input; in this case, the input is text. The code then registers the Map and Reduce classes with the job and specifies the input/output formats and paths. (A combiner could optionally be registered with job.setCombinerClass(Reduce.class), but this example does not use one.)

public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();
  Job job = new Job(conf, "wordcount");          // the name of the job is "wordcount"
  job.setOutputKeyClass(Text.class);             // output key type
  job.setOutputValueClass(IntWritable.class);    // output value type
  job.setMapperClass(Map.class);                 // mapper class name
  job.setReducerClass(Reduce.class);             // reducer class name
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));    // input path from the command line
  FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output path from the command line
  job.waitForCompletion(true);                   // submit the job and wait for completion
}

Step 6

The full Java code for the “word count” program is as below:

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    Job job = new Job(conf, "wordcount");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(true);
  }
}
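Assuming the class has been packaged into a jar, say wordcount.jar (the jar name and the HDFS paths below are placeholders; use the fully qualified class name if WordCount sits inside a package), the job is typically submitted and its result inspected as follows:

hadoop jar wordcount.jar WordCount /user/hadoop/input /user/hadoop/output
hadoop fs -cat /user/hadoop/output/part-r-00000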