# External Sort Operator
The external sort operator works like other Drill operators in its use of an incoming batch, the `next()` call hierarchy, and in that the operator is itself a record batch for its "output" (downstream, up-the-call-stack) operator. The external sort operator is interesting for two key reasons:
- It buffers all the incoming rows so that they can be sorted together, and
- It optionally spills data to disk due to memory pressure.
Like all operators, the external sort (xsort) operator does the following:
- Holds an incoming operator (`incoming`).
- Calls `incoming.next()` to obtain a record batch.
- Supports an incoming batch with an associated selection vector. (?)
Like all operators, the xsort operator maintains a set of internal states, and performs state transitions based on responses from the incoming batch.
Incoming events are defined by the `RecordBatch.IterOutcome` enum (the sketch after this list shows how the operator reacts to them):
- `NONE`: Normal completion of the batch stream.
- `OK`: Zero or more records with the same schema as the previous batch. Sort the individual batch and ...
- `OK_NEW_SCHEMA`: Zero or more records with a schema different from the previous batch. Handles "trivial" schema changes (see below), then handles records as for `OK`.
- `STOP`: Abnormal termination. The xsort operator simply forwards this event downstream.
- `NOT_YET`: No data. Not supported by the xsort operator. (Throws an `UnsupportedOperationException`.)
- `OUT_OF_MEMORY`: Non-fatal OOM. The xsort operator attempts to recover by spilling to disk.
- Exception: Not an enum event, but all methods can throw an exception.
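The event handling above can be pictured as a loop around a switch on the incoming outcome. The following is only a simplified sketch of that logic, not Drill's actual code; helper names such as `handleNewSchema()`, `sortAndBufferBatch()`, `spill()`, and `mergeAndReturnResults()` are hypothetical.

```java
// Simplified sketch of xsort's reaction to IterOutcome events.
// The helper methods are hypothetical stand-ins for the real logic.
public IterOutcome innerNext() {
  while (true) {
    IterOutcome upstream = incoming.next();
    switch (upstream) {
    case NONE:                 // Upstream exhausted: merge buffered batches and return results.
      return mergeAndReturnResults();
    case OK_NEW_SCHEMA:
      handleNewSchema();       // Trivial change: ignore; real change: throw SchemaChangeException.
      // Fall through to process any records carried by the batch.
    case OK:
      sortAndBufferBatch();    // Sort this batch via an SV2 and add it to batchGroups.
      break;
    case STOP:
      return upstream;         // Forward abnormal termination downstream.
    case OUT_OF_MEMORY:
      spill();                 // Attempt to recover by spilling buffered batches to disk.
      break;
    case NOT_YET:
    default:
      throw new UnsupportedOperationException("NOT_YET is not supported");
    }
  }
}
```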
Sorting is done via an indirection vector. That is, the data in the vectors does not move during the sort. Instead, the xsort operator uses an `int`-based selection vector (`SelectionVector4 sv4`) to hold indexes into the data. During the sort, the indexes are reordered, not the vector data.
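The same indirection idea can be shown outside of Drill: sort an array of row indexes by comparing the values they point to, leaving the data itself in place. This is only an analogy for the selection-vector mechanism, not Drill code.

```java
import java.util.Arrays;
import java.util.Comparator;

// Sorting through an indirection vector: the data never moves;
// only the index array is reordered.
public class IndirectionSortDemo {
  public static void main(String[] args) {
    int[] data = { 42, 7, 19, 3, 88 };

    // The "selection vector": one index per row, initially in arrival order.
    Integer[] sv = { 0, 1, 2, 3, 4 };

    // Reorder the indexes by comparing the values they reference.
    Arrays.sort(sv, Comparator.comparingInt(i -> data[i]));

    // Read the rows in sorted order through the indirection vector.
    for (int i : sv) {
      System.out.println(data[i]);   // prints 3, 7, 19, 42, 88
    }
  }
}
```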
Batches are sorted individually using a `SingleBatchSorter` instance (`sorter`). The `innerNext()` method sorts each batch using a `SelectionVector2` in response to the `OK` and `OK_NEW_SCHEMA` events:

```java
sorter.setup(context, sv2, convertedBatch);
sorter.sort(sv2);
```
Batches are then buffered (`LinkedList<BatchGroup> batchGroups`):

```java
batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2(), oContext));
```
Next, xsort considers a number of conditions to determine whether it should spill to disk (a sketch of the combined check appears after this list):
- Based on the expected memory needs of `MSorter`.
- Based on the total number of batches (must be less than the maximum value of a `short`).
- If the memory used exceeds 95% of the allocated memory budget.
- If enough new batches have accumulated (more than `SPILL_BATCH_GROUP_SIZE` batches).
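Taken together, the conditions amount to a single boolean test made as batches accumulate. The sketch below merely restates the list in code form; the field names (`totalBatches`, `batchesSinceLastSpill`, `memoryLimit`, and the `estimated...` helpers) are placeholders, and the actual expression in the operator may be organized differently.

```java
// Restatement of the spill conditions listed above (placeholder names).
private boolean shouldSpill() {
  boolean mSorterWouldNotFit   = estimatedMSorterMemory() > estimatedAvailableMemory();
  boolean tooManyBatches       = totalBatches >= Short.MAX_VALUE;
  boolean memoryNearlyFull     = oAllocator.getAllocatedMemory() > 0.95 * memoryLimit;
  boolean enoughBatchesToSpill = batchesSinceLastSpill > SPILL_BATCH_GROUP_SIZE;

  return mSorterWouldNotFit || tooManyBatches || memoryNearlyFull || enoughBatchesToSpill;
}
```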
If spilling is needed, then:
- The set of batches to be spilled is merged into a `BatchGroup`.
- The merged `BatchGroup` is spilled:

```java
final BatchGroup merged = mergeAndSpill(spilledBatchGroups);
```
The xsort operator does not support schema changes (though some parts of the code appear to be written in anticipation of such support). The operator ignores "trivial" schema changes (where the new schema is the same as the previous one). However, if the schema really does change, the operator throws a `SchemaChangeException`.
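A minimal sketch of the trivial-versus-real schema check, assuming a simple `BatchSchema` equality comparison (the real code may use a different test):

```java
// Sketch of the schema-change handling described above: a repeated schema
// is ignored, a genuine change is an error.
private void checkSchema(BatchSchema incomingSchema) throws SchemaChangeException {
  if (schema == null) {
    schema = incomingSchema;          // First schema seen: remember it.
  } else if (!incomingSchema.equals(schema)) {
    throw new SchemaChangeException("External sort does not support schema changes");
  }
  // Otherwise the "change" is trivial (same schema) and is ignored.
}
```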
## PriorityQueueCopier
Open questions: when does the operator spill to disk? What determines the batch group size and the threshold?
The life of a fragment containing an external sort is roughly:
- Planner creates the plan, including the `SortPrel` (physical relational operator definition), a subclass of `Prel`. This node contains cost information such as CPU cost, disk I/O cost, number of fields, field width, input row count, and estimated memory cost. The memory cost for a sort is defined as:

```java
memCost = fieldWidth * numFields * inputRows;
```
- Planner creates a `PhysicalOperator` (operator definition) from the `Prel` using `PhysicalPlanCreator`.
- The `PhysicalOperator`, via the `AbstractBase` base class, sets default memory limits of 1 MB (initial) and 10 GB (maximum):

```java
public abstract class AbstractBase implements PhysicalOperator {
  protected long initialAllocation = 1_000_000L;
  protected long maxAllocation = 10_000_000_000L;
```
- `PhysicalPlanCreator` sets a variety of properties. For our purposes, the key properties are the query cost (set by `setCost()`) and the memory limits. (The interesting bit is that the `PhysicalPlanCreator` does not actually set the limits... See below.)
- The `Foreman` allocates the total query memory to the `ExternalSort` operators in each minor fragment (a rough sketch of this division appears after this list):

```java
private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
  validatePlan(plan);
  MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext);
  ...
```

```java
public class MemoryAllocationUtilities {
  public static void setupSortMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
    ...
    for (final ExternalSort externalSort : sortList) {
      externalSort.setMaxAllocation(maxSortAlloc);
    }
```
- The `PhysicalOperator` is sent across the wire to each Drillbit.
- `FragmentExecutor` on each Drillbit calls `ImplCreator` to create the `FragmentRoot` instance for the fragment.
- Each operator has an `OperatorContext` (implementation: `OperatorContextImpl`) that creates the allocator for the fragment.
- The `OperatorContext` uses the `PhysicalOperator`, which has the initial and maximum allocations for the operator:

```java
this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
    popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
```
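To summarize the memory setup: the defaults from `AbstractBase` are overridden for sorts when the `Foreman` divides the per-node sort budget among the `ExternalSort` operators in the plan. The sketch below assumes a simple even division; the budget value and the formula are assumptions here, and the real `MemoryAllocationUtilities` logic differs in its details (option lookups, minimums, and per-node parallelism).

```java
// Rough, illustrative sketch of dividing a node's sort memory budget
// evenly among the ExternalSort operators in a plan. The budget value
// and the even split are assumptions, not Drill's exact computation.
public class SortMemorySketch {
  static final long MAX_SORT_MEMORY_PER_NODE = 2L * 1024 * 1024 * 1024;  // assumed 2 GB budget

  public static void main(String[] args) {
    int sortCount = 4;  // number of ExternalSort operators found in the plan

    // Each sort receives an equal share of the node-wide budget.
    long maxSortAlloc = MAX_SORT_MEMORY_PER_NODE / sortCount;

    // This value would then be applied via externalSort.setMaxAllocation(maxSortAlloc),
    // replacing the 10 GB default from AbstractBase.
    System.out.println("Per-sort memory limit: " + maxSortAlloc + " bytes");
  }
}
```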
The xsort operator spills when one of two memory conditions is hit:
- The buffered data exceeds 95% of the memory reserved for buffering, or
- An upstream operator reports that it ran out of memory.
Memory is managed using two allocators:
```java
BufferAllocator oAllocator;      // The allocator for the xsort operator itself
BufferAllocator copierAllocator; // The allocator for the record copier
```
The xsort allocator is set from the context during fragment creation; the process is generic to all operators (see the `getNewChildAllocator()` call above).
The copier allocator is a child of the operator allocator, meaning that the two share the same reservation. The copier is given a fixed allocation of 10 MB (initial) to 20 MB (maximum), in decimal bytes:

```java
public static final long INITIAL_ALLOCATION = 10_000_000;
public static final long MAX_ALLOCATION = 20_000_000;

copierAllocator = oAllocator.newChildAllocator(oAllocator.getName() + ":copier",
    PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION);
```
Disk management is integrated into the xsort operator class itself. The algorithm is:
- Use the HDFS `FileSystem` class, referenced by `fs`, to manage spill files. This allows files to be written into the DFS file system in addition to local disk, which is useful when cluster nodes are configured with limited local storage so that large spill files are written to DFS instead.
- Maintain a list of spill directories (`dirs`) to which spill files are written (in round-robin manner?). (See the sketch after this list.)
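If the directory rotation really is round-robin (the note above leaves this open), the selection could look like the sketch below. This is an assumption for illustration, not verified against the code.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical round-robin selection over the configured spill directories.
public class SpillDirSelector {
  private final List<String> dirs;
  private Iterator<String> it;

  public SpillDirSelector(List<String> dirs) {
    this.dirs = dirs;
    this.it = dirs.iterator();
  }

  // Return the next directory, wrapping around when the list is exhausted.
  public String nextDir() {
    if (!it.hasNext()) {
      it = dirs.iterator();
    }
    return it.next();
  }
}
```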
Individual spill files have the following name:

```
<query-id>_majorfragment<major-fragment-id>_minorfragment<minor-fragment-id>_operator<operator-id>/<spill-id>
```

where the items in `<...>` are parameters for the particular operator.
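For illustration only, with made-up identifier values, a spill file path might therefore look like:

```
2a2f1bd1-3f8e-cf23-0be6-9493e7931976_majorfragment1_minorfragment3_operator7/0
```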
The first part of the name is a directory, created under each temporary directory in turn and marked delete-on-exit.
Merging appears to be done by a code-generated implementation of `PriorityQueueCopier`. Spilling is done by `BatchGroup`. Writing is done in several steps (sketched after this list). Given a collection of batches:
- Combine the linked list of batches into an array list (with the batches in inverse order).
- Compute the maximum number of records as `COPIER_BATCH_MEM_LIMIT / estimatedRecordSize`. (That is, estimate the row width, then use it to compute the maximum number of records that can be written.)
- Loop to write records in batches of the computed record limit:
  - Use a `PriorityQueueCopier` to merge batches to produce an outgoing batch of the desired size.
  - Add the resulting (merged) batch to the `BatchGroup`.
  - Gather a list of buffers by looping over all value vectors in the batch, and over each buffer for those vectors.
  - Write the buffers to disk.
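The write loop can be sketched roughly as follows. The names (`copier`, `newGroup`, `outputContainer`, `COPIER_BATCH_MEM_LIMIT`, `estimatedRecordSize`) are placeholders for the fields described above, and the real `mergeAndSpill()` differs in its details.

```java
// Sketch of the spill-write loop described above (placeholder names).
int targetRecords = (int) Math.max(1, COPIER_BATCH_MEM_LIMIT / estimatedRecordSize);

int copied;
do {
  // Merge the sorted batches into one outgoing batch of at most
  // targetRecords rows via the PriorityQueueCopier.
  copied = copier.next(targetRecords);
  if (copied > 0) {
    // Append the merged batch to the on-disk BatchGroup, which gathers the
    // value-vector buffers and writes them to the spill file.
    newGroup.addBatch(outputContainer);
  }
} while (copied > 0);
```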
The spill behavior is controlled by the following configuration options (an illustrative configuration snippet follows the list):
- `drill.exec.sort.external.spill.group.size` (member `SPILL_BATCH_GROUP_SIZE`)
- `drill.exec.sort.external.spill.threshold` (member `EXTERNAL_SORT_SPILL_THRESHOLD`)
- `drill.exec.sort.external.spill.directories` (member `dirs`)
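These are boot-time options, so they would normally be set in the Drill configuration file. The snippet below only illustrates the shape of such a configuration; the values shown are placeholders, not recommended or default settings.

```
drill.exec.sort.external.spill: {
  group.size: 40000,
  threshold: 40000,
  directories: [ "/tmp/drill/spill" ]
}
```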
Remaining topics to cover:
- `MappingSet`
- Dir algorithm