Skip to content

Commit

Permalink
Ensure events always include the body, even for immediate responses
Browse files Browse the repository at this point in the history
This fixes a bug, where responses that completed immediately (a
synchronous handler, such that the response was ended before
setImmediate fired for the async event) did not include a body.

The issue is that Node intentionally resumes to dump the remaining
body from the request when the response is finished, if we haven't made
any attempt to read it at all (server.resOnFinish). We still want to
fire the events async, so they're independent of the processing
pipeline, but we need to synchronously start reading the body to ensure
Node never does this.

Note that this means we do need to actively grab the stream earlier in
handlers, since otherwise it runs through buffering, which may truncate
streams that go over length. Async handlers who _must_ receive the full
request body need to do so synchronously. If not, they can still get the
body, but it'll pass through truncation to maxSize en route. Probably
doesn't matter in practice since maxSize will be big enough that'll take
some time, but relevant for tiny maxSize in tests.
  • Loading branch information
pimterry committed Jan 27, 2023
1 parent 918dc96 commit c3e65b4
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 23 deletions.
11 changes: 7 additions & 4 deletions src/rules/requests/request-handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,10 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
let { method, url: reqUrl, rawHeaders } = clientReq as OngoingRequest;
let { protocol, hostname, port, path } = url.parse(reqUrl);

// We have to capture the request stream immediately, to make sure nothing is lost if it
// goes past its max length (truncating the data) before we start sending upstream.
const clientReqBody = clientReq.body.asStream();

const isH2Downstream = isHttp2(clientReq);

if (isLocalhostAddress(hostname) && clientReq.remoteIpAddress && !isLocalhostAddress(clientReq.remoteIpAddress)) {
Expand Down Expand Up @@ -951,15 +955,14 @@ export class PassThroughHandler extends PassThroughHandlerDefinition {
});

if (reqBodyOverride) {
clientReq.body.asStream().resume(); // Dump any remaining real request body
clientReqBody.resume(); // Dump any remaining real request body

if (reqBodyOverride.length > 0) serverReq.end(reqBodyOverride);
else serverReq.end(); // http2-wrapper fails given an empty buffer for methods that aren't allowed a body
} else {
// asStream includes all content, including the body before this call
const reqBodyStream = clientReq.body.asStream();
reqBodyStream.pipe(serverReq);
reqBodyStream.on('error', () => serverReq.abort());
clientReqBody.pipe(serverReq);
clientReqBody.on('error', () => serverReq.abort());
}

// If the downstream connection aborts, before the response has been completed,
Expand Down
36 changes: 18 additions & 18 deletions src/server/mockttp-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,45 +314,45 @@ export class MockttpServer extends AbstractMockttp implements Mockttp {
}

private announceCompletedRequestAsync(request: OngoingRequest) {
setImmediate(() => {
waitForCompletedRequest(request)
.then((completedReq: CompletedRequest) => {
waitForCompletedRequest(request)
.then((completedReq: CompletedRequest) => {
setImmediate(() => {
this.eventEmitter.emit('request', Object.assign(
completedReq,
{
timingEvents: _.clone(completedReq.timingEvents),
tags: _.clone(completedReq.tags)
}
));
})
.catch(console.error);
});
});
})
.catch(console.error);
}

private announceResponseAsync(response: OngoingResponse | CompletedResponse) {
setImmediate(() => {
waitForCompletedResponse(response)
.then((res: CompletedResponse) => {
waitForCompletedResponse(response)
.then((res: CompletedResponse) => {
setImmediate(() => {
this.eventEmitter.emit('response', Object.assign(res, {
timingEvents: _.clone(res.timingEvents),
tags: _.clone(res.tags)
}));
})
.catch(console.error);
});
});
})
.catch(console.error);
}

private announceWebSocketRequestAsync(request: OngoingRequest) {
setImmediate(() => {
waitForCompletedRequest(request)
.then((completedReq: CompletedRequest) => {
waitForCompletedRequest(request)
.then((completedReq: CompletedRequest) => {
setImmediate(() => {
this.eventEmitter.emit('websocket-request', Object.assign(completedReq, {
timingEvents: _.clone(completedReq.timingEvents),
tags: _.clone(completedReq.tags)
}));
})
.catch(console.error);
});
});
})
.catch(console.error);
}

private announceWebSocketUpgradeAsync(response: CompletedResponse) {
Expand Down
20 changes: 19 additions & 1 deletion test/integration/subscriptions/request-events.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,11 @@ describe("Request initiated subscriptions", () => {

describe("Request subscriptions", () => {
describe("with a local server", () => {
let server = getLocal();
let server = getLocal({
// Disabling this exposes some possible bugs, notably that the body may not
// be captured if the response finishes the request immediately.
recordTraffic: false
});

beforeEach(() => server.start());
afterEach(() => server.stop());
Expand All @@ -261,6 +265,20 @@ describe("Request subscriptions", () => {
expect(seenRequest.tags).to.deep.equal([]);
});

it("should notify with the body even if the response does not wait for it", async () => {
// The only rule here does not process the request body at all, so it's not explicitly
// being read anywhere (except by our async event subscription)
await server.forAnyRequest().thenReply(200);

let seenRequestPromise = getDeferred<CompletedRequest>();
await server.on('request', (r) => seenRequestPromise.resolve(r));

fetch(server.urlFor("/mocked-endpoint"), { method: 'POST', body: 'body-text' });

let seenRequest = await seenRequestPromise;
expect(await seenRequest.body.getText()).to.equal('body-text');
});

it("should include the matched rule id", async () => {
let seenRequestPromise = getDeferred<CompletedRequest>();
await server.on('request', (r) => seenRequestPromise.resolve(r));
Expand Down

0 comments on commit c3e65b4

Please sign in to comment.