cuDF-style operations & NVTX annotations for local CuPy benchmark #548

Merged
4 commits merged into rapidsai:branch-0.19 on Mar 12, 2021

Conversation

charlesbluca
Member

This PR adds the following to the local CuPy benchmark:

  • Binary column sum and gather operations to simulate cuDF workloads (sketched briefly below)
  • NVTX annotations for the array creation and the execution of the operations
  • Use of args.rmm_pool_size in the worker memory pool setup, so that user-defined pool sizes take effect
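
For context, a rough standalone sketch of the kind of operations and annotations being added (not the PR's exact code; the sizes and chunking here are arbitrary, and start_range/end_range are assumed to be the nvtx package helpers used in the diff):

import cupy
import dask.array as da
from nvtx import start_range, end_range

rs = da.random.RandomState(RandomState=cupy.random.RandomState)

# Array creation wrapped in an NVTX range
rng = start_range(message="make array(s)", color="green")
x = rs.normal(10, 1, (10000,), chunks=1000).persist()
y = rs.normal(10, 1, (10000,), chunks=1000).persist()
idx = rs.randint(0, len(x), (10000,), chunks=1000).persist()
end_range(rng)

# cuDF-style operations, each the kind of workload the benchmark times
rng = start_range(message="compute", color="blue")
col_sum = (x + y).persist()    # binary column sum
col_gather = x[idx].persist()  # gather x at the positions in idx
end_range(rng)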

Some questions:

  • Any other binary operations that would be good to add here?
  • Is it useful to annotate the creation of the arrays or would we only want to focus on the execution?

cc @shwina

@charlesbluca added the "python" (python code needed), "3 - Ready for Review" (Ready for review by team), and "non-breaking" (Non-breaking change) labels on Mar 9, 2021
@charlesbluca requested a review from a team as a code owner on March 9, 2021 at 19:41
@charlesbluca mentioned this pull request on Mar 9, 2021
@charlesbluca added the "improvement" (Improvement / enhancement to an existing function) label on Mar 9, 2021
@charlesbluca
Member Author

charlesbluca commented Mar 10, 2021

I'm getting several compute failures from the workers when running col_gather on GPU:

distributed.worker - WARNING -  Compute Failed
Function:  subgraph_callable
args:      (array([11.08869371,  9.76416487,  9.31292166, ..., 10.41836123,
        9.80410377,  9.77784505]), array([2291, 5547, 8535, 1871, 2223, 4458, 6611, 7833, 8261, 7254, 6056,
       7562, 9498, 5647, 7596, 8377, 4880, 1084, 2684,  963, 5022, 9411,
       6386, 4742, 3783, 2484, 3130, 2164,  819, 3724, 8617, 8715, 1461,
         17, 5364, 7686, 1637, 6863, 6874, 3560, 4886, 4124, 2118, 6952,
       2808, 6576, 4916, 6581, 9481, 8495, 5226, 6423, 5373, 9632, 8108,
       3473, 6938, 7232, 1036, 2322, 8367, 2768, 4547, 4037, 7740, 4796,
       5866, 3676, 3729, 3463, 7181, 4787, 9370,  307, 1434, 4229,  679,
       2565, 9295, 7488, 5522,  178, 9472, 3680, 5059, 8768,  722, 8821,
        517, 5554, 4750, 6523, 3201, 3481, 8359, 3766, 4612, 2760, 7938,
       8729,  613, 6512, 4262, 4310, 7651, 3464, 3890, 8028, 1525, 7431,
       8875, 2472, 8936, 4221, 6307, 4995, 1810, 2369, 3319, 2230, 9494,
       6608, 8302, 8743, 9187, 2860, 1606, 5046, 4743, 7703, 4226, 9117,
       2999,  700, 1127,
kwargs:    {}
Exception: TypeError("Unsupported type <class 'numpy.ndarray'>")

From a cursory glance, it looks like something is happening in slice_with_int_dask_array that is returning a Dask array with NumPy chunks from our CuPy chunked input.

@pentschev
Member

@charlesbluca dask/dask#7364 should fix the issue above.

@charlesbluca
Member Author

Thanks @pentschev!

@jakirkham
Member

rerun tests

@jakirkham
Member

It looks like there is a style issue. Charles, could you please run black locally and commit the changes?

@charlesbluca
Member Author

Done! Thanks for the catch @jakirkham 🙂

@jakirkham
Member

@pentschev do you have any more thoughts here? 🙂


func_args = (x, idx)

func = lambda x, idx: x[idx]
Member Author

Should we be using .copy() here like we do with the slicing operation?
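i.e. something like this hypothetical one-line change, mirroring the slicing operation (not what the PR currently does):

func = lambda x, idx: x[idx].copy()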

Member

Good point - I remember that when I wrote the CuPy benchmarks, I had to add .copy() to ensure it actually sliced the array instead of returning a view. I'm not totally sure whether there's a case where Dask would only return a view - do you know, @jakirkham?

Member

That depends on what computational backend Dask is using. If we are using the threaded scheduler, it is probably a view. If we are using the Distributed scheduler, it may be a view if the data was already on that worker. Otherwise it wouldn't be a view.

Member

I guess the safest here is to actually profile both cases. Ideally, assuming neither case returns just a view, what we would see is:

  1. With .copy(): the same kernels, with a copy (or copies) at the end;
  2. Without .copy(): the same kernels as above, but no copies at the end.

@charlesbluca
Member Author

It looks like the compute failures are still happening even with @pentschev's fix - I'll try to dig more into this.

@jakirkham
Member

Where are you seeing failures? Is this locally? AFAICT CI passed (though haven't dug into the logs)

@pentschev
Member

It looks like the compute failures are still happening even with @pentschev's fix - I'll try to dig more into this.

Do you have more details?

This is the simplified version of your code that I used as a sample to test that Dask PR:

import numpy as np, cupy as cp, dask.array as da

rs = da.random.RandomState(RandomState=cp.random.RandomState)

x = rs.normal(10, 1, (1000,))
idx = rs.randint(0, len(x), (1000,))

print(x[idx].compute())
print(type(x[idx].compute()))

@charlesbluca
Member Author

Where are you seeing failures? Is this locally? AFAICT CI passed (though haven't dug into the logs)

This is local, using the col_gather operation with default arguments (though the results are the same with UCX enabled).

Do you have more details?

It seems like this is an issue with persist(); the compute failures come up when trying to return a result there. Could this be a problem with the creation of the Dask graph there?

@jakirkham
Member

Could you please reduce this to an MRE and file it on the Distributed repo?

@charlesbluca
Member Author

Sure! Would this be better suited for Distributed or Dask? It seems like this is a problem regardless of whether a cluster is in use:

import cupy
import dask.array as da

rs = da.random.RandomState(RandomState=cupy.random.RandomState)

x = rs.normal(10, 1, (1000,))
idx = rs.randint(0, len(x), (1000,))

x[idx].persist()
Outputs:
Traceback (most recent call last):
  File "mre.py", line 9, in <module>
    x[idx].persist()
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/base.py", line 256, in persist
    (result,) = persist(self, traverse=False, **kwargs)
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/base.py", line 770, in persist
    results = schedule(dsk, keys, **kwargs)
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/threaded.py", line 76, in get
    results = get_async(
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/local.py", line 487, in get_async
    raise_exception(exc, tb)
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/local.py", line 317, in reraise
    raise exc
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/local.py", line 222, in execute_task
    result = _execute_task(task, data)
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/core.py", line 121, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/core.py", line 121, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/core.py", line 115, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/core.py", line 115, in <listcomp>
    return [_execute_task(a, cache) for a in arg]
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/optimization.py", line 963, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/core.py", line 151, in get
    result = _execute_task(task, cache)
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/utils.py", line 35, in apply
    return func(*args, **kwargs)
  File "/datasets/charlesb/miniconda3/envs/ptds-bench/lib/python3.8/site-packages/dask/array/chunk.py", line 317, in slice_with_int_dask_array
    idx = idx - offset
  File "cupy/core/core.pyx", line 1079, in cupy.core.core.ndarray.__sub__
  File "cupy/core/core.pyx", line 1466, in cupy.core.core.ndarray.__array_ufunc__
  File "cupy/core/_kernel.pyx", line 1060, in cupy.core._kernel.ufunc.__call__
  File "cupy/core/_kernel.pyx", line 109, in cupy.core._kernel._preprocess_args
TypeError: Unsupported type <class 'numpy.ndarray'>

@pentschev
Member

Can you please double-check you indeed have dask/dask#7364 in your install? The example you posted works for me after that PR.

@charlesbluca
Member Author

Just checked and I do have those commits in my install. I also see now that your tests are failing for me locally too, so this is probably something with my local env.

What version of CuPy are you using? I realized I had been using a version with cupy#4322 but I'm still getting the same errors with 8.5.0.

@jakirkham
Member

I would make sure that any existing dask install has been removed before installing dask in development mode.

@charlesbluca
Member Author

I made sure dask was entirely uninstalled and removed from the env before doing the dev install, but I'm still getting the same error - I think this might be an issue with my version/installation of CuPy.

@pentschev
Member

You also need NumPy>=1.20; can you confirm you have that too?

@charlesbluca
Member Author

Yup, that was it - thanks for the help!

Member

@pentschev left a comment


LGTM, and since the issues @charlesbluca was having were resolved, I'm gonna go ahead and merge this. Thanks for working on this!

Comment on lines +32 to +35
rng = start_range(message="make array(s)", color="green")
x = rs.random((args.size, args.size), chunks=args.chunk_size).persist()
await wait(x)
end_range(rng)
Member

Given it's only two lines, I'm not sure it's worth the trouble, but it feels like the start_range/end_range pair would be a perfect candidate for a decorator function. Just saying for future consideration, no action required. 🙂

Member

Or using a contextmanager?

cc @shwina (in case you have thoughts on how to do this 🙂)
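
A minimal sketch of the contextmanager idea, wrapping the existing start_range/end_range helpers (hypothetical, not part of this PR):

from contextlib import contextmanager

from nvtx import start_range, end_range

@contextmanager
def nvtx_range(message, color="blue"):
    # Open an NVTX range for the duration of the enclosed block
    rng = start_range(message=message, color=color)
    try:
        yield
    finally:
        end_range(rng)

# Usage, e.g. for the array creation above:
# with nvtx_range("make array(s)", color="green"):
#     x = rs.random((args.size, args.size), chunks=args.chunk_size).persist()
#     await wait(x)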

Member Author

I was considering the same - I'll look into that if we end up extending the benchmark again 😁

@pentschev
Member

@gpucibot merge

@rapids-bot (bot) merged commit 09196cb into rapidsai:branch-0.19 on Mar 12, 2021
@jakirkham
Member

Thanks Charles for the PR and Peter for the review! 😄

@charlesbluca
Member Author

Thanks for the reviews and environment help!

@jakirkham
Member

Just realized we missed the column masking case; submitted PR ( #553 ) to include that (roughly sketched below).
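
For reference, a hypothetical sketch of a column masking operation in the same style as the gather lambda above (the actual code in #553 may differ):

func_args = (x,)

func = lambda x: x[x > 10]  # boolean mask: keep only the elements where the condition holds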
