Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tls: fix writeQueueSize prop, long write timeouts #15791

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/_tls_wrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -455,9 +455,8 @@ TLSSocket.prototype._init = function(socket, wrap) {
var ssl = this._handle;

// lib/net.js expect this value to be non-zero if write hasn't been flushed
// immediately
// TODO(indutny): revise this solution, it might be 1 before handshake and
// represent real writeQueueSize during regular writes.
// immediately. After the handshake is done this will represent the actual
// write queue size
ssl.writeQueueSize = 1;

this.server = options.server;
Expand Down
8 changes: 8 additions & 0 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {


Socket.prototype._onTimeout = function() {
// `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
// an active write in progress, so we suppress the timeout.
const prevWriteQueueSize = this._handle.writeQueueSize;
if (prevWriteQueueSize > 0 &&
prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
this._unrefTimer();
return;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why unref? That doesn't quite match what I understood when I read "suppress the timeout"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll call _unrefActive so that it can come back and check this condition again in whatever the _idleTimeout is, if need be. Possible the comment is confusing but I'm obviously familiar with what it does so I'm a bad judge.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So … I might be wrong here, but judging from the naming I’d think _unrefActive only affects whether the timer handle is ref()ed or not, i.e. whether it is keeping the process alive or not … does that sound about right?

Copy link
Member Author

@apapirovski apapirovski Oct 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, missed the notification — _unrefActive re-schedules a timer for a timer that's a part of the unrefedLists.

node/lib/timers.js

Lines 141 to 151 in 686e092

// Schedule or re-schedule a timer.
// The item must have been enroll()'d first.
const active = exports.active = function(item) {
insert(item, false);
};
// Internal APIs that need timeouts should use `_unrefActive()` instead of
// `active()` so that they do not unnecessarily keep the process open.
exports._unrefActive = function(item) {
insert(item, true);
};

Not sure if that's a great explanation... ? Let me know if it isn't.

(Admittedly the name _unrefTimer in net.js is rather misleading given that it's more like _activeTimer but I didn't name it... :()

Also, let me know once you're done reviewing and if all looks good. Planning to merge this weekend.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, don’t feel like you need to wait on anything from me! There’s nothing obviously wrong in this PR.

And yes, this should just be renamed…

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Just being extra safe since this will go into LTS and is a fairly major change to something that's been broken for quite a while. :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific time frame? You can just add a baking-for-lts label to indicate that the releaser (likely @gibfahn) should let this sit a while before pulling it in, if you would feel more comfortable with that.

(On the other hand, if you feel comfortable not waiting any longer than usual, feel free to voice that as well!)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's a reasonably serious bug fix, it hasn't broken CITGM and no one else has expressed desire for having it bake, I think having it go in at the next opportunity should be fine. Thanks again for reviewing ❤️

}
debug('_onTimeout');
this.emit('timeout');
};
Expand Down
22 changes: 18 additions & 4 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
void LibuvStreamWrap::AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags) {
env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
env->SetProtoMethod(target, "setBlocking", SetBlocking);
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
}
Expand Down Expand Up @@ -144,11 +145,14 @@ bool LibuvStreamWrap::IsIPCPipe() {
}


void LibuvStreamWrap::UpdateWriteQueueSize() {
uint32_t LibuvStreamWrap::UpdateWriteQueueSize() {
HandleScope scope(env()->isolate());
Local<Integer> write_queue_size =
Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size);
object()->Set(env()->write_queue_size_string(), write_queue_size);
uint32_t write_queue_size = stream()->write_queue_size;
object()->Set(env()->context(),
env()->write_queue_size_string(),
Integer::NewFromUnsigned(env()->isolate(),
write_queue_size)).FromJust();
return write_queue_size;
}


Expand Down Expand Up @@ -273,6 +277,16 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
}


void LibuvStreamWrap::UpdateWriteQueueSize(
const FunctionCallbackInfo<Value>& args) {
LibuvStreamWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
args.GetReturnValue().Set(write_queue_size);
}


void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
LibuvStreamWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
Expand Down
4 changes: 3 additions & 1 deletion src/stream_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,15 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
}

AsyncWrap* GetAsyncWrap() override;
void UpdateWriteQueueSize();
uint32_t UpdateWriteQueueSize();

static void AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags = StreamBase::kFlagNone);

private:
static void UpdateWriteQueueSize(
const v8::FunctionCallbackInfo<v8::Value>& args);
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);

// Callbacks for libuv
Expand Down
34 changes: 32 additions & 2 deletions src/tls_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ using v8::Exception;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::String;
Expand Down Expand Up @@ -297,6 +298,7 @@ void TLSWrap::EncOut() {

// No data to write
if (BIO_pending(enc_out_) == 0) {
UpdateWriteQueueSize();
if (clear_in_->Length() == 0)
InvokeQueued(0);
return;
Expand Down Expand Up @@ -551,6 +553,18 @@ bool TLSWrap::IsClosing() {
}


uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) {
HandleScope scope(env()->isolate());
if (write_queue_size == 0)
write_queue_size = BIO_pending(enc_out_);
object()->Set(env()->context(),
env()->write_queue_size_string(),
Integer::NewFromUnsigned(env()->isolate(),
write_queue_size)).FromJust();
return write_queue_size;
}


int TLSWrap::ReadStart() {
return stream_->ReadStart();
}
Expand Down Expand Up @@ -591,8 +605,12 @@ int TLSWrap::DoWrite(WriteWrap* w,
ClearOut();
// However, if there is any data that should be written to the socket,
// the callback should not be invoked immediately
if (BIO_pending(enc_out_) == 0)
if (BIO_pending(enc_out_) == 0) {
// net.js expects writeQueueSize to be > 0 if the write isn't
// immediately flushed
UpdateWriteQueueSize(1);
return stream_->DoWrite(w, bufs, count, send_handle);
}
}

// Queue callback to execute it on next tick
Expand Down Expand Up @@ -642,13 +660,15 @@ int TLSWrap::DoWrite(WriteWrap* w,

// Try writing data immediately
EncOut();
UpdateWriteQueueSize();

return 0;
}


void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
// Intentionally empty
TLSWrap* wrap = static_cast<TLSWrap*>(ctx);
wrap->UpdateWriteQueueSize();
}


