Skip to content

Commit

Permalink
Do not use tempfile in utils_test (#6651)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Jul 1, 2022
1 parent 0df8636 commit 1bfd99c
Showing 1 changed file with 130 additions and 150 deletions.
280 changes: 130 additions & 150 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -682,19 +682,6 @@ def _close_queue(q):
q._writer.close() # https://bugs.python.org/issue42752


class _SafeTemporaryDirectory(tempfile.TemporaryDirectory):
def __exit__(self, exc_type, exc_value, traceback):
try:
return super().__exit__(exc_type, exc_value, traceback)
except (PermissionError, NotADirectoryError):
# It appears that we either have a process still interacting with
# the tmpdirs of the workers or that win process are not releasing
# their lock in time. We are receiving PermissionErrors during
# teardown
# See also https://github.com/dask/distributed/pull/5825
pass


@contextmanager
def cluster(
nworkers=2,
Expand Down Expand Up @@ -736,13 +723,9 @@ def cluster(
q = get_mp_context().Queue()
stack.callback(_close_queue, q)
for _ in range(nworkers):
tmpdirname = stack.enter_context(
_SafeTemporaryDirectory(prefix="_dask_test_worker")
)
kwargs = merge(
{
"nthreads": 1,
"local_directory": tmpdirname,
"memory_limit": system.MEMORY_LIMIT,
},
worker_kwargs,
Expand Down Expand Up @@ -1070,144 +1053,140 @@ def _(func):
def test_func(*outer_args, **kwargs):
async def async_fn():
result = None
with _SafeTemporaryDirectory() as tmpdir:
config2 = merge({"temporary-directory": tmpdir}, config)
with dask.config.set(config2):
workers = []
s = False

for _ in range(60):
try:
s, ws = await start_cluster(
nthreads,
scheduler,
security=security,
Worker=Worker,
scheduler_kwargs=scheduler_kwargs,
worker_kwargs=worker_kwargs,
)
except Exception as e:
logger.error(
"Failed to start gen_cluster: "
f"{e.__class__.__name__}: {e}; retrying",
exc_info=True,
)
await asyncio.sleep(1)
else:
workers[:] = ws
args = [s] + workers
break
if s is False:
raise Exception("Could not start cluster")
if client:
c = await Client(
s.address,
security=security,
asynchronous=True,
**client_kwargs,
)
args = [c] + args

try:
coro = func(*args, *outer_args, **kwargs)
task = asyncio.create_task(coro)
coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
result = await coro2
validate_state(s, *workers)

except asyncio.TimeoutError:
assert task
buffer = io.StringIO()
# This stack indicates where the coro/test is suspended
task.print_stack(file=buffer)

if cluster_dump_directory:
await dump_cluster_state(
s,
ws,
output_dir=cluster_dump_directory,
func_name=func.__name__,
)

task.cancel()
while not task.cancelled():
await asyncio.sleep(0.01)

# Hopefully, the hang has been caused by inconsistent
# state, which should be much more meaningful than the
# timeout
validate_state(s, *workers)

# Remove as much of the traceback as possible; it's
# uninteresting boilerplate from utils_test and asyncio
# and not from the code being tested.
raise asyncio.TimeoutError(
f"Test timeout after {timeout}s.\n"
"========== Test stack trace starts here ==========\n"
f"{buffer.getvalue()}"
) from None

except pytest.xfail.Exception:
raise

except Exception:
if cluster_dump_directory and not has_pytestmark(
test_func, "xfail"
):
await dump_cluster_state(
s,
ws,
output_dir=cluster_dump_directory,
func_name=func.__name__,
)
raise

finally:
if client and c.status not in ("closing", "closed"):
await c._close(fast=s.status == Status.closed)
await end_cluster(s, workers)
await asyncio.wait_for(cleanup_global_workers(), 1)
with dask.config.set(config):
workers = []
s = False

for _ in range(60):
try:
c = await default_client()
except ValueError:
pass
s, ws = await start_cluster(
nthreads,
scheduler,
security=security,
Worker=Worker,
scheduler_kwargs=scheduler_kwargs,
worker_kwargs=worker_kwargs,
)
except Exception as e:
logger.error(
"Failed to start gen_cluster: "
f"{e.__class__.__name__}: {e}; retrying",
exc_info=True,
)
await asyncio.sleep(1)
else:
await c._close(fast=True)

def get_unclosed():
return [c for c in Comm._instances if not c.closed()] + [
c
for c in _global_clients.values()
if c.status != "closed"
]
workers[:] = ws
args = [s] + workers
break
if s is False:
raise Exception("Could not start cluster")
if client:
c = await Client(
s.address,
security=security,
asynchronous=True,
**client_kwargs,
)
args = [c] + args

try:
coro = func(*args, *outer_args, **kwargs)
task = asyncio.create_task(coro)
coro2 = asyncio.wait_for(asyncio.shield(task), timeout)
result = await coro2
validate_state(s, *workers)

except asyncio.TimeoutError:
assert task
buffer = io.StringIO()
# This stack indicates where the coro/test is suspended
task.print_stack(file=buffer)

if cluster_dump_directory:
await dump_cluster_state(
s,
ws,
output_dir=cluster_dump_directory,
func_name=func.__name__,
)

try:
start = time()
while time() < start + 60:
gc.collect()
if not get_unclosed():
break
await asyncio.sleep(0.05)
task.cancel()
while not task.cancelled():
await asyncio.sleep(0.01)

# Hopefully, the hang has been caused by inconsistent
# state, which should be much more meaningful than the
# timeout
validate_state(s, *workers)

# Remove as much of the traceback as possible; it's
# uninteresting boilerplate from utils_test and asyncio
# and not from the code being tested.
raise asyncio.TimeoutError(
f"Test timeout after {timeout}s.\n"
"========== Test stack trace starts here ==========\n"
f"{buffer.getvalue()}"
) from None

except pytest.xfail.Exception:
raise

except Exception:
if cluster_dump_directory and not has_pytestmark(
test_func, "xfail"
):
await dump_cluster_state(
s,
ws,
output_dir=cluster_dump_directory,
func_name=func.__name__,
)
raise

finally:
if client and c.status not in ("closing", "closed"):
await c._close(fast=s.status == Status.closed)
await end_cluster(s, workers)
await asyncio.wait_for(cleanup_global_workers(), 1)

try:
c = await default_client()
except ValueError:
pass
else:
await c._close(fast=True)

def get_unclosed():
return [c for c in Comm._instances if not c.closed()] + [
c for c in _global_clients.values() if c.status != "closed"
]

try:
start = time()
while time() < start + 60:
gc.collect()
if not get_unclosed():
break
await asyncio.sleep(0.05)
else:
if allow_unclosed:
print(f"Unclosed Comms: {get_unclosed()}")
else:
if allow_unclosed:
print(f"Unclosed Comms: {get_unclosed()}")
else:
raise RuntimeError("Unclosed Comms", get_unclosed())
finally:
Comm._instances.clear()
_global_clients.clear()

for w in workers:
if getattr(w, "data", None):
try:
w.data.clear()
except OSError:
# zict backends can fail if their storage directory
# was already removed
pass

return result
raise RuntimeError("Unclosed Comms", get_unclosed())
finally:
Comm._instances.clear()
_global_clients.clear()

for w in workers:
if getattr(w, "data", None):
try:
w.data.clear()
except OSError:
# zict backends can fail if their storage directory
# was already removed
pass

return result

async def async_fn_outer():
async with _acheck_active_rpc(active_rpc_timeout=active_rpc_timeout):
Expand Down Expand Up @@ -1903,6 +1882,7 @@ def _reconfigure():

with dask.config.set(
{
"local_directory": tempfile.gettempdir(),
"distributed.comm.timeouts.connect": "5s",
"distributed.admin.tick.interval": "500 ms",
"distributed.worker.profile.enabled": False,
Expand Down

0 comments on commit 1bfd99c

Please sign in to comment.