Shuffle Service on REEF
“Shuffle” is a data movement abstraction where senders (mappers) transfer key/value tuples to receivers (reducers) that are selected by a specific shuffling strategy. Shuffle is used in many distributed frameworks and programming models.
For example, in the MapReduce programming model, tuples created by mappers are sent to reducers by hash-based key grouping, which guarantees that all tuples with the same key go to the same reducer. In Apache Storm, a distributed realtime streaming engine, a Storm topology is a DAG for pipelining and processing tuple data, and each edge in the DAG represents a shuffle from one processing element to another.
Although the logical structure of a shuffle is almost the same across frameworks, implementations differ considerably. Two major properties distinguish shuffle implementations: batch vs. streaming, and push vs. pull.
- batch / streaming: Batch shuffle implementations send many tuples at once, whereas streaming shuffles send small chunks of tuples in real time.
- pull / push: In push shuffle implementations, senders decide when to push tuples to receivers. In pull shuffle implementations, receivers decide when to receive tuples and fetch the tuple data from senders.
Batch engines like MapReduce tend to use pull-based shuffles, while streaming engines favor push-based shuffles.
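As a rough illustration of these two axes, below is a hypothetical Java push-based sender (the sender decides when tuples go out) that buffers tuples and flushes them to a receiver callback. The class name BufferedPushSender and its batchSize parameter are assumptions made for this example, not part of the Shuffle Service API. With batchSize set to 1 it behaves like a streaming shuffle; with a large batchSize it behaves more like a batch shuffle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical push-based sender: the sender, not the receiver, decides when tuples are delivered.
// batchSize == 1 approximates a streaming shuffle; a large batchSize approximates a batch shuffle.
class BufferedPushSender<K, V> {
  private final int batchSize;
  private final Consumer<List<Map.Entry<K, V>>> receiver; // receiver-side callback
  private final List<Map.Entry<K, V>> buffer = new ArrayList<>();

  BufferedPushSender(int batchSize, Consumer<List<Map.Entry<K, V>>> receiver) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be >= 1");
    }
    this.batchSize = batchSize;
    this.receiver = receiver;
  }

  // Buffers one tuple and pushes the whole buffer once it reaches batchSize.
  void send(K key, V value) {
    buffer.add(Map.entry(key, value));
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  // Pushes any remaining buffered tuples to the receiver as one batch.
  void flush() {
    if (!buffer.isEmpty()) {
      receiver.accept(new ArrayList<>(buffer));
      buffer.clear();
    }
  }
}
```

For example, with batchSize 3, sending seven tuples and then calling flush() delivers batches of 3, 3 and 1 tuples.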
General-purpose shuffle APIs that support all types of shuffle implementations are uncommon, since most frameworks need only one specific kind of shuffle. We aim to design such an API in order to offer shuffle as a service on REEF.
We believe the base API of the Shuffle Service should be push-based, because even the senders of a pull-based shuffle push tuple data once receivers request it. We therefore built the push-based shuffle API first, and will later support other implementations on top of it.
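The argument that pull can be layered on push can be sketched in a few lines of Java. In this hypothetical model, the sender keeps its output locally, the receiver decides when to fetch, and the sender answers each fetch by pushing the tuples through an ordinary push callback. The class names and the BiConsumer-based push primitive are illustrative assumptions; a direct method call stands in for the fetch request that would travel over the network.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch: a pull-style fetch built only on a push primitive.
class PullOverPushSender {
  // Tuples produced by this sender, grouped by destination receiver id.
  private final Map<String, List<String>> outputByReceiver = new HashMap<>();
  // Push primitive: (receiverId, tuples) delivers the tuples to that receiver.
  private final BiConsumer<String, List<String>> push;

  PullOverPushSender(BiConsumer<String, List<String>> push) {
    this.push = push;
  }

  // Output is kept locally instead of being pushed immediately.
  void emit(String receiverId, String tuple) {
    outputByReceiver.computeIfAbsent(receiverId, id -> new ArrayList<>()).add(tuple);
  }

  // Handles a fetch request by pushing this receiver's tuples to it.
  void onFetchRequest(String receiverId) {
    List<String> tuples = outputByReceiver.getOrDefault(receiverId, List.of());
    push.accept(receiverId, List.copyOf(tuples));
  }
}

class PullOverPushReceiver {
  private final String id;
  private final List<String> received = new ArrayList<>();

  PullOverPushReceiver(String id) {
    this.id = id;
  }

  // The receiver decides when to fetch, which is what makes the shuffle "pull".
  // The direct call stands in for a request message pushed over the network.
  void fetchFrom(PullOverPushSender sender) {
    sender.onFetchRequest(id);
  }

  // Invoked by the push primitive when tuples arrive.
  void onPush(List<String> tuples) {
    received.addAll(tuples);
  }

  List<String> received() {
    return received;
  }
}
```

Nothing reaches a receiver until it calls fetchFrom, yet the data itself still moves through the push path, which is the property the push-first design relies on.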
Common
- ShuffleDescription
  - A ShuffleDescription contains (a) a sender identifier list, (b) a receiver identifier list, (c) key and value types, and (d) a shuffle strategy.
- ShuffleStrategy
  - The strategy that decides which receivers get which tuples, given their keys. Shuffle Service users can define their own custom shuffle strategies. For example, hash-based key grouping is one possible strategy: it selects one receiver from the shuffle's receiver list based on the hash code of a key.
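A hash-based key grouping strategy could look roughly like the following Java sketch. The ShuffleStrategy interface and its selectReceivers(key, receiverIds) method are hypothetical; the project's actual strategy interface may take different arguments (for example, several keys at once).

```java
import java.util.List;

// Hypothetical strategy interface; the real Shuffle Service interface may differ.
interface ShuffleStrategy<K> {
  // Returns the ids of the receivers that should get a tuple with the given key.
  List<String> selectReceivers(K key, List<String> receiverIds);
}

// Hash-based key grouping: every tuple with the same key goes to the same single receiver.
class KeyHashShuffleStrategy<K> implements ShuffleStrategy<K> {
  @Override
  public List<String> selectReceivers(K key, List<String> receiverIds) {
    if (receiverIds.isEmpty()) {
      throw new IllegalArgumentException("the shuffle has no receivers");
    }
    // floorMod keeps the index non-negative even when hashCode() is negative.
    int index = Math.floorMod(key.hashCode(), receiverIds.size());
    return List.of(receiverIds.get(index));
  }
}
```

Because the receiver index depends only on the key's hash code and the receiver count, all tuples sharing a key reach the same receiver as long as the receiver list does not change. Math.floorMod is used instead of % so that negative hash codes still yield a valid index.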
Driver-Side
- ShuffleDriver
  - A ShuffleDescription is registered through ShuffleDriver and managed by a ShuffleManager, which handles the control flow of the shuffling procedure. ShuffleDriver instantiates and maintains the ShuffleManagers and provides the user with context and task configurations for a given end point identifier.
- ShuffleManager
  - A ShuffleManager communicates with evaluators to handle the whole control flow of a shuffle, such as initial synchronization and adding or removing end points.
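The driver-side relationship (one ShuffleManager per registered ShuffleDescription, with the manager tracking membership changes) can be modeled as the hypothetical Java sketch below. It keeps everything in memory and omits the key/value types, the strategy, and all communication with evaluators; the shuffleName field and the add/remove methods are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical driver-side sketch: one ShuffleManager per registered description.
// Key/value types and the shuffle strategy are omitted for brevity.
final class ShuffleDescription {
  final String shuffleName; // assumed unique identifier for the shuffle
  final List<String> senderIds;
  final List<String> receiverIds;

  ShuffleDescription(String shuffleName, List<String> senderIds, List<String> receiverIds) {
    this.shuffleName = shuffleName;
    this.senderIds = List.copyOf(senderIds);
    this.receiverIds = List.copyOf(receiverIds);
  }
}

final class ShuffleManager {
  private final Set<String> activeSenders;
  private final Set<String> activeReceivers;

  ShuffleManager(ShuffleDescription description) {
    this.activeSenders = new LinkedHashSet<>(description.senderIds);
    this.activeReceivers = new LinkedHashSet<>(description.receiverIds);
  }

  // Membership changes; a real manager would also notify the evaluators (omitted here).
  void addReceiver(String id) { activeReceivers.add(id); }
  void removeReceiver(String id) { activeReceivers.remove(id); }
  void removeSender(String id) { activeSenders.remove(id); }

  Set<String> receivers() { return Set.copyOf(activeReceivers); }
  Set<String> senders() { return Set.copyOf(activeSenders); }
}

final class ShuffleDriver {
  private final Map<String, ShuffleManager> managers = new HashMap<>();

  // Registers a description, creates a manager for it, and returns that manager.
  ShuffleManager register(ShuffleDescription description) {
    ShuffleManager manager = new ShuffleManager(description);
    if (managers.putIfAbsent(description.shuffleName, manager) != null) {
      throw new IllegalStateException("shuffle already registered: " + description.shuffleName);
    }
    return manager;
  }

  ShuffleManager managerOf(String shuffleName) {
    return managers.get(shuffleName);
  }
}
```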
Evaluator-Side
- ShuffleProvider
  - All serialized Shuffles are automatically instantiated in ShuffleProvider. Users retrieve Shuffles through this class.
- Shuffle
  - Provides the ShuffleOperators for a specific shuffle and internally communicates with the driver-side ShuffleManager.
- ShuffleOperator (ShuffleSender, ShuffleReceiver)
  - Users can register callbacks on ShuffleReceiver and ShuffleSender to receive and send tuples, respectively.
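On the evaluator side, the lookup chain from ShuffleProvider to Shuffle to its operators might be sketched as follows. This is a hypothetical single-process model: the method names (add, getShuffle, getSender, getReceiver, registerTupleCallback) are assumptions rather than the project's API, only the receiver accepts callbacks here, and the sender delivers tuples directly to a local receiver in place of a real network transport.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical evaluator-side sketch; names are illustrative, not the project's API.
final class ShuffleReceiver<K, V> {
  private final List<BiConsumer<K, V>> callbacks = new ArrayList<>();

  // Users register a callback that runs for every tuple this receiver gets.
  void registerTupleCallback(BiConsumer<K, V> callback) {
    callbacks.add(callback);
  }

  // Called by the transport when a tuple arrives.
  void onTuple(K key, V value) {
    for (BiConsumer<K, V> callback : callbacks) {
      callback.accept(key, value);
    }
  }
}

final class ShuffleSender<K, V> {
  private final ShuffleReceiver<K, V> destination; // local loopback instead of a network

  ShuffleSender(ShuffleReceiver<K, V> destination) {
    this.destination = destination;
  }

  void send(K key, V value) {
    destination.onTuple(key, value);
  }
}

// Provides the operators for one shuffle (receiver is initialized before sender).
final class Shuffle<K, V> {
  private final ShuffleReceiver<K, V> receiver = new ShuffleReceiver<>();
  private final ShuffleSender<K, V> sender = new ShuffleSender<>(receiver);

  ShuffleReceiver<K, V> getReceiver() { return receiver; }
  ShuffleSender<K, V> getSender() { return sender; }
}

// Holds every shuffle instantiated on this evaluator, keyed by a (hypothetical) shuffle name.
final class ShuffleProvider {
  private final Map<String, Shuffle<?, ?>> shuffles = new HashMap<>();

  void add(String name, Shuffle<?, ?> shuffle) {
    shuffles.put(name, shuffle);
  }

  @SuppressWarnings("unchecked")
  <K, V> Shuffle<K, V> getShuffle(String name) {
    Shuffle<?, ?> shuffle = shuffles.get(name);
    if (shuffle == null) {
      throw new IllegalArgumentException("unknown shuffle: " + name);
    }
    return (Shuffle<K, V>) shuffle;
  }
}
```

For instance, registering a callback that sums values per key on the receiver and then sending ("a", 1), ("a", 2) and ("b", 5) leaves the totals a = 3 and b = 5.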
- A user registers a ShuffleDescription. The user can register more than one ShuffleDescription.
- ShuffleDriver instantiates a ShuffleManager and returns it.
- The user retrieves the shuffle configurations for each task.
- The user creates the tasks and/or contexts using those configurations.
- The shuffle information is serialized by Tang injection.
- ShuffleProvider deserializes the information and creates Shuffle instances.
- The user gets a Shuffle instance.
- The user gets the ShuffleOperators.
- The user can now exchange tuples in the evaluators through the operators.
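The configuration hand-off in the workflow above (retrieving a per-task configuration, serializing it, and deserializing it on the evaluator) can be sketched as follows. This is a hypothetical in-memory stand-in: the real service serializes this information through Tang, whose API is not modeled here. The class name, the SENDER/RECEIVER role names, and the "name=ROLE,ROLE;..." string format are assumptions for illustration only, and shuffle names are assumed not to contain ";", "=" or ",".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the configuration round trip; a plain string stands in for
// the Tang-serialized configuration, and none of this is the actual Shuffle Service API.
final class ShuffleWorkflowSketch {
  // Driver side: registered descriptions, keyed by shuffle name (insertion order kept).
  private final Map<String, List<String>> senders = new LinkedHashMap<>();
  private final Map<String, List<String>> receivers = new LinkedHashMap<>();

  void registerDescription(String shuffleName, List<String> senderIds, List<String> receiverIds) {
    senders.put(shuffleName, List.copyOf(senderIds));
    receivers.put(shuffleName, List.copyOf(receiverIds));
  }

  // Driver side: builds and serializes the configuration for one task (end point).
  String serializedConfigurationFor(String taskId) {
    StringBuilder sb = new StringBuilder();
    for (String name : senders.keySet()) {
      List<String> roles = new ArrayList<>();
      if (senders.get(name).contains(taskId)) roles.add("SENDER");
      if (receivers.get(name).contains(taskId)) roles.add("RECEIVER");
      if (roles.isEmpty()) continue; // this task does not take part in the shuffle
      if (sb.length() > 0) sb.append(';');
      sb.append(name).append('=').append(String.join(",", roles));
    }
    return sb.toString();
  }

  // Evaluator side: deserializes the configuration into shuffle name -> roles,
  // from which a provider could create its Shuffle instances.
  static Map<String, List<String>> deserialize(String serialized) {
    Map<String, List<String>> shuffles = new LinkedHashMap<>();
    if (serialized.isEmpty()) return shuffles;
    for (String entry : serialized.split(";")) {
      String[] parts = entry.split("=", 2);
      shuffles.put(parts[0], List.of(parts[1].split(",")));
    }
    return shuffles;
  }
}
```

For example, a task that receives in one shuffle and both sends and receives in another would get a configuration such as "wordcount=RECEIVER;join=SENDER,RECEIVER".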