CS 61A Lab 12

MapReduce

Starter Files

This lab makes extensive use of starter files that we provide here, so make sure to download them! After downloading the zip archive, don't forget to extract the files.

Introduction: MapReduce

In this lab, we'll be working with MapReduce, a programming paradigm developed by Google, which allows a programmer to process large amounts of data in parallel on many computers.

A computation in MapReduce consists two components: the mapper and the reducer.

The following diagram summarizes the entire MapReduce pipeline:

Mapreduce Diagram

This lab is split up into two parts: 1. Serial MapReduce: to introduce our MapReduce framework, we will first use a non-parallelized version. Even with this limitation, we can still do quite a lot of data processing! 2. Parallelized MapReduce: the real power of MapReduce comes from parallelization. The same MapReduce jobs from Part 1 can be executed much faster by using multiple computers (i.e. a cluster) at the same time. For this lab, we will be using Hadoop, an open source implementation of the MapReduce paradignm.

Part 1: Serial MapReduce

In this section, we introduce the framework for mappers and reducers. We will be running MapReduce jobs locally, so no parallelization occurs during this section. However, observe that we can still do an impressive amount of data processing just by defining two simple modules: the mapper and the reducer!

Example: Line-Counting

Our first exercise will be counting the number of lines in one of Shakespeare's plays.

To formulate this as a MapReduce problem, we need to define an appropriate mapper and reducer function.

Recall what the mapper does: for each line in a text file, the mapper outputs a key-value pair. What should our key-value pairs be for our line counting example?

For example, the mapper will take in an input file like this:

so much depends
upon
a red wheel
barrow
glazed with rain
water
beside the white
chickens.

(notice there are 8 lines); it then outputs a sequence of key-value pairs like this:

'line'  1
'line'  1
'line'  1
'line'  1
'line'  1
'line'  1
'line'  1
'line'  1

The reducer takes this sequence and simply adds up all the values that are associated with the key 'line':

'line'  8

This is illustrated by the following diagram:

Linecount example

Let's examine the code mapper and reducer.

The Mapper: line_count.py

In your current directory should be a file line_count.py with the following body:

#!/usr/bin/env python3

import sys
from ucb import main
from mr import emit

@main
def run():
    for line in sys.stdin:
        emit('line', 1)

line_count.py is the mapper, which takes input from stdin (i.e. 'standard in') and outputs one key-value pair for each line to stdout (i.e. 'standard out', which is typically the terminal output).

Let's run line_count.py by feeding it the_tempest.txt (provided with the starter files). The question is, how do we give the_tempest.txt to line_count.py via stdin? We'll use the Unix pipe '|' feature (Note: the 'pipe' key '|' isn't lowercase 'L', it's (typically) Shift+Backslash):

Note: You will probably have to tell Unix to treat line_count.py as an executable by issuing the following command:

chmod +x line_count.py

Once you've made line_count.py executable, type in the following command:

cat the_tempest.txt | ./line_count.py

Recall that the cat program will display the contents of a given file to stdout.

If you've completed line_count.py correctly, your terminal output should be full of key-value pairs, looking something like:

'line' 1
'line' 1
'line' 1
...
'line' 1

Unix pipe-ing takes the output of one program (in this case, the cat program), and 'pipes' it as the input to another program (typically via stdin). This technique of piping programs together is called "mudlar programming" and is ubiquitous in Unix-style programming. Modular programming allows us to write small, simple programs and chain them together to accomplish complicated tasks.

The Reducer: sum.py

In your current directory should be the file sum.py. The body of this file should be:

#!/usr/bin/env python3

import sys
from ucb import main
from mr import values_by_key, emit

@main
def run():
    for key, value_iterator in values_by_key(sys.stdin):
        emit(key, sum(value_iterator))

Let's break down the process:

  1. values_by_key is a function that reads input from stdin, and groups all key-value pairs that have the same key. For example,

    'line'  1
    'line'  1
    'line'  1
    

    will turn into the following pair:

    ('line', [1, 1, 1])
    

    Note: the second element should actually be an iterator, not a Python list; it is represented with square brackets for visual clarity.

  2. The variables key and value_iterator get bound to their respective values in the example above:

    key: 'line'
    value_iterator: [1, 1, 1]
    
  3. For each of these key-iterator pairs, sum.py will add up all the values in the iterator and output this new value with the same key:

    'line'  3
    

    The emit function prints out a key and a value in the format shown above. emit also handles logistics for parallelization, which becomes important in Part 2 of the lab.

You can think of the reducer as taking all the values of a key and collapsing it into a single value.

Putting it all together

Now that we have the mapper and the reducer defined, let's put it all together in the (simplified) MapReduce framework:

map sort reduce

Note: You will probably have to tell Unix to treat sum.py as an executable by issuing the following command:

chmod +x sum.py

Once you've done this, issue the following Unix command:

cat the_tempest.txt | ./line_count.py | sort | ./sum.py

Notice that we're using the Unix program sort, which is a built-in Unix program. As you'd expect, sort will, given a file, sort the lines of the file - by default, it will sort it alphabetically.

Take a moment and make sure you understand how the above Unix command is exactly the MapReduce framework (Map -> Sort -> Reduce). What's neat is that, in a very simple manner, we executed the MapReduce idea of using mappers and reducers to solve a problem. However, the main benefit of using the MapReduce idea is to take advantage of distributed computing - don't worry, we'll do that soon!

Exercises

Question 1: Use the MapReduce framework (i.e. Map -> Sort -> Reduce) to count the number of times the following common words occur:

