Skip to content

Commit

Permalink
http: report request start and end with diagnostics_channel
Browse files Browse the repository at this point in the history
PR-URL: #34895
Reviewed-By: Bryan English <bryan@bryanenglish.com>
Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de>
Reviewed-By: Vladimir de Turckheim <vlad2t@hotmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: Gabriel Schulhof <gabriel.schulhof@intel.com>
Reviewed-By: Michael Dawson <midawson@redhat.com>
  • Loading branch information
Qard authored and targos committed Nov 3, 2020
1 parent b38a43d commit f861733
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 0 deletions.
96 changes: 96 additions & 0 deletions benchmark/diagnostics_channel/http.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
'use strict';
const common = require('../common.js');
const dc = require('diagnostics_channel');
const { AsyncLocalStorage } = require('async_hooks');
const http = require('http');

const bench = common.createBenchmark(main, {
apm: ['none', 'diagnostics_channel', 'patch'],
type: 'buffer',
len: 1024,
chunks: 4,
connections: [50, 500],
chunkedEnc: 1,
duration: 5
});

function main({ apm, connections, duration, type, len, chunks, chunkedEnc }) {
const done = { none, patch, diagnostics_channel }[apm]();

const server = require('../fixtures/simple-http-server.js')
.listen(common.PORT)
.on('listening', () => {
const path = `/${type}/${len}/${chunks}/normal/${chunkedEnc}`;
bench.http({
path,
connections,
duration
}, () => {
server.close();
if (done) done();
});
});
}

function none() {}

function patch() {
const als = new AsyncLocalStorage();
const times = [];

const { emit } = http.Server.prototype;
function wrappedEmit(...args) {
const [name, req, res] = args;
if (name === 'request') {
als.enterWith({
url: req.url,
start: process.hrtime.bigint()
});

res.on('finish', () => {
times.push({
...als.getStore(),
statusCode: res.statusCode,
end: process.hrtime.bigint()
});
});
}
return emit.apply(this, args);
}
http.Server.prototype.emit = wrappedEmit;

return () => {
http.Server.prototype.emit = emit;
};
}

function diagnostics_channel() {
const als = new AsyncLocalStorage();
const times = [];

const start = dc.channel('http.server.request.start');
const finish = dc.channel('http.server.response.finish');

function onStart(req) {
als.enterWith({
url: req.url,
start: process.hrtime.bigint()
});
}

function onFinish(res) {
times.push({
...als.getStore(),
statusCode: res.statusCode,
end: process.hrtime.bigint()
});
}

start.subscribe(onStart);
finish.subscribe(onFinish);

return () => {
start.unsubscribe(onStart);
finish.unsubscribe(onFinish);
};
}
22 changes: 22 additions & 0 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ const { observerCounts, constants } = internalBinding('performance');
const { setTimeout, clearTimeout } = require('timers');
const { NODE_PERFORMANCE_ENTRY_TYPE_HTTP } = constants;

const dc = require('diagnostics_channel');
const onRequestStartChannel = dc.channel('http.server.request.start');
const onResponseFinishChannel = dc.channel('http.server.response.finish');

const kServerResponse = Symbol('ServerResponse');
const kServerResponseStatistics = Symbol('ServerResponseStatistics');

Expand Down Expand Up @@ -775,6 +779,15 @@ function clearRequestTimeout(req) {
}

function resOnFinish(req, res, socket, state, server) {
if (onResponseFinishChannel.hasSubscribers) {
onResponseFinishChannel.publish({
request: req,
response: res,
socket,
server
});
}

// Usually the first incoming element should be our request. it may
// be that in the case abortIncoming() was called that the incoming
// array will be empty.
Expand Down Expand Up @@ -862,6 +875,15 @@ function parserOnIncoming(server, socket, state, req, keepAlive) {
res.shouldKeepAlive = keepAlive;
DTRACE_HTTP_SERVER_REQUEST(req, socket);

if (onRequestStartChannel.hasSubscribers) {
onRequestStartChannel.publish({
request: req,
response: res,
socket,
server
});
}

if (socket._httpMessage) {
// There are already pending outgoing res, append.
state.outgoing.push(res);
Expand Down
65 changes: 65 additions & 0 deletions test/parallel/test-diagnostics-channel-http-server-start.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
'use strict';

const common = require('../common');
const { AsyncLocalStorage } = require('async_hooks');
const dc = require('diagnostics_channel');
const assert = require('assert');
const http = require('http');

const incomingStartChannel = dc.channel('http.server.request.start');
const outgoingFinishChannel = dc.channel('http.server.response.finish');

const als = new AsyncLocalStorage();
let context;

// Bind requests to an AsyncLocalStorage context
incomingStartChannel.subscribe(common.mustCall((message) => {
als.enterWith(message);
context = message;
}));

// When the request ends, verify the context has been maintained
// and that the messages contain the expected data
outgoingFinishChannel.subscribe(common.mustCall((message) => {
const data = {
request,
response,
server,
socket: request.socket
};

// Context is maintained
compare(als.getStore(), context);

compare(context, data);
compare(message, data);
}));

let request;
let response;

const server = http.createServer(common.mustCall((req, res) => {
request = req;
response = res;

setTimeout(() => {
res.end('done');
}, 1);
}));

server.listen(() => {
const { port } = server.address();
http.get(`http://localhost:${port}`, (res) => {
res.resume();
res.on('end', () => {
server.close();
});
});
});

function compare(a, b) {
assert.strictEqual(a.request, b.request);
assert.strictEqual(a.response, b.response);
assert.strictEqual(a.socket, b.socket);
assert.strictEqual(a.server, b.server);
}

0 comments on commit f861733

Please sign in to comment.