Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous Stream Publisher - Example #377

Open
wants to merge 7 commits into
base: master
Choose a base branch
from

Conversation

ttulka
Copy link

@ttulka ttulka commented Jun 12, 2017

Based on Supplier, for the case you don't know the count of elements
(for example streaming from a file, etc).

Publisher is controlled by the null value - enf of stream.

Based on Supplier, for the case you don't know the count of elements
(for example streaming from a file, etc).

Publisher is controlled by the null value - enf of stream.
// we execute it asynchronously, this is to avoid executing the user code (`Iterable.iterator`) on the calling thread.
// It also makes it easier to follow rule 1.9
private void doSubscribe() {
if (!cancelled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancelled being true here is impossible because the only way to get that true is by submitting a Subscription to the Subscriber that happens once and under this condition.

try {
subscriber.onSubscribe(this);
} catch (final Throwable t) { // Due diligence to obey 2.13
terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with crashing onXXX is that you may end up in an undefined state where even calling onError fails.

cancelled being true here is impossible because the only way to get that
true is by submitting a Subscription to the Subscriber that happens once
and under this condition.
The problem with crashing onXXX is that you may end up in an undefined
state where even calling onError fails.
@viktorklang
Copy link
Contributor

Hi @ttulka!

For my understanding, does this solve a use-case that the Iterable-based example doesn't?

@ttulka
Copy link
Author

ttulka commented Jun 12, 2017

Hi @viktorklang
yes, this example is for the case when a "hasNext()" method is not available, it means, the stream tells us when it is over by sending a 'null' value.
A use-case could be reading lines from a file or a socket - for example using 'BufferedReader.readLine()' as the supplier.

@viktorklang
Copy link
Contributor

@ttulka Ah, ok, couldn't that be solved by doing a read-ahead of 1 element? (So when the iterator() method on Iterable is called we do a read-ahead and then we advance based on that?

def supplier2Iterable[T](s: Supplier[T]): Iterable[T] = new Iterable[T] {
  override def iterator(): Iterator[T] = new Iterator[T] {
    private[this] var elem = s.get()
    override def hasNext(): Boolean = elem != null
    override def next(): T =
      if (elem == null) Iterator.empty.next()
      else {
        val prev = elem
        elem = s.get()
        prev
      }
  }

}

@ttulka
Copy link
Author

ttulka commented Jun 12, 2017

@viktorklang This makes definitely sense, actually it is the same logic I put into the iterable publisher.
I removed my original class and created a new one - a simple extension of the iterable publisher. This is exactly the example to explain this use case.
Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants