Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sdk core related updates #191

Merged
merged 8 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 18 additions & 3 deletions temporalio/bridge/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import google.protobuf.message

import temporalio.bridge.runtime
import temporalio.bridge.temporal_sdk_bridge
from temporalio.bridge.temporal_sdk_bridge import RPCError

Expand Down Expand Up @@ -68,16 +69,30 @@ class Client:
"""RPC client using SDK Core."""

@staticmethod
async def connect(config: ClientConfig) -> Client:
async def connect(
runtime: temporalio.bridge.runtime.Runtime, config: ClientConfig
) -> Client:
"""Establish connection with server."""
return Client(
await temporalio.bridge.temporal_sdk_bridge.connect_client(config)
runtime,
await temporalio.bridge.temporal_sdk_bridge.connect_client(
runtime._ref, config
),
)

def __init__(self, ref: temporalio.bridge.temporal_sdk_bridge.ClientRef):
def __init__(
self,
runtime: temporalio.bridge.runtime.Runtime,
ref: temporalio.bridge.temporal_sdk_bridge.ClientRef,
):
"""Initialize client with underlying SDK Core reference."""
self._runtime = runtime
self._ref = ref

def update_metadata(self, metadata: Mapping[str, str]) -> None:
"""Update underlying metadata on Core client."""
self._ref.update_metadata(metadata)

