
Add rayon support #399

Closed
wants to merge 1 commit into from

Conversation

pitdicker
Contributor

@pitdicker pitdicker commented Apr 15, 2018

I opened this PR for discussion, but it is not ready for merging. Above all it needs better documentation.

The idea is to create a parallel iterator that works with Rayon.

The difficult part is where to store/get state of the RNG. Using thread_rng() with every sample is one possible solution. Then one RNG is initialized for every thread Rayon uses.

This PR instead creates one new RNG at every point Rayon splits the job in two. And that can be quite a few times. How should this new RNG be created?

  • We could use FromEntropy, but that would be too expensive for every split.
  • We could seed from thread_rng, but that would require the initialization of one ThreadRng for every thread Rayon uses, which could also be expensive.
  • We could seed the RNG with itself. I expect this to work well for most 'modern' PRNGs, that don't output their internal state directly but permute their output first.
  • We could use streams for RNGs that support that, like PCG. Every split in Rayon would add 1 extra bit to something like a stream_id.
  • Jumping an RNG (when supported) would not fit well with Rayon. It requires a master RNG where another one gets split off, then the master RNG is jumped.

I think option 3 is the fastest, and a good solution for the scenarios where you also want to use rayon. But if we can somehow be flexible, that would be nicer.
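Option 3 ("seed the RNG with itself") can be illustrated with a self-contained sketch. Everything below is hypothetical: `Toy` is a stand-in xorshift64* generator and `split` is an illustrative method, not part of rand's API.

```rust
// Toy xorshift64* generator: the raw state is permuted (multiplied) before
// output, so seeding a child from our output does not leak our state.
struct Toy(u64);

impl Toy {
    fn next_u64(&mut self) -> u64 {
        // xorshift64* step
        self.0 ^= self.0 >> 12;
        self.0 ^= self.0 << 25;
        self.0 ^= self.0 >> 27;
        self.0.wrapping_mul(0x2545_F491_4F6C_DD1D)
    }

    /// Create a child generator seeded from our own output stream.
    fn split(&mut self) -> Toy {
        let seed = self.next_u64();
        Toy(if seed == 0 { 1 } else { seed }) // xorshift state must be non-zero
    }
}

fn main() {
    let mut parent = Toy(0xDEAD_BEEF);
    let mut child = parent.split();
    // Parent and child now produce diverging streams.
    assert_ne!(parent.next_u64(), child.next_u64());
    println!("ok");
}
```

Because the output permutation is a bijection of the state, the child's state is a scrambled function of the parent's, rather than a copy of it.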

@pitdicker
Contributor Author

  • Jumping an RNG (when supported) would not fit well with Rayon. It requires a master RNG where another one gets split off, then the master RNG is jumped.

That is, assuming we only have one fixed-size jump available. With a different scheme it could be possible.

On the first split, we would jump the clone half a period. On the two second-level splits we jump the clones 1/4th of the period. On the four third-level splits the clones jump 1/8th of the period, etc.

Still, there is the disadvantage jumping has: for many LFSR RNGs the jump takes as many rounds of the RNG as it has bits of state. Xorshift with 128 bits of state would need 128 rounds to calculate the jump.
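The halving scheme can be sanity-checked with a small self-contained sketch (all names hypothetical): simulate a full binary split tree where the clone at split level k jumps period/2^k ahead of its parent, and verify that no two leaves end up at the same offset.

```rust
// Offsets are tracked as multiples of PERIOD / 2^depth so everything stays
// in integer arithmetic. Level-k clones jump period / 2^k past their parent.
fn offsets(depth: u32) -> Vec<u64> {
    let unit = 1u64 << depth; // represents the full period
    // (offset, next split level); the root starts at offset 0, level 1.
    let mut nodes = vec![(0u64, 1u32)];
    for _ in 0..depth {
        let mut next = Vec::new();
        for (off, level) in nodes {
            let jump = unit >> level; // period / 2^level
            next.push((off, level + 1));                 // parent keeps going
            next.push(((off + jump) % unit, level + 1)); // clone jumps ahead
        }
        nodes = next;
    }
    nodes.into_iter().map(|(off, _)| off).collect()
}

fn main() {
    let mut offs = offsets(10); // 2^10 leaves
    offs.sort_unstable();
    offs.dedup();
    // Every leaf lands on a distinct offset: the scheme fills in the offset's
    // binary digits from the most significant bit down.
    assert_eq!(offs.len(), 1 << 10);
    println!("all {} offsets distinct", offs.len());
}
```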


We had some discussion around streams and jumping/seeking in the RNG core RFC thread. Back then the conclusion was: probably not worth exposing, mostly useful to decrease the birthday problem when seeding RNGs.

Now I think we are starting to see how they may be practical. We could add something like a split method to SeedableRng, with a default implementation that just seeds the RNG with itself:

pub trait SeedableRng: Sized {
    /* ... */

    fn split(&mut self, _stream_id: u64) -> Self where Self: RngCore {
        // Alternative for custom implementations: use streams or jumping
        Self::from_rng(self).unwrap()
    }
}

Now PRNGs can choose how they want to be split: by seeding from itself, selecting another stream, or jumping. And it is possible with wrapper types to override, by forwarding all methods except split. Those wrappers can implement seeding from thread_rng(), or with FromEntropy.

@dhardy
Member

dhardy commented Apr 16, 2018

As I commented in your branch, we don't necessarily want the same PRNG algorithm for the sub-generators as for the source, because there is an extra requirement: that the sub-generators can be initialised fast (and probably also that they have low memory usage).


@dhardy dhardy left a comment


Quite a cool idea, but also quite heavy (in terms of extra code, and all the extra PRNG initialisation), which makes me wonder how often this is useful over simply using thread_rng as suggested in #398 (obviously this is useless for reproducibility, thus the only possible advantage over thread_rng is performance).

///
/// [`Distribution`]: trait.Distribution.html
/// [`sample_iter`]: trait.Distribution.html#method.sample_iter
#[cfg(feature = "rayon")]
Member

unnecessary cfg since whole module is behind feature flag

fn split_at(mut self, index: usize) -> (Self, Self) {
assert!(index <= self.amount);
// Create a new PRNG of the same type, by seeding it with this PRNG.
// `from_rng` should never fail.
Member

Rather than say this in a comment just do something like unwrap_or_else(|e| panic!("parallel distribution iterator failed to create new RNG: {}", e)).

There's no contract saying from_rng can't panic; it's just unlikely.

Contributor Author

I think we should document that ideally from_rng should only forward errors from the RNG used for seeding, and is not supposed to produce errors itself. But that is another issue...

Member

That was the original idea, but we can't enforce a contract like that and I don't see any real point trying. Then again, there appear to be no notes on error handling on that function at all so far.

/// for simulations etc. While it will also work with cryptographic RNGs, that
/// is not optimal.
///
/// Every time `rayon` splits the work in two to create parallel tasks, one new
Member

It seems like these warnings should be on sample_par_iter too/instead?

Contributor Author

Yes, definitely. Actually this PR was not ready for review, just to have some code to look at to help exploring the issue 😄.


fn min_len(&self) -> usize {
100
}
Member

I wonder if we can make this configurable with a default?

Contributor Author

Rayon already makes it configurable, but this sets a lower bound. Not sure what would be a good value yet, if we should set one at all. I don't know the internals of Rayon well enough. I'd like to ask for review from the contributors there when this is more ready.

}
}

/// FIXME
Member

Fix what?

Contributor Author

It does not compile without doc comments. Maybe a nice place to explain why we sample an amount. I believe we can't just keep generating values until the user decides it has enough; Rayon seems to (reasonably) not support much functionality when the number of values is unknown.

@dhardy
Member

dhardy commented Apr 16, 2018

Another thing we could try is adding an equivalent of thread_rng backed instead by SmallRng for faster init; I think it would still have to seed from EntropyRng.

@dhardy dhardy added F-new-int Functionality: new, within Rand T-distributions P-low Priority: Low X-experimental Type: experimental change labels Apr 16, 2018
@pitdicker
Contributor Author

As I commented in your branch, we don't necessarily want the same PRNG algorithm for the sub-generators as for the source, because there is an extra requirement: that the sub-generators can be initialised fast (and probably also that they have low memory usage).

This sounds right in theory, but I don't see a way to pass around two RNGs without increasing the problems.

Quite a cool idea, but also quite heavy (in terms of extra code, and all the extra PRNG initialisation) which makes me wonder how often this is useful over simply using thread_rng as suggested in #398 (obviously this is useless for reproducibility thus the only possible advantage over thread_rng is performance).

Yes, performance is the motivation here, and having the choice to easily use different algorithms. That results are not reproducible is an important issue to mention in the documentation.

@pitdicker
Contributor Author

Another thing we could try is adding an equivalent of thread_rng backed instead by SmallRng for faster init;

Yes, certainly. It does not give me some kind of intellectual satisfaction 😄. But if it is practical and faster, ...

We can also offer both methods. Using thread_rng is something that works today. Making something similar for SmallRng would only make it work with one other kind of RNG, so that seems like a small disadvantage. But splitting the RNG at every split of the job could be expensive.

It seems nice to me that this would finally be a use for all the theory behind RNGs with streams or jumping. I wonder if Rayon is designed to deal with situations like ours, where splitting a job is not a basically free operation.

@dhardy
Member

dhardy commented Apr 16, 2018

This sounds right in theory, but I don't see a way to pass around two RNGs without increasing the problems.

Sorry, I didn't understand how Rayon works. If we're dividing a master runner as needed this idea won't work.

It seems nice to me that this would finally be a use for all the theory behind RNGs with streams or jumping.

Only thing is: can you somehow ensure that each thread gets a unique stream/position?

@pitdicker
Contributor Author

Only thing is can you somehow ensure that each thread gets a unique stream/position?

On initialization we set stream_id to 1. At every split, stream_id <<= 1 for both sub-tasks, and for the cloned one we also OR the final bit to 1: stream_id |= 1. I think this scheme would provide a unique stream value for every split-off job.
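A quick self-contained sketch of this scheme (hypothetical helper, not PR code) confirms the uniqueness claim: even across leaves collected from several different split depths, no two stream ids collide, because the leading 1 keeps each depth's ids in their own disjoint range [2^d, 2^(d+1)).

```rust
// Shift left on every split; set the low bit on one of the two sub-tasks.
// The leading 1 from the initial id acts as a length marker, so the path
// bits never collide across different split depths.
fn leaf_ids(id: u64, splits_left: u32, out: &mut Vec<u64>) {
    if splits_left == 0 {
        out.push(id);
        return;
    }
    leaf_ids(id << 1, splits_left - 1, out);       // one sub-task
    leaf_ids((id << 1) | 1, splits_left - 1, out); // the cloned sub-task
}

fn main() {
    let mut ids = Vec::new();
    // Collect leaves from several depths to mimic rayon's irregular splits.
    for depth in 1..=6 {
        leaf_ids(1, depth, &mut ids);
    }
    let n = ids.len();
    ids.sort_unstable();
    ids.dedup();
    assert_eq!(ids.len(), n); // no two jobs ever share a stream id
    println!("{} unique stream ids", n);
}
```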

@cuviper
Contributor

cuviper commented Apr 16, 2018

I still need to dig through the questions in rayon-rs/rayon#566, but a quick note of caution here first: rayon-core depends on rand 0.4. If rand 0.5 depends on rayon, then I think we won't be able to ever update rayon to that because of a cyclic dependency! (this pr: rand-0.5 -> rayon -> rayon-core -> rand-0.4)

@pitdicker
Contributor Author

If rand 0.5 depends on rayon, then I think we won't be able to ever update rayon to that because of a cyclic dependency!

Good point, definitely don't want that! I did not expect rayon to need some randomness somewhere 😄.

Would it be okay for rayon_core to depend on rand_core and a crate that provides PRNGs like XorShiftRng (something we plan to separate out of rand)? Provided this PR is something we decide to find useful, of course.

If I see it right, you only really need rand in src/registry.rs, in the job stealing function. And there are two test files, src/join/test.rs and src/scope/test.rs, that also don't need more than this.

But rayon itself needs rand in five more tests; that would be a bit more work.

@dhardy
Member

dhardy commented Apr 17, 2018

It could instead make sense for Rayon to provide support for Rand's distributions, I don't know. There's also #290 — we could move all distribution stuff outside Rand itself, though I don't think this is so likely.

@pitdicker
Contributor Author

Came up with a simpler scheme that supports streams and jumping, and where stream does not need to be 0 initially. We only need to track the split_level. It also avoids the duplication we currently have with plain Xorshift.

fn split_at(mut self, index: usize) -> (Self, Self) {
    let new = DistProducer {
        distr: self.distr,
        amount: self.amount - index,
        rng: R::split_from(&mut self.rng, self.split_level),
        split_level: self.split_level + 1,
        phantom: ::core::marker::PhantomData,
    };
    self.amount = index;
    self.split_level += 1;
    (self, new)
}

// PRNG supporting streams:
pub trait SeedableRng: Sized {
    /* ... */

    fn split_from(&mut self, split_level: usize) -> Self {
        let mut new = self.clone();
        // XOR the stream with bit nr `split_level`.
        new.stream ^= (1 << split_level); // shift by one more if `stream` must be odd
        new
    }
}

// PRNG supporting jumps:
pub trait SeedableRng: Sized {
    /* ... */

    fn split_from(&mut self, split_level: usize) -> Self {
        let mut new = self.clone();
        // Jump by smaller amounts if we are at a deeper `split_level`.
        new.jump(PERIOD / (2 << split_level));
        new
    }
}

// PRNG not supporting either
pub trait SeedableRng: Sized {
    /* ... */

    fn split_from(&mut self, split_level: usize) -> Self {
        let mut new = Self::from_rng(self).unwrap();
        // Adding `split_level` to the state may improve things a bit
        new.state = new.state.wrapping_add(split_level + 1);
        new
    }
}

@dhardy
Member

dhardy commented Apr 17, 2018

And what should the default implementation be? Maybe a variant on the last:

fn split_from(&mut self, split_level: usize) -> Self {
    let mut seed = Self::Seed::default();
    self.fill_bytes(seed.as_mut());
    // somehow do cascading wrapping add to seed (cast to `[usize]` on x86?)
    Self::from_seed(seed)
}

I'm not sure; this is a bit niche but does have a certain utility. For use with Rayon it obviously doesn't need to be reproducible.

An alternative could be to have StreamableRng::set_stream trait/fn and only allow those to be used with Rayon.

Or maybe it just makes more sense using thread-local RNGs...

@vks
Collaborator

vks commented Apr 17, 2018

I feel like this should be implemented in rayon. I think they could even make it reproducible (with some performance cost).

/// `SeedableRng::from_rng`. **Important**: Not all RNG algorithms support this!
/// Notably the low-quality plain Xorshift, the current default for `SmallRng`,
/// will simply clone itself using this method instead of seeding the split off
/// RNG well. Consider using something like PCG or Xoroshiro128+.
Collaborator

I think it should be possible to use different RNGs for the seeding and the sampling.

@pitdicker
Copy link
Contributor Author

pitdicker commented Apr 17, 2018

And what should the default implementation be? Maybe a variant on the last

Seems like a good default to me. If we use from_seed instead of from_rng, we can adjust the seed a bit before using it to initialize the RNG.

For use with Rayon it obviously doesn't need to be reproducible.

I am not so sure yet it is impossible to be reproducible. If you could control how often the job gets split, the worst that can happen is that the jobs execute in different order. But all the values would be the same.

An alternative could be to have StreamableRng::set_stream trait/fn and only allow those to be used with Rayon.

I thought about requiring a different trait than SeedableRng. But this sort-of has to do with seeding (creating a new RNG from an existing one), and the problem here almost only applies to PRNGs, which all require seeding. Also all should have a good solution, RNGs with multiple streams are not the only option.

A trait to support working with streams and a trait to support seeking can both be interesting. In a way they would be more low-level than what I proposed here. They would need quite some design work, and would probably be less easy to use.

For streams you would want to have both a getter and setter, and figure out how to make good use of streams yourself. Supported values may be specific to the RNG.

Seeking is more difficult, with some RNGs supporting absolute seeking and others only relative. Do you want to emulate the ability to seek backwards (by wrapping around the RNG's period)? Seeking by a variable amount should be possible for most RNGs, but can be very slow. For some RNGs you may want to cache the intermediate polynomial. Should seeking by certain powers of two, or by fractions of the period, get a special function? And I don't even know if it is always possible to calculate the distance between two points in a random number stream, but you may want to expose a function for that too.

Even if we someday would have traits for both functionalities, I think something simple like the SeedableRng::split_from method from above would be useful. And I am pretty happy with the solution of a single split_level argument, which is somewhat understandable and works in multiple situations. The only trouble is that you need to use it recursively, not sequentially.

Or maybe it just makes more sense using thread-local RNGs...

I'll assure you: I won't forget that option. I am not pushing this solution, just exploring how plausible it would be as an alternative, because I think there are cases where we can do much better than using thread_rng.

I feel like this should be implemented in rayon.

Something to keep in mind. Yet this is mostly a problem related to how RNGs work.

I think it should be possible to use different RNGs for the seeding and the sampling.

@dhardy had the same comment, but I don't see a way to make that work.

@vks
Collaborator

vks commented Apr 17, 2018

Yet this is mostly a problem related to how RNGs work.

Yes, but I think the thread pool should manage the RNGs (see rayon-rs/rayon#566), which can only be implemented in rayon.

@pitdicker
Contributor Author

Made a benchmark to compare the alternatives pitdicker@64c6985:

test gen_100_000_sample_iter  ... bench:     438,960 ns/iter (+/- 1,246) = 911 MB/s
test gen_100_000_sequential   ... bench:     414,934 ns/iter (+/- 603) = 964 MB/s

test gen_100_000_parallel     ... bench:      39,801 ns/iter (+/- 3,904) = 10049 MB/s (with seeding from itself)
test gen_100_000_parallel     ... bench:      43,881 ns/iter (+/- 3,288) = 9115 MB/s (with jumping)
test gen_100_000_thread_local ... bench:     141,495 ns/iter (+/- 19,932) = 2826 MB/s

A few things to note. Both sequential methods do not optimize well. Xorshift should be able to do ~4000 MB/s on my machine. But this is kind of a known problem with iterators, not sure how to improve that...

Using rayon and with thread_rng butchered to use Xorshift, we see it gets about 3× as fast. As I have 4 cores, that seems reasonable.

The two parallel methods use the approach from this PR. I am not sure what is going on that makes them not suffer from the bad performance of the normal iterator methods. So that is something to figure out before drawing any conclusions.

@dhardy
Member

dhardy commented Apr 18, 2018

Yes, interesting, but as you say 10-fold speedup is more than expected, so something odd is going on.

@pitdicker
Contributor Author

But in this case the trouble is with the normal iterator code; the parallel results are roughly as expected.

You also investigated this before, in #275 (comment).

@pitdicker
Contributor Author

pitdicker commented Apr 18, 2018

Adjusted the benchmark to generate 100,000 f64s, which are less affected by the optimization troubles XorShift::next_u32 has.

test gen_100_000_parallel     ... bench:      72,693 ns/iter (+/- 12,848) = 11005 MB/s
test gen_100_000_sample_iter  ... bench:     378,965 ns/iter (+/- 2,687) = 2111 MB/s
test gen_100_000_sequential   ... bench:     274,943 ns/iter (+/- 652) = 2909 MB/s
test gen_100_000_thread_local ... bench:     170,941 ns/iter (+/- 27,541) = 4679 MB/s

I have inspected the results with perf.

The parallel method in this PR with jumping overhead is 3.8× as fast as the fastest sequential version. For both the parallel and sequential benchmarks, 70-75% of the time is spent generating values in a perfectly optimized loop. Less than 5% of the time is spent jumping the RNGs.

The sample_iter benchmark contains some extra vector handling code. The trouble seems to be it is not an ExactSizeIterator (which the others are). I would expect it to become an ExactSizeIterator when we use take(100_000). Maybe a bug in the standard library? rust-lang/rust#50057

The method using a thread-local Xorshift spends 25-30% of its time retrieving the RNG from thread-local storage (because that has to happen on every iteration). A bit more than 50% is spent in the actual loop, which is not as optimized as in the other variants. Not sure why.

@pitdicker
Contributor Author

Simply implementing size_hint for DistIter does the trick to make it perform roughly equal to the other sequential benchmark.
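The effect can be reproduced with a self-contained toy (a stand-in for DistIter, not the actual rand code): an endless sampling iterator whose size_hint advertises an unbounded length lets take(n) report an exact size, so collect can preallocate instead of repeatedly growing the vector.

```rust
// Toy stand-in for `DistIter`: an endless sampling iterator. The default
// size_hint would be (0, None); overriding it to (usize::MAX, None) lets
// `Take` clamp it to an exact length.
struct Endless(u64);

impl Iterator for Endless {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        // xorshift64* as a stand-in for sampling a distribution
        self.0 ^= self.0 >> 12;
        self.0 ^= self.0 << 25;
        self.0 ^= self.0 >> 27;
        Some(self.0.wrapping_mul(0x2545_F491_4F6C_DD1D))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (usize::MAX, None) // we never run out of samples
    }
}

fn main() {
    let it = Endless(1).take(100_000);
    // `Take` clamps the unbounded hint to an exact length.
    assert_eq!(it.size_hint(), (100_000, Some(100_000)));
    let v: Vec<u64> = it.collect();
    assert_eq!(v.len(), 100_000);
    println!("exact size_hint, {} samples", v.len());
}
```

With the default `(0, None)` hint, `take(n)` would only report `(0, Some(n))`, and `collect` starts from an empty allocation.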

@dhardy
Member

dhardy commented Apr 19, 2018

How many sub-generators are used by the parallel benchmark?

Tight loops can benefit from unrolling but of course that can't happen when the same RNG gets used in each iteration.

It would be interesting to try optimising with a native 128-bit generator or even a BlockRng.

@pitdicker
Contributor Author

How many sub-generators are used by the parallel benchmark?

Not that many. In a previous test 8, could be a bit more in this benchmark. According to rayon-rs/rayon#566 (comment) the number of cores (I have 4), plus a few more near the end when it starts to steal work.

Tight loops can benefit from unrolling but of course that can't happen when the same RNG gets used in each iteration.

It would be interesting to try optimising with a native 128-bit generator or even a BlockRng.

Not sure what you mean here. I didn't see any loop unrolling with Xorshift. But what I mostly looked for is if LLVM was able to cut through all the abstractions. And it is, leaving a simple loop with the by now familiar Xorshift instructions and float conversion code, basically the same instructions as in the distr_range_f64 benchmark. So I do not expect other RNGs to behave differently, but always good to test.

@pitdicker
Contributor Author

pitdicker commented Apr 20, 2018

I tried another approach: have par_dist_iter produce an unindexed iterator at first, and turn it into an indexed one with a specialized take method. Quite a bit more code, but more consistent with the sequential version and more flexible.

I plan to do another test with a parallel iterator on Rng that works a bit like map(). It is quite complex to implement (just like Rayon's map is, because we can't clone a closure), but does not do much special: it makes an rng available to a closure, which can be used with the other parallel iterator methods.

I think it is more useful than the par_dist_iter approach, as you can simply wrap the sequential algorithm you need, which probably involves more than sampling from some distribution. And rayon takes care of splitting the RNGs, dividing the work over cores, and collecting the results in order. In combination with a with_min_len and with_max_len that are roughly equal, I think it can even be deterministic, although different from the sequential version.

The problem from #398 would then hopefully look something like this:

fn normal(mu: f64, sigma: &Vec<f64>) -> Vec<f64> {
    let mut rng = SmallRng::new();
    rng.par_iter_map(|rng| {
        let g = Normal::new(mu, sigma[i]);
        g.sample(rng)
    }).take(sigma.len()).collect()
}

Edit: it would also have to iterate over the sigma vector. That means zipping two iterators together, and using map on that. A good test to see if this is flexible enough...

@pitdicker
Contributor Author

I wrote a comment about how I now think going with a simple wrapper that splits an RNG on clone is the right approach, in combination with map_with. But apparently that comment is gone/lost/somewhere...

Advantages are:

  • it can work not only for distributions, but also for common code that needs an RNG;
  • it is better to combine with things like an iterator over something like an input vector;
  • it creates no dependencies between Rand and Rayon.

I do like the part sketched here of adding some kind of split method to SeedableRng.

As there is basically no code shared between that approach and the one in this PR, I am going to close this one.

Thanks for all the good discussion 👍 and expect a new PR soon(ish).

@pitdicker pitdicker closed this Apr 27, 2018
@pitdicker pitdicker mentioned this pull request May 20, 2018
@pitdicker pitdicker mentioned this pull request Jun 21, 2018
@masonk
Contributor

masonk commented Jul 4, 2022

Hi, I'm interested in this exact problem, and I'm wondering what the current state of the art is.

In a toy simulation I wrote, I found that calling rand::thread_rng() inside my hot function is 17% slower than passing &mut impl Rng to the hot function.

The easiest way to stop calling rand::thread_rng() inside my hot function when I'm using Rayon is to use map_with(<some clonable Rng>, |rng, _| hot_function(&mut rng)). Unfortunately my understanding of the auto implementation of StdRng is that it doesn't give independent seeds for each clone.

In short, I have this code:

fn slow_sim(trials: usize) {
    let sum = (0..trials)
        .into_par_iter()
        .map(|_| do_one_slow())
        .reduce(|| 0, |a, b| a + b);
}

fn do_one_slow() -> usize {
    let mut count: usize = 1;
    let u = Uniform::from(0.0..1.0);
    let mut rng = thread_rng();
    let mut cursor = u.sample(&mut rng);
    loop {
        let next = u.sample(&mut rng);
        count += 1;
        if next > cursor {
            break;
        }
        cursor = next;
    }
    return count;
}

I would like to write this code, which is 17% faster, almost entirely because it doesn't call rand::thread_rng() inside of do_one (constructing the Uniform inside vs outside, surprisingly, doesn't appreciably affect the benchmark).

fn fast_sim(trials: usize) {
    let u: Uniform<f64> = Uniform::from(0.0..1.0);
    let sum = (0..trials)
        .into_par_iter()
        .map_with(rand::rngs::StdRng::from_entropy(), |rng, _| do_one(&u, rng))
        .reduce(|| 0, |a, b| a + b);
}

fn do_one(u: &Uniform<f64>, rng: &mut impl Rng) -> usize {
    let mut count: usize = 1;
    let mut cursor = u.sample(rng);
    loop {
        let next = u.sample(rng);
        count += 1;
        if next > cursor {
            break;
        }
        cursor = next;
    }
    return count;
}

But IIUC, the second program is buggy, as each chunk is going to be seeded with an identical Rng.

@masonk
Contributor

masonk commented Jul 4, 2022

Oh, I think I could just use map_init, right? In fact, that's the example shown in Rayon:

fn fast_sim(trials: usize) {
    let u: Uniform<f64> = Uniform::from(0.0..1.0);
    let sum = (0..trials)
        .into_par_iter()
        .map_init(|| rand::thread_rng(), |rng, _| do_one(&u, rng))
        .reduce(|| 0, |a, b| a + b);
}

fn do_one(u: &Uniform<f64>, rng: &mut impl Rng) -> usize {
    let mut count: usize = 1;
    let mut cursor = u.sample(rng);
    loop {
        let next = u.sample(rng);
        count += 1;
        if next > cursor {
            break;
        }
        cursor = next;
    }
    return count;
}

@dhardy
Member

dhardy commented Jul 5, 2022

Unfortunately my understanding of the auto implementation of StdRng is that it doesn't give independent seeds for each clone.

A clone should be identical, right? You almost never want to clone an RNG, aside from certain tests.

You could instead construct a new RNG, possibly from a seed passed into do_one. But you need something unique going into each instance of do_one, and map_with appears to clone its init arg.

I'm a bit confused about that last example: ThreadRng is !Sync yet Rayon is passing a reference into each instance of do_one? (Oh, it probably works by initialising the RNG for each worker thread. So this works, but the result is non-deterministic.)

I would imagine something more like this (playground link), but I'm not clear whether map_init is actually creating a new seed for each instance or not. Edit: no it doesn't! Play link. A better option: play.

fn fast_sim(trials: usize) {
    let sum = (0..trials)
        .into_par_iter()
        .map_init(|| StdRng::from_seed(rand::random()), |rng, _| do_one(rng))
        .reduce(|| 0, |a, b| a + b);
}

fn do_one(rng: &mut StdRng) -> usize {
    // ...
}

Like this you can now replace StdRng with whatever RNG you want, possibly something a little faster like SmallRng or ChaCha8Rng. But it's non-deterministic.

If you want deterministic, you would have to do something like pass the same fixed seed plus a job_number to each job and construct an RNG from those.
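That "fixed seed plus job number" idea might look like the following self-contained sketch. splitmix64 is a common seed-scrambling finalizer; the combination scheme here is an assumption for illustration, not an established rand API.

```rust
// splitmix64 finalizer: a bijective scrambler often used for seed derivation.
fn splitmix64(mut x: u64) -> u64 {
    x = x.wrapping_add(0x9E37_79B9_7F4A_7C15);
    x = (x ^ (x >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    x = (x ^ (x >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    x ^ (x >> 31)
}

// Hypothetical helper: derive a reproducible per-job seed from a fixed
// simulation seed and the job number.
fn job_seed(fixed_seed: u64, job_number: u64) -> u64 {
    splitmix64(fixed_seed ^ splitmix64(job_number))
}

fn main() {
    const SEED: u64 = 42;
    // Same inputs always give the same per-job seed...
    assert_eq!(job_seed(SEED, 7), job_seed(SEED, 7));
    // ...and because splitmix64 is a bijection, distinct job numbers always
    // give distinct seeds.
    let mut seeds: Vec<u64> = (0u64..1000).map(|i| job_seed(SEED, i)).collect();
    seeds.sort_unstable();
    seeds.dedup();
    assert_eq!(seeds.len(), 1000);
    println!("1000 distinct per-job seeds");
}
```

Each job would then construct its RNG from `job_seed(SEED, n)`, making the per-job streams reproducible regardless of which worker thread runs them.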

constructing the Uniform inside vs outside, surprisingly, doesn't appreciably affect the benchmark

Since it's parametrised with constants, most of the set-up should effectively happen at compile time. Not that it's very complex anyway.

@masonk
Contributor

masonk commented Jul 5, 2022

A clone should be identical, right? You almost never want to clone an RNG, aside from certain tests.

I agree. Clone should give an identical value back. It's doing the correct thing.

map_init() is what I really needed, but I didn't know it existed when I wrote my original question.

map_init indeed does not require Sync bounds on the return value of its callback because it invokes that callback once per "rayon job", which is a batch of work handled on a single thread, on the worker thread where the job will run.

I played around in the playground trying to create a deterministic threaded simulation.

I think we can do what we need to do in map_init to deterministically construct a seed, but the issue is that the number of jobs Rayon will spawn, and the number of individual work items that will be allocated to each rayon batch is not deterministic, due to work stealing. For example, this code outputs similar values every time, but if an item of work is stolen onto a different thread (and thus a different seed), it will produce something different:

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=dd6e0a7feafb379a19fb2b7bdb57051e

I think it's not currently possible to write a deterministically reproducible monte carlo sim using rayon.

@dhardy
Member

dhardy commented Jul 5, 2022

To make it deterministic, you should use a single seed (instead of constructing a new seed in map_init like in my example). Then you should use the "work unit number" n or similar to construct something unique. How, exactly, depends:

  • xoshiro generators support jump and long_jump methods; you could construct a generator from your fixed seed then call jump n times
  • chacha RNGs support set_stream

Some other generators support multiple streams somehow. A little warning: PCG has been criticized in this regard since its streams are more related than might be expected.

@masonk
Contributor

masonk commented Jul 5, 2022

I'm not sure if I follow you.

I can't construct an rng one time and mutably share it between the par_iter threads.

I can construct an rng once per batch from an identical seed and choose the stream based on a work number, but for something like xoshiro, I'd have to call jump a billion times in the hot function, which seems totally intractable.

I tried to adapt the pi estimator example for rayon using chacha set_stream and map_init.

#![cfg(all(feature = "std", feature = "std_rng"))]

use rand::distributions::{Distribution, Uniform};
use rand_chacha::rand_core::SeedableRng;
use rand_chacha::ChaCha8Rng;
use rayon::prelude::*;

static SEED: u64 = 0;

fn main() {
    let range = Uniform::new(-1.0f64, 1.0);

    let total = 1_000_000;

    let in_circle: usize = (0..total)
        .into_par_iter()
        .map_init(
            || ChaCha8Rng::seed_from_u64(SEED),
            |rng, i| {
                rng.set_word_pos(i);
                let a = range.sample(rng);
                let b = range.sample(rng);
                if a * a + b * b <= 1.0 {
                    1
                } else {
                    0
                }
            },
        )
        .reduce(|| 0, |a, b| a + b);

    // prints something close to 3.14159...
    println!(
        "π is approximately {}",
        4. * (in_circle as f64) / (total as f64)
    );
}

But, though this function approximates pi, it gives a different approximation every time I run it. Can you help me understand why?

I found that set_stream did not create a deterministic simulation (calling rng.set_stream(i) produced reasonable estimates for pi, but produced a different estimate on every run), but set_word_pos gives the same estimate each time as long as SEED is the same. However, I'm not sure that I'm using ChaCha correctly.

I have a pull request for this as an example, but of course I want to make sure it's correct, e.g., that every work item is generating different samples from the distribution.

@dhardy
Member

dhardy commented Jul 6, 2022

You're using the same RNG with the same SEED across multiple work units (the inner function), then calling set_word_pos(i) in each. Result: most of the RNG output will overlap between work units (since i only differs by small values).

I found that set_stream did not create a deterministic simulation

Because you are re-using the RNG across work-units in non-deterministic order, and the "word position" is retained from one work unit to the next.


Try this. The result appears to be deterministic, as expected.

use rand::distributions::{Distribution, Uniform};
use rand_chacha::rand_core::SeedableRng;
use rand_chacha::ChaCha8Rng;
use rayon::prelude::*;

static SEED: u64 = 0;

fn main() {
    let range = Uniform::new(-1.0f64, 1.0);

    let total = 1_000_000;

    let in_circle: usize = (0..total)
        .into_par_iter()
        .map(|i| {
            let mut rng = ChaCha8Rng::seed_from_u64(SEED);
            rng.set_stream(i);
            let a = range.sample(&mut rng);
            let b = range.sample(&mut rng);
            if a * a + b * b <= 1.0 {
                1
            } else {
                0
            }
        })
        .reduce(|| 0, |a, b| a + b);

    // prints something close to 3.14159...
    println!(
        "π is approximately {}",
        4. * (in_circle as f64) / (total as f64)
    );
}

@dhardy
Member

dhardy commented Jul 6, 2022

@masonk thanks for PR #1236. I'm also working on adding a section to the book; you wouldn't mind if I re-use some of this code there?

BTW my last code, with 10M samples, runs in 79ms using Rayon, or ~870ms without Rayon (Ryzen 5800X). I think that's a good enough speed-up for an 8-core CPU?

@masonk
Contributor

masonk commented Jul 6, 2022

Thanks for looking! I will update my PR to use this approach, and I don't mind and fully release this code for any purpose.

BTW my last code, with 10M samples, runs in 79ms using Rayon, or ~870ms without Rayon (Ryzen 5800X). I think that's good enough speed up for an 8 core CPU?

If you use the same code pattern and just vary the number of cores, a speedup factor close to the core count is expected. The tricky part is that you would write a different program when running single-threaded, or when running multi-threaded without determinism. No one would write the ST version of this program by building the Rng in the hot loop. I think there are only three plausible programs to consider:

#![cfg(all(feature = "std", feature = "std_rng"))]

use rand::distributions::{Distribution, Uniform};
use rand_chacha::{rand_core::SeedableRng, ChaCha8Rng};
use rayon::prelude::*;

static SEED: u64 = 0;
static TRIALS: u64 = 10_000_000;

fn main() {
    time("deterministic st", deterministic_st);
    time("nondeterministic mt", nondeterministic_mt);
    time("deterministic mt", deterministic_mt);
}

fn time(name: &str, cb: fn(u64) -> f64) {
    let start = std::time::Instant::now();
    let answer = cb(TRIALS);
    let end = start.elapsed();
    println!("\n{}:", name);
    println!("\telapsed: {:?}", end);
    println!("\tπ approximation: {}", answer);
}

fn deterministic_st(total: u64) -> f64 {
    let range = Uniform::new(-1.0f64, 1.0);
    let mut rng = ChaCha8Rng::seed_from_u64(SEED);
    let mut sum = 0u64;
    for _ in 0..total {
        let a = range.sample(&mut rng);
        let b = range.sample(&mut rng);
        if a * a + b * b <= 1.0 { sum += 1 }
    }

    4. * (sum as f64) / (total as f64)
}

fn nondeterministic_mt(total: u64) -> f64 {
    let range = Uniform::new(-1.0f64, 1.0);

    let sum = (0..total).into_par_iter().map_init(
        || ChaCha8Rng::from_entropy(),
        |rng, _| {
            let a = range.sample(rng);
            let b = range.sample(rng);
            if a * a + b * b <= 1.0 { 1 } else { 0 }
        },
    ).reduce(|| 0, |a, b| a + b);
    
    4. * (sum as f64) / (total as f64)
}

fn deterministic_mt(total: u64) -> f64 {
    let range = Uniform::new(-1.0f64, 1.0);

    let sum = (0..total)
        .into_par_iter()
        .map(|i| {
            let mut rng = ChaCha8Rng::seed_from_u64(SEED);
            rng.set_stream(i);
            let a = range.sample(&mut rng);
            let b = range.sample(&mut rng);
            if a * a + b * b <= 1.0 {
                1
            } else {
                0
            }
        })
        .reduce(|| 0, |a, b| a + b);

    4. * (sum as f64) / (total as f64)
}

Here's what I get when I run this with --release on my 16-core Threadripper.

deterministic st:
        elapsed: 94.4171ms
        π approximation: 3.1414576

nondeterministic mt:
        elapsed: 10.7305ms
        π approximation: 3.1422312

deterministic mt:
        elapsed: 69.1351ms
        π approximation: 3.141812

So what we see is that getting determinism back into the mt version increases runtime by 7x using this pattern. I would say it's still not good, even though it does barely beat the st version. Do you get similar numbers?

@dhardy
Member

dhardy commented Jul 6, 2022

It's worse actually:

deterministic st:
        elapsed: 54.982968ms
        π approximation: 3.1414576

nondeterministic mt:
        elapsed: 7.243199ms
        π approximation: 3.1419132

deterministic mt:
        elapsed: 79.365859ms
        π approximation: 3.141812

The point is, for determinism, you must fix the RNG to the work unit. The trick then is to make sure your work unit is large enough that the RNG initialisation isn't significant. And for this toy example, I don't think Rayon alone cuts it.

This version actually beats your nondeterministic code:

fn deterministic_mt(total: u64) -> f64 {
    let range = Uniform::new(-1.0f64, 1.0);
    const JOB_SIZE: u64 = 1000;

    let sum = (0..(total / JOB_SIZE))
        .into_par_iter()
        .map(|i| {
            let mut rng = ChaCha8Rng::seed_from_u64(SEED);
            rng.set_stream(i);
            let mut count = 0;
            for _ in 0..JOB_SIZE {
                let a = range.sample(&mut rng);
                let b = range.sample(&mut rng);
                if a * a + b * b <= 1.0 {
                    count += 1;
                }
            }
            count
        })
        .reduce(|| 0, |a, b| a + b);

    4. * (sum as f64) / (total as f64)
}

@masonk
Contributor

masonk commented Jul 6, 2022

Awesome! That's what we should show as the example.

Labels
F-new-int Functionality: new, within Rand P-low Priority: Low X-experimental Type: experimental change