If you can’t explain it to a six year old, you don’t understand it yourself.

— Albert Einstein

A glimpse of the problem…

problem lego
Problem…​
Imagine a giant toy box, filled to the brim with Lego bricks. You could build all sorts of things: castles, houses, and even pirate ships. But with a box as big as you are and thousands of bricks all jumbled up, it could take a very long time to count all the pieces. And what if you need to buy tons of new bricks in the near future? There is no bigger box, and there never will be. What can you do instead?

Divide and Conquer

parallel processing

By splitting your Lego bricks across several toy boxes, you could simply buy a bunch of inexpensive boxes instead of hunting for an ever-bigger one. In computing, this is called horizontal scaling. Moreover, if you keep all your Lego in a single box and you need to count the bricks, you have to do it all by yourself.

Problem…​
Now, with so many boxes, you could count the pieces a lot more quickly. Do you see how?

Invite your friends to the party!

mapreduce lego

To determine how many bricks you have, you have to count each box separately and add up the results. The real difference is that you do not need to do that alone. Why not invite your friends to help you with the task? That’s what we call parallel processing. How many friends? As many as the number of toy boxes! Each friend counts the number of bricks in the box assigned to them and tells you the result. Add these results together to get the total count. It is as simple as that. In Hadoop terminology, you have just applied the MapReduce framework! Your friends represent the map task (“count the number of bricks in one box”) and you, the reduce task (“add the intermediate results”).

Show me the code!
function map(ToyBox toyBox):
  count = 0
  for brick in toyBox:
    count += 1
  emit(count)

function reduce(List<Count> partialCounts):
  total = 0
  for count in partialCounts:
    total += count
  emit(total)

This is only an informal definition of the MapReduce framework. To really grasp the full potential of the approach, we need to turn our attention to an equally interesting but more challenging problem. Let’s go!

problem books

We now want to count the word occurrences in Jules Verne’s books. Do you see how to apply the MapReduce framework to this task? For simplicity, assume we have Twenty Thousand Leagues Under the Sea and Journey to the Center of the Earth at our disposal. All your friends except one have gone home.

Problem…​
Do you see how to apply the MapReduce framework to solve this problem?

MapReduce again

mapreduce words

For each book, you need to write down (on paper, for example) the list of words with their counts. Then, everyone brings their notes to the person responsible for aggregating everything together. So, each map task returns the count for each word, and the reduce task adds up the counts of each word returned by the two mappers.

We can now define the Map and Reduce functions more formally as follows:

Map(k1,v1) → list(k2,v2)
Reduce(k2, list(v2)) → list(v3)

Applied to the problem at hand:

Map(Book’s title, Content) → list(Word, Count)
Reduce(Word, list(Count)) → list(Count)
Show me the code!
function map(String title, String content):
  // title: the book's title
  // content: the book's content
  for each word w in content:
    emit (w, 1)

function reduce(String word, List<Integer> partialCounts):
  // word: a word
  // partialCounts: a list of aggregated partial counts
  sum = 0
  for count in partialCounts:
    sum += count
  emit (word, sum)

Here, each book is decomposed into words, and each word is counted by the map function, using the word as the result key. Under the hood, the framework puts together all the pairs with the same key and feeds them to the same call to reduce. Thus, this function just needs to sum all of its input values to find the total appearances of that word.

Did you know?
The “Word Count” problem is often considered the “Hello World” of distributed computing.

Bonus: Could you apply the same logic to count the number of bricks for each color?

Problem…​
Imagine the number of words present in the books. That’s huge! The reduce task could take a very long time to finish. Do you see a way to considerably speed up this task? (Hint: consider that each kid (mapper) writes each word with its partial count on a separate post-it. In the current solution, every kid brings all of his post-its to the kid doing the reduce.)

Could you help me, please?

Instead of sending all the words to a single person, you could easily distribute them among multiple persons (multiple reducers). This redistribution is called the shuffle.

When using only one reducer, the shuffle algorithm is simple: send every mapper output to the single reducer. But when using multiple reducers, the algorithm can be more elaborate. For the problem at hand, we could decide that every word starting with a letter from A to M is sent to Youcef, while every word starting with a letter from N to Z is sent to Julien. This is one example of a shuffling algorithm, sketched in the code below.
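
To make this concrete, here is a minimal sketch of such a partitioner written against Hadoop's Partitioner class. The class name FirstLetterPartitioner and the A-M/N-Z split are illustrative choices, not anything imposed by the framework.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: words starting with A-M go to reducer 0 (Youcef),
// every other word goes to reducer 1 (Julien).
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {

  @Override
  public int getPartition(Text word, IntWritable count, int numPartitions) {
    String w = word.toString();
    if (w.isEmpty()) {
      return 0;
    }
    char first = Character.toUpperCase(w.charAt(0));
    int partition = (first <= 'M') ? 0 : 1;
    return partition % numPartitions; // stay within the configured number of reducers
  }
}

It would be wired into a job with job.setPartitionerClass(FirstLetterPartitioner.class) and job.setNumReduceTasks(2).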

Bonus: Could we use multiple reducers to solve the total brick count problem? Why?

Problem…​
Summing the partial counts of every word is a challenging task even when using multiple reducers. Imagine if each mapper is assigned 5 books to read (the map task is executed five times by each person): there will be 5 post-its with the same word, each with a different count for one of the books. What if each mapper reads 20 books? 50 books? That could make a ton of post-its to shuffle! Do you have a better idea?

The Combiner

Instead of sending the list of post-its, each friend could aggregate the results of the different books he reads and only bring this aggregated result. So, instead of aggregating the result of each book individually, we aggregate the result of each friend. If each friend is responsible for many books, that can make a huge difference. This idea is called a Combiner in the MapReduce framework, and it will often use the same logic as the Reducer (as in this example), although this is not required by the framework. The main difference is that the Combiner is executed by the person in charge of the map task, while the Reducer is executed by another person. In practice, the Combiner limits the amount of data the Mappers have to send to the Reducers.
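
In the Hadoop WordCount program shown later in this article, turning the reducer into a combiner boils down to one extra line on the job configuration:

job.setCombinerClass(IntSumReducer.class);

Because Hadoop is free to invoke the combiner zero, one, or several times for each map task, the combiner logic must not change the final result; summing partial counts satisfies this constraint.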

We have now seen all the elements of the MapReduce framework. Let’s try to define it more formally.

MapReduce: Definition

MapReduce is a programming model for processing large data sets with a parallel, distributed algorithm. A MapReduce program is composed of:

  • A Map function that performs filtering over a manageable part of the large data set

  • A Reduce function that performs a summary operation on the results of the Map function

mapreduce

We commonly use multiple mappers (as many as the number of input elements) to process the data set. If the data set grows, we just add new mappers to keep the processing time constant. We could also use multiple reducers when the mappers generate a considerable volume of data, but in practice, we will have a lot more mappers than reducers.
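
As an illustration with Hadoop's Java API (the same Job class used in the WordCount listing below), the number of reducers is chosen explicitly, while the number of mappers is derived from the way the input is split:

job.setNumReduceTasks(2); // for example, two reducers: Youcef and Julien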

When executing the MapReduce framework on a cluster of machines (e.g. Hadoop), a single machine could execute multiple map tasks and could also be used to execute a reduce task.

Optimizing the communication cost is essential to a good MapReduce algorithm. To avoid having to shuffle a massive amount of data, you could use a combiner function to aggregate results on the mapper side. The reducer function will often be used as the combiner function.

Show me the code!
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
Problem…​
Now, let’s admit we don’t have the books at our disposal but need to go to the library to find them. OK, but what if the library is currently closed? Or worse, what happens if the library is damaged by a natural disaster and all the books are destroyed? Do you have a solution?

Replicate to better survive

duplicate

We are lucky! We live in a big city where there is not just one library, but many.

Having multiple copies of the same books is really interesting in practice. If a book is borrowed by someone else, you can go to another library to borrow another copy. If there were only one library and one copy of the book, and the person who borrowed it never returned it, you could lose valuable information. With the principle of redundancy, you are now fault-tolerant.

Problem…​
You could dispatch your friends among the open libraries so that each one reads a subset of Jules Verne’s books. But how do you determine which library holds which books?

Just “Google” It?

google

Imagine every library shares the same website where you can discover all the available books. With such a website, you could easily determine where to send each of your friends to be sure not to miss a book. This centralized “index” is called the namenode in Hadoop, and every library represents a different datanode in Hadoop terminology. When you are searching for a book, you first contact the namenode to learn the list of datanodes holding a copy of the desired book.

Problem…​
Consider the impact of having to process a book containing thousands of pages against the impact of processing a thousand single, independent pages. Neither is really satisfying. If we try to solve our “word count” problem and one of the books is a very big book, this single book could slow down the whole process. At the opposite extreme, if each person has to search for a sheet of paper, read it, and then start again with the hundred remaining sheets, that could be terribly inefficient. Do you have an idea to keep the processing time constant across book sizes?

Uniform size

uniformize

Use a fixed book size. Instead of storing one big book, split it up into multiple volumes (like an encyclopedia). Instead of storing short papers, condense related papers into a book-length format (like Springer’s edited volumes). In this way, you can assign multiple persons to parse a really big book: each person is assigned a given volume. In the same way, one person can efficiently parse a collection of papers condensed inside the same book, without having to wander desperately through the aisles of the library. This technique is called fixed-size blocks; it has long been used by operating systems, and Hadoop uses it too.

We have now covered enough background to introduce the system at the heart of Hadoop: HDFS.

HDFS: a definition

HDFS stands for Hadoop Distributed File System. It is a distributed, scalable file system used to store the large data sets processed by MapReduce jobs.

HDFS stores large files (typically in the range of gigabytes to terabytes) across multiple machines called datanodes. In practice, each file is split into fixed-size blocks, and these blocks are automatically replicated. The block size is considerably bigger than the block size used by the underlying operating system, to minimize the network cost involved when moving a block from one machine to another.

Hadoop achieves reliability by replicating the data across multiple hosts. If one machine suddenly dies, HDFS automatically copies each of the blocks present on this machine to the remaining machines, using the other replicas at its disposal. To determine which machines hold the copies of a file block, HDFS uses a centralized index named the namenode.

hdfs
Show me the code!

Hadoop includes various shell-like commands to interact with the Hadoop Distributed File System (HDFS). For example:

$ hadoop fs -mkdir /user/hadoop/dir1
$ hadoop fs -put localfile /user/hadoop/dir1/hadoopfile
$ hadoop fs -chmod 755 /user/hadoop/dir1/hadoopfile
$ hadoop fs -cat /user/hadoop/dir1/hadoopfile
$ hadoop fs -rm /user/hadoop/dir1/hadoopfile

In practice, numerous tools of the Hadoop ecosystem interact with HDFS through its Java API.
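
As a minimal, hypothetical sketch (reusing the /user/hadoop/dir1/hadoopfile path created above), reading a file and asking the namenode where its blocks live could look like this:

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsExample {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path file = new Path("/user/hadoop/dir1/hadoopfile");

    // Read the file content through HDFS.
    try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file)))) {
      System.out.println(reader.readLine());
    }

    // Ask the namenode which datanodes hold each block of the file.
    FileStatus status = fs.getFileStatus(file);
    for (BlockLocation block : fs.getFileBlockLocations(status, 0, status.getLen())) {
      System.out.println(block); // offset, length and hosts of the block
    }
  }
}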

Under the hood, the file system is not organized like that at all. If we store a 1 GB file and the block size is 128 MB, Hadoop creates 8 blocks and distributes them across the datanodes. Each datanode stores its blocks as follows:

${dfs.datanode.data.dir}/
├── current
│ ├── BP-1-127.0.0.1
│ │ └── current
│ │ ├── VERSION
│ │ ├── finalized
│ │ │ ├── blk_1
│ │ │ ├── blk_1.meta
│ │ │ ├── blk_2
│ │ │ └── blk_2.meta
│ │ └── rbw
│ └── VERSION
└── in_use.lock

By default, there is only one block pool that contains all the raw block files. If the number of block files increases (tens or hundreds of thousands), a new folder will be created to avoid the problems that most operating systems encounter when there are a large number of files in a single folder.

The namenode has a directory structure like this:

${dfs.namenode.name.dir}/
├── current
│ ├── VERSION
│ ├── edits_0000000000000000001-0000000000000000019
│ ├── edits_inprogress_0000000000000000020
│ ├── fsimage_0000000000000000000
│ ├── fsimage_0000000000000000000.md5
│ ├── fsimage_0000000000000000019
│ ├── fsimage_0000000000000000019.md5
│ └── seen_txid
└── in_use.lock

Conceptually, the edit log is a single entity that records every modification made by users.

Each fsimage file contains a serialized form of all the directory and file inodes in the filesystem as viewed by the user. An fsimage file does not record the datanodes on which the blocks are stored. Instead, the namenode keeps this mapping in memory, which it constructs by asking the datanodes for their block lists at start-up.

resource sharing
Problem…​
As in Jules Verne’s adventures, our journey towards an understanding of Hadoop also presents a certain number of challenges. The libraries have a limited number of seats. If all the tables are busy, you need to wait until a table becomes available. No work can be done during that time. It’s hard to find a solution to this problem in the real world. Do you have any idea? (Hint: use your imagination.)

YARN

To understand the solution implemented by Hadoop, imagine there exists one person who knows the capacity (how many tables are present) of each library. If everyone who needs to access a book asks that person first, this person can efficiently dispatch them among all the libraries. This person is named YARN, which stands for Yet Another Resource Negotiator.

If we dive into the details, YARN is composed of many components. Going back to our library example, let’s say there is a spy in each library who reports everything that happens to a central person: when a table becomes free or busy, etc. This spy is called the Node Manager, and there is one instance in each library. The Node Manager supervises containers (the tables in our example) and reports to the Resource Manager (the person to whom new applications are submitted).

YARN: a definition

The fundamental idea of YARN is to split up the functionalities of resource management and job scheduling/monitoring into separate daemons (both were interwoven in the first version of Hadoop). The idea is to have a global ResourceManager (RM) and a per-application ApplicationMaster (AM).

yarn

The ResourceManager and the NodeManager form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system. The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.

The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.

Let’s go back to the Lego bricks example. Imagine that between each construction you build (a character, a house), you have to put all the bricks back inside their box, even if they will be used to build the next thing. Ask any kid: that’s just boring (and counterproductive in the case of Hadoop, as we will see)!

This limitation is due to the inner workings of the MapReduce framework. MapReduce was designed to read its input from HDFS and write its output to HDFS. No state is kept between two jobs. If you need to run two MapReduce jobs, you have to write the result of the first step to HDFS for the second step to be able to find it, as in the sketch below.
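
As a hypothetical sketch of this chaining (FirstMapper, FirstReducer, SecondMapper, SecondReducer and the paths are placeholder names, and the imports are the same as in the WordCount listing above), the first job writes its result to an intermediate HDFS directory and the second job reads it back:

Configuration conf = new Configuration();

// Step 1: compute intermediate results and persist them on HDFS.
Job first = Job.getInstance(conf, "first step");
first.setMapperClass(FirstMapper.class);      // placeholder mapper
first.setReducerClass(FirstReducer.class);    // placeholder reducer
FileInputFormat.addInputPath(first, new Path("/input"));
FileOutputFormat.setOutputPath(first, new Path("/tmp/intermediate"));
first.waitForCompletion(true);

// Step 2: read the intermediate results back from HDFS.
Job second = Job.getInstance(conf, "second step");
second.setMapperClass(SecondMapper.class);    // placeholder mapper
second.setReducerClass(SecondReducer.class);  // placeholder reducer
FileInputFormat.addInputPath(second, new Path("/tmp/intermediate"));
FileOutputFormat.setOutputPath(second, new Path("/output"));
second.waitForCompletion(true);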

Moreover, each map and reduce task runs in its own JVM, so every map or reduce task you launch starts a fresh new JVM.

Problem…​
When executing long-running jobs, these limitations were acceptable, but we cannot say as much when running interactive jobs. Do you have an idea to solve these problems? (Hint: I can see a spark in your eyes…)
To remember
  • Big Data is based on the divide and conquer approach, using parallel processing to solve problems no one could solve alone.

  • Hadoop stores its data in a distributed filesystem called HDFS. Each piece of information is replicated across multiple nodes in order to be fault-tolerant and to allow the data to be processed several times in parallel.

  • When analyzing such a big volume of data, it is far better to execute the job where the data is than to move the data to where you are. This is the principle of data locality.

  • Hadoop supports the MapReduce framework to describe our jobs. It is a low-level solution that is easy to grasp for simple jobs but can be more challenging for complex processing. New abstraction models were defined over time to overcome these limitations (Pig, Hive, Spark).

  • YARN represents the brain behind Hadoop. You just have to describe everything you need to fulfill your task, and YARN is responsible for finding all the needed resources.

  • When submitting a MapReduce job, Hadoop manages the conversation with YARN itself, but when using higher-level tools such as Spark, the conversation is handled directly by the tool.

About Julien Sobczak

Julien Sobczak is a software developer at Zenika, France. An avid reader, his main areas of focus are developer productivity, mental literacy, sport programming, and, in his daily job, distributed systems.
