RangePublisher - Possible example improvement? #551
It uses that logic because request may be called from a different thread than the one on which items are currently being emitted. That requires atomics or thread confinement. I have an example, FlowRange, which uses the latter, or rather both techniques. The thing is, unless the
Thank you for the prompt reply.
But the Publisher is synchronous, it will emit on the same thread as the In my mind, the
Within a particular call to On the current
```java
@Override
public void request(long n) {
    // trimmed
    // Downstream requests are cumulative and may come from any thread
    for (;;) {
        long requested = get();
        long update = requested + n;
        // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE`
        // we treat the signalled demand as "effectively unbounded"
        if (update < 0L) {
            update = Long.MAX_VALUE;
        }
        // atomically update the current requested amount
        if (compareAndSet(requested, update)) {
            // if there was no prior request amount, we start the emission loop
            if (requested == 0L) {
                emit(update);
            }
            break;
        }
    }
}
```
For example, between the To illustrate my case further, I have set up this other implementation as a gist:
It would be interesting to hear your insights.
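For readers following along, the demand-accumulation step of the `request` loop quoted above can be isolated as a small sketch. The class name `DemandCounter` and its methods are hypothetical, not part of the project's examples, and the sketch assumes `n` is positive (the handling of non-positive requests is in the part marked as trimmed).

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, for illustration only: isolates the "add with cap"
// part of the request loop quoted in this thread.
final class DemandCounter {
    private final AtomicLong requested = new AtomicLong();

    /**
     * Adds n to the outstanding demand, capping at Long.MAX_VALUE on overflow,
     * and returns the demand that was outstanding before this call.
     * Assumes n > 0.
     */
    long add(long n) {
        for (;;) {
            long current = requested.get();
            long update = current + n;
            if (update < 0L) {
                // overflow: treat demand as effectively unbounded
                update = Long.MAX_VALUE;
            }
            // retry if another thread changed the value in between
            if (requested.compareAndSet(current, update)) {
                return current;
            }
        }
    }

    long get() {
        return requested.get();
    }

    public static void main(String[] args) {
        DemandCounter demand = new DemandCounter();
        long before = demand.add(5);
        // a previous value of 0 is the signal to start the emission loop
        System.out.println("previous demand: " + before);
        demand.add(Long.MAX_VALUE);
        System.out.println("capped: " + (demand.get() == Long.MAX_VALUE));
    }
}
```

The returned previous value is what lets exactly one caller observe the 0-to-positive transition and start emitting; a plain read followed by a plain write gives no such guarantee if another thread can modify the field in between.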
Requests can be serialized, but nothing says they have to be serialized with onNexts. The If you can ensure thread confinement, i.e., by rolling your own framework around RS, your implementation might just work (until it overflows the stack due to the recursion).
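The recursion concern can be made concrete with a sketch. Assuming strict thread confinement (every `request` and every emission happens on one thread), a plain counter plus a reentrancy flag turns the `request` → `onNext` → `request` recursion into a loop. `ConfinedRange` is a hypothetical class, and an `IntConsumer` stands in for the `Subscriber` so the block has no dependencies; it is not the gist discussed above and it does nothing for requests arriving from other threads.

```java
import java.util.function.IntConsumer;

// Hypothetical, thread-confined range source: safe ONLY if request() and the
// emission loop always run on the same thread.
final class ConfinedRange {
    private final int end;
    private final IntConsumer onNext; // stand-in for Subscriber.onNext
    private int index;
    private long requested;           // plain field, no atomics
    private boolean emitting;         // reentrancy guard

    ConfinedRange(int start, int count, IntConsumer onNext) {
        this.index = start;
        this.end = start + count;
        this.onNext = onNext;
    }

    void request(long n) {
        long update = requested + n;
        requested = update < 0L ? Long.MAX_VALUE : update;
        if (emitting) {
            // called from inside onNext: the outer loop will see the new demand
            return;
        }
        emitting = true;
        try {
            while (requested > 0L && index < end) {
                requested--;
                onNext.accept(index++);
            }
        } finally {
            emitting = false;
        }
    }

    /** Requests one item at a time from inside the callback; returns items received. */
    static int run(int count) {
        ConfinedRange[] ref = new ConfinedRange[1];
        int[] received = {0};
        ref[0] = new ConfinedRange(0, count, v -> {
            received[0]++;
            ref[0].request(1); // reentrant request, no nested emission loop
        });
        ref[0].request(1);
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(run(1_000_000) == 1_000_000);
    }
}
```

Without the `emitting` check, each `request(1)` issued from the callback would start a nested loop, and the stack depth would grow with the number of items.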
Requests MUST be serialized. Right? Or else the subscriber (any subscriber) would violate 2.7.
But because this is a synchronous publisher, one implies the other.
I am not saying that. I am saying the opposite: Because the
As my example implementation (linked above) is passing the I might of course be making a glaringly wrong assumption... but I can't see it. Would it be reasonable for me to open an MR for analysis by others?
I don't know how to explain it better. Try thinking about non-trivial but conforming
On the examples, could `RangePublisher` rely on a volatile variable rather than on `AtomicLong`?
From the examples, the `RangePublisher`'s Subscription extends an `AtomicLong`. It does so because its methods can be run from different threads. The following comment can be read:
However, from rule 2.7
Would this then not mean that, regardless of how the subscriber behaves, the subscription only needs to be concerned about publishing the index of that subscriber? In which case a volatile variable would suffice?
Provided I am right, using a less powerful form of synchronization is preferable by the principle of least power. More importantly, for people learning via these examples, usage of `volatile` would emphasize that the `Subscriber` must comply with rule 2.7.
What do you think?
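One way to probe the question is a small experiment, sketched here under simplifying assumptions. A single "requester" thread adds demand (so calls to `request` are serial, in the spirit of rule 2.7) while a separate "emitter" thread consumes it, once on a `volatile long` and once on an `AtomicLong`. The class `DemandRace` is hypothetical, and letting the counter go negative is a simplification that a real `Subscription` would not allow.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical experiment: one thread adds demand, another consumes it.
final class DemandRace {
    static volatile long volatileDemand;
    static final AtomicLong atomicDemand = new AtomicLong();

    /** Returns { final volatile value, final atomic value }. */
    static long[] run(int iterations) {
        volatileDemand = 0L;
        atomicDemand.set(0L);

        Thread requester = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                volatileDemand++;               // read, add, write: not atomic
                atomicDemand.incrementAndGet(); // atomic read-modify-write
            }
        });
        Thread emitter = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                volatileDemand--;               // can overwrite a concurrent increment
                atomicDemand.decrementAndGet();
            }
        });

        requester.start();
        emitter.start();
        try {
            requester.join();
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] { volatileDemand, atomicDemand.get() };
    }

    public static void main(String[] args) {
        long[] result = run(1_000_000);
        System.out.println("volatile: " + result[0] + ", atomic: " + result[1]);
    }
}
```

The atomic counter always ends at 0. The volatile one may or may not, depending on how the two threads interleave, because `volatile` guarantees visibility of each write but not atomicity of the read-modify-write. Whether this matters for `RangePublisher` depends on whether `request` can run concurrently with the emission loop, which is the point under discussion in this thread.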