In this project you will use MapReduce to analyze data from real social networks. In Part 1, you will develop and test your code locally; in Part 2, you will run it on EC2. You will submit your code and answers via git to your class github repo. You may work with a partner.
Have you ever heard of the infamous six degrees of separation or played the game Six Degrees of Kevin Bacon? They are both outcomes of the small-world property of social networks, which states that the average distance between any two vertices grows very slowly (logarithmically) relative to the size of the network. In practice, even for very large social networks, the median distance is small -- Twitter user follows (4), MSN Message Conversations (6), Facebook friendships (5), and Hollywood co-actors (4).
We will use the large-scale processing capabilities of a Hadoop cluster to analyze data from real social networks and approximate the distance distribution, to get a feel for how far apart most people are. To do this, we will randomly pick a subset of the vertices in the graph and compute their distances to all other vertices. Your final program will take in a social graph and a constant for sampling, and it will return a histogram of the approximated distance distribution.
Formally, we can think of each input social network as defining a graph. Each person is a vertex and each relationship is an edge. This graph is unweighted since all relationships (friendships, follows, collaborations) are equivalent. The distance between any two vertices is the number of edges in the shortest path between them. To find all of the distances from one starting point, one would normally use a Single-Source Shortest Path algorithm such as Dijkstra's Algorithm, but since the graph is unweighted, we can get away with a simple Breadth-First Search (BFS). Better yet, the BFS will be much easier to parallelize. To get a feel for how this works, examine the pseudocode for a serial BFS implementation below:
def bfs(vertices, s):
    dist = {}
    for v in vertices:
        dist[v] = -1
    dist[s] = 0
    queue = [s]
    for v in queue:
        for n in neighbors(v):
            if dist[n] == -1:
                dist[n] = dist[v] + 1
                queue += [n]
    return dist
This variant of breadth-first search returns the distance of each vertex from the starting point s. Notice that each vertex is visited at most once: its distance is set the first time it is reached, and never changed afterward. Your parallel version for MapReduce will be different, but the above example shows that BFS can compute distances on an unweighted graph.
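To see the structure that maps well onto MapReduce, it helps to rewrite the search to advance level by level: each pass over the current frontier corresponds to one round (in the project, one mapreduce job). This is an illustrative sketch only, not the required structure; the names `bfs_levels` and `adj` (a dict mapping each vertex to its out-neighbors) are ours.

```python
def bfs_levels(adj, s, max_iterations=20):
    """Level-synchronized BFS: one pass over the frontier per level."""
    dist = {s: 0}
    frontier = [s]
    level = 0
    while frontier and level < max_iterations:
        next_frontier = []
        for v in frontier:
            for n in adj.get(v, []):
                if n not in dist:          # first visit fixes the distance
                    dist[n] = level + 1
                    next_frontier.append(n)
        frontier = next_frontier
        level += 1
    return dist
```

Because every vertex in the frontier is expanded independently within a level, each pass parallelizes naturally.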
We will only perform breadth-first searches from a subset of the vertices to save time, since examining all of them adds little fidelity. The program will: load the social graph, perform multiple breadth-first searches, and compute the histogram. We recommend performing the searches simultaneously, since doing so copies the graph fewer times and requires fewer mapreduces, both of which accelerate processing. To pick the starting points, we search from each vertex with probability 1/denom, so in expectation the program performs num_vertices/denom searches. We leave the actual number of searches to chance because this method suits a distributed setting like MapReduce: for each vertex, the choice of whether to search from it is completely parallel, and the only global information needed is the selection probability. It does not require calculating (and distributing) how many vertices are in the graph, or making sure each starting vertex exists.
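The per-vertex selection can be sketched in a few lines. This is an illustrative sketch; the function name and the `seed` parameter (added only for reproducibility) are ours, not part of the skeleton code.

```python
import random

def pick_starting_points(vertices, denom, seed=0):
    """Independently select each vertex with probability 1/denom."""
    rng = random.Random(seed)
    return [v for v in vertices if rng.random() < 1.0 / denom]
```

In expectation this yields len(vertices)/denom starting points, and each vertex's decision requires no global knowledge of the graph.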
The histogram simply totals how many shortest paths there are of each distance.
The input graphs are encoded in Hadoop's SequenceFileType, and each element is (key, value) = (LongWritable source, LongWritable destination). The output should be in Hadoop's TextFormat, with (key, value) = (LongWritable distance, LongWritable total), where total is the number of shortest paths with that distance. The breadth-first searches should stop early if there are no more distances to be computed, but they also should not run more than the provided MAX_ITERATIONS = 20, to limit runtime in the presence of a few outliers.
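The early-stop condition can be driven by a per-round count of how many distances were updated (in Hadoop, typically read from a counter after each job finishes). A hypothetical driver loop, where `step` stands in for launching one BFS mapreduce round:

```python
MAX_ITERATIONS = 20

def run_bfs_rounds(step, max_iterations=MAX_ITERATIONS):
    """step() runs one BFS round and returns how many distances it
    updated (a stand-in for reading a Hadoop counter).
    Returns the number of rounds executed."""
    for i in range(max_iterations):
        if step() == 0:
            return i + 1
    return max_iterations
```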
The vertices in each graph are given by long identifiers. The identifier range is not necessarily contiguous (e.g. a graph could have vertices {0,1,5,9}). Each input relation is intended to be treated as a directed edge. If the original relation is undirected, the other direction for that relation will be somewhere in the input. There can be repeat relations, but there will not be loops (self-edges).
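Since identifiers may be non-contiguous and relations may repeat, a graph loader can normalize the edge list into adjacency sets. A small sketch (the edge values here are made up for illustration):

```python
from collections import defaultdict

edges = [(0, 1), (0, 1), (1, 5), (9, 5)]  # repeats allowed, ids not contiguous
adj = defaultdict(set)
for src, dst in edges:
    adj[src].add(dst)   # each relation is a directed edge; the set collapses repeats
```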
The denom constant will be fed in on the command line. To propagate it to all Hadoop instances in the cluster, the skeleton code will actually write it to a file and use the HDFS caching feature to distribute it. The skeleton code takes care of this for you.
If a vertex is unreachable (or has a distance greater than MAX_ITERATIONS), it should not contribute to the histogram.
To help understand the intended operation, we provide an example.
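For instance, on the ring4 graph with denom=1 (search from every vertex), the expected histogram is {(0,4),(1,4),(2,4),(3,4)}. A self-contained Python sketch of that computation (the code is ours, not part of the skeleton):

```python
from collections import Counter

edges = [(0, 1), (1, 2), (2, 3), (3, 0)]           # ring4: directed ring
adj = {}
for s, d in edges:
    adj.setdefault(s, []).append(d)

def bfs(adj, s):
    dist = {s: 0}
    queue = [s]
    for v in queue:                                # queue grows as vertices are found
        for n in adj.get(v, []):
            if n not in dist:
                dist[n] = dist[v] + 1
                queue.append(n)
    return dist

histogram = Counter()
for start in adj:                                  # denom = 1: every vertex is a start
    histogram.update(bfs(adj, start).values())
# histogram: {0: 4, 1: 4, 2: 4, 3: 4}
```

Each of the 4 searches finds one vertex at each distance 0 through 3, so every distance totals 4.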
We provide you with SmallWorld.java and a Makefile in ~cs61c/proj1. You can copy them to your home directory by:
$ cp -r ~cs61c/proj1 ~
There should not be any need for you to create additional files, but if you do, be sure that make compiles them all and that main is still present in the SmallWorld class in sw.jar. Feel free to modify SmallWorld.java however you want while still meeting the program requirements. The code in there is intended to take care of tedious coding details and to provide examples of useful things you may want to modify or mimic in your solution.
The skeleton code assumes your code will use three types of mapreduces (graph loading, breadth-first search, and histogram making), and the code in main supports that. The skeleton code is intended to demonstrate how to chain and even iterate multiple mapreduce jobs. Currently all of the maps and reduces are the identity, except LoaderMap, which filters out some of the edges. It will need to be changed, but it is there to demonstrate accessing and using denom.
The EValue class is provided to give an example implementation of a class that implements Hadoop's Writable interface. This allows it to be the value of a map or reduce phase. For it to be used as a key type for map or reduce, it would need to implement the WritableComparable interface. EValue currently packages an enumerated type and a long together, but feel free to modify it to suit your implementation.
Counters are a useful feature of Hadoop that lets you count properties of your program as the data flows by. There are default counters that already count things such as how many map and reduce tasks are done each phase. The skeleton code provides an example of making and reading a new counter on lines 122 and 180. It reads in how many edges the graph input has and prints them out.
You should complete this project on the machines in 330 Soda. If you are not sitting physically in front of one of these lab machines, you can access one of them remotely by following these instructions. The code should run both locally and remotely the same as in lab2 and lab3. We recommend spending the majority of your development time working locally and with a small dataset to speed up and simplify debugging. The syntax for using the completed program is:
$ hadoop jar sw.jar SmallWorld input_graph output_dir denom
The input graphs are available in the following locations:

Locally (in ~cs61c/p1data/):
- ring4.seq - 4 vertices in a ring (0→1, 1→2, 2→3, 3→0)
- cit-HepPh.sequ - 35K vertices from High Energy Physics collaborations

On EC2 (in s3n://cs61cSp12/):
- hollywood.sequ - Hollywood co-star database circa 2009 (1M actors)
- wikipedia.seq - Wikipedia links circa 2011 (5.7M articles) - For fun: not part of assignment
- twitter - Twitter user follow graph circa 2009 (60M profiles, 1.2B follows) - Very large: if course has budget, will make available

Some tips:
- Use git to manage your code for this project. You will need to submit your code using git, so you might as well take advantage of it while you are developing.
- The output between mapreduce phases does not need to be TextOutput. The SequenceFileOutputFormat is the fastest because it is a compressed binary encoding.
- The correct output for ring4.seq with denom=1 is {(0,4),(1,4),(2,4),(3,4)}.
- When running cit-HepPh.sequ on hive, you should be able to complete within your disk quota with denom=10000.
- ring4.seq and cit-HepPh.sequ are also available in s3n://cs61cSp12/ to test.
- To remove old output directories on the cluster:
$ hc large dfs -rmr hdfs:///*-out hdfs:///user/cs61c-XX/*-out
- To set the number of reduce tasks for a job, you can set it in your source file with:
job.setNumReduceTasks(1);
To set the value globally for all reducers from the command line, use:
$ hc large jar sw.jar SmallWorld -Dmapred.reduce.tasks=20 ...
If you use the command line option, override it to be 1 for your histogram reduce (with the source snippet above) because we want only 1 output file.

Complete the problem locally and submit SmallWorld.java. Submit the Makefile (if modified) or any additional source files if needed.
Before running your code on EC2, run it locally to be sure it is correct and decently efficient. We are providing a solution at ~cs61c/proj1/sw_ref.jar to use for Part 2 for those whose Part 1 code isn't fast enough. If your code is efficient, please use it.
Submit SmallWorld.java, the Makefile (if modified), and any additional source files it needs. Additionally, complete the following questions and submit the answers in proj1.txt.
Run hollywood.sequ with denom=100000 on clusters of size 6, 9, and 12. How long does each take? How many searches did each perform? How many reducers did you use for each? (Read the rest of the questions to see what other data you will need.) If ec2-usage returns bogus values, please try to approximate (and indicate this).

You will submit this project via git. You should have set up access to your own CS 61C repo in lab1. Place your code in a subdirectory called proj1 inside of your class repo. Tag the commit for your Part 1 submission proj1-1 and tag the commit for your Part 2 submission proj1-2. To be fully submitted, those commits need to be pushed to github by their respective deadlines. The code below assumes the most recent commit is the one you want to submit for Part 1.
$ cd (your local repo)
$ git tag proj1-1
$ git push origin (branch of your commit)
$ git push --tags
Be sure to put your name and login (and your partner's name and login if you have one) at the top of your submitted source files, especially SmallWorld.java