A question to ponder is: will you need to create a new mapper, a new reducer, or both?

Part 2: Parallelized MapReduce

Now that we are familiar with the MapReduce framework, it's time to parallelize the process! Parallelization across multiple computers allows programmers to process vast amounts of data (think Google or Facebook) in a reasonable amount of time.

In this part of the lab, we will be using the Hadoop implementation of MapReduce. The provided file mr.py will take care of the details of communicating with Hadoop through Python. All you have to worry about is writing the mapper and reducer, just like before!

Getting started

In order to use Hadoop, you need to connect to a special Berkeley server called icluster1. This server is able to make connections to a cluster of computers for distributed computing. You can connect just like you normally would to a Berkeley server:

ssh -X icluster1.eecs.berkeley.edu

You will be asked if you want to remember the RSA signature -- type yes. You will then be asked to login to your class account.

Note: you will only be able to do this part of the lab if you ssh onto the icluster1 server.

Finally, some Unix environment variables need to be set. Go to the directory containing the lab starter files. One of them should be a file called envvars. Simply run the following command:

source envvars

Now you're ready to start using Hadoop!

Terminology and Commands

For various reasons, the Hadoop framework uses its own filesystem separate from the filesystems on your class account. To interact with the Hadoop filesystem, we'll be using mr.py:

Example: Line-counting with Hadoop

We are going to perform the same line counting example as we did in Part 1, but with Hadoop. Make sure that your line_count.py, sum.py, and mr.py are in the current directory, then issue the command:

python3 mr.py run line_count.py sum.py ../shakespeare.txt mylinecount

Your terminal should then be flooded with the busy output of Hadoop doing its thing. In particular, the output should contain lines that look like this:

map 0%  reduce 0%
map 100%  reduce 0%
map 100%  reduce 17%
map 100%  reduce 67%
map 100%  reduce 100%
Job complete: job_201311261343_0001
Output: output/mylinecount

This tells you the progress of your MapReduce job, specifically how many mappers and reducers have completed.

Once it's finished, you'll want to examine the Hadoop results! To do this, first issue the following command to see the contents of your Hadoop directory:

python3 mr.py ls

You should see a directory listing for your mylinecount job. To view the results of this job, we'll use mr.py's cat command:

python3 mr.py cat mylinecount/part-00000

As an interesting reference point, one TA ran this MapReduce job on a lightly-loaded icluster1, but totally in serial, meaning that each map job had to be done sequentially. The total line_count job took on the order of 5-8 minutes. How much faster was it to run it with Hadoop using distributed computing, where the work can be done in parallel?

Exercises

Question 2: Take your solution from Question 1 and run it through the distributed MapReduce (i.e. by using mr.py) to discover the number of occurrences of the following words in the entirety of Shakespeare's works:

Question 3: One common MapReduce application is a distributed word count. Given a large body of text, such as the works of Shakespeare, we want to find out which words are the most common.

Write a MapReduce program that returns each word in a body of text paired with the number of times it is used. For example, calling your solution with ../shakespeare.txt should output something like:

the 300
was 249
thee 132
...

Note: These aren't the actual numbers.

You probably will need to write a mapper function. Will you have to write a new reducer function, or can you re-use a previously-used reducer?

Working with the Trends Project

Question 4a: We've included a portion of the trends project in the file that you copied at the beginning of the lab in the files "trends.py" and "data.py". We're going to calculate the total sentiment of each of Shakespeare's plays much the same way that we calculated the total sentiment of a tweet in the trends project.

In order to do this, we need to create a new mapper. The skeleton for this new mapper is in the file sentiment_mapper.py. Fill in the function definition so that it emits the average sentiment of each line fed to it.

Note that we need to provide our code with the big sentiments.csv file. We've already stored this for you on the distributed file system that Hadoop uses. To make sure the file is available to our code, we use the run_with_cache command instead of the "run" command which allows us to provide one additional parameter: the path (on the virtual file system) to the cache file which contains the sentiments. Don't worry too much about this part -- it's just the specifics of our implementation.

Long story short, we will use the following command to run this map reduce task:

python3 mr.py run_with_cache sentiment_mapper.py sum.py ../shakespeare.txt MY_OUTFILE ../sentiments.csv#sentiments.csv

More Fun Exercises!

Question 4b: Now, we will determine the most commonly used word. Write a Python script file most_common_word.py that, given the output of the program you wrote in part 3 (via stdin), returns the most commonly used word. The usage should look like (assuming you named the Hadoop job output q3):

# python3 mr.py cat q3 | python3 most_common_word.py

Question 4c: Now, write a Python script file that, given the MapReduce output from Q3 (via stdin), outputs all words used only once, in alphabetical order. Finally, output the results into a text file singles.txt. The Unix command should look like this:

# python3 mr.py cat q3 | python3 get_singles.py | sort > singles.txt

Question 5: In this question, you will write a MapReduce program that, given a phrase, outputs which play the phrase came from.

Then, use your solution to figure out which play each of the following famous Shakespeare phrases came from:

Hint: In your mapper, you'll want to use the get_file() helper function, which is defined in the mr.py file. get_file() returns the name of the file that the mapper is currently processing - for ../shakespeare, the filenames will be play names. To import get_file, include the following line at the top of your Python script:

from mr import get_file

Also, you might want to look at the included set.py reducer which reduces the values of each key into a set (i.e. removing duplicates).

To run the MapReduce job, use the following command:

python3 mr.py MAPPER REDUCER ../shakespeare MY_OUTFILE

where MAPPER is the name of your mapper file and REDUCER is the name of your reducer file.