To get the files necessary for this lab (such as mr.py), let's copy the relevant lab files:
~ # cd
~ # cp -r ~cs61a/lib/lab16_mr .
~ # cd lab16_mr
For this lab, we'll be using MapReduce, a programming paradigm developed by
Google, which allows a programmer to process large amounts of data in parallel
on many computers. Hadoop is an open-source implementation of the MapReduce
design.
Any computation in MapReduce consists primarily of two components: the
mapper and the reducer.
The mapper takes an input file, and outputs a series of
key-value pairs, like:
age 29
name cecilia
job gradstudent
salary 42

In the above, the key-value pairs are:

(age -> 29), (name -> cecilia), (job -> gradstudent), (salary -> 42)

The reducer takes the (sorted) output from the mapper, and outputs a single value for each key. The mapper's output will be sorted according to the key before it reaches the reducer.
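To make the division of labor concrete, here is a minimal, in-memory sketch of the whole Map -> Sort -> Reduce idea in plain Python. The function names and sample lines are ours, purely for illustration; the real lab runs this flow through Hadoop via mr.py, introduced below.

from itertools import groupby
from operator import itemgetter

def mapper(line):
    # Emit one (key, value) pair per input line.
    yield ('line', 1)

def reducer(key, values):
    # Collapse all the values for one key into a single value.
    return key, sum(values)

lines = ['Full fathom five thy father lies,', 'Of his bones are coral made;']
pairs = [pair for line in lines for pair in mapper(line)]      # map
pairs.sort(key=itemgetter(0))                                  # sort
results = [reducer(key, (v for _, v in group))                 # reduce
           for key, group in groupby(pairs, key=itemgetter(0))]
print(results)   # [('line', 2)]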
The MapReduce Pipeline
Your job will be to write the mapper and the reducer.
Let's see an example of using the MapReduce idea to count the number of lines in
Shakespeare's works.
On our servers, we happen to have all of Shakespeare's plays in text format. For instance,
if you're so inclined, feel free to read a few phrases from 'Romeo and Juliet':
# emacs ~cs61a/lib/shakespeare/romeo_and_juliet.txt &

Or how about...'The Tempest'?

# emacs ~cs61a/lib/shakespeare/the_tempest.txt &

Anyways, we'd like to be able to count all the lines in all of his plays. Choose a Shakespeare play (say, the_tempest.txt) and copy it into your current directory by doing:
# cp ~cs61a/lib/shakespeare/the_tempest.txt .
To formulate this as a MapReduce problem, we need to define an
appropriate mapper and reducer function.
One way to do this is to have the mapper create a key-value pair for every
line in each play, whose key is always the word 'line', and whose value is
always 1.
The reducer would then simply be a sum of all the values, as this
picture illustrates:
Line-counting in MapReduce
Let's implement each feature (mapper, reducer) separately, then see how each piece fits together.
In your current directory should be the file line_count.py. The body of this file should be:

#!/usr/bin/env python3
import sys
from ucb import main
from mr import emit

@main
def run():
    for line in sys.stdin:
        emit('line', 1)
line_count.py is the mapper, which takes input from stdin (i.e.
'standard in') and outputs one key-value pair for each line to stdout
(i.e. 'standard out', which is typically the terminal output).
Let's try running line_count.py by feeding it the_tempest.txt.
The question is, how do we give the_tempest.txt to line_count.py
via stdin? We'll use the Unix pipe '|' feature (Note: the
'pipe' key '|' isn't lowercase 'L', it's (typically) Shift+Backslash):
Note: You will probably have to tell Unix to treat line_count.py as
an executable by issuing the following command:
# chmod +x line_count.py

Once you've made line_count.py executable, type in the following command:
# cat the_tempest.txt | ./line_count.py

Recall that the cat program will display the contents of a given file to stdout. You should see output like:
'line' 1
'line' 1
'line' 1
...
'line' 1

What piping does in Unix is take the output of one program (in this case, the cat program) and 'pipe' it into the input of another program (typically via stdin). This technique of piping programs together is ubiquitous in Unix-style programming, and is a sign of modular programming. The idea is: if you can write modular programs, then it will be easy to accomplish tasks by chaining together multiple programs. We'll do more with this idea in a moment.
In your current directory should be the file sum.py. The body of this file should be:
#!/usr/bin/env python3
import sys
from ucb import main
from mr import values_by_key, emit

@main
def run():
    for key, value_iterator in values_by_key(sys.stdin):
        emit(key, sum(value_iterator))
This is the reducer, which reads in sorted key-value pairs from stdin, and
outputs a single value for each key. In this case, sum.py will return
the sum of all the values for a given key. In other words, the reducer
is reducing the values of a given key into a single value.
The emit procedure takes in two arguments: a key, and a
reduced_value, and performs the necessary bookkeeping so that
Hadoop is aware that we are combining all key-value pairs from the mapper
(here, stdin) with the key key into the single value
reduced_value.
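The helpers values_by_key and emit are provided by mr.py. Conceptually they behave roughly like the sketch below; this is our illustration of the idea, not the actual mr.py source, which also handles the exact input/output format Hadoop expects.

from itertools import groupby

def values_by_key(stream):
    # Assumes the incoming lines are already sorted by key and that each
    # line holds one key and one numeric value separated by whitespace.
    pairs = (line.split() for line in stream if line.strip())
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        yield key, (int(value) for _, value in group)

def emit(key, value):
    # Write one key-value pair to stdout; the real emit in mr.py also
    # takes care of the formatting Hadoop expects.
    print(key, value)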
For the purposes of this simple line-counter, since the mapper only returns
one type of key ('line'), the reducer will also only return one
value - basically the total number of key-value pairs.
Now that we have the mapper and the reducer defined, let's put it all together in the (simplified) MapReduce framework:
The MapReduce Flow
Note: You will probably have to tell Unix to treat sum.py as an executable by issuing the following command:
# chmod +x sum.py

Once you've done this, issue the following Unix command:
# cat the_tempest.txt | ./line_count.py | sort | ./sum.py

Notice that we're using the Unix program sort, which is a 'built-in' Unix program. As you'd expect, sort will, given a file, sort the lines of the file - by default, it sorts them alphabetically.
Use the MapReduce framework (i.e. Map -> Sort -> Reduce) to count the number of times the following (common) words occur:
the he she it thee

A question to ponder is: will you need to create a new mapper, a new reducer, or both?
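If you conclude that a new mapper is the missing piece, one possible shape for it is sketched below. The filename common_words_count.py and the exact word handling are only illustrative, not part of the lab files; adapt them to your own answer.

#!/usr/bin/env python3
# common_words_count.py -- illustrative sketch, not part of the lab files
import sys
from ucb import main
from mr import emit

WORDS = {'the', 'he', 'she', 'it', 'thee'}

@main
def run():
    for line in sys.stdin:
        for word in line.split():
            if word.lower() in WORDS:
                emit(word.lower(), 1)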
We have provided a way to practice making calls to the MapReduce framework (using the Hadoop implementation). The provided file mr.py will take care of the details of communicating with Hadoop via Python. Here is a list of commands that you can give to mr.py:
Note: Some terminology. The Hadoop framework, for its own reasons, maintains its own filesystem separate from the filesystems your instructional accounts are on. As such, the following Hadoop filesystem commands are performed with respect to your Hadoop filesystem.
To use the distributed-computing power, you'll need to SSH into the icluster servers (Hadoop is installed only on these machines). To do this, issue the following terminal command:
# ssh -X icluster1.eecs.berkeley.edu

You will be prompted to log in.
Then, some environment variables need to be set - issue the following Unix command:
source lab16_envvars

Note: For some reason, to run some of the commonly-used Unix commands on icluster1 (such as mv, rm, etc.), you may need to fully specify the filepath of the program, like:
# /bin/mv myfile.py newmyfile.py
# python3 mr.py cat OUTPUT_DIR
This command prints out the contents of all files in one of the directories on the Hadoop FileSystem owned by you (given by OUTPUT_DIR).
# python3 mr.py ls
This command lists the contents of all output directories on the Hadoop FileSystem.
# python3 mr.py rm OUTPUT_DIR
This command will remove an output directory (and all files within it) on the Hadoop FileSystem. Use this with caution - remember, there's no 'undo'!
# python3 mr.py run MAPPER REDUCER INPUT_DIR OUTPUT_DIR
This command will run a MapReduce job of your choosing, where MAPPER and REDUCER are the Python files containing your mapper and reducer, INPUT_DIR is the directory of input files to process, and OUTPUT_DIR is the name of the directory on the Hadoop FileSystem where the results will be placed. For example, to run the line-counting job on all of Shakespeare's works:
# python3 mr.py run line_count.py sum.py ../shakespeare mylinecount

Your terminal should then be flooded with the busy output of Hadoop doing its thing. Once it's finished, you'll want to examine the Hadoop results! To do this, first call mr.py's ls command to see the contents of your Hadoop directory:
# python3 mr.py ls

You should see a directory listing for your mylinecount job. To view the results of this job, we'll use mr.py's cat command:
# python3 mr.py cat mylinecount

As an interesting reference point, one TA ran this MapReduce job on a lightly-loaded icluster1, but totally in serial, meaning that each map job had to be done sequentially. The total line_count job took on the order of 5-8 minutes. How much faster was it to run it with Hadoop using distributed computing, where the work can be done in parallel?
Take your solution from Question 1 and run it through the distributed MapReduce (i.e. by using mr.py) to discover the number of occurrences of the following words in the entirety of Shakespeare's works:
the he she it thee
One common MapReduce application is a distributed word count. Given a large body of text, such as
the works of Shakespeare, we want to find out which words are the most common.
Write a MapReduce program that returns each word in a body of text paired with the number of times
it is used. For example, calling your solution with ../shakespeare should
output something like:
the 300
was 249
thee 132
...
Note: These aren't the actual numbers.
You probably will need to write a mapper function. Will you have to write a new reducer function, or can you re-use a previously-used reducer?
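As a hint at the shape of such a mapper, here is a rough sketch. The filename word_count.py is only a suggestion, and the tokenization is deliberately naive; you may want to handle punctuation and case more carefully.

#!/usr/bin/env python3
# word_count.py -- illustrative sketch; refine the tokenization as you see fit
import sys
from ucb import main
from mr import emit

@main
def run():
    for line in sys.stdin:
        for word in line.split():
            emit(word.lower(), 1)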
Now, we will determine the most commonly used word. Write a Python script file most_common_word.py that, given the output of the program you wrote in part 3A (via stdin), returns the most commonly used word. The usage should look like (assuming you named the Hadoop job output wordcounts):
# python3 mr.py cat wordcounts | python3 most_common_word.py
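A minimal sketch of most_common_word.py is below, assuming each line arriving on stdin holds a word followed by its count, separated by whitespace; check the actual output format of your job and adjust the parsing if needed.

#!/usr/bin/env python3
# most_common_word.py -- sketch; assumes each stdin line looks like "word count"
import sys

def main():
    best_word, best_count = None, -1
    for line in sys.stdin:
        parts = line.split()
        if len(parts) < 2:
            continue
        word, count = parts[0], int(parts[-1])
        if count > best_count:
            best_word, best_count = word, count
    print(best_word, best_count)

if __name__ == '__main__':
    main()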
Now, write a Python script file that, given the MapReduce output from Q3A (via stdin), outputs all words used only once, in alphabetical order. Finally, output the results into a text file singles.txt. The Unix command should look like this:
# python3 mr.py cat wordcounts | python3 get_singles.py | sort > singles.txt
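One possible sketch of get_singles.py follows, under the same assumption that each stdin line holds a word and its count; note that the alphabetical ordering is handled by the sort program later in the pipeline, so the script only needs to print the words.

#!/usr/bin/env python3
# get_singles.py -- sketch; assumes each stdin line looks like "word count"
import sys

def main():
    for line in sys.stdin:
        parts = line.split()
        if len(parts) >= 2 and parts[-1] == '1':
            print(parts[0])

if __name__ == '__main__':
    main()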
In this question, you will write a MapReduce program that,
given a phrase, outputs which play the phrase came from.
Then, use your solution to figure out which play each of the following
famous Shakespeare phrases came from:
pomp and circumstance
foregone conclusion
full circle
strange bedfellows
neither rime nor reason
spotless reputation
one fell swoop
seen better days
it smells to heaven
a sorry sight

Hint: In your mapper, you'll want to use the get_file() helper function, which is defined in the mr.py file. get_file() returns the name of the file that the mapper is currently processing - for ../shakespeare, the filenames will be play names. To import get_file, include the following line at the top of your Python script:
from mr import get_file

Also, you might want to look at the included set.py reducer, which reduces the values of each key into a set (i.e. removing duplicates).
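One rough sketch of such a mapper is below; it emits each phrase found on a line as a key, with the current play's filename as the value. The filename phrase_mapper.py and the PHRASES list are illustrative only.

#!/usr/bin/env python3
# phrase_mapper.py -- illustrative sketch, not part of the lab files
import sys
from ucb import main
from mr import emit, get_file

PHRASES = ['pomp and circumstance', 'foregone conclusion', 'full circle']  # etc.

@main
def run():
    for line in sys.stdin:
        for phrase in PHRASES:
            if phrase in line.lower():
                emit(phrase, get_file())

The included set.py reducer could then collapse the repeated (phrase, play) pairs into a set of plays for each phrase.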
Fin.