BH Scan Framework
As has been explained earlier, the goal of the batch size project is to implement controls over batch sizes. The readers turned out to be the largest source of batch size variation for a very simple reason: readers typically have no way of knowing the size of incoming data until it is read. Because of this, readers typically limited batches based on row count: sometimes 1K, sometimes 4000, sometimes 4K, sometimes 64K. But are row widths really well enough behaved that, say, 4K rows will always fall within the desired batch size range? Or are rows so variable that 4K rows can range in size from 16K (integers) to 4 GB (blog posts)? Experience suggests that, in fact, row widths are highly variable, and so we need a mechanism (the result set loader) to track incoming data size and decide when to end one batch and start the next.
Our goal, then, became to rip out the code that readers now use to write to vectors (in all its wondrous variation) and replace that code with the use of the result set loader. Looking into this revealed that not only is vector writing different for each reader, but so is projection. Hence the revised projection framework just discussed.
With the result set loader and projection framework in hand, we then need a structure that uses them. Rather than try to fit this structure into the existing readers and scan operators, the simpler and more robust path is to devise a new scan framework that hosts the new services, and simply preserve the source-specific bits of each reader, ported to the new framework.
The goal of this section is to describe the revised scan framework. Later sections will describe how the porting was done for the CSV and JSON readers.
After reviewing the existing set of readers and the existing scan operator, we identified the following requirements:
- Common mechanism for writing to vectors (the result set loader)
- Common mechanism for projection (the projection framework)
- Ability to run unit tests without the full Drill server (the operator framework)
- Scan is used for many kinds of readers. Factor out the common parts, and allow other parts to be built up via composition of reader-specific bits.
The result of the requirements is to refactor the scan operator into a set of modules:
- The scan operator itself, which handles nothing other than managing a set of readers.
- The batch reader, which negotiates schema and reads rows.
- The task-specific reader framework, which handles the tasks unique to, say, files vs. databases.
We will discuss each component. To avoid being lost in the trees, please keep the above forest in mind.
Let's look at the lifecycle of the scan operator from a number of perspectives.
The new scan operator works within the new Operator Framework by implementing the `OperatorExec` interface as follows:
- Constructor: Takes a `ScanOperatorEvents` which provides task-specific callbacks for each scan event.
- `buildSchema()`: Loads the first reader, obtains the schema from that reader (see below) and returns an empty batch with that schema.
- `next()`: Reads a batch of data, advancing through readers as needed.
- `close()`: Closes the current reader, if any, and frees resources.
Under the new scan framework, the reader lifecycle is precisely defined by the `RowBatchReader` interface:
- Reader is created. This can be done up-front (as is done today) or incrementally.
- `open()`: Open the reader. This can succeed (return `true`), fail softly (return `false`), or fail hard (throw an exception). A soft failure might be opening a file that no longer exists and so should be ignored. A hard failure might be a disk I/O error.
- `next()`: Read a batch of data. This can return data or not. It can also indicate, if data is returned, that this is known to be the last batch.
- `schemaVersion()`: Returns an increasing number indicating the schema version. A change in the number indicates that the reader changed the schema.
- `close()`: Release resources.
This is the basic flow: it makes no assumptions about how a batch is created or how schema is managed. Those details are handled by an application-specific scan framework as described below.
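To make the lifecycle concrete, the sketch below captures the contract just described. The exact signatures are inferred from the description above (for example, the return type of `schemaVersion()`), so treat it as an illustration rather than the literal `RowBatchReader` source.

```java
// Sketch of the RowBatchReader contract, inferred from the lifecycle above.
public interface RowBatchReader {

  // true = reader is ready; false = soft failure, silently skip this reader;
  // an exception = hard failure, fail the query.
  boolean open();

  // true = a batch was read and more may follow; false = EOF (a final,
  // possibly partial, batch may still have been produced).
  boolean next();

  // Monotonically increasing number; a change signals a schema change.
  int schemaVersion();

  // Release all resources held by the reader.
  void close();
}
```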
The reader lifecycle is complex: errors can happen at any time. Once we discuss the result set loader, we'll see a very specific set of steps required. In fact, the reader lifecycle is so complex that it is factored out of the scan operator into its own class: `ReaderState`.
The core scan framework classes know nothing about schemas, projection and the like. Rather, they simply provide lifecycle management. The next level of functionality resides in a scan framework. The idea is that files need different services than, say, JDBC or Kafka. The scan framework provides the required services, building on a basic platform that includes the result set loader and projection mechanisms. This also leaves open the possibility that, perhaps, there may be some reader for which these mechanisms are not needed: the core scan framework allows completely new implementations. But, for now, we'll focus on the one used for files.
A framework is an implementation of `ScanOperatorEvents` that provides, essentially, a single operation:
- `nextReader()`: Return the next reader to run.
All of the complexity resides in the way that we create and manage that reader.
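To see how little the scan operator itself does, here is a simplified sketch of how its `next()` method might drive `nextReader()` and the reader lifecycle. The `activeReader` and `events` fields, and the omission of error and fast-schema handling, are simplifications for illustration; this is not the production code.

```java
// Simplified sketch of the scan operator's reader-advance loop.
public boolean next() {
  for (;;) {
    // No active reader? Ask the framework for the next one.
    if (activeReader == null) {
      activeReader = events.nextReader();
      if (activeReader == null) {
        return false;                 // No more readers: the scan is done.
      }
      if (!activeReader.open()) {     // Soft failure: silently skip this reader.
        activeReader = null;
        continue;
      }
    }

    // Read from the active reader until it reports EOF.
    if (activeReader.next()) {
      return true;                    // A batch is ready to send downstream.
    }
    activeReader.close();             // EOF: move on to the next reader.
    activeReader = null;
  }
}
```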
The bulk of the framework resides in `AbstractScanFramework<T extends SchemaNegotiator>`. Notice the parameter: it is how a reader coordinates with the framework.
In this framework, readers implement `ManagedReader<T extends SchemaNegotiator>`:
- `open(T negotiator)`: The reader opens its data source, whatever it is. The reader determines whether it is "early" or "late" schema. Let's say it is "early" schema, such as Parquet. The reader determines the table schema and reports it to the framework using the `SchemaNegotiator`, receiving a result set loader in response, which the reader uses to load data.
- `next()`: Using the result set loader obtained in `open()`, read a batch of rows, or report EOF.
Here is the full interface:
```java
public interface ManagedReader<T extends SchemaNegotiator> {
  boolean open(T negotiator);
  boolean next();
  void close();
}
```
Interfaces don't get much simpler! (Much effort went into making this simple so that it becomes very easy for community contributors to create new readers.)
The system provides different schema negotiators for different tasks. Here is the one for a file-based reader:
```java
public interface SchemaNegotiator {
  OperatorContext context();
  void setTableSchema(TupleMetadata schema);
  void setBatchSize(int maxRecordsPerBatch);
  ResultSetLoader build();
  boolean isProjectionEmpty();
}
```
The idea is that the schema negotiator provides all the information that the reader needs. This way, we pass a single value to the reader (rather than a zillion parameters) and the reader can pull out items of interest. The same mechanism allows the reader to specify a schema (if it has one), and to obtain a result set loader "primed" with the table schema and set of projected columns.
The `isProjectionEmpty()` method handles the special case of a `SELECT COUNT(*)` query: it tells the reader that it can skip reading any data if it can implement a fast path to provide just the row count.
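For example, an "early schema" reader might record this condition in its `open()` method along the lines of the sketch below. The `countOnly` field, `declaredSchema` value, and `loader` field are made-up names for illustration; only the `SchemaNegotiator` calls come from the interface above.

```java
// Hypothetical handling of the SELECT COUNT(*) fast path in open().
@Override
public boolean open(SchemaNegotiator negotiator) {
  countOnly = negotiator.isProjectionEmpty();  // No columns are projected at all
  negotiator.setTableSchema(declaredSchema);   // Declare the "early" schema
  loader = negotiator.build();                 // Loader primed with schema and projection
  return true;                                 // false would mean "skip this reader"
}
```

In `next()`, such a reader could then report rows without materializing any column data, since only the row count matters.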
The scan framework was specifically designed to simplify the task of developing new readers. (The ability to extend Drill with new readers is one of Drill's unique differentiators, so we should make this task as easy as possible.) Here is what a developer would need to do.
First, using the "Easy" plugin, define a method that instantiates the custom reader following a process we'll cover in the Easy plugin section. Basically, implement a class that will create each reader as it is needed, passing in information such as the Drill file system, file name, and so on.
Next, define the reader class as implementing `ManagedReader`. In that class, implement the `open()` method to open the underlying data source, however that needs to be done. If the data source can provide a schema, pass it to the schema negotiator. Ask the schema negotiator for a result set loader, which is retained by the reader.
In the `next()` method, read rows until either the result set loader says the batch is full, or the data source reaches EOF. If the reader must read all columns (as in a text file reader), just write all the columns to the result set loader; it will figure out which are projected.
If the reader is more complex (such as JSON or Parquet), then the reader may have a way of working with projected columns differently than non-projected columns. In this case, obtain a column writer only for the projected columns.
In the `close()` method, release resources.
That's it. No projection. No mucking about with vectors. Just clean focus on the job of reading data and passing it to the result set loader.
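To make the steps above concrete, here is a sketch of a complete (if trivial) managed reader. The data source is a stand-in iterator, the column names are invented, and the row-writer calls (`writer()`, `start()`, `save()`, `isFull()`, the scalar setters) reflect the result set loader API as described in the earlier sections; consult those sections for the exact details.

```java
// A minimal, hypothetical managed reader: reads (id, name) pairs from an
// iterator and writes them through the result set loader.
public class ExampleReader implements ManagedReader<SchemaNegotiator> {

  private final Iterator<String[]> source;  // Stand-in for the real data source
  private ResultSetLoader loader;

  public ExampleReader(Iterator<String[]> source) {
    this.source = source;
  }

  @Override
  public boolean open(SchemaNegotiator negotiator) {
    // This is an "early schema" reader: declare the schema up front.
    TupleMetadata schema = new SchemaBuilder()
        .add("id", MinorType.INT)
        .add("name", MinorType.VARCHAR)
        .buildSchema();
    negotiator.setTableSchema(schema);
    loader = negotiator.build();            // Retained for use in next()
    return true;
  }

  @Override
  public boolean next() {
    RowSetLoader writer = loader.writer();
    while (!writer.isFull()) {              // The loader decides when the batch is full
      if (!source.hasNext()) {
        return false;                       // EOF: this (possibly partial) batch is the last
      }
      String[] row = source.next();
      writer.start();
      writer.scalar("id").setInt(Integer.parseInt(row[0]));
      writer.scalar("name").setString(row[1]);
      writer.save();
    }
    return true;                            // Batch is full; more data remains
  }

  @Override
  public void close() {
    // Nothing to release for this stand-in source.
  }
}
```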
This is best illustrated by looking at unit tests. See, for example, `TestFileScanFramework`, which defines a variety of record batch readers in order to test the scan framework.
Let's now examine the internals that comprise the scan framework.
We've already discussed the scan operator: `ScanOperatorExec`. It has been reduced to a state machine that steps through a set of readers, managing each. We've discussed the classes that assist with this task.
The scan operator delegates application-specific tasks to an instance of the `ScanOperatorEvents` interface.
The scan framework provides a concrete implementation: `AbstractScanFramework`. This class acts as a bridge to the projection and result set loader mechanisms via a reference to the `ScanSchemaOrchestrator` class. The abstract class also provides a set of configuration options (see the sketch after this list), such as:
- The default type for null columns.
- Whether to mimic the Drill 1.12 placement of partition columns, or to follow the Drill 1.11 practice and put them at the end of the row.
- Batch sizes (row count, byte size).
- The projection from the physical plan.
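A plugin might configure these options when it builds its scan framework roughly as sketched below. The setter names here are invented for illustration only; the actual methods live on `AbstractScanFramework` and its subclasses.

```java
// Hypothetical configuration of a file scan framework; the method names are
// illustrative, not the actual API.
FileScanFramework framework = new FileScanFramework(readerFactory);
framework.setNullColumnType(Types.optional(MinorType.VARCHAR)); // Default type for null columns
framework.useDrill112PartitionPlacement(true);                  // Partition column placement
framework.setBatchRowLimit(4096);                               // Cap on rows per batch
framework.setBatchByteLimit(16 * 1024 * 1024);                  // Cap on bytes per batch
framework.setProjection(columnsFromPhysicalPlan);               // Projection from the physical plan
```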
The `ManagedReader` interface provides an alternative reader interface that is aware of the result set loader via the `SchemaNegotiator` described above.
Various implementation classes glue the pieces together. Of these, the `ShimBatchReader` is of interest. It implements the scan operator's reader interface to wrap the managed reader interface. This seems a bit odd, but it allows the scan operator to remain simple, while allowing the managed reader to offer the services we've discussed.
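Conceptually, the shim is a small adapter, roughly as sketched below. The `newNegotiator()` and `loaderFor()` calls stand in for the framework plumbing and are not the actual method names; the real class also handles error translation and the schema-only first batch.

```java
// Simplified sketch of the adapter idea behind ShimBatchReader: expose the
// scan operator's RowBatchReader contract while delegating to a ManagedReader.
public class ShimBatchReaderSketch implements RowBatchReader {

  private final AbstractScanFramework<SchemaNegotiator> framework;
  private final ManagedReader<SchemaNegotiator> reader;

  public ShimBatchReaderSketch(AbstractScanFramework<SchemaNegotiator> framework,
                               ManagedReader<SchemaNegotiator> reader) {
    this.framework = framework;
    this.reader = reader;
  }

  @Override
  public boolean open() {
    // The framework supplies the schema negotiator; the managed reader never
    // sees the scan operator directly.
    return reader.open(framework.newNegotiator(this));
  }

  @Override
  public boolean next() {
    return reader.next();   // The reader fills the result set loader
  }

  @Override
  public int schemaVersion() {
    // The result set loader, not the reader, knows when the schema changed.
    return framework.loaderFor(this).schemaVersion();
  }

  @Override
  public void close() {
    reader.close();
  }
}
```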
Most readers read from files. The file framework provides file-related services, such as the Drill file system, the "`FileWork`" object that describes the file block to scan, and so on.
The columns framework extends the file framework to handle the `columns[]` column unique to the text (CSV) reader. The semantics of `columns[]` turn out to be surprisingly complex. For example:
- A query can contain either references to the `columns` column, or to named columns, but not both.
- A query can contain multiple references to column indexes: `columns[2], columns[4]`, but not multiple references to the top-level `columns` column.
- If a query references `columns`, but the CSV file has headers, the headers are ignored.
- If a query contains the wildcard, then the wildcard is treated as `columns` for files without headers, but as the full list of headers for files with headers.
- The type of `columns` is always repeated `VarChar`.
Since these rules are really independent of any particular reader, they are implemented in a framework. (This means that a community user could, say, use the columns framework to read a custom log file format.)
Drill JIRAs and private conversations contain multiple references to a "fast schema path." Presumably the idea is that each operator must return its schema on the first call to `next()` so that Drill can provide the schema to the client. Since Drill has evolved to not write down design decisions, it has been hard to identify whether this is, or is not, the current design.
The scan framework assumes that fast schema is desired. As a result, the scan operator provides special handling to ensure that the first batch returns schema, but no data.
- If the first reader plays the "fast schema" game, then it will return an empty batch with only schema. This is passed downstream.
- If such a reader happens to be the second or a later one in the scan operator, then its empty batch is simply discarded.
- If the first reader returns a batch with data, then the scan operator sets the data aside and creates an empty batch with only schema.
Unit tests show that the mechanism works as designed.
However, after updating the CSV reader, several system tests failed. It turned out that those tests were anticipating that the test query would return just one batch (with data and schema). It was at this point that we began to wonder if Drill did, actually, implement a fast schema path. It may be that that work was done only for a `LIMIT 0` query.
In any event, the full functionality exists and the system tests were updated where necessary to account for the schema-only batch. Still, if the "fast schema" is not required, it would be easy to add a setting for the scan operator, normally off, that skips the "fast schema" work.
Jinfeng recently did work on empty batches. A large discussion at the time centered around what to do with readers that return no data. Should the resulting empty batch cause a schema change (and thus fail the query)? A strong case can be made. If a JSON file is empty, then it has no schema, and the correct result is to combine an empty schema with a non-empty one and fail the query.
Another argument was that empty batches should be ignored. An empty JSON file has no data, and the absence of data should not cause the query to fail.
Still another argument came from the JDBC case. JDBC can provide a schema (as can Parquet.) If one fragment receives a schema of (a, b, c) with no data, but other fragments receive a schema of (a, b) with data, shouldn't we forward the empty batch to the rest of the query, and fail the query with a schema change, so that the user knows he was trying to work with multiple different schemas?
There is no good answer. Well, there is a good answer, but it requires the user to specify their intention by providing a schema that the query should use.
In the meantime, the revised scan operator implements the following rules:
- If the `open()` call for a reader fails, the scan (and thus the query) fails.
- If the `open()` call for a reader returns `false`, the scan operator interprets this as "I've got nothing for you. Move along, nothing to see here" and silently skips that reader.
- If the `open()` call for a reader returns `true`, then the scan operator will consider whether the reader provided an "early schema" when asking for the first batch of data.
- If the first call to `next()` returns EOF, then the scanner will hold onto the current schema (if there is one) or will silently skip the reader (if no schema).
- If the first call to `next()` indicates it has data, then the scan operator uses the reader's schema as the "fast schema" schema for the scan operator, if this is the first reader.
- If the scan operator ends up skipping all readers, then the schema for the scan operator is that of the first reader that had a schema. The scan operator returns `OK_NEW_SCHEMA` with that schema, followed by `NONE`.
- Else, if the scan operator skips all readers and finds no schema, then the scan operator returns `NONE` (Jinfeng's "fast NONE").
- Else, if at least one reader had data, that is returned as usual (`OK_NEW_SCHEMA`, `OK`*, `NONE`).
The above logic is complex, but it does seem to be the best compromise available until Drill has an actual schema to work with.
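The end-of-scan portion of these rules can be summarized in code roughly as follows; the field and helper names are invented, and the real logic is spread across the scan operator and `ReaderState`.

```java
// Illustrative condensation of the "all readers skipped" rules above.
private IterOutcome finishScan() {
  if (allReadersSkipped) {
    if (firstSchema != null) {
      sendEmptySchemaBatch(firstSchema);  // Schema from the first reader that had one
      return IterOutcome.OK_NEW_SCHEMA;   // The following call returns NONE
    }
    return IterOutcome.NONE;              // No data, no schema: the "fast NONE"
  }
  return IterOutcome.NONE;                // Data was already sent via OK_NEW_SCHEMA / OK
}
```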
The above discussion alluded to schema handling, at least for the "fast schema" path. In the general case, the scan operator uses new mechanisms to track schema changes.
- Within each reader, the result set loader tracks schema changes by providing a schema version after each "harvest." An increased schema version number is an internal indication of a schema change.
- The scan operator, as discussed earlier, attempts to perform "schema smoothing" to avoid gratuitous schema changes. (An example discussed earlier is retaining the type of a previous column when a later reader omits that column.)
- The operator framework tracks the schema of each new output batch by comparing schemas. This mechanism detects schema changes across readers.
- The operator framework translates schema changes between output batches into the Drill iterator framework's `OK` and `OK_NEW_SCHEMA` statuses.
Again, the above is complex because Drill claims to be schema-free (on input), but has a rigid schema internally, and has no specified design or rules for handling schemas. Providing the user with an ability to specify a schema would (as has been said several times) go a long way towards simplifying and rationalizing the above mechanisms.
The scan framework works and satisfies its requirements. However, it feels just a bit clunky. There is an opportunity to streamline it as we gain experience.
And, as has been mentioned over and over, the ability for a user to specify a schema will go a long way to eliminating the confusion and complexity about how Drill should handle schema changes.
Finally, having a written design for how things work will avoid the oscillations and confusions that arise in projects with no written records. (Writing has been generally acknowledged as a much more powerful way to pass information from one generation to the next than older oral traditions.)