From 37797f38d81b12d030ba85034baeb49192ea575c Mon Sep 17 00:00:00 2001 From: Mutasem Aldmour <4711238+mutdmour@users.noreply.github.com> Date: Fri, 16 Aug 2024 12:27:55 +0200 Subject: [PATCH] fix(editor): Buffer json chunks in stream response (#10439) --- packages/editor-ui/src/api/assistant.ts | 9 +- .../src/plugins/i18n/locales/en.json | 4 +- .../src/utils/__tests__/apiUtils.spec.ts | 112 ++++++++++++++++++ packages/editor-ui/src/utils/apiUtils.ts | 37 ++++-- 4 files changed, 147 insertions(+), 15 deletions(-) create mode 100644 packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts diff --git a/packages/editor-ui/src/api/assistant.ts b/packages/editor-ui/src/api/assistant.ts index d9e9ec0cded21..4b1d127ef15c6 100644 --- a/packages/editor-ui/src/api/assistant.ts +++ b/packages/editor-ui/src/api/assistant.ts @@ -9,7 +9,14 @@ export function chatWithAssistant( onDone: () => void, onError: (e: Error) => void, ): void { - void streamRequest(ctx, '/ai-assistant/chat', payload, onMessageUpdated, onDone, onError); + void streamRequest( + ctx, + '/ai-assistant/chat', + payload, + onMessageUpdated, + onDone, + onError, + ); } export async function replaceCode( diff --git a/packages/editor-ui/src/plugins/i18n/locales/en.json b/packages/editor-ui/src/plugins/i18n/locales/en.json index b082675d2d077..49b331e4c1787 100644 --- a/packages/editor-ui/src/plugins/i18n/locales/en.json +++ b/packages/editor-ui/src/plugins/i18n/locales/en.json @@ -131,7 +131,7 @@ "auth.signup.setupYourAccount": "Set up your account", "auth.signup.setupYourAccountError": "Problem setting up your account", "auth.signup.tokenValidationError": "Issue validating invite token", - "aiAssistant.name": "Ava", + "aiAssistant.name": "Assistant", "aiAssistant.assistant": "AI Assistant", "aiAssistant.newSessionModal.title.part1": "Start new", "aiAssistant.newSessionModal.title.part2": "session", @@ -139,7 +139,7 @@ "aiAssistant.newSessionModal.question": "Are you sure you want to start a new session?", "aiAssistant.newSessionModal.confirm": "Start new session", "aiAssistant.serviceError.message": "Unable to connect to n8n's AI service", - "aiAssistant.codeUpdated.message.title": "Ava modified workflow", + "aiAssistant.codeUpdated.message.title": "Assistant modified workflow", "aiAssistant.codeUpdated.message.body": "Open the {nodeName} node to see the changes", "banners.confirmEmail.message.1": "To secure your account and prevent future access issues, please confirm your", "banners.confirmEmail.message.2": "email address.", diff --git a/packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts b/packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts new file mode 100644 index 0000000000000..cefb6f3389bb7 --- /dev/null +++ b/packages/editor-ui/src/utils/__tests__/apiUtils.spec.ts @@ -0,0 +1,112 @@ +import { STREAM_SEPERATOR, streamRequest } from '../apiUtils'; + +describe('streamRequest', () => { + it('should stream data from the API endpoint', async () => { + const encoder = new TextEncoder(); + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 1 })}${STREAM_SEPERATOR}`)); + controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 2 })}${STREAM_SEPERATOR}`)); + controller.enqueue(encoder.encode(`${JSON.stringify({ chunk: 3 })}${STREAM_SEPERATOR}`)); + controller.close(); + }, + }); + + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + body: mockResponse, + }); + + global.fetch = mockFetch; + + const onChunkMock = vi.fn(); + const onDoneMock = vi.fn(); + const onErrorMock = vi.fn(); + + await streamRequest( + { + baseUrl: 'https://api.example.com', + pushRef: '', + }, + '/data', + { key: 'value' }, + onChunkMock, + onDoneMock, + onErrorMock, + ); + + expect(mockFetch).toHaveBeenCalledWith('https://api.example.com/data', { + method: 'POST', + body: JSON.stringify({ key: 'value' }), + credentials: 'include', + headers: { + 'Content-Type': 'application/json', + 'browser-id': expect.stringContaining('-'), + }, + }); + + expect(onChunkMock).toHaveBeenCalledTimes(3); + expect(onChunkMock).toHaveBeenNthCalledWith(1, { chunk: 1 }); + expect(onChunkMock).toHaveBeenNthCalledWith(2, { chunk: 2 }); + expect(onChunkMock).toHaveBeenNthCalledWith(3, { chunk: 3 }); + + expect(onDoneMock).toHaveBeenCalledTimes(1); + expect(onErrorMock).not.toHaveBeenCalled(); + }); + + it('should handle broken stream data', async () => { + const encoder = new TextEncoder(); + const mockResponse = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode(`${JSON.stringify({ chunk: 1 })}${STREAM_SEPERATOR}{"chunk": `), + ); + controller.enqueue(encoder.encode(`2}${STREAM_SEPERATOR}{"ch`)); + controller.enqueue(encoder.encode('unk":')); + controller.enqueue(encoder.encode(`3}${STREAM_SEPERATOR}`)); + controller.close(); + }, + }); + + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + body: mockResponse, + }); + + global.fetch = mockFetch; + + const onChunkMock = vi.fn(); + const onDoneMock = vi.fn(); + const onErrorMock = vi.fn(); + + await streamRequest( + { + baseUrl: 'https://api.example.com', + pushRef: '', + }, + '/data', + { key: 'value' }, + onChunkMock, + onDoneMock, + onErrorMock, + ); + + expect(mockFetch).toHaveBeenCalledWith('https://api.example.com/data', { + method: 'POST', + body: JSON.stringify({ key: 'value' }), + credentials: 'include', + headers: { + 'Content-Type': 'application/json', + 'browser-id': expect.stringContaining('-'), + }, + }); + + expect(onChunkMock).toHaveBeenCalledTimes(3); + expect(onChunkMock).toHaveBeenNthCalledWith(1, { chunk: 1 }); + expect(onChunkMock).toHaveBeenNthCalledWith(2, { chunk: 2 }); + expect(onChunkMock).toHaveBeenNthCalledWith(3, { chunk: 3 }); + + expect(onDoneMock).toHaveBeenCalledTimes(1); + expect(onErrorMock).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/editor-ui/src/utils/apiUtils.ts b/packages/editor-ui/src/utils/apiUtils.ts index e9201b7c331ea..b07f8910b90c8 100644 --- a/packages/editor-ui/src/utils/apiUtils.ts +++ b/packages/editor-ui/src/utils/apiUtils.ts @@ -3,7 +3,6 @@ import axios from 'axios'; import { ApplicationError, jsonParse, type GenericValue, type IDataObject } from 'n8n-workflow'; import type { IExecutionFlattedResponse, IExecutionResponse, IRestApiContext } from '@/Interface'; import { parse } from 'flatted'; -import type { ChatRequest } from '@/types/assistant.types'; import { assert } from '@/utils/assert'; const BROWSER_ID_STORAGE_KEY = 'n8n-browserId'; @@ -14,6 +13,7 @@ if (!browserId && 'randomUUID' in crypto) { } export const NO_NETWORK_ERROR_CODE = 999; +export const STREAM_SEPERATOR = '⧉⇋⇋➽⌑⧉§§\n'; export class ResponseError extends ApplicationError { // The HTTP status code of response @@ -194,15 +194,15 @@ export function unflattenExecutionData(fullExecutionData: IExecutionFlattedRespo return returnData; } -export const streamRequest = async ( +export async function streamRequest( context: IRestApiContext, apiEndpoint: string, - payload: ChatRequest.RequestPayload, - onChunk?: (chunk: ChatRequest.ResponsePayload) => void, + payload: object, + onChunk?: (chunk: T) => void, onDone?: () => void, onError?: (e: Error) => void, - separator = '⧉⇋⇋➽⌑⧉§§\n', -): Promise => { + separator = STREAM_SEPERATOR, +): Promise { const headers: Record = { 'Content-Type': 'application/json', }; @@ -223,23 +223,36 @@ export const streamRequest = async ( const reader = response.body.getReader(); const decoder = new TextDecoder('utf-8'); + let buffer = ''; + async function readStream() { const { done, value } = await reader.read(); if (done) { onDone?.(); return; } - const chunk = decoder.decode(value); - const splitChunks = chunk.split(separator); + buffer += chunk; + const splitChunks = buffer.split(separator); + + buffer = ''; for (const splitChunk of splitChunks) { - if (splitChunk && onChunk) { + if (splitChunk) { + let data: T; + try { + data = jsonParse(splitChunk, { errorMessage: 'Invalid json' }); + } catch (e) { + // incomplete json. append to buffer to complete + buffer += splitChunk; + + continue; + } + try { - onChunk(jsonParse(splitChunk, { errorMessage: 'Invalid json chunk in stream' })); + onChunk?.(data); } catch (e: unknown) { if (e instanceof Error) { - console.log(`${e.message}: ${splitChunk}`); onError?.(e); } } @@ -257,4 +270,4 @@ export const streamRequest = async ( assert(e instanceof Error); onError?.(e); } -}; +}