Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add option to use async iterables #605

Merged
merged 1 commit into from
Jul 1, 2022

Conversation

paralin
Copy link
Collaborator

@paralin paralin commented Jun 30, 2022

Adds option useAsyncIterable which uses AsyncIterable instead of Observable.

For example:

  bidirectionalStreamingRequest(
    service: string,
    method: string,
    data: AsyncIterable<Uint8Array>
  ): AsyncIterable<Uint8Array>

Generates Transform async iterables for encoding and decoding:

  // encodeTransform encodes a source of message objects.
  // Transform<TestMessage, Uint8Array>
  async *encodeTransform(
    source: AsyncIterable<TestMessage | TestMessage[]> | Iterable<TestMessage | TestMessage[]>
  ): AsyncIterable<Uint8Array> {
    for await (const pkt of source) {
      if (Array.isArray(pkt)) {
        for (const p of pkt) {
          yield* [TestMessage.encode(p).finish()];
        }
      } else {
        yield* [TestMessage.encode(pkt).finish()];
      }
    }
  },
  // decodeTransform decodes a source of encoded messages.
  // Transform<Uint8Array, TestMessage>
  async *decodeTransform(
    source: AsyncIterable<Uint8Array | Uint8Array[]> | Iterable<Uint8Array | Uint8Array[]>
  ): AsyncIterable<TestMessage> {
    for await (const pkt of source) {
      if (Array.isArray(pkt)) {
        for (const p of pkt) {
          yield* [TestMessage.decode(p)];
        }
      } else {
        yield* [TestMessage.decode(pkt)];
      }
    }
  },

Generates RPC service implementations which use the Transform iterators:

  BidiStreaming(request: AsyncIterable<TestMessage>): AsyncIterable<TestMessage> {
    const data = TestMessage.encodeTransform(request);
    const result = this.rpc.bidirectionalStreamingRequest('simple.Test', 'BidiStreaming', data);
    return TestMessage.decodeTransform(result);
  }

AsyncIterables indicate a stream has ended by closing with an optional error.

@paralin
Copy link
Collaborator Author

paralin commented Jun 30, 2022

Note: I avoided using it-stream-types to avoid the extra dependency:

export type Source<T> = AsyncIterable<T> | Iterable<T>
source: Source<Uint8Array | Uint8Array[]>

=>

source: AsyncIterable<Uint8Array | Uint8Array[]> | Iterable<Uint8Array | Uint8Array[]>

paralin added a commit to aperturerobotics/starpc that referenced this pull request Jun 30, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jun 30, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jun 30, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jun 30, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jun 30, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jun 30, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jun 30, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jul 1, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jul 1, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jul 1, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jul 1, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jul 1, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jul 1, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jul 1, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jul 1, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jul 1, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
paralin added a commit to aperturerobotics/starpc that referenced this pull request Jul 1, 2022
Remove Observable<T> completely in favor of AsyncIterable<T>.

Uses the useAsyncIterable flag in ts-proto.

stephenh/ts-proto#605

Signed-off-by: Christian Stewart <christian@paral.in>
@paralin
Copy link
Collaborator Author

paralin commented Jul 1, 2022

@stephenh Tested this & released for starpc, converted everything to AsyncIterable from Observable: aperturerobotics/starpc@4ecfbe9

@paralin
Copy link
Collaborator Author

paralin commented Jul 1, 2022

@stephenh This is ready to go now that #603 is merged

Adds option useAsyncIterable which uses AsyncIterable instead of Observable.

For example:

  bidirectionalStreamingRequest(
    service: string,
    method: string,
    data: AsyncIterable<Uint8Array>
  ): AsyncIterable<Uint8Array>

Generates Transform async iterables for encoding and decoding:

  // encodeTransform encodes a source of message objects.
  // Transform<TestMessage, Uint8Array>
  async *encodeTransform(
    source: AsyncIterable<TestMessage | TestMessage[]> | Iterable<TestMessage | TestMessage[]>
  ): AsyncIterable<Uint8Array> {
    for await (const pkt of source) {
      if (Array.isArray(pkt)) {
        for (const p of pkt) {
          yield* [TestMessage.encode(p).finish()];
        }
      } else {
        yield* [TestMessage.encode(pkt).finish()];
      }
    }
  },

  // decodeTransform decodes a source of encoded messages.
  // Transform<Uint8Array, TestMessage>
  async *decodeTransform(
    source: AsyncIterable<Uint8Array | Uint8Array[]> | Iterable<Uint8Array | Uint8Array[]>
  ): AsyncIterable<TestMessage> {
    for await (const pkt of source) {
      if (Array.isArray(pkt)) {
        for (const p of pkt) {
          yield* [TestMessage.decode(p)];
        }
      } else {
        yield* [TestMessage.decode(pkt)];
      }
    }
  },

Generates RPC service implementations which use the Transform iterators:

  BidiStreaming(request: AsyncIterable<TestMessage>): AsyncIterable<TestMessage> {
    const data = TestMessage.encodeTransform(request);
    const result = this.rpc.bidirectionalStreamingRequest('simple.Test', 'BidiStreaming', data);
    return TestMessage.decodeTransform(result);
  }

AsyncIterables indicate a stream has ended by closing with an optional error.

Fixes stephenh#600

Signed-off-by: Christian Stewart <christian@paral.in>
@stephenh
Copy link
Owner

stephenh commented Jul 1, 2022

Looks great, thanks!

@stephenh stephenh merged commit ca8ea8d into stephenh:main Jul 1, 2022
stephenh pushed a commit that referenced this pull request Jul 1, 2022
# [1.116.0](v1.115.5...v1.116.0) (2022-07-01)

### Features

* add option to use async iterables ([#605](#605)) ([ca8ea8d](ca8ea8d)), closes [#600](#600)
@stephenh
Copy link
Owner

stephenh commented Jul 1, 2022

🎉 This PR is included in version 1.116.0 🎉

The release is available on:

Your semantic-release bot 📦🚀

@paralin paralin deleted the async-iterable branch July 1, 2022 22:47
@paralin
Copy link
Collaborator Author

paralin commented Jul 1, 2022

Awesome, thanks @stephenh !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants