-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Basic error handling for yaml. #27145
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1561,7 +1561,8 @@ def with_exception_handling( | |
use_subprocess, | ||
threshold, | ||
threshold_windowing, | ||
timeout) | ||
timeout, | ||
self.infer_output_type) | ||
|
||
def default_type_hints(self): | ||
return self.fn.get_type_hints() | ||
|
@@ -2154,7 +2155,8 @@ def __init__( | |
use_subprocess, | ||
threshold, | ||
threshold_windowing, | ||
timeout): | ||
timeout, | ||
original_infer_element_type): | ||
if partial and use_subprocess: | ||
raise ValueError('partial and use_subprocess are mutually incompatible.') | ||
self._fn = fn | ||
|
@@ -2168,6 +2170,7 @@ def __init__( | |
self._threshold = threshold | ||
self._threshold_windowing = threshold_windowing | ||
self._timeout = timeout | ||
self._original_infer_element_type = original_infer_element_type | ||
|
||
def expand(self, pcoll): | ||
if self._use_subprocess: | ||
|
@@ -2182,6 +2185,9 @@ def expand(self, pcoll): | |
*self._args, | ||
**self._kwargs).with_outputs( | ||
self._dead_letter_tag, main=self._main_tag, allow_unknown_tags=True) | ||
#TODO(BEAM-18957): Fix when type inference supports tagged outputs. | ||
result[self._main_tag].element_type = self._original_infer_element_type( | ||
pcoll.element_type) | ||
|
||
if self._threshold < 1.0: | ||
|
||
|
@@ -2244,6 +2250,82 @@ def process(self, *args, **kwargs): | |
traceback.format_exception(*sys.exc_info())))) | ||
|
||
|
||
# Idea adapted from Asguard. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you mean "asgarde" (https://github.com/tosun-si/asgarde)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. Fixed (added reference). |
||
# TODO(robertwb): Consider how this could fit into the public API. | ||
# TODO(robertwb): Generalize to all PValue types. | ||
class _PValueWithErrors(object): | ||
"""This wraps a PCollection such that transforms can be chained in a linear | ||
manner while still accumulating any errors.""" | ||
def __init__(self, pcoll, exception_handling_args, upstream_errors=()): | ||
self._pcoll = pcoll | ||
self._exception_handling_args = exception_handling_args | ||
self._upstream_errors = upstream_errors | ||
|
||
def main_output_tag(self): | ||
return self._exception_handling_args.get('main_tag', 'good') | ||
|
||
def error_output_tag(self): | ||
return self._exception_handling_args.get('dead_letter_tag', 'bad') | ||
|
||
def __or__(self, transform): | ||
return self.apply(transform) | ||
|
||
def apply(self, transform): | ||
result = self._pcoll | transform.with_exception_handling( | ||
**self._exception_handling_args) | ||
from apache_beam.typehints import typehints | ||
if result[self.main_output_tag()].element_type == typehints.Any: | ||
result[self.main_output_tag()].element_type = transform.infer_output_type( | ||
self._pcoll.element_type) | ||
# TODO(BEAM-18957): Add support for tagged type hints. | ||
result[self.error_output_tag()].element_type = typehints.Any | ||
return _PValueWithErrors( | ||
result[self.main_output_tag()], | ||
self._exception_handling_args, | ||
self._upstream_errors + (result[self.error_output_tag()], )) | ||
|
||
def accumulated_errors(self): | ||
if len(self._upstream_errors) == 1: | ||
return self._upstream_errors[0] | ||
else: | ||
return self._upstream_errors | Flatten() | ||
Comment on lines
+2283
to
+2287
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need to separately handle a case when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe, but that might have issues with streaming pipeline upgrades. Keeping as is for now as not all runners handle one-element flattens optimally. |
||
|
||
def as_result(self, error_post_processing=None): | ||
return { | ||
self.main_output_tag(): self._pcoll, | ||
self.error_output_tag(): self.accumulated_errors() | ||
if error_post_processing is None else self.accumulated_errors() | ||
| error_post_processing, | ||
} | ||
|
||
|
||
class _MaybePValueWithErrors(object): | ||
"""This is like _PValueWithErrors, but only wraps values if | ||
exception_handling_args is non-trivial. It is useful for handling | ||
error-catching and non-error-catching code in a uniform manner. | ||
""" | ||
def __init__(self, pvalue, exception_handling_args=None): | ||
if isinstance(pvalue, _PValueWithErrors): | ||
assert exception_handling_args is None | ||
self._pvalue = pvalue | ||
elif exception_handling_args is None: | ||
self._pvalue = pvalue | ||
else: | ||
self._pvalue = _PValueWithErrors(pvalue, exception_handling_args) | ||
|
||
def __or__(self, transform): | ||
return self.apply(transform) | ||
|
||
def apply(self, transform): | ||
return _MaybePValueWithErrors(self._pvalue | transform) | ||
|
||
def as_result(self, error_post_processing=None): | ||
if isinstance(self._pvalue, _PValueWithErrors): | ||
return self._pvalue.as_result(error_post_processing) | ||
else: | ||
return self._pvalue | ||
|
||
|
||
class _SubprocessDoFn(DoFn): | ||
"""Process method run in a subprocess, turning hard crashes into exceptions. | ||
""" | ||
|
@@ -3232,14 +3314,21 @@ def __init__( | |
_expr_to_callable(expr, ix)) for (ix, expr) in enumerate(args) | ||
] + [(name, _expr_to_callable(expr, name)) | ||
for (name, expr) in kwargs.items()] | ||
self._exception_handling_args = None | ||
|
||
def with_exception_handling(self, **kwargs): | ||
self._exception_handling_args = kwargs | ||
return self | ||
|
||
def default_label(self): | ||
return 'ToRows(%s)' % ', '.join(name for name, _ in self._fields) | ||
|
||
def expand(self, pcoll): | ||
return pcoll | Map( | ||
lambda x: pvalue.Row(**{name: expr(x) | ||
for name, expr in self._fields})) | ||
return ( | ||
_MaybePValueWithErrors(pcoll, self._exception_handling_args) | Map( | ||
lambda x: pvalue.Row( | ||
**{name: expr(x) | ||
for name, expr in self._fields}))).as_result() | ||
|
||
def infer_output_type(self, input_type): | ||
return row_type.RowTypeConstraint.from_fields([ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't it be better to avoid adding an additional
_original_infer_element_type
parameter to the_ExceptionHandlingWrapper
constructor, if it's only called from within this function?Then we could have here
Advantages of my suggestion:
Disadvantage:
_ExceptionHandlingWrapper
with anfn
that does not implement theinfer_output_type()
function, we'd need to go back to this implementation. But should we optimize for the future?What's your opinion on that? Do you see other benefits of your solution?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we could do that.
infer_output_type
is overridden by WindowInto, but an implementation is easily enough added to the implementing DoFn itself.