diff --git a/packages/functions/src/service.ts b/packages/functions/src/service.ts index ba997c15246..e69fdb87c8b 100644 --- a/packages/functions/src/service.ts +++ b/packages/functions/src/service.ts @@ -447,11 +447,6 @@ async function streamAtURL( }; } - const reader = response.body!.getReader(); - const decoder = new TextDecoder(); - - const pendingLines: string[] = []; - let buffer = ''; let resultResolver: (value: unknown) => void; let resultRejecter: (reason: unknown) => void; @@ -461,7 +456,6 @@ async function streamAtURL( }); options?.signal?.addEventListener('abort', () => { - void reader.cancel(); const error = new FunctionsError( 'cancelled', 'Request was cancelled.' @@ -469,82 +463,110 @@ async function streamAtURL( resultRejecter(error); }); - const stream = { - [Symbol.asyncIterator]() { + const processLine = (line: string, controller: ReadableStreamDefaultController): void => { + // ignore all other lines (newline, comments, etc.) + if (!line.startsWith('data: ')) { + return; + } + try { + // Skip 'data: ' (5 chars) + const jsonData = JSON.parse(line.slice(6)); + if ('result' in jsonData) { + resultResolver(decode(jsonData.result)); + return; + } + if ('message' in jsonData) { + controller.enqueue(decode(jsonData.message)); + return; + } + if ('error' in jsonData) { + const error = _errorForResponse(0, jsonData); + controller.error(error); + resultRejecter(error); + return; + } + } catch (error) { + if (error instanceof FunctionsError) { + controller.error(error); + resultRejecter(error); + return; + } + // ignore other parsing errors + } + }; - const processLine = (line: string | undefined): { done: boolean, value: unknown } | null => { - // ignore all other lines (newline, comments, etc.) - if (!line?.startsWith('data: ')) { - return null; + const reader = response.body!.getReader(); + const decoder = new TextDecoder(); + const rstream = new ReadableStream({ + start(controller) { + let currentText = ''; + return pump(); + async function pump(): Promise { + if (options?.signal?.aborted) { + const error = new FunctionsError('cancelled', 'Request was cancelled'); + controller.error(error); + resultRejecter(error); + return Promise.resolve(); } try { - const jsonData = JSON.parse(line.slice(6)); - if ('result' in jsonData) { - resultResolver(decode(jsonData.result)); - return { done: true, value: undefined }; - } - if ('message' in jsonData) { - return { done: false, value: decode(jsonData.message) }; - } - if ('error' in jsonData) { - const error = _errorForResponse(0, jsonData); - resultRejecter(error); - throw error; - } - return null; // Unrecognize keys. Skip this line. - } catch (error) { - if (error instanceof FunctionsError) { - throw error; + const { value, done } = await reader.read(); + if (done) { + if (currentText.trim()) { + processLine(currentText.trim(), controller); + } + controller.close(); + return; } - // ignore other parsing error - return null; - } - }; - return { - async next() { + if (options?.signal?.aborted) { - const error = new FunctionsError( - 'cancelled', - 'Request was cancelled.' - ); + const error = new FunctionsError('cancelled', 'Request was cancelled'); + controller.error(error); resultRejecter(error); - throw error; - } - - while (pendingLines.length > 0) { - const result = processLine(pendingLines.shift()); - if (result) { return result; } + await reader.cancel(); + return; } - while (true) { - const { value, done } = await reader.read(); - - if (done) { - if (buffer.trim()) { - const result = processLine(buffer); - if (result) { return result; } - } - return { done: true, value: undefined }; - } - - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split('\n'); - buffer = lines.pop() || ''; - pendingLines.push(...lines.filter(line => line.trim())); + currentText += decoder.decode(value, { stream: true }); + const lines = currentText.split("\n"); + currentText = lines.pop() || ''; - if (pendingLines.length > 0) { - const result = processLine(pendingLines.shift()); - if (result) { return result; } + for (const line of lines) { + if (line.trim()) { + processLine(line.trim(), controller); } } - } - }; + return pump(); + } catch (error) { + const functionsError = error instanceof FunctionsError + ? error + : _errorForResponse(0, null); + controller.error(functionsError); + resultRejecter(functionsError); + }; + } + }, + cancel() { + return reader.cancel(); } - }; + }); return { - stream, + stream: { + [Symbol.asyncIterator]() { + const rreader = rstream.getReader(); + return { + async next() { + const { value, done } = await rreader.read(); + return { value: value as unknown, done }; + }, + async return() { + await reader.cancel(); + return { done: true, value: undefined }; + } + }; + } + }, data: resultPromise, }; }