async def call(
self,
*,
Expand Down
2 changes: 2 additions & 0 deletions temporalio/bridge/proto/workflow_commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
CompleteWorkflowExecution,
ContinueAsNewWorkflowExecution,
FailWorkflowExecution,
ModifyWorkflowProperties,
QueryResult,
QuerySuccess,
RequestCancelActivity,
Expand All @@ -31,6 +32,7 @@
"CompleteWorkflowExecution",
"ContinueAsNewWorkflowExecution",
"FailWorkflowExecution",
"ModifyWorkflowProperties",
"QueryResult",
"QuerySuccess",
"RequestCancelActivity",
Expand Down
126 changes: 64 additions & 62 deletions temporalio/bridge/proto/workflow_commands/workflow_commands_pb2.py

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class WorkflowCommand(google.protobuf.message.Message):
SCHEDULE_LOCAL_ACTIVITY_FIELD_NUMBER: builtins.int
REQUEST_CANCEL_LOCAL_ACTIVITY_FIELD_NUMBER: builtins.int
UPSERT_WORKFLOW_SEARCH_ATTRIBUTES_FIELD_NUMBER: builtins.int
MODIFY_WORKFLOW_PROPERTIES_FIELD_NUMBER: builtins.int
@property
def start_timer(self) -> global___StartTimer: ...
@property
Expand Down Expand Up @@ -138,6 +139,8 @@ class WorkflowCommand(google.protobuf.message.Message):
def upsert_workflow_search_attributes(
self,
) -> global___UpsertWorkflowSearchAttributes: ...
@property
def modify_workflow_properties(self) -> global___ModifyWorkflowProperties: ...
def __init__(
self,
*,
Expand Down Expand Up @@ -165,6 +168,7 @@ class WorkflowCommand(google.protobuf.message.Message):
request_cancel_local_activity: global___RequestCancelLocalActivity | None = ...,
upsert_workflow_search_attributes: global___UpsertWorkflowSearchAttributes
| None = ...,
modify_workflow_properties: global___ModifyWorkflowProperties | None = ...,
) -> None: ...
def HasField(
self,
Expand All @@ -183,6 +187,8 @@ class WorkflowCommand(google.protobuf.message.Message):
b"continue_as_new_workflow_execution",
"fail_workflow_execution",
b"fail_workflow_execution",
"modify_workflow_properties",
b"modify_workflow_properties",
"request_cancel_activity",
b"request_cancel_activity",
"request_cancel_external_workflow_execution",
Expand Down Expand Up @@ -226,6 +232,8 @@ class WorkflowCommand(google.protobuf.message.Message):
b"continue_as_new_workflow_execution",
"fail_workflow_execution",
b"fail_workflow_execution",
"modify_workflow_properties",
b"modify_workflow_properties",
"request_cancel_activity",
b"request_cancel_activity",
"request_cancel_external_workflow_execution",
Expand Down Expand Up @@ -273,6 +281,7 @@ class WorkflowCommand(google.protobuf.message.Message):
"schedule_local_activity",
"request_cancel_local_activity",
"upsert_workflow_search_attributes",
"modify_workflow_properties",
] | None: ...

global___WorkflowCommand = WorkflowCommand
Expand Down Expand Up @@ -352,7 +361,6 @@ class ScheduleActivity(google.protobuf.message.Message):
SEQ_FIELD_NUMBER: builtins.int
ACTIVITY_ID_FIELD_NUMBER: builtins.int
ACTIVITY_TYPE_FIELD_NUMBER: builtins.int
NAMESPACE_FIELD_NUMBER: builtins.int
TASK_QUEUE_FIELD_NUMBER: builtins.int
HEADERS_FIELD_NUMBER: builtins.int
ARGUMENTS_FIELD_NUMBER: builtins.int
Expand All @@ -367,7 +375,6 @@ class ScheduleActivity(google.protobuf.message.Message):
"""/ Lang's incremental sequence number, used as the operation identifier"""
activity_id: builtins.str
activity_type: builtins.str
namespace: builtins.str
task_queue: builtins.str
"""The name of the task queue to place this activity request in"""
@property
Expand Down Expand Up @@ -422,7 +429,6 @@ class ScheduleActivity(google.protobuf.message.Message):
seq: builtins.int = ...,
activity_id: builtins.str = ...,
activity_type: builtins.str = ...,
namespace: builtins.str = ...,
task_queue: builtins.str = ...,
headers: collections.abc.Mapping[
builtins.str, temporalio.api.common.v1.message_pb2.Payload
Expand Down Expand Up @@ -472,8 +478,6 @@ class ScheduleActivity(google.protobuf.message.Message):
b"headers",
"heartbeat_timeout",
b"heartbeat_timeout",
"namespace",
b"namespace",
"retry_policy",
b"retry_policy",
"schedule_to_close_timeout",
Expand Down Expand Up @@ -1531,3 +1535,27 @@ class UpsertWorkflowSearchAttributes(google.protobuf.message.Message):
) -> None: ...

global___UpsertWorkflowSearchAttributes = UpsertWorkflowSearchAttributes

class ModifyWorkflowProperties(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

UPSERTED_MEMO_FIELD_NUMBER: builtins.int
@property
def upserted_memo(self) -> temporalio.api.common.v1.message_pb2.Memo:
"""If set, update the workflow memo with the provided values. The values will be merged with
the existing memo. If the user wants to delete values, a default/empty Payload should be
used as the value for the key being deleted.
"""
def __init__(
self,
*,
upserted_memo: temporalio.api.common.v1.message_pb2.Memo | None = ...,
) -> None: ...
def HasField(
self, field_name: typing_extensions.Literal["upserted_memo", b"upserted_memo"]
) -> builtins.bool: ...
def ClearField(
self, field_name: typing_extensions.Literal["upserted_memo", b"upserted_memo"]
) -> None: ...

global___ModifyWorkflowProperties = ModifyWorkflowProperties
147 changes: 147 additions & 0 deletions temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Telemetry for SDK Core. (unstable)

Nothing in this module should be considered stable. The API may change.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import ClassVar, Mapping, Optional

import temporalio.bridge.temporal_sdk_bridge

_default_runtime: Optional[Runtime] = None


class Runtime:
"""Runtime for SDK Core.

Users are encouraged to use :py:meth:`default`. It can be set with
:py:meth:`set_default`.
"""

@staticmethod
def default() -> Runtime:
"""Get the default runtime, creating if not already created.

If the default runtime needs to be different, it should be done with
:py:meth:`set_default` before this is called or ever used.

Returns:
The default runtime.
"""
global _default_runtime
if not _default_runtime:
_default_runtime = Runtime(telemetry=TelemetryConfig())
return _default_runtime

@staticmethod
def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None:
"""Set the default runtime to the given runtime.

This should be called before any Temporal client is created, but can
change the existing one. Any clients and workers created with the
previous runtime will stay on that runtime.

Args:
runtime: The runtime to set.
error_if_already_set: If True and default is already set, this will
raise a RuntimeError.
"""
global _default_runtime
if _default_runtime and error_if_already_set:
raise RuntimeError("Runtime default already set")
_default_runtime = runtime

def __init__(self, *, telemetry: TelemetryConfig) -> None:
"""Create a default runtime with the given telemetry config."""
self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry)


def format_filter(core_level: str, other_level: str) -> str:
"""Helper to build a filter from Core and other level.

Levels can be ``ERROR``, ``WARN``, ``INFO``, ``DEBUG``, or ``TRACE``.

Args:
core_level: Level for SDK Core.
other_level: Level for other things besides Core.

Returns:
Formatted string for use as a ``filter`` in telemetry configs.
"""
return f"{other_level},temporal_sdk_core={core_level},temporal_client={core_level},temporal_sdk={core_level}"


@dataclass(frozen=True)
class TracingConfig:
"""Configuration for Core tracing."""

filter: str
"""Filter string for tracing. Use :py:func:`format_filter`."""

opentelemetry: OpenTelemetryConfig
"""Configuration for OpenTelemetry tracing collector."""


@dataclass(frozen=True)
class LoggingConfig:
"""Configuration for Core logging."""

filter: str
"""Filter string for logging. Use :py:func:`format_filter`."""

forward: bool = False
"""If true, logs are not on console but instead forwarded."""
Comment on lines +98 to +99
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we exposing this to users? forward isn't implemented for Python.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This entire temporalio.bridge bridge package (and children) is marked "unstable" because I knew this type of change was coming and future changes may come. Now that we have a concept of a "runtime" I do want to have a discussion about solidifying runtime/telemetry API outside of temporalio.bridge. Included in that discussion is whether to support forwarding and how. In the meantime, I just mirrored core options even though we don't expose the forwarded logs themselves (yet).


default: ClassVar[LoggingConfig]
"""Default logging configuration of Core WARN level and other ERROR
level.
"""


LoggingConfig.default = LoggingConfig(filter=format_filter("WARN", "ERROR"))


@dataclass(frozen=True)
class MetricsConfig:
"""Configuration for Core metrics.

One and only one of :py:attr:`opentelemetry` or :py:attr:`prometheus` must
be set.
"""

opentelemetry: Optional[OpenTelemetryConfig] = None
"""Configuration for OpenTelemetry metrics collector."""

prometheus: Optional[PrometheusConfig] = None
"""Configuration for Prometheus metrics endpoint."""


@dataclass(frozen=True)
class OpenTelemetryConfig:
"""Configuration for OpenTelemetry collector."""

url: str
headers: Mapping[str, str]


@dataclass(frozen=True)
class PrometheusConfig:
"""Configuration for Prometheus metrics endpoint."""

bind_address: str


@dataclass(frozen=True)
class TelemetryConfig:
"""Configuration for Core telemetry."""

tracing: Optional[TracingConfig] = None
"""Tracing configuration."""

logging: Optional[LoggingConfig] = LoggingConfig.default
"""Logging configuration."""

metrics: Optional[PrometheusConfig] = None
"""Metrics configuration."""
2 changes: 1 addition & 1 deletion temporalio/bridge/sdk-core
Submodule sdk-core updated 66 files
+1 −1 .buildkite/docker/Dockerfile
+2 −2 .buildkite/docker/docker-compose.yaml
+1 −1 bridge-ffi/Cargo.toml
+0 −25 bridge-ffi/include/sdk-core-bridge.h
+29 −108 bridge-ffi/src/lib.rs
+33 −25 bridge-ffi/src/wrappers.rs
+8 −3 client/src/retry.rs
+3 −2 core-api/Cargo.toml
+1 −43 core-api/src/lib.rs
+145 −0 core-api/src/telemetry.rs
+2 −2 core/Cargo.toml
+1 −2 core/benches/workflow_replay.rs
+1 −1 core/src/abstractions.rs
+78 −1 core/src/core_tests/activity_tasks.rs
+182 −3 core/src/core_tests/local_activities.rs
+211 −4 core/src/core_tests/queries.rs
+2 −2 core/src/core_tests/workers.rs
+1 −1 core/src/core_tests/workflow_cancels.rs
+41 −8 core/src/core_tests/workflow_tasks.rs
+101 −19 core/src/lib.rs
+0 −62 core/src/log_export.rs
+190 −0 core/src/telemetry/log_export.rs
+165 −135 core/src/telemetry/metrics.rs
+268 −336 core/src/telemetry/mod.rs
+4 −3 core/src/telemetry/prometheus_server.rs
+9 −3 core/src/test_help/mod.rs
+1 −1 core/src/worker/activities.rs
+2 −1 core/src/worker/activities/local_activities.rs
+1 −1 core/src/worker/mod.rs
+5 −18 core/src/worker/workflow/driven_workflow.rs
+1 −5 core/src/worker/workflow/machines/activity_state_machine.rs
+1 −5 core/src/worker/workflow/machines/cancel_external_state_machine.rs
+1 −5 core/src/worker/workflow/machines/cancel_workflow_state_machine.rs
+1 −5 core/src/worker/workflow/machines/child_workflow_state_machine.rs
+1 −5 core/src/worker/workflow/machines/complete_workflow_state_machine.rs
+2 −6 core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs
+1 −5 core/src/worker/workflow/machines/fail_workflow_state_machine.rs
+12 −14 core/src/worker/workflow/machines/local_activity_state_machine.rs
+12 −38 core/src/worker/workflow/machines/mod.rs
+178 −0 core/src/worker/workflow/machines/modify_workflow_properties_state_machine.rs
+0 −127 core/src/worker/workflow/machines/mutable_side_effect_state_machine.rs
+1 −5 core/src/worker/workflow/machines/patch_state_machine.rs
+0 −71 core/src/worker/workflow/machines/side_effect_state_machine.rs
+1 −5 core/src/worker/workflow/machines/signal_external_state_machine.rs
+1 −5 core/src/worker/workflow/machines/timer_state_machine.rs
+8 −2 core/src/worker/workflow/machines/transition_coverage.rs
+1 −5 core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs
+229 −214 core/src/worker/workflow/machines/workflow_machines.rs
+1 −6 core/src/worker/workflow/machines/workflow_task_state_machine.rs
+13 −5 core/src/worker/workflow/managed_run.rs
+1 −1 core/src/worker/workflow/managed_run/managed_wf_test.rs
+61 −27 core/src/worker/workflow/mod.rs
+2 −2 core/src/worker/workflow/wft_poller.rs
+14 −2 core/src/worker/workflow/workflow_stream.rs
+1 −1 fsm/rustfsm_procmacro/tests/trybuild/no_handle_conversions_require_into_fail.stderr
+8 −1 protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto
+9 −0 sdk-core-protos/src/history_builder.rs
+20 −0 sdk-core-protos/src/lib.rs
+4 −4 sdk/src/lib.rs
+13 −2 sdk/src/workflow_context.rs
+1 −0 test-utils/Cargo.toml
+33 −16 test-utils/src/lib.rs
+37 −0 tests/integ_tests/metrics_tests.rs
+2 −0 tests/integ_tests/workflow_tests.rs
+53 −0 tests/integ_tests/workflow_tests/modify_wf_properties.rs
+6 −4 tests/main.rs
Loading