diff --git a/README.md b/README.md index 740ff75..1b6452a 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ npm install --save eventsource-client - Firefox >= 105 - Edge >= 79 -Basically, any environment that supports the [ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) (with [pipeThrough](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/pipeThrough) support) and the [TextDecoderStream](https://developer.mozilla.org/en-US/docs/Web/API/TextDecoderStream) APIs. +Basically, any environment that supports the [ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) and the [TextDecoder](https://developer.mozilla.org/en-US/docs/Web/API/TextDecoder) APIs. ## Usage (async iterator) @@ -96,7 +96,7 @@ es.close() - [ ] Configurable stalled connection detection (eg no data) - [ ] Configurable reconnection policy - [ ] Deno support/tests -- [ ] Bun support/tests (blocked: no `TextDecoderStream` support) +- [ ] Bun support/tests - [ ] Consider legacy build ## License diff --git a/src/client.ts b/src/client.ts index e1ac214..dc674e8 100644 --- a/src/client.ts +++ b/src/client.ts @@ -188,10 +188,9 @@ export function createEventSource( // Ensure that the response stream is a web stream // @todo Figure out a way to make this work without casting // eslint-disable-next-line @typescript-eslint/no-explicit-any - const bodyStream = getStream(body as any) + const stream = getStream(body as any) + const decoder = new TextDecoder() - // EventSources are always UTF-8 per spec - const stream = bodyStream.pipeThrough(new TextDecoderStream()) const reader = stream.getReader() let open = true @@ -199,8 +198,11 @@ export function createEventSource( do { const {done, value} = await reader.read() + if (value) { + parser.feed(decoder.decode(value, {stream: !done})) + } + if (!done) { - parser.feed(value) continue } diff --git a/test/fixtures.ts b/test/fixtures.ts new file mode 100644 index 0000000..d8dbc30 --- /dev/null +++ b/test/fixtures.ts @@ -0,0 +1,4 @@ +export const unicodeLines = [ + '🦄 are cool. 🐾 in the snow. Allyson Felix, 🏃🏽‍♀️ 🥇 2012 London!', + 'Espen ♥ Kokos', +] diff --git a/test/server.ts b/test/server.ts index bfc6d6e..f085e48 100644 --- a/test/server.ts +++ b/test/server.ts @@ -5,6 +5,8 @@ import {resolve as resolvePath} from 'node:path' import esbuild from 'esbuild' +import {unicodeLines} from './fixtures' + export function getServer(port: number): Promise { return new Promise((resolve, reject) => { const server = createServer(onRequest) @@ -14,6 +16,11 @@ export function getServer(port: number): Promise { } function onRequest(req: IncomingMessage, res: ServerResponse) { + // Disable Nagle's algorithm for testing + if (res.socket) { + res.socket.setNoDelay(true) + } + switch (req.url) { // Server-Sent Event endpoints case '/': @@ -36,6 +43,8 @@ function onRequest(req: IncomingMessage, res: ServerResponse) { return writeStalledConnection(req, res) case '/trickle': return writeTricklingConnection(req, res) + case '/unicode': + return writeUnicode(req, res) // Browser test endpoints (HTML/JS) case '/browser-test': @@ -170,6 +179,49 @@ async function writeStalledConnection(req: IncomingMessage, res: ServerResponse) // Intentionally not closing on first-connect that never sends data after welcome } +async function writeUnicode(_req: IncomingMessage, res: ServerResponse) { + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }) + + res.write( + formatEvent({ + event: 'welcome', + data: 'Connected - I will now send some chonks (cuter chunks) with unicode', + }), + ) + + res.write( + formatEvent({ + event: 'unicode', + data: unicodeLines[0], + }), + ) + + await delay(100) + + // Start of a valid SSE chunk + res.write('event: unicode\ndata: ') + + // Write "Espen ❤️ Kokos" in two halves: + // 1st: Espen � [..., 226, 153] + // 2st: � Kokos [165, 32, ...] + res.write(new Uint8Array([69, 115, 112, 101, 110, 32, 226, 153])) + + // Give time to the client to process the first half + await delay(1000) + + res.write(new Uint8Array([165, 32, 75, 111, 107, 111, 115])) + + // Closing end of packet + res.write('\n\n\n\n') + + res.write(formatEvent({event: 'disconnect', data: 'Thanks for listening'})) + res.end() +} + async function writeTricklingConnection(_req: IncomingMessage, res: ServerResponse) { res.writeHead(200, { 'Content-Type': 'text/event-stream', diff --git a/test/tests.ts b/test/tests.ts index 64aee22..8bd6583 100644 --- a/test/tests.ts +++ b/test/tests.ts @@ -1,5 +1,6 @@ import {CLOSED, CONNECTING, OPEN} from '../src/constants' -import type {createEventSource as CreateEventSourceFn} from '../src/default' +import type {createEventSource as CreateEventSourceFn, EventSourceMessage} from '../src/default' +import {unicodeLines} from './fixtures' import {deferClose, expect, getCallCounter} from './helpers' import type {TestRunner} from './waffletest' @@ -41,6 +42,31 @@ export function registerTests(options: { await deferClose(es) }) + test('can handle unicode data correctly', async () => { + const onMessage = getCallCounter() + const es = createEventSource({ + url: new URL(`${baseUrl}:${port}/unicode`), + fetch, + onMessage, + }) + + const messages: EventSourceMessage[] = [] + for await (const event of es) { + if (event.event === 'unicode') { + messages.push(event) + } + + if (messages.length === 2) { + break + } + } + + expect(messages[0].data).toBe(unicodeLines[0]) + expect(messages[1].data).toBe(unicodeLines[1]) + + await deferClose(es) + }) + test('will reconnect with last received message id if server disconnects', async () => { const onMessage = getCallCounter() const onDisconnect = getCallCounter()