Hadoop Distributed File System (HDFS)

Hadoop Distributed File System (HDFS) is a file system that provides reliable storage for large data sets in a distributed computing environment, using a cluster of commodity servers.

When data enters HDFS, it is broken down into smaller chunks (64 MB by default) and distributed across the different nodes in the cluster, which allows parallel processing.

The file system also copies each 64 MB chunk to multiple machines, depending on the replication factor set in the HDFS configuration file.

This makes HDFS fault tolerant. As an example, suppose the phone number of everyone across the globe is stored in a single file in HDFS.

Our goal is to count the number of people whose first name starts with “A”. As soon as the file arrives in HDFS, it is broken into 64 MB chunks (the default) scattered across the commodity servers.

To produce the count as output, we have to write a program that goes to each node in the cluster, computes a partial count there, and then sums the partial counts to get the final result.

To keep the data available even if a node holding a 64 MB chunk fails, HDFS replicates each chunk onto two additional servers by default.
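As a concrete illustration, the block size and replication factor of a file already stored in HDFS can be inspected (and the replication changed) through the Java FileSystem API. This is a minimal sketch; the path /user/training/phonebook.txt is a hypothetical example, and the cluster settings come from whatever configuration files are on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockInfo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();   // reads core-site.xml / hdfs-site.xml from the classpath
        FileSystem fs = FileSystem.get(conf);

        Path file = new Path("/user/training/phonebook.txt");   // hypothetical file
        FileStatus status = fs.getFileStatus(file);

        System.out.println("Block size  : " + status.getBlockSize());   // e.g. 64 MB by default in Hadoop 1.x
        System.out.println("Replication : " + status.getReplication()); // e.g. 3 by default

        // Raise the replication factor of this one file to 3 (a no-op if it is already 3)
        fs.setReplication(file, (short) 3);
    }
}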

HDFS has a master-slave architecture. The master is called the Name Node, and it runs on a high-end server.

Along with the master, there is one more high-end server called the Secondary Name Node.

All other servers are ordinary machines, e.g. with 2 GB or 4 GB of RAM, which we call commodity machines or slave machines.

The data blocks, 64 MB each by default, reside on these commodity (slave) machines. It is important to understand that the Secondary Name Node is not a backup machine.

The pertinent question here is what the Name Node and Secondary Name Node actually do.

The Name Node keeps the metadata about the data blocks residing on the commodity (slave) machines in the Hadoop cluster.

The Name Node is the only component in the whole Hadoop ecosystem that knows which data blocks reside on which commodity machine.

Essentially, the Name Node holds the name and path of each data block along with the IP address of the commodity machine where that block resides.

File system operations such as opening, closing, and renaming files and directories are executed by the Name Node.

The Data Nodes are responsible for serving read and write requests from the file system’s clients. They also perform block creation, deletion, and replication upon instruction from the Name Node, which gives HDFS its fault-tolerance capabilities.
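This division of labour can be seen from a client program: metadata operations (create, rename) are resolved by the Name Node, while the bytes themselves stream to and from Data Nodes. Below is a minimal, hedged sketch using the Java FileSystem API; the paths under /user/training are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsReadWrite {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        Path tmp = new Path("/user/training/sample.tmp");   // hypothetical paths
        Path out = new Path("/user/training/sample.txt");

        // Write: the Name Node allocates blocks, the client streams the data to Data Nodes
        FSDataOutputStream os = fs.create(tmp, true);
        os.writeUTF("hello hdfs");
        os.close();

        // Rename: a pure metadata operation handled by the Name Node
        fs.rename(tmp, out);

        // Read: block locations come from the Name Node, the bytes come from Data Nodes
        FSDataInputStream is = fs.open(out);
        System.out.println(is.readUTF());
        is.close();
    }
}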

Any machine that supports Java can run the Name Node and Data Node software, since HDFS is built in the open-source Java programming language.

The Name Node has a background process running. Metadata about every change happening to data blocks, such as block creation and deletion, is stored in the Name Node’s edit log.

The edit log is one of the Name Node’s constructs, in which changes to the file system metadata are recorded.

Another Name Node construct is the fsimage, which keeps a point-in-time snapshot of the file system metadata.

When the Name Node restarts, the edit log is applied to the fsimage to get the most recent snapshot of HDFS. But Name Node restarts are rare, so the edit log grows very large over time. The challenges at this point are:

  1. The edit log will keep growing over time and become difficult to manage.
  2. A Name Node restart will take a long time, since a huge volume of changes has to be merged from the edit log into the fsimage.
  3. In case of a fault, a huge amount of metadata could be lost.

The Secondary Name Node helps overcome these issues by taking over the responsibility of merging the edit log with the fsimage from the Name Node.

The Secondary Name Node fetches the edit log from the Name Node at regular intervals and applies it to its own copy of the fsimage.

Once the Secondary Name Node has the new fsimage, it copies it back to the Name Node. In this way the Name Node always has the most recent metadata snapshot.

This is why the Secondary Name Node is not a replacement or backup for the Name Node; for the reason above it is also called the checkpoint node.
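How often this checkpoint happens is driven by configuration. The sketch below reads those settings with the standard Configuration API; fs.checkpoint.period and fs.checkpoint.size are the Hadoop 1.x property names (Hadoop 2.x renamed them under dfs.namenode.checkpoint.*), and the fallback values shown are assumptions used only to make the snippet self-contained.

import org.apache.hadoop.conf.Configuration;

public class CheckpointSettings {
    public static void main(String[] args) {
        Configuration conf = new Configuration();  // picks up core-site.xml / hdfs-site.xml if present

        // Interval (seconds) between checkpoints, and the edit-log size that forces an early checkpoint
        long periodSeconds = conf.getLong("fs.checkpoint.period", 3600);
        long sizeBytes = conf.getLong("fs.checkpoint.size", 64L * 1024 * 1024);

        System.out.println("Checkpoint every " + periodSeconds + " s or " + sizeBytes + " bytes of edits");
    }
}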

Figure: Hadoop Distributed File System (HDFS) Architecture

Summary of the sequence of steps that takes place when the HDFS cluster starts:

  1. The cluster is started for the first time.
  2. The Name Node receives heartbeats from all commodity servers.
  3. Every commodity server regularly pings the Name Node; this ping is called a heartbeat. Along with the heartbeat, each server reports which blocks are residing on it.
  4. Using this information, the Name Node creates a file called the “fsimage”.
  5. Until the “fsimage” is created, the Name Node is locked. Once the creation process completes, it passes the information to the Secondary Name Node and unlocks itself.
  6. When data blocks are created or deleted on the Data Nodes, the Name Node writes the updated metadata to its edit log.
  7. The Name Node passes the edit log to the Secondary Name Node, i.e. the delta changes captured in the edit log are shipped to the Secondary Name Node, which applies them to its fsimage to create a new, up-to-date fsimage.
  8. The Secondary Name Node passes the most recent fsimage back to the Name Node. Hence, the Name Node always has the most recent metadata.

Hadoop MapReduce – Example, Algorithm, Step by Step Tutorial

Hadoop MapReduce is a system for parallel processing of large data sets in batch mode. The programming model was originally introduced by Google for executing sets of functions over data stored on large, fault-tolerant clusters.

The input data set, which can be a terabyte-scale file, is broken down into chunks of 64 MB by default and fed to the mapper function. The mapper function then filters and sorts these data chunks on the Hadoop cluster’s data nodes according to the business requirement.

After the distributed computation completes, the output of the mapper function is passed to the reducer function, which combines all the elements back together to produce the final output.

An example of Hadoop MapReduce usage is the “word count” algorithm, written in raw Java using classes provided by the Hadoop libraries: count how many times a given word such as “are”, “Hole”, or “the” occurs in the input document.

To begin, consider the figure below, which breaks the word-count process into steps.

Figure: Hadoop MapReduce Word Count Process

The building blocks of a Hadoop MapReduce program are broadly classified into two phases: map and reduce.

Both phases take their input as (key, value) pairs and produce their output as (key, value) pairs. The mapper program runs in parallel on the data nodes in the cluster. Once the map phase is over, the reducers run in parallel on the data nodes.

The input file is split into 64 MB chunks and spread over the data nodes of the cluster. The mapper program runs on each data node of the cluster and generates (K1, V1) key-value pairs.

The sort and shuffle stage creates an iterator of values for each key, e.g. (are, [1, 1, 1]), which is passed to the reduce function; the reducer sums up the values for each key to generate (K2, V2) as output. This is illustrated in the figure above (word count MapReduce process).

Hadoop MapReduce Algorithm for Word Count Program

  1. Take one line at a time
  2. Split the line into individual words (tokenize)
  3. Take each word
  4. Note the frequency count (tabulation) for each word
  5. Report the list of words and the frequency tabulation
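To see what these five steps do before any Hadoop machinery is involved, here is a small plain-Java sketch of the same tabulation over an in-memory document. It is only an illustration of the algorithm, not the MapReduce code that follows; the sample text is made up.

import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class LocalWordCount {
    public static void main(String[] args) {
        String document = "Hole are the Hole are the are";   // stand-in for the input file

        Map<String, Integer> counts = new HashMap<String, Integer>();

        // Steps 1-4: take each line, tokenize it, and tabulate the frequency of each word
        for (String line : document.split("\n")) {
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken();
                Integer seen = counts.get(word);
                counts.put(word, seen == null ? 1 : seen + 1);
            }
        }

        // Step 5: report the list of words and their frequencies
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}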

Hadoop MapReduce Code Structure for Word Count Program

Step 1

Import the Hadoop libraries for Path, configuration, I/O, MapReduce, and utilities.

import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;

Step 2

The skeleton of the classes defined in the word count MapReduce program is as below:

public class WordCount {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        // ... map() implementation (Step 3)
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // ... reduce() implementation (Step 4)
    }

    public static void main(String[] args) throws Exception {
        // ... job configuration and submission (Step 5)
    }
}

We have created a package in Eclipse and defined a class named “WordCount”. The “WordCount” class has two nested classes and one main method. “Mapper” and “Reducer” are classes provided by the Hadoop framework, not keywords we define ourselves.

Their source code ships with Hadoop. We extend the “Mapper” and “Reducer” classes with our own “Map” and “Reduce” classes respectively, using inheritance.

Let us understand what LongWritable, Text, and IntWritable are. For that, we first need to understand serialization and de-serialization in Java.

Object serialization is a mechanism where an object can be represented as a sequence of bytes that includes the object’s data as well as information about the object’s type and the types of data stored in the object.

The serialized object can be written to a file and later de-serialized to recreate the object in memory.

For example, the word “Hai” might be serialized to a byte sequence such as “0010110”; once that is written to a file, it can be de-serialized back to “Hai”.

In the Hadoop MapReduce framework, mapper output is fed to the reducer as input. These intermediate values are always in serialized form.

In Hadoop MapReduce programming, serialization and de-serialization are handled through the Writable interface, so Hadoop provides Writable wrappers for the common data types. For example, int in Java corresponds to IntWritable in the MapReduce framework, String in Java corresponds to Text, and so on.

The input and output of the mapper and reducer are in (key, value) format. For a plain text input file, the framework generates the input key itself: it is the byte offset at which each line starts, and the value is the content of that line.

For example, suppose the file contains the line “(1, aaa)” followed by the line “(2, bbb)”. The first line starts at offset 0, so the mapper receives the pair (0, “(1, aaa)”). The first line occupies 8 characters plus a newline, so the second line starts at offset 9 and the mapper receives (9, “(2, bbb)”).
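These keys are nothing more than byte offsets. The short plain-Java sketch below prints the offset at which each line of a small in-memory file starts, which is exactly what the framework hands to the mapper as the LongWritable key (assuming single-byte characters and a one-byte newline).

public class LineOffsets {
    public static void main(String[] args) {
        String file = "(1, aaa)\n(2, bbb)\n";   // stand-in for a two-line input file

        long offset = 0;
        for (String line : file.split("\n")) {
            System.out.println("(" + offset + ", \"" + line + "\")");  // the (key, value) seen by the mapper
            offset += line.length() + 1;                               // +1 for the newline character
        }
    }
}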

Now, let us try to understand the below with an example:

Mapper<LongWritable, Text, Text, IntWritable>

Consider a file whose first line is “Hi! How are you”.

The mapper input key-value pair is (0, “Hi! How are you”): the key is the byte offset of the line and the value is the whole line. Therefore, the mapper’s input key has the data type “LongWritable” (the first type parameter) and its input value has the type “Text” (the second type parameter).

The mapper output is one (word, count) pair per word, i.e. (Hi!, 1), (How, 1), (are, 1), (you, 1).

If the word “are” appeared twice in the sentence, the mapper would emit (are, 1) twice, and the shuffle stage would group these into (are, [1, 1]). Hence, the key of the mapper output is “Text” (the third type parameter) while the value is “IntWritable” (the fourth).

This mapper output is fed as the input to the reducer. Therefore, if the reducer input is (are, [1, 1]), the output of the reducer will be (are, 2). The reducer output key type is again “Text” and the value type is “IntWritable”.

Step 3

Define the Map class. The input key and value types have to be serializable by the framework and hence must implement the Writable interface.

Output pairs do not need to be of the same types as the input pairs. Output pairs are collected with calls to context.write().

Inside the static class “Map” we declare an IntWritable named “one”, holding the constant count of 1 emitted for each occurrence of a word, and a Text variable named “word”, holding the word itself.

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);

        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}

The above piece of code takes each line as input and stores it in the variable “line”. StringTokenizer allows an application to break a string into tokens. For example:

StringTokenizer st = new StringTokenizer("my name is kumar", " ");

The output of the above line will be: my
                                      name
                                      is
                                      kumar

As long as the “tokenizer” has more tokens, the while loop keeps running and emits one (word, 1) pair per token via context.write(). The framework calls map() once per line of the file and collects these pairs. For example, if a line contains “hai hai hai”, the mapper emits (hai, 1) three times, and the shuffle stage later groups them into (hai, [1, 1, 1]).
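To make that last point concrete, the hedged snippet below runs the same tokenize-and-emit logic outside Hadoop: the mapper writes (hai, 1) once per occurrence, and it is the framework’s shuffle stage that later groups these pairs for the reducer.

import java.util.StringTokenizer;

public class EmitDemo {
    public static void main(String[] args) {
        String line = "hai hai hai";   // one input line, as the mapper would receive it

        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            // context.write(word, one) in the real mapper; here we just print the pair
            System.out.println("(" + tokenizer.nextToken() + ", 1)");
        }
        // Prints (hai, 1) three times; shuffle/sort turns this into (hai, [1, 1, 1]) for the reducer
    }
}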

Step 4

The Reduce class accepts shuffled key-value pairs as input. The code then totals the values for the pairs with the same key and outputs the totalled pair, e.g. <word, 3>.

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

Step 5

The main method sets up the MapReduce job configuration by defining the type of input; in this case, the input is text. The code then registers the Map and Reduce classes (a combiner could optionally be registered in the same way) and specifies the input/output formats.

public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    Job job = new Job(conf, "wordcount");   // the name of the job is "wordcount"
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(Map.class);          // mapper class
    job.setReducerClass(Reduce.class);      // reducer class
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    job.waitForCompletion(true);
}
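Because the reduce logic here is plain addition, the same Reduce class can optionally be plugged in as a combiner, so that each mapper pre-aggregates its own (word, 1) pairs before they are shuffled across the network. This is not part of the listing above; a hedged one-line addition to the driver would be:

job.setCombinerClass(Reduce.class);   // optional: run Reduce locally on each mapper's output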

Step 6

The full Java code for the “word count” program is as below:

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
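An alternative way to write the driver, not used in the listing above but common in Hadoop code, is to implement the Tool interface so that ToolRunner parses generic options (-D, -conf, -fs, and so on) before your code runs. A hedged sketch, reusing the Map and Reduce classes from the program above, with WordCountDriver as a hypothetical class name:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "wordcount");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCount.Map.class);       // Map class from the listing above
        job.setReducerClass(WordCount.Reduce.class);   // Reduce class from the listing above
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountDriver(), args));
    }
}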

Hadoop 1 Architecture – Step by Step Description, Limitations

In this post, we will walk through the Hadoop 1 architecture with a step-by-step description. Hadoop 1 had some limitations that have been addressed in Hadoop 2.x.

Figure: Hadoop 1.x Architecture

Hadoop 1 Architecture Description

  • One or more HDFS clients submit a job to the Hadoop system.
  • When the Hadoop system receives a client request, it first contacts the Master Node. The Master Node hosts the Name Node and Secondary Name Node; together with the Data Nodes these form the HDFS layer, while the Job Tracker on the Master Node provides the MapReduce layer.
  • Once you write a MapReduce Java program, say in the Eclipse IDE, you package it as a runnable jar file. The Job Tracker then receives this runnable jar.
  • Now the Job Tracker needs to know on which commodity machines the required data blocks reside. The Name Node supplies this information, i.e., the IP addresses of the commodity machines holding the data blocks.
  • The MapReduce component on each slave node, the Task Tracker, receives the runnable jar from the Job Tracker and performs the tasks using the MapReduce components.
  • The Task Tracker creates a JVM (Java virtual machine) to execute the runnable jar. The program first runs the mapper routine. The mapper routine needs key-value pairs, which the Task Tracker fetches by accessing the data blocks residing on the slave node.
  • The mapper routine puts its result set in the context, which is collected as intermediate output.
  • The Task Tracker creates another JVM in which the reducer routine runs. The reducer takes the mapper output as input, then shuffles, sorts, and reduces the data, finally producing the summarized information as output.
  • Once all Task Trackers finish their tasks, the Job Tracker takes those results and combines them into the final result set.
  • The Hadoop client then receives the final result.

Hadoop 1.x Limitations

  • Hadoop 1.x supports only MapReduce-based batch/data processing applications.
  • It does not support real-time data processing.
  • It allows only one Name Node and one namespace per cluster, i.e. it does not support a federated architecture. The entire Hadoop cluster goes down if the Name Node fails.
  • The Job Tracker is a single point of failure and has to perform multiple activities such as resource management, job scheduling, and job monitoring.
  • Hadoop 1.x has limited horizontal scalability.
  • It supports a maximum of about 4,000 nodes and 40,000 concurrent tasks per cluster.

Hadoop Commands – HDFS dfs commands, Hadoop Linux commands

The full list of Hadoop commands is much bigger than the list demonstrated here; however, some of the most useful Hadoop commands are explained below.

“hadoop fs” lists all the Hadoop file system shell (FsShell) commands that can be run.

“hadoop fs -help <command>” displays help for that command, where <command> is the actual name of the command.

Hadoop Commands and HDFS Commands

All HDFS commands are invoked by the “bin/hdfs” script. Running the hdfs script without any arguments prints the description of all commands.

  • classpath ("hdfs classpath"): Prints the class path needed to get the Hadoop jar and the required libraries.
  • ls ("hadoop fs -ls /"): Lists the contents of the root directory in HDFS.
  • version ("hadoop version"): Prints the Hadoop version.
  • df ("hadoop fs -df hdfs:/"): Shows the amount of space used and available on the currently mounted filesystem.
  • balancer ("hadoop balancer"): Runs the cluster balancing utility.
  • mkdir ("hadoop fs -mkdir /usr/training/hadoop_files"): Creates a new directory hadoop_files below the /usr/training directory in HDFS.
  • put ("hadoop fs -put /data/myfile.txt /usr/training/hadoop_files"): Adds a sample text file from the local Unix directory (/data/myfile.txt) to the HDFS directory /usr/training/hadoop_files.
  • ls ("hadoop fs -ls /usr/training/hadoop_files"): Lists the contents of this new directory in HDFS.
  • put ("hadoop fs -put /data/finance /usr/training/hadoop_files"): Adds the entire local Unix directory /data/finance to the HDFS directory /usr/training/hadoop_files.
  • du ("hadoop fs -du -s -h hadoop_files/finance"): Shows how much space a given directory occupies in HDFS.
  • rm ("hadoop fs -rm hadoop_files/finance/myfile.txt"): Deletes the file "myfile.txt" from the "finance" directory.
  • rm ("hadoop fs -rm hadoop_files/finance/*"): Deletes all files from the "finance" directory using a wildcard.
  • expunge ("hadoop fs -expunge"): Empties the trash.
  • cat ("hadoop fs -cat hadoop_files/myfile.txt"): Shows the content of "myfile.txt" in the hadoop_files directory.
  • copyToLocal ("hadoop fs -copyToLocal hadoop_files/myfile.txt /scratch/data"): Copies myfile.txt from the HDFS directory "hadoop_files" to the local directory "/scratch/data".
  • get ("hadoop fs -get hadoop_files/myfile.txt /scratch/data"): get can be used as an alternative to copyToLocal.
  • chmod ("sudo -u hdfs hadoop fs -chmod 600 hadoop_files/myfiles.txt"): Uses -chmod to change the permissions of a file; the default file permissions are 666 in HDFS.
  • mv ("hadoop fs -mv hadoop_files apache_hadoop"): Moves a directory from one location to another.
  • dfsadmin ("hdfs dfsadmin -safemode leave"): Makes the Name Node leave safe mode.
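Most of the shell commands above have direct equivalents in the Java FileSystem API, which is convenient when the same operations need to run inside an application. A minimal, hedged sketch follows, reusing the same hypothetical paths as the table:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FsShellEquivalents {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // hadoop fs -mkdir /usr/training/hadoop_files
        fs.mkdirs(new Path("/usr/training/hadoop_files"));

        // hadoop fs -put /data/myfile.txt /usr/training/hadoop_files
        fs.copyFromLocalFile(new Path("/data/myfile.txt"), new Path("/usr/training/hadoop_files"));

        // hadoop fs -ls /usr/training/hadoop_files
        for (FileStatus status : fs.listStatus(new Path("/usr/training/hadoop_files"))) {
            System.out.println(status.getPath());
        }

        // hadoop fs -get /usr/training/hadoop_files/myfile.txt /scratch/data
        fs.copyToLocalFile(new Path("/usr/training/hadoop_files/myfile.txt"), new Path("/scratch/data"));

        // hadoop fs -rm /usr/training/hadoop_files/myfile.txt
        fs.delete(new Path("/usr/training/hadoop_files/myfile.txt"), false);
    }
}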

Hadoop fs commands – HDFS dfs commands

  • fs ("hadoop fs"): Lists all the Hadoop file system shell commands.
  • help ("hadoop fs -help"): Shows help for any command.
  • touchz ("hdfs dfs -touchz /hadoop_files/myfile.txt"): Creates a file in HDFS with a file size of 0 bytes.
  • rmr ("hdfs dfs -rmr /hadoop_files/"): Removes a directory from HDFS recursively.
  • count ("hdfs dfs -count /user"): Counts the number of directories, files, and bytes under the paths that match the specified file pattern.

Hadoop Linux commands

  • ls ("ls -l", "ls -a", "ls -l /etc"): Lists files in the current directory. Run without any additional parameters, ls lists the contents of the current directory in short form; -l gives a detailed list, and -a also displays hidden files.
  • cp ("cp [option(s)] <sourcefile> <targetfile>", e.g. "cp file1 new-file2", "cp -r dir1 dir2"): Copies sourcefile to targetfile; -i waits for confirmation, if necessary, before an existing targetfile is overwritten, and -r copies recursively (including subdirectories).
  • mv ("mv file_1.txt /scratch/kmak"): Moves or renames files; it copies sourcefile to targetfile and then deletes the original sourcefile.
  • rm ("rm myfile.txt", "rm -r mydirectory"): Removes the specified files from the file system. Directories are not removed by rm unless the -r option is used.
  • ln ("ln file1.txt file2.txt"): Creates links between files.
  • cd ("cd /scratch/kmak/bi"): Changes the shell's current working directory.
  • pwd ("pwd"): Prints the working directory, i.e. writes the full pathname of the current working directory to standard output.
  • mkdir ("mkdir <mydir>"): Creates directories on a file system.
  • rmdir ("rmdir <emptydir>"): Deletes the specified directory, provided it is already empty.
  • nl ("nl myfile.txt"): Numbers the lines in a file.
  • gedit ("gedit myfile.txt"): Opens a graphical text editor.
  • stat ("stat myfile.txt"): Displays the detailed status of a file (with -f, of a file system).
  • wc ("wc myfile.txt", "wc -l myfile.txt", "wc -c myfile.txt"): Reports the newline, word, and byte/character counts for the files specified by the file arguments.
  • chown ("chown chope file.txt", "chown -R chope /scratch/work"): Changes the owner (and optionally the owning group) of files.
  • chgrp ("chgrp oracle myfile.txt"): Changes the group ownership of a file or files.
  • ifconfig ("ifconfig"): Views and changes the configuration of the network interfaces on your system.

Hadoop 2 Architecture – Key Design Concepts, YARN, Summary

Hadoop 2 came about to overcome the limitations of Hadoop 1.x; its architecture addresses those limitations and meets current data processing requirements.

Hadoop 2 Architecture – Key Design Concepts

  • Split up the two major functions of the Job Tracker:
  • Cluster resource management
  • Application life-cycle management
  • MapReduce becomes a user library, i.e. just one of the applications running on Hadoop.

Below are the key concepts to understand before getting into the Hadoop 2 architecture details.

Application

An application is a job submitted to the framework, for example a MapReduce job.

Container

A container is the basic unit of allocation and represents a slice of physical resources, e.g. Container A = 2 GB RAM, 1 CPU.

The node manager provides containers to an application; to be precise, each mapper and reducer runs in its own container. The application master requests these containers for the map and reduce tasks.

Therefore, a container is supervised by the node manager and scheduled by the resource manager.
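For a MapReduce application, the container size requested for each map and reduce task is just job configuration. A hedged sketch is below; mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, and mapreduce.map.cpu.vcores are the Hadoop 2.x property names, and the values are example figures rather than recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ContainerSizing {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Ask YARN for a 2 GB / 1 vcore container per map task and a 4 GB container per reduce task
        conf.setInt("mapreduce.map.memory.mb", 2048);
        conf.setInt("mapreduce.map.cpu.vcores", 1);
        conf.setInt("mapreduce.reduce.memory.mb", 4096);

        Job job = Job.getInstance(conf, "sized-job");
        // ... set mapper/reducer/input/output as usual, then submit the job
    }
}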

Resource Manager

It is the global resource scheduler, which tracks resource usage and node health. The resource manager has two main components: the scheduler and the applications manager.

The responsibility of the scheduler is to allocate resources to the various running applications.

It is important to note that the scheduler does not perform any monitoring or tracking of application status.

The applications manager is responsible for negotiating the first container for executing the application-specific application master, and it provides the service for restarting that container on failure.

Node Manager

The node manager manages the life cycle of the containers on its node and also monitors the node’s health.

Application Master

It manages the application’s scheduling and task execution. It interacts with the scheduler to acquire the required resources, and with the node managers to execute and monitor the assigned tasks.

YARN (Yet Another Resource Negotiator)

YARN is the key component of Hadoop 2 Architecture. HDFS is the underlying file system.

It is a framework for developing and/or executing distributed processing applications, for example MapReduce, Spark, and Apache Giraph.

Please see the post on YARN architecture to understand the Hadoop 2 architecture in detail.

Hadoop 2 Summary

Apache Hadoop 2.0 made a generational shift in architecture, with YARN integrated into the whole Hadoop ecosystem.

It allows multiple applications to run on the same platform. YARN is not the only major feature of Hadoop 2.0: HDFS has undergone major enhancements in terms of high availability (HA), snapshots, and federation.

Name Node high availability provides automated failover with a hot standby and resiliency for the Name Node master service.

In a high availability (HA) cluster, two separate machines are configured as Name Nodes. One Name Node is in the active state while the other is in the standby state.

All client operations are handled by the active node, while the other acts as the standby. The standby stays in sync with the active node by having access to a directory on a shared storage device.

Snapshots capture the state of the file system at a given point in time, which can then be used for backup and recovery.

To improve scalability and isolation, HDFS federation allows multiple namespaces, each served by its own Name Node, within a single cluster.