From b7d0f8493ebf202c432a7a88856c3ca446d4152e Mon Sep 17 00:00:00 2001 From: Milan Suk Date: Mon, 6 May 2024 01:19:13 +0200 Subject: [PATCH] fix(platform): http client body stream runtime (#2694) Co-authored-by: Tim --- .changeset/bright-apricots-sit.md | 5 ++ .changeset/seven-cougars-jump.md | 5 ++ packages/effect/src/Stream.ts | 32 ++++++++++++- packages/effect/src/internal/stream.ts | 32 +++++++++---- .../platform-bun/src/internal/http/server.ts | 48 +++++++++++-------- packages/platform/src/Http/ServerResponse.ts | 2 +- packages/platform/src/internal/http/client.ts | 26 ++++------ .../src/internal/http/serverResponse.ts | 4 +- packages/platform/test/HttpClient.test.ts | 23 +++++++++ 9 files changed, 128 insertions(+), 49 deletions(-) create mode 100644 .changeset/bright-apricots-sit.md create mode 100644 .changeset/seven-cougars-jump.md diff --git a/.changeset/bright-apricots-sit.md b/.changeset/bright-apricots-sit.md new file mode 100644 index 0000000000..d5dc12b8ce --- /dev/null +++ b/.changeset/bright-apricots-sit.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +Add Stream.toReadableStreamEffect / .toReadableStreamRuntime diff --git a/.changeset/seven-cougars-jump.md b/.changeset/seven-cougars-jump.md new file mode 100644 index 0000000000..5848a53702 --- /dev/null +++ b/.changeset/seven-cougars-jump.md @@ -0,0 +1,5 @@ +--- +"@effect/platform": patch +--- + +Run client request stream with a current runtime. diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 83d0564512..ea1e2b8c12 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -22,6 +22,7 @@ import type { Pipeable } from "./Pipeable.js" import type { Predicate, Refinement } from "./Predicate.js" import type * as PubSub from "./PubSub.js" import type * as Queue from "./Queue.js" +import type { Runtime } from "./Runtime.js" import type * as Schedule from "./Schedule.js" import type * as Scope from "./Scope.js" import type * as Sink from "./Sink.js" @@ -3859,7 +3860,36 @@ export const toQueueOfElements: { * @since 2.0.0 * @category destructors */ -export const toReadableStream: (source: Stream) => ReadableStream = internal.toReadableStream +export const toReadableStream: (self: Stream) => ReadableStream = internal.toReadableStream + +/** + * Converts the stream to a `Effect`. + * + * See https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream. + * + * @since 2.0.0 + * @category destructors + */ +export const toReadableStreamEffect: (self: Stream) => Effect.Effect, never, R> = + internal.toReadableStreamEffect + +/** + * Converts the stream to a `ReadableStream` using the provided runtime. + * + * See https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream. + * + * @since 2.0.0 + * @category destructors + */ +export const toReadableStreamRuntime: { + ( + runtime: Runtime + ): (self: Stream) => ReadableStream + ( + self: Stream, + runtime: Runtime + ): ReadableStream +} = internal.toReadableStreamRuntime /** * Applies the transducer to the stream and emits its outputs. diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index f095b2b7d1..15ec125e44 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -6541,16 +6541,30 @@ export const toQueueOfElements = dual< )) /** @internal */ -export const toReadableStream = (source: Stream.Stream) => { - let pull: Effect.Effect +export const toReadableStream = (self: Stream.Stream) => + toReadableStreamRuntime(self, Runtime.defaultRuntime) + +/** @internal */ +export const toReadableStreamEffect = (self: Stream.Stream) => + Effect.map(Effect.runtime(), (runtime) => toReadableStreamRuntime(self, runtime)) + +/** @internal */ +export const toReadableStreamRuntime = dual< + (runtime: Runtime.Runtime) => (self: Stream.Stream) => ReadableStream, + (self: Stream.Stream, runtime: Runtime.Runtime) => ReadableStream +>(2, (self: Stream.Stream, runtime: Runtime.Runtime): ReadableStream => { + const runSync = Runtime.runSync(runtime) + const runPromise = Runtime.runPromise(runtime) + + let pull: Effect.Effect let scope: Scope.CloseableScope return new ReadableStream({ start(controller) { - scope = Effect.runSync(Scope.make()) + scope = runSync(Scope.make()) pull = pipe( - toPull(source), - Scope.use(scope), - Effect.runSync, + toPull(self), + Scope.extend(scope), + runSync, Effect.tap((chunk) => Effect.sync(() => { Chunk.map(chunk, (a) => { @@ -6573,13 +6587,13 @@ export const toReadableStream = (source: Stream.Stream) => { ) }, pull() { - return Effect.runPromise(pull) + return runPromise(pull) }, cancel() { - return Effect.runPromise(Scope.close(scope, Exit.void)) + return runPromise(Scope.close(scope, Exit.void)) } }) -} +}) /** @internal */ export const transduce = dual< diff --git a/packages/platform-bun/src/internal/http/server.ts b/packages/platform-bun/src/internal/http/server.ts index ea81da0ef2..3626460eb8 100644 --- a/packages/platform-bun/src/internal/http/server.ts +++ b/packages/platform-bun/src/internal/http/server.ts @@ -27,6 +27,7 @@ import * as Inspectable from "effect/Inspectable" import * as Layer from "effect/Layer" import * as Option from "effect/Option" import type { ReadonlyRecord } from "effect/Record" +import type * as Runtime from "effect/Runtime" import type * as Scope from "effect/Scope" import * as Stream from "effect/Stream" import { Readable } from "node:stream" @@ -73,25 +74,27 @@ export const make = ( return Server.make({ address: { _tag: "TcpAddress", port: server.port, hostname: server.hostname }, serve(httpApp, middleware) { - const app = App.toHandled(httpApp, (request, exit) => - Effect.sync(() => { - const impl = request as ServerRequestImpl - if (exit._tag === "Success") { - impl.resolve(makeResponse(request, exit.value)) - } else if (Cause.isInterruptedOnly(exit.cause)) { - impl.resolve( - new Response(undefined, { - status: impl.source.signal.aborted ? 499 : 503 - }) - ) - } else { - impl.reject(Cause.pretty(exit.cause)) - } - }), middleware) - return pipe( FiberSet.makeRuntime(), - Effect.flatMap((runFork) => + Effect.bindTo("runFork"), + Effect.bind("runtime", () => Effect.runtime()), + Effect.let("app", ({ runtime }) => + App.toHandled(httpApp, (request, exit) => + Effect.sync(() => { + const impl = request as ServerRequestImpl + if (exit._tag === "Success") { + impl.resolve(makeResponse(request, exit.value, runtime)) + } else if (Cause.isInterruptedOnly(exit.cause)) { + impl.resolve( + new Response(undefined, { + status: impl.source.signal.aborted ? 499 : 503 + }) + ) + } else { + impl.reject(Cause.pretty(exit.cause)) + } + }), middleware)), + Effect.flatMap(({ app, runFork }) => Effect.async((_) => { function handler(request: Request, server: BunServer) { return new Promise((resolve, reject) => { @@ -121,7 +124,11 @@ export const make = ( }) }) -const makeResponse = (request: ServerRequest.ServerRequest, response: ServerResponse.ServerResponse): Response => { +const makeResponse = ( + request: ServerRequest.ServerRequest, + response: ServerResponse.ServerResponse, + runtime: Runtime.Runtime +): Response => { const fields: { headers: globalThis.Headers status?: number @@ -157,7 +164,10 @@ const makeResponse = (request: ServerRequest.ServerRequest, response: ServerResp return new Response(body.formData as any, fields) } case "Stream": { - return new Response(Stream.toReadableStream(body.stream), fields) + return new Response( + Stream.toReadableStreamRuntime(body.stream, runtime), + fields + ) } } } diff --git a/packages/platform/src/Http/ServerResponse.ts b/packages/platform/src/Http/ServerResponse.ts index 126b209a47..a84c3b06a8 100644 --- a/packages/platform/src/Http/ServerResponse.ts +++ b/packages/platform/src/Http/ServerResponse.ts @@ -165,7 +165,7 @@ export const formData: (body: FormData, options?: Options.WithContent | undefine * @since 1.0.0 * @category constructors */ -export const stream: (body: Stream.Stream, options?: Options | undefined) => ServerResponse = +export const stream: (body: Stream.Stream, options?: Options | undefined) => ServerResponse = internal.stream /** diff --git a/packages/platform/src/internal/http/client.ts b/packages/platform/src/internal/http/client.ts index 2be03beb81..ca4b08f806 100644 --- a/packages/platform/src/internal/http/client.ts +++ b/packages/platform/src/internal/http/client.ts @@ -14,7 +14,6 @@ import * as Ref from "effect/Ref" import type * as Schedule from "effect/Schedule" import * as Scope from "effect/Scope" import * as Stream from "effect/Stream" -import type * as Body from "../../Http/Body.js" import type * as Client from "../../Http/Client.js" import * as Error from "../../Http/ClientError.js" import type * as ClientRequest from "../../Http/ClientRequest.js" @@ -222,26 +221,19 @@ export const fetch: Client.Client.Default = makeDefault((request, url, signal, f (response) => internalResponse.fromWeb(request, response) ) if (Method.hasBody(request.method)) { - return send(convertBody(request.body)) + switch (request.body._tag) { + case "Raw": + case "Uint8Array": + return send(request.body.body as any) + case "FormData": + return send(request.body.formData) + case "Stream": + return Effect.flatMap(Stream.toReadableStreamEffect(request.body.stream), send) + } } return send(undefined) }) -const convertBody = (body: Body.Body): BodyInit | undefined => { - switch (body._tag) { - case "Empty": - return undefined - case "Raw": - return body.body as any - case "Uint8Array": - return body.body - case "FormData": - return body.formData - case "Stream": - return Stream.toReadableStream(body.stream) - } -} - /** @internal */ export const transform = dual< ( diff --git a/packages/platform/src/internal/http/serverResponse.ts b/packages/platform/src/internal/http/serverResponse.ts index e576c4ca96..3834e6f550 100644 --- a/packages/platform/src/internal/http/serverResponse.ts +++ b/packages/platform/src/internal/http/serverResponse.ts @@ -258,8 +258,8 @@ export const formData = ( ) /** @internal */ -export const stream = ( - body: Stream.Stream, +export const stream = ( + body: Stream.Stream, options?: ServerResponse.Options | undefined ): ServerResponse.ServerResponse => new ServerResponseImpl( diff --git a/packages/platform/test/HttpClient.test.ts b/packages/platform/test/HttpClient.test.ts index b513e990d4..15d02327d2 100644 --- a/packages/platform/test/HttpClient.test.ts +++ b/packages/platform/test/HttpClient.test.ts @@ -4,6 +4,7 @@ import { Ref } from "effect" import * as Context from "effect/Context" import * as Effect from "effect/Effect" import * as Layer from "effect/Layer" +import * as Logger from "effect/Logger" import * as Stream from "effect/Stream" import { assert, describe, expect, it } from "vitest" @@ -129,4 +130,26 @@ describe("HttpClient", () => { const response = yield* _(Http.request.get("/todos/1"), todoClient) expect(response.id).toBe(1) }).pipe(Effect.provide(Http.client.layer), Effect.runPromise)) + + it("streamBody accesses the current runtime", () => + Effect.gen(function*(_) { + const defaultClient = yield* _(Http.client.Client) + + const requestStream = Stream.fromIterable(["hello", "world"]).pipe( + Stream.tap((_) => Effect.log(_)), + Stream.encodeText + ) + + const logs: Array = [] + const logger = Logger.make(({ message }) => logs.push(message)) + + yield* Http.request.post("https://jsonplaceholder.typicode.com").pipe( + Http.request.streamBody(requestStream), + defaultClient, + Effect.provide(Logger.replace(Logger.defaultLogger, logger)), + Effect.scoped + ) + + expect(logs).toEqual(["hello", "world"]) + }).pipe(Effect.provide(Http.client.layer), Effect.runPromise)) })