Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate data_needed #6587

Merged
merged 12 commits into from
Jun 26, 2022
Merged

Deduplicate data_needed #6587

merged 12 commits into from
Jun 26, 2022

Conversation

crusaderky
Copy link
Collaborator

@crusaderky crusaderky commented Jun 16, 2022

@crusaderky crusaderky self-assigned this Jun 16, 2022
@github-actions
Copy link
Contributor

github-actions bot commented Jun 16, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       15 files  ±    0         15 suites  ±0   10h 6m 23s ⏱️ + 28m 43s
  2 897 tests +    1    2 812 ✔️ +    2    84 💤 +  1  1  - 2 
21 458 runs  +830  20 491 ✔️ +793  966 💤 +39  1  - 2 

For more details on these failures, see this check.

Results for commit c8267eb. ± Comparison against base commit 4b24753.

♻️ This comment has been updated with latest results.

@crusaderky crusaderky force-pushed the WSMR/data_needed branch 2 times, most recently from d5324b7 to b1529f4 Compare June 17, 2022 11:29
Note
----
Instead of number of tasks, we could've measured total nbytes and/or number of
tasks that only exist on the worker. Raw number of tasks is cruder but simpler.
Copy link
Collaborator Author

@crusaderky crusaderky Jun 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, the previous algorithm was already random.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the top priority task was never random but defined by the heap. Even same priority tasks would be deterministic given deterministic heap internals.
Even the _select_keys_for_gather / data_needed_per_worker was not random but rather insertion ordered.

Do we have an option to not use random? How about a str-compare of tasks.peek() to make it deterministic?

Yield the peer workers and tasks in data_needed, sorted by:

1. first local, then remote
2. if tied, by highest-priority task available
Copy link
Collaborator Author

@crusaderky crusaderky Jun 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could trivially swap these two, reverting it to fetch higher priority tasks first as in main. As explained in the opening comment, this would cause up to 50MB worth of lower-priority tasks to be fetched from a remote host even if they're available on a local one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would indeed swap these and always prefer prioritized data

  1. The previous behavior always chose priority (next in data_needed) and fetched this locally if there was a local worker available. If priority is listed first in this heap, we'd have the same behavior. I would suggest to only engage in any changes of behavior here if we can back it up with benchmarks
  2. By preferring local workers I can see an edge case where many tasks / many GBs of data is on a couple of local workers but a couple of small high priority tasks are on a remote worker. These small high priority task would be needed to unblock an important downstream task (hence the high prio). We'd fetch a lot of data and risk even overflowing the worker until all local data is replicated before we even tried to fetch the remote data.

@crusaderky crusaderky force-pushed the WSMR/data_needed branch 3 times, most recently from 46065ff to 39d789e Compare June 17, 2022 12:36
@crusaderky crusaderky marked this pull request as ready for review June 17, 2022 16:08
@crusaderky
Copy link
Collaborator Author

This PR is no longer blocked by #6593

Copy link
Member

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I'm slightly worried about runtimes for very large clusters (think ~1k workers upwards). Maybe this is not a big deal on the workers but I could see this amounting to a few orders of magnitude slower in the average case for large clusters
  2. I think we should always prefer priority over anything else. Every other change should be verified with benchmarking
  3. We had a similar argument when testing the Scheduler.rebalance algorithm in AMM but I'm again wondering if we should write dedicated unit tests for this, e.g. define these things as functions and test them in isolation

Testing could look like

DATA_NEEDED = dict[str, HeapSet[TaskState]]

def _select_workers_for_gather(
    host: str,
    data_needed: DATA_NEEDED,
    skip_workers: set[str] # union of in_flight_workers and busy_workers
) -> Iterator[tuple[str, HeapSet[TaskState]]]:
    ...

def _select_keys_for_gather(
    data_needed,
    available,
) -> tuple[list[TaskState], int]:
    ...


def _get_data_to_fetch(host: str, data_needed, skip_workers)-> Iterator[tuple[str, set[TaskState], float]]:
    for worker, available in _select_keys_for_gather(
        host=host,
        data_needed=data_needed,
        skip_workers=skip_workers
    ):
        yield worker, *_select_keys_for_gather(data_needed=data_needed, available)



def test_prefer_priority():
    tasks = []
    data_needed = defaultdict(HeapSet)
    host = "127.0.0.1"
    local_worker = f"{host}:1234"
    remote_worker = "10.5.6.7:5678"

    ts1 = TaskState(
        key="key", 
        priority=1, 
        nbytes=1, 
        who_has={remote_worker}
    )
    data_needed[remote_worker].push(ts1)
    ts1 = TaskState(
        key="key2", 
        priority=0, 
        nbytes=1, 
        who_has={local_worker}
    )
    data_needed[local_worker].push(ts1)

    assert list(_get_data_to_fetch(host, data_needed, [])) == [
        (remote_worker, {ts1}, 1)
        (local_worker, {ts2}, 1)
    ]

This would at least provide us some lever to test a few of the edge cases encountered in this implementation. For instance, the mutability of data_needed and the cross play between _select_keys_for_gather and _select_workers_for_gather

distributed/worker_state_machine.py Show resolved Hide resolved
Comment on lines 1311 to 1316
get_address_host(worker) != host, # False < True
tasks.peek().priority,
-len(tasks),
random.random(),
worker,
tasks,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do like that we now have a very simple lever to control the fetching behavior +1

I would suggest not to engage in any large conversations here. I think we could theorycraft this for a while but eventually I'd be interested in real world benchmarks. We might want to revisit this once we have a couple of test workloads ready to go.

Yield the peer workers and tasks in data_needed, sorted by:

1. first local, then remote
2. if tied, by highest-priority task available
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would indeed swap these and always prefer prioritized data

  1. The previous behavior always chose priority (next in data_needed) and fetched this locally if there was a local worker available. If priority is listed first in this heap, we'd have the same behavior. I would suggest to only engage in any changes of behavior here if we can back it up with benchmarks
  2. By preferring local workers I can see an edge case where many tasks / many GBs of data is on a couple of local workers but a couple of small high priority tasks are on a remote worker. These small high priority task would be needed to unblock an important downstream task (hence the high prio). We'd fetch a lot of data and risk even overflowing the worker until all local data is replicated before we even tried to fetch the remote data.

Note
----
Instead of number of tasks, we could've measured total nbytes and/or number of
tasks that only exist on the worker. Raw number of tasks is cruder but simpler.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the top priority task was never random but defined by the heap. Even same priority tasks would be deterministic given deterministic heap internals.
Even the _select_keys_for_gather / data_needed_per_worker was not random but rather insertion ordered.

Do we have an option to not use random? How about a str-compare of tasks.peek() to make it deterministic?

Comment on lines +1303 to +1308
for worker, tasks in list(self.data_needed.items()):
if not tasks:
del self.data_needed[worker]
continue
if worker in self.in_flight_workers or worker in self.busy_workers:
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know I initially suggested to go down this road but I would still like to raise the point that this implementation now would iterate over all workers every time we're calling _ensure_communicating

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even worse, if I'm not mistaken, the below iteration is not even linear (even without the additional push).

Copy link
Collaborator Author

@crusaderky crusaderky Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The iteration is O(n) to the number of peer workers that hold any data to fetch, and O(n*logn) to the number of workers that hold data to fetch and are neither in flight nor busy - split between O(n) in python bytecode * O(logn) in C.

In a worst-case scenario of a cluster with 1000 workers where you suddenly have data to fetch from all 1000 (and I seriously doubt this is realistic), you'd perform

  1. a for loop in pure python of 1000 iterations
  2. a single heapq.heapify on a list of 1000 elements, which costs O(n*logn) and is implemented in C
  3. a for loop in pure python of 50 iterations (distributed.worker.connections.outgoing), each of which calls heapq.heappop on a list of 950~1000 elements, which costs O(logn) and is implemented in C

At the next call to _ensure_communicating, before any worker has responded, you'll just repeat step 1, while skipping steps 2 and 3.

I strongly suspect this whole thing to be negligible in most cases.

I can easily improve this, moving aside the busy/in flight workers to a separate attribute (e.g. WorkerState.busy_workers), thus making the second call O(1). However if you agree I'd rather do it in a follow-up PR.

Comment on lines 1329 to 1332
heapq.heappush(
heap,
(is_remote, tasks.peek().priority, -len(tasks), rnd, worker, tasks),
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is tying a knot in my head when thinking about runtime.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heappop/heappush are O(logn) and are implemented in C

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not talking about the heappush itself but rather that it is pushing on the heap we're currently iterating over.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, initially there was no heap, just a sort(). But then I realised that, if you have tasks with multiple replicas, your order after the first worker may change, so you need to re-sort. a heap is the most efficient way to do it.

@crusaderky
Copy link
Collaborator Author

crusaderky commented Jun 22, 2022

OK, I'm swapping priority and local/remote around.

Well, the top priority task was never random but defined by the heap.

The previous algorithm was:

  1. pick top priority task
  2. given two tasks with the same priority, pick the one inserted first
  3. of all hosts in ts.who_has, pick a random one among the local ones (see call to random.choice)
  4. if no local peers exist, pick a random one among the remote ones
  5. pick up to 50MB worth of lower-priority tasks from the same host, following priority first and insertion order next

Do we have an option to not use random? How about a str-compare of tasks.peek() to make it deterministic?

That would be pointless when the same task is available in multiple replicas from different workers.
You could str-compare worker addresses, but that would be disastrous:

  • when multiple workers are on the same host - lower-port workers would be hammered
  • on a network where IP addresses convey locality - e.g. there's a ticket somewhere from a user that had their cluster spanning two AWS regions

I'm adding a statically seeded random state to obtain reproducibility.

@crusaderky
Copy link
Collaborator Author

Would it be OK if I add unit tests (1) for the determinism and (2) to test that priority is chosen over locality as a follow-up after #6446? That PR makes it easier to write state tests.

@fjetter
Copy link
Member

fjetter commented Jun 22, 2022

Would it be OK if I add unit tests (1) [...] after #6446?

sure

@crusaderky
Copy link
Collaborator Author

Added test about gather priority.
The performance test will be in https://github.com/dask/dask-benchmarks as discussed (PR to follow)

to_gather={"x5"},
total_nbytes=4 * 2**20,
),
]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Writing this test filled me with pure, unadultered joy ❤️

crusaderky added a commit to crusaderky/distributed that referenced this pull request Jun 22, 2022
crusaderky added a commit to crusaderky/distributed that referenced this pull request Jun 23, 2022
crusaderky added a commit to crusaderky/distributed that referenced this pull request Jun 23, 2022
crusaderky added a commit to crusaderky/distributed that referenced this pull request Jun 23, 2022
@crusaderky crusaderky merged commit c82bba5 into dask:main Jun 26, 2022
@crusaderky crusaderky deleted the WSMR/data_needed branch June 26, 2022 10:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Alternatives for current ensure_communicating
2 participants