Workflow sandbox (#164)
cretz authored Oct 28, 2022
1 parent e436ecc commit 0126e6e
Showing 26 changed files with 2,397 additions and 51 deletions.
15 changes: 8 additions & 7 deletions .github/workflows/ci.yml
@@ -12,11 +12,12 @@ jobs:
strategy:
fail-fast: true
matrix:
python: ["3.7", "3.10"]
# TODO(cretz): 3.10.8 is breaking Windows Rust build
python: ["3.7", "3.10.7"]
os: [ubuntu-latest, macos-latest, windows-latest]
include:
- os: ubuntu-latest
python: 3.10
python: 3.10.7
docsTarget: true
protoCheckTarget: true
runs-on: ${{ matrix.os }}
@@ -32,7 +33,7 @@ jobs:
- uses: Swatinem/rust-cache@v1
with:
working-directory: temporalio/bridge
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python }}
# Needed for tests since they use external server
@@ -88,9 +89,9 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: recursive
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: "3.10.7"

# Install Rust locally for non-Linux (Linux uses an internal docker
# command to build with cibuildwheel which uses rustup install defined
@@ -142,9 +143,9 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: recursive
- uses: actions/setup-python@v1
- uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: "3.10.7"

# Need QEMU for ARM build on Linux
- uses: docker/setup-qemu-action@v1
141 changes: 141 additions & 0 deletions README.md
@@ -296,6 +296,8 @@ async def create_greeting_activity(info: GreetingInfo) -> str:

Some things to note about the above code:

* Workflows run in a sandbox by default. Users are encouraged to define workflows in files with no side effects or other
complicated code. See the [Workflow Sandbox](#workflow-sandbox) section for more details.
* This workflow continually updates the queryable current greeting when signalled and can complete with the greeting on
a different signal
* Workflows are always classes and must have a single `@workflow.run` which is an `async def` function
@@ -581,6 +583,145 @@ method.
Activities are just functions decorated with `@activity.defn`. Simply write different ones and pass those to the worker
to have different activities called during the test.

#### Workflow Sandbox

By default, workflows are run in a sandbox to help avoid non-deterministic code. If a call that is known to be
non-deterministic is performed, an exception is thrown in the workflow, which "fails the task," meaning the
workflow will not progress until the code is fixed.

The sandbox is not foolproof and non-determinism can still occur. It is simply a best-effort way to catch bad code
early. Users are encouraged to define their workflows in files with no other side effects.

##### How the Sandbox Works

The sandbox is made up of two components that work closely together:

* Global state isolation
* Restrictions preventing known non-deterministic library calls

Global state isolation is performed using `exec`. Upon workflow start, the file the workflow is defined in is
imported into a new sandbox created for that workflow run. To keep the sandbox performant, a known set of
"passthrough modules" is passed through from outside the sandbox when imported. These are expected to be
side-effect free on import and have their non-deterministic aspects restricted. By default, the entire Python standard
library, `temporalio`, and a couple of other modules are passed through from outside the sandbox. To update this
list, see "Customizing the Sandbox".
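
The exec-based isolation above can be sketched in a few lines. This is an illustrative stand-in, not the SDK's actual internals; the real sandbox also installs a custom importer and restriction proxies:

```python
# Illustrative sketch only: each "sandbox" executes the workflow's source
# with a fresh globals dict, so module-level state set during one run
# never leaks into the next.
def run_in_sandbox(source: str) -> dict:
    sandbox_globals = {"__name__": "__sandboxed__"}
    exec(source, sandbox_globals)
    return sandbox_globals

first = run_in_sandbox("counter = 1")
second = run_in_sandbox("counter = globals().get('counter', 0)")

assert first["counter"] == 1   # set during the first run
assert second["counter"] == 0  # the second run started from a clean slate
```

Passthrough modules skip this re-import and are handed in as the same shared object, which is why they must be side-effect free.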

Restrictions preventing known non-deterministic library calls are achieved using proxy objects on modules, wrapped around
the custom importer set in the sandbox. Many restrictions apply at both workflow import time and workflow run time, while
some restrictions apply only at workflow run time. A default set of restrictions is included that prevents most
dangerous standard library calls. However, some otherwise-non-deterministic invocations in Python, like
reading a file from disk via `open` or using `os.environ`, are performed as part of importing modules. To customize what is
and isn't restricted, see "Customizing the Sandbox".
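
A restriction proxy of this kind can be sketched as a thin `__getattr__` wrapper. `RestrictedModule` and the blocked set below are illustrative assumptions, not the SDK's actual classes:

```python
import random

class RestrictedModule:
    """Illustrative proxy: delegates to a real module but raises on
    members known to be non-deterministic (not the SDK's actual class)."""

    def __init__(self, module, blocked):
        self._module = module
        self._blocked = blocked

    def __getattr__(self, name):
        # Called only when the attribute isn't found on the proxy itself
        if name in self._blocked:
            raise RuntimeError(
                f"Cannot access {self._module.__name__}.{name} from workflow code"
            )
        return getattr(self._module, name)

wf_random = RestrictedModule(random, {"seed"})
value = wf_random.randint(1, 6)  # allowed: delegates to the real module
try:
    wf_random.seed(42)  # blocked: mutates global random state
    blocked = False
except RuntimeError:
    blocked = True
```

The real sandbox applies such proxies transparently via its importer, so workflow code keeps writing plain `import` statements.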

##### Avoiding the Sandbox

There are three increasingly broad ways to avoid the sandbox, though users are discouraged from avoiding it where
possible.

To remove restrictions around a particular block of code, use `with temporalio.workflow.unsafe.sandbox_unrestricted():`.
The workflow will still be running in the sandbox, but no restrictions for invalid library calls will be applied.

To run an entire workflow outside of a sandbox, set `sandboxed=False` on the `@workflow.defn` decorator when defining
it. The workflow will then run entirely outside of the sandbox, which means it can share global state and exhibit other
unsafe behavior.

To disable the sandbox entirely for a worker, set the `Worker` init's `workflow_runner` keyword argument to
`temporalio.worker.UnsandboxedWorkflowRunner()`. This argument defaults to
`temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner()`, so switching it to the unsandboxed runner means the
sandbox will not be used at all.

##### Customizing the Sandbox

⚠️ WARNING: APIs in the `temporalio.worker.workflow_sandbox` module are not yet considered stable and may change in
future releases.

When creating the `Worker`, the `workflow_runner` defaults to
`temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner()`. The `SandboxedWorkflowRunner`'s init accepts a
`restrictions` keyword argument that defaults to `SandboxRestrictions.default`. The `SandboxRestrictions` dataclass
is immutable and contains three fields that can be customized, but only two have notable value.
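
Because the dataclass is immutable, customization goes through `dataclasses.replace`, as the examples below show. The mechanics can be seen on any frozen dataclass; `Restrictions` here is a simplified stand-in, not the real `SandboxRestrictions`:

```python
import dataclasses

@dataclasses.dataclass(frozen=True)
class Restrictions:
    # Simplified stand-in for SandboxRestrictions' customizable fields
    passthrough_modules: frozenset
    invalid_members: frozenset

default = Restrictions(
    passthrough_modules=frozenset({"temporalio"}),
    invalid_members=frozenset({"datetime.date.today"}),
)

# replace() returns a new instance; the immutable default is untouched
mine = dataclasses.replace(
    default,
    passthrough_modules=default.passthrough_modules | {"pydantic"},
)

assert "pydantic" in mine.passthrough_modules
assert "pydantic" not in default.passthrough_modules
```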

###### Passthrough Modules

To make the sandbox quicker when importing known third-party libraries, add them to the
`SandboxRestrictions.passthrough_modules` set like so:

```python
my_restrictions = dataclasses.replace(
    SandboxRestrictions.default,
    passthrough_modules=SandboxRestrictions.passthrough_modules_default | SandboxMatcher(access={"pydantic"}),
)
my_worker = Worker(..., runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
```

If an "access" match succeeds for an import, it is simply forwarded from outside of the sandbox. See the API for
more details on exact fields and their meaning.

###### Invalid Module Members

`SandboxRestrictions.invalid_module_members` contains a root matcher that applies to all module members. This already
has a default set which includes things like `datetime.date.today()` which should never be called from a workflow. To
remove this restriction:

```python
my_restrictions = dataclasses.replace(
    SandboxRestrictions.default,
    invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted(
        "datetime", "date", "today",
    ),
)
my_worker = Worker(..., runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
```

Restrictions can also be added by `|`'ing together matchers, for example to restrict the `datetime.date` class from
being used altogether:

```python
my_restrictions = dataclasses.replace(
    SandboxRestrictions.default,
    invalid_module_members=SandboxRestrictions.invalid_module_members_default | SandboxMatcher(
        children={"datetime": SandboxMatcher(use={"date"})},
    ),
)
my_worker = Worker(..., runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
```

See the API for more details on exact fields and their meaning.

##### Known Sandbox Issues

Below are known sandbox issues. As the sandbox is developed and matures, some may be resolved.

###### Global Import/Builtins

Currently the sandbox references/alters the global `sys.modules` and `builtins` fields while running workflow code. To
avoid affecting other sandboxed code, thread locals are leveraged so these values are only intercepted while the
workflow thread is running. Therefore, if top-level import code starts a thread, it may technically lose sandbox
protection.
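
The thread-local gating can be sketched as follows. `ThreadLocalBuiltins` is a hypothetical illustration, not the SDK's implementation:

```python
import threading

class ThreadLocalBuiltins:
    """Illustrative sketch (not the SDK's implementation): lookups use a
    per-thread override table when one is active, else the real table."""

    def __init__(self, real):
        self._real = real
        self._local = threading.local()

    def activate(self, overrides):
        # Only the calling (workflow) thread sees these overrides
        self._local.overrides = overrides

    def lookup(self, name):
        overrides = getattr(self._local, "overrides", None)
        if overrides is not None and name in overrides:
            return overrides[name]
        return self._real[name]

def blocked_open(*args, **kwargs):
    raise RuntimeError("file IO is restricted inside the workflow sandbox")

table = ThreadLocalBuiltins({"open": open})
results = {}

def workflow_thread():
    table.activate({"open": blocked_open})
    try:
        table.lookup("open")("some_file")
    except RuntimeError:
        results["workflow_blocked"] = True

t = threading.Thread(target=workflow_thread)
t.start()
t.join()
# The main thread never activated overrides, so it gets the real builtin
results["outside_is_real"] = table.lookup("open") is open
```

This also shows the caveat above: a thread started from inside the workflow would get a fresh thread-local state and silently bypass the overrides.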

###### Sandbox is not Secure

The sandbox is built to catch many non-determinism and state-sharing issues, but it is not secure. Some known-bad
calls are intercepted, but for performance reasons not every attribute get/set can be checked. Therefore a simple
call like `setattr(temporalio.common, "__my_key", "my value")` will leak across sandbox runs.

The sandbox is only a helper; it does not provide full protection.
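
The leak can be demonstrated with any module object shared between two exec namespaces, since a passthrough module is a single shared object. `fake_passthrough` below is a stand-in for a real passthrough module, not part of any SDK:

```python
import types

# Illustrative sketch: a passthrough module is one shared object used by
# every sandbox run, so attribute writes on it survive between runs.
shared = types.ModuleType("fake_passthrough")

def run_sandboxed(source, extra=None):
    # Fresh globals per run, but the same shared module object each time
    sandbox_globals = {"fake_passthrough": shared}
    sandbox_globals.update(extra or {})
    exec(source, sandbox_globals)

# One "workflow run" sets an attribute on the shared module...
run_sandboxed('setattr(fake_passthrough, "__my_key", "my value")')

# ...and a later, supposedly isolated run reads it right back
seen = []
run_sandboxed("seen.append(fake_passthrough.__my_key)", {"seen": seen})
assert seen == ["my value"]
```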

###### Sandbox Performance

TODO: This is actively being measured; results to come soon

###### Extending Restricted Classes

Currently, extending classes marked as restricted causes an issue with their `__init__` parameters. This does not affect
most users, but if a dependency is, say, extending `zipfile.ZipFile`, an error may occur and the module will have to be
marked as passthrough.

###### issubclass of ABC-based Restricted Classes

Due to [https://bugs.python.org/issue44847](https://bugs.python.org/issue44847), classes that are wrapped and then
checked to see if they are subclasses of another via `issubclass` may fail (see also
[this wrapt issue](https://github.com/GrahamDumpleton/wrapt/issues/130)).

### Activities

#### Definition
4 changes: 4 additions & 0 deletions pyproject.toml
@@ -150,13 +150,17 @@ intersphinx = [
privacy = [
"PRIVATE:temporalio.bridge",
"PRIVATE:temporalio.types",
"PRIVATE:temporalio.worker.workflow_sandbox.restrictions",
"PRIVATE:temporalio.worker.workflow_sandbox.runner",
"HIDDEN:temporalio.testing.activity",
"HIDDEN:temporalio.testing.workflow",
"HIDDEN:temporalio.worker.activity",
"HIDDEN:temporalio.worker.interceptor",
"HIDDEN:temporalio.worker.worker",
"HIDDEN:temporalio.worker.workflow",
"HIDDEN:temporalio.worker.workflow_instance",
"HIDDEN:temporalio.worker.workflow_sandbox.importer",
"HIDDEN:temporalio.worker.workflow_sandbox.in_sandbox",
"HIDDEN:**.*_pb2*",
]
project-name = "Temporal Python"
7 changes: 6 additions & 1 deletion temporalio/worker/replayer.py
@@ -37,6 +37,7 @@
from .worker import load_default_build_id
from .workflow import _WorkflowWorker
from .workflow_instance import UnsandboxedWorkflowRunner, WorkflowRunner
from .workflow_sandbox import SandboxedWorkflowRunner

logger = logging.getLogger(__name__)

@@ -49,7 +50,8 @@ def __init__(
*,
workflows: Sequence[Type],
workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
workflow_runner: WorkflowRunner = SandboxedWorkflowRunner(),
unsandboxed_workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
namespace: str = "ReplayNamespace",
data_converter: temporalio.converter.DataConverter = temporalio.converter.default(),
interceptors: Sequence[Interceptor] = [],
@@ -70,6 +72,7 @@ def __init__(
workflows=list(workflows),
workflow_task_executor=workflow_task_executor,
workflow_runner=workflow_runner,
unsandboxed_workflow_runner=unsandboxed_workflow_runner,
namespace=namespace,
data_converter=data_converter,
interceptors=interceptors,
@@ -146,6 +149,7 @@ async def replay_workflows(
workflows=self._config["workflows"],
workflow_task_executor=self._config["workflow_task_executor"],
workflow_runner=self._config["workflow_runner"],
unsandboxed_workflow_runner=self._config["unsandboxed_workflow_runner"],
data_converter=self._config["data_converter"],
interceptors=self._config["interceptors"],
debug_mode=self._config["debug_mode"],
@@ -235,6 +239,7 @@ class ReplayerConfig(TypedDict, total=False):
workflows: Sequence[Type]
workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor]
workflow_runner: WorkflowRunner
unsandboxed_workflow_runner: WorkflowRunner
namespace: str
data_converter: temporalio.converter.DataConverter
interceptors: Sequence[Interceptor]
9 changes: 8 additions & 1 deletion temporalio/worker/worker.py
@@ -29,6 +29,7 @@
from .interceptor import Interceptor
from .workflow import _WorkflowWorker
from .workflow_instance import UnsandboxedWorkflowRunner, WorkflowRunner
from .workflow_sandbox import SandboxedWorkflowRunner

logger = logging.getLogger(__name__)

@@ -49,7 +50,8 @@ def __init__(
workflows: Sequence[Type] = [],
activity_executor: Optional[concurrent.futures.Executor] = None,
workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] = None,
workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
workflow_runner: WorkflowRunner = SandboxedWorkflowRunner(),
unsandboxed_workflow_runner: WorkflowRunner = UnsandboxedWorkflowRunner(),
interceptors: Sequence[Interceptor] = [],
build_id: Optional[str] = None,
identity: Optional[str] = None,
@@ -96,6 +98,8 @@ def __init__(
provided, the caller is responsible for shutting it down after
the worker is shut down.
workflow_runner: Runner for workflows.
unsandboxed_workflow_runner: Runner for workflows that opt-out of
sandboxing.
interceptors: Collection of interceptors for this worker. Any
interceptors already on the client that also implement
:py:class:`Interceptor` are prepended to this list and should
@@ -202,6 +206,7 @@ def __init__(
activity_executor=activity_executor,
workflow_task_executor=workflow_task_executor,
workflow_runner=workflow_runner,
unsandboxed_workflow_runner=unsandboxed_workflow_runner,
interceptors=interceptors,
build_id=build_id,
identity=identity,
@@ -246,6 +251,7 @@ def __init__(
workflows=workflows,
workflow_task_executor=workflow_task_executor,
workflow_runner=workflow_runner,
unsandboxed_workflow_runner=unsandboxed_workflow_runner,
data_converter=client_config["data_converter"],
interceptors=interceptors,
debug_mode=debug_mode,
@@ -378,6 +384,7 @@ class WorkerConfig(TypedDict, total=False):
activity_executor: Optional[concurrent.futures.Executor]
workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor]
workflow_runner: WorkflowRunner
unsandboxed_workflow_runner: WorkflowRunner
interceptors: Sequence[Interceptor]
build_id: Optional[str]
identity: Optional[str]
35 changes: 24 additions & 11 deletions temporalio/worker/workflow.py
@@ -49,6 +49,7 @@ def __init__(
workflows: Sequence[Type],
workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor],
workflow_runner: WorkflowRunner,
unsandboxed_workflow_runner: WorkflowRunner,
data_converter: temporalio.converter.DataConverter,
interceptors: Sequence[Interceptor],
debug_mode: bool,
@@ -70,6 +71,7 @@ def __init__(
)
self._workflow_task_executor_user_provided = workflow_task_executor is not None
self._workflow_runner = workflow_runner
self._unsandboxed_workflow_runner = unsandboxed_workflow_runner
self._data_converter = data_converter
# Build the interceptor classes and collect extern functions
self._extern_functions: MutableMapping[str, Callable] = {}
@@ -98,6 +100,15 @@ def __init__(
# Confirm name unique
if defn.name in self._workflows:
raise ValueError(f"More than one workflow named {defn.name}")
# Prepare the workflow with the runner (this will error in the
# sandbox if an import fails somehow)
try:
if defn.sandboxed:
workflow_runner.prepare_workflow(defn)
else:
unsandboxed_workflow_runner.prepare_workflow(defn)
except Exception as err:
raise RuntimeError(f"Failed validating workflow {defn.name}") from err
self._workflows[defn.name] = defn

async def run(self) -> None:
@@ -162,7 +173,7 @@ async def _handle_activation(
# If the workflow is not running yet, create it
workflow = self._running_workflows.get(act.run_id)
if not workflow:
workflow = await self._create_workflow_instance(act)
workflow = self._create_workflow_instance(act)
self._running_workflows[act.run_id] = workflow
# Run activation in separate thread so we can check if it's
# deadlocked
@@ -254,7 +265,7 @@ async def _handle_activation(
logger.debug("Shutting down worker on eviction")
asyncio.create_task(self._bridge_worker().shutdown())

async def _create_workflow_instance(
def _create_workflow_instance(
self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation
) -> WorkflowInstance:
# First find the start workflow job
@@ -311,13 +322,15 @@ async def _create_workflow_instance(
)

# Create instance from details
return await self._workflow_runner.create_instance(
WorkflowInstanceDetails(
payload_converter_class=self._data_converter.payload_converter_class,
interceptor_classes=self._interceptor_classes,
defn=defn,
info=info,
randomness_seed=start.randomness_seed,
extern_functions=self._extern_functions,
)
det = WorkflowInstanceDetails(
payload_converter_class=self._data_converter.payload_converter_class,
interceptor_classes=self._interceptor_classes,
defn=defn,
info=info,
randomness_seed=start.randomness_seed,
extern_functions=self._extern_functions,
)
if defn.sandboxed:
return self._workflow_runner.create_instance(det)
else:
return self._unsandboxed_workflow_runner.create_instance(det)
