From 4b94697aad08ade71858b0eeaa4fc508efa7b3b1 Mon Sep 17 00:00:00 2001 From: Anatoli Papirovski Date: Thu, 5 Oct 2017 16:16:20 -0400 Subject: [PATCH 1/3] tls: properly track writeQueueSize during writes Make writeQueueSize represent the actual size of the write queue within the TLS socket. Add tls test to confirm that bufferSize works as expected. Fixes: https://github.com/nodejs/node/issues/15005 Refs: https://github.com/nodejs/node/pull/15006 --- lib/_tls_wrap.js | 5 ++-- src/tls_wrap.cc | 24 ++++++++++++++-- src/tls_wrap.h | 1 + test/parallel/test-tls-buffersize.js | 43 ++++++++++++++++++++++++++++ 4 files changed, 68 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-tls-buffersize.js diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js index b5ac8a5c9bcb9d..d7e349b239cb05 100644 --- a/lib/_tls_wrap.js +++ b/lib/_tls_wrap.js @@ -455,9 +455,8 @@ TLSSocket.prototype._init = function(socket, wrap) { var ssl = this._handle; // lib/net.js expect this value to be non-zero if write hasn't been flushed - // immediately - // TODO(indutny): revise this solution, it might be 1 before handshake and - // represent real writeQueueSize during regular writes. + // immediately. After the handshake is done this will represent the actual + // write queue size ssl.writeQueueSize = 1; this.server = options.server; diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 738bd040718233..0da332f16eef48 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -42,6 +42,7 @@ using v8::Exception; using v8::Function; using v8::FunctionCallbackInfo; using v8::FunctionTemplate; +using v8::Integer; using v8::Local; using v8::Object; using v8::String; @@ -297,6 +298,7 @@ void TLSWrap::EncOut() { // No data to write if (BIO_pending(enc_out_) == 0) { + UpdateWriteQueueSize(); if (clear_in_->Length() == 0) InvokeQueued(0); return; @@ -551,6 +553,18 @@ bool TLSWrap::IsClosing() { } +uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) { + HandleScope scope(env()->isolate()); + if (write_queue_size == 0) + write_queue_size = BIO_pending(enc_out_); + object()->Set(env()->context(), + env()->write_queue_size_string(), + Integer::NewFromUnsigned(env()->isolate(), + write_queue_size)).FromJust(); + return write_queue_size; +} + + int TLSWrap::ReadStart() { return stream_->ReadStart(); } @@ -591,8 +605,12 @@ int TLSWrap::DoWrite(WriteWrap* w, ClearOut(); // However, if there is any data that should be written to the socket, // the callback should not be invoked immediately - if (BIO_pending(enc_out_) == 0) + if (BIO_pending(enc_out_) == 0) { + // net.js expects writeQueueSize to be > 0 if the write isn't + // immediately flushed + UpdateWriteQueueSize(1); return stream_->DoWrite(w, bufs, count, send_handle); + } } // Queue callback to execute it on next tick @@ -642,13 +660,15 @@ int TLSWrap::DoWrite(WriteWrap* w, // Try writing data immediately EncOut(); + UpdateWriteQueueSize(); return 0; } void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { - // Intentionally empty + TLSWrap* wrap = static_cast(ctx); + wrap->UpdateWriteQueueSize(); } diff --git a/src/tls_wrap.h b/src/tls_wrap.h index fe3abe04f5556a..8d75d6fcc90cc7 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -132,6 +132,7 @@ class TLSWrap : public AsyncWrap, AsyncWrap* GetAsyncWrap() override; bool IsIPCPipe() override; + uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0); // Resource implementation static void OnAfterWriteImpl(WriteWrap* w, void* ctx); diff --git a/test/parallel/test-tls-buffersize.js b/test/parallel/test-tls-buffersize.js new file mode 100644 index 00000000000000..49848cd865aca5 --- /dev/null +++ b/test/parallel/test-tls-buffersize.js @@ -0,0 +1,43 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const fixtures = require('../common/fixtures'); +const tls = require('tls'); + +const iter = 10; +const overhead = 30; + +const server = tls.createServer({ + key: fixtures.readKey('agent2-key.pem'), + cert: fixtures.readKey('agent2-cert.pem') +}, common.mustCall((socket) => { + socket.on('readable', common.mustCallAtLeast(() => { + socket.read(); + }, 1)); + + socket.on('end', common.mustCall(() => { + server.close(); + })); +})); + +server.listen(0, common.mustCall(() => { + const client = tls.connect({ + port: server.address().port, + rejectUnauthorized: false + }, common.mustCall(() => { + assert.strictEqual(client.bufferSize, 0); + + for (let i = 1; i < iter; i++) { + client.write('a'); + assert.strictEqual(client.bufferSize, i + overhead); + } + + client.on('finish', common.mustCall(() => { + assert.strictEqual(client.bufferSize, 0); + })); + + client.end(); + })); +})); From d101e7a763dfa037892d1e6adfd5f59c21bff816 Mon Sep 17 00:00:00 2001 From: Anatoli Papirovski Date: Thu, 12 Oct 2017 06:57:42 -0700 Subject: [PATCH 2/3] net: fix timeouts during long writes Add updateWriteQueueSize which updates and returns queue size (net & tls). Make _onTimeout check whether an active write is ongoing and if so, call _unrefTimer rather than emitting a timeout event. Add http & https test that checks whether long-lasting (but active) writes timeout or can finish writing as expected. Fixes: https://github.com/nodejs/node/issues/15082 --- lib/net.js | 8 ++ src/stream_wrap.cc | 22 ++++- src/stream_wrap.h | 4 +- src/tls_wrap.cc | 10 +++ src/tls_wrap.h | 4 + .../test-http-keep-alive-large-write.js | 76 +++++++++++++++++ .../test-https-keep-alive-large-write.js | 83 +++++++++++++++++++ 7 files changed, 202 insertions(+), 5 deletions(-) create mode 100644 test/sequential/test-http-keep-alive-large-write.js create mode 100644 test/sequential/test-https-keep-alive-large-write.js diff --git a/lib/net.js b/lib/net.js index 3c97110e3fbe15..6fbc58b4b38830 100644 --- a/lib/net.js +++ b/lib/net.js @@ -397,6 +397,14 @@ Socket.prototype.setTimeout = function(msecs, callback) { Socket.prototype._onTimeout = function() { + // `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is + // an active write in progress, so we suppress the timeout. + const prevWriteQueueSize = this._handle.writeQueueSize; + if (prevWriteQueueSize > 0 && + prevWriteQueueSize !== this._handle.updateWriteQueueSize()) { + this._unrefTimer(); + return; + } debug('_onTimeout'); this.emit('timeout'); }; diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 660702eb354511..0107cbad2d9196 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -104,6 +104,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env, void LibuvStreamWrap::AddMethods(Environment* env, v8::Local target, int flags) { + env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize); env->SetProtoMethod(target, "setBlocking", SetBlocking); StreamBase::AddMethods(env, target, flags); } @@ -144,11 +145,14 @@ bool LibuvStreamWrap::IsIPCPipe() { } -void LibuvStreamWrap::UpdateWriteQueueSize() { +uint32_t LibuvStreamWrap::UpdateWriteQueueSize() { HandleScope scope(env()->isolate()); - Local write_queue_size = - Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size); - object()->Set(env()->write_queue_size_string(), write_queue_size); + uint32_t write_queue_size = stream()->write_queue_size; + object()->Set(env()->context(), + env()->write_queue_size_string(), + Integer::NewFromUnsigned(env()->isolate(), + write_queue_size)).FromJust(); + return write_queue_size; } @@ -273,6 +277,16 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle, } +void LibuvStreamWrap::UpdateWriteQueueSize( + const FunctionCallbackInfo& args) { + LibuvStreamWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + uint32_t write_queue_size = wrap->UpdateWriteQueueSize(); + args.GetReturnValue().Set(write_queue_size); +} + + void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo& args) { LibuvStreamWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); diff --git a/src/stream_wrap.h b/src/stream_wrap.h index d8fbcf709a8eeb..43df504e81b86e 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -84,13 +84,15 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { } AsyncWrap* GetAsyncWrap() override; - void UpdateWriteQueueSize(); + uint32_t UpdateWriteQueueSize(); static void AddMethods(Environment* env, v8::Local target, int flags = StreamBase::kFlagNone); private: + static void UpdateWriteQueueSize( + const v8::FunctionCallbackInfo& args); static void SetBlocking(const v8::FunctionCallbackInfo& args); // Callbacks for libuv diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 0da332f16eef48..63e3494047a21e 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -932,6 +932,15 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) { #endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB +void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo& args) { + TLSWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); + + uint32_t write_queue_size = wrap->UpdateWriteQueueSize(); + args.GetReturnValue().Set(write_queue_size); +} + + void TLSWrap::Initialize(Local target, Local unused, Local context) { @@ -958,6 +967,7 @@ void TLSWrap::Initialize(Local target, env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks); env->SetProtoMethod(t, "destroySSL", DestroySSL); env->SetProtoMethod(t, "enableCertCb", EnableCertCb); + env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize); StreamBase::AddMethods(env, t, StreamBase::kFlagHasWritev); SSLWrap::AddMethods(env, t); diff --git a/src/tls_wrap.h b/src/tls_wrap.h index 8d75d6fcc90cc7..99d2dc9121f139 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -188,6 +188,10 @@ class TLSWrap : public AsyncWrap, // If true - delivered EOF to the js-land, either after `close_notify`, or // after the `UV_EOF` on socket. bool eof_; + + private: + static void UpdateWriteQueueSize( + const v8::FunctionCallbackInfo& args); }; } // namespace node diff --git a/test/sequential/test-http-keep-alive-large-write.js b/test/sequential/test-http-keep-alive-large-write.js new file mode 100644 index 00000000000000..02289f8bbd6697 --- /dev/null +++ b/test/sequential/test-http-keep-alive-large-write.js @@ -0,0 +1,76 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const http = require('http'); + +// This test assesses whether long-running writes can complete +// or timeout because the socket is not aware that the backing +// stream is still writing. +// To simulate a slow client, we write a really large chunk and +// then proceed through the following cycle: +// 1) Receive first 'data' event and record currently written size +// 2) Once we've read up to currently written size recorded above, +// we pause the stream and wait longer than the server timeout +// 3) Socket.prototype._onTimeout triggers and should confirm +// that the backing stream is still active and writing +// 4) Our timer fires, we resume the socket and start at 1) + +const serverTimeout = common.platformTimeout(500); +let offsetTimeout = common.platformTimeout(100); +let serverConnectionHandle; +let writeSize = 3000000; +let didReceiveData = false; +// this represents each cycles write size, where the cycle consists +// of `write > read > _onTimeout` +let currentWriteSize = 0; + +const server = http.createServer(common.mustCall((req, res) => { + const content = Buffer.alloc(writeSize, 0x44); + + res.writeHead(200, { + 'Content-Type': 'application/octet-stream', + 'Content-Length': content.length.toString(), + 'Vary': 'Accept-Encoding' + }); + + serverConnectionHandle = res.socket._handle; + res.write(content); + res.end(); +})); +server.setTimeout(serverTimeout); +server.on('timeout', () => { + assert.strictEqual(didReceiveData, false, 'Should not timeout'); +}); + +server.listen(0, common.mustCall(() => { + http.get({ + path: '/', + port: server.address().port + }, common.mustCall((res) => { + const resume = () => res.resume(); + let receivedBufferLength = 0; + let firstReceivedAt; + res.on('data', common.mustCallAtLeast((buf) => { + if (receivedBufferLength === 0) { + currentWriteSize = writeSize - serverConnectionHandle.writeQueueSize; + didReceiveData = false; + firstReceivedAt = Date.now(); + } + receivedBufferLength += buf.length; + if (receivedBufferLength >= currentWriteSize) { + didReceiveData = true; + writeSize = serverConnectionHandle.writeQueueSize; + receivedBufferLength = 0; + res.pause(); + setTimeout( + resume, + serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt) + ); + offsetTimeout = 0; + } + }, 1)); + res.on('end', common.mustCall(() => { + server.close(); + })); + })); +})); diff --git a/test/sequential/test-https-keep-alive-large-write.js b/test/sequential/test-https-keep-alive-large-write.js new file mode 100644 index 00000000000000..bb7ed2a6ee029d --- /dev/null +++ b/test/sequential/test-https-keep-alive-large-write.js @@ -0,0 +1,83 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const fixtures = require('../common/fixtures'); +const https = require('https'); + +// This test assesses whether long-running writes can complete +// or timeout because the socket is not aware that the backing +// stream is still writing. +// To simulate a slow client, we write a really large chunk and +// then proceed through the following cycle: +// 1) Receive first 'data' event and record currently written size +// 2) Once we've read up to currently written size recorded above, +// we pause the stream and wait longer than the server timeout +// 3) Socket.prototype._onTimeout triggers and should confirm +// that the backing stream is still active and writing +// 4) Our timer fires, we resume the socket and start at 1) + +const serverTimeout = common.platformTimeout(500); +let offsetTimeout = common.platformTimeout(100); +let serverConnectionHandle; +let writeSize = 2000000; +let didReceiveData = false; +// this represents each cycles write size, where the cycle consists +// of `write > read > _onTimeout` +let currentWriteSize = 0; + +const server = https.createServer({ + key: fixtures.readKey('agent1-key.pem'), + cert: fixtures.readKey('agent1-cert.pem') +}, common.mustCall((req, res) => { + const content = Buffer.alloc(writeSize, 0x44); + + res.writeHead(200, { + 'Content-Type': 'application/octet-stream', + 'Content-Length': content.length.toString(), + 'Vary': 'Accept-Encoding' + }); + + serverConnectionHandle = res.socket._handle; + res.write(content); + res.end(); +})); +server.setTimeout(serverTimeout); +server.on('timeout', () => { + assert.strictEqual(didReceiveData, false, 'Should not timeout'); +}); + +server.listen(0, common.mustCall(() => { + https.get({ + path: '/', + port: server.address().port, + rejectUnauthorized: false + }, common.mustCall((res) => { + const resume = () => res.resume(); + let receivedBufferLength = 0; + let firstReceivedAt; + res.on('data', common.mustCallAtLeast((buf) => { + if (receivedBufferLength === 0) { + currentWriteSize = writeSize - serverConnectionHandle.writeQueueSize; + didReceiveData = false; + firstReceivedAt = Date.now(); + } + receivedBufferLength += buf.length; + if (receivedBufferLength >= currentWriteSize) { + didReceiveData = true; + writeSize = serverConnectionHandle.writeQueueSize; + receivedBufferLength = 0; + res.pause(); + setTimeout( + resume, + serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt) + ); + offsetTimeout = 0; + } + }, 1)); + res.on('end', common.mustCall(() => { + server.close(); + })); + })); +})); From 5ba063bdea8348c11d60ccb02f160241073074df Mon Sep 17 00:00:00 2001 From: Anatoli Papirovski Date: Wed, 18 Oct 2017 17:55:30 -0400 Subject: [PATCH 3/3] !fixup test: specify minReadSize for tests --- test/sequential/test-http-keep-alive-large-write.js | 6 +++++- test/sequential/test-https-keep-alive-large-write.js | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/test/sequential/test-http-keep-alive-large-write.js b/test/sequential/test-http-keep-alive-large-write.js index 02289f8bbd6697..2cdf539e76b2fc 100644 --- a/test/sequential/test-http-keep-alive-large-write.js +++ b/test/sequential/test-http-keep-alive-large-write.js @@ -15,6 +15,7 @@ const http = require('http'); // that the backing stream is still active and writing // 4) Our timer fires, we resume the socket and start at 1) +const minReadSize = 250000; const serverTimeout = common.platformTimeout(500); let offsetTimeout = common.platformTimeout(100); let serverConnectionHandle; @@ -52,7 +53,10 @@ server.listen(0, common.mustCall(() => { let firstReceivedAt; res.on('data', common.mustCallAtLeast((buf) => { if (receivedBufferLength === 0) { - currentWriteSize = writeSize - serverConnectionHandle.writeQueueSize; + currentWriteSize = Math.max( + minReadSize, + writeSize - serverConnectionHandle.writeQueueSize + ); didReceiveData = false; firstReceivedAt = Date.now(); } diff --git a/test/sequential/test-https-keep-alive-large-write.js b/test/sequential/test-https-keep-alive-large-write.js index bb7ed2a6ee029d..88468dc03fc99b 100644 --- a/test/sequential/test-https-keep-alive-large-write.js +++ b/test/sequential/test-https-keep-alive-large-write.js @@ -18,6 +18,7 @@ const https = require('https'); // that the backing stream is still active and writing // 4) Our timer fires, we resume the socket and start at 1) +const minReadSize = 250000; const serverTimeout = common.platformTimeout(500); let offsetTimeout = common.platformTimeout(100); let serverConnectionHandle; @@ -59,7 +60,10 @@ server.listen(0, common.mustCall(() => { let firstReceivedAt; res.on('data', common.mustCallAtLeast((buf) => { if (receivedBufferLength === 0) { - currentWriteSize = writeSize - serverConnectionHandle.writeQueueSize; + currentWriteSize = Math.max( + minReadSize, + writeSize - serverConnectionHandle.writeQueueSize + ); didReceiveData = false; firstReceivedAt = Date.now(); }