Skip to content

Commit

Permalink
fix(streaming): Fix stream SSE, not necessary close stream. (#2320)
Browse files Browse the repository at this point in the history
* 🐛 Fix stream SSE, not necessary close stream.

* ✨ Remove unused

* ✨ Add deno files
  • Loading branch information
damianpumar authored Mar 16, 2024
1 parent bc6330c commit b827efb
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 41 deletions.
46 changes: 27 additions & 19 deletions deno_dist/helper/streaming/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,25 @@ export class SSEStreamingApi extends StreamingApi {
}
}

const setSSEHeaders = (context: Context) => {
context.header('Transfer-Encoding', 'chunked')
context.header('Content-Type', 'text/event-stream')
context.header('Cache-Control', 'no-cache')
context.header('Connection', 'keep-alive')
const run = async (
stream: SSEStreamingApi,
cb: (stream: SSEStreamingApi) => Promise<void>,
onError?: (e: Error, stream: SSEStreamingApi) => Promise<void>
) => {
try {
await cb(stream)
} catch (e) {
if (e instanceof Error && onError) {
await onError(e, stream)

await stream.writeSSE({
event: 'error',
data: e.message,
})
} else {
console.error(e)
}
}
}

export const streamSSE = (
Expand All @@ -49,19 +63,13 @@ export const streamSSE = (
) => {
const { readable, writable } = new TransformStream()
const stream = new SSEStreamingApi(writable, readable)
;(async () => {
try {
await cb(stream)
} catch (e) {
if (e instanceof Error && onError) {
await onError(e, stream)
} else {
console.error(e)
}
} finally {
stream.close()
}
})()
setSSEHeaders(c)

c.header('Transfer-Encoding', 'chunked')
c.header('Content-Type', 'text/event-stream')
c.header('Cache-Control', 'no-cache')
c.header('Connection', 'keep-alive')

run(stream, cb, onError)

return c.newResponse(stream.responseReadable)
}
8 changes: 5 additions & 3 deletions src/helper/streaming/sse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,19 @@ describe('SSE Streaming helper', () => {
const res = streamSSE(
c,
async () => {
throw new Error('error')
throw new Error('Test error')
},
onError
)
if (!res.body) {
throw new Error('Body is null')
}
const reader = res.body.getReader()
const decoder = new TextDecoder()
const { value } = await reader.read()
expect(value).toBeUndefined()
const decodedValue = decoder.decode(value)
expect(decodedValue).toBe('event: error\ndata: Test error\n\n')
expect(onError).toBeCalledTimes(1)
expect(onError).toBeCalledWith(new Error('error'), expect.anything()) // 2nd argument is StreamingApi instance
expect(onError).toBeCalledWith(new Error('Test error'), expect.anything()) // 2nd argument is StreamingApi instance
})
})
46 changes: 27 additions & 19 deletions src/helper/streaming/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,25 @@ export class SSEStreamingApi extends StreamingApi {
}
}

const setSSEHeaders = (context: Context) => {
context.header('Transfer-Encoding', 'chunked')
context.header('Content-Type', 'text/event-stream')
context.header('Cache-Control', 'no-cache')
context.header('Connection', 'keep-alive')
const run = async (
stream: SSEStreamingApi,
cb: (stream: SSEStreamingApi) => Promise<void>,
onError?: (e: Error, stream: SSEStreamingApi) => Promise<void>
) => {
try {
await cb(stream)
} catch (e) {
if (e instanceof Error && onError) {
await onError(e, stream)

await stream.writeSSE({
event: 'error',
data: e.message,
})
} else {
console.error(e)
}
}
}

export const streamSSE = (
Expand All @@ -49,19 +63,13 @@ export const streamSSE = (
) => {
const { readable, writable } = new TransformStream()
const stream = new SSEStreamingApi(writable, readable)
;(async () => {
try {
await cb(stream)
} catch (e) {
if (e instanceof Error && onError) {
await onError(e, stream)
} else {
console.error(e)
}
} finally {
stream.close()
}
})()
setSSEHeaders(c)

c.header('Transfer-Encoding', 'chunked')
c.header('Content-Type', 'text/event-stream')
c.header('Cache-Control', 'no-cache')
c.header('Connection', 'keep-alive')

run(stream, cb, onError)

return c.newResponse(stream.responseReadable)
}

0 comments on commit b827efb

Please sign in to comment.