Skip to content

Commit

Permalink
fix(platform): http client body stream runtime (#2694)
Browse files Browse the repository at this point in the history
Co-authored-by: Tim <hello@timsmart.co>
  • Loading branch information
2 people authored and fubhy committed May 16, 2024
1 parent 13f1113 commit 1cb5f98
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 49 deletions.
5 changes: 5 additions & 0 deletions .changeset/bright-apricots-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

Add Stream.toReadableStreamEffect / .toReadableStreamRuntime
5 changes: 5 additions & 0 deletions .changeset/seven-cougars-jump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/platform": patch
---

Run client request stream with a current runtime.
32 changes: 31 additions & 1 deletion packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type { Pipeable } from "./Pipeable.js"
import type { Predicate, Refinement } from "./Predicate.js"
import type * as PubSub from "./PubSub.js"
import type * as Queue from "./Queue.js"
import type { Runtime } from "./Runtime.js"
import type * as Schedule from "./Schedule.js"
import type * as Scope from "./Scope.js"
import type * as Sink from "./Sink.js"
Expand Down Expand Up @@ -3859,7 +3860,36 @@ export const toQueueOfElements: {
* @since 2.0.0
* @category destructors
*/
export const toReadableStream: <A, E>(source: Stream<A, E>) => ReadableStream<A> = internal.toReadableStream
export const toReadableStream: <A, E>(self: Stream<A, E>) => ReadableStream<A> = internal.toReadableStream

/**
* Converts the stream to a `Effect<ReadableStream>`.
*
* See https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream.
*
* @since 2.0.0
* @category destructors
*/
export const toReadableStreamEffect: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<ReadableStream<A>, never, R> =
internal.toReadableStreamEffect

/**
* Converts the stream to a `ReadableStream` using the provided runtime.
*
* See https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream.
*
* @since 2.0.0
* @category destructors
*/
export const toReadableStreamRuntime: {
<XR>(
runtime: Runtime<XR>
): <A, E, R extends XR>(self: Stream<A, E, R>) => ReadableStream<A>
<A, E, XR, R extends XR>(
self: Stream<A, E, R>,
runtime: Runtime<XR>
): ReadableStream<A>
} = internal.toReadableStreamRuntime

/**
* Applies the transducer to the stream and emits its outputs.
Expand Down
32 changes: 23 additions & 9 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6541,16 +6541,30 @@ export const toQueueOfElements = dual<
))

/** @internal */
export const toReadableStream = <A, E>(source: Stream.Stream<A, E>) => {
let pull: Effect.Effect<void>
export const toReadableStream = <A, E>(self: Stream.Stream<A, E>) =>
toReadableStreamRuntime(self, Runtime.defaultRuntime)

/** @internal */
export const toReadableStreamEffect = <A, E, R>(self: Stream.Stream<A, E, R>) =>
Effect.map(Effect.runtime<R>(), (runtime) => toReadableStreamRuntime(self, runtime))

/** @internal */
export const toReadableStreamRuntime = dual<
<XR>(runtime: Runtime.Runtime<XR>) => <A, E, R extends XR>(self: Stream.Stream<A, E, R>) => ReadableStream<A>,
<A, E, XR, R extends XR>(self: Stream.Stream<A, E, R>, runtime: Runtime.Runtime<XR>) => ReadableStream<A>
>(2, <A, E, XR, R extends XR>(self: Stream.Stream<A, E, R>, runtime: Runtime.Runtime<XR>): ReadableStream<A> => {
const runSync = Runtime.runSync(runtime)
const runPromise = Runtime.runPromise(runtime)

let pull: Effect.Effect<void, never, R>
let scope: Scope.CloseableScope
return new ReadableStream<A>({
start(controller) {
scope = Effect.runSync(Scope.make())
scope = runSync(Scope.make())
pull = pipe(
toPull(source),
Scope.use(scope),
Effect.runSync,
toPull(self),
Scope.extend(scope),
runSync,
Effect.tap((chunk) =>
Effect.sync(() => {
Chunk.map(chunk, (a) => {
Expand All @@ -6573,13 +6587,13 @@ export const toReadableStream = <A, E>(source: Stream.Stream<A, E>) => {
)
},
pull() {
return Effect.runPromise(pull)
return runPromise(pull)
},
cancel() {
return Effect.runPromise(Scope.close(scope, Exit.void))
return runPromise(Scope.close(scope, Exit.void))
}
})
}
})

/** @internal */
export const transduce = dual<
Expand Down
48 changes: 29 additions & 19 deletions packages/platform-bun/src/internal/http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import * as Inspectable from "effect/Inspectable"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import type { ReadonlyRecord } from "effect/Record"
import type * as Runtime from "effect/Runtime"
import type * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
import { Readable } from "node:stream"
Expand Down Expand Up @@ -73,25 +74,27 @@ export const make = (
return Server.make({
address: { _tag: "TcpAddress", port: server.port, hostname: server.hostname },
serve(httpApp, middleware) {
const app = App.toHandled(httpApp, (request, exit) =>
Effect.sync(() => {
const impl = request as ServerRequestImpl
if (exit._tag === "Success") {
impl.resolve(makeResponse(request, exit.value))
} else if (Cause.isInterruptedOnly(exit.cause)) {
impl.resolve(
new Response(undefined, {
status: impl.source.signal.aborted ? 499 : 503
})
)
} else {
impl.reject(Cause.pretty(exit.cause))
}
}), middleware)

return pipe(
FiberSet.makeRuntime<never>(),
Effect.flatMap((runFork) =>
Effect.bindTo("runFork"),
Effect.bind("runtime", () => Effect.runtime<never>()),
Effect.let("app", ({ runtime }) =>
App.toHandled(httpApp, (request, exit) =>
Effect.sync(() => {
const impl = request as ServerRequestImpl
if (exit._tag === "Success") {
impl.resolve(makeResponse(request, exit.value, runtime))
} else if (Cause.isInterruptedOnly(exit.cause)) {
impl.resolve(
new Response(undefined, {
status: impl.source.signal.aborted ? 499 : 503
})
)
} else {
impl.reject(Cause.pretty(exit.cause))
}
}), middleware)),
Effect.flatMap(({ app, runFork }) =>
Effect.async<never>((_) => {
function handler(request: Request, server: BunServer) {
return new Promise<Response>((resolve, reject) => {
Expand Down Expand Up @@ -121,7 +124,11 @@ export const make = (
})
})

const makeResponse = (request: ServerRequest.ServerRequest, response: ServerResponse.ServerResponse): Response => {
const makeResponse = (
request: ServerRequest.ServerRequest,
response: ServerResponse.ServerResponse,
runtime: Runtime.Runtime<never>
): Response => {
const fields: {
headers: globalThis.Headers
status?: number
Expand Down Expand Up @@ -157,7 +164,10 @@ const makeResponse = (request: ServerRequest.ServerRequest, response: ServerResp
return new Response(body.formData as any, fields)
}
case "Stream": {
return new Response(Stream.toReadableStream(body.stream), fields)
return new Response(
Stream.toReadableStreamRuntime(body.stream, runtime),
fields
)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/platform/src/Http/ServerResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ export const formData: (body: FormData, options?: Options.WithContent | undefine
* @since 1.0.0
* @category constructors
*/
export const stream: (body: Stream.Stream<Uint8Array, unknown>, options?: Options | undefined) => ServerResponse =
export const stream: <E>(body: Stream.Stream<Uint8Array, E, never>, options?: Options | undefined) => ServerResponse =
internal.stream

/**
Expand Down
26 changes: 9 additions & 17 deletions packages/platform/src/internal/http/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import * as Ref from "effect/Ref"
import type * as Schedule from "effect/Schedule"
import * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
import type * as Body from "../../Http/Body.js"
import type * as Client from "../../Http/Client.js"
import * as Error from "../../Http/ClientError.js"
import type * as ClientRequest from "../../Http/ClientRequest.js"
Expand Down Expand Up @@ -222,26 +221,19 @@ export const fetch: Client.Client.Default = makeDefault((request, url, signal, f
(response) => internalResponse.fromWeb(request, response)
)
if (Method.hasBody(request.method)) {
return send(convertBody(request.body))
switch (request.body._tag) {
case "Raw":
case "Uint8Array":
return send(request.body.body as any)
case "FormData":
return send(request.body.formData)
case "Stream":
return Effect.flatMap(Stream.toReadableStreamEffect(request.body.stream), send)
}
}
return send(undefined)
})

const convertBody = (body: Body.Body): BodyInit | undefined => {
switch (body._tag) {
case "Empty":
return undefined
case "Raw":
return body.body as any
case "Uint8Array":
return body.body
case "FormData":
return body.formData
case "Stream":
return Stream.toReadableStream(body.stream)
}
}

/** @internal */
export const transform = dual<
<A, E, R, R1, E1, A1>(
Expand Down
4 changes: 2 additions & 2 deletions packages/platform/src/internal/http/serverResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ export const formData = (
)

/** @internal */
export const stream = (
body: Stream.Stream<Uint8Array, unknown>,
export const stream = <E>(
body: Stream.Stream<Uint8Array, E>,
options?: ServerResponse.Options | undefined
): ServerResponse.ServerResponse =>
new ServerResponseImpl(
Expand Down
23 changes: 23 additions & 0 deletions packages/platform/test/HttpClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { Ref } from "effect"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Logger from "effect/Logger"
import * as Stream from "effect/Stream"
import { assert, describe, expect, it } from "vitest"

Expand Down Expand Up @@ -129,4 +130,26 @@ describe("HttpClient", () => {
const response = yield* _(Http.request.get("/todos/1"), todoClient)
expect(response.id).toBe(1)
}).pipe(Effect.provide(Http.client.layer), Effect.runPromise))

it("streamBody accesses the current runtime", () =>
Effect.gen(function*(_) {
const defaultClient = yield* _(Http.client.Client)

const requestStream = Stream.fromIterable(["hello", "world"]).pipe(
Stream.tap((_) => Effect.log(_)),
Stream.encodeText
)

const logs: Array<unknown> = []
const logger = Logger.make(({ message }) => logs.push(message))

yield* Http.request.post("https://jsonplaceholder.typicode.com").pipe(
Http.request.streamBody(requestStream),
defaultClient,
Effect.provide(Logger.replace(Logger.defaultLogger, logger)),
Effect.scoped
)

expect(logs).toEqual(["hello", "world"])
}).pipe(Effect.provide(Http.client.layer), Effect.runPromise))
})

0 comments on commit 1cb5f98

Please sign in to comment.