Skip to content

Commit

Permalink
http2: fix condition where data is lost
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Feb 26, 2018
1 parent 3d93f39 commit c3dcc3c
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 14 deletions.
55 changes: 41 additions & 14 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,23 @@ function onStreamClose(code) {

if (state.fd !== undefined)
tryClose(state.fd);
stream.push(null);
stream[kMaybeDestroy](null, code);

// Defer destroy we actually emit end.
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
stream[kMaybeDestroy](null, code);
} else {
// Wait for end to destroy.
stream.on('end', stream[kMaybeDestroy]);
// Push a null so the stream can end whenever the client consumes
// it completely.
stream.push(null);

// Same as net.
if (stream.readableLength === 0) {
stream.read(0);
}
}
}

// Receives a chunk of data for a given stream and forwards it on
Expand All @@ -326,11 +341,19 @@ function onStreamRead(nread, buf) {
}
return;
}

// Last chunk was received. End the readable side.
debug(`Http2Stream ${stream[kID]} [Http2Session ` +
`${sessionName(stream[kSession][kType])}]: ending readable.`);
stream.push(null);
stream[kMaybeDestroy]();

// defer this until we actually emit end
if (stream._readableState.endEmitted) {
stream[kMaybeDestroy]();
} else {
stream.on('end', stream[kMaybeDestroy]);
stream.push(null);
stream.read(0);
}
}

// Called when the remote peer settings have been updated.
Expand Down Expand Up @@ -1833,21 +1856,25 @@ class Http2Stream extends Duplex {
session[kMaybeDestroy]();
process.nextTick(emit, this, 'close', code);
callback(err);
}

}
// The Http2Stream can be destroyed if it has closed and if the readable
// side has received the final chunk.
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) {
if (error == null) {
if (code === NGHTTP2_NO_ERROR &&
(!this._readableState.ended ||
!this._writableState.ended ||
this._writableState.pendingcb > 0 ||
!this.closed)) {
return;
}
if (error || code !== NGHTTP2_NO_ERROR) {
this.destroy(error);
return;
}

// TODO(mcollina): remove usage of _*State properties
if (this._readableState.ended &&
this._writableState.ended &&
this._writableState.pendingcb === 0 &&
this.closed) {
this.destroy();
// This should return, but eslint complains.
// return
}
this.destroy(error);
}
}

Expand Down
50 changes: 50 additions & 0 deletions test/parallel/test-http2-compat-short-stream-client-server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');
const { Readable } = require('stream');

const server = http2.createServer(common.mustCall((req, res) => {
res.setHeader('content-type', 'text/html');
const input = new Readable({
read() {
this.push('test');
this.push(null);
}
});
input.pipe(res);
}));

server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);

const req = client.request();

req.on('response', common.mustCall((headers) => {
assert.strictEqual(headers[':status'], 200);
assert.strictEqual(headers['content-type'], 'text/html');
}));

let data = '';

const notCallClose = common.mustNotCall();

setTimeout(() => {
req.setEncoding('utf8');
req.removeListener('close', notCallClose);
req.on('close', common.mustCall(() => {
server.close();
client.close();
}));
req.on('data', common.mustCallAtLeast((d) => data += d));
req.on('end', common.mustCall(() => {
assert.strictEqual(data, 'test');
}));
}, common.platformTimeout(100));

req.on('close', notCallClose);
}));
55 changes: 55 additions & 0 deletions test/parallel/test-http2-short-stream-client-server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const http2 = require('http2');
const { Readable } = require('stream');

const server = http2.createServer();
server.on('stream', common.mustCall((stream) => {
stream.respond({
':status': 200,
'content-type': 'text/html'
});
const input = new Readable({
read() {
this.push('test');
this.push(null);
}
});
input.pipe(stream);
}));


server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);

const req = client.request();

req.on('response', common.mustCall((headers) => {
assert.strictEqual(headers[':status'], 200);
assert.strictEqual(headers['content-type'], 'text/html');
}));

let data = '';

const notCallClose = common.mustNotCall();

setTimeout(() => {
req.setEncoding('utf8');
req.removeListener('close', notCallClose);
req.on('close', common.mustCall(() => {
server.close();
client.close();
}));
req.on('data', common.mustCallAtLeast((d) => data += d));
req.on('end', common.mustCall(() => {
assert.strictEqual(data, 'test');
}));
}, common.platformTimeout(100));

req.on('close', notCallClose);
}));

0 comments on commit c3dcc3c

Please sign in to comment.