Fail P2PShuffle gracefully upon worker failure #7326

Merged: 95 commits, Dec 9, 2022
Changes shown below are from 12 of the 95 commits.

Commits (95)
46860d5  Minimal checks on closed shuffle (hendrikmakait, Nov 16, 2022)
43b4cf7  Close Shuffle and WorkerExtension (hendrikmakait, Nov 16, 2022)
c86bc04  Drop unnecessary (hendrikmakait, Nov 16, 2022)
f8b59d6  Fail shuffle when worker is removed (hendrikmakait, Nov 17, 2022)
5c24302  Do not offload if closed (hendrikmakait, Nov 17, 2022)
f6a2248  Serialize exception (hendrikmakait, Nov 17, 2022)
cac380d  Avoid test deadlocking on wait_for_state (hendrikmakait, Nov 17, 2022)
8c07589  Use handle_task_erred (hendrikmakait, Nov 17, 2022)
012c690  Add test for input-only worker (hendrikmakait, Nov 17, 2022)
7634a60  Improve exception (hendrikmakait, Nov 17, 2022)
1af605e  Remember erred shuffles (hendrikmakait, Nov 17, 2022)
d7db4ba  Clean up shuffle state on workers (hendrikmakait, Nov 17, 2022)
7fdfe60  Make tests event-based (hendrikmakait, Nov 18, 2022)
44452c5  Improve tests (hendrikmakait, Nov 18, 2022)
2e1cf51  Add tests (hendrikmakait, Nov 18, 2022)
184823c  Additional (deadlocking) test (hendrikmakait, Nov 18, 2022)
c7b84c4  raise-protected methods (hendrikmakait, Nov 18, 2022)
a6a9445  Refactor offloading of repartitioning (hendrikmakait, Nov 18, 2022)
59bc3b1  Improve tests and drop reschedule (hendrikmakait, Nov 18, 2022)
acfbf30  Clean up (hendrikmakait, Nov 18, 2022)
8e97c7c  Clean up scheduler and adjust tests (hendrikmakait, Nov 21, 2022)
5f92727  Idempotency (hendrikmakait, Nov 21, 2022)
b36862f  Remove race (hendrikmakait, Nov 21, 2022)
bc16a47  Fail on all participating workers (hendrikmakait, Nov 21, 2022)
40e5b4b  Clean up participating workers (hendrikmakait, Nov 21, 2022)
646c721  Properly wait for cleanup (hendrikmakait, Nov 21, 2022)
da6f160  Remember completed shuffle should workers fail down the line (hendrikmakait, Nov 21, 2022)
f9c4db3  Revert completed_shuffles (hendrikmakait, Nov 21, 2022)
6e34ac3  Remove warnings (hendrikmakait, Nov 21, 2022)
871ddb7  Additional test (hendrikmakait, Nov 21, 2022)
fea0c7d  Fix tests on Windows (hendrikmakait, Nov 21, 2022)
0ed9f61  Do not try to transition barrier to erred (it wont work) (hendrikmakait, Nov 22, 2022)
3a50c7e  Fix deadlock (WIP) (hendrikmakait, Nov 22, 2022)
fd37f3b  Add transition no-worker -> erred (hendrikmakait, Nov 22, 2022)
f13cead  Transitions tasks to erred (hendrikmakait, Nov 22, 2022)
c567651  Improve error messages (hendrikmakait, Nov 22, 2022)
987e3a3  Improve barrier tests (hendrikmakait, Nov 22, 2022)
3281152  Test deadlock on last shuffle task (hendrikmakait, Nov 22, 2022)
e48fe17  Drop unnecessary test (hendrikmakait, Nov 22, 2022)
bed5c98  TODO (hendrikmakait, Nov 22, 2022)
23408c3  Fix test_closed_worker_during_barrier (hendrikmakait, Nov 23, 2022)
3339026  Add test (hendrikmakait, Nov 23, 2022)
49e3a81  Relax test (hendrikmakait, Nov 23, 2022)
e83aceb  Remove comment (hendrikmakait, Nov 23, 2022)
f1c1478  Add docstring (hendrikmakait, Nov 23, 2022)
426c4fc  Add seed (hendrikmakait, Nov 23, 2022)
c851887  Relax test (hendrikmakait, Nov 23, 2022)
f81fff0  Remove comparison (hendrikmakait, Nov 23, 2022)
79c2834  Improve test runtime and remove slow markers (hendrikmakait, Nov 23, 2022)
57ccc17  Use raises_with_cause (hendrikmakait, Nov 23, 2022)
e842e02  Improve docstring (hendrikmakait, Nov 23, 2022)
e0482f1  Cleaner exception propagation (hendrikmakait, Nov 23, 2022)
f6efc70  Ensure that fail waits for close (hendrikmakait, Nov 23, 2022)
96d6aed  Proper shuffle_closed_events (hendrikmakait, Nov 24, 2022)
5cb3be5  Privatizing (hendrikmakait, Nov 24, 2022)
8462d93  Merge branch 'main' into close-shuffle (hendrikmakait, Nov 24, 2022)
a0a6881  Fixes after merge (hendrikmakait, Nov 24, 2022)
608dea5  Fix docstring (hendrikmakait, Nov 24, 2022)
307023c  Simplify (hendrikmakait, Nov 24, 2022)
cf3fce4  Adjust tests (hendrikmakait, Nov 25, 2022)
7a8f24d  Remove superfluous copy (hendrikmakait, Nov 28, 2022)
2f5e676  No raise_if_closed on worker extension (hendrikmakait, Nov 28, 2022)
720b841  Replace idempotency in ShuffleWorkerExtension.close with assertion (hendrikmakait, Nov 28, 2022)
e392b32  Merge branch 'main' into close-shuffle (hendrikmakait, Nov 29, 2022)
c9dc954  Attempt to fix shuffle resilience (fjetter, Nov 29, 2022)
a3229f7  WIP: Finish alternative approach (hendrikmakait, Dec 1, 2022)
e31b566  Simplify (hendrikmakait, Dec 1, 2022)
c6b1d30  Remove chaining (hendrikmakait, Dec 2, 2022)
4865cbe  Fix cleanup (hendrikmakait, Dec 2, 2022)
588306f  Add tests (hendrikmakait, Dec 2, 2022)
b44ebe2  Improve tests (hendrikmakait, Dec 2, 2022)
550dada  Drop _closed_events on scheduler extension (hendrikmakait, Dec 2, 2022)
7a186c9  Fix cleanup and its testing (hendrikmakait, Dec 2, 2022)
91516ac  Ignore stale heartbeats (hendrikmakait, Dec 2, 2022)
9a28675  Fix bug in state machine (hendrikmakait, Dec 2, 2022)
3c09d8f  Fix race (hendrikmakait, Dec 2, 2022)
7d7ec2f  Simplify (hendrikmakait, Dec 5, 2022)
35ca74b  Single-source of barrier_key (hendrikmakait, Dec 5, 2022)
deaa9a4  Optimize barrier (hendrikmakait, Dec 5, 2022)
06c0cdc  Fix typo (hendrikmakait, Dec 5, 2022)
5b9ea61  Add test for early forgetting (hendrikmakait, Dec 6, 2022)
fd7451b  Clean worker state once forgotten (hendrikmakait, Dec 6, 2022)
cec76b9  Add tombstone (hendrikmakait, Dec 6, 2022)
8a447d5  More explicit variable naming (hendrikmakait, Dec 6, 2022)
d3f8fe8  Fix overwritten worker (hendrikmakait, Dec 6, 2022)
6e4273a  XFAIL tests (hendrikmakait, Dec 6, 2022)
e4de791  Increase test size (hendrikmakait, Dec 6, 2022)
a33ce0e  Ignore leaked subprocess (hendrikmakait, Dec 6, 2022)
3163b3b  Fix test (hendrikmakait, Dec 6, 2022)
f8c4adb  Merge branch 'main' into close-shuffle (hendrikmakait, Dec 6, 2022)
7d54aac  Add test for removing bystander (hendrikmakait, Dec 8, 2022)
5f1a41f  Rename (hendrikmakait, Dec 8, 2022)
3c90f50  Remove indirection (hendrikmakait, Dec 9, 2022)
9040137  Unskip test (hendrikmakait, Dec 9, 2022)
bc317e2  Skip again (hendrikmakait, Dec 9, 2022)
distributed/shuffle/_shuffle_extension.py (140 changes: 122 additions & 18 deletions)
@@ -15,6 +15,7 @@
from dask.utils import parse_bytes

from distributed.core import PooledRPCCall
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.protocol import to_serialize
from distributed.shuffle._arrow import (
deserialize_schema,
@@ -40,6 +41,10 @@
logger = logging.getLogger(__name__)


class ShuffleClosedError(RuntimeError):
pass


class Shuffle:
"""State for a single active shuffle

@@ -115,6 +120,7 @@ def __init__(
partitions_of[addr].append(part)
self.partitions_of = dict(partitions_of)
self.worker_for = pd.Series(worker_for, name="_workers").astype("category")
self.closed = False

def _dump_batch(batch: pa.Buffer, file: BinaryIO) -> None:
return dump_batch(batch, file, self.schema)
@@ -138,6 +144,7 @@ def _dump_batch(batch: pa.Buffer, file: BinaryIO) -> None:
self.total_recvd = 0
self.start_time = time.time()
self._exception: Exception | None = None
self._close_lock = asyncio.Lock()

def __repr__(self) -> str:
return f"<Shuffle id: {self.id} on {self.local_address}>"
@@ -150,6 +157,7 @@ def time(self, name: str) -> Iterator[None]:
self.diagnostics[name] += stop - start

async def barrier(self) -> None:
self.raise_if_closed()
# FIXME: This should restrict communication to only workers
# participating in this specific shuffle. This will not only reduce the
# number of workers we need to contact but will also simplify error
@@ -173,6 +181,7 @@ async def send(self, address: str, shards: list[bytes]) -> None:
)

async def offload(self, func: Callable[..., T], *args: Any) -> T:
self.raise_if_closed()
with self.time("cpu"):
return await asyncio.get_running_loop().run_in_executor(
self.executor,
@@ -194,8 +203,7 @@ async def receive(self, data: list[bytes]) -> None:
await self._receive(data)

async def _receive(self, data: list[bytes]) -> None:
if self._exception:
raise self._exception
self.raise_if_closed()

try:
self.total_recvd += sum(map(len, data))
@@ -219,11 +227,20 @@ async def _receive(self, data: list[bytes]) -> None:
for k, v in groups.items()
}
)
self.raise_if_closed()
Member: This is an interesting place. Why would we need to raise here but not between any of the other awaits?

Member: FWIW I think we should combine the above calls into a single offload anyhow, which would render this comment moot.

Member (author): offload itself is protected with raise_if_closed(). I've been thinking whether I should wrap any async functionality that needs to be protected with raise_if_closed() into individual functions. That would probably make reasoning about these easier.

Member (author): > FWIW I think we should combine the above calls into a single offload anyhow, which would render this comment moot.

Good point, done.

await self._disk_buffer.write(groups)
except Exception as e:
self._exception = e
raise
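The thread above settles on folding the per-group work into a single offload call. A minimal sketch of that shape, assuming a hypothetical `group_and_serialize` helper in place of the split/serialize steps (this is not the PR's final code):

```python
# Sketch only, not the PR's final code. One offload covers the whole
# CPU-bound step; offload() already calls raise_if_closed(), so no extra
# check is needed between its sub-steps. `group_and_serialize` is a
# hypothetical helper standing in for the split/serialize logic.
async def _receive(self, data: list[bytes]) -> None:
    self.raise_if_closed()
    try:
        self.total_recvd += sum(map(len, data))
        groups = await self.offload(group_and_serialize, data, self.schema, self.column)
        self.raise_if_closed()  # re-check before writing to the disk buffer
        await self._disk_buffer.write(groups)
    except Exception as e:
        self._exception = e
        raise
```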

def raise_if_closed(self) -> None:
if self.closed:
if self._exception:
raise self._exception
raise ShuffleClosedError(
f"Shuffle {self.id} has been closed on {self.local_address}"
)

async def add_partition(self, data: pd.DataFrame) -> None:
if self.transferred:
raise RuntimeError(f"Cannot add more partitions to shuffle {self}")
@@ -244,6 +261,7 @@ def _() -> dict[str, list[bytes]]:
await self._comm_buffer.write(out)

async def get_output_partition(self, i: int) -> pd.DataFrame:
self.raise_if_closed()
assert self.transferred, "`get_output_partition` called before barrier task"

assert self.worker_for[i] == self.local_address, (
@@ -258,6 +276,7 @@ async def get_output_partition(self, i: int) -> pd.DataFrame:
), f"No outputs remaining, but requested output partition {i} on {self.local_address}."
await self.flush_receive()
try:
self.raise_if_closed()
df = self._disk_buffer.read(i)
with self.time("cpu"):
out = df.to_pandas()
@@ -269,6 +288,7 @@ async def get_output_partition(self, i: int) -> pd.DataFrame:
async def inputs_done(self) -> None:
assert not self.transferred, "`inputs_done` called multiple times"
self.transferred = True
self.raise_if_closed()
await self._comm_buffer.flush()
try:
self._comm_buffer.raise_on_exception()
@@ -280,17 +300,23 @@ def done(self) -> bool:
return self.transferred and self.output_partitions_left == 0

async def flush_receive(self) -> None:
if self._exception:
raise self._exception
self.raise_if_closed()
await self._disk_buffer.flush()

async def close(self) -> None:
await self._comm_buffer.close()
await self._disk_buffer.close()
try:
self.executor.shutdown(cancel_futures=True)
except Exception:
self.executor.shutdown()
self.closed = True
async with self._close_lock:
Member: Shuffle.close is called in register_complete and set_exception.

How come this is overlapping? Why do we need a lock? Are we sure that everything we do below is idempotent?

If we do not want to guarantee idempotency, the pattern we take in the server classes might be better suited than a lock, i.e.

    async def close(self) -> None:
        if self.closed:
            await self._event_close.wait()
        self.closed = True
        await close_all_stuff()
        self._event_close.set()

This locks and makes it idempotent even without relying on the buffers/executors/whatever else. The only important thing is that nobody must reset the closed attribute. This is a one-way street, otherwise this pattern breaks.

Member (author): Shuffle.close is called in ShuffleWorkerExtension._register_complete, ShuffleWorkerExtension.close, and Shuffle.fail, so we could see scenarios where an extension is closing and a shuffle is either completing or failing.

Fair point about idempotency. While everything should be idempotent at the moment, I'll adjust this to use the more cautious pattern of waiting on an event. Since shuffles should never reopen once closed, this should be fine.

await self._comm_buffer.close()
await self._disk_buffer.close()
try:
self.executor.shutdown(cancel_futures=True)
except Exception:
self.executor.shutdown()

async def set_exception(self, exception: Exception) -> None:
if not self.closed:
self._exception = exception
await self.close()
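Picking up the idempotency discussion above, a minimal sketch of the event-based close the author intends to use (the `_closed_event` attribute name is an assumption; the PR's final code may differ):

```python
# Sketch only: idempotent, event-based Shuffle.close(). `closed` is a one-way
# flag that is never reset; `_closed_event` is assumed to be an asyncio.Event
# created in __init__.
async def close(self) -> None:
    if self.closed:
        # Another caller is already closing; wait for it to finish.
        await self._closed_event.wait()
        return
    self.closed = True
    try:
        await self._comm_buffer.close()
        await self._disk_buffer.close()
        try:
            self.executor.shutdown(cancel_futures=True)
        except Exception:
            self.executor.shutdown()
    finally:
        # Wake every concurrent close() call, even if shutdown raised.
        self._closed_event.set()
```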


class ShuffleWorkerExtension:
@@ -305,17 +331,27 @@ class ShuffleWorkerExtension:
- collecting instrumentation of ongoing shuffles and route to scheduler/worker
"""

worker: Worker
shuffles: dict[ShuffleId, Shuffle]
erred_shuffles: dict[ShuffleId, Exception]
memory_limiter_comms: ResourceLimiter
memory_limiter_disk: ResourceLimiter
closed: bool

def __init__(self, worker: Worker) -> None:
# Attach to worker
worker.handlers["shuffle_receive"] = self.shuffle_receive
worker.handlers["shuffle_inputs_done"] = self.shuffle_inputs_done
worker.handlers["shuffle_set_exception"] = self.shuffle_set_exception
worker.extensions["shuffle"] = self

# Initialize
self.worker: Worker = worker
self.shuffles: dict[ShuffleId, Shuffle] = {}
self.memory_limiter_disk = ResourceLimiter(parse_bytes("1 GiB"))
self.worker = worker
self.shuffles = {}
self.erred_shuffles = {}
self.memory_limiter_comms = ResourceLimiter(parse_bytes("100 MiB"))
self.memory_limiter_disk = ResourceLimiter(parse_bytes("1 GiB"))
self.closed = False

# Handlers
##########
@@ -333,8 +369,13 @@ async def shuffle_receive(
Handler: Receive an incoming shard of data from a peer worker.
Using an unknown ``shuffle_id`` is an error.
"""
shuffle = await self._get_shuffle(shuffle_id)
await shuffle.receive(data)
try:
shuffle = await self._get_shuffle(shuffle_id)
await shuffle.receive(data)
except ShuffleClosedError:
from distributed.worker import Reschedule

raise Reschedule()

async def shuffle_inputs_done(self, shuffle_id: ShuffleId) -> None:
"""
@@ -353,6 +394,12 @@ async def shuffle_inputs_done(self, shuffle_id: ShuffleId) -> None:
logger.critical(f"Shuffle inputs done {shuffle}")
await self._register_complete(shuffle)

async def shuffle_set_exception(self, shuffle_id: ShuffleId, message: str) -> None:
shuffle = self.shuffles.pop(shuffle_id)
exception = RuntimeError(message)
self.erred_shuffles[shuffle_id] = exception
await shuffle.set_exception(exception)

def add_partition(
self,
data: pd.DataFrame,
@@ -378,6 +425,7 @@ async def _barrier(self, shuffle_id: ShuffleId) -> None:
await shuffle.barrier()

async def _register_complete(self, shuffle: Shuffle) -> None:
self.raise_if_closed()
await shuffle.close()
await self.worker.scheduler.shuffle_register_complete(
id=shuffle.id,
@@ -411,6 +459,10 @@ async def _get_shuffle(
"Get a shuffle by ID; raise ValueError if it's not registered."
import pyarrow as pa

self.raise_if_closed()

if exception := self.erred_shuffles.get(shuffle_id):
raise exception
try:
return self.shuffles[shuffle_id]
except KeyError:
@@ -423,6 +475,11 @@ async def _get_shuffle(
npartitions=npartitions,
column=column,
)
if result["status"] == "ERROR":
raise RuntimeError(
f"Worker {result['worker']} left during active shuffle {shuffle_id}"
)
assert result["status"] == "OK"
except KeyError:
# Even the scheduler doesn't know about this shuffle
# Let's hand this back to the scheduler and let it figure
@@ -434,6 +491,7 @@ async def _get_shuffle(

raise Reschedule()
else:
self.raise_if_closed()
Member: Feels a bit odd to fail here. I'd expect this to happen somewhere else (and all tests pass if I remove this).

Member (author): IIUC, we technically need this here. Otherwise we can run into a very unlikely edge case where the extension has been closed since we started sending the RPC to the scheduler. This would leave the new Shuffle instance open indefinitely. We can likely relax this condition once we have proper cleanup upon task release implemented. Tests are expected to pass since I have not written one for this specific scenario, and most (all?) tests dealing with closing workers expect the shuffle to have started before anything happens. Now that you bring it up, though, this might be worth another test if we want to make this rock-solid.

if shuffle_id not in self.shuffles:
shuffle = Shuffle(
Member (on lines 491 to 492): We just discussed that instantiating a new shuffle instance (and also talking to the scheduler itself) is an error case that can only be caught by implementing the raise_if_closed pattern on the extension level. Everything else could be handled on the shuffle instance level instead.

column=result["column"],
@@ -455,10 +513,17 @@ async def _get_shuffle(
return self.shuffles[shuffle_id]

async def close(self) -> None:
self.closed = True
Member: nit: This attribute indicates closing rather than closed.

Member (author): Yeah, I've been thinking whether it would make sense to replace the boolean with a state (running, closing, closed) for clarity. Naming this closing doesn't really cut it either, since it's not closing once it's closed. open seems to suggest that not self.open means it's closed... naming is hard.

while self.shuffles:
_, shuffle = self.shuffles.popitem()
await shuffle.close()

def raise_if_closed(self) -> None:
if self.closed:
raise ShuffleClosedError(
f"{self.__class__.__name__} already closed on {self.worker.address}"
)
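As a side note to the closed-vs-closing naming discussion above, the boolean could become an explicit state field; a purely illustrative sketch (names are assumptions, not part of this PR):

```python
from enum import Enum


class ExtensionState(Enum):
    RUNNING = "running"
    CLOSING = "closing"
    CLOSED = "closed"


class StatefulExtensionSketch:
    """Illustrative only: an explicit state instead of a `closed` boolean."""

    def __init__(self, worker_address: str) -> None:
        self.state = ExtensionState.RUNNING
        self.worker_address = worker_address

    def raise_if_closed(self) -> None:
        # Reject work while the extension is closing as well as once it is
        # closed; the real code would raise ShuffleClosedError instead.
        if self.state is not ExtensionState.RUNNING:
            raise RuntimeError(
                f"Extension is {self.state.value} on {self.worker_address}"
            )
```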

#############################
# Methods for worker thread #
#############################
@@ -507,8 +572,11 @@ def get_output_partition(

Calling this for a ``shuffle_id`` which is unknown or incomplete is an error.
"""
assert shuffle_id in self.shuffles, "Shuffle worker restrictions misbehaving"
shuffle = self.shuffles[shuffle_id]
self.raise_if_closed()
assert (
shuffle_id in self.shuffles or shuffle_id in self.erred_shuffles
), "Shuffle worker restrictions misbehaving"
shuffle = self.get_shuffle(shuffle_id)
output = sync(self.worker.loop, shuffle.get_output_partition, output_partition)
# key missing if another thread got to it first
if shuffle.done() and shuffle_id in self.shuffles:
Expand All @@ -517,7 +585,7 @@ def get_output_partition(
return output


class ShuffleSchedulerExtension:
class ShuffleSchedulerExtension(SchedulerPlugin):
"""
Shuffle extension for the scheduler

Expand All @@ -536,6 +604,7 @@ class ShuffleSchedulerExtension:
columns: dict[ShuffleId, str]
output_workers: dict[ShuffleId, set[str]]
completed_workers: dict[ShuffleId, set[str]]
Member (author): We should be able to remove self.completed_workers and register_complete, but that's work for another PR.

erred_shuffles: dict[ShuffleId, str]

def __init__(self, scheduler: Scheduler):
self.scheduler = scheduler
Expand All @@ -551,6 +620,11 @@ def __init__(self, scheduler: Scheduler):
self.columns = {}
self.output_workers = {}
self.completed_workers = {}
self.erred_shuffles = {}
self.scheduler.add_plugin(self)

def shuffle_ids(self) -> set[ShuffleId]:
return set(self.worker_for)

def heartbeat(self, ws: WorkerState, data: dict) -> None:
for shuffle_id, d in data.items():
@@ -563,6 +637,9 @@ def get(
column: str | None,
npartitions: int | None,
) -> dict:
if id in self.erred_shuffles:
return {"status": "ERROR", "worker": self.erred_shuffles[id]}

if id not in self.worker_for:
assert schema is not None
assert column is not None
@@ -590,12 +667,39 @@ def get(
self.completed_workers[id] = set()

return {
"status": "OK",
"worker_for": self.worker_for[id],
"column": self.columns[id],
"schema": self.schemas[id],
"output_workers": self.output_workers[id],
}

async def remove_worker(self, scheduler: Scheduler, worker: str) -> None:
broadcasts = []
for shuffle_id, output_workers in self.output_workers.items():
if worker not in output_workers:
continue
self.erred_shuffles[shuffle_id] = worker
contact_workers = output_workers.copy()
contact_workers.discard(worker)
message = f"Worker {worker} left during active shuffle {shuffle_id}"
broadcasts.append(
scheduler.broadcast(
msg={
"op": "shuffle_set_exception",
"message": message,
"shuffle_id": shuffle_id,
},
workers=list(contact_workers),
)
)
self.scheduler.handle_task_erred(
f"shuffle-barrier-{shuffle_id}",
exception=to_serialize(RuntimeError(message)),
stimulus_id="shuffle-remove-worker",
)
await asyncio.gather(*broadcasts, return_exceptions=True)

def register_complete(self, id: ShuffleId, worker: str) -> None:
"""Learn from a worker that it has completed all reads of a shuffle"""
if id not in self.completed_workers: