Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
updated socket api
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Dec 25, 2023
1 parent 56db1c5 commit 0d94992
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 218 deletions.
14 changes: 4 additions & 10 deletions src/DevTools/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export const make = Effect.gen(function*(_) {

yield* _(clients.offer(client))

yield* _(
return yield* _(
Stream.fromQueue(responses),
Stream.pipeThroughChannel(
MsgPack.duplexSchema(Socket.toChannel(socket), {
Expand All @@ -84,19 +84,13 @@ export const make = Effect.gen(function*(_) {
]))
)
}).pipe(
Effect.catchAllCause(Effect.log),
Effect.fork
Effect.catchAllCause(Effect.log)
)

yield* _(
server.sockets.take,
Effect.flatMap(handle),
Effect.forever,
Effect.forkScoped
)
const run = server.run(handle)

return {
run: server.run,
run,
clients
} satisfies ServerImpl
})
128 changes: 67 additions & 61 deletions src/Socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
*/
import * as Cause from "effect/Cause"
import * as Channel from "effect/Channel"
import type * as Chunk from "effect/Chunk"
import * as Chunk from "effect/Chunk"
import * as Context from "effect/Context"
import * as Data from "effect/Data"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
import * as Exit from "effect/Exit"
import * as Layer from "effect/Layer"
import * as Queue from "effect/Queue"
import * as Runtime from "effect/Runtime"
import * as Scope from "effect/Scope"
import type * as AsyncProducer from "effect/SingleProducerAsyncInput"
import WebSocket from "isomorphic-ws"
Expand Down Expand Up @@ -40,10 +42,11 @@ export const Socket: Context.Tag<Socket, Socket> = Context.Tag<Socket>(
*/
export interface Socket {
readonly [SocketTypeId]: SocketTypeId
readonly run: Effect.Effect<never, SocketError, void>
readonly run: <R, E, _>(
handler: (_: Uint8Array) => Effect.Effect<R, E, _>
) => Effect.Effect<R, SocketError | E, void>
readonly writer: Effect.Effect<Scope.Scope, never, (chunk: Uint8Array) => Effect.Effect<never, never, void>>
readonly messages: Queue.Dequeue<Uint8Array>
readonly source?: unknown
// readonly messages: Queue.Dequeue<Uint8Array>
}

/**
Expand Down Expand Up @@ -95,18 +98,12 @@ export const toChannel = <IE>(
}

yield* _(
self.run,
self.run((data) => Queue.offer(exitQueue, Exit.succeed(Chunk.of(data)))),
Effect.zipRight(Effect.failCause(Cause.empty)),
Effect.exit,
Effect.tap((exit) => Queue.offer(exitQueue, exit)),
Effect.fork
)
yield* _(
Queue.takeBetween(self.messages, 1, Number.MAX_SAFE_INTEGER),
Effect.flatMap((chunk) => Queue.offer(exitQueue, Exit.succeed(chunk))),
Effect.forever,
Effect.fork
)

const loop: Channel.Channel<
never,
Expand Down Expand Up @@ -191,68 +188,77 @@ export const fromWebSocket = (
Effect.gen(function*(_) {
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
const sendQueue = yield* _(Queue.unbounded<Uint8Array>())
const messages = yield* _(Queue.unbounded<Uint8Array>())

const run = Effect.gen(function*(_) {
const ws = yield* _(acquire)
const encoder = new TextEncoder()
const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<R, E, _>) =>
Effect.gen(function*(_) {
const ws = yield* _(acquire)
const encoder = new TextEncoder()
const runtime = yield* _(Effect.runtime<R>())
const deferred = yield* _(Deferred.make<E, never>())
const run = Runtime.runFork(runtime)

ws.onmessage = (event) => {
Queue.unsafeOffer(
messages,
event.data instanceof Uint8Array
? event.data
: typeof event.data === "string"
? encoder.encode(event.data)
: new Uint8Array(event.data)
)
}
ws.onmessage = (event) => {
run(
Effect.catchAllCause(
handler(
event.data instanceof Uint8Array
? event.data
: typeof event.data === "string"
? encoder.encode(event.data)
: new Uint8Array(event.data)
),
(cause) => Deferred.failCause(deferred, cause)
)
)
}

if (ws.readyState !== WebSocket.OPEN) {
yield* _(Effect.async<never, SocketError, void>((resume) => {
ws.onopen = () => {
resume(Effect.unit)
}
ws.onerror = (error_) => {
resume(Effect.fail(new SocketError({ reason: "Open", error: (error_ as any).message })))
}
}))
}
if (ws.readyState !== WebSocket.OPEN) {
yield* _(Effect.async<never, SocketError, void>((resume) => {
ws.onopen = () => {
resume(Effect.unit)
}
ws.onerror = (error_) => {
resume(Effect.fail(new SocketError({ reason: "Open", error: (error_ as any).message })))
}
}))
}

yield* _(
Queue.take(sendQueue),
Effect.tap((chunk) =>
Effect.try({
try: () => ws.send(chunk),
catch: (error) => Effect.fail(new SocketError({ reason: "Write", error: (error as any).message }))
})
),
Effect.forever,
Effect.fork
)
yield* _(
Queue.take(sendQueue),
Effect.tap((chunk) =>
Effect.try({
try: () => ws.send(chunk),
catch: (error) => Effect.fail(new SocketError({ reason: "Write", error: (error as any).message }))
})
),
Effect.forever,
Effect.fork
)

yield* _(Effect.async<never, SocketError, void>((resume) => {
ws.onclose = (event) => {
if (closeCodeIsError(event.code)) {
resume(Effect.fail(new SocketError({ reason: "Close", error: event })))
} else {
resume(Effect.unit)
}
}
ws.onerror = (error) => {
resume(Effect.fail(new SocketError({ reason: "Read", error: (error as any).message })))
}
}))
}).pipe(Effect.scoped)
yield* _(Effect.race(
Effect.async<never, SocketError, void>((resume) => {
ws.onclose = (event) => {
if (closeCodeIsError(event.code)) {
resume(Effect.fail(new SocketError({ reason: "Close", error: event })))
} else {
resume(Effect.unit)
}
}
ws.onerror = (error) => {
resume(Effect.fail(new SocketError({ reason: "Read", error: (error as any).message })))
}
}),
Deferred.await(deferred)
))
}).pipe(Effect.scoped)

const write = (chunk: Uint8Array) => Queue.offer(sendQueue, chunk)
const writer = Effect.succeed(write)

return Socket.of({
[SocketTypeId]: SocketTypeId,
run,
writer,
messages
writer
})
})

Expand Down
82 changes: 46 additions & 36 deletions src/Socket/Node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
*/
import * as Channel from "effect/Channel"
import type * as Chunk from "effect/Chunk"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
import * as Fiber from "effect/Fiber"
import * as Layer from "effect/Layer"
import * as Queue from "effect/Queue"
import * as Runtime from "effect/Runtime"
import type * as Scope from "effect/Scope"
import * as Net from "node:net"
import * as Socket from "../Socket.js"
Expand Down Expand Up @@ -59,41 +61,50 @@ export const fromNetSocket = (
): Effect.Effect<never, never, Socket.Socket> =>
Effect.gen(function*(_) {
const sendQueue = yield* _(Queue.unbounded<Uint8Array | typeof EOF>())
const messagesQueue = yield* _(Queue.unbounded<Uint8Array>())

const run = Effect.gen(function*(_) {
const conn = yield* _(open)
const writeFiber = yield* _(
Queue.take(sendQueue),
Effect.tap((chunk) =>
Effect.async<never, Socket.SocketError, void>((resume) => {
if (chunk === EOF) {
conn.end(() => resume(Effect.unit))
} else {
conn.write(chunk, (error) => {
resume(error ? Effect.fail(new Socket.SocketError({ reason: "Write", error })) : Effect.unit)
})
}
})
),
Effect.forever,
Effect.fork
)
conn.on("data", (chunk) => {
Queue.unsafeOffer(messagesQueue, chunk)
})
yield* _(
Effect.async<never, Socket.SocketError, void>((resume) => {
conn.on("end", () => {
resume(Effect.unit)
})
conn.on("error", (error) => {
resume(Effect.fail(new Socket.SocketError({ reason: "Read", error })))
})
}),
Effect.race(Fiber.join(writeFiber))
)
}).pipe(Effect.scoped)
const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<R, E, _>) =>
Effect.gen(function*(_) {
const conn = yield* _(open)
const runtime = yield* _(Effect.runtime<R>())
const run = Runtime.runFork(runtime)
const deferred = yield* _(Deferred.make<E, never>())
const writeFiber = yield* _(
Queue.take(sendQueue),
Effect.tap((chunk) =>
Effect.async<never, Socket.SocketError, void>((resume) => {
if (chunk === EOF) {
conn.end(() => resume(Effect.unit))
} else {
conn.write(chunk, (error) => {
resume(error ? Effect.fail(new Socket.SocketError({ reason: "Write", error })) : Effect.unit)
})
}
})
),
Effect.forever,
Effect.fork
)
conn.on("data", (chunk) => {
run(
Effect.catchAllCause(
handler(chunk),
(cause) => Deferred.failCause(deferred, cause)
)
)
})
yield* _(Effect.raceAll([
Effect.async<never, Socket.SocketError | E, void>((resume) => {
conn.on("end", () => {
resume(Effect.unit)
})
conn.on("error", (error) => {
resume(Effect.fail(new Socket.SocketError({ reason: "Read", error })))
})
}),
Deferred.await(deferred),
Fiber.join(writeFiber)
]))
}).pipe(Effect.scoped)

const write = (chunk: Uint8Array) => Queue.offer(sendQueue, chunk)
const writer = Effect.acquireRelease(
Expand All @@ -104,8 +115,7 @@ export const fromNetSocket = (
return Socket.Socket.of({
[Socket.SocketTypeId]: Socket.SocketTypeId,
run,
writer,
messages: messagesQueue
writer
})
})

Expand Down
6 changes: 3 additions & 3 deletions src/SocketServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import * as Context from "effect/Context"
import * as Data from "effect/Data"
import type * as Effect from "effect/Effect"
import type * as Queue from "effect/Queue"
import type * as Socket from "./Socket.js"

/**
Expand Down Expand Up @@ -34,8 +33,9 @@ export const SocketServer: Context.Tag<SocketServer, SocketServer> = Context.Tag
export interface SocketServer {
readonly [SocketServerTypeId]: SocketServerTypeId
readonly address: Effect.Effect<never, never, Address>
readonly run: Effect.Effect<never, SocketServerError, never>
readonly sockets: Queue.Dequeue<Socket.Socket>
readonly run: <R, E, _>(
handler: (socket: Socket.Socket) => Effect.Effect<R, E, _>
) => Effect.Effect<R, SocketServerError | E, never>
}

/**
Expand Down
Loading

0 comments on commit 0d94992

Please sign in to comment.