From ee93406a22447ee9e59171f2519df305053ba431 Mon Sep 17 00:00:00 2001 From: vigneshshanmugam Date: Mon, 28 Nov 2022 14:40:38 -0800 Subject: [PATCH] fix: parse buffered NDJSON response from kibana correctly --- __tests__/push/index.test.ts | 6 ++++++ src/helpers.ts | 6 +++--- src/push/index.ts | 39 ++++++++++++++++++------------------ 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/__tests__/push/index.test.ts b/__tests__/push/index.test.ts index 9fa8fe672..36218ce18 100644 --- a/__tests__/push/index.test.ts +++ b/__tests__/push/index.test.ts @@ -263,6 +263,12 @@ heartbeat.monitors: req.on('data', chunks => { const schema = JSON.parse(chunks.toString()) as APISchema; res.write(JSON.stringify(schema.monitors[0].name) + '\n'); + if (!schema.keep_stale) { + // write more than the stream buffer to check the broken NDJSON data + res.write( + JSON.stringify(Buffer.from('a'.repeat(70000)).toString()) + '\n' + ); + } }); req.on('end', () => { res.end(JSON.stringify(apiRes)); diff --git a/src/helpers.ts b/src/helpers.ts index 46c023e64..f917a8279 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -322,10 +322,10 @@ export function wrapFnWithLocation( } // Safely parse ND JSON (Newline delimitted JSON) chunks -export function safeNDJSONParse(chunks: string[]) { - // chunks may not be at proper newline boundaries, so we make sure everything is split +export function safeNDJSONParse(data: string) { + // data may not be at proper newline boundaries, so we make sure everything is split // on proper newlines - chunks = Array.isArray(chunks) ? chunks : [chunks]; + const chunks = Array.isArray(data) ? data : [data]; const lines = chunks.join('\n').split(/\r?\n/); return lines .filter(l => l.match(/\S/)) // remove blank lines diff --git a/src/push/index.ts b/src/push/index.ts index 7f55e887e..18831d66f 100644 --- a/src/push/index.ts +++ b/src/push/index.ts @@ -110,26 +110,27 @@ export async function pushMonitors({ const { error, message } = await body.json(); throw formatAPIError(statusCode, error, message); } - body.setEncoding('utf-8'); + const allchunks = []; for await (const data of body) { - // Its kind of hacky for now where Kibana streams the response by - // writing the data as NDJSON events (data can be interleaved), we - // distinguish the final data by checking if the event was a progress vs complete event - const chunks = safeNDJSONParse(data); - for (const chunk of chunks) { - if (typeof chunk === 'string') { - // TODO: add progress back for all states once we get the fix - // on kibana side - keepStale && apiProgress(chunk); - continue; - } - const { failedMonitors, failedStaleMonitors } = chunk; - if (failedMonitors && failedMonitors.length > 0) { - throw formatFailedMonitors(failedMonitors); - } - if (failedStaleMonitors.length > 0) { - throw formatStaleMonitors(failedStaleMonitors); - } + allchunks.push(Buffer.from(data)); + } + const chunks = safeNDJSONParse(Buffer.concat(allchunks).toString('utf-8')); + // Its kind of hacky for now where Kibana streams the response by + // writing the data as NDJSON events (data can be interleaved), we + // distinguish the final data by checking if the event was a progress vs complete event + for (const chunk of chunks) { + if (typeof chunk === 'string') { + // TODO: add progress back for all states once we get the fix + // on kibana side + keepStale && apiProgress(chunk); + continue; + } + const { failedMonitors, failedStaleMonitors } = chunk; + if (failedMonitors && failedMonitors.length > 0) { + throw formatFailedMonitors(failedMonitors); + } + if (failedStaleMonitors.length > 0) { + throw formatStaleMonitors(failedStaleMonitors); } } }