Skip to content

Commit

Permalink
Test nanny.environ precedence (#5204)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Aug 12, 2021
1 parent 0528d8d commit 2298861
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
23 changes: 21 additions & 2 deletions distributed/nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,21 @@ class Nanny(ServerNode):
``Client.restart`` method, or to restart the worker automatically if
it gets to the terminate fractiom of its memory limit.
The parameters for the Nanny are mostly the same as those for the Worker.
The parameters for the Nanny are mostly the same as those for the Worker
with exceptions listed below.
Parameters
----------
env: dict, optional
Environment variables set at time of Nanny initialization will be
ensured to be set in the Worker process as well. This argument allows to
overwrite or otherwise set environment variables for the Worker. It is
also possible to set environment variables using the option
`distributed.nanny.environ`. Precedence as follows
1. Nanny arguments
2. Existing environment variables
3. Dask configuration
See Also
--------
Expand Down Expand Up @@ -166,7 +180,12 @@ def __init__(
self.death_timeout = parse_timedelta(death_timeout)

self.Worker = Worker if worker_class is None else worker_class
self.env = dask.config.get("distributed.nanny.environ")
config_environ = dask.config.get("distributed.nanny.environ", {})
if not isinstance(config_environ, dict):
raise TypeError(
f"distributed.nanny.environ configuration must be of type dict. Instead got {type(config_environ)}"
)
self.env = config_environ.copy()
for k in self.env:
if k in os.environ:
self.env[k] = os.environ[k]
Expand Down
29 changes: 29 additions & 0 deletions distributed/tests/test_nanny.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import random
from contextlib import suppress
from time import sleep
from unittest import mock

import psutil
import pytest
Expand Down Expand Up @@ -375,6 +376,34 @@ async def test_environment_variable(c, s):
await asyncio.gather(a.close(), b.close())


@gen_cluster(nthreads=[], client=True)
async def test_environment_variable_by_config(c, s, monkeypatch):

with dask.config.set({"distributed.nanny.environ": "456"}):
with pytest.raises(TypeError, match="configuration must be of type dict"):
Nanny(s.address, loop=s.loop, memory_limit=0)

with dask.config.set({"distributed.nanny.environ": {"FOO": "456"}}):

# precedence
# kwargs > env var > config

with mock.patch.dict(os.environ, {"FOO": "BAR"}, clear=True):
a = Nanny(s.address, loop=s.loop, memory_limit=0, env={"FOO": "123"})
x = Nanny(s.address, loop=s.loop, memory_limit=0)

b = Nanny(s.address, loop=s.loop, memory_limit=0)

await asyncio.gather(a, b, x)
results = await c.run(lambda: os.environ["FOO"])
assert results == {
a.worker_address: "123",
b.worker_address: "456",
x.worker_address: "BAR",
}
await asyncio.gather(a.close(), b.close(), x.close())


@gen_cluster(
nthreads=[],
client=True,
Expand Down

0 comments on commit 2298861

Please sign in to comment.