-
-
Notifications
You must be signed in to change notification settings - Fork 249
/
Socket.test.ts
125 lines (111 loc) · 3.9 KB
/
Socket.test.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import * as SocketServer from "@effect/experimental/SocketServer/Node"
import * as NodeSocket from "@effect/platform-node/NodeSocket"
import * as Socket from "@effect/platform/Socket"
import { assert, describe, expect, it } from "@effect/vitest"
import { Chunk, Effect, Fiber, Queue, Stream } from "effect"
import WS from "vitest-websocket-mock"
/**
 * Creates a socket server with `SocketServer.make({ port: 0 })` and starts an
 * echo handler for it.
 *
 * For every connection the handler obtains the socket's writer and runs the
 * socket with that writer as its message handler, so incoming data is written
 * straight back to the same socket. The handler loop is started with
 * `Effect.forkScoped`; the server itself is returned so callers can read
 * `server.address`.
 */
const makeServer = Effect.gen(function*() {
  const server = yield* SocketServer.make({ port: 0 })

  // Per-connection echo: the socket's own writer is used as the run handler.
  const acceptLoop = server.run((socket) =>
    Effect.scoped(
      Effect.gen(function*() {
        const write = yield* socket.writer
        yield* socket.run(write)
      })
    )
  )

  yield* Effect.forkScoped(acceptLoop)
  return server
})
describe("Socket", () => {
// Sends "Hello" and "World" through a net channel connected to the echo
// server and checks that the decoded text coming back is "HelloWorld".
it.scoped("open", () =>
  Effect.gen(function*() {
    const server = yield* makeServer
    const address = yield* server.address
    // The test treats the server address as a TCP address to read its port.
    const { port } = address as SocketServer.TcpAddress
    const channel = NodeSocket.makeNetChannel({ port })

    const echoed = yield* Stream.make("Hello", "World").pipe(
      Stream.encodeText,
      Stream.pipeThroughChannel(channel),
      Stream.decodeText(),
      Stream.mkString,
      Stream.runCollect
    )

    assert.strictEqual(Chunk.join(echoed, ""), "HelloWorld")
  }))
// Exercises `Socket.makeWebSocket` against a `vitest-websocket-mock` server.
describe("WebSocket", () => {
  const url = `ws://localhost:1234`
  const encoder = new TextEncoder()

  // Encodes text to bytes for both outgoing writes and expected payloads.
  const bytes = (text: string) => encoder.encode(text)

  // Mock server resource: created on acquire; closed and cleaned on release.
  const mockServer = Effect.acquireRelease(
    Effect.sync(() => new WS(url)),
    (ws) =>
      Effect.sync(() => {
        ws.close()
        WS.clean()
      })
  )

  it.effect("messages", () =>
    Effect.gen(function*() {
      const server = yield* mockServer
      const socket = yield* Socket.makeWebSocket(Effect.succeed(url))
      const messages = yield* Queue.unbounded<Uint8Array>()

      // Run the socket in the background, queueing everything it receives.
      const fiber = yield* Effect.fork(socket.run((chunk) => messages.offer(chunk)))

      // Client -> server: write two messages inside a scoped writer.
      yield* Effect.scoped(
        Effect.gen(function*() {
          const write = yield* socket.writer
          yield* write(bytes("Hello"))
          yield* write(bytes("World"))
        })
      )

      yield* Effect.promise(async () => {
        await expect(server).toReceiveMessage(bytes("Hello"))
        await expect(server).toReceiveMessage(bytes("World"))
      })

      // Server -> client: the reply must show up on the queue as bytes.
      server.send("Right back at you!")
      const message = yield* messages.take
      expect(message).toEqual(bytes("Right back at you!"))

      // After the server closes, the background run is expected to succeed.
      server.close()
      const exit = yield* Effect.exit(Fiber.join(fiber))
      expect(exit._tag).toEqual("Success")
    }).pipe(
      Effect.scoped,
      Effect.provideService(Socket.WebSocketConstructor, (target) => new globalThis.WebSocket(target))
    ))
})
// Exercises `Socket.fromTransformStream` with a readable built from a Stream
// and a writable that records what is written to it.
describe("TransformStream", () => {
  it.effect("works", () =>
    Effect.gen(function*() {
      // Source side: three string elements, each followed by a 50 sleep.
      const readable = Stream.make("A", "B", "C").pipe(
        Stream.tap(() => Effect.sleep(50)),
        Stream.toReadableStream()
      )

      const decoder = new TextDecoder()

      // Sink side: collects every chunk written to the socket, decoded to text.
      const written: Array<string> = []
      const writable = new WritableStream<Uint8Array>({
        write: (chunk) => {
          written.push(decoder.decode(chunk))
        }
      })

      const socket = yield* Socket.fromTransformStream(
        Effect.succeed({ readable, writable })
      )

      // Write "Hello" then "World" through a scoped writer.
      yield* Effect.scoped(
        Effect.tap(socket.writer, (write) => Effect.zipRight(write("Hello"), write("World")))
      )

      // Run the socket, collecting everything it delivers from the readable.
      const incoming: Array<string> = []
      yield* Effect.scoped(
        socket.run((chunk) =>
          Effect.sync(() => {
            incoming.push(decoder.decode(chunk))
          })
        )
      )

      assert.deepStrictEqual(written, ["Hello", "World"])
      assert.deepStrictEqual(incoming, ["A", "B", "C"])
    }))
})
})