Thursday, February 11, 2010

Cloud MapReduce Tricks

Cloud Map/Reduce developed by Huan Liu and Dan Orban offers some good lessons in how to design applications on top of Cloud environment, which has a set of characteristics and constraints.

Although it is providing the same map, reduce programming model to the application developer, the underlying implementation architecture of Cloud MR is drastically different from Hadoop. For a description of Hadoop internals, here is it.

Build on top of a Cloud OS (which is Amazon AWS), Cloud MR enjoys the inherit scalability and resiliency, which greatly simplifies its architecture.
  1. Cloud MR doesn't need to design a central coordinator components (like the NameNode and JobTracker in the Hadoop environment). They simply store the job progress status information in the distributed metadata store (SimpleDB).
  2. Cloud MR doesn't need to worry about scalability in the communication path and how data can be moved efficiently between nodes, all is taken care by the underlying CloudOS
  3. Cloud MR doesn't need to worry about disk I/O issue because all storage is effectively remote and being taken care by the Cloud OS.
On the other hand, Cloud OS has a set of constraints and limitations that the design of Cloud MR has to deal with
  • Network latency and throughput : 20 - 100 ms for SQS access, SimpleDB domain write througput is 30 - 40 items/sec
  • Eventual consistency : 2 simultaneous requests to dequeue from SQS can both get the same message. SQS sometimes report empty when there are still messages in the queue.
Cloud MR use a "parallel path" technique to overcome the network throughput issue. The basic idea is to read/write data across multiple network paths so the effective throughput is the aggregated result of individual path.

Cloud MR use a "double check" technique to overcome the consistency issue. Writer will write status into multiple places and reader will read from multiple places also. If the reader read inconsistent result from different place, that means the eventual consistent state hasn't arrived yet so it needs to retry later. When the state read from different places agrees with each other, eventual consistency has arrived and the state is valid.

Following describe the technical details of Cloud MR ...

Cloud MR Architecture


SimpleDB is used to store Job status. Client submit jobs to SimpleDB, Map and Reduce workers update and extract job status from the SimpleDB. The actual data of each job is stored in SQS (which can also points to an Object stored in S3).

So the job progress in the following way

Job Client Processing Cycle
  1. Store data in many S3 file objects
  2. Create a Mapper task request for each file split (each map task request contains a reference to the S3 object and the byte range).
  3. Create an input queue in SQS and enqueue each Mapper task request to it.
  4. Create a master reduce queue, an result output queue as well as multiple partition queues.
  5. Create one reducer task request for each partition queue. Each reducer task request contains a pointer to the partition queue.
  6. Enqueue the reducer task requests to the master reducer queue
  7. Create a job request that contains a mapper task count S as well as a reference to all the SQS queue created so far.
  8. Add the job request into SimpleDB
  9. Invoke AWS commands to start the EC2 instances for Mappers and Reducers, passing along queue and SimpleDB locations as "user data" to the EC2 instances.
  10. From this point onwards, poll the SimpleDB on the job progress status
  11. When the job is complete, download the result from output queue and S3
Mapper Processing Cycle
  1. Take a mapper task request from the SQS input queue
  2. Fetch the file split and parse out each record
  3. Invoke user defined map() function, for each emit intermediate key, perform a (hash(k1) % no_of_partitions). Enqueue the intermediate record to the corresponding partition queue.
  4. When done with the mapper task request, write a commit record containing worker id, map request id and number of records processed count per partition (in other words, R[i][j] where i is the map request and j is the partition no.).
  5. Remove the map task request from the SQS input queue
Due to the eventual consistency model constraint, Mapper cannot stop processing even when it sees the input queue is empty. Instead it count all the commit records to make sure the unique map request id has been sum up equal to the mapper task count S in the Job request. When this happens, it enters the reduce phase.

It is possible that a Mapper worker crashes before it finishes the mapper task, so another mapper will re-process the map task request (after the SQS timeout). Or due to eventual consistency model, it is possible to have 2 simultaneous mappers working on the same file splits. In both case, it is possible of causing some duplications in the partition queues.

To facilitates the duplicate elimination, each intermediate records emit by the mapper will be tagged with [map request id, worker id, a unique number]

Reducer Processing Cycle
  1. Monitor SimpleDB and wait for seeing commit records from all mappers.
  2. Dequeue a reducer task request from the master reducer queue
  3. Go to the corresponding partition queue, dequeue each intermediate record
  4. Invoke user define reduce() function and write the reducer output to the output queue
  5. When done with the reducer task request, write a commit record in a similar way as the Mapper worker
  6. Remove the reduce task request from the master reducer queue
To eliminate duplicated intermediate messages in the partition queue, each Reducer will first query the SimpleDB for all the commit records written by the successful mapper worker. If there are different workers working on the same mapper request, there maybe be multiple commit records with the same mapper request id. In this case, the Reduce will arbitrary pick the winner and then all intermediary records tagged with the mapper request id but not the winner worker id will be discarded.

Similar to the Mapper, Reducer j will not stop getting the message from the partition queue even when it is empty, it will keep reading the message up to the sum all R[i][j] over i.

Due to eventual consistency, it is possible that multiple reducers dequeue the same reducer task request from the master reducer queue and then taking messages from the same partition queue. Since they are competing on the same partition queue, one of them will find the queue is empty before they reach the sum of R[i][j] over i. After certain timeout period, the reducer will write a "suspect conflict" record (containing its worker id) in the SimpleDB. If it found another reducer has written such record, it knows there is another reducer working in the same partition. Workers with the lowest id is the loser and so the reducer will keep reading until it sees another conflict record with a lower id, then it will drop off existing processing and pickup another reducer task. All the records read by the loser will come back to the queue after the timeout period, and will be picked up by the winner.

Network Latency and Throughput

One of the SimpleDB-specific implementation constraint is they read and write throughput is very asymmetric. While the read response is very fast, the write is slow. To mitigate this asymmetry, Cloud MR using multiple domains in SimpleDB. When it writes to SimpleDB, it randomly pick one domain and write to it. This way, the write request workload is spread across multiple domains. When it reads from SimpleDB, it read every domain and aggregate the results (since one domain will have the result).

To overcome the latency issue of SQS, CloudMR at the Mapper side uses buffering technique to batch up intermediate messages (destined to the same partition), A message buffer is 8k size (the maximum size of a SQS message). When the buffer is full (or after some timeout period), a designated thread will flush the buffer by writing a composite message (which contains all the intermediate records) into the SQS.


The reducer side works in a similar way, multiple read threads will dequeue message from the partition queue and put them in a Read buffer, where the Reducer will read the intermediate messages. Notice that it is possible to have 2 threads reading the same message from the partition queue (remember the eventual consistency scenario described above). To eliminate the potential duplicated message, the reducer will examine the unique number tagged with the message and discard the message that has seen before.

Difference with Hadoop

Map/Reduce developers familiar with Hadoop implementation will find Cloud MR behaves in a similar way. But there are a number of difference that I want to highlight here.
  • Reducer key is not sorted: Unlike Hadoop which guarantee that keys arrived at the same partition (or reducer) will be in sorted order, Cloud MR doesn't provide such feature. Application need to do their own sorting if they need the sort order.

For a detail technical description of Cloud MR, as well as how it is compared with Hadoop, read the original paper from Huan Liu and Dan Orban

1 comment:

Anonymous said...

is cloudmr a product or opensource project?