Sdk core related updates #191
Conversation
temporalio/worker/worker.py
Outdated
@@ -250,6 +256,8 @@ def __init__(
            data_converter=client_config["data_converter"],
            interceptors=interceptors,
            debug_mode=debug_mode,
            disable_eager_activity_execution=disable_eager_activity_execution,
nit: the server seems to call this ActivityEagerExecution; do we want to be consistent with the server?
// EnableActivityEagerExecution indicates if acitivty eager execution is enabled per namespace
EnableActivityEagerExecution = "system.enableActivityEagerExecution"
https://github.com/temporalio/temporal/blob/master/common/dynamicconfig/constants.go#L91
No, I don't think we want to be consistent with the server here. "eager activity execution" is clearer English than "activity eager execution" (adjective in front) and matches the Go SDK (granted, TS doesn't have a worker-level option and Java has disableEagerExecution, which doesn't qualify that it's about activities, but I believe neither is as good as Go and Python).
Force-pushed from e3ae968 to 4adf1ac
    def rpc_metadata(self, value: Mapping[str, str]) -> None:
        """Update the headers for this client.

        Do not mutate this mapping after set. Rather, set an entirely new
I feel like an explicit setter method is better than this setter + comment to not mutate.
It's more than a comment not to mutate: Mapping is an immutable dict type. If there were a frozendict in the stdlib, I'd use it.
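As a rough sketch of that point (illustrative only, not the actual client code), a property setter can accept any Mapping and snapshot it into a read-only view, so neither the caller's dict nor the stored value can be mutated afterward:

from types import MappingProxyType
from typing import Mapping

class HeaderHolder:
    # Illustrative sketch only, not temporalio's implementation.
    def __init__(self) -> None:
        self._rpc_metadata: Mapping[str, str] = MappingProxyType({})

    @property
    def rpc_metadata(self) -> Mapping[str, str]:
        return self._rpc_metadata

    @rpc_metadata.setter
    def rpc_metadata(self, value: Mapping[str, str]) -> None:
        # Snapshot into a read-only proxy: later edits to the caller's dict
        # have no effect, and the stored mapping itself rejects mutation.
        self._rpc_metadata = MappingProxyType(dict(value))

holder = HeaderHolder()
holder.rpc_metadata = {"authorization": "Bearer token"}
# holder.rpc_metadata["x"] = "y"  # would raise TypeError: read-only mapping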
temporalio/worker/worker.py
Outdated
@@ -71,6 +71,7 @@ def __init__(
        graceful_shutdown_timeout: timedelta = timedelta(),
        shared_state_manager: Optional[SharedStateManager] = None,
        debug_mode: bool = False,
        disable_eager_activity_execution: bool = False,
I'd rather maintain consistency between the SDKs.
Looks like in Go we expose both MaxConcurrentEagerActivityExecutionSize and a disable flag.
In Python we now only have a disable flag.
In Java / TS we do not expose eager activity worker options.
I don't think we should expose this setting in Python for now.
Later on we might want to expose max_eager_activity_requests_per_task, which is hardcoded to 3 for now.
Let's wait and do some more testing so we can give advice on how to tune this parameter.
BTW, while reviewing the Go worker options, it looks like we either have the wrong behavior or MaxConcurrentEagerActivityExecutionSize doesn't do what it says it does; I was lazy and didn't bother to read the implementation. I missed this in my review of your Go PR that set the default to 3.
Can you clarify exactly what I should put wrt eager activities settings today? We of course want to make sure people can disable it if broken (something we didn't have in the last beta).
I will check the Go implementation. Can you open an issue or note somewhere how you're testing that it's not working?
IMO the best course of action is to only expose a max, with 0 == disable.
Thanks for the Go issue, will investigate.
@bergundy - Can you clarify exactly what I should put wrt eager activities settings today? As in this PR, before the Python beta release? We of course want to make sure people can disable it if broken (something we didn't have in the last beta).

> In Java / TS we do not expose eager activity worker options.

Java has https://www.javadoc.io/static/io.temporal/temporal-sdk/1.17.0/io/temporal/worker/WorkerOptions.Builder.html#setDisableEagerExecution(boolean). TS, I think, is the only one that can't disable eager activities at the worker level. (Arguably it's silly to be able to disable at the activity level, as the workflow author should not care about activity task distribution.)
Let's keep the disable flag but mark it experimental.
Later on we'll allow max per task (0 == disable) and, pending discussion, consider adding max concurrent eager.
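For reference, a minimal sketch of how the (experimental) flag from this PR would be used, assuming the Worker signature shown in the diff above; the workflow, activity, and task queue names here are placeholders:

from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker

@activity.defn
async def say_hello(name: str) -> str:
    return f"Hello, {name}!"

@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello, name, start_to_close_timeout=timedelta(seconds=10)
        )

async def run_worker(client: Client) -> None:
    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[GreetingWorkflow],
        activities=[say_hello],
        # Experimental escape hatch: opt out of eager activity execution if it
        # turns out to misbehave; the default (False) leaves it enabled.
        disable_eager_activity_execution=True,
    )
    await worker.run()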
So, as I was a bit worried about earlier, we need to set the default tracing subscriber in every Tokio-owned thread for logging to work properly. This can be done by doing something lame like this around every call to future_into_py:
let tc = self.trace_sub.clone();
future_into_py(py, async move {
    let _guard = tracing::subscriber::set_default(tc);
    // ... the actual async body goes here ...
})
Alternatively, we could implement https://docs.rs/pyo3-asyncio/0.17.0/pyo3_asyncio/generic/trait.Runtime.html and do it more easily that way. Ideally we also avoid doing this every time if it's already been done for that thread, which would be easy with a thread_local.
So that's a bit annoying.
--- You can skip this section if you want, figured it out --
Anyway, regarding the problem w/ test_workflow_logging: the 10 second thing is happening because it's hitting a WFT timeout. Why it's not gracefully finishing the task, though, is super weird, and why that would have changed I really don't know, but I can repro it sometimes with a Core integ test, so I will work on it more first thing tomorrow.
I looked into it a bit more, and Core literally does not even see the raw gRPC call resolve, so from its perspective shutdown was graceful. So somehow the task is considered taken but hasn't even made it out of the raw gRPC call. Super super weird. Some total fluke of async timing changing has made this happen, I guess. It only affects people on shutdown, so the real-world impact is pretty small, but it's still worth understanding.
--- skip past here --
The problem was the task queue name change. Before, since the sticky queue was based on process ID, the new worker would get the sticky task, then request the whole history since it didn't have a cache, and do all of that fast. Now the new worker gets a new queue and the task hits the sticky queue timeout. We could call reset sticky queue on shutdown, but that is annoying because you need to call it per workflow instead of resetting the whole queue, so it's a lot of calls, and it's not clear to me we actually want to do that. If we decide yes, I can make the fix tomorrow. It doesn't look like the other SDKs do it. You can make your test fast by calling it explicitly.
pub struct MetricsConfig {
    opentelemetry: Option<OpenTelemetryConfig>,
    prometheus: Option<PrometheusConfig>,
}
Seems this could be an actual enum: https://pyo3.rs/v0.12.3/conversions/traits.html (the "Deriving FromPyObject for enums" section).
True, but I've been trying to avoid unions over field names down at this level (exhaustive sum types are a hint whereas field names are enforced; granted, mutual exclusivity has to be enforced by me manually).
Sum types are more than just a hint; they're automatic enforcement. But either way it has to get surfaced to Python a bit differently here, so w/e.
temporalio/bridge/src/worker.rs
Outdated
// This must be run with the Tokio context available
let _guard = pyo3_asyncio::tokio::get_runtime().enter();
Probably want to enter the runtime's runtime (lol) here
Thanks! I did it on the replayer but not here; missed it.
forward: bool = False
"""If true, logs are not on console but instead forwarded."""
Are we exposing this to users? forward isn't implemented for Python.
This entire temporalio.bridge package (and children) is marked "unstable" because I knew this type of change was coming and future changes may come. Now that we have a concept of a "runtime", I do want to have a discussion about solidifying the runtime/telemetry API outside of temporalio.bridge. Included in that discussion is whether to support forwarding and how. In the meantime, I just mirrored the Core options even though we don't expose the forwarded logs themselves (yet).
Yeah, for now I'll wrap https://docs.rs/pyo3-asyncio/0.17.0/pyo3_asyncio/tokio/fn.future_into_py.html w/ my own helper, but yes, I should probably get around to making my own asyncio runtime from the Tokio runtime. That would support thread locals + https://docs.rs/tokio/latest/tokio/runtime/struct.Builder.html#method.on_thread_start. Do I also need to do this for non-async calls too? Also, what happens if a single Tokio runtime and its threads are used across different
Ah! I completely agree we don't want to do that. I remember this cache-disabling is what we do in Go integration tests for cross-worker-same-task-queue cases: https://github.com/temporalio/sdk-go/blob/620fa72f102480aeb233f54fb9bba39cfde2a56c/test/integration_test.go#L222-L224. Is there an option to disable the sticky queue? (Do we even want one?) I have confirmed reset sticky queue does fix the issue. Will push shortly. I also noticed that without reset sticky queue, it sometimes waits the 10s and then proceeds, and sometimes just hangs and never gets the new work. Can you replicate? Is there something there? Can we at least confirm this only happens on worker-of-same-queue restart? I have also found this with
@cretz Setting
Updated to carry the Tokio runtime around with the Core runtime, which means no more Rust globals, yay. There is a default runtime singleton, but technically everywhere a runtime is used, it can be customized.
tests/worker/test_workflow.py
Outdated
# Clear queue and start a new one with more signals. We have to call
# reset sticky queue to save penalty on same-queue worker restart.
Not actually called here since you were able to use a 0 cache size.
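For illustration, a sketch of that workaround, assuming the existing max_cached_workflows worker option (the queue and workflow names are placeholders): with the cache size at 0 the worker never takes sticky tasks, so a restarted worker on the same task queue doesn't have to wait out the sticky schedule-to-start timeout, and no reset-sticky-queue call is needed.

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

@workflow.defn
class NoopWorkflow:
    @workflow.run
    async def run(self) -> None:
        return None

async def run_restartable_test_worker(client: Client) -> None:
    worker = Worker(
        client,
        task_queue="test-queue",  # hypothetical test queue name
        workflows=[NoopWorkflow],
        # Cache size 0 disables workflow caching (and with it sticky queues),
        # so restarting a worker on this queue does not hit the sticky timeout.
        max_cached_workflows=0,
    )
    await worker.run()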
Nice, LGTM. This will clean up quite a bit if they give you an easier hook in pyo3 to adjust which runtime is used; hopefully they can do that.
# Conflicts:
#   temporalio/bridge/src/telemetry.rs
#   temporalio/bridge/telemetry.py
#   temporalio/service.py
#   temporalio/worker/_worker.py