Project 4 - Distributed Key-Value Store
Overview
In project 4, you will implement a distributed key-value store that runs across multiple
nodes.
Multiple clients will communicate with a single master server in a given messaging
format (KVMessage) using a client library (KVClient). The master contains
a set-associative cache (KVCache), which it uses to serve GET
requests without going to the key-value (slave) servers it coordinates; the slave servers are contacted for a GET only upon a cache miss on the master. The master will
use the TPCMaster library to forward client PUT and DEL requests to multiple
slave servers, following the TPC protocol for atomic PUT and DEL operations across those servers. KVServers remain the same as in project 3.
- Operations on the store should be atomic (succeed completely or fail altogether) without any side effects, as guaranteed by the TPC protocol.
- Data storage is durable, i.e. the system does not lose data if a single node fails. You
will use replication for fault tolerance.
- The key-value store should optimize read throughput by accessing data concurrently from multiple replicas.
Concepts to learn: two-phase commit, logging and recovery,
consistent hashing, failure detection using timeouts
Figure: A distributed key-value store with replication factor of two.
Figure: Functional diagram of a distributed key-value store. Components in colors other
than black are the key ones you will be developing for this project. Blue depicts the GET execution
path, green depicts PUT and DEL, and purple components are shared between operations. Note that
components in black might also require minor modifications to suit your purposes. Not shown in
the figure: PUT and DEL will involve updating the write-through cache in the master.
Task Outline
- [7%] Modify KVMessage to allow two-phase commit messages (if necessary). See message specs in tables below. Then finish the TPCLog class that slaves will use to log
transactions and rebuild the server during recovery.
- [20%] Implement registration logic in the TPCRegistrationHandler, TPCSlaveInfo, and TPCMasterHandler.registerWithMaster.
- [19%] Implement the methods registerSlave, findFirstReplica and findSuccessor in TPCMaster which will be used to find the slaves involved in a transaction. Then implement TPCClientHandler.
- [29%] Implement the rest of TPCMaster to handle all of the two-phase commit logic on the master node (handleTPCRequest and handleGet).
- [25%] Implement TPCMasterHandler that handles TPC logic on
the slaves.
Note that this project may require more collaboration and understanding
between tasks than previous projects, as some tasks depend on the
implementation of others: (2,3), (3,4), (4,5), (5,1).
Requirements
Unless otherwise specified below, you will have to satisfy the requirements described in project 3. Again, you should bulletproof your code such that the nodes do not crash under normal operation.
You are required to keep up with this Piazza thread for ongoing clarifications.
Two-phase Commit
- TPCMaster must select replica locations using consistent hashing. Only a single two-phase-commit operation can be executed at a
time (handleTPCRequest is synchronized).
- You do not have to support concurrent update operations across different keys (i.e. TPC PUT and DEL operations are performed one after another), but retrieval
operations (i.e. GET) of different keys must be concurrent unless restricted by an ongoing
update operation on the same cache set.
- The master will include a set-associative cache, which
will have the same semantics as the cache you used before. If the master finds the key in its cache, it does not need to contact any slaves.
- You must wait until all slaves register before servicing any requests.
This means you must block until registerSlave has been called
successfully for numSlaves unique slaves.
- A slave will send a vote-abort to the master if the key doesn't exist for a
DEL or if the key/value is oversized for a PUT. The situations for this case are very limited in our "dummy" version of a kv-store.
- When sending phase-1 requests, the master must contact both slaves, even if the first slave sends an abort. You can do this by sequentially making the requests or concurrently by forking off threads.
- In theory, any response the master receives from a slave in phase-2
should be an ACK (we ask for this guarantee from phase-1). However, for
the sake of error handling, if the master receives anything besides an ACK, throw a KVException ERROR_INVALID_FORMAT and return it to the client (and probably print an error to the console, since this should never happen). A sketch of the full flow appears after this list.
Figure: Sequence diagram of concurrent read/write operations using the TPC protocol in
project 4. The project 3 blocks in the diagram refer to the activities when clients write to a
single-node slave server, where the master is the client to individual slaves. The GET request from Client 1 hits the cache in the above diagram; had it missed, the
GET request would have been forwarded to each of the slave servers until the key was found.
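The read path in the diagram might look like the following sketch of handleGet, assuming KVCache exposes a per-set lock via getLock as in project 3; sendAndReceive is the same hypothetical helper as in the sketch above, and the KVConstants names are assumptions carried over from project 3.

import java.util.concurrent.locks.Lock;

// Inside TPCMaster -- a hedged sketch of the read path from the diagram.
public String handleGet(KVMessage msg) throws KVException {
    String key = msg.getKey();
    Lock setLock = masterCache.getLock(key); // per-set lock (project 3 KVCache)
    setLock.lock();
    try {
        String value = masterCache.get(key);
        if (value != null) {
            return value;                    // cache hit: no slave is contacted
        }
        // Cache miss: try the first replica, then its successor.
        TPCSlaveInfo first = findFirstReplica(key);
        for (TPCSlaveInfo slave : new TPCSlaveInfo[] { first, findSuccessor(first) }) {
            try {
                value = sendAndReceive(slave, msg).getValue();
                if (value != null) {
                    masterCache.put(key, value); // refill the cache on the way out
                    return value;
                }
            } catch (KVException e) {
                // Fall through and try the other replica.
            }
        }
        throw new KVException(KVConstants.ERROR_NO_SUCH_KEY);
    } finally {
        setLock.unlock();
    }
}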
Failures, Timeouts, and Recovery
At any moment, at most one slave server will be down. Upon revival, slave servers always come back with the same ID. For this particular project, you can assume that the master will never go down, meaning there is no need to log its state.
Individual slave servers, however, must log the information necessary to survive
failures.
Your code should be able to handle the slave server failing at any point in the TPC transaction.
- On failure, we assume the slave's in-memory KVStore is wiped. When the slave comes back up, it will be rebuilt using the log it has been updating (see the sketch after this list).
- When a slave comes back up, it does not contact the master or other slaves. It rebuilds from the log and should determine from the log whether the last request it received was a phase-1 request. If a slave crashes at any time during phase-2 (including before receiving the global decision), the master will need to keep trying to send the global message to that slave until it gets a response (retry using timeouts).
- During phase-1, if the master does not get a vote within a single timeout period, it should assume the slave voted abort.
- During phase-2, the master must retry (with timeout) until it receives a response to its decision. You must send a decision to slaves that you timeout on because they may be waiting on a decision once they reboot in the event that they crashed after receiving the request. Note that in the case that the slave restarts, it may bind to a new port and re-register. Your master node must retry with the latest host-port the slave has registered with. Remember that slaves always restart with the same ID. GET requests may be served concurrently during this time.
- You may contact the slaves serially in phase-1. You may return the error message of either of the slaves that fail if you choose to contact them in parallel. For every slave you call sendMessage to, you must send a phase-2 decision, whether or not you received a response from that slave.
- If a slave finishes performing the operation in phase-2 but fails before sending an ack, the master will keep trying to send the decision. In this case, the slave will get a duplicate decision from the master. The slave should immediately send an ack because it will have already performed the operation while rebuilding the server.
- It is up to you to figure out which messages you need to write to your log and at which points in the code you need to write them. Although we say a slave server can crash at any time, for simplicity, you may assume there are no crashes during calls to TPCLog.flushToDisk().
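The bullets above suggest one possible shape for the replay logic. The sketch below is only that: the entries list, loadFromDisk, the kvServer reference, and the interruptedTpcOperation field are assumptions about the TPCLog skeleton, and your own logging choices may lead to a different structure.

// Inside TPCLog -- a hedged sketch of rebuilding a slave from its log.
public void rebuildServer() throws KVException {
    loadFromDisk();                  // reload the logged KVMessages (assumed API)
    KVMessage pending = null;        // phase-1 request with no logged decision
    for (KVMessage entry : entries) {
        String type = entry.getMsgType();
        if (type.equals("putreq") || type.equals("delreq")) {
            pending = entry;         // remember until we see the decision
        } else if (type.equals("commit") && pending != null) {
            if (pending.getMsgType().equals("putreq")) {
                kvServer.put(pending.getKey(), pending.getValue());
            } else {
                kvServer.del(pending.getKey());
            }
            pending = null;
        } else if (type.equals("abort")) {
            pending = null;          // aborted: nothing to replay
        }
    }
    // If a phase-1 request never got a decision, the master will retry it;
    // remember it so the slave can vote or ack correctly on the duplicate.
    interruptedTpcOperation = pending;
}

Note how a duplicate commit replays harmlessly: once the first commit is applied, pending is null, so the slave can immediately ack the retried decision, as required above.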
Registration
- Slave servers will have 64-bit globally unique IDs (Java longs),
and they will register with the master using that ID when they start. For simplicity, you
can assume that the total number of slave servers is fixed. (Note that this simplification will cause the system to block on any failed slave server. However, dropping this assumption is much harder, since it would require dynamic successor adjustment and
re-replication, among other changes.)
- The master will listen for client requests on port 8080 and listen for registration requests from slaves on port 9090.
- When a slave starts, it should begin listening on a
random free port for TPC requests and register that port number with the master so that
the master can send requests to it. You should have already implemented the logic to handle free ports in project 3.
- A slave has successfully registered if it receives a successful response from the master in the form shown below.
- Note: When parsing the registration, remember that the SlaveServerID can be negative; a parsing sketch follows the message formats below.
These are the message formats for registration. You are free to create the registration message itself with string concatenation.
register:
<?xml version="1.0" encoding="UTF-8"?>
<KVMessage type="register">
<Message>SlaveServerID@HostName:Port</Message>
</KVMessage>

resp:
<?xml version="1.0" encoding="UTF-8"?>
<KVMessage type="resp">
<Message>Successfully registered SlaveServerID@HostName:Port</Message>
</KVMessage>
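Since the registration message carries everything in one string, TPCSlaveInfo has to pull the ID, hostname, and port apart itself. Here is one hedged way to do it; the regex and field names are our own choices rather than a prescribed API, and the optional leading minus sign is what lets negative IDs parse.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Inside TPCSlaveInfo -- a hedged sketch of parsing "SlaveServerID@HostName:Port".
private static final Pattern SLAVE_INFO_PATTERN =
        Pattern.compile("^(-?\\d+)@([^@:]+):(\\d+)$");

public TPCSlaveInfo(String info) throws KVException {
    Matcher m = SLAVE_INFO_PATTERN.matcher(info);
    if (!m.matches()) {
        throw new KVException(KVConstants.ERROR_INVALID_FORMAT);
    }
    slaveID  = Long.parseLong(m.group(1)); // "-?" admits negative 64-bit IDs
    hostname = m.group(2);
    port     = Integer.parseInt(m.group(3));
}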
- Each key will be stored using TPC on two slave servers; the first of
them will be selected using consistent hashing, while the second will be
placed on the successor of the first.
- There will be at least two slave servers
in the system. Each key-value (slave) server will have a unique 64-bit ID. The master will hash
the keys into a 64-bit address space.
- Each slave server will store the first copies of keys
with hash values greater than the ID of its immediate predecessor, up to and including its own ID. Note that
each slave server will also store the keys whose first copies are stored in its
predecessor. These IDs and hashes will be compared as unsigned 64-bit longs using functions provided in TPCMaster (see the sketch after the figure below).
Figure: Consistent hashing. Four slave servers and three hashed keys, along with
where they are placed in the 64-bit address space. In this figure, for example, the different
servers split the key space into S1: [2^16 + 1, 2^23], S2: [2^23 + 1, 2^35], and S3: [2^35 + 1, 2^55]; finally, note that the last
server owns the key space S4: [2^55 + 1, 2^64 - 1] and [0, 2^16].
Now when a key hashes to, say, the value 2^55 + 1, it will be stored in the server that
owns that key space, i.e., S4, as well as the immediately next server on the ring, S1.
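Under these rules, a sorted structure keyed by unsigned-comparable IDs keeps registerSlave, findFirstReplica, and findSuccessor short. The sketch below is one possible approach, not the required one: it leans on Java 8's Long.compareUnsigned where your code would use the comparison helpers provided in TPCMaster, hashTo64bit stands in for the provided hash function (name assumed), and numSlaves is assumed to be a field set at construction.

import java.util.Map;
import java.util.TreeMap;

// Inside TPCMaster -- a hedged sketch of the consistent-hashing ring.
private final TreeMap<Long, TPCSlaveInfo> slaves =
        new TreeMap<>(Long::compareUnsigned);

public synchronized void registerSlave(TPCSlaveInfo slave) {
    slaves.put(slave.getSlaveID(), slave); // re-registration updates host/port
    if (slaves.size() == numSlaves) {
        notifyAll();                       // release threads blocked on startup
    }
}

// Block request-serving threads until all numSlaves unique slaves register.
public synchronized void blockUntilAllRegistered() throws InterruptedException {
    while (slaves.size() < numSlaves) {
        wait();
    }
}

// First replica: the first slave whose ID is >= the key's hash (unsigned),
// wrapping around to the smallest ID at the top of the ring.
public TPCSlaveInfo findFirstReplica(String key) {
    long hash = hashTo64bit(key);          // provided hash function (name assumed)
    Map.Entry<Long, TPCSlaveInfo> e = slaves.ceilingEntry(hash);
    return (e != null) ? e.getValue() : slaves.firstEntry().getValue();
}

// Second replica: the next slave on the ring after the first.
public TPCSlaveInfo findSuccessor(TPCSlaveInfo firstReplica) {
    Map.Entry<Long, TPCSlaveInfo> e = slaves.higherEntry(firstReplica.getSlaveID());
    return (e != null) ? e.getValue() : slaves.firstEntry().getValue();
}

In the figure's terms, a key hashing to 2^55 + 1 is above every slave ID (unsigned), so ceilingEntry wraps to the smallest ID, which belongs to S4, and findSuccessor then returns S1.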
TPC message formats
put/del/get requests should remain unchanged from project 3 since they have no two-phase semantics. There are no newlines in these messages. Italicized fields should be replaced with the actual value.
putreq:
<?xml version="1.0" encoding="UTF-8"?>
<KVMessage type="putreq">
<Key>key</Key>
<Value>value</Value>
</KVMessage>

delreq:
<?xml version="1.0" encoding="UTF-8"?>
<KVMessage type="delreq">
<Key>key</Key>
</KVMessage>

getreq:
<?xml version="1.0" encoding="UTF-8"?>
<KVMessage type="getreq">
<Key>key</Key>
</KVMessage>

ready vote:
<?xml version="1.0" encoding="UTF-8"?>
<KVMessage type="ready">
</KVMessage>

abort vote:
<?xml version="1.0" encoding="UTF-8"?>
<KVMessage type="abort">
<Message>Error Message</Message>
</KVMessage>

commit decision:
<?xml version="1.0" encoding="UTF-8"?>
<KVMessage type="commit">
</KVMessage>

abort decision:
<?xml version="1.0" encoding="UTF-8"?>
<KVMessage type="abort">
</KVMessage>

ack:
<?xml version="1.0" encoding="UTF-8"?>
<KVMessage type="ack">
</KVMessage>
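If your project 3 KVMessage validates message types against a whitelist, task 1's "if necessary" change can be as small as extending that list with the types above. The fragment below is a hedged sketch under that assumption; whether your constructor validates types this way depends on your project 3 implementation.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Inside KVMessage -- a hedged sketch of accepting the new TPC types.
private static final Set<String> VALID_TYPES = new HashSet<>(Arrays.asList(
        "getreq", "putreq", "delreq", "resp",            // project 3 types
        "register", "ready", "abort", "commit", "ack")); // project 4 additions

public KVMessage(String msgType) throws KVException {
    if (!VALID_TYPES.contains(msgType)) {
        throw new KVException(KVConstants.ERROR_INVALID_FORMAT);
    }
    this.msgType = msgType;
}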
Testing
Testing for this project is complicated. Expect to spend a significant portion of your time testing (~half the time of the project). To get you started, a basic end-to-end test has been provided in TPCEndToEndTemplate and TPCEndToEndTest. For information about the testing framework, please refer back to the project 3 spec. We expect extensive use of Mockito and/or PowerMock.
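For example, a registration detail like "retry with the latest host/port" can be unit-tested with Mockito along these lines. This is a hedged sketch: the TPCMaster and KVCache constructor signatures and the getSlave accessor are assumptions about your implementation.

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.Test;

// A hedged sketch of a Mockito unit test for slave re-registration.
public class TPCMasterTest {

    @Test
    public void reRegistrationKeepsLatestHostPort() throws Exception {
        TPCMaster master = new TPCMaster(2, new KVCache(1, 4));

        TPCSlaveInfo stale = mock(TPCSlaveInfo.class);
        when(stale.getSlaveID()).thenReturn(-5L); // IDs may be negative
        when(stale.getPort()).thenReturn(4000);

        TPCSlaveInfo fresh = mock(TPCSlaveInfo.class);
        when(fresh.getSlaveID()).thenReturn(-5L); // same ID after restart...
        when(fresh.getPort()).thenReturn(4001);   // ...but a new port

        master.registerSlave(stale);
        master.registerSlave(fresh);              // slave crashed and rebooted

        // getSlave(id) is a hypothetical lookup; the master must use the
        // latest registration when retrying phase-2 decisions.
        assertEquals(4001, master.getSlave(-5L).getPort());
    }
}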
Skeleton
Project 4 builds on top of
the single-server key-value store developed in project 3. Below is a package of files to be added to your project. You may define additional classes and methods as you see fit.
http://www-inst.eecs.berkeley.edu/~cs162/sp14/kvstore/proj4-skeleton.tar.gz
To add these files to your repository, you can simply unpack the tar in your groupN-kvstore directory (and outside your kvstore directory).
The following describes the files new to project 4:
- TPCClientHandler - Handler for client requests on master.
- TPCMaster - Two-phase commit logic on the master.
- TPCSlaveInfo - Used to keep track of slaves on master.
- TPCRegistrationHandler - Handler for slave registration requests.
- TPCMasterHandler - Handles two-phase commit logic on the slave.
- TPCLog - Logging class for persistent storage.
- SampleMaster - Example instantiation of a master.
- SampleSlave - Example instantiation of a slave.
Deliverables
Initial design (Tue 4/29)
Follow the general outline of our design doc template. 15 page maximum. For testing, include a one- to two-sentence description of each test case you plan to implement for each class and the reason for including it (what it tests).
Code with JUnit tests (Thu 5/8)
Submit a tarball (cs162-proj4.tar.gz) of your code and tests to
proj4-code by running the following command (use a hive server if
you don't have ant installed). You will need to edit the handin
target in your build.xml file to create a cs162-proj4.tar.gz file
instead; this is equivalent to just renaming the file without modifying
your build.xml file.
$ ant handin
Final design and Group Evals (Fri 5/9)
For testing, include a description of each test case you have
implemented for each class. Our evaluation of your test cases will be worth
10 of the 20 points on your final design. We are looking for unit tests
with good code coverage and integration tests that verify the behavior of
your system. 18 page maximum (extra 3 pages for tests).