Expand Down Expand Up @@ -912,6 +932,15 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB


void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
TLSWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
args.GetReturnValue().Set(write_queue_size);
}


void TLSWrap::Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context) {
Expand All @@ -938,6 +967,7 @@ void TLSWrap::Initialize(Local<Object> target,
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
env->SetProtoMethod(t, "destroySSL", DestroySSL);
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);

StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
SSLWrap<TLSWrap>::AddMethods(env, t);
Expand Down
5 changes: 5 additions & 0 deletions src/tls_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class TLSWrap : public AsyncWrap,

AsyncWrap* GetAsyncWrap() override;
bool IsIPCPipe() override;
uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0);

// Resource implementation
static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
Expand Down Expand Up @@ -187,6 +188,10 @@ class TLSWrap : public AsyncWrap,
// If true - delivered EOF to the js-land, either after `close_notify`, or
// after the `UV_EOF` on socket.
bool eof_;

private:
static void UpdateWriteQueueSize(
const v8::FunctionCallbackInfo<v8::Value>& args);
};

} // namespace node
Expand Down
43 changes: 43 additions & 0 deletions test/parallel/test-tls-buffersize.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const assert = require('assert');
const fixtures = require('../common/fixtures');
const tls = require('tls');

const iter = 10;
const overhead = 30;

const server = tls.createServer({
key: fixtures.readKey('agent2-key.pem'),
cert: fixtures.readKey('agent2-cert.pem')
}, common.mustCall((socket) => {
socket.on('readable', common.mustCallAtLeast(() => {
socket.read();
}, 1));

socket.on('end', common.mustCall(() => {
server.close();
}));
}));

server.listen(0, common.mustCall(() => {
const client = tls.connect({
port: server.address().port,
rejectUnauthorized: false
}, common.mustCall(() => {
assert.strictEqual(client.bufferSize, 0);

for (let i = 1; i < iter; i++) {
client.write('a');
assert.strictEqual(client.bufferSize, i + overhead);
}

client.on('finish', common.mustCall(() => {
assert.strictEqual(client.bufferSize, 0);
}));

client.end();
}));
}));
80 changes: 80 additions & 0 deletions test/sequential/test-http-keep-alive-large-write.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');

// This test assesses whether long-running writes can complete
// or timeout because the socket is not aware that the backing
// stream is still writing.
// To simulate a slow client, we write a really large chunk and
// then proceed through the following cycle:
// 1) Receive first 'data' event and record currently written size
// 2) Once we've read up to currently written size recorded above,
// we pause the stream and wait longer than the server timeout
// 3) Socket.prototype._onTimeout triggers and should confirm
// that the backing stream is still active and writing
// 4) Our timer fires, we resume the socket and start at 1)

const minReadSize = 250000;
const serverTimeout = common.platformTimeout(500);
let offsetTimeout = common.platformTimeout(100);
let serverConnectionHandle;
let writeSize = 3000000;
let didReceiveData = false;
// this represents each cycles write size, where the cycle consists
// of `write > read > _onTimeout`
let currentWriteSize = 0;

const server = http.createServer(common.mustCall((req, res) => {
const content = Buffer.alloc(writeSize, 0x44);

res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Content-Length': content.length.toString(),
'Vary': 'Accept-Encoding'
});

serverConnectionHandle = res.socket._handle;
res.write(content);
res.end();
}));
server.setTimeout(serverTimeout);
server.on('timeout', () => {
assert.strictEqual(didReceiveData, false, 'Should not timeout');
});

server.listen(0, common.mustCall(() => {
http.get({
path: '/',
port: server.address().port
}, common.mustCall((res) => {
const resume = () => res.resume();
let receivedBufferLength = 0;
let firstReceivedAt;
res.on('data', common.mustCallAtLeast((buf) => {
if (receivedBufferLength === 0) {
currentWriteSize = Math.max(
minReadSize,
writeSize - serverConnectionHandle.writeQueueSize
);
didReceiveData = false;
firstReceivedAt = Date.now();
}
receivedBufferLength += buf.length;
if (receivedBufferLength >= currentWriteSize) {
didReceiveData = true;
writeSize = serverConnectionHandle.writeQueueSize;
receivedBufferLength = 0;
res.pause();
setTimeout(
resume,
serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you comment on why we need this formula to calculate the throttle time?

Copy link
Member Author

@apapirovski apapirovski Oct 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to always wait after the _onTimeout fires but not give it a chance to fire twice. On slower systems (or if this were in parallel) the processing of data can sometimes take long enough that after several iterations of reading, it's offset enough that two timeouts get a chance to fire. It's rare but I'm just being extra defensive here.

);
offsetTimeout = 0;
}
}, 1));
res.on('end', common.mustCall(() => {
server.close();
}));
}));
}));
Loading