
Task Annotations #2180

Closed
wants to merge 37 commits into from

Conversation

@sjperkins
Member

sjperkins commented Aug 12, 2018

Dask Issue dask/dask#3783
Related dask PR dask/dask#3869

@quasiben
Member

quasiben commented Nov 7, 2019

@sjperkins thanks for this PR. I assume this is still a WIP based on the last few comments in dask/dask#3783. We are developing a benchmarks codebase: https://github.com/dask/dask-benchmarks and it would be very interesting to evaluate this work with specific GPU workloads. I don't think there is anything to do in the near term, and I'll ping back again once the benchmark repo has been fleshed out.

@sjperkins
Member Author

> I don't think there is anything to do in the near term, and I'll ping back again once the benchmark repo has been fleshed out.

Would it be helpful to resolve the conflicts with master? I've done this locally. The body of the following test runs, but it appears to fail when shutting down the test cluster.

/home/sperkins/work/ska/code/distributed/distributed/utils.py:134:
RuntimeWarning: Couldn't detect a suitable IP address for reaching '2001:4860:4860::8888', defaulting to '::1': [Errno 101] Network is unreachable

That looks like an IPv6 address.
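
For context, that warning comes from local IP autodetection. A minimal sketch of the usual technique (the exact helper in distributed.utils may differ): connect a UDP socket towards a public address and read back the local address the OS would use.

```python
import socket

def guess_local_ipv6(probe_host="2001:4860:4860::8888", port=80):
    # Connecting a UDP socket sends no packets; it only asks the OS to pick
    # a route, so getsockname() reveals the local IPv6 address that would be
    # used to reach probe_host.  On a host with no IPv6 route (as in the log
    # above) the connect raises OSError: [Errno 101] Network is unreachable,
    # and the caller falls back to '::1'.
    with socket.socket(socket.AF_INET6, socket.SOCK_DGRAM) as sock:
        sock.connect((probe_host, port))
        return sock.getsockname()[0]
```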

Expanded test output here:

$ py.test -s -vvv -k test_task_annotations
=========================================================================================== test session starts ============================================================================================
platform linux -- Python 3.6.8, pytest-5.2.2, py-1.8.0, pluggy-0.13.0 -- /home/sperkins/venv/distributed/bin/python3
cachedir: .pytest_cache
rootdir: /home/sperkins/work/ska/code/distributed, inifile: setup.cfg
collected 1012 items / 1011 deselected / 1 selected                                                                                                                                                        

test_client.py::test_task_annotations <- distributed/utils_test.py distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:42185
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:45199
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:45199
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:42185
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   16.64 GB
distributed.worker - INFO -       Local Directory: /home/sperkins/work/ska/code/distributed/distributed/tests/dask-worker-space/worker-gmigo3qb
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:41039
distributed.worker - INFO -          Listening to:      tcp://127.0.0.1:41039
distributed.worker - INFO - Waiting to connect to:      tcp://127.0.0.1:42185
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                   16.64 GB
distributed.worker - INFO -       Local Directory: /home/sperkins/work/ska/code/distributed/distributed/tests/dask-worker-space/worker-gms900fj
distributed.worker - INFO - -------------------------------------------------
distributed.scheduler - INFO - Register tcp://127.0.0.1:45199
distributed.scheduler - INFO - Register tcp://127.0.0.1:41039
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:45199
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Starting worker compute stream, tcp://127.0.0.1:41039
distributed.core - INFO - Starting established connection
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:42185
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:      tcp://127.0.0.1:42185
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Receive client connection: Client-07f9cf5a-0217-11ea-9d40-e09467e6387c
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Remove client Client-07f9cf5a-0217-11ea-9d40-e09467e6387c
distributed.scheduler - INFO - Remove client Client-07f9cf5a-0217-11ea-9d40-e09467e6387c
distributed.scheduler - INFO - Close client connection: Client-07f9cf5a-0217-11ea-9d40-e09467e6387c
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:41039
distributed.worker - INFO - Stopping worker at tcp://127.0.0.1:45199
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:41039
distributed.core - INFO - Removing comms to tcp://127.0.0.1:41039
distributed.scheduler - INFO - Remove worker tcp://127.0.0.1:45199
distributed.core - INFO - Removing comms to tcp://127.0.0.1:45199
distributed.scheduler - INFO - Lost all workers
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
FAILED

================================================================================================= FAILURES =================================================================================================
__________________________________________________________________________________________ test_task_annotations ___________________________________________________________________________________________

    def test_func():
        result = None
        workers = []
        with clean(timeout=active_rpc_timeout, **clean_kwargs) as loop:
    
            async def coro():
                with dask.config.set(config):
                    s = False
                    for i in range(5):
                        try:
                            s, ws = await start_cluster(
                                nthreads,
                                scheduler,
                                loop,
                                security=security,
                                Worker=Worker,
                                scheduler_kwargs=scheduler_kwargs,
                                worker_kwargs=worker_kwargs,
                            )
                        except Exception as e:
                            logger.error(
                                "Failed to start gen_cluster, retrying",
                                exc_info=True,
                            )
                        else:
                            workers[:] = ws
                            args = [s] + workers
                            break
                    if s is False:
                        raise Exception("Could not start cluster")
                    if client:
                        c = await Client(
                            s.address,
                            loop=loop,
                            security=security,
                            asynchronous=True,
                            **client_kwargs
                        )
                        args = [c] + args
                    try:
                        future = func(*args)
                        if timeout:
                            future = gen.with_timeout(
                                timedelta(seconds=timeout), future
                            )
                        result = await future
                        if s.validate:
                            s.validate_state()
                    finally:
                        if client and c.status not in ("closing", "closed"):
                            await c._close(fast=s.status == "closed")
                        await end_cluster(s, workers)
                        await gen.with_timeout(
                            timedelta(seconds=1), cleanup_global_workers()
                        )
    
                    try:
                        c = await default_client()
                    except ValueError:
                        pass
                    else:
                        await c._close(fast=True)
    
                    for i in range(5):
                        if all(c.closed() for c in Comm._instances):
                            break
                        else:
                            await gen.sleep(0.05)
                    else:
                        L = [c for c in Comm._instances if not c.closed()]
                        Comm._instances.clear()
                        # raise ValueError("Unclosed Comms", L)
                        print("Unclosed Comms", L)
    
                    return result
    
            result = loop.run_sync(
>               coro, timeout=timeout * 2 if timeout else timeout
            )

../utils_test.py:952: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/usr/lib/python3.6/contextlib.py:88: in __exit__
    next(self.gen)
../utils_test.py:1549: in clean
    del thread_state.on_event_loop_thread
/usr/lib/python3.6/contextlib.py:88: in __exit__
    next(self.gen)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    @contextmanager
    def check_thread_leak():
        active_threads_start = set(threading._active)
    
        yield
    
        start = time()
        while True:
            bad = [
                t
                for t, v in threading._active.items()
                if t not in active_threads_start
                and "Threaded" not in v.name
                and "watch message" not in v.name
                and "TCP-Executor" not in v.name
            ]
            if not bad:
                break
            else:
                sleep(0.01)
            if time() > start + 5:
                from distributed import profile
    
                tid = bad[0]
                thread = threading._active[tid]
                call_stacks = profile.call_stack(sys._current_frames()[tid])
>               assert False, (thread, call_stacks)
E               AssertionError: (<Thread(Profile, started daemon 140702366746368)>, ['  File "/usr/lib/python3.6/threading.py", line 884, in _bootstra... '  File "/home/sperkins/work/ska/code/distributed/distributed/profile.py", line 269, in _watch
E                 \tsleep(interval)
E                 '])
E               assert False

../utils_test.py:1451: AssertionError
============================================================================================= warnings summary =============================================================================================
/home/sperkins/venv/distributed/lib/python3.6/site-packages/_pytest/mark/structures.py:325
/home/sperkins/venv/distributed/lib/python3.6/site-packages/_pytest/mark/structures.py:325
  /home/sperkins/venv/distributed/lib/python3.6/site-packages/_pytest/mark/structures.py:325: 

/home/sperkins/work/ska/code/distributed/distributed/utils.py:134
  /home/sperkins/work/ska/code/distributed/distributed/utils.py:134: RuntimeWarning: Couldn't detect a suitable IP address for reaching '2001:4860:4860::8888', defaulting to '::1': [Errno 101] Network is unreachable
    RuntimeWarning,

-- Docs: https://docs.pytest.org/en/latest/warnings.html
======================================================================================== slowest 10 test durations =========================================================================================
5.13s call     distributed/tests/test_client.py::test_task_annotations
0.00s setup    distributed/tests/test_client.py::test_task_annotations
0.00s teardown distributed/tests/test_client.py::test_task_annotations
============================================================================== 1 failed, 1011 deselected, 3 warnings in 5.92s ==============

@quasiben
Member

quasiben commented Nov 8, 2019

@sjperkins this PR will take some time to properly review, and one of the questions we would want to answer before merging is whether or not there is any impact on performance.

@sjperkins
Member Author

sjperkins commented Nov 8, 2019

> @sjperkins this PR will take some time to properly review, and one of the questions we would want to answer before merging is whether or not there is any impact on performance.

OK, I think the tests are currently failing (after the master merge) due to complex tasks, which have changed a bit, especially the handling of the following case in args:

key: (apply, func, (tuple, list(args)), kwargs2)
for key, args in zip(keys, zip(*iterables))
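
For readers unfamiliar with that fragment, here is a self-contained sketch of the kind of graph entries it builds (the `apply` helper and the `build_graph` wrapper are reconstructed for illustration and may not match the real code in Client.map exactly):

```python
from operator import add

def apply(func, args, kwargs=None):
    # Stand-in for dask's `apply` helper: call func with unpacked args/kwargs.
    return func(*args, **(kwargs or {}))

def build_graph(func, keys, iterables, kwargs2):
    # Each entry is the "complex task" tuple form quoted above:
    #   (apply, func, (tuple, list(args)), kwargs2)
    return {
        key: (apply, func, (tuple, list(args)), kwargs2)
        for key, args in zip(keys, zip(*iterables))
    }

dsk = build_graph(add, ["add-0", "add-1"], ([1, 2], [10, 20]), {})
# dsk == {
#     "add-0": (apply, add, (tuple, [1, 10]), {}),
#     "add-1": (apply, add, (tuple, [2, 20]), {}),
# }
# Evaluating "add-0" computes tuple([1, 10]) == (1, 10), then
# apply(add, (1, 10), {}) == 11.
```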

Let me spend some time getting the tests working; it's been a year since I touched this PR.

@quasiben
Member

quasiben commented Nov 8, 2019

I wouldn't prioritize fixing tests and merging with master. It will be some time before we can look at this work. My intention in commenting yesterday was not to nudge you into more work, but rather to explain why there wasn't a review.

@sjperkins
Member Author

sjperkins commented Nov 11, 2019

I've merged master and simplified complex task handling. Previously, this was split over worker.py::dumps_task, worker.py::_deserialize and worker.py::execute_task in order to ignore Annotations within complex, nested tasks.

Most functionality has now been moved into worker.py::dumps_task, and an assumption has been made that no Task Annotations will be present in complex, nested tasks (they are still handled in complex tasks when they are present at the outermost level of args). In other words, (inc, (inc, 1), TaskAnnotation) is valid but (inc, (inc, 1, TaskAnnotation), TaskAnnotation) is not.
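
To make the valid/invalid distinction concrete, here is a hedged sketch of an outer-level-only annotation pass (the `TaskAnnotation` class and `split_annotations` helper are illustrative names, not the PR's actual API):

```python
def inc(x):
    return x + 1

class TaskAnnotation:
    """Hypothetical marker object attached to a task (name assumed)."""
    def __init__(self, **hints):
        self.hints = hints

def split_annotations(task):
    """Split a task tuple into (task without annotations, annotations).

    Only the outermost level of the tuple is inspected; nested task
    tuples are passed through untouched, matching the compromise above.
    """
    if not isinstance(task, tuple):
        return task, []
    annotations = [a for a in task if isinstance(a, TaskAnnotation)]
    stripped = tuple(a for a in task if not isinstance(a, TaskAnnotation))
    return stripped, annotations

task = (inc, (inc, 1), TaskAnnotation(resource="GPU"))
stripped, notes = split_annotations(task)
# stripped == (inc, (inc, 1)); notes holds the single outer annotation.
# An annotation buried in a nested tuple like (inc, 1, TaskAnnotation(...))
# would be ignored by this outer-level pass.
```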

I believe this is a reasonable compromise, as complex tasks are generally created by the optimization process, and for the moment I think it's reasonable to only use a single task annotation for a complex task. The alternative is for the scheduler to unpack the entire complex task to search for annotations, but this likely has further performance implications. I view the optimization step as a good place to handle the removal/merging of annotations in future.

Regardless, the PR in its current state should be sufficient for benchmarking/testing basic cases.

@sjperkins
Member Author

sjperkins commented Nov 11, 2019

> I wouldn't prioritize fixing tests and merging with master.

Thanks for letting me know. I felt motivated to revisit the PR because it's been a year since it was created, and I wanted to refresh the issue in my mind. Don't feel pressure from my side, although I am interested to see how useful others might find the concept.

@sjperkins sjperkins changed the title [WIP] Task Annotations Task Annotations Aug 1, 2020
@sjperkins sjperkins marked this pull request as draft August 1, 2020 17:57
Base automatically changed from master to main March 8, 2021 19:03
@jrbourbeau
Member

Closing this as we ended up going a different route for task annotations. Thanks @sjperkins!

@jrbourbeau jrbourbeau closed this Jun 14, 2021