Skip to content

Commit

Permalink
http2: pause input processing if sending output
Browse files Browse the repository at this point in the history
If we are waiting for the ability to send more output, we should not
process more input. This commit a) makes us send output earlier,
during processing of input, if we accumulate a lot and b) allows
interrupting the call into nghttp2 that processes input data
and resuming it at a later time, if we do find ourselves in a position
where we are waiting to be able to send more output.

This is part of mitigating CVE-2019-9511/CVE-2019-9517.

Backport-PR-URL: #29124
PR-URL: #29122
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax authored and BethGriggs committed Aug 15, 2019
1 parent 854dba6 commit 6d687f7
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 45 deletions.
133 changes: 90 additions & 43 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -918,31 +918,51 @@ inline ssize_t Http2Session::OnCallbackPadding(size_t frameLen,
// various callback functions. Each of these will typically result in a call
// out to JavaScript so this particular function is rather hot and can be
// quite expensive. This is a potential performance optimization target later.
inline ssize_t Http2Session::Write(const uv_buf_t* bufs, size_t nbufs) {
size_t total = 0;
// Note that nghttp2_session_mem_recv is a synchronous operation that
// will trigger a number of other callbacks. Those will, in turn have
// multiple side effects.
for (size_t n = 0; n < nbufs; n++) {
DEBUG_HTTP2SESSION2(this, "receiving %d bytes [wants data? %d]",
bufs[n].len,
nghttp2_session_want_read(session_));
ssize_t ret =
nghttp2_session_mem_recv(session_,
reinterpret_cast<uint8_t*>(bufs[n].base),
bufs[n].len);
CHECK_NE(ret, NGHTTP2_ERR_NOMEM);

if (ret < 0)
return ret;
ssize_t Http2Session::ConsumeHTTP2Data() {
CHECK_NE(stream_buf_.base, nullptr);
CHECK_LT(stream_buf_offset_, stream_buf_.len);
size_t read_len = stream_buf_.len - stream_buf_offset_;

DEBUG_HTTP2SESSION2(this, "receiving %d bytes [wants data? %d]",
read_len,
nghttp2_session_want_read(session_));
flags_ &= ~SESSION_STATE_NGHTTP2_RECV_PAUSED;
ssize_t ret =
nghttp2_session_mem_recv(session_,
reinterpret_cast<uint8_t*>(stream_buf_.base) +
stream_buf_offset_,
read_len);
CHECK_NE(ret, NGHTTP2_ERR_NOMEM);

if (flags_ & SESSION_STATE_NGHTTP2_RECV_PAUSED) {
CHECK_NE(flags_ & SESSION_STATE_READING_STOPPED, 0);

CHECK_GT(ret, 0);
CHECK_LE(static_cast<size_t>(ret), read_len);

total += ret;
if (static_cast<size_t>(ret) < read_len) {
// Mark the remainder of the data as available for later consumption.
stream_buf_offset_ += ret;
return ret;
}
}

// We are done processing the current input chunk.
DecrementCurrentSessionMemory(stream_buf_.len);
stream_buf_offset_ = 0;
stream_buf_ab_.Reset();
free(stream_buf_allocation_.base);
stream_buf_allocation_ = uv_buf_init(nullptr, 0);
stream_buf_ = uv_buf_init(nullptr, 0);

if (ret < 0)
return ret;

// Send any data that was queued up while processing the received data.
if (!IsDestroyed()) {
SendPendingData();
}
return total;
return ret;
}


Expand Down Expand Up @@ -1238,6 +1258,7 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
size_t offset = data - reinterpret_cast<uint8_t*>(session->stream_buf_.base);

// Verify that the data offset is inside the current read buffer.
CHECK_GE(offset, session->stream_buf_offset_);
CHECK_LE(offset, session->stream_buf_.len);
CHECK_LE(offset + len, session->stream_buf_.len);

Expand All @@ -1250,6 +1271,16 @@ inline int Http2Session::OnDataChunkReceived(nghttp2_session* handle,
else
nghttp2_session_consume_stream(handle, id, len);

// If we have a gathered a lot of data for output, try sending it now.
if (session->outgoing_length_ > 4096) session->SendPendingData();

// If we are currently waiting for a write operation to finish, we should
// tell nghttp2 that we want to wait before we process more input data.
if (session->flags_ & SESSION_STATE_WRITE_IN_PROGRESS) {
session->flags_ |= SESSION_STATE_NGHTTP2_RECV_PAUSED;
return NGHTTP2_ERR_PAUSE;
}

return 0;
}

Expand Down Expand Up @@ -1597,6 +1628,11 @@ void Http2Session::OnStreamAfterWriteImpl(WriteWrap* w, int status, void* ctx) {
session->stream_->ReadStart();
}

// If there is more incoming data queued up, consume it.
if (session->stream_buf_offset_ > 0) {
session->ConsumeHTTP2Data();
}

if (!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
// Schedule a new write if nghttp2 wants to send data.
session->MaybeScheduleWrite();
Expand Down Expand Up @@ -1654,6 +1690,7 @@ void Http2Session::ClearOutgoing(int status) {

if (outgoing_buffers_.size() > 0) {
outgoing_storage_.clear();
outgoing_length_ = 0;

std::vector<nghttp2_stream_write> current_outgoing_buffers_;
current_outgoing_buffers_.swap(outgoing_buffers_);
Expand All @@ -1680,6 +1717,11 @@ void Http2Session::ClearOutgoing(int status) {
}
}

void Http2Session::PushOutgoingBuffer(nghttp2_stream_write&& write) {
outgoing_length_ += write.buf.len;
outgoing_buffers_.emplace_back(std::move(write));
}

// Queue a given block of data for sending. This always creates a copy,
// so it is used for the cases in which nghttp2 requests sending of a
// small chunk of data.
Expand All @@ -1692,7 +1734,7 @@ void Http2Session::CopyDataIntoOutgoing(const uint8_t* src, size_t src_length) {
// of the outgoing_buffers_ vector may invalidate the pointer.
// The correct base pointers will be set later, before writing to the
// underlying socket.
outgoing_buffers_.emplace_back(nghttp2_stream_write {
PushOutgoingBuffer(nghttp2_stream_write {
uv_buf_init(nullptr, src_length)
});
}
Expand Down Expand Up @@ -1826,13 +1868,13 @@ int Http2Session::OnSendData(
if (write.buf.len <= length) {
// This write does not suffice by itself, so we can consume it completely.
length -= write.buf.len;
session->outgoing_buffers_.emplace_back(std::move(write));
session->PushOutgoingBuffer(std::move(write));
stream->queue_.pop();
continue;
}

// Slice off `length` bytes of the first write in the queue.
session->outgoing_buffers_.emplace_back(nghttp2_stream_write {
session->PushOutgoingBuffer(nghttp2_stream_write {
uv_buf_init(write.buf.base, length)
});
write.buf.base += length;
Expand All @@ -1842,7 +1884,7 @@ int Http2Session::OnSendData(

if (frame->data.padlen > 0) {
// Send padding if that was requested.
session->outgoing_buffers_.emplace_back(nghttp2_stream_write {
session->PushOutgoingBuffer(nghttp2_stream_write {
uv_buf_init(const_cast<char*>(zero_bytes_256), frame->data.padlen - 1)
});
}
Expand Down Expand Up @@ -1896,8 +1938,6 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
Http2Scope h2scope(session);
CHECK_NE(session->stream_, nullptr);
DEBUG_HTTP2SESSION2(session, "receiving %d bytes", nread);
CHECK_EQ(session->stream_buf_allocation_.base, nullptr);
CHECK(session->stream_buf_ab_.IsEmpty());

// Only pass data on if nread > 0
if (nread <= 0) {
Expand All @@ -1913,26 +1953,34 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
return;
}

// Shrink to the actual amount of used data.
uv_buf_t buf = *buf_;
buf.base = Realloc(buf.base, nread);

session->IncrementCurrentSessionMemory(nread);
OnScopeLeave on_scope_leave([&]() {
// Once finished handling this write, reset the stream buffer.
// The memory has either been free()d or was handed over to V8.
// We use `nread` instead of `buf.size()` here, because the buffer is
// cleared as part of the `.ToArrayBuffer()` call below.
session->DecrementCurrentSessionMemory(nread);
session->statistics_.data_received += nread;

if (UNLIKELY(session->stream_buf_offset_ > 0)) {
// This is a very unlikely case, and should only happen if the ReadStart()
// call in OnStreamAfterWrite() immediately provides data. If that does
// happen, we concatenate the data we received with the already-stored
// pending input data, slicing off the already processed part.
char* new_buf = Malloc(
session->stream_buf_.len - session->stream_buf_offset_ + nread);
memcpy(new_buf,
session->stream_buf_.base + session->stream_buf_offset_,
session->stream_buf_.len - session->stream_buf_offset_);
memcpy(new_buf + session->stream_buf_.len - session->stream_buf_offset_,
buf.base,
nread);
free(buf.base);
nread = session->stream_buf_.len - session->stream_buf_offset_ + nread;
buf = uv_buf_init(new_buf, nread);
session->stream_buf_offset_ = 0;
session->stream_buf_ab_.Reset();
free(session->stream_buf_allocation_.base);
session->stream_buf_allocation_ = uv_buf_init(nullptr, 0);
session->stream_buf_ = uv_buf_init(nullptr, 0);
});
session->DecrementCurrentSessionMemory(session->stream_buf_offset_);
}

// Make sure that there was no read previously active.
CHECK_EQ(session->stream_buf_.base, nullptr);
CHECK_EQ(session->stream_buf_.len, 0);
// Shrink to the actual amount of used data.
buf.base = Realloc(buf.base, nread);
session->IncrementCurrentSessionMemory(nread);

// Remember the current buffer, so that OnDataChunkReceived knows the
// offset of a DATA frame's data into the socket read buffer.
Expand All @@ -1949,8 +1997,7 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
// to copy memory.
session->stream_buf_allocation_ = buf;

session->statistics_.data_received += nread;
ssize_t ret = session->Write(&session->stream_buf_, 1);
ssize_t ret = session->ConsumeHTTP2Data();

if (UNLIKELY(ret < 0)) {
DEBUG_HTTP2SESSION2(session, "fatal error receiving data: %d", ret);
Expand Down
13 changes: 11 additions & 2 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ enum session_state_flags {
SESSION_STATE_SENDING = 0x10,
SESSION_STATE_WRITE_IN_PROGRESS = 0x20,
SESSION_STATE_READING_STOPPED = 0x40,
SESSION_STATE_NGHTTP2_RECV_PAUSED = 0x80
};

// This allows for 4 default-sized frames with their frame headers
Expand Down Expand Up @@ -831,8 +832,8 @@ class Http2Session : public AsyncWrap {
// Indicates whether there currently exist outgoing buffers for this stream.
bool HasWritesOnSocketForStream(Http2Stream* stream);

// Write data to the session
inline ssize_t Write(const uv_buf_t* bufs, size_t nbufs);
// Write data from stream_buf_ to the session
ssize_t ConsumeHTTP2Data();

size_t self_size() const override { return sizeof(*this); }

Expand Down Expand Up @@ -896,6 +897,9 @@ class Http2Session : public AsyncWrap {
}

void DecrementCurrentSessionMemory(uint64_t amount) {
#ifdef DEBUG
CHECK_LE(amount, current_session_memory_);
#endif
current_session_memory_ -= amount;
}

Expand Down Expand Up @@ -1061,8 +1065,11 @@ class Http2Session : public AsyncWrap {
uint32_t chunks_sent_since_last_write_ = 0;

uv_buf_t stream_buf_ = uv_buf_init(nullptr, 0);
// When processing input data, either stream_buf_ab_ or stream_buf_allocation_
// will be set. stream_buf_ab_ is lazily created from stream_buf_allocation_.
v8::Global<v8::ArrayBuffer> stream_buf_ab_;
uv_buf_t stream_buf_allocation_ = uv_buf_init(nullptr, 0);
size_t stream_buf_offset_ = 0;

size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
std::queue<Http2Ping*> outstanding_pings_;
Expand All @@ -1072,6 +1079,7 @@ class Http2Session : public AsyncWrap {

std::vector<nghttp2_stream_write> outgoing_buffers_;
std::vector<uint8_t> outgoing_storage_;
size_t outgoing_length_ = 0;
std::vector<int32_t> pending_rst_streams_;
// Count streams that have been rejected while being opened. Exceeding a fixed
// limit will result in the session being destroyed, as an indication of a
Expand All @@ -1081,6 +1089,7 @@ class Http2Session : public AsyncWrap {
// Also use the invalid frame count as a measure for rejecting input frames.
int32_t invalid_frame_count_ = 0;

void PushOutgoingBuffer(nghttp2_stream_write&& write);
void CopyDataIntoOutgoing(const uint8_t* src, size_t src_length);
void ClearOutgoing(int status);

Expand Down
40 changes: 40 additions & 0 deletions test/parallel/test-http2-large-write-multiple-requests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

const fixtures = require('../common/fixtures');
const assert = require('assert');
const http2 = require('http2');

const content = fixtures.readSync('person-large.jpg');

const server = http2.createServer({
maxSessionMemory: 1000
});
server.on('stream', (stream, headers) => {
stream.respond({
'content-type': 'image/jpeg',
':status': 200
});
stream.end(content);
});
server.unref();

server.listen(0, common.mustCall(() => {
const client = http2.connect(`http://localhost:${server.address().port}/`);

let finished = 0;
for (let i = 0; i < 100; i++) {
const req = client.request({ ':path': '/' });
req.end();
const chunks = [];
req.on('data', (chunk) => {
chunks.push(chunk);
});
req.on('end', common.mustCall(() => {
assert.deepStrictEqual(Buffer.concat(chunks), content);
if (++finished === 100) client.close();
}));
}
}));

0 comments on commit 6d687f7

Please sign in to comment.