From adfe2b6aaf7b4fe085e7ac090e4531cb8d77fcc9 Mon Sep 17 00:00:00 2001 From: Anatoli Papirovski Date: Thu, 12 Oct 2017 06:57:42 -0700 Subject: [PATCH] net: fix timeouts during long writes Add updateWriteQueueSize which updates and returns queue size (net & tls). Make _onTimeout check whether an active write is ongoing and if so, call _unrefTimer rather than emitting a timeout event. Add http & https test that checks whether long-lasting (but active) writes timeout or can finish writing as expected. PR-URL: https://github.com/nodejs/node/pull/15791 Fixes: https://github.com/nodejs/node/issues/15082 Reviewed-By: Anna Henningsen Reviewed-By: Matteo Collina Reviewed-By: Refael Ackermann Reviewed-By: Fedor Indutny --- lib/net.js | 8 ++ src/stream_wrap.cc | 22 ++++- src/stream_wrap.h | 4 +- src/tls_wrap.cc | 10 +++ src/tls_wrap.h | 4 + .../test-http-keep-alive-large-write.js | 80 +++++++++++++++++ .../test-https-keep-alive-large-write.js | 87 +++++++++++++++++++ 7 files changed, 210 insertions(+), 5 deletions(-) create mode 100644 test/sequential/test-http-keep-alive-large-write.js create mode 100644 test/sequential/test-https-keep-alive-large-write.js diff --git a/lib/net.js b/lib/net.js index 53b9d33f48..5356357fc9 100644 --- a/lib/net.js +++ b/lib/net.js @@ -397,6 +397,14 @@ Socket.prototype.setTimeout = function(msecs, callback) { Socket.prototype._onTimeout = function() { + // `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is + // an active write in progress, so we suppress the timeout. + const prevWriteQueueSize = this._handle.writeQueueSize; + if (prevWriteQueueSize > 0 && + prevWriteQueueSize !== this._handle.updateWriteQueueSize()) { + this._unrefTimer(); + return; + } debug('_onTimeout'); this.emit('timeout'); }; diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 660702eb35..0107cbad2d 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -104,6 +104,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env, void LibuvStreamWrap::AddMethods(Environment* env, v8::Local target, int flags) { + env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize); env->SetProtoMethod(target, "setBlocking", SetBlocking); StreamBase::AddMethods(env, target, flags); } @@ -144,11 +145,14 @@ bool LibuvStreamWrap::IsIPCPipe() { } -void LibuvStreamWrap::UpdateWriteQueueSize() { +uint32_t LibuvStreamWrap::UpdateWriteQueueSize() { HandleScope scope(env()->isolate()); - Local write_queue_size = - Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size); - object()->Set(env()->write_queue_size_string(), write_queue_size); + uint32_t write_queue_size = stream()->write_queue_size; + object()->Set(env()->context(), + env()->write_queue_size_string(), + Integer::NewFromUnsigned(env()->isolate(), + write_queue_size)).FromJust(); + return write_queue_size; } @@ -273,6 +277,16 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle, } +void LibuvStreamWrap::UpdateWriteQueueSize( + const FunctionCallbackInfo& args) { + LibuvStreamWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + uint32_t write_queue_size = wrap->UpdateWriteQueueSize(); + args.GetReturnValue().Set(write_queue_size); +} + + void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo& args) { LibuvStreamWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); diff --git a/src/stream_wrap.h b/src/stream_wrap.h index d8fbcf709a..43df504e81 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -84,13 +84,15 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { } AsyncWrap* GetAsyncWrap() override; - void UpdateWriteQueueSize(); + uint32_t UpdateWriteQueueSize(); static void AddMethods(Environment* env, v8::Local target, int flags = StreamBase::kFlagNone); private: + static void UpdateWriteQueueSize( + const v8::FunctionCallbackInfo& args); static void SetBlocking(const v8::FunctionCallbackInfo& args); // Callbacks for libuv diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 0da332f16e..63e3494047 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -932,6 +932,15 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB +void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo& args) { + TLSWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + uint32_t write_queue_size = wrap->UpdateWriteQueueSize(); + args.GetReturnValue().Set(write_queue_size); +} + + void TLSWrap::Initialize(Local target, Local unused, Local context) { @@ -958,6 +967,7 @@ void TLSWrap::Initialize(Local target, env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks); env->SetProtoMethod(t, "destroySSL", DestroySSL); env->SetProtoMethod(t, "enableCertCb", EnableCertCb); + env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize); StreamBase::AddMethods(env, t, StreamBase::kFlagHasWritev); SSLWrap::AddMethods(env, t); diff --git a/src/tls_wrap.h b/src/tls_wrap.h index 8d75d6fcc9..99d2dc9121 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -188,6 +188,10 @@ class TLSWrap : public AsyncWrap, // If true - delivered EOF to the js-land, either after `close_notify`, or // after the `UV_EOF` on socket. bool eof_; + + private: + static void UpdateWriteQueueSize( + const v8::FunctionCallbackInfo& args); }; } // namespace node diff --git a/test/sequential/test-http-keep-alive-large-write.js b/test/sequential/test-http-keep-alive-large-write.js new file mode 100644 index 0000000000..2cdf539e76 --- /dev/null +++ b/test/sequential/test-http-keep-alive-large-write.js @@ -0,0 +1,80 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const http = require('http'); + +// This test assesses whether long-running writes can complete +// or timeout because the socket is not aware that the backing +// stream is still writing. +// To simulate a slow client, we write a really large chunk and +// then proceed through the following cycle: +// 1) Receive first 'data' event and record currently written size +// 2) Once we've read up to currently written size recorded above, +// we pause the stream and wait longer than the server timeout +// 3) Socket.prototype._onTimeout triggers and should confirm +// that the backing stream is still active and writing +// 4) Our timer fires, we resume the socket and start at 1) + +const minReadSize = 250000; +const serverTimeout = common.platformTimeout(500); +let offsetTimeout = common.platformTimeout(100); +let serverConnectionHandle; +let writeSize = 3000000; +let didReceiveData = false; +// this represents each cycles write size, where the cycle consists +// of `write > read > _onTimeout` +let currentWriteSize = 0; + +const server = http.createServer(common.mustCall((req, res) => { + const content = Buffer.alloc(writeSize, 0x44); + + res.writeHead(200, { + 'Content-Type': 'application/octet-stream', + 'Content-Length': content.length.toString(), + 'Vary': 'Accept-Encoding' + }); + + serverConnectionHandle = res.socket._handle; + res.write(content); + res.end(); +})); +server.setTimeout(serverTimeout); +server.on('timeout', () => { + assert.strictEqual(didReceiveData, false, 'Should not timeout'); +}); + +server.listen(0, common.mustCall(() => { + http.get({ + path: '/', + port: server.address().port + }, common.mustCall((res) => { + const resume = () => res.resume(); + let receivedBufferLength = 0; + let firstReceivedAt; + res.on('data', common.mustCallAtLeast((buf) => { + if (receivedBufferLength === 0) { + currentWriteSize = Math.max( + minReadSize, + writeSize - serverConnectionHandle.writeQueueSize + ); + didReceiveData = false; + firstReceivedAt = Date.now(); + } + receivedBufferLength += buf.length; + if (receivedBufferLength >= currentWriteSize) { + didReceiveData = true; + writeSize = serverConnectionHandle.writeQueueSize; + receivedBufferLength = 0; + res.pause(); + setTimeout( + resume, + serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt) + ); + offsetTimeout = 0; + } + }, 1)); + res.on('end', common.mustCall(() => { + server.close(); + })); + })); +})); diff --git a/test/sequential/test-https-keep-alive-large-write.js b/test/sequential/test-https-keep-alive-large-write.js new file mode 100644 index 0000000000..88468dc03f --- /dev/null +++ b/test/sequential/test-https-keep-alive-large-write.js @@ -0,0 +1,87 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const fixtures = require('../common/fixtures'); +const https = require('https'); + +// This test assesses whether long-running writes can complete +// or timeout because the socket is not aware that the backing +// stream is still writing. +// To simulate a slow client, we write a really large chunk and +// then proceed through the following cycle: +// 1) Receive first 'data' event and record currently written size +// 2) Once we've read up to currently written size recorded above, +// we pause the stream and wait longer than the server timeout +// 3) Socket.prototype._onTimeout triggers and should confirm +// that the backing stream is still active and writing +// 4) Our timer fires, we resume the socket and start at 1) + +const minReadSize = 250000; +const serverTimeout = common.platformTimeout(500); +let offsetTimeout = common.platformTimeout(100); +let serverConnectionHandle; +let writeSize = 2000000; +let didReceiveData = false; +// this represents each cycles write size, where the cycle consists +// of `write > read > _onTimeout` +let currentWriteSize = 0; + +const server = https.createServer({ + key: fixtures.readKey('agent1-key.pem'), + cert: fixtures.readKey('agent1-cert.pem') +}, common.mustCall((req, res) => { + const content = Buffer.alloc(writeSize, 0x44); + + res.writeHead(200, { + 'Content-Type': 'application/octet-stream', + 'Content-Length': content.length.toString(), + 'Vary': 'Accept-Encoding' + }); + + serverConnectionHandle = res.socket._handle; + res.write(content); + res.end(); +})); +server.setTimeout(serverTimeout); +server.on('timeout', () => { + assert.strictEqual(didReceiveData, false, 'Should not timeout'); +}); + +server.listen(0, common.mustCall(() => { + https.get({ + path: '/', + port: server.address().port, + rejectUnauthorized: false + }, common.mustCall((res) => { + const resume = () => res.resume(); + let receivedBufferLength = 0; + let firstReceivedAt; + res.on('data', common.mustCallAtLeast((buf) => { + if (receivedBufferLength === 0) { + currentWriteSize = Math.max( + minReadSize, + writeSize - serverConnectionHandle.writeQueueSize + ); + didReceiveData = false; + firstReceivedAt = Date.now(); + } + receivedBufferLength += buf.length; + if (receivedBufferLength >= currentWriteSize) { + didReceiveData = true; + writeSize = serverConnectionHandle.writeQueueSize; + receivedBufferLength = 0; + res.pause(); + setTimeout( + resume, + serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt) + ); + offsetTimeout = 0; + } + }, 1)); + res.on('end', common.mustCall(() => { + server.close(); + })); + })); +}));