Skip to content

Commit

Permalink
socket wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Sep 13, 2024
1 parent c850d9d commit bf5b9d1
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 18 deletions.
14 changes: 9 additions & 5 deletions packages/platform-bun/src/internal/httpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,16 +412,20 @@ class ServerRequestImpl extends Inspectable.Class implements ServerRequest.HttpS
})
const writer = Effect.succeed(write)
const runRaw = <R, E, _>(
handler: (_: Uint8Array | string) => Effect.Effect<_, E, R>
handler: (_: Uint8Array | string) => Effect.Effect<_, E, R> | void
): Effect.Effect<void, Socket.SocketError | E, R> =>
FiberSet.make<any, E>().pipe(
Effect.flatMap((set) =>
FiberSet.runtime(set)<R>().pipe(
Effect.flatMap((run) => {
ws.data.run = function(data: Uint8Array | string) {
run(handler(data))
function runRaw(data: Uint8Array | string) {
const result = handler(data)
if (Effect.isEffect(result)) {
run(result)
}
}
ws.data.buffer.forEach((data) => run(handler(data)))
ws.data.run = runRaw
ws.data.buffer.forEach(runRaw)
ws.data.buffer.length = 0
return FiberSet.join(set)
})
Expand All @@ -434,7 +438,7 @@ class ServerRequestImpl extends Inspectable.Class implements ServerRequest.HttpS
)

const encoder = new TextEncoder()
const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R>) =>
const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) =>
runRaw((data) => typeof data === "string" ? handler(encoder.encode(data)) : handler(data))

return Socket.Socket.of({
Expand Down
7 changes: 5 additions & 2 deletions packages/platform-node-shared/src/NodeSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export const fromDuplex = <RO>(
Effect.bindTo("sendQueue"),
Effect.bind("openContext", () => Effect.context<Exclude<RO, Scope.Scope>>()),
Effect.map(({ openContext, sendQueue }) => {
const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R>) =>
const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) =>
Effect.scope.pipe(
Effect.bindTo("scope"),
Effect.bind("conn", ({ scope }) =>
Expand Down Expand Up @@ -121,7 +121,10 @@ export const fromDuplex = <RO>(
),
Effect.tap(({ conn, fiberSet, run }) => {
conn.on("data", (chunk) => {
run(handler(chunk))
const result = handler(chunk)
if (Effect.isEffect(result)) {
run(result)
}
})

return Effect.async<void, Socket.SocketError, never>((resume) => {
Expand Down
27 changes: 16 additions & 11 deletions packages/platform/src/Socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ export const Socket: Context.Tag<Socket, Socket> = Context.GenericTag<Socket>(
*/
export interface Socket {
readonly [TypeId]: TypeId
readonly run: <_, E, R>(
handler: (_: Uint8Array) => Effect.Effect<_, E, R>
readonly run: <_, E = never, R = never>(
handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void
) => Effect.Effect<void, SocketError | E, R>
readonly runRaw: <_, E, R>(
handler: (_: string | Uint8Array) => Effect.Effect<_, E, R>
readonly runRaw: <_, E = never, R = never>(
handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void
) => Effect.Effect<void, SocketError | E, R>
readonly writer: Effect.Effect<
(chunk: Uint8Array | string | CloseEvent) => Effect.Effect<boolean>,
Expand Down Expand Up @@ -218,7 +218,9 @@ export const toChannelMap = <IE, A>(
})
),
Effect.tap(({ mailbox, scope }) =>
self.runRaw((data) => mailbox.offer(f(data))).pipe(
self.runRaw((data) => {
mailbox.unsafeOffer(f(data))
}).pipe(
Mailbox.into(mailbox),
Effect.forkIn(scope),
Effect.interruptible
Expand Down Expand Up @@ -395,7 +397,7 @@ export const fromWebSocket = <R>(
(sendQueue) => {
const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context<R>
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R>) =>
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) =>
acquire.pipe(
Effect.bindTo("ws"),
Effect.bind("fiberSet", () => FiberSet.make<any, E | SocketError>()),
Expand All @@ -405,13 +407,16 @@ export const fromWebSocket = <R>(
let open = false

function onMessage(event: MessageEvent) {
run(handler(
const result = handler(
typeof event.data === "string"
? event.data
: event.data instanceof Uint8Array
? event.data
: new Uint8Array(event.data)
))
)
if (Effect.isEffect(result)) {
run(result)
}
}
function onError(cause: Event) {
ws.removeEventListener("message", onMessage)
Expand Down Expand Up @@ -493,7 +498,7 @@ export const fromWebSocket = <R>(
)

const encoder = new TextEncoder()
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R>) =>
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) =>
runRaw((data) =>
typeof data === "string"
? handler(encoder.encode(data))
Expand Down Expand Up @@ -579,7 +584,7 @@ export const fromTransformStream = <R>(acquire: Effect.Effect<InputTransformStre
(sendQueue) => {
const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context<R>
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R>) =>
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R> | void) =>
acquire.pipe(
Effect.bindTo("stream"),
Effect.bind("reader", ({ stream }) =>
Expand Down Expand Up @@ -660,7 +665,7 @@ export const fromTransformStream = <R>(acquire: Effect.Effect<InputTransformStre
)

const encoder = new TextEncoder()
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R>) =>
const run = <_, E, R>(handler: (_: Uint8Array) => Effect.Effect<_, E, R> | void) =>
runRaw((data) =>
typeof data === "string"
? handler(encoder.encode(data))
Expand Down

0 comments on commit bf5b9d1

Please sign in to comment.