fold creates identity for each sublist #1124

Open
nazar-pc opened this issue Feb 3, 2024 · 14 comments

@nazar-pc

nazar-pc commented Feb 3, 2024

I just noticed that fold calls identity many more times than there are threads in the thread pool. If identity is a non-trivial operation, calling it unnecessarily can be quite expensive.

It would be great if its return value were reused for folding multiple sublists.

This discovery resulted from rewriting https://github.com/supranational/blst/blob/0d46eefa45fc1e57aceb42bba0e84eab3a7a9725/bindings/rust/src/lib.rs#L1110-L1147 with rayon (code there essentially does work stealing manually) and seeing significantly slower benchmark results.

Here is what I tried to do: https://github.com/nazar-pc/blst/blob/7d1074e7df3f31dc8b8acfde3260ac9ceac8f0d9/bindings/rust/src/lib.rs#L1092-L1128

I believe it is slower because more identity elements are created and, due to the cryptography involved, reducing more elements takes more time.

This is probably a duplicate of #840, which the original author decided to close, but I believe there is value in being able to do something like this in rayon (or maybe there is a way and I just don't see it).

@cuviper
Member

cuviper commented Feb 3, 2024

There are two main factors that I see here:

  1. Rayon uses "adaptive" splitting, not perfect per-thread splitting, in case the work is imbalanced or other threads are also busy with other work.
  2. Rayon's fold assumes associativity but not commutativity, so we can't just use the same accumulator when a thread steals a different section of the workload. (Even assuming we figured out how to share that across work-stealing, which is non-trivial.)
    • In #840 (Limit number of sublists created with fold), I'm guessing that the manual sharing they implemented would basically assume commutativity -- which may be perfectly fine for their particular operation.
    • Another way around this is to feed your input as a serial iterator into par_bridge(), as long as you have enough real computation that you don't get bottle-necked on its Mutex. This should give you a pretty even per-thread fold, but with non-deterministic order of the items.

@nazar-pc
Author

nazar-pc commented Feb 9, 2024

Rayon uses "adaptive" splitting, not perfect per-thread splitting, in case the work is imbalanced or other threads are also busy with other work.

That is exactly what I want here. In the library I linked above, this is done with a simple atomic counter that worker threads use to pull the next piece of work to process. The inputs are read-only, so no mutexes or other heavier infrastructure are necessary, and the work is essentially split into sublists of size 1, which keeps threads occupied in the most efficient way. I will check with #857 to see how it performs.
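For illustration, the atomic-counter approach described here could look roughly like the following. This is a minimal std-only sketch, not the blst code and not a rayon API: the function name `sum_with_atomic_counter`, the `u64` sum standing in for the real accumulator, and the `identities` counter are all assumptions made for the example. It also assumes the reduction does not depend on item order.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Sums `items` on `workers` threads. Each worker builds exactly one
/// accumulator and pulls item indexes from a shared atomic counter.
/// Returns the total and how many accumulators were created.
/// (Hypothetical helper for illustration only.)
fn sum_with_atomic_counter(items: &[u64], workers: usize) -> (u64, usize) {
    let next = AtomicUsize::new(0); // index of the next unclaimed item
    let identities = AtomicUsize::new(0); // counts accumulator creations

    let total = thread::scope(|s| {
        let mut handles = Vec::with_capacity(workers);
        for _ in 0..workers {
            handles.push(s.spawn(|| {
                identities.fetch_add(1, Ordering::Relaxed);
                let mut acc = 0u64; // stands in for an expensive identity
                loop {
                    // Claim one item at a time: sublists of size 1.
                    let i = next.fetch_add(1, Ordering::Relaxed);
                    if i >= items.len() {
                        break acc;
                    }
                    acc += items[i];
                }
            }));
        }
        // Reduce one partial result per worker.
        handles.into_iter().map(|h| h.join().unwrap()).sum::<u64>()
    });

    (total, identities.load(Ordering::Relaxed))
}

fn main() {
    let items: Vec<u64> = (1..=128).collect();
    let (total, identities) = sum_with_atomic_counter(&items, 32);
    println!("total = {total}, accumulators created = {identities}");
}
```

With 128 items and 32 workers, this creates 32 accumulators regardless of how unevenly the items are distributed, because which thread processes which item is decided at run time.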

With indexed iterators like your .par_iter().zip(), you can add .with_min_len(N) before your fold to set a lower bound on how far it will split.

Was not aware of it, will give it a try as well, thanks!

Rayon's fold assumes associativity but not commutativity

Hm, is that documented somewhere? To me the fact that things can be processed in form of sublists of undefined size means that both should be the case and user can't really have any guarantees about order of things in which input is processed. Especially if better performance can be unlocked.

@nazar-pc
Author

nazar-pc commented Feb 9, 2024

Interesting: I just tested the example code I linked above, and a.par_iter().zip(b)... ended up being slower than a.iter().zip(b).par_bridge(), which seems counter-intuitive. I always assumed that parallel indexed iterators have the most knowledge about what is being processed and should be the most efficient as a result.

.with_min_len(1) doesn't really help because it doesn't fix the root problem in this case. On my 32-thread CPU (13900K) for 128 elements of input rayon creates 128 sublists by default, which is excessive. If I change minimum then it does create fewer sublists, but also loses granularity of balancing between threads in the process.

I'd imagine a simple atomic counter, like the one blst uses, for efficiently splitting work with single-item granularity, while also creating at most as many results to be folded as there are threads in the thread pool.

#857 is based on very old version of rayon, can you rebase it maybe (it is a very small change, so should be fine to do so).

@wagnerf42
Contributor

so, i did a log of the algorithm (took the code from the benches) and there is virtually no overhead i can see for splitting and merging. however i then ran the code on a larger machine with more cores. still no visible overheads for computations but there are some overheads inside rayon. my guess is that due to the very low number of tasks available (compared to threads) the atomics which are used for the sleep algorithm get under pressure.

@nazar-pc
Author

nazar-pc commented Feb 9, 2024

I rewrote the code with rayon::scope(|scope| scope.spawn_broadcast()) in a way similar to the original code in blst and was able to beat it in terms of performance. But the code is far less idiomatic. Interestingly, rayon::broadcast() wasn't anywhere near as fast for some reason.

I don't think atomics or mutexes make any difference here, the bottleneck in my case is math that is slow in case more elements need to be aggregated unnecessarily.

@wagnerf42
Contributor

i'm still not too sure about that. here are the logs.
each file is for the corresponding number of threads. the area of each box corresponds to the time it takes.
overheads inside rayon are the boxes at the bottom left.
as you can see:

  • splitting and reducing is free of charge
  • there might be some overhead for the creation of the first element of each fold but it's not that much
  • at 32 threads there are 128 folds taking place, each around 1.18ms
  • at 16 threads we still get 128 folds taking place but this time at 616ms
    -> for me, the logical conclusion is that the memory bus is under pressure, which perf stat confirms on my machine with a higher number of instructions per cycle

@nazar-pc
Author

nazar-pc commented Feb 9, 2024

Visualization looks cool, but I'm not sure how to read it to be completely honest.

My point was that doing 128 folds is unnecessary. On my machine with 32 logical CPU cores there is no point in doing 128 folds if 32 is sufficient. Whether it is slower due to memory pressure or something else is kind of irrelevant if it is possible to make it not do 128 folds in the first place. And it seems to me that if order doesn't matter at all (which is the case very often) the behavior (and performance) should be similar to rayon::broadcast() with an atomic counter.

@cuviper
Member

cuviper commented Feb 9, 2024

Rayon's fold assumes associativity but not commutativity

Hm, is that documented somewhere? To me the fact that things can be processed in form of sublists of undefined size means that both should be the case and user can't really have any guarantees about order of things in which input is processed. Especially if better performance can be unlocked.

There are a few places in the docs that talk about associativity and parallel non-determinism, but we do generally keep the relative order of items. e.g. a list of 4 items could be fold/reduced like any of these ordered partitions:

  • (((a, b), c), d)
  • ((a, b), (c, d))
  • ((a, (b, c)), d)
  • (a, ((b, c), d))
  • (a, (b, (c, d)))

We don't promise anything about the order in which any inner part is called on their own -- e.g. the individual items could be produced in any order, and the reduction (a, b) could happen before or after (c, d) in the second one -- but we do keep them in order on the way out.
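To make the distinction concrete, here is a small std-only sketch using string concatenation, which is associative but not commutative. The helper names `op` and `groupings` are made up for this example. All five ordered groupings from the list produce the same result, while swapping operands does not.

```rust
/// An associative but non-commutative operation: string concatenation.
/// (Hypothetical stand-in for a fold/reduce operation.)
fn op(a: &str, b: &str) -> String {
    format!("{a}{b}")
}

/// Evaluates the five ordered groupings of four items.
fn groupings(a: &str, b: &str, c: &str, d: &str) -> Vec<String> {
    vec![
        op(&op(&op(a, b), c), d), // (((a, b), c), d)
        op(&op(a, b), &op(c, d)), // ((a, b), (c, d))
        op(&op(a, &op(b, c)), d), // ((a, (b, c)), d)
        op(a, &op(&op(b, c), d)), // (a, ((b, c), d))
        op(a, &op(b, &op(c, d))), // (a, (b, (c, d)))
    ]
}

fn main() {
    let results = groupings("a", "b", "c", "d");
    // Every ordered grouping agrees because `op` is associative.
    assert!(results.iter().all(|r| r.as_str() == "abcd"));
    // Swapping two operands changes the result: `op` is not commutative.
    assert_ne!(op("a", "b"), op("b", "a"));
    println!("{results:?}");
}
```

An accumulator that is reused across non-adjacent sections of the input effectively swaps operands, which is why sharing it is only safe for commutative operations.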

Interesting: I just tested the example code I linked above, and a.par_iter().zip(b)... ended up being slower than a.iter().zip(b).par_bridge(), which seems counter-intuitive. I always assumed that parallel indexed iterators have the most knowledge about what is being processed and should be the most efficient as a result.

The devil is in trying to preserve that semblance of order on the way out, which par_bridge() doesn't attempt at all. I'll have to think about how to make something like unordered_fold that could do a better job of reusing the accumulator, for when you really don't care about the relative order.

Even with ordered fold, in theory we should be able to tell when a join split didn't get its latter half stolen, so it can continue in the same accumulator. We do have some of that visibility in join_context, but right now that's too far abstracted from the parallel iterator implementation, and I think there would be borrowck challenges as well.

.with_min_len(1) doesn't really help because it doesn't fix the root problem in this case. On my 32-thread CPU (13900K) for 128 elements of input rayon creates 128 sublists by default, which is excessive. If I change minimum then it does create fewer sublists, but also loses granularity of balancing between threads in the process.

If you set min 1, I don't expect any behavioral change at all! The idea with that is to set a level that amortizes the cost of creating your accumulator, but as you say that's a balancing act.
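As a rough way to reason about that balancing act: if no sequential piece may be shorter than the minimum length, the number of pieces (and so accumulators) for a non-empty input is bounded by the input length divided by that minimum. The sketch below is only this back-of-the-envelope bound, with a hypothetical helper name `max_pieces`. It does not describe rayon's actual splitting heuristics, which may produce fewer pieces.

```rust
/// Upper bound on the number of sequential pieces (and therefore
/// accumulators) when no piece may be shorter than `min_len`.
/// (Hypothetical helper; not part of rayon.)
fn max_pieces(len: usize, min_len: usize) -> usize {
    // A minimum length of 0 behaves like 1 here; an input shorter than
    // `min_len` is still processed as a single piece.
    (len / min_len.max(1)).max(1)
}

fn main() {
    for min_len in [1, 2, 4, 8, 16] {
        println!(
            "min_len = {min_len:>2}: at most {} accumulators",
            max_pieces(128, min_len)
        );
    }
}
```

For 128 items, a minimum of 4 caps the count at 32 accumulators, but each piece is then at least 4 items, so load balancing can no longer happen at single-item granularity.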

#857 is based on very old version of rayon, can you rebase it maybe (it is a very small change, so should be fine to do so).

Sure, I pushed a rebase.

But when we're only talking about 128 items for 32 threads, I fear it will still overestimate even the initial level splits, regardless of this change about re-splitting stolen work.

I rewrote the code with rayon::scope(|scope| scope.spawn_broadcast()) in a way similar to the original code in blst and was able to beat it in terms of performance. But the code is far less idiomatic. Interestingly, rayon::broadcast() wasn't anywhere near as fast for some reason.

IMO, it's totally fine to reach for different primitives when the generic ("idiomatic") APIs don't meet your needs!

I'm not sure why broadcast would be much slower. It does have a little more synchronization to collect its return values, but that should be tiny compared to your real work.

And it seems to me that if order doesn't matter at all (which is the case very often) the behavior (and performance) should be similar to rayon::broadcast() with an atomic counter.

The generic API doesn't know whether the order of reduction matters in your code, but we could explore more explicit APIs for you to communicate that, like an unordered_fold I mentioned above.

@nazar-pc
Author

nazar-pc commented Feb 9, 2024

If you set min 1, I don't expect any behavioral change at all! The idea with that is to set a level that amortizes the cost of creating your accumulator, but as you say that's a balancing act.

I'm sorry, I did try various (higher) values, posted 1 by accident.

I'm not sure why broadcast would be much slower. It does have a little more synchronization to collect its return values, but that should be tiny compared to your real work.

I think there might be an optimization opportunity here due to usage of zero-size types. I am returning () from each thread, so essentially there is nothing to keep track of.

The generic API doesn't know whether the order of reduction matters in your code, but we could explore more explicit APIs for you to communicate that, like an unordered_fold I mentioned above.

The approach with unordered_fold makes sense to me, though I'm still wondering how many users rely on the order right now.

@cuviper
Member

cuviper commented Feb 10, 2024

I'm not sure why broadcast would be much slower. It does have a little more synchronization to collect its return values, but that should be tiny compared to your real work.

I think there might be an optimization opportunity here due to usage of zero-size types. I am returning () from each thread, so essentially there is nothing to keep track of.

It propagates panics through the same mechanism as the return value, enum JobResult. We also do this for scope.spawn_broadcast, but in a different way.

@Yikai-Liao

Yikai-Liao commented Sep 17, 2024

Hi, I've recently been working on improving the BPE trainer in tokenizers, and it would be great if we could get unordered_fold.

The concrete use case is multi-threaded word frequency counting on the output of pre-tokenization. If we use into_par_iter().fold directly, merging all the hashmaps is extremely slow, especially for languages like Chinese, where we can't use spaces for pre-tokenization and therefore get a large number of unique entries in the hashmap.

Creating one hashmap per thread and accessing them directly is a kind of solution, but the code is not elegant. And it's not possible to further optimize if we do so. That is, we can't just start the reduce step while waiting for the last thread for counting to finish executing.

So I think it's important to provide the unordered_fold api for this type of task.
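The "one hashmap per thread" workaround mentioned here might be sketched as follows. This is a std-only illustration under assumptions: the function name `count_words`, the fixed chunking strategy, and the `&str` word type are invented for the example and are not taken from tokenizers or rayon.

```rust
use std::collections::HashMap;
use std::thread;

/// Counts word frequencies with one `HashMap` per worker thread, then
/// merges the per-thread maps sequentially.
/// (Hypothetical helper for illustration only.)
fn count_words<'a>(words: &[&'a str], workers: usize) -> HashMap<&'a str, usize> {
    let workers = workers.max(1);
    // Ceiling division so every word lands in exactly one chunk.
    let chunk_len = ((words.len() + workers - 1) / workers).max(1);

    let partials: Vec<HashMap<&'a str, usize>> = thread::scope(|s| {
        let mut handles = Vec::new();
        for chunk in words.chunks(chunk_len) {
            handles.push(s.spawn(move || {
                // One map per thread, created once.
                let mut local: HashMap<&str, usize> = HashMap::new();
                for &word in chunk {
                    *local.entry(word).or_insert(0) += 1;
                }
                local
            }));
        }
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });

    // Merge: at most `workers` maps need to be combined.
    let mut total: HashMap<&'a str, usize> = HashMap::new();
    for partial in partials {
        for (word, n) in partial {
            *total.entry(word).or_insert(0) += n;
        }
    }
    total
}

fn main() {
    let words = ["a", "b", "a", "c", "b", "a"];
    let counts = count_words(&words, 4);
    println!("{counts:?}");
}
```

Note that this static chunking gives up dynamic load balancing, and the merge only starts after every worker has finished, which is the limitation described above.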

@cuviper
Member

cuviper commented Sep 17, 2024

Here's a quick attempt: main...cuviper:rayon:fold_unordered

I flipped the name to fold_unordered, if only so it will group with other folds in the docs.

If you try out that branch, please let me know how it goes!

That is, we can't just start the reduce step while waiting for the last thread for counting to finish executing.

My implementation also does not start any reduction until all the folds are done. I think it would require significantly more internal changes to facilitate that, but I don't really have an idea of what that would look like.

@Yikai-Liao

Yikai-Liao commented Sep 18, 2024

Wow, thanks for your prompt reply, I'll try that later.

I think it would require significantly more internal changes to facilitate that, but I don't really have an idea of what that would look like.

Well, I haven't dived into rayon's implementation, but based on my understanding of the high-level API, achieving this kind of functionality would require each result to be pushed into the iterator as soon as a worker has no new work. This means that, although some workers might not have finished, we could still consume part of the result iterator.

For example, the second job could be much slower than the other jobs, so three results should be available in the iterator while waiting for the slow one. When the slowest job is finished, we could do the last reduce directly.

This improves the overall utilization of the cpu instead of letting the cpu sit idle waiting for some tasks.

use rayon::prelude::*;

let ans: u64 = vec![1u64, 10000, 1, 1, 1, 1].into_par_iter().fold(|| 0u64, |mut count, num| {
    // Simulate work proportional to `num`.
    for _ in 0..num {
        count += 1;
    }
    count
}).reduce(|| 0, |a, b| a + b);
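The idea of reducing partial results as soon as they become available can be sketched outside of rayon with a channel. This is a std-only illustration, not a proposal for how rayon would implement it. The name `eager_sum` and the one-thread-per-job layout are assumptions, and because results are combined in arrival order, it is only valid for commutative reductions.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs one job per worker thread and reduces each partial result as
/// soon as it arrives, rather than waiting for every job to finish.
/// (Hypothetical helper for illustration only.)
fn eager_sum(jobs: &[u64]) -> u64 {
    let (tx, rx) = mpsc::channel();
    thread::scope(|s| {
        for &num in jobs {
            let tx = tx.clone();
            s.spawn(move || {
                // Simulated work proportional to `num`.
                let mut count = 0u64;
                for _ in 0..num {
                    count += 1;
                }
                // Publish the partial result the moment this job is done.
                tx.send(count).unwrap();
            });
        }
        // Drop the original sender so `rx` ends once all workers finish.
        drop(tx);
        // Reduce in arrival order; fast jobs are folded in while the
        // slow one is still running. Arrival order is nondeterministic.
        rx.iter().fold(0, |a, b| a + b)
    })
}

fn main() {
    let total = eager_sum(&[1, 10000, 1, 1, 1, 1]);
    println!("total = {total}");
}
```

Here the receiving thread does the reduction while workers are still running, so only the final addition has to wait for the slowest job.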

Update:

I have tried your implementation, and it works correctly.

@cuviper
Member

cuviper commented Sep 18, 2024

The concept of that request is clear, but the actual implementation is harder, especially in such a generic and composable framework as rayon.

I have tried your implementation, and it works correctly.

Did it also help performance in any noticeable way?

I added some brief usage in the rayon-demo benchmarks for map-collect strategies. It worked well in one case, but was rather poor in another. So I don't think we should jump to start using this in any existing generic code, but having it available for particular situations may be worthwhile.

test map_collect::i_mod_10_to_i::with_fold                              ... bench:     599,495.59 ns/iter (+/- 162,917.00)
test map_collect::i_mod_10_to_i::with_fold_unordered                    ... bench:     424,433.58 ns/iter (+/- 115,927.55)
test map_collect::i_to_i::with_fold                                     ... bench:  18,600,808.80 ns/iter (+/- 3,209,686.58)
test map_collect::i_to_i::with_fold_unordered                           ... bench:  28,964,659.40 ns/iter (+/- 20,052,510.20)

(Plus, collecting a map does have some semantic ordering effects if there are any duplicate keys.)
