Skip to content

Commit

Permalink
Update core, move to using runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Nov 8, 2022
1 parent 0e1d2a2 commit e3ae968
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 86 deletions.
2 changes: 1 addition & 1 deletion temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 14 additions & 3 deletions temporalio/bridge/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import google.protobuf.message

import temporalio.bridge.runtime
import temporalio.bridge.temporal_sdk_bridge
from temporalio.bridge.temporal_sdk_bridge import RPCError

Expand Down Expand Up @@ -68,14 +69,24 @@ class Client:
"""RPC client using SDK Core."""

@staticmethod
async def connect(config: ClientConfig) -> Client:
async def connect(
runtime: temporalio.bridge.runtime.Runtime, config: ClientConfig
) -> Client:
"""Establish connection with server."""
return Client(
await temporalio.bridge.temporal_sdk_bridge.connect_client(config)
runtime,
await temporalio.bridge.temporal_sdk_bridge.connect_client(
runtime._ref, config
),
)

def __init__(self, ref: temporalio.bridge.temporal_sdk_bridge.ClientRef):
def __init__(
self,
runtime: temporalio.bridge.runtime.Runtime,
ref: temporalio.bridge.temporal_sdk_bridge.ClientRef,
):
"""Initialize client with underlying SDK Core reference."""
self._runtime = runtime
self._ref = ref

def update_metadata(self, metadata: Mapping[str, str]) -> None:
Expand Down
79 changes: 47 additions & 32 deletions temporalio/bridge/telemetry.py → temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,58 @@

from __future__ import annotations

import warnings
from dataclasses import dataclass
from typing import ClassVar, Mapping, Optional

import temporalio.bridge.temporal_sdk_bridge

_default_runtime: Optional[Runtime] = None


class Runtime:
"""Runtime for SDK Core.
Users are encouraged to use :py:meth:`default`. It can be set with
:py:meth:`set_default`.
"""

@staticmethod
def default() -> Runtime:
"""Get the default runtime, creating if not already created.
If the default runtime needs to be different, it should be done with
:py:meth:`set_default` before this is called or ever used.
Returns:
The default runtime.
"""
global _default_runtime
if not _default_runtime:
_default_runtime = Runtime(telemetry=TelemetryConfig())
return _default_runtime

@staticmethod
def set_default(runtime: Runtime, *, error_if_already_set: bool = True) -> None:
"""Set the default runtime to the given runtime.
This should be called before any Temporal client is created, but can
change the existing one. Any clients and workers created with the
previous runtime will stay on that runtime.
Args:
runtime: The runtime to set.
error_if_already_set: If True and default is already set, this will
raise a RuntimeError.
"""
global _default_runtime
if _default_runtime and error_if_already_set:
raise RuntimeError("Runtime default already set")
_default_runtime = runtime

def __init__(self, *, telemetry: TelemetryConfig) -> None:
"""Create a default runtime with the given telemetry config."""
self._ref = temporalio.bridge.temporal_sdk_bridge.init_runtime(telemetry)


def format_filter(core_level: str, other_level: str) -> str:
"""Helper to build a filter from Core and other level.
Expand Down Expand Up @@ -99,34 +145,3 @@ class TelemetryConfig:

metrics: Optional[PrometheusConfig] = None
"""Metrics configuration."""


_inited = False


def init_telemetry(
config: TelemetryConfig, *, warn_if_already_inited: bool = True
) -> bool:
"""Initialize telemetry with the given configuration.
This must be called before any Temporal client is created. Does nothing if
already called.
.. warning::
This API is not stable and may change in a future release.
Args:
config: Telemetry config.
warn_if_already_inited: If True and telemetry is already initialized,
this will emit a warning.
"""
global _inited
if _inited:
if warn_if_already_inited:
warnings.warn(
"Telemetry initialization already called, ignoring successive calls"
)
return False
temporalio.bridge.temporal_sdk_bridge.init_telemetry(config)
_inited = True
return True
12 changes: 9 additions & 3 deletions temporalio/bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use temporal_client::{
use tonic::metadata::MetadataKey;
use url::Url;

use crate::runtime;

pyo3::create_exception!(temporal_sdk_bridge, RPCError, PyException);

type Client = RetryClient<ConfiguredClient<TemporalServiceClientWithMetrics>>;
Expand Down Expand Up @@ -61,18 +63,22 @@ struct RpcCall {
timeout_millis: Option<u64>,
}

pub fn connect_client(py: Python, config: ClientConfig) -> PyResult<&PyAny> {
// TODO(cretz): Add metrics_meter?
pub fn connect_client<'a>(
py: Python<'a>,
runtime: &runtime::RuntimeRef,
config: ClientConfig,
) -> PyResult<&'a PyAny> {
let headers = if config.metadata.is_empty() {
None
} else {
Some(Arc::new(RwLock::new(config.metadata.clone())))
};
let opts: ClientOptions = config.try_into()?;
let metric_meter = runtime.runtime.metric_meter().cloned();
future_into_py(py, async move {
Ok(ClientRef {
retry_client: opts
.connect_no_namespace(None, headers)
.connect_no_namespace(metric_meter.as_ref(), headers)
.await
.map_err(|err| {
PyRuntimeError::new_err(format!("Failed client connect: {}", err))
Expand Down
31 changes: 20 additions & 11 deletions temporalio/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use pyo3::prelude::*;
use pyo3::types::PyTuple;

mod client;
mod telemetry;
mod runtime;
mod testing;
mod worker;

Expand All @@ -13,9 +13,9 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<client::ClientRef>()?;
m.add_function(wrap_pyfunction!(connect_client, m)?)?;

// Telemetry stuff
m.add_class::<telemetry::TelemetryRef>()?;
m.add_function(wrap_pyfunction!(init_telemetry, m)?)?;
// Runtime stuff
m.add_class::<runtime::RuntimeRef>()?;
m.add_function(wrap_pyfunction!(init_runtime, m)?)?;

// Testing stuff
m.add_class::<testing::EphemeralServerRef>()?;
Expand All @@ -35,13 +35,17 @@ fn temporal_sdk_bridge(py: Python, m: &PyModule) -> PyResult<()> {
}

#[pyfunction]
fn connect_client(py: Python, config: client::ClientConfig) -> PyResult<&PyAny> {
client::connect_client(py, config)
fn connect_client<'a>(
py: Python<'a>,
runtime: &runtime::RuntimeRef,
config: client::ClientConfig,
) -> PyResult<&'a PyAny> {
client::connect_client(py, &runtime, config)
}

#[pyfunction]
fn init_telemetry(config: telemetry::TelemetryConfig) -> PyResult<telemetry::TelemetryRef> {
telemetry::init_telemetry(config)
fn init_runtime(telemetry_config: runtime::TelemetryConfig) -> PyResult<runtime::RuntimeRef> {
runtime::init_runtime(telemetry_config)
}

#[pyfunction]
Expand All @@ -56,13 +60,18 @@ fn start_test_server(py: Python, config: testing::TestServerConfig) -> PyResult<

#[pyfunction]
fn new_worker(
runtime: &runtime::RuntimeRef,
client: &client::ClientRef,
config: worker::WorkerConfig,
) -> PyResult<worker::WorkerRef> {
worker::new_worker(&client, config)
worker::new_worker(&runtime, &client, config)
}

#[pyfunction]
fn new_replay_worker(py: Python, config: worker::WorkerConfig) -> PyResult<&PyTuple> {
worker::new_replay_worker(py, config)
fn new_replay_worker<'a>(
py: Python<'a>,
runtime: &runtime::RuntimeRef,
config: worker::WorkerConfig,
) -> PyResult<&'a PyTuple> {
worker::new_replay_worker(py, &runtime, config)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ use pyo3::prelude::*;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use temporal_sdk_core::{
telemetry_init, Logger, MetricsExporter, OtelCollectorOptions, TelemetryOptions,
TelemetryOptionsBuilder, TraceExportConfig, TraceExporter,
use temporal_sdk_core::CoreRuntime;
use temporal_sdk_core_api::telemetry::{
Logger, MetricsExporter, OtelCollectorOptions, TelemetryOptions, TelemetryOptionsBuilder,
TraceExportConfig, TraceExporter,
};
use url::Url;

#[pyclass]
pub struct TelemetryRef {
// TODO(cretz): This is private
// telemetry: &'static temporal_sdk_core::telemetry::GlobalTelemDat,
pub struct RuntimeRef {
pub(crate) runtime: CoreRuntime,
}

#[derive(FromPyObject)]
Expand Down Expand Up @@ -51,13 +51,13 @@ pub struct PrometheusConfig {
bind_address: String,
}

pub fn init_telemetry(config: TelemetryConfig) -> PyResult<TelemetryRef> {
let opts: TelemetryOptions = config.try_into()?;
telemetry_init(&opts).map_err(|err| {
PyRuntimeError::new_err(format!("Failed initializing telemetry: {}", err))
})?;
Ok(TelemetryRef {
// telemetry: telem_dat,
pub fn init_runtime(telemetry_config: TelemetryConfig) -> PyResult<RuntimeRef> {
// We need to be in Tokio context to create the runtime
let _guard = pyo3_asyncio::tokio::get_runtime().enter();
Ok(RuntimeRef {
runtime: CoreRuntime::new_assume_tokio(telemetry_config.try_into()?).map_err(|err| {
PyRuntimeError::new_err(format!("Failed initializing telemetry: {}", err))
})?,
})
}

Expand Down
26 changes: 19 additions & 7 deletions temporalio/bridge/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::ReceiverStream;

use crate::client;
use crate::runtime;

pyo3::create_exception!(temporal_sdk_bridge, PollShutdownError, PyException);

Expand Down Expand Up @@ -44,21 +45,32 @@ pub struct WorkerConfig {
max_task_queue_activities_per_second: Option<f64>,
}

pub fn new_worker(client: &client::ClientRef, config: WorkerConfig) -> PyResult<WorkerRef> {
pub fn new_worker(
runtime: &runtime::RuntimeRef,
client: &client::ClientRef,
config: WorkerConfig,
) -> PyResult<WorkerRef> {
// This must be run with the Tokio context available
let _guard = pyo3_asyncio::tokio::get_runtime().enter();
let config: temporal_sdk_core::WorkerConfig = config.try_into()?;
let worker = temporal_sdk_core::init_worker(
&runtime.runtime,
config,
client.retry_client.clone().into_inner(),
)
.map_err(|err| PyValueError::new_err(format!("Failed creating worker: {}", err)))?;
Ok(WorkerRef {
worker: Some(Arc::new(temporal_sdk_core::init_worker(
config,
client.retry_client.clone().into_inner(),
))),
worker: Some(Arc::new(worker)),
})
}

pub fn new_replay_worker(py: Python, config: WorkerConfig) -> PyResult<&PyTuple> {
pub fn new_replay_worker<'a>(
py: Python<'a>,
runtime: &runtime::RuntimeRef,
config: WorkerConfig,
) -> PyResult<&'a PyTuple> {
// This must be run with the Tokio context available
let _guard = pyo3_asyncio::tokio::get_runtime().enter();
let _guard = runtime.runtime.tokio_handle().enter();
let config: temporal_sdk_core::WorkerConfig = config.try_into()?;
let (history_pusher, stream) = HistoryPusher::new();
let worker = WorkerRef {
Expand Down
14 changes: 8 additions & 6 deletions temporalio/bridge/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import temporalio.bridge.proto.activity_task
import temporalio.bridge.proto.workflow_activation
import temporalio.bridge.proto.workflow_completion
import temporalio.bridge.runtime
import temporalio.bridge.temporal_sdk_bridge
import temporalio.converter
import temporalio.exceptions
Expand Down Expand Up @@ -54,22 +55,23 @@ class Worker:
def create(client: temporalio.bridge.client.Client, config: WorkerConfig) -> Worker:
"""Create a bridge worker from a bridge client."""
return Worker(
temporalio.bridge.temporal_sdk_bridge.new_worker(client._ref, config)
temporalio.bridge.temporal_sdk_bridge.new_worker(
client._runtime._ref, client._ref, config
)
)

@staticmethod
def for_replay(
runtime: temporalio.bridge.runtime.Runtime,
config: WorkerConfig,
) -> Tuple[Worker, temporalio.bridge.temporal_sdk_bridge.HistoryPusher]:
"""Create a bridge replay worker."""
temporalio.bridge.telemetry.init_telemetry(
temporalio.bridge.telemetry.TelemetryConfig(),
warn_if_already_inited=False,
)
[
replay_worker,
pusher,
] = temporalio.bridge.temporal_sdk_bridge.new_replay_worker(config)
] = temporalio.bridge.temporal_sdk_bridge.new_replay_worker(
runtime._ref, config
)
return Worker(replay_worker), pusher

def __init__(self, ref: temporalio.bridge.temporal_sdk_bridge.WorkerRef) -> None:
Expand Down
Loading

0 comments on commit e3ae968

Please sign in to comment.