-
Notifications
You must be signed in to change notification settings - Fork 980
Operator Error Handling
Many types of errors can occur in a big data system such as Drill: missing files, network problems, out-of-memory, bad user-provided code, math errors, and on and on. Drill is a high-volume server so a failure within one query must not impact the server as a whole: a failed query must shut down gracefully, releasing all its resources, even if rather severe errors occurred.
Drill handles a variety of error conditions:
- Resource errors: out-of-memory, I/O exceptions and so on.
- Schema change exceptions: when code is not designed to handle schema changes.
- Invariant violations: when one part of Drill detects that some other part of Drill has malfunctioned (that is, wrong code.)
- Cascade errors: errors that result when handling a primary error. For example, if a file write runs out of disk space, a close of that file will also fail (since data cannot be flushed.)
While handling of these errors follows some general patterns, much variation exists.
Let's start the error discussion by exploring the types of error handling observed in Drill operator code.
- Checked exceptions: any Java exception that derives from
Exception
that requires a method declaration. Since the protocolnext()
method does not declare athrows
clause, checked exceptions cannot propagate outside of eachnext()
method. Checked exceptions are turned into one of the conditions discussed below. - Unchecked exceptions: any Java exception that drives from
RuntimeException
(or directly fromThrowable
) but which does not require athrows
declaration. Such exceptions can be thrown fromnext()
. -
UserException
s: Drill's primary unchecked exception thrown in response to serious (but expected) error conditions such as out-of-memory, disk I/O errors and so on. Here "user exception" seems to mean "exception to be reported to the user" instead of "exception caused by the user." Some checked exceptions are converted toUserException
and thrown up the call stack. - Killing the Fragment from within: some operators respond to error conditions by killing the fragment as explained below. It is not clear when a fragment should use this technique vs. the
UserException
approach. - Killing the Fragment from without: The Drill bit can kill the fragment thread in response to a variety of conditions. Here it is necessary to terminate a running thread as discussed below.
- Logging the error and continuing. Often occurs when the error is recoverable.
Because next()
declares no throws
clause, the operator must handle all (checked) errors within the bounds of the next()
method. This seems like a nice, clean protocol, but it raises the question: how should the operator handle such errors if they can't be thrown upward? As indicated above, there are three choices.
First, the operator often uses the internal kill protocol as explained below. Second, the operator may translate the exception into an unchecked exception such as UserException
or RuntimeException
. Third, the operator can elect to ignore the exception. From the external sort:
} catch (UnsupportedOperationException e) {
throw new RuntimeException(e);
}
...
} catch (Throwable e) {
throw UserException. ... .build(logger);
...
} catch (UnsupportedOperationException e) {
estimatedRecordSize += 50;
}
...
} catch (IOException e) {
// since this is meant to be used in a batches's spilling, we don't propagate the exception
logger.warn("Unable to mark spill directory " + currSpillPath + " for deleting on exit", e);
}
Clearly, the ignore strategy should be used sparingly. One place that the ignore strategy must be used is when errors occur when releasing resources:
try {
AutoCloseables.close(e, newGroup);
} catch (Throwable t) { /* close() may hit the same IO issue; just ignore */ }
The operator code appears to be inconsistent in how it handles unchecked exceptions. In theory, the primary protocol would be to throw the exception up the stack, allowing the fragment executor to terminate the query and clean up resources. However, it appears that code uses a variety of exceptions, and translates between them:
-
UserException
: Appears to be the primary means to communicate failures to users: it has quite a bit of functionality to log errors, format user messages and so on. -
RuntimeException
: Java's root unchecked exception which seems to be thrown as a last result when "something is wrong." -
OutOfMemoryException
: Thrown when Drill exhausts its direct memory. (This exception is separate from Java's ownOutOfMemoryError
.)
All unchecked exceptions bubble up to the fragment executor which cleans up the fragment:
while (shouldContinue() && root.next()) {
// loop
}
...
} catch (OutOfMemoryError | OutOfMemoryException e) {
...
} catch (AssertionError | Exception e) {
fail(e);
} finally {
...
// here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED
cleanup(FragmentState.FINISHED);
While throwing an unchecked exception is a very common way to report errors, other code takes a different approach (which seems to lead to the same result): internal fragment termination. This requires the following lines of code in next()
(or the methods that next()
calls):
} catch(SomeException ex) {
kill(false);
context.fail(ex);
return IterOutcome.STOP;
Here, the call to kill()
propagates the call to the input operator(s). The flag supposedly says whether to propagate the kill event downward, but some (many?) operators ignore the flag. The idea here seems to be to tell operators down the tree that something has failed. The problem (observed in a bug), is that this does not work for branches that occur off of a parent of the current operator.
The call to context.fail()
informs the fragment context (which manages the fragment as a whole) that the fragment has failed. The call may alert listeners of the failure. This call is only a state change; it does not propagate errors to other operators.
Finally, the STOP
return value tell the parent operator that an error has occurred. The parent should return STOP
to its parent, and so on.
Note that the internal termination does virtually the same work as throwing an unchecked exception. While the unchecked exception propagates up the stack automatically and toggles failure at the fragment executor, the internal termination does the failure toggle in each and every operator, and requires each operator to correctly include code that handles STOP
.
Other threads may choose to terminate the operator tree. For example, a fragment for the same query on a different node might fail, causing the Foreman to terminate this operator tree.
Cancellation is done in four steps:
- An external thread calls
cancel()
on the fragment executor. - The fragment transitions to the
CANCELLATION_REQUESTED
state. - The fragment thread is interrupted with
Thread.interrupt()
. - The fragment thread processes the resulting
ThreadInterrupted
exception using one of the methods listed above.
Although AbstractRecordBatch.kill()
informs operators (from the current one down) that the query failed, this message does not propagate up to parent operators, or down to operators on parallel subtrees. For this, an alternative means is needed to know that the query has terminated. This is the FragmentContext..shouldContinue()
method that detects if the fragment (as a whole) has been put into a termination condition. Code frequently checks this state:
public final IterOutcome next(final int inputIndex, final RecordBatch b){
...
try{
if (!context.shouldContinue()) {
return IterOutcome.STOP;
}
next = b.next();
Problems occur when some operator fails to check this case: it may attempt to run even during the clean-up phase.
Once an error occurs, Drill starts a process to clean up the operator tree. Much of this clean-up is common in both the success and failure case. The key difference is that, in the success case, operator generally know their internal state: all batches have been processed and other activities brought to a natural end. In the error case, however, operators can be in virtually any of their valid internal states, and may even be in an invalid state (due to the failure of some state transition such as partial allocation of memory.) Clean-up is made complex by the need to handle all such cases.
As noted earlier, operators do not perform cleanup on either the normal end-of-data (DONE
status) or error (STOP
status). Instead, they defer clean-up until the fragment executor invokes the close()
method on the operator. This occurs as follows:
- The fragment executor calls the root operator's
next()
method until execution should end. Execution ends: ** When the root fragment operator returns a terminating status (DONE
orSTOP
). ** When the root fragmentnext()
throws an (unchecked) exception. ** When some other operator has calledfail()
to put the fragment into the failed state. - The fragment is put into a completed state (normal case) or failed state.
** If a child operator called
fail()
, the fragment transitioned to the failed state at that time. ** If the root operator exited normally, the fragment executor moves the thread to the normal completion state. ** If the root operator throws an exception, then an exception handler marks the fragment as failed. -
.closeOutResources()
closes the rootRootExec
and the fragment context. - The
RootExec
iterates over all operators in the tree, callingclose()
from the top down, and surpassing any exceptions thrown. - Each operator releases its own resources, including those that may be in an unexpected state due to an error condition.
The confusing bit is that some operators attempt to close their inputs. Is this a vestige of an earlier design in which each operator was tasked with closing its inputs?