Storage Plugin Model
A storage plugin provides access to a set of tables. In Hadoop-based systems, tables are implemented as files in the HDFS file system, so Drill's primary storage plugin is the `FileSystemPlugin`. Since a file system stores many kinds of files, the file system plugin is associated with a collection of format plugins (see below). Drill supports arbitrary storage plugins, including those for systems other than file systems.
The storage plugin is a plan-time concept and is not directly available at run time (while fragments execute). The storage plugin itself is meant to be a light-weight description of the external system and, as such, can be created and discarded frequently, even within the scope of a single query planning session.
A storage plugin itself is a "type": it provides system-specific behavior, and it typically allows multiple storage system instances to be defined for a given storage plugin type. It is these instances that we see in the storage plugin section of the Drill UI. We often call these storage plugins, but they are really storage plugin configurations: the information needed to define a specific instance.
Every storage plugin must implement the `StoragePlugin` interface, often by subclassing the `AbstractStoragePlugin` class. Each storage plugin also defines a Jackson-serialized configuration object which extends `StoragePluginConfig`: it is the JSON-serialized version of this object which we see in Drill's storage plugin web UI.
Drill provides a top-level namespace of storage plugins. The names here are not the storage plugin names (types) themselves, but rather the names associated with storage plugin configurations. (The schema name space also holds the names of schemas, which will be discussed later.) Then each storage plugin defines a name space for tables:
```sql
SELECT * FROM `myPlugin`.`myTable`
```
Here, `myPlugin` is the name of a storage plugin configuration (which maps to a storage plugin via the `type` field in the configuration), and `myTable` is a table defined by that plugin instance.
This section discusses how Drill learns of the storage plugin and instantiates it and its related classes. Later sections will discuss the operations performed on it.
The key elements of a storage plugin include:
- The storage plugin configuration: the JSON format and the corresponding Jackson-serialized Java class.
- The storage plugin class.
- Planning nodes and physical operators.
- Execution time operator implementations.
A storage plugin starts with the plugin configuration and plugin class. Given the elements described below, Drill will find, load and configure the storage plugin, and you will see the default plugin configuration in Drill's web UI when you first start Drill. (Perhaps the configuration must be created manually if the plugin registration already exists?...)
Drill locates a storage plugin using the following process:
- Add a `drill-module.conf` file that Drill finds at start-up.
- In the `drill-module.conf` file, add the Java package to Drill's class path scan.
- Drill scans the class path looking for classes that derive from `StoragePlugin`.
Storage plugins are of two types: intrinsic or extensions. An intrinsic plugin is one (such as `FileSystemPlugin`) defined within the Drill source tree. An extension lives outside of the Drill source tree, is packaged into a jar, and is placed in one of Drill's class path folders: often `$DRILL_HOME/jars/3rdparty` or `$DRILL_SITE/jars`.
Intrinsic plugins tend to put their configuration into the `java-exec/src/main/resources/drill-module.conf` file, which is packaged into the `drill-java-exec-x.y.z.jar` file.
Extension plugins are packaged into a jar, and that jar must contain a `drill-module.conf` file in standard HOCON format. The bare minimum content for this file is to add the extension package to Drill's class path scan. Here is an example from the Kudu plugin:
```hocon
drill: {
  classpath.scanning: {
    packages += "org.apache.drill.exec.store.kudu"
  }
}
```
The file can, of course, add other configuration as desired.
The storage plugin class must implement `StoragePlugin` (often via the `AbstractStoragePlugin` class). It is the implementation of this interface that marks the class as a storage plugin: any class that implements `StoragePlugin` is presumed to be one. To disable a class which is not really a plugin, create a configuration for it but mark the plugin as disabled in that configuration.
The class must implement a three-argument constructor:

```java
public MockStorageEngine(MockStorageEngineConfig configuration,
    DrillbitContext context, String name) {
```
The first argument is "magic". Drill uses Java introspection to find this constructor and find its first argument. The type of the first argument identifies the class of the configuration for this plugin. That is, it is this constructor that associates a storage plugin configuration (class) with the corresponding storage plugin class.
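The introspection described above can be sketched with plain Java reflection. The classes below are simplified, hypothetical stand-ins (nested to keep the example self-contained), not Drill's actual API; the point is how the type of the first constructor parameter links a plugin class to its configuration class:

```java
import java.lang.reflect.Constructor;

public class ConstructorScan {
  // Hypothetical stand-ins for Drill's classes.
  static class StoragePluginConfig {}
  static class MockStorageEngineConfig extends StoragePluginConfig {}
  static class DrillbitContext {}

  static class MockStorageEngine {
    public MockStorageEngine(MockStorageEngineConfig configuration,
                             DrillbitContext context, String name) {}
  }

  // Mimic Drill's introspection: find the three-argument constructor whose
  // first parameter is a configuration class; that parameter's type
  // identifies the plugin's configuration class.
  static Class<?> configClassOf(Class<?> pluginClass) {
    for (Constructor<?> c : pluginClass.getConstructors()) {
      Class<?>[] params = c.getParameterTypes();
      if (params.length == 3
          && StoragePluginConfig.class.isAssignableFrom(params[0])) {
        return params[0];
      }
    }
    throw new IllegalArgumentException("no plugin constructor found");
  }

  public static void main(String[] args) {
    // Prints the configuration class paired with MockStorageEngine.
    System.out.println(configClassOf(MockStorageEngine.class).getSimpleName());
  }
}
```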
The plugin configuration class must:

- Be Jackson serializable.
- Implement the `StoragePluginConfig` interface (often via the `StoragePluginConfigBase` class).
- Implement the `equals()` and `hashCode()` methods. Evidently, plugin configurations are compared based on their content: Drill must sometimes determine whether two plugin configurations are identical.
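A minimal sketch of such a configuration class (the class name and fields here are hypothetical, and the Jackson annotations are omitted) showing a content-based `equals()`/`hashCode()` pair:

```java
import java.util.Objects;

// Hypothetical configuration class illustrating the equals()/hashCode()
// contract Drill relies on to compare plugin configurations by content.
public class DummyStorageConfig {
  private final String url;
  private final boolean enabled;

  public DummyStorageConfig(String url, boolean enabled) {
    this.url = url;
    this.enabled = enabled;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof DummyStorageConfig)) return false;
    DummyStorageConfig other = (DummyStorageConfig) o;
    return enabled == other.enabled && Objects.equals(url, other.url);
  }

  @Override
  public int hashCode() {
    return Objects.hash(url, enabled);
  }

  public static void main(String[] args) {
    // Two configs with the same content compare as identical.
    DummyStorageConfig a = new DummyStorageConfig("mock://host", true);
    DummyStorageConfig b = new DummyStorageConfig("mock://host", true);
    System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
  }
}
```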
Somewhere on the class path a file must exist called `bootstrap-storage-plugins.json` which contains at least one serialized form of the storage plugin configuration. Without such a file, the plugin is invisible to Drill. (That is, the plugin exists only via a configuration.) Again, intrinsic plugins use the file in Drill's java-exec module; an extension must provide its own file.
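A minimal bootstrap file might look like the following. This is a sketch: the configuration name and the `enabled` field are illustrative, and the exact fields depend on the plugin's configuration class.

```json
{
  "storage": {
    "mock": {
      "type": "mock",
      "enabled": true
    }
  }
}
```

The outer key (`mock` here) is the configuration name that appears in the schema name space; the `type` field selects the configuration class, as described below.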
At one point, each operator also needed a unique enum value in the `CoreOperatorType` enum of UserBit.proto, but this use is now deprecated; the value is used primarily to identify operator IDs in the query profile.
Drill maintains a storage plugin registry in ZooKeeper (normally) or on disk (for an embedded Drillbit). When Drill starts, it scans the class path for storage plugins as indicated above. Drill then reads either the stored plugin configurations or the `bootstrap-storage-plugins.json` file if no stored configurations exist. Each plugin configuration is deserialized using Jackson, which maps from the JSON to the Java class form using the `type` field. For the mock storage configuration:
"type" : "mock",
This must map to an annotation on the configuration class:

```java
@JsonTypeName("mock")
public class MockStorageEngineConfig extends StoragePluginConfigBase {
```
Given this, Jackson can search the class path looking for the class with the proper annotation. Jackson then deserializes the JSON form to produce the storage plugin configuration instance. (This is, by the way, why Drill fails if a storage plugin implementation is no longer available: the Jackson deserialization causes a fatal error if the required class does not exist on the class path.)
Next, the configuration instance is mapped to the storage plugin class using a mapping from storage plugin configuration class to storage plugin class. Recall the special three-argument constructor mentioned above: that gives Drill the information to associate the two classes. (Actually, Drill maintains a table from storage plugin configuration class to storage plugin constructor so that Drill can create a new instance on each reference.)
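The mapping can be sketched as follows. This is a simplified model with hypothetical, nested stand-in classes, not Drill's actual registry code: the registry keys on the configuration class found via the constructor and creates a fresh plugin instance on each reference.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

public class PluginRegistry {
  // Hypothetical stand-ins for Drill's classes.
  static class StoragePluginConfig {}
  static class DrillbitContext {}
  static class MockConfig extends StoragePluginConfig {}

  static class MockPlugin {
    public MockPlugin(MockConfig config, DrillbitContext context, String name) {}
  }

  // Map from configuration class to plugin constructor, so a new plugin
  // instance can be created on each reference.
  private final Map<Class<?>, Constructor<?>> table = new HashMap<>();

  void register(Class<?> pluginClass) {
    for (Constructor<?> c : pluginClass.getConstructors()) {
      Class<?>[] p = c.getParameterTypes();
      if (p.length == 3 && StoragePluginConfig.class.isAssignableFrom(p[0])) {
        table.put(p[0], c);  // key on the first (configuration) parameter
      }
    }
  }

  Object create(StoragePluginConfig config, DrillbitContext ctx, String name) {
    try {
      return table.get(config.getClass()).newInstance(config, ctx, name);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    PluginRegistry registry = new PluginRegistry();
    registry.register(MockPlugin.class);
    Object plugin = registry.create(new MockConfig(), new DrillbitContext(), "mock");
    System.out.println(plugin.getClass().getSimpleName());
  }
}
```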
Once configuration is complete, the plugin is available for users to use in queries. The first step in each query is for the planner to resolve the table name. We first describe the process, then describe the classes that the storage plugin must provide.
Drill looks for the schema name space portion of the table name: the `myPlugin` in the earlier example. (The name may also be set using a `USE` statement.)
Drill uses that name to look up the storage plugin configuration in the schema name space. The schema name space is formed from the storage plugin configurations, workspace configurations, and a few pre-defined items.
Drill uses the class of the storage plugin configuration as a key to locate the constructor for the storage plugin itself, using the mapping from storage plugin configuration class to storage plugin constructor described above.
Drill creates an instance of the storage plugin to use for the query, passing it an instance of the plugin configuration. (Actually, Drill will create multiple instances as planning proceeds.)
In addition to the schema defined by the storage plugin configuration, the storage plugin can define an additional schema. (Or, perhaps it must redefine the schema given by the configuration?) To do this, Drill calls the `registerSchemas` method of the plugin. This method creates an instance of a schema which implements the Calcite `Schema` interface and adds it to the schemas registered for this plugin. (The schemas are registered in Calcite, not in the plugin itself.)
Next, the planner must resolve the actual table name within the schema defined by the storage plugin configuration. (This seems redundant, since we had to resolve the schema to get the storage plugin configuration in the first place...) Calcite calls the `getTable()` method on the schema object to resolve the table name from the query into a Calcite `Table` object, typically using the `DynamicDrillTable` class, which extends `Table`. `DynamicDrillTable` holds a list of Jackson-serializable objects which the plugin can retrieve later by deserializing the serialized form of the table data.
The planner now must move from a table definition to a scan operator (definition) for a scan of that table. Tables in Drill usually resolve to a directory of files, or a large file with distributed blocks. To handle this, the planner defines two kinds of scans:
- A group (or logical) scan that operates on the logical table referenced in the query.
- A group of sub (or physical) scans that read partitions (chunks, pages, blocks, groups) of the logical table.
The terminology is a bit confusing. A "group scan" is the logical scan of the whole table. A "sub-scan" is a scan of a partition. In some contexts, the "group scan" and "sub-scan" are both called physical scans. This is confusing because the logical scan is not a physical scan: the logical table is often just a concept, not an actual file.
In any event, the first step is to get the group (logical) scan. The planner resolves the table into a group scan plan (definition) by calling the `getPhysicalScan` method in the plugin class, providing it with the user name (for security checks), a place to obtain the deserialized table hints created above, and a list of columns that the user selected from the table. (Though, strangely, for a SQL query, the list of columns is always just `*`, even if the SELECT statement names specific columns.) The `getPhysicalScan()` method returns a "group scan": a definition of the scan of the table as a whole. (Presumably the "group" refers to the fact that this object represents a group of scans.)
The actual list of `SELECT` clause columns is provided using a two-part process:

- First, the planner calls `canPushdownProjects` on the group scan, asking if the scan handles projection (can do anything with the list of columns).
- If the return value is `true`, the planner calls `clone`, passing the list of columns, to produce a new copy of the group scan operator that includes the column set. (The scan operator is not responsible for returning just the projected columns.)
The clone operation may occur multiple times.
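The two-step negotiation can be modeled with a toy group scan. The method names are modeled on those mentioned above; everything else is simplified and hypothetical, not Drill's real API:

```java
import java.util.Arrays;
import java.util.List;

public class ToyGroupScan {
  private final String table;
  private final List<String> columns;

  ToyGroupScan(String table, List<String> columns) {
    this.table = table;
    this.columns = columns;
  }

  // Step 1: the planner asks whether the scan can do anything with the
  // projected column list.
  boolean canPushdownProjects(List<String> columns) {
    return true;  // this toy scan always accepts a projection
  }

  // Step 2: if so, the planner asks for a copy that carries the column set.
  ToyGroupScan clone(List<String> columns) {
    return new ToyGroupScan(table, columns);
  }

  List<String> columns() {
    return columns;
  }

  public static void main(String[] args) {
    ToyGroupScan scan = new ToyGroupScan("myTable", Arrays.asList("*"));
    List<String> projected = Arrays.asList("a", "b");
    if (scan.canPushdownProjects(projected)) {
      scan = scan.clone(projected);  // replace the scan with the copy
    }
    System.out.println(scan.columns());
  }
}
```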
The planner calls `getNewWithChildren` to make a copy of the node (for reasons not yet clear). For a scan, the provided child list will always be empty. Again, this may occur multiple times.
Once the group scan is negotiated, the next step is to negotiate the sub-scans. This starts with a call to `getMaxParallelizationWidth` to discover the maximum number of scanners. The planner then determines the actual number and provides the scan Drillbits (decided upon by a process that needs research) by calling `applyAssignments` with the Drillbit list.
Next, for each minor fragment (thread of execution), the planner calls `getSpecificScan` with the minor fragment ID. The method returns the definition of the sub-scan that will implement the actual physical scans for that minor fragment. Each scan operator may scan multiple table partitions if necessary to scan the entire table in the number of fragments available. (Again, it is a bit vague at present how the number of fragments is calculated, and how that information is passed to the group scan.)
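As a sketch only (Drill's actual assignment logic is more involved and, as noted, needs research), a round-robin split of table partitions across minor fragments, the kind of bookkeeping a group scan performs between `applyAssignments` and `getSpecificScan`, might look like:

```java
import java.util.ArrayList;
import java.util.List;

public class FragmentAssigner {
  // Distribute partition indexes 0..partitions-1 round-robin over the
  // given number of minor fragments; each inner list would back one
  // sub-scan definition.
  static List<List<Integer>> assign(int partitions, int minorFragments) {
    List<List<Integer>> perFragment = new ArrayList<>();
    for (int i = 0; i < minorFragments; i++) {
      perFragment.add(new ArrayList<>());
    }
    for (int p = 0; p < partitions; p++) {
      perFragment.get(p % minorFragments).add(p);
    }
    return perFragment;
  }

  public static void main(String[] args) {
    // 5 partitions over 2 minor fragments: one sub-scan gets 3, the other 2.
    System.out.println(assign(5, 2));
  }
}
```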
Planning starts by resolving the table name within the plugin. This involves three storage plugin classes:

- The storage plugin class implements `registerSchemas` to register its schema(s).
- The schema class implements `getTable` to return an extension-specific representation of the table.
- The table class decorates a Drill subclass of `Table` to hold extension-specific information about the table.
Because the planner serializes, then deserializes, the table information within the planning process, the extension cannot simply add fields to the `Table` class. Instead, the extension creates a Jackson-serializable class to hold the information. A list of these objects (usually just one) is attached to the `DynamicDrillTable`.
Planning then moves on to the scan.

The planner serializes the table information, then calls `getPhysicalScan` on the storage plugin to get a group scan operator. The `getPhysicalScan` implementation uses Jackson to deserialize the JSON table data, recovering the information provided in the `getTable` method above.
The planner then uses that table information to create an extension-specific, Jackson-serializable extension of the `AbstractGroupScan` class. This class may hold the same table information from above, or may hold additional (or different) scan-specific information.
As noted above, the planner calls a number of methods on the group scan to negotiate aspects of the scan. The group scan can accumulate information as work proceeds and (so it seems) that information need not be stored in Jackson-serializable form if it is used only within the planner. The state information must be copied, however, in the various calls to `clone` or `getNewWithChildren`.
The planner finally calls `getSpecificScan` to get a sub-scan. This method returns another extension-specific class, this one derived from `AbstractSubScan` and also Jackson-serializable. Since the sub-scan is serialized and sent to each Drillbit, all of its state must be serializable. Think of this class as the description of the sub-scan sent from the Foreman to the execution Drillbit, leaving all other intermediate state behind.
Scans are created at runtime as follows:

- The physical plan contains the "sub-scan" physical operator (definition) created above.
- The `ImplCreator` class uses the operator name in the sub-scan to look up the corresponding operator implementation (record batch) using the `OperatorCreatorRegistry` associated with the Drillbit.
- The batch creator class is associated with each operator (implementation) and is responsible for converting the physical operator (definition) into a record batch (operator implementation).
- Scans appear to create a `RecordReader` subclass to handle actual reading, and pair this with a `ScanBatch` operator implementation to handle interfacing into the Drill operator hierarchy.
- `ScanBatch` calls `setup` on the `RecordReader` to do initial setup. Operators that know their schema can set up the schema here. Otherwise, schema setup can wait until later.
The fragment now runs. The tree calls `next()` on the `ScanBatch` to return a batch of records. (By convention, the first batch should return just a schema.)
- `ScanBatch` calls `allocate()` on the `RecordReader` to set up the vectors for the record batch.
- Repeatedly calls the `next()` method on the `RecordReader` to read a batch of rows into the vectors allocated above.
- Sets the row count on each of the value vectors.
- Determines if the batch includes a schema different from the previous batch.
- Passes along the proper status code to the caller.
Of course, some of these steps involve more than the summary suggests.
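Stripped of vectors, schema handling, and status codes, the driving loop can be sketched as follows. This is a toy model, not Drill's actual `ScanBatch` code; the `RecordReader` interface here is a minimal, hypothetical stand-in:

```java
public class ToyScanLoop {
  // Minimal stand-in for the reader contract described above.
  interface RecordReader {
    void allocate();   // set up value vectors for the next batch
    int next();        // fill the vectors; return row count, 0 at end of data
  }

  // The ScanBatch-style loop: allocate, read, repeat until no rows remain.
  static int drive(RecordReader reader) {
    int total = 0;
    while (true) {
      reader.allocate();
      int rows = reader.next();
      if (rows == 0) {
        break;
      }
      total += rows;
    }
    return total;
  }

  // Helper: a reader that returns the given batch sizes in order.
  static RecordReader fromSizes(int... sizes) {
    return new RecordReader() {
      private int i = 0;
      @Override public void allocate() { /* would create vectors here */ }
      @Override public int next() { return i < sizes.length ? sizes[i++] : 0; }
    };
  }

  public static void main(String[] args) {
    // Two batches of 3 and 2 rows, then end of data.
    System.out.println(drive(fromSizes(3, 2, 0)));
  }
}
```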
- `ScanBatch` defines a `Mutator` class which must be used to build the set of value vectors. The `Mutator` takes a field schema in the form of a `MaterializedField`.
- Uses the `TypeHelper` to convert from the field schema to a value vector instance.
- Registers the value vector with the vector container associated with the `ScanBatch`.
- Adds the value vector to the field vector map, indexed by field name (actually, a field path for nested structures).
A number of improvements are possible to the current design.
- Each plugin should have a registration file that identifies the class name of the plugin. It is far cheaper to search for such files than to scan every class looking for those that extend some particular interface.
- Each plugin should use annotations to provide static properties such as the plugin tag name (`file`, `mock`, or whatever) instead of looking for the type of the first constructor argument as is done now.
- Rather than searching for a known constructor, require that plugins have no constructor or a zero-argument constructor. In the plugin interface, define a `register` method that does what the three-argument constructor currently does. This moves registration into the API rather than leaving it a special, non-obvious form of the constructor.
- Each plugin configuration should name its storage plugin using the tag from the plugin definition.
The above greatly simplifies the storage plugin system:
- Definition files name the storage plugin class.
- The class (or definition entry) gives a tag to identify the plugin.
- Given a definition file, Drill builds a (tag --> plugin) table.
- Given a storage plugin definition, the plugin tag in the definition maps, via the above table, to the plugin itself.
Other changes:
- Storage plugins should have extended lives. As it is, they are created multiple times for each query, and thus many times across queries. This makes it hard for the plugin to hold onto state (such as a connection to an external system, cached data, etc.)
- Clearly separate the planner-time (transient) objects and the persistent plan objects passed across the network. This will allow the planner-only objects to contain handles to internal state (something that now is impossible given the internal Jackson serializations.)
- Avoid unnecessary copies of the planning-time objects by tightening up the API. (Provide columns once rather than twice, etc.)