Storage Distribution
If you followed the basic steps, you were left with a working storage plugin that runs its scans in a single scan operator regardless of the complexity of the query and the number of available nodes. Here we will look at how to leverage Drill's ability to break a scan into multiple fragments which are run in parallel, typically on multiple nodes.
Drill is a distributed query engine. This means that the scan which your storage plugin generates operates in three "modes":
- As a single logical scan (the `GroupScan`) in which a single node represents the scan of the "table" (or data source) which your storage plugin represents. For example, for a classic HDFS scan, the group scan might represent the scan of an entire directory tree. A logical scan is assigned to a "major fragment" in your query, where major fragments are subsections of the entire query plan DAG up to exchanges.
- Multiple scan "segments", which are the actual scans done at runtime. Again in the HDFS case, there is one scan per block per file. So, if your directory contains five files (after any partition pruning), and each of those files is large enough to be stored in three blocks, then there will be 15 scan segments.
- One `SubScan` instance per "minor fragment" (execution thread of a major fragment). If your scan has more segments than the available number of executors, then each `SubScan` may need to handle multiple scan segments. Thus, there is a many-to-one relationship between scan segments and `SubScan`s. A "parallelizer" within Drill works out the association of segments to threads, represented by a list of scan segments assigned to each `SubScan`.
In terms of process flow and cardinality relationships:
```
      1     n              1     n               n     1
Table ------> Logical Scan ------> Scan Segments ------> Scan Operator
              (GroupScan)                                (SubScan)
```
While this process is more-or-less easy to understand at this level, the actual implementation is quite complex. Drill provides classes for the logical scan (`GroupScan`) and the scan operator (`SubScan`), but provides no structure for the scan segments: that bit of logic is unique to each storage plugin.
This, then, is the challenge: how should you represent your scan segments, and how do you tell Drill how to parallelize your segments into scan operators?
Earlier sections introduced the group scan, but glossed over the scan segment issue. If your storage plugin is simple (such as a single REST call), then there is a 1:1 correspondence between group scans and sub scans. However, if you can parallelize (partition) your scan, then you must work out how to split a logical scan into multiple scan segments. Drill's code mostly focuses on how that is done with HDFS and Parquet.
The first step is to work out what drives scan segments. For a REST call, say, we might be able to split a call for x records into n calls of x/n records each. For example, suppose we are querying a time-series system for a time range from 1 PM to 2 PM. We might be able to split this into four scan segments, each of which queries a 15-minute period: 1:00-1:15 PM, 1:15-1:30 PM, 1:30-1:45 PM, and 1:45-2:00 PM.
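As a rough sketch of that idea (the `TimeSegment` class and the `SegmentPlanner.split()` helper below are hypothetical, not part of Drill or of the plugin code from earlier sections), splitting a time range into equal segments might look like this:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical description of one scan segment: a sub-range of the query's time range.
class TimeSegment {
  final LocalDateTime start;
  final LocalDateTime end;

  TimeSegment(LocalDateTime start, LocalDateTime end) {
    this.start = start;
    this.end = end;
  }
}

class SegmentPlanner {

  // Split [start, end) into `count` equal, non-overlapping segments.
  static List<TimeSegment> split(LocalDateTime start, LocalDateTime end, int count) {
    Duration slice = Duration.between(start, end).dividedBy(count);
    List<TimeSegment> segments = new ArrayList<>();
    LocalDateTime sliceStart = start;
    for (int i = 0; i < count; i++) {
      // Use the exact end for the last slice to avoid rounding drift.
      LocalDateTime sliceEnd = (i == count - 1) ? end : sliceStart.plus(slice);
      segments.add(new TimeSegment(sliceStart, sliceEnd));
      sliceStart = sliceEnd;
    }
    return segments;
  }
}
```

Calling `split()` with the 1 PM to 2 PM range and a count of four yields the four 15-minute segments above.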
Typically you use "filter push-down" to figure out these partitions:
```sql
SELECT * FROM myPlugin.timeSeries
WHERE eventTime BETWEEN '2019-11-15 13:00:00' AND '2019-11-15 14:00:00'
```
You use the `WHERE` clause filter to identify the time range, then use plugin-specific logic to understand that, say, `eventTime` is a partition column, and to work out the partitions. For file-based storage, the directory structure drives partitions. For external systems, the partitioning is likely to be very specific to that system.
In any case, you create some kind of data structure in your `GroupScan` that identifies the number of scan segments you can create and gathers the information needed to generate those segments.
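As a sketch (field and method names are illustrative, and constructor and Jackson plumbing are omitted), the group scan might simply hold the list of candidate segments, reusing the hypothetical `TimeSegment` from the sketch above:

```java
import java.util.ArrayList;
import java.util.List;

public class MyGroupScan extends BaseGroupScan {

  // Candidate scan segments, derived during logical planning from filter
  // push-down, the plugin config, or (for now) hard-coded assumptions.
  private List<TimeSegment> candidateSegments = new ArrayList<>();

  // The parallelization methods described later report this count to Drill.
  public int segmentCount() {
    return candidateSegments.size();
  }
}
```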
For now, let us assume that you can identify your scan segments from data available in the query or plugin config; you may have to hard-code some assumptions. A later section discusses filter push-down so you can replace any temporary hacks with a proper solution.
Parallelization in Drill can appear to be "black magic": it is complicated and is designed primarily for partitioned file structures on HDFS. There is no simple "parallelization strategy" that you implement. Instead, you offer a series of (often opaque) suggestions so Drill can do its thing.
Parallelization occurs during the physical planning phase. For advanced needs you can add custom planner rules to assist the process (see the `getOptimizerRules()` method for details). Here we focus on the default "out-of-the-box" process.
We've mentioned that planning occurs in a number of phases. Vastly simplifying, we should be aware of two main phases:
- The logical plan, in which the original SQL query is converted into a set of blocks, and the blocks are moved around, split, and combined to produce an optimal query plan.
- The physical plan, in which Drill decides how to parallelize the query, the number of fragments to run, which parts of the query run in which fragments, and which fragments run on which nodes.
For example, project push-down and (if we support it) filter push-down happen during logical planning. Parallelization happens during physical planning.
From the perspective of a storage plugin, all this work happens within the confines of the group scan. Unfortunately, there is no single event which tells us that logical planning is complete and we should get ready for physical planning. The lack of this event makes things a bit more complex than necessary as we have to infer that physical planning has started.
If you need more detail, here are the actual planner phases that appear in the `getOptimizerRules()` call:
- `DIRECTORY_PRUNING`
- `LOGICAL` (creates the group scan)
- `PARTITION_PRUNING`
- `JOIN_PLANNING`
- `ROWKEYJOIN_CONVERSION`
- `SUM_CONVERSION`
- `PHYSICAL` (where the decision about exchanges is made)
For obscure reasons we won't explain here, the set of logical planning phases can differ from those shown above. The best way to detect the logical planning phase is:
```java
@Override
public Set<? extends StoragePluginOptimizerRule> getOptimizerRules(
    OptimizerRulesContext optimizerContext, PlannerPhase phase) {
  // Push-down planning is done at the logical phase so it can
  // influence parallelization in the physical phase. Note that many
  // existing plugins perform filter push-down at the physical
  // phase.
  if (phase.isFilterPushDownPhase()) {
    return ImmutableSet.of(/* your logical planning (push-down) rules */);
  }
  return ImmutableSet.of();
}
```
Our group scan accumulates state (fields) as planning proceeds. The primary consideration is that any state used during logical planning must be Jackson-serializable. (The logical plan can be written to JSON, then read back in.) Physical planning state is never serialized and should not be written to JSON. This is a more subtle point than it should be.
One way to visualize this is to put all of the physical planning state into a single class which does not have any serialization. Making something up:
```java
class MyGroupScan extends BaseGroupScan {
  // Logical planning state
  String tableName;
  ...

  // Physical planning state
  MyPhysicalPlan physicalPlan;
  ...
}

private class MyPhysicalPlan {
  List<Something> scanSegments;
  List<DrillbitEndpoint> endpoints;
  ...
}
```
You can also simply add comments (or remember) that certain fields are for physical planning and thus are not serialized.
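If you instead keep physical-planning fields directly in the group scan, another option (a sketch; the fields shown are illustrative) is to use standard Jackson annotations so those fields never reach the serialized plan:

```java
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

public class MyGroupScan extends BaseGroupScan {

  // Logical planning state: serialized with the logical plan.
  @JsonProperty("tableName")
  private String tableName;

  // Physical planning state: excluded from JSON serialization.
  @JsonIgnore
  private MyPhysicalPlan physicalPlan;
}
```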
You may find it necessary to do some work once logical planning is done, before doing physical planning (that is, before returning results from the methods mentioned below). Since there is no good Drill-provided solution, we have to hack one. Suppose we use the physical state class above. Then, we can observe that the parallelization width calls are made only in the physical planning phase. So:
```java
@Override
public int getMinParallelizationWidth() {
  prepPhysicalPlan();
  ...
}

@Override
public int getMaxParallelizationWidth() {
  prepPhysicalPlan();
  ...
}

private void prepPhysicalPlan() {
  if (physicalPlan != null) {
    return;
  }

  // Physical planning is starting. Do any prep work.
  physicalPlan = new MyPhysicalPlan();
  ...
}
```
Not pretty, but it works. The same can be done with a flag if you keep all state in the group scan.
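For instance, with everything kept in the group scan itself, a boolean flag does the same job:

```java
// Set once physical-planning prep work has run.
private boolean physicalPlanPrepared;

private void prepPhysicalPlan() {
  if (physicalPlanPrepared) {
    return;
  }
  physicalPlanPrepared = true;

  // Physical planning is starting. Do any prep work here:
  // compute scan segments, record the planned scan width, and so on.
}
```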
At a high level, Drill follows this process to parallelize your scan:
- Calls the group scan's `getMinParallelizationWidth()`. If it returns 1, the scan is single-threaded. If greater than 1, Drill creates a parallel, distributed scan.
- Calls the group scan's `getMaxParallelizationWidth()` to determine the number of scan segments possible.
- Calls the group scan's `applyAssignments(List<DrillbitEndpoint> endpoints)` to offer a set of endpoints (Drillbits) on which the query can run. The same Drillbit may appear multiple times (if, say, we can run 5 minor fragments per node). (TODO: What is the relationship between max width and Drillbit count?)
- The group scan optionally works out the best placement of scan segments on Drillbits. This could be random. Or, the plugin may have locality preferences (if a database, say, run a scan segment on the same node as the database if possible).
- Calls `getSpecificScan(int minorFragmentId)` for each minor fragment to get the scan that will run in that fragment. (Which, as we discussed above, can hold one or more scan segments.)
Note that there is no call that says, simply, "the query will have x minor fragments." The number of minor fragments must be inferred from the number of endpoints. (TODO: What happens if there are more endpoints than max width?)
Notice that planning is a negotiation among three factors:
- The number of scan segments that group scan could create.
- The number of minor fragments available to run scans.
- The placement of scan segments on available nodes.
To simplify the problem, just focus on the ideal scan width: the number of scan segments. Let's say that we sharded a time-series query into five non-overlapping segments. Then we'd offer 5 as both the minimum and maximum parallelization width.
We might want to be more subtle: we want to create as many shards as there are executors (run one shard per executor.) In this case, we want to know the maximum number of executors available before we give an opinion about our parallelization width. It is not clear how (or whether) this can be done. Instead, we might first offer a very large number for parallelization width, then parallelize to the actual number of Drillbits offered (which will be less than our large number, but should be all that Drill can provide based on cluster size and configuration.)
The key point is: at the beginning of physical planning, create an optimistic value for the scan width based on the scan segments; then, later, revise that width based on the actual Drillbits.
```java
private class MyPhysicalPlan {
  int scanWidth;
}

private void prepPhysicalPlan() {
  ...
  physicalPlan.scanWidth = /* number of scan segments */;
}

@Override
public int getMinParallelizationWidth() {
  prepPhysicalPlan();
  return physicalPlan.scanWidth;
}

@Override
public int getMaxParallelizationWidth() {
  prepPhysicalPlan();
  return physicalPlan.scanWidth;
}
```
Drill runs as a distributed collection of nodes. Some scans have a preference about which node should run each scan segment. If, for example, your scan connects to a distributed database, you may wish to structure your scan segments to hit data on one node, run a Drillbit on that node, and tell Drill to run the scan segment on the same node as the data. In the HDFS case, we want to run the scan on the HDFS node that contains the data block we wish to scan. This process is called "node affinity." In other cases (such as the REST example), we have no preference about which node runs the scan segment.
Affinity comes in three forms:
- Hard: the scan must run on a given node.
- Soft: the scan should run on a given node, but can run elsewhere (at, perhaps, a hit to performance.)
- None: the scan can run anywhere.
Affinity, if provided by `getOperatorAffinity()` and `getDistributionAffinity()` (if the `GroupScan` implements `HasAffinity`), is added to the base operator stats.
Affinity is a complex topic which we'll skip for now, focusing instead on the broader question: how to handle distribution when we have no affinity.
The next step in the planning process is to associate scan segments (those logical chunks of our scan) with endpoints (threads of execution on a Drillbit). We do this in `applyAssignments()`. We are assuming that we have no node affinity, so our scans can run on any node. As a result, we really only care about the number of endpoints, not the details of each. The `BaseGroupScan` class handles this for us by saving the endpoint count:
```java
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) {
  endpointCount = endpoints.size();
}
```
It seems (this needs to be verified) that the list of endpoints is indexed by minor fragment ID. That is, the list of endpoints corresponds to the minor fragment IDs passed into `getSpecificScan(int minorFragmentId)`. Drill will offer us no more endpoints than the limit set by a call to our `getMaxParallelizationWidth()` method.
If our query has some fixed number of scan segments, we can skip this step. But, if we parallelize the query based on available minor fragments, now is the time to refine that estimate. The example above assumed we could shard a time-series query. We suggested we could break it down into five segments, and that was the number we returned from `getMaxParallelizationWidth()`. Maybe it turns out that Drill can only provide four minor fragments (four endpoints).
We have two choices:
- Combine two scan segments into a single scan operator. (The path we must take if the number of segments is fixed.)
- Recompute our shards based on available endpoints. (In the time-shard example, instead of five segments of 12 minutes each, we might do four segments of 15 minutes each.)
In pseudocode:
```java
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) {
  super.applyAssignments(endpoints);
  physicalPlan.scanWidth = /* compute new width based on endpointCount */;
}
```
We've now negotiated the final (sub) scan count. All that is left is to create the scan operator definitions. We do so on demand for each minor fragment:
```java
@Override
public SubScan getSpecificScan(int minorFragmentId) {
  // One possible policy: round-robin scan segments across minor fragments,
  // so each scan operator gets roughly 1/endpointCount of the segments.
  List<Something> segmentList = new ArrayList<>();
  for (int i = minorFragmentId; i < physicalPlan.scanSegments.size(); i += endpointCount) {
    segmentList.add(physicalPlan.scanSegments.get(i));
  }
  return new ExampleSubScan(config, segmentList);
}
```
We create a list of scan segments in case we have to "pack" multiple segments into each scan operator. If we adjusted the scan segment count to the number of endpoints, then we don't need the list or the "packing" logic.
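The `ExampleSubScan` itself (however your plugin defines it) needs to carry its assigned segments and must be Jackson-serializable, because Drill ships each sub scan to the Drillbit that runs the minor fragment. A sketch only: the `BaseSubScan` base class, the `ExamplePluginConfig` type, and the `Something` segment class stand in for whatever your plugin defined in earlier sections.

```java
import java.util.List;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeName("example-sub-scan")
public class ExampleSubScan extends BaseSubScan {

  // The scan segments assigned to this minor fragment.
  // (Columns and other fields from earlier sections are omitted.)
  private final List<Something> segments;

  @JsonCreator
  public ExampleSubScan(
      @JsonProperty("config") ExamplePluginConfig config,
      @JsonProperty("segments") List<Something> segments) {
    super(config);
    this.segments = segments;
  }

  @JsonProperty("segments")
  public List<Something> segments() {
    return segments;
  }
}
```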
The reader factory class we created earlier is now adjusted to iterate over each of our scan segments (however this plugin defines them):
```java
private static class ExampleReaderFactory implements ReaderFactory {

  @Override
  public ManagedReader<? extends SchemaNegotiator> next() {
    if (readerCount >= subScan.segments().size()) {
      return null;
    }
    return new ExampleBatchReader(config, subScan.columns(),
        subScan.segments().get(readerCount++));
  }
}
```
As you can see, parallelization and distribution are complex topics. However, if you take things step by step and follow the guidelines above, you should be able to turn your single-threaded scan into a multi-threaded scan with only moderate effort.