Add NumPy's kron for Dask Arrays #3657
Would be nice to have a Dask Array implementation of NumPy's `kron`: a generalization of outer products for two tensors.

NumPy implements `kron` …
Is this doable with `atop` or slicing syntax, do you think? Marking this as a good first issue, but it may not be. I think this requires someone to think about it a bit, and then look at the …
Avoiding reshaping is probably better. It may be possible with `atop`.
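For reference, a minimal illustration (our addition, plain NumPy) of the semantics any Dask implementation would need to reproduce:

```python
import numpy as np

x = np.array([[1, 2],
              [3, 4]])
y = np.ones((2, 2))

# np.kron(x, y) replaces each element x[i, j] with the block x[i, j] * y,
# so the result has shape (x.shape[0]*y.shape[0], x.shape[1]*y.shape[1]).
print(np.kron(x, y))
# [[1. 1. 2. 2.]
#  [1. 1. 2. 2.]
#  [3. 3. 4. 4.]
#  [3. 3. 4. 4.]]
```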
Hi, has this issue been solved? I would like to work on it.

It is still open. Please feel free to work on it if it is of interest. Would be happy to review a PR. 😄

@mathdugre Please do let me know if you are working on this; I'd like to give it a try.

@Madhu94 I didn't get the time to work on it, feel free to do so :)

Thanks! Starting on it.

@Madhu94 Any progress on it? I was also looking for this feature.

@gitkarma No, I could not find the time. Still interested in exploring this, but if anyone else wants to push this through in the interests of time, please take it up.
Is this rechunking into 1x1 chunks for one of the arguments likely to be as bad as, or worse than, reshaping? I can't figure out how to do it otherwise with `da.blockwise`:

```python
import dask.array as da
import numpy as np

x = da.arange(36).reshape(9, 4).rechunk((1, 1))
y = da.ones(36).reshape(9, 4).rechunk((3, 2))
z = da.blockwise(
    np.multiply, 'ij', x, 'ij', y, 'xy', concatenate=True, dtype='f8',
    adjust_chunks={'i': y.shape[0], 'j': y.shape[1]}
)
z.shape  # -> (81, 16)

# Compute on kron inputs to silence "Not implemented by Dask ..." warning
assert np.all(np.kron(x.compute(), y.compute()) == z.compute())
z.compute()
```

```
array([[ 0.,  0.,  0., ...,  3.,  3.,  3.],
       [ 0.,  0.,  0., ...,  3.,  3.,  3.],
       [ 0.,  0.,  0., ...,  3.,  3.,  3.],
       ...,
       [32., 32., 32., ..., 35., 35., 35.],
       [32., 32., 32., ..., 35., 35., 35.],
       [32., 32., 32., ..., 35., 35., 35.]])
```

Separately, is this a bug or expected behavior?

```python
x = da.arange(36).reshape(9, 4).rechunk((1, 1))
y = da.ones(36).reshape(9, 4).rechunk((3, 2))
z = da.blockwise(
    np.multiply, 'ij', x, 'ij', y, 'xy', concatenate=True, dtype='f8',
    # Leave this out to show that the inferred shape is wrong
    # adjust_chunks={'i': x.shape[0], 'j': y.shape[1]}
)
print(z.shape)            # --> (9, 4)
print(z.compute().shape)  # --> (81, 16)

# And this still works:
assert np.all(np.kron(x.compute(), y.compute()) == z.compute())
```
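A possible explanation for the shape mismatch (our reading of `blockwise`, not confirmed in the thread): without `adjust_chunks`, the output metadata reuses `x`'s chunk sizes for the `'ij'` indices, while each task actually returns a 9x4 block:

```python
# Our addition: z's metadata still reflects x's 1x1 chunking, which is
# why z.shape reads (9, 4) before computing
print(z.chunks)  # ((1, 1, 1, 1, 1, 1, 1, 1, 1), (1, 1, 1, 1))
```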
Thanks for sharing, Eric! 😄 Yeah, that's a good question. Rechunking is generally expensive. However, reshaping will result in some smearing and remixing of chunks, which comes with its own cost. It probably requires some benchmarking to know for sure. That said, there might be ways to tweak what you came up with to improve things further before measuring timings. 🙂
This might be an improvement:

```python
x = da.arange(36).reshape(9, 4).rechunk((3, 2))
y = da.ones(36).reshape(9, 4).rechunk((3, 2))
z = da.blockwise(
    np.kron, 'ab', x, 'ij', y, 'ij', concatenate=True, dtype='f8',
    new_axes={'a': x.shape[0] * y.shape[0], 'b': x.shape[1] * y.shape[1]}
)
assert np.all(np.kron(x, y) == z.compute())
assert z.shape == z.compute().shape
z.compute()
```

```
array([[ 0.,  0.,  0., ...,  3.,  3.,  3.],
       [ 0.,  0.,  0., ...,  3.,  3.,  3.],
       [ 0.,  0.,  0., ...,  3.,  3.,  3.],
       ...,
       [32., 32., 32., ..., 35., 35., 35.],
       [32., 32., 32., ..., 35., 35., 35.],
       [32., 32., 32., ..., 35., 35., 35.]])
```

I don't think it's what I want though since passing `new_axes` …
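A possible reason this isn't satisfactory (our reading of `blockwise`, not spelled out above): an integer value in `new_axes` becomes a single chunk spanning that whole axis, so the result here is one big block and the entire product runs as a single `np.kron` task:

```python
# Our addition: both new output axes are single chunks
print(z.numblocks)  # (1, 1)
print(z.chunksize)  # (81, 16) -- the whole result in one block
```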
Thanks for the continued exploration here, Eric! 😄 Agree this is looking better. 🙂 Yeah, …
Thanks @jakirkham! With some guidance from @mrocklin I tried building a custom graph for this and benchmarked it a bit, in this gist, against the first `blockwise` version:

```python
x = da.arange(36).reshape(9, 4).rechunk((3, 2))
y = da.ones(36, dtype=x.dtype).reshape(9, 4).rechunk((3, 2))
z = da.block([
    [x[i, j] * y for j in range(x.shape[1])]
    for i in range(x.shape[0])
])
```

I found that the original …
Thanks for the update, Eric! This more recent implementation reminds me of issue #2946 with …
@eric-czech In your case I recommend trying the method you have above with …
That ended up being about 2x slower than the Dask-only version (updated gist).
Here is my run-through of your code:

```python
In [1]: import dask.array as da, numpy as np

In [2]: def kron_v2(x, y):
   ...:     return da.block([
   ...:         [x[i, j] * y for j in range(x.shape[1])]
   ...:         for i in range(x.shape[0])
   ...:     ])
   ...:

In [3]: xl = da.arange(50*20).reshape(50, 20).rechunk((25, 4)).persist()

In [4]: yl = da.ones(xl.size, dtype=xl.dtype).reshape(xl.shape).rechunk(xl.chunksize).persist()

In [5]: %time zl = kron_v2(xl, yl)
Wall time: 616 ms

In [6]: %time _ = zl.compute()
Wall time: 1.79 s

In [7]: xl = xl.compute()

In [8]: %time zl = kron_v2(xl, yl)
Wall time: 419 ms

In [9]: %time _ = zl.compute()
Wall time: 1.66 s

In [10]: len(zl.__dask_graph__())
Out[10]: 30010

In [11]: from dask.utils import format_time

In [12]: format_time(1.66 / len(zl.__dask_graph__()))
Out[12]: '55.31 us'
```

Copy-pasteable version:

```python
import dask.array as da, numpy as np

def kron_v2(x, y):
    return da.block([
        [x[i, j] * y for j in range(x.shape[1])]
        for i in range(x.shape[0])
    ])

xl = da.arange(50*20).reshape(50, 20).rechunk((25, 4)).persist()
yl = da.ones(xl.size, dtype=xl.dtype).reshape(xl.shape).rechunk(xl.chunksize).persist()
%time zl = kron_v2(xl, yl)
%time _ = zl.compute()
xl = xl.compute()
%time zl = kron_v2(xl, yl)
%time _ = zl.compute()
len(zl.__dask_graph__())
from dask.utils import format_time
format_time(1.66 / len(zl.__dask_graph__()))
```

My guess is that we're bound by the task throughput of the scheduler. 50us/task is pretty good for us. If we wanted to go faster than this then we would probably want to start bundling a few chunks together at once, perhaps by calling …
Or maybe the answer is just to rechunk `y`.
Hey @mrocklin, thanks for taking a deeper look! Knowing 50us/task is helpful, as is knowing that rechunking is an approach you would take to reduce the number of tasks. I was able to get comparable times to what you had if I switched to the single-threaded scheduler, but could I get some intuition on why the distributed scheduler, even with 1 worker and 1 thread per worker, is >10x slower? I was thinking that it makes more sense to optimize the process for that scheduler -- would you agree? Or is it likely that optimizing any process for a local scheduler will translate well (i.e. we still just want to avoid small chunks -- nothing more complicated)? Thanks again for taking a look.
The distributed scheduler has higher per-task overheads than the single-node scheduler, often a few hundred microseconds per task. Because of this, we want to keep task compute times well above the millisecond range (possibly much higher, depending on the scale that you're looking for). This is made easier if we keep chunk sizes large-ish, and also try hard to fuse operations (another goal of yours, I believe). This example has chunk sizes of only `(25, 4)`, which is quite small.
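As a rough back-of-the-envelope check (our arithmetic, combining the ~30k-task graph above with the per-task overhead estimate):

```python
# Our addition: estimated pure scheduling overhead for the graph above
tasks = 30_010               # from len(zl.__dask_graph__())
per_task_overhead = 200e-6   # assumed ~200 microseconds per task
print(f"{tasks * per_task_overhead:.1f} s")  # ~6.0 s of overhead alone
```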
So for this particular example we can significantly reduce compute time by reducing the chunk count of `yl`:

```python
In [5]: yl = yl.rechunk("auto").persist()

In [6]: %time zl = kron_v2(xl, yl)
   ...: %time _ = zl.compute()
CPU times: user 642 ms, sys: 29.5 ms, total: 671 ms
Wall time: 627 ms
CPU times: user 305 ms, sys: 119 µs, total: 305 ms
Wall time: 291 ms
```

This is still bound by the size of `xl` …

Here is another, perhaps more extreme, example that shows the effect where `xl` is a small in-memory array:

```python
In [14]: xl = np.array([[1, 2], [3, 4]])

In [15]: yl = da.ones((100, 100)).persist()

In [16]: %time zl = kron_v2(xl, yl)
CPU times: user 12.2 ms, sys: 0 ns, total: 12.2 ms
Wall time: 11.7 ms

In [17]: %time zl = kron_v2(xl, zl)  # reapply kron onto zl repeatedly
CPU times: user 4.11 ms, sys: 1 µs, total: 4.11 ms
Wall time: 3.84 ms

In [18]: %time zl = kron_v2(xl, zl)
CPU times: user 11.1 ms, sys: 0 ns, total: 11.1 ms
Wall time: 10.6 ms

In [19]: %time zl = kron_v2(xl, zl)
CPU times: user 12.6 ms, sys: 0 ns, total: 12.6 ms
Wall time: 12.1 ms

In [20]: zl
Out[20]: dask.array<concatenate, shape=(1600, 1600), dtype=float64, chunksize=(100, 100), chunktype=numpy.ndarray>

In [21]: %time zl = kron_v2(xl, zl)  # with larger graphs on the right, graph construction times start to increase
CPU times: user 18.1 ms, sys: 0 ns, total: 18.1 ms
Wall time: 17.6 ms

In [22]: %time zl = kron_v2(xl, zl)
CPU times: user 43.1 ms, sys: 92 µs, total: 43.2 ms
Wall time: 42.3 ms

In [23]: %time zl = kron_v2(xl, zl)
CPU times: user 111 ms, sys: 4.04 ms, total: 115 ms
Wall time: 114 ms

In [24]: zl
Out[24]: dask.array<concatenate, shape=(12800, 12800), dtype=float64, chunksize=(100, 100), chunktype=numpy.ndarray>

In [25]: zl = zl.rechunk("auto")  # but fortunately 100x100 is still quite small, so let's rechunk

In [26]: %time zl = kron_v2(xl, zl)  # and it's decently fast again
CPU times: user 12.6 ms, sys: 14 µs, total: 12.6 ms
Wall time: 11.7 ms

In [27]: %time zl = kron_v2(xl, zl)
CPU times: user 13.9 ms, sys: 0 ns, total: 13.9 ms
Wall time: 13 ms

In [28]: zl
Out[28]: dask.array<concatenate, shape=(51200, 51200), dtype=float64, chunksize=(4000, 4000), chunktype=numpy.ndarray>

In [29]: %time zl.sum().compute()
CPU times: user 42.5 s, sys: 21.2 s, total: 1min 3s
Wall time: 12.1 s
Out[29]: 10000000000000.0
```
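Pulling the thread's advice together, here is a hedged sketch (our own wrapper; `kron_v2_rechunked` is a hypothetical name, not an API from this thread) that rechunks the large operand before building the block graph:

```python
import dask.array as da
import numpy as np

def kron_v2_rechunked(x, y):
    # Sketch (our addition): kron of a small in-memory x with a Dask y.
    # Rechunk y toward Dask's default chunk sizes first so the task count
    # stays near x.size * y.numblocks rather than exploding.
    y = y.rechunk("auto")
    return da.block([
        [x[i, j] * y for j in range(x.shape[1])]
        for i in range(x.shape[0])
    ])

x = np.array([[1, 2], [3, 4]])
y = da.ones((8, 8), chunks=(2, 2))
z = kron_v2_rechunked(x, y)
assert np.array_equal(z.compute(), np.kron(x, np.ones((8, 8))))
```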
Hey @mrocklin, sorry for the delay. That much makes sense, and I have one last performance question then. A more representative version of the use case I have would look like this:

```python
import dask.array as da, numpy as np
import dask

dask.config.set(scheduler='single-threaded')

def kron_v2(x, y):
    return da.block([
        [x[i, j] * y for j in range(x.shape[1])]
        for i in range(x.shape[0])
    ])

# Multiply tiny 2x2 by giant tall and skinny matrix
x = np.ones((2, 2))
y = da.ones((1000000, 1000)).rechunk('auto').persist()  # 1M x 1k

y.chunksize, y.numblocks  # 125MB chunks
# > ((15625, 1000), (64, 1))

z = kron_v2(x, y)
%time _ = z.compute()
# CPU times: user 4.52 s, sys: 3.91 s, total: 8.43 s
# Wall time: 8.43 s

len(z.__dask_graph__())
# 576
```

And versus the distributed scheduler:

```python
from dask.distributed import Client
Client(n_workers=1, threads_per_worker=1, processes=True)

%time _ = z.compute()
# CPU times: user 32.5 s, sys: 48.5 s, total: 1min 20s
# Wall time: 1min 22s
```

In short, it takes 1min 22s vs 8s using the distributed and single-node schedulers, respectively, when operating on a small number (64) of large chunks. If I'm understanding you correctly, the overhead from using the distributed scheduler here should be something like 576 tasks times a few hundred microseconds each, right? Even if the overhead were a millisecond per task, that would still explain only a minute portion of the difference. Could there be something else going on here?
I was just responding to this :) It's fun seeing GitHub change live so many times :)

Yeah, the overhead you're running into here is likely moving the data from the remote worker into your client process. Recall that `compute` brings the full result back to your local process. Your output array is 32GB, which over 83s comes out to a bandwidth of about 385 MB/s, which isn't terrible for commodity network hardware and TCP. The right way to test this is probably:

```python
%%time
z = z.persist()  # watch out, this is asynchronous
from dask.distributed import wait
wait(z)
```
Ah, interesting: so even on a single host (this one isn't actually remote), the communication still goes through the network interface. That makes sense, thanks!
Why? Can't you just pass a file handle to the other local process?
Yeah, you have some options here. If you use a single local process (e.g. `processes=False`) then … If you wanted to get fancy and you're on Linux then you could try out UCX. In practice though, you probably won't want to bring back giant results to your local session, so this comes up more often in benchmarking than in anything else.
You definitely could. You could save your data to a file (HDF5, Zarr, whatever) and then pass that file handle back. You would have to make that choice explicitly, though. Dask isn't going to do a high-level analysis of your code and say "Hrm, they asked me to send back this NumPy array, but probably they should have stored it to disk. Let me do that instead." Typically today we leave high-level decisions like that to the user, or to systems built on top of Dask.
Yeah, we continue to improve UCX. I think today most of the remaining issues are with particular combinations of features UCX supports, as opposed to run-of-the-mill things. This could be a handy thing to try, as it supports things like shared memory, which should level the playing field when using interprocess communication on the same machine/node. If you do try this, I'd be very interested to hear about your experience 😄 If you look into intermediate storage, you might find this doc helpful.
Sorry, I am not being clear, and I am not close to the code. But when I hear we are making a network call to a local process, I think of HDFS short-circuit reads and their use of a UNIX domain socket to make this sort of call very fast. Is that not applicable in this situation?
The data is in memory in one process, and another process is asking for that data. Dask uses point-to-point transfers for communication rather than a global file system, so something like short-circuit reads doesn't make sense. Now, ideally for transfers on a shared machine we would use something like POSIX shared memory (maybe this is your point about UNIX domain sockets). This is what the UCX conversation above with @jakirkham refers to, and it is a major focus of the NVIDIA Dask team (although mostly for other network transfers rather than shared memory). Pragmatically though, this is rarely an issue, and it isn't something that I encourage you to focus on. Typically for array-based workloads we just have a single process per node and use a thread pool for on-node parallelism. The cost to transfer data within a single node is typically zero for numpy-style workloads. In @eric-czech's example above he can engage this in the single-node cluster case by setting `processes=False`.
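A minimal sketch of that setup (our addition, using the standard `dask.distributed` API):

```python
from dask.distributed import Client

# One process with thread-pool parallelism: client and workers share
# memory, so "transfers" between tasks on this node cost nothing
client = Client(processes=False)
```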
I don't think I've explained clearly the motivation for short-circuit local reads. The JIRA for this feature has more details: HDFS-347. There is no global file system involved; this feature is invoked when an HDFS client process happens to be on the same node as the DataNode process which has the data the client wants to read. In this setting, there are two processes on the same server which are going to exchange a large volume of data by inter-process communication over a socket. UNIX provides a standardized way to bypass the network stack in this setting, called a UNIX domain socket. While it's not trivial to make use of this optimization while also respecting access control policies, it is generally far less invasive to a codebase that's already using sockets for IPC to adopt UNIX domain sockets than to try to implement a shared-memory approach.
Ah, my apologies. When people talk about short-circuit reads with HDFS, I mostly think of reading local files rather than peer-to-peer communication of in-memory data. My understanding of the HDFS internals is not super deep. Googling around, it seems like it's not terribly hard to convince Tornado (our standard networking stack) to use UNIX domain sockets. It would be a fun experiment.

The fun thing about UCX is that it's motivated by NVLink and Infiniband work (which NVIDIA needs desperately), but shared memory comes along for free (hypothetically). But yeah, UNIX domain sockets should be easy enough to play with. I've raised a separate issue here: dask/distributed#3630. I still think though that this isn't likely to be your main bottleneck for these workloads. It would be great to improve (and presumably fairly doable), but it isn't where I would guess profiles would point first.
@mrocklin Can the kron_v2 implementation be used on one-dimensional arrays?
Thanks for the comment @gitkarma. I think you probably wanted to direct that question to @eric-czech? He is the author of the kron implementations here; I don't know much about them.