Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add back Worker.transition_fetch_missing #6112

Merged
merged 8 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3389,3 +3389,24 @@ async def test_tick_interval(c, s, a, b):
while s.workers[a.address].metrics["event_loop_interval"] < 0.100:
await asyncio.sleep(0.01)
time.sleep(0.200)


class BreakingWorker(Worker):
broke_once = False

def get_data(self, comm, **kwargs):
if not self.broke_once:
self.broke_once = True
raise OSError("fake error")
return super().get_data(comm, **kwargs)


@pytest.mark.slow
@gen_cluster(client=True, Worker=BreakingWorker)
async def test_broken_comm(c, s, a, b):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am somewhat ok with this test, since it does reliably trigger the behavior. But I think @fjetter was hoping to see a more minimized case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with that desire. I encourage folks to work on that. I think that this suffices.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test does not reliably trigger the condition for me. I do hit it but it is not deterministic

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can increase the data volume and it will become more and more likely. I don't have a deterministic test. I think that it would be good to have. I think that this suffices though.

df = dask.datasets.timeseries(
start="2000-01-01",
end="2000-01-10",
)
s = df.shuffle("id", shuffle="tasks")
await c.compute(s.size)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kudos to @gjoseph92 and @nils-braun for the test

13 changes: 13 additions & 0 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ def __init__(
("executing", "released"): self.transition_executing_released,
("executing", "rescheduled"): self.transition_executing_rescheduled,
("fetch", "flight"): self.transition_fetch_flight,
("fetch", "missing"): self.transition_fetch_missing,
("fetch", "released"): self.transition_generic_released,
("flight", "error"): self.transition_flight_error,
("flight", "fetch"): self.transition_flight_fetch,
Expand Down Expand Up @@ -1929,6 +1930,14 @@ def transition_flight_missing(
ts.done = False
return {}, []

def transition_fetch_missing(
self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see some assertions about ts in here. Likely at least

assert not ts.done  # ??? really not sure about this. I find `done` confusingly named.
if self.validate:
    assert ts.state == "fetch"
    assert not ts.who_has

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in validate_task_fetch already, which gets called at the end of every transitions call. I think that we're safe here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean validate_task_missing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both validation methods assert the correct (I think) who_has state

    def validate_task_fetch(self, ts):
        assert ts.key not in self.data
        assert self.address not in ts.who_has
        assert not ts.done
        assert ts in self.data_needed
        assert ts.who_has

        for w in ts.who_has:
            assert ts.key in self.has_what[w]
            assert ts in self.pending_data_per_worker[w]

    def validate_task_missing(self, ts):
        assert ts.key not in self.data
        assert not ts.who_has
        assert not ts.done
        assert not any(ts.key in has_what for has_what in self.has_what.values())
        assert ts in self._missing_dep_flight

So after this transition is called, the validate_task_missing method will be called, and verify that not ts.who_has

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm, it looks like we are asserting the incoming state by default though.

This seems a little odd to me given how few lines of code there are in between getting the state and choosing this method (below for convenience)

        start = ts.state
        func = self._transitions_table.get((start, cast(str, finish)))

I'd be happy to add it for now as convention is you like

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like it's inconsistently done. I'm going to pass on this one unless you feel strongly about it. If you do speak up and I'll add it. I'm probably hesitating here just because it feels weird to keep doing something just because we've been doing it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strongly suggest not to rely on these validate calls. They are helpful but do not replace testing. They raise an exception. The exception is lost in tornado and the only thing we see is an error log. Sometimes that causes the tests to get stuck but it's not reliable.
I haven't seen these problems recently but it's been a big problem a few months ago

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strongly suggest not to rely on these validate calls. They are helpful but do not replace testing.

We should talk about this in more depth if you want to rely less on them. Validation testing has, historically, been invaluable in maintaining correctness and stability.

This is still testing. The differnce now is that the tests themselves trigger certain behaviors, and assertions are checked in a more systematic way. It is, I think, a better way of verifying state than explicitly checking state in every test. This would be, I think, an inefficient way of writing our tests.

The exception is lost in tornado and the only thing we see is an error log. Sometimes that causes the tests to get stuck but it's not reliable

If tests are passing even when these are not then that's certainly an issue and we should address it quickly. You might not be saying this though. If these aren't as ergonomic as we'd like then let's see if we can make them more ergonomic.

Alternatively, if we have a good alternative for the validation methods then I'm happy to engage. I would be -1 to putting explicit state testing at this into all of the tests though. I'm curious to learn what other alternatives there might be. Could I ask you to raise an issue with thoughts and we can move the conversation there?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you, they are useful and I don't want to get rid of it. I'm just saying that we cannot blindly rely on them at this point in time. The way our concurrency model with tornado works is that AssertionError are just lost and logged. Sometimes that causes a worker to deadlock which then is a good thing because the test times and fails. However, it depends on where this assert is exactly called and relying on this "implicit deadlock" is not great.

To counter this, I proposed #4735 a while ago which proposes to log an exception and close the worker if a transition error occurs. I believe this would be a drastic behaviour but still a sane one, even for production. If anything goes wrong during state transitions, we should throw the worker away and rely on the scheduler to clean up the mess.
Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only run in validation mode in tests anyway, so I'm totally fine with it.

ts.state = "missing"
self._missing_dep_flight.add(ts)
ts.done = False
return {}, []

def transition_released_fetch(
self, ts: TaskState, *, stimulus_id: str
) -> RecsInstrs:
Expand Down Expand Up @@ -2671,6 +2680,10 @@ def ensure_communicating(self) -> None:
if ts.state != "fetch":
continue

if not ts.who_has:
self.transition(ts, "missing", stimulus_id=stimulus_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like having this safety net. I still think it would also be appropriate to add

diff --git a/distributed/worker.py b/distributed/worker.py
index 7a062876..5e72f007 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2991,6 +2991,9 @@ class Worker(ServerNode):
                 for d in has_what:
                     ts = self.tasks[d]
                     ts.who_has.remove(worker)
+                    if not ts.who_has:
+                        # TODO send `missing-data` to scheduler?
+                        recommendations[ts] = "missing"
 
             except Exception as e:
                 logger.exception(e)

In this case where we know we're making a task missing, it seems better to immediately transition it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take a look and try to incorporate this shortly. Thank you for the suggestion.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the suggestion. I've verified that this independently fixes the problem and pushed up this change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like these "safety nets". We had "safety nets" all over the place that not only covered up actually severe problems but also made everything much more complicated than necessary, harder to debug, harder to understand, etc.
This is essentially uncovered, dead code. If we ever hit this line something went wrong. If something goes wrong, we should raise and not try to guess what might be a good resolution.
I prefer the fix in gather_dep over this

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy to remove this. Should I add in a if self.validate: check here? That would have caught things previously.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the validate instead of the transition would be just fine. I do believe this is the only way one could even trigger the transition_fetch_missing transition and I believe we should get rid of it as well

continue

workers = [w for w in ts.who_has if w not in self.in_flight_workers]
if not workers:
assert ts.priority is not None
Expand Down