Skip to content

Commit

Permalink
fix: parse buffered NDJSON response from kibana correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
vigneshshanmugam committed Nov 28, 2022
1 parent 1de762d commit ee93406
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 22 deletions.
6 changes: 6 additions & 0 deletions __tests__/push/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ heartbeat.monitors:
req.on('data', chunks => {
const schema = JSON.parse(chunks.toString()) as APISchema;
res.write(JSON.stringify(schema.monitors[0].name) + '\n');
if (!schema.keep_stale) {
// write more than the stream buffer to check the broken NDJSON data
res.write(
JSON.stringify(Buffer.from('a'.repeat(70000)).toString()) + '\n'
);
}
});
req.on('end', () => {
res.end(JSON.stringify(apiRes));
Expand Down
6 changes: 3 additions & 3 deletions src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,10 @@ export function wrapFnWithLocation<A extends unknown[], R>(
}

// Safely parse ND JSON (Newline delimitted JSON) chunks
export function safeNDJSONParse(chunks: string[]) {
// chunks may not be at proper newline boundaries, so we make sure everything is split
export function safeNDJSONParse(data: string) {
// data may not be at proper newline boundaries, so we make sure everything is split
// on proper newlines
chunks = Array.isArray(chunks) ? chunks : [chunks];
const chunks = Array.isArray(data) ? data : [data];
const lines = chunks.join('\n').split(/\r?\n/);
return lines
.filter(l => l.match(/\S/)) // remove blank lines
Expand Down
39 changes: 20 additions & 19 deletions src/push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,26 +110,27 @@ export async function pushMonitors({
const { error, message } = await body.json();
throw formatAPIError(statusCode, error, message);
}
body.setEncoding('utf-8');
const allchunks = [];
for await (const data of body) {
// Its kind of hacky for now where Kibana streams the response by
// writing the data as NDJSON events (data can be interleaved), we
// distinguish the final data by checking if the event was a progress vs complete event
const chunks = safeNDJSONParse(data);
for (const chunk of chunks) {
if (typeof chunk === 'string') {
// TODO: add progress back for all states once we get the fix
// on kibana side
keepStale && apiProgress(chunk);
continue;
}
const { failedMonitors, failedStaleMonitors } = chunk;
if (failedMonitors && failedMonitors.length > 0) {
throw formatFailedMonitors(failedMonitors);
}
if (failedStaleMonitors.length > 0) {
throw formatStaleMonitors(failedStaleMonitors);
}
allchunks.push(Buffer.from(data));
}
const chunks = safeNDJSONParse(Buffer.concat(allchunks).toString('utf-8'));
// Its kind of hacky for now where Kibana streams the response by
// writing the data as NDJSON events (data can be interleaved), we
// distinguish the final data by checking if the event was a progress vs complete event
for (const chunk of chunks) {
if (typeof chunk === 'string') {
// TODO: add progress back for all states once we get the fix
// on kibana side
keepStale && apiProgress(chunk);
continue;
}
const { failedMonitors, failedStaleMonitors } = chunk;
if (failedMonitors && failedMonitors.length > 0) {
throw formatFailedMonitors(failedMonitors);
}
if (failedStaleMonitors.length > 0) {
throw formatStaleMonitors(failedStaleMonitors);
}
}
}
Expand Down

0 comments on commit ee93406

Please sign in to comment.