
Storage Plugin Model

Paul Rogers edited this page Jan 8, 2017 · 20 revisions

Drill exists to rapidly scan (and analyze) data from a variety of data sources. Each data source is represented in Drill as a storage plugin. This discussion describes how the storage plugin system works from a number of angles. The goal is to provide background information for those who maintain the existing storage plugins, and perhaps ease the learning curve for those creating new storage plugins.


Overview

A storage plugin provides access to a set of tables. In Hadoop-based systems, tables are implemented as files in the HDFS file system, so Drill's primary storage plugin is the FileSystemPlugin. Since a file system stores many kinds of files, the file system plugin is associated with a collection of format plugins (see below). Drill supports any arbitrary storage plugin, including for systems other than file systems.

The storage plugin is a plan-time concept and is not directly available at run time (while fragments execute.) The storage plugin itself is meant to be a light-weight description of the external system and, as such, can be created and discarded frequently, even within the scope of a single query plan session.

A storage plugin itself is a "type": it provides system-specific behavior but typically provides the ability to define multiple storage system instances for a given storage plugin type. It is these instances that we see in the storage plugin section of the Drill UI: we often call these storage plugins, but they are really storage plugin configurations: the information needed to define a specific instance.

Every storage plugin must implement the StoragePlugin interface, often by subclassing the AbstractStoragePlugin class. Each storage plugin also defines a Jackson-serialized configuration object which extends StoragePluginConfig: it is the JSON-serialized version of this object which we see in Drill's storage plugin web UI.

Drill provides a top-level namespace of storage plugins. The names here are not the storage plugin names (types) themselves, but rather the names associated with storage plugin configurations. (The schema name space also holds the names of schemas, which will be discussed later.) Then each storage plugin defines a name space for tables:

SELECT * FROM `myPlugin`.`myTable`

Here, myPlugin is the name of a storage plugin configuration (that maps to a storage plugin via the type field in the configuration), and myTable is a table defined by that plugin instance.

Configuration

This section discusses how Drill learns of the storage plugin and instantiates it and its related classes. Later sections will discuss the operations performed on it.

The key elements of a storage plugin include:

  • The storage plugin configuration: the JSON format and the corresponding Jackson-serialized Java class.
  • The storage plugin class.
  • Planning nodes and physical operators.
  • Execution time operator implementations.

Storage Plugin and Configuration

A storage plugin starts with the plugin configuration and plugin class. Given the elements described below, Drill will find, load and configure the storage plugin, and you will see the default plugin configuration in Drill's web UI when you first start Drill. (Perhaps the configuration must be created manually if the plugin registration already exists?...)

Storage Plugin

Drill locates a storage plugin using the following process:

  • The plugin provides a drill-module.conf file that Drill finds at start-up.
  • In that drill-module.conf file, the plugin adds its Java package to Drill's class path scan.
  • Drill scans the class path looking for classes that derive from StoragePlugin.

drill-module.conf

Storage plugins come in two forms: intrinsic plugins and extensions. An intrinsic plugin is one (such as FileSystemPlugin) defined within the Drill source tree. An extension lives outside of the Drill source tree, is packaged into a jar, and is placed in one of Drill's class path folders: often $DRILL_HOME/jars/3rdparty or $DRILL_SITE/jars.

Intrinsic plugins tend to put their configuration into the java-exec/src/main/resources/drill-module.conf file which is packaged into the drill-java-exec-x.y.z.jar file.

Extension plugins are packaged into a jar, and that jar must contain a drill-module.conf file in standard HOCON format. The bare minimum configuration file simply adds the extension package to Drill's class path scan. Here is an example from the Kudu plugin:

drill: {
  classpath.scanning: {
    packages += "org.apache.drill.exec.store.kudu"
  }
}

The file can, of course, add other configuration as desired.

Storage Plugin Class

The storage plugin class must implement StoragePlugin (often via the AbstractStoragePlugin class.) It is the implementation of this interface which marks the class as a storage plugin. Any class that implements StoragePlugin is presumed to be one. To disable a class which is not really a plugin, create a configuration but mark the plugin as disabled in the configuration.

The class must implement a three-argument constructor:

  public MockStorageEngine(MockStorageEngineConfig configuration,
                           DrillbitContext context, String name) {

The first argument is "magic". Drill uses Java introspection to find this constructor and find its first argument. The type of the first argument identifies the class of the configuration for this plugin. That is, it is this constructor that associates a storage plugin configuration (class) with the corresponding storage plugin class.
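This introspection step can be sketched in plain Java. The following is a simplified, self-contained stand-in (PluginConfig, Plugin, MyPluginConfig and MyPlugin are hypothetical illustrations, not Drill classes) showing how the type of a three-argument constructor's first parameter can identify the configuration class:

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-ins for StoragePluginConfig and StoragePlugin.
interface PluginConfig {}
interface Plugin {}

class MyPluginConfig implements PluginConfig {}

class MyPlugin implements Plugin {
  public MyPlugin(MyPluginConfig config, Object context, String name) {}
}

public class ConstructorScan {
  // Find the config class from the plugin's three-argument constructor,
  // mimicking the introspection described in the text above.
  static Class<?> configClassOf(Class<?> pluginClass) {
    for (Constructor<?> c : pluginClass.getConstructors()) {
      Class<?>[] params = c.getParameterTypes();
      if (params.length == 3 && PluginConfig.class.isAssignableFrom(params[0])) {
        return params[0];
      }
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(configClassOf(MyPlugin.class).getSimpleName()); // MyPluginConfig
  }
}
```

The discovered configuration class can then serve as the key in a map from configuration class to plugin constructor, which is how Drill later instantiates the right plugin for a given configuration.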

It appears that Drill attempts to create a single instance of the storage plugin per Drillbit; some of the logic in StoragePluginRegistryImpl seems to suggest one-per-Drillbit semantics. However, empirical tests suggest that the plugin instance is created multiple times. (Need to clarify.)

Storage Plugin Configuration Class

The plugin configuration class must:

  1. Be Jackson serializable,
  2. Implement the StoragePluginConfig interface (often via the StoragePluginConfigBase class), and
  3. Implement the equals() and hashCode() methods. Plugin configurations are evidently compared based on their content: Drill must sometimes determine whether two plugin configurations are identical.
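A minimal sketch of such a content-based configuration class (KvStoreConfig and its fields are hypothetical, and the Jackson annotations are omitted for brevity):

```java
import java.util.Objects;

// Hypothetical configuration class illustrating the content-based
// equals()/hashCode() contract that Drill expects of plugin configs.
public class KvStoreConfig {
  private final String host;
  private final int port;

  public KvStoreConfig(String host, int port) {
    this.host = host;
    this.port = port;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof KvStoreConfig)) return false;
    KvStoreConfig that = (KvStoreConfig) o;
    return port == that.port && Objects.equals(host, that.host);
  }

  @Override
  public int hashCode() {
    return Objects.hash(host, port);
  }

  public static void main(String[] args) {
    KvStoreConfig a = new KvStoreConfig("host1", 7051);
    KvStoreConfig b = new KvStoreConfig("host1", 7051);
    System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
  }
}
```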

bootstrap-storage-plugins.json

Somewhere on the class path a file must exist called bootstrap-storage-plugins.json which contains at least one serialized form of the storage plugin configuration. Without such a file, the plugin is invisible to Drill. (That is, the plugin exists only via a configuration.) Again, intrinsic plugins use the file in Drill's java-exec module; an extension must provide its own file.
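For illustration, a minimal bootstrap file might look like the following. The overall shape (a top-level storage object keyed by configuration name) follows the bootstrap file for Drill's intrinsic plugins; the masterAddresses field is just an example plugin-specific property, not a required one:

```json
{
  "storage": {
    "kudu": {
      "type": "kudu",
      "masterAddresses": "localhost:7051",
      "enabled": true
    }
  }
}
```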

Operator Type (Deprecated)

At one point, each operator also needed a unique value in the CoreOperatorType enum in UserBitShared.proto, but this use is now deprecated. It serves primarily to identify operators in the query profile.

Load Plugins from Configuration

Drill maintains a storage plugin registry in ZooKeeper (normally) or on disk (for an embedded Drillbit). When Drill starts, it scans the class path for storage plugins as indicated above. Drill then reads the stored plugin configurations, or the bootstrap-storage-plugins file if no stored configurations exist. Each plugin configuration is deserialized using Jackson, which maps from the JSON to the Java class using the type field. For the mock storage configuration:

  "type" : "mock",

This must map to an annotation on the configuration class:

@JsonTypeName("mock")
public class MockStorageEngineConfig extends StoragePluginConfigBase {

Given this, Jackson can search the class path looking for the class with the proper annotation. Jackson then deserializes the JSON form to produce the storage plugin configuration instance. (This is, by the way, why Drill fails if a storage plugin implementation is no longer available: the Jackson deserialization causes a fatal error if the required class does not exist on the class path.)
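The lookup can be illustrated without Jackson, using a hypothetical TypeName annotation standing in for @JsonTypeName, and a hand-built candidate list standing in for the class path scan:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for Jackson's @JsonTypeName.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface TypeName {
  String value();
}

@TypeName("mock")
class MockConfig {}

@TypeName("file")
class FileConfig {}

public class TypeLookup {
  // Given the "type" value from the JSON configuration, find the class
  // whose annotation carries that name.
  static Class<?> resolve(String typeName, List<Class<?>> candidates) {
    for (Class<?> c : candidates) {
      TypeName t = c.getAnnotation(TypeName.class);
      if (t != null && t.value().equals(typeName)) {
        return c;
      }
    }
    // Mirrors Drill's fatal error when the required class is missing.
    throw new IllegalStateException("No class registered for type: " + typeName);
  }

  public static void main(String[] args) {
    List<Class<?>> scanned = Arrays.asList(MockConfig.class, FileConfig.class);
    System.out.println(resolve("mock", scanned).getSimpleName()); // MockConfig
  }
}
```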

Next, the configuration instance is mapped to the storage plugin class using a mapping from storage plugin configuration class to storage plugin class. Recall the special three-argument constructor mentioned above: that gives Drill the information to associate the two classes. (Actually, Drill maintains a table from storage plugin configuration class to storage plugin constructor so that Drill can create a new instance on each reference.)

Planning Time

Once configuration is complete, the plugin is available for users to use in queries. The first step in each query is for the planner to resolve the table name. We first describe the process, then describe the classes that the storage plugin must provide.

Planning Process for Scans

Drill looks for the schema name space portion of the table name: the myPlugin in the earlier example. (The name may also be set using a USE statement.)

Drill uses that name to look up the storage plugin configuration in the schema name space. The schema name space is formed from the storage plugin configurations, workspace configurations, and a few pre-defined items.

Drill uses the class of the storage plugin configuration as a key to locate the constructor for the storage plugin itself, using the mapping from storage plugin configuration class to storage plugin constructor described above.

Drill creates an instance of the storage plugin to use for the query, passing it an instance of the plugin configuration. In fact, Drill will create multiple instances as planning proceeds.

In addition to the schema defined by the storage plugin configuration, the storage plugin can define an additional schema. (Or, perhaps it must redefine the schema given by the configuration?) To do this, Drill calls the registerSchemas method of the plugin. This method creates an instance of a schema which implements the Calcite Schema interface and adds it to the schemas registered for this plugin. (The schemas are registered in Calcite, not in the plugin itself.)

Next, the planner must resolve the actual table name within the schema defined by the storage plugin configuration. (This seems redundant since we had to resolve the schema to get the storage plugin (configuration) in the first place...) Calcite calls the getTable() method on the schema object to resolve the table name from the query into a Calcite Table object, typically using the DynamicDrillTable class which extends Table. DynamicDrillTable holds a list of Jackson-serializable objects which the plugin can retrieve later by deserializing the serialized form of the table data.

The planner now must move from a table definition to a scan operator (definition) for a scan of that table. Tables in Drill usually resolve to a directory of files, or a large file with distributed blocks. To handle this, the planner defines two kinds of scans:

  • A group (or logical) scan that operates on the logical table referenced in the query.
  • A group of sub (or physical) scans that read partitions (chunks, pages, blocks, groups) of the logical table.

The terminology is a bit confusing. A "group scan" is the logical scan of the whole table. A "sub scan" is a scan of a partition. In some contexts, both the group scan and the sub scan are called physical scans. This is confusing because the logical scan is not a physical scan (the logical table is often just a concept, not an actual file...)

In any event, the first step is to get the group (logical) scan. The planner resolves the table into a group scan plan (definition) by calling the getPhysicalScan method in the plugin class, providing it with the user name (for security checks), a place to obtain the deserialized table hints created above, and a list of columns that the user selected from the table. (Though, strangely, for a SQL query, the list of columns is always just *, even if the SELECT statement names specific columns.) The getPhysicalScan() method returns a "group scan": a definition of the scan of the table as a whole. (Presumably the "group" refers to the fact that this object represents a group of scans...)

The actual list of SELECT clause columns is provided using a two-part process:

  • First, the planner calls canPushdownProjects on the group scan, asking if the scan handles projection (can do anything with the list of columns.)
  • If the return value is true, the planner calls clone, including the list of columns, to produce a new copy of the group scan operator that includes the column set. (The scan operator is not required to return only the projected columns.)

The clone operation may occur multiple times.
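This handshake can be sketched with a hypothetical immutable group scan (DemoGroupScan is not a Drill class, and the real API works with SchemaPath columns rather than plain strings):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical immutable group scan illustrating the projection
// push-down handshake: the planner asks canPushdownProjects(), then
// clones the scan with the projected column list attached.
public class DemoGroupScan {
  private final String table;
  private final List<String> columns;

  DemoGroupScan(String table, List<String> columns) {
    this.table = table;
    this.columns = columns;
  }

  boolean canPushdownProjects(List<String> columns) {
    return true; // this scan can make use of the column list
  }

  // Volcano planning works on immutable nodes, so "applying" the
  // projection means producing a new copy with the columns attached.
  DemoGroupScan cloneWith(List<String> columns) {
    return new DemoGroupScan(table, columns);
  }

  List<String> getColumns() {
    return columns;
  }

  public static void main(String[] args) {
    DemoGroupScan scan = new DemoGroupScan("myTable", Arrays.asList("*"));
    List<String> projected = Arrays.asList("a", "b");
    if (scan.canPushdownProjects(projected)) {
      scan = scan.cloneWith(projected);
    }
    System.out.println(scan.getColumns()); // [a, b]
  }
}
```

Keeping the scan immutable and copying on every change is what allows the planner to clone it repeatedly without side effects.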

The planner calls getNewWithChildren to make a copy of the node (for reasons not yet clear). For a scan, the provided child list will always be empty. Again, this may occur multiple times.

Once the group scan is negotiated, the next step is to negotiate the sub scans. This starts with a call to getMaxParallelizationWidth to discover the maximum number of scanners. The planner then determines the actual number and provides the scan Drillbits (decided upon by a process that needs research) by calling applyAssignments with the Drillbit list.

Next, for each minor fragment (thread of execution), the planner calls getSpecificScan with the minor fragment ID. The method returns the definition of the sub scan that will implement each of the actual physical scans for that minor fragment. Each scan operator may scan multiple table partitions if necessary to scan the entire table in the number of fragments available. (Again, it is a bit vague at present how the number of fragments is calculated, and how that information is passed to the group scan.)

Planning Details

Drill uses the Calcite "Volcano" planner (so called because it is based on a paper that describes the Volcano research project). Volcano is a rules-based engine that works on immutable trees of operators. Scans are the leaf nodes. As Volcano works, it recreates the planning tree, including the leaf scans. Hence, the group and sub scan nodes can expect to be copied many times during a single planning session.

Volcano is a cost-based planner. The group scan must report its estimated cost via the following:

    public ScanStats getScanStats() {
      return ScanStats.TRIVIAL_TABLE;
    }

It seems that some (all?) scans use the TRIVIAL_TABLE constant. (Need more info about how to provide a real estimate.)

Parallelization and Node Assignments

The move from a group scan to a sub scan represents a move from a logical scan of a logical table to a set of physical scans on partitions of the physical table. The question arises, how many sub scans are possible and available. Factors considered are:

  • How many sub scans are possible for a group scan: getMaxParallelizationWidth().
  • Which nodes are available: applyAssignments().
  • How many threads of execution (minor fragments) are available per node? applyAssignments().
  • Given the above, which sub scan(s) should be done per minor fragment: getSpecificScan().

Some of the work is done by the planner (determining the set of nodes, the maximum parallelization per node). Some is done by the group scan (reporting the maximum possible parallelization, deciding how to schedule sub scans on the available nodes.)

Two key methods control this process.

applyAssignments() provides a table of (minor fragment id, Drillbit endpoint) pairs. Since minor fragment IDs start from 0 and monotonically increase, the minor fragment id is implicit as the index into a List<DrillbitEndpoint> endpoints. That is, you may see the list contain (Endpoint1, Endpoint1, Endpoint2, Endpoint3). This means that minor fragments 0 and 1 are both on Endpoint1, minor fragment 2 is on Endpoint2 and minor fragment 3 is on Endpoint 3.
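The implicit-index convention can be illustrated with plain strings standing in for DrillbitEndpoint objects, counting how many minor fragments land on each endpoint in the example above:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// The endpoint list passed to applyAssignments(): the minor fragment id
// is implicit as the index into the list. Strings stand in for
// DrillbitEndpoint here.
public class Assignments {
  static Map<String, Integer> fragmentsPerEndpoint(List<String> endpoints) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (int minorFragmentId = 0; minorFragmentId < endpoints.size(); minorFragmentId++) {
      counts.merge(endpoints.get(minorFragmentId), 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    // Minor fragments 0 and 1 run on Endpoint1, 2 on Endpoint2, 3 on Endpoint3.
    List<String> endpoints =
        Arrays.asList("Endpoint1", "Endpoint1", "Endpoint2", "Endpoint3");
    System.out.println(fragmentsPerEndpoint(endpoints)); // {Endpoint1=2, Endpoint2=1, Endpoint3=1}
  }
}
```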

The group scan must decide how to distribute table partitions to the available minor fragments given the node on which they run. The group scan must consider affinity (which node has which data) as well as load balancing (don't schedule all scans on the same node, even if that is where the data resides; it may be better to do network reads than overload the node.)

The group scan has some number of table partitions, n, as reported by getMaxParallelizationWidth(). In the ideal case, applyAssignments() will provide this number of threads (minor fragments) to scan those partitions. In practice, however, the number of threads may be smaller than the number of partitions. In this case, the group scan must run multiple scans within a single thread. In Drill terms, this means scheduling multiple physical scans within a single sub scan.

This decision is communicated to the planner via the getSpecificScan() method. Assuming that the group scan did the necessary work in response to applyAssignments(), the call to getSpecificScan() simply reads off the decisions already made. getSpecificScan() returns a sub scan. The sub scan can contain a plugin-specific description of multiple physical scans to be done in the sub scan. (For example, for a table given by a directory of files, a sub scan may include scans of 10 of the files.) Each partition scan will give rise (if using the standard framework) to a RecordReader instance as described below.
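As a sketch of this bookkeeping, here is a naive round-robin assignment of partitions to minor fragments. The class and method names are illustrative, and a real group scan must also weigh data affinity and load balancing as described above:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: distribute n table partitions over the minor
// fragments provided by applyAssignments(), round-robin. When fragments
// are fewer than partitions, a single sub scan carries several
// partition scans; getSpecificScan() then just reads the list off.
public class PartitionAssignment {
  static List<List<Integer>> assign(int partitions, int minorFragments) {
    List<List<Integer>> perFragment = new ArrayList<>();
    for (int i = 0; i < minorFragments; i++) {
      perFragment.add(new ArrayList<>());
    }
    for (int p = 0; p < partitions; p++) {
      perFragment.get(p % minorFragments).add(p);
    }
    return perFragment;
  }

  // What getSpecificScan(minorFragmentId) would return: the partitions
  // already decided for that fragment during applyAssignments().
  static List<Integer> specificScan(List<List<Integer>> plan, int minorFragmentId) {
    return plan.get(minorFragmentId);
  }

  public static void main(String[] args) {
    List<List<Integer>> plan = assign(10, 3); // 10 partitions, 3 threads
    System.out.println(specificScan(plan, 0)); // [0, 3, 6, 9]
  }
}
```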

Planning-time Storage Plugin Classes

Planning starts by resolving the table name within the plugin. This involves three storage plugin classes:

  • The storage plugin class implements registerSchemas to register its schema(s). The schema class is an extension-specific subclass of AbstractSchema.
  • The schema class implements getTable to return an extension-specific representation of the table.
  • The table class decorates a Drill subclass of Table to hold extension-specific information about the table.

Because the planner serializes, then deserializes, the table information within the planning process, the extension cannot simply add fields to the Table class. Instead, the extension creates a Jackson-serializable class to hold the information. A list of these objects (usually just one) is attached to the DynamicDrillTable.

Planning then moves on to the scan.

The planner serializes the table information, then calls getPhysicalScan on the storage plugin to get a group scan operator. The getPhysicalScan implementation uses Jackson to deserialize the JSON table data, recovering the information provided in the getTable method above.

The planner then uses that table information to create an extension-specific, Jackson-serializable extension of the AbstractGroupScan class. This class may hold the same table information from above, or may hold additional (or different) scan-specific information.

As noted above, the planner calls a number of methods on the group scan to negotiate aspects of the scan. The group scan can accumulate information as work proceeds and (so it seems) that information need not be stored in Jackson-serializable form if used just within the planner. The state information must be copied, however, in the various calls to clone or getWithChildren.

The planner finally calls getSpecificScan to get a sub scan. This method returns another extension-specific class; this one derives from AbstractSubScan and is also Jackson-serializable. Since the sub scan is serialized and sent to each Drillbit, all of its state must be serializable. Think of this class as the description of the sub scan sent from the Foreman to the execution Drillbit, leaving all other intermediate state behind.

Scan Operator

With the plan complete, attention now shifts to execution. The foreman serializes the plan (including the sub scan operator descriptions), then ships the plan to each Drillbit for execution. Each minor fragment is run by a dedicated thread. The first task is to create the record batch (operator implementation) for each operator (description). To do this, the Foreman deserializes the plan to recreate the operator tree, using Jackson to create the plan objects as described above.

Next, the fragment executor must create a record batch for each operator definition via the indirection of a record batch creator which implements BatchCreator. This is done using a process similar to the way plugin configurations are linked to plugins: via a map from operator definition class to record batch creator constructor.

During startup, the Drillbit searches the class path scan for classes which implement BatchCreator. For each, the code gets the list of implemented interfaces, one of which must be BatchCreator. BatchCreator itself is a parameterized type, so the code next looks for the type of the parameterization argument. For example:

public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> {

The creator class must have a zero-argument constructor. That constructor is placed into a map keyed by the parameterized class (here MockSubScanPOP). The result: a map from physical operator ("pop") definition to creator constructor.

Given this map, the fragment executor can easily find the constructor for the batch creator. From the constructor the executor gets the batch creator class and creates an instance. Finally, the executor calls:

  public ScanBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children)

This returns an instance of the scan batch (operator implementation) given the fragment context, the operator definition, and the list of previously-created children (upstream operators) for this operator. Since our focus here is on scanners, the child list will always be empty.
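The whole mechanism, from discovering the parameterized type to looking up and invoking the creator, can be sketched with a simplified, self-contained Creator interface (a stand-in for BatchCreator; all names here are illustrative):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for BatchCreator.
interface Creator<T> {
  String getBatch(T config);
}

class MockSubScan {}

class MockCreator implements Creator<MockSubScan> {
  public String getBatch(MockSubScan config) {
    return "mock batch";
  }
}

public class CreatorRegistry {
  // Read the type argument of the creator's generic interface: this is
  // the sub-scan class that keys the registry.
  static Class<?> subScanTypeOf(Class<?> creatorClass) {
    for (Type iface : creatorClass.getGenericInterfaces()) {
      if (iface instanceof ParameterizedType) {
        ParameterizedType pt = (ParameterizedType) iface;
        if (pt.getRawType() == Creator.class) {
          return (Class<?>) pt.getActualTypeArguments()[0];
        }
      }
    }
    return null;
  }

  public static void main(String[] args) throws Exception {
    Map<Class<?>, Class<?>> registry = new HashMap<>();
    registry.put(subScanTypeOf(MockCreator.class), MockCreator.class);

    // At execution time: look up the creator by the sub scan's class,
    // instantiate it via the zero-argument constructor, and invoke it.
    Class<?> creatorClass = registry.get(MockSubScan.class);
    @SuppressWarnings("unchecked")
    Creator<MockSubScan> creator =
        (Creator<MockSubScan>) creatorClass.getDeclaredConstructor().newInstance();
    System.out.println(creator.getBatch(new MockSubScan())); // mock batch
  }
}
```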

Given the batch created above, the fragment executor initializes the batch by calling the setup method. Operators that know their schema can set up the schema here. Otherwise, schema setup can wait until later.

Most scanners extend ScanBatch, but doing so is not required; it is simply a convenience. ScanBatch creates an operator-specific RecordReader subclass to handle actual reading, allowing the generic ScanBatch operator implementation to handle interfacing into the Drill operator hierarchy.

Runtime Events

The fragment now runs. The tree calls next() on the ScanBatch to return a batch of records. (By convention, the first batch should return just a schema.)

  • ScanBatch calls allocate() on the RecordReader to set up the vectors for the record batch.
  • Repeatedly calls the next() method on the RecordReader to read a batch of rows into the vectors allocated above.
  • Sets the row count on each of the value vectors.
  • Determines if the batch includes a schema different than the previous batch.
  • Passes along the proper status code to the caller.

Of course, some of these steps involve more than the summary suggests.

  • ScanBatch defines a Mutator class which must be used to build the set of value vectors. The Mutator takes a field schema in the form of a MaterializedField.
  • Uses the TypeHelper to convert from the field schema to a value vector instance.
  • Registers the value vector with the vector container associated with the ScanBatch.
  • Adds the value vector to the field vector map, indexed by field name (actually, a field path for nested structures.)

Comments on Current Design

A number of improvements are possible to the current design.

  • Each plugin should have a registration file that identifies the class name of the plugin. It is far cheaper to search for such files than to scan every class looking for those that extend some particular interface.
  • Each plugin should use annotations to provide static properties such as the plugin tag name (file, mock or whatever) instead of looking for the type of the first constructor argument as is done now.
  • Rather than searching for a known constructor, require that plugins have no constructor or a zero-argument constructor. In the plugin interface define a register method that does what the three-argument constructor currently does. This moves registration into the API rather than as a special non-obvious form of the constructor.
  • Each plugin configuration should name its storage plugin using the tag from the plugin definition.

The above greatly simplifies the storage plugin system:

  • Definition files name the storage plugin class.
  • The class (or definition entry) gives a tag to identify the plugin.
  • Given a definition file, Drill builds a (tag --> plugin) table.
  • Given a storage plugin definition, the plugin tag in the definition maps, via the above table, to the plugin itself.

Other changes:

  • Storage plugins should have extended lives. As it is, they are created multiple times for each query, and thus many times across queries. This makes it hard for the plugin to hold onto state (such as a connection to an external system, cached data, etc.)
  • Clearly separate the planner-time (transient) objects and the persistent plan objects passed across the network. This will allow the planner-only objects to contain handles to internal state (something that now is impossible given the internal Jackson serializations.)
  • Avoid unnecessary copies of the planning-time objects by tightening up the API. (Provide columns once rather than twice, etc.)