Reduced API with async/await support #205

Closed
alshdavid opened this issue Feb 4, 2020 · 23 comments

@alshdavid

alshdavid commented Feb 4, 2020

Simpler API with support for async/await through the use of an object iterator.

This is inspired by Go's channels.
It's intended that this:

  • Flattens code
  • Makes streams chainable with promises (which are essentially streams which complete immediately).
  • Enables use of try/catch for error handling

Piping operators, creating "subjects" and applying a "subscribe" function can happen in userland through libraries.

Example Implementation
Codesandbox

Example usage:

const numbers = new Observable(next => {
  let count = 0
  // Emit an incrementing counter once per second
  const interval = setInterval(() => next(count++), 1000)
  return () => clearInterval(interval)
})

for await (const number of numbers) {
  console.log(number)
  if (number === 4) {
    break
  }
}

API

interface Observable<T> {
    [Symbol.Iterator]: Promise<T>
}

type TeardownFunc = () => void

type SetupFunc<T> = (
  next: (value: T) => void,
  complete: () => void,
) => TeardownFunc

type ObservableConstructor<T> = (
  setup: SetupFunc<T>
) => Observable<T>

@hax
Member

hax commented Feb 5, 2020

Shouldn't it be AsyncIterator?

@ForsakenHarmony

How do you cancel it?

@alshdavid
Author

To cancel the subscription, you simply break out of the loop.

@ForsakenHarmony

In your case the clearInterval never gets called because there seems to be no way to cancel from the consumer?

@alshdavid
Author

alshdavid commented Feb 6, 2020

Iterators can have setup and teardown logic

class Observable {
  // Must be an async generator so that `for await` receives plain values
  async *[Symbol.asyncIterator]() {
    // Setup logic when a loop starts
    try {
      for (const i of [1, 2, 3, 4, 5]) {
        yield Promise.resolve(i)
      }
    } finally {
      // Teardown logic when the loop finishes or is broken
    }
  }
}

When someone starts iterating over the object you can check if there are any subscribers already, if not, run the setup logic.

When a loop is broken, you can check whether the number of subscribers is 0; if it is, run the teardown logic.
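
A rough sketch of the subscriber-counting idea described above, assuming an unbounded per-consumer queue. The `consumers`, `broadcast` and `wake` names are made up for illustration and this is not the linked implementation:

```javascript
// Hypothetical sketch: setup runs for the first consumer, teardown after the last.
class Observable {
  constructor(setup) {
    this.setup = setup          // (next, complete) => teardown
    this.consumers = new Set()  // one entry per active for-await loop
    this.teardown = null
  }

  async *[Symbol.asyncIterator]() {
    // Per-consumer state: a queue of pending values plus a wake-up callback.
    const consumer = { queue: [], done: false, wake: null }
    this.consumers.add(consumer)
    if (this.consumers.size === 1) {
      // First consumer: run the setup logic.
      this.teardown = this.setup(
        value => this.broadcast(c => c.queue.push(value)),
        () => this.broadcast(c => { c.done = true })
      )
    }
    try {
      while (true) {
        while (consumer.queue.length) yield consumer.queue.shift()
        if (consumer.done) return
        // Nothing queued: sleep until the producer pushes again.
        await new Promise(resolve => { consumer.wake = resolve })
      }
    } finally {
      // Runs when the consuming loop breaks, returns, or throws.
      this.consumers.delete(consumer)
      if (this.consumers.size === 0 && this.teardown) {
        this.teardown()
        this.teardown = null
      }
    }
  }

  broadcast(update) {
    for (const c of this.consumers) {
      update(c)
      if (c.wake) { c.wake(); c.wake = null }
    }
  }
}
```

Breaking out of a `for await` loop calls the generator's `return()`, which is what triggers the `finally` block. The queue here grows without bound if the consumer is slower than the producer.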

You can see an imperfect implementation here
https://github.com/alshdavid/reactive-channels/blob/master/src/private-channel.ts

MDN Docs

@benlesh

benlesh commented Feb 7, 2020

Since observables are all push-based, there are memory and back pressure concerns unless you use a lossy strategy to adapt the incoming values into the asyncIterator. I have a proposal to add this as a feature to RxJS, but it's not without its drawbacks.

Otherwise, what you've proposed here can be accomplished with async function*.
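
To make the "lossy strategy" concrete, here is one possible sketch using `async function*` that keeps only the most recent value. The `latest` name and the `subscribe(next) => unsubscribe` shape are assumptions for illustration, not an RxJS API:

```javascript
// Hypothetical helper: adapts a push source into an async iterator,
// dropping any value that was not consumed before the next one arrived.
async function* latest(subscribe) {
  let hasValue = false
  let value
  let wake = null
  const unsubscribe = subscribe(v => {
    value = v           // overwrite: older unread values are lost
    hasValue = true
    if (wake) { wake(); wake = null }
  })
  try {
    while (true) {
      if (!hasValue) await new Promise(resolve => { wake = resolve })
      hasValue = false
      yield value
    }
  } finally {
    // Runs when the consumer breaks out of its for-await loop.
    unsubscribe()
  }
}
```

Memory use stays constant no matter how fast the producer is, at the cost of silently skipping values. This sketch never completes on its own; a real adapter would also need to forward completion and errors.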

@dy

dy commented May 25, 2020

Not sure where to put this (sorry if it's off-topic), but Observable can serve as a handy foundation for an async iterable. They play well together, for example:

  async *[Symbol.asyncIterator]() {
    // Buffer pushed values; `p` resolves whenever a new value arrives
    let resolve, buf = [],
      p = new Promise(r => resolve = r),
      unsubscribe = this[Symbol.observable]().subscribe(
        v => (buf.push(v), resolve(), p = new Promise(r => resolve = r))
      )

    try {
      while (1) {
        yield* buf.splice(0)
        // Wait only if nothing arrived while the consumer held the yield
        if (!buf.length) await p
      }
    } catch {} finally {
      unsubscribe()
    }
  },
  [Symbol.observable]: () => ({
    // Accept either an observer object or a plain callback
    subscribe: observer => {
      let next = typeof observer === 'function' ? observer : v => observer.next(v),
        handler = e => next(e.detail.props),
        unsubscribe = () => el.removeEventListener('evt', handler)
      el.addEventListener('evt', handler)
      return unsubscribe.unsubscribe = unsubscribe
    },
    [Symbol.observable]() { return this }
  })

Fusing the subscription logic directly into asyncIterator would be confusing; having it in a lazy observable.subscribe is neat, though it would be nice to simplify the API. Also, the sync nature of observables helps avoid many async-related issues; it sometimes gets tricky with async iterators.

@benlesh just curious: do you have an idea what the main blocker for the proposal is?

@richytong

richytong commented Jun 11, 2020

Observables are not async iterables. Though they do compete for the same streaming data model, it is not right to try to base either one off of the other unless you want spaghetti code. I write more about this here

@dy

dy commented Jun 12, 2020

@richytong Observable is invaluable for events, and it is sync, which makes it different from an async iterable. I also find it easier to provide minimal subscription code with an observable than to deal with async generators and promise resolution. All I wanted to say is that if some structure implements Symbol.observable, it is fairly easy to add async iterable support (for await); the opposite is not necessarily true.

it is not right to try to base either one off of the other unless you want spaghetti code

I'm not sure I could get from the article the argument for why observable code turns into spaghetti. I had the opposite experience (regardless of RxJS) with spect or zen-observable: Symbol.observable acts as an implicit reactive layer for any data structure, making code tidy.

@richytong

@dy Observable may be invaluable for events, but there are safer and less painful ways via async iterables to do what observables are currently doing.

Also, declaring an observable data structure may be sync, but you have to think about the events coming through observables as async; those events could come in at any time.

I was referring to @benlesh's new library as spaghetti because he tries to turn observables into async iterables. That library is not one function that turns observables into async iterables; it is four functions providing four separate fallback behaviors in the case that Observables run into backpressure issues. These are the same backpressure issues that async iterables do not have to deal with.

@dy

dy commented Jun 12, 2020

less painful ways via async iterables to do what observables are currently doing.

Can you give an example? In my experience that's not the case. During spect@15 iteration I found that to make many async iterables properly it needs promise resolution loop with buffering (this way or another), which is not trivial:

  async *[Symbol.asyncIterator]() {
    // emit initial value
    if (get) yield get()
    let buf = [], resolve, promise = new Promise(r => resolve = r)
    try {
      while (1) {
        await promise
        // between the moment the promise was resolved and now, other code could have resolved it once more
        while (buf.length) {
          // shift() rather than pop(), so values come out in arrival order
          yield buf.shift()
          // by the time yield hands control back, buf could have been topped up again
        }
      }
    } catch (e) {
    } finally {
    }
  },

Observables don't have the issue of missing events between ticks. Also, this async iterator code naturally builds on an existing [Symbol.observable]; otherwise it would mean mixing various concerns (subscribe, promise, buffer, batch?, skip?) in a single [Symbol.asyncIterator] method, which may turn the code into a mess.

RxJS is difficult, yes, but Observable is not: it is a minimal language primitive for the reactivity concern (you're welcome to invent a way to minimize it further). I believe that's a win for everybody: for standardizing the vast variety of reactive libs, and as a subscription provider for async iterators. The Observable pattern emerges in async iterators anyway (for free!), so why not make it a standard?

With Observable handling the subscription concern, the async iterator can focus on meaningful optimizations, like skipping duplicates or unchanged state, batching multiple updates, screening, throttling, etc.
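
As an illustration of one such optimization, skipping consecutive duplicates could be layered on top of any async iterable roughly like this. The `distinctUntilChanged` name is borrowed from common reactive-library naming and is only a sketch, not part of spect:

```javascript
// Hypothetical operator: passes a value through only when it differs
// from the previously emitted one (compared with Object.is).
async function* distinctUntilChanged(source) {
  let first = true
  let previous
  for await (const value of source) {
    if (first || !Object.is(value, previous)) {
      first = false
      previous = value
      yield value
    }
  }
}
```

Because it only consumes and produces async iterables, it does not need to know how the underlying subscription was set up.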

Observables run into backpressure issues.

I had an impression that observable is push-stream and should not care about consumer.

@obedm503

From learning RxJS, I've understood that Observable is used for push-based streams, while AsyncIterator is used for pull-based streams.

@richytong

@dy Just because something is difficult doesn't mean you have to learn it. RxJS has made your life hard, and yours too @obedm503, because the entire library is built up around the Observable. While Observables certainly sound nice on paper (lazily evaluated, multiple push sources, cool streams), your computer will run into memory issues due to backpressure if anything downstream of your Observable needs to do something in series with the events at scale.

I had an impression that observable is push-stream and should not care about consumer.

You are absolutely right about this. Observables really don't care about their consumers. That's why Node.js streams, which are analogous to Observables, have run into many pains in this regard. I quote @benlesh:

Since observables are all push-based, there are memory and back pressure concerns unless you use a lossy strategy to adapt the incoming values into the asyncIterator. I have a proposal to add this as a feature to RxJS, but it's not without its drawbacks.

^ that turned into this library. Four strats for the price of your sanity. This is fine and dandy for a userland lib, but if we're talking spec, there needs to be something more robust. Actually, there is something - AsyncIterable - and it's already in the spec. The only thing it's missing is a nice API to consume it.

Can you give an example?

Here you are, from my userland lib

const { map, transform } = require('rubico')

const asyncGenerator = async function*() { yield 1; yield 2; yield 3 }

const square = x => x ** 2

transform(map(square), [])(asyncGenerator()) // [1, 4, 9]

My approach is not new; it uses transducers, which hail from the land of functional programming. If you want something hard to wrap your head around but rewarding as hell, learn about transducers. My approach transforms the AsyncIterable created from asyncGenerator into an array. Some pros:

  • Declarative - just say the thing, transform an async iterable to an array
  • Functional - there are only functions here, plus an empty array (the thing you're transforming into)
  • Pure - no side effects
  • Concise - when you start writing with mostly functions your code size will decrease
  • Intuitive - map, you probably know what this does. transform - are you trying to transform some data? use this. The more powerful and less expressive alternative is reduce
  • Benchmarked
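
For readers unfamiliar with transducers, the general idea can be sketched in plain JavaScript without any library. The `mapping`, `filtering`, `append`, `compose` and `transduce` names below are hypothetical helpers for illustration and are not rubico's API:

```javascript
// A transducer takes a reducer and returns a new reducer.
const mapping = fn => reducer => (acc, x) => reducer(acc, fn(x))
const filtering = pred => reducer => (acc, x) => (pred(x) ? reducer(acc, x) : acc)

// A reducer that appends each item to an array.
const append = (acc, x) => { acc.push(x); return acc }

// Right-to-left function composition; for transducers this means
// data flows through the steps in the order they are listed.
const compose = (...fns) => x => fns.reduceRight((v, f) => f(v), x)

// Fold an async (or sync) iterable through a transducer into `init`.
async function transduce(xform, reducer, init, source) {
  const step = xform(reducer)
  let acc = init
  for await (const x of source) acc = step(acc, x)
  return acc
}
```

With these, `transduce(mapping(x => x ** 2), append, [], asyncGenerator())` returns a promise for the squared values, and no intermediate collection is built between the steps.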

During spect@15 iteration I found that to make many async iterables properly it needs promise resolution loop with buffering

You really don't need buffering with async iterables. The consumer will pull the next item when it is ready. What exactly are you making into an async iterable in your example?
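
A small sketch of what "the consumer will pull the next item when it is ready" means for a source that can be produced on demand. The `runDemo` name and the 5 ms delay are arbitrary choices for illustration:

```javascript
// The generator body only advances when the consumer asks for the next item,
// so a slow consumer never causes values to pile up.
async function runDemo() {
  const log = []

  async function* produce() {
    for (let i = 1; i <= 3; i++) {
      log.push(`produce ${i}`)
      yield i
    }
  }

  for await (const n of produce()) {
    await new Promise(resolve => setTimeout(resolve, 5)) // slow consumer
    log.push(`consume ${n}`)
  }
  return log
}
```

The resulting log strictly alternates between produce and consume entries. This only holds when the producer can be paused; a push source such as DOM events keeps firing regardless of the consumer, which is the case the buffering question in this thread is about.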

@benjamingr

@richytong hey, as Ben and others know I have been very critical of Rx in many places so this is not me jumping to defend it because I am a fanboy.

Your message is FUD. This whole thread shows a fundamental lack of research and understanding of the material. You make several unsubstantiated claims and several claims that simply show you did not grok Rx (for example it is the opposite of lazy after you subscribe so saying it is nice because it is lazy is one such fundamental misunderstanding).

Please read the meeting notes and docs. Promises are not suitable for this: events have to be synchronous. The reason why has been discussed at least 20-30 times in this repo alone...

Attacking Ben to plug your library is extremely bad manners IMO. Please stop, do your research and contribute more constructively.

For those of you asking why these proposals are stalled: these sorts of attacks, which take a big chunk of emotional energy from proposal champions, are a big part of why.

@benjamingr

Also as this proposal is mostly dead in its current form anyway (afaik) discussing these things here is mostly pointless :)

@richytong

@benjamingr Thank you for setting me straight and for your recommendations. I will take a look and focus more on my own contributions.

@benjamingr

OK, your response was well-mannered enough that I don't want to leave you "hanging", and I couldn't actually find all the answers in this repo, so I figured I'd recap some points worth mentioning:

  • Paraphrasing Erik: Iterables are essentially lazy, because nothing happens before you call next, but I’d categorize observables as (hyper) strict since next gets called whether or not the consumer wants it (so to speak).
  • Observables have to be entirely sync. This is in order to replace, complement and interop with the APIs they aim to replace, namely EventTarget. The browser event model requires this. This means no promises required anywhere throughout the API. Of course interop and utility methods are fine.
  • Observables can in fact support backpressure, similarly to how promises can support cancellation but don't. Not supporting backpressure isn't a bad thing and it makes observable a really simple API (with too many operators probably). If you want to take an example of observables with backpressure check out RxJava.
  • Node streams are both pull (if you subscribe to readable) and push (if you subscribe to data). Being able to get events as soon as possible is a feature of observables (and other event APIs like EventTarget and EventEmitter) - it's an important timing guarantee.
  • Observables are single consumer and not multiple consumer. In fact an observable is like a function that when invoked starts the actual action. You can of course put caching behavior on top of functions so it's easy to build multiple consumer (or multiple producer) observables.
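
A minimal sketch of two of the points above: an observable modelled as a plain function that does nothing until invoked, and values delivered synchronously rather than through promises. The `createSource` and `emit` names are invented for illustration and are not the proposal's API:

```javascript
// Hypothetical sketch: `observable` is just a function of a callback.
function createSource() {
  const listeners = new Set()

  // Nothing happens until this function is called with a callback.
  const observable = next => {
    listeners.add(next)
    return () => listeners.delete(next) // unsubscribe
  }

  // Push a value to every current listener, synchronously,
  // the way EventTarget dispatches events.
  const emit = value => {
    for (const listener of listeners) listener(value)
  }

  return { observable, emit }
}
```

A listener registered through `observable` has already run by the time `emit(value)` returns, whereas a `.then` callback on an already resolved promise only runs after the current synchronous code has finished. That ordering is the timing guarantee being described.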

For what it's worth - a lot of the API quirkiness is because of the "duality" of observable to LINQ, you can see this (now old) talk about it. A lot of the API was inspired by C#'s initially.

This repo contains a version of the proposal that is intentionally very small and contains as few features as possible in order to allow for interop and extensions. The goal of es-observable was to add a language level primitive so that libraries (like RxJS and presumably your library) could interop (easily) via a shared interface (Symbol.observable or Symbol.observer or whatever ended up last). It was supposed to usher in a new era the same way the Promises/A+ spec did for promises and allow for interop and an ecosystem.


Of course, you are welcome to use async iterables for whatever you want and write libraries. This (mostly dead) repo isn't "async iterable vs. observable" or "observable vs. promise" or "one observable library vs. another"; it's just "minimal observable API so platforms like Node and browsers (via DOM) can adopt observables in their APIs".

@dy

dy commented Jun 13, 2020

this proposal is mostly dead in its current form

The proposal is used by hundreds of packages if not more (9M+ downloads per week), it has multiple implementations, and demand is growing. The ecosystem and the many opinionated reactive implementations would only benefit from a standard. It's alive in userland.

@benjamingr

@dy to clarify, when I wrote:

this proposal is mostly dead in its current form

I meant "the push to promote this specification in its current form to the ECMAScript language at its current iteration".

I am very much still interested in this and I am interested in standardization so Node APIs are automatically convertible to observables.

@alshdavid
Author

alshdavid commented Jun 23, 2020

[Redacted]

@benjamingr

@alshdavid did you read the rest of the discussion in the issue? It looks like you are still arguing for async semantics which are a no-go (explained now 21-31 times in the repo and not 20-30 after #205 (comment) )

@alshdavid
Author

Yep sorry, redacted my comment in response.

What is the expectation surrounding Observables in the spec? Is there a primitive type we can use currently or can expect to use as a target for rxjs-like functionality?

@benjamingr

What is the expectation surrounding Observables in the spec?

Mostly stalled I think?
