Skip to content

Commit

Permalink
Failure converter (#185)
Browse files Browse the repository at this point in the history
Fixes #142
Fixes #131
Fixes #139
  • Loading branch information
cretz authored Nov 7, 2022
1 parent a75256e commit 929dc81
Show file tree
Hide file tree
Showing 17 changed files with 596 additions and 416 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ other_ns_client = Client(**config)

Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type
`temporalio.converter.DataConverter` can be set via the `data_converter` client parameter. Data converters are a
combination of payload converters and payload codecs. The former converts Python values to/from serialized bytes, and
the latter converts bytes to bytes (e.g. for compression or encryption).
combination of payload converters, payload codecs, and failure converters. Payload converters convert Python values
to/from serialized bytes. Payload codecs convert bytes to bytes (e.g. for compression or encryption). Failure converters
convert exceptions to/from serialized failures.

The default data converter supports converting multiple types including:

Expand Down
42 changes: 16 additions & 26 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,22 +219,20 @@ async def decode_activation(
await _decode_payloads(job.query_workflow.arguments, codec)
elif job.HasField("resolve_activity"):
if job.resolve_activity.result.HasField("cancelled"):
await temporalio.exceptions.decode_failure(
job.resolve_activity.result.cancelled.failure, codec
await codec.decode_failure(
job.resolve_activity.result.cancelled.failure
)
elif job.resolve_activity.result.HasField("completed"):
if job.resolve_activity.result.completed.HasField("result"):
await _decode_payload(
job.resolve_activity.result.completed.result, codec
)
elif job.resolve_activity.result.HasField("failed"):
await temporalio.exceptions.decode_failure(
job.resolve_activity.result.failed.failure, codec
)
await codec.decode_failure(job.resolve_activity.result.failed.failure)
elif job.HasField("resolve_child_workflow_execution"):
if job.resolve_child_workflow_execution.result.HasField("cancelled"):
await temporalio.exceptions.decode_failure(
job.resolve_child_workflow_execution.result.cancelled.failure, codec
await codec.decode_failure(
job.resolve_child_workflow_execution.result.cancelled.failure
)
elif job.resolve_child_workflow_execution.result.HasField(
"completed"
Expand All @@ -245,32 +243,28 @@ async def decode_activation(
job.resolve_child_workflow_execution.result.completed.result, codec
)
elif job.resolve_child_workflow_execution.result.HasField("failed"):
await temporalio.exceptions.decode_failure(
job.resolve_child_workflow_execution.result.failed.failure, codec
await codec.decode_failure(
job.resolve_child_workflow_execution.result.failed.failure
)
elif job.HasField("resolve_child_workflow_execution_start"):
if job.resolve_child_workflow_execution_start.HasField("cancelled"):
await temporalio.exceptions.decode_failure(
job.resolve_child_workflow_execution_start.cancelled.failure, codec
await codec.decode_failure(
job.resolve_child_workflow_execution_start.cancelled.failure
)
elif job.HasField("resolve_request_cancel_external_workflow"):
if job.resolve_request_cancel_external_workflow.HasField("failure"):
await temporalio.exceptions.decode_failure(
job.resolve_request_cancel_external_workflow.failure, codec
await codec.decode_failure(
job.resolve_request_cancel_external_workflow.failure
)
elif job.HasField("resolve_signal_external_workflow"):
if job.resolve_signal_external_workflow.HasField("failure"):
await temporalio.exceptions.decode_failure(
job.resolve_signal_external_workflow.failure, codec
)
await codec.decode_failure(job.resolve_signal_external_workflow.failure)
elif job.HasField("signal_workflow"):
await _decode_payloads(job.signal_workflow.input, codec)
elif job.HasField("start_workflow"):
await _decode_payloads(job.start_workflow.arguments, codec)
if job.start_workflow.HasField("continued_failure"):
await temporalio.exceptions.decode_failure(
job.start_workflow.continued_failure, codec
)
await codec.decode_failure(job.start_workflow.continued_failure)
for val in job.start_workflow.memo.fields.values():
# This uses API payload not bridge payload
new_payload = (await codec.decode([val]))[0]
Expand All @@ -285,7 +279,7 @@ async def encode_completion(
) -> None:
"""Recursively encode the given completion with the codec."""
if comp.HasField("failed"):
await temporalio.exceptions.encode_failure(comp.failed.failure, codec)
await codec.encode_failure(comp.failed.failure)
elif comp.HasField("successful"):
for command in comp.successful.commands:
if command.HasField("complete_workflow_execution"):
Expand All @@ -300,14 +294,10 @@ async def encode_completion(
for val in command.continue_as_new_workflow_execution.memo.values():
await _encode_payload(val, codec)
elif command.HasField("fail_workflow_execution"):
await temporalio.exceptions.encode_failure(
command.fail_workflow_execution.failure, codec
)
await codec.encode_failure(command.fail_workflow_execution.failure)
elif command.HasField("respond_to_query"):
if command.respond_to_query.HasField("failed"):
await temporalio.exceptions.encode_failure(
command.respond_to_query.failed, codec
)
await codec.encode_failure(command.respond_to_query.failed)
elif command.respond_to_query.HasField(
"succeeded"
) and command.respond_to_query.succeeded.HasField("response"):
Expand Down
19 changes: 9 additions & 10 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async def connect(
target_host: str,
*,
namespace: str = "default",
data_converter: temporalio.converter.DataConverter = temporalio.converter.default(),
data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
interceptors: Sequence[Interceptor] = [],
default_workflow_query_reject_condition: Optional[
temporalio.common.QueryRejectCondition
Expand Down Expand Up @@ -139,7 +139,7 @@ def __init__(
service_client: temporalio.service.ServiceClient,
*,
namespace: str = "default",
data_converter: temporalio.converter.DataConverter = temporalio.converter.default(),
data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
interceptors: Sequence[Interceptor] = [],
default_workflow_query_reject_condition: Optional[
temporalio.common.QueryRejectCondition
Expand Down Expand Up @@ -862,8 +862,8 @@ async def result(
hist_run_id = fail_attr.new_execution_run_id
break
raise WorkflowFailureError(
cause=await temporalio.exceptions.decode_failure_to_error(
fail_attr.failure, self._client.data_converter
cause=await self._client.data_converter.decode_failure(
fail_attr.failure
),
)
elif event.HasField("workflow_execution_canceled_event_attributes"):
Expand Down Expand Up @@ -1950,15 +1950,16 @@ async def __anext__(self) -> temporalio.api.history.v1.HistoryEvent:
class WorkflowFailureError(temporalio.exceptions.TemporalError):
"""Error that occurs when a workflow is unsuccessful."""

def __init__(self, *, cause: temporalio.exceptions.FailureError) -> None:
def __init__(self, *, cause: BaseException) -> None:
"""Create workflow failure error."""
super().__init__("Workflow execution failed")
self.__cause__ = cause

@property
def cause(self) -> temporalio.exceptions.FailureError:
def cause(self) -> BaseException:
"""Cause of the workflow failure."""
return cast(temporalio.exceptions.FailureError, self.__cause__)
assert self.__cause__
return self.__cause__


class WorkflowContinuedAsNewError(temporalio.exceptions.TemporalError):
Expand Down Expand Up @@ -2546,9 +2547,7 @@ async def complete_async_activity(self, input: CompleteAsyncActivityInput) -> No

async def fail_async_activity(self, input: FailAsyncActivityInput) -> None:
failure = temporalio.api.failure.v1.Failure()
await temporalio.exceptions.encode_exception_to_failure(
input.error, self._client.data_converter, failure
)
await self._client.data_converter.encode_failure(input.error, failure)
last_heartbeat_details = (
None
if not input.last_heartbeat_details
Expand Down
4 changes: 2 additions & 2 deletions temporalio/contrib/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(
default_text_map_propagator
)
# TODO(cretz): Should I be using the configured one at the client and activity level?
self.payload_converter = temporalio.converter.default().payload_converter
self.payload_converter = temporalio.converter.PayloadConverter.default

def intercept_client(
self, next: temporalio.client.OutboundInterceptor
Expand Down Expand Up @@ -309,7 +309,7 @@ def __init__(self, next: temporalio.worker.WorkflowInboundInterceptor) -> None:
default_text_map_propagator
)
# TODO(cretz): Should I be using the configured one for this workflow?
self.payload_converter = temporalio.converter.default().payload_converter
self.payload_converter = temporalio.converter.PayloadConverter.default
# This is the context for the overall workflow, lazily created
self._workflow_context_carrier: Optional[_CarrierDict] = None

Expand Down
Loading

0 comments on commit 929dc81

Please sign in to comment.