From ef5959402db659b0145c98b35543fbb902442fd9 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Tue, 7 Jan 2020 14:17:03 -0800 Subject: [PATCH 1/8] quic: refactoring sendpendingdata Refactoring SendPendingData for both default and http3 applications. This a step towards unifying the logic. --- src/quic/node_quic_default_application.cc | 147 ++++++++++--------- src/quic/node_quic_default_application.h | 4 + src/quic/node_quic_http3_application.cc | 165 +++++++++++++--------- src/quic/node_quic_http3_application.h | 7 +- src/quic/node_quic_session-inl.h | 29 ++++ src/quic/node_quic_session.h | 24 ++++ 6 files changed, 229 insertions(+), 147 deletions(-) diff --git a/src/quic/node_quic_default_application.cc b/src/quic/node_quic_default_application.cc index 08494aa6ca..914e9ccbb1 100644 --- a/src/quic/node_quic_default_application.cc +++ b/src/quic/node_quic_default_application.cc @@ -118,67 +118,53 @@ int IsEmpty(const ngtcp2_vec* vec, size_t cnt) { } } // anonymous namespace +int DefaultApplication::GetStreamData(StreamData* stream_data) { + QuicStream* stream = session()->FindStream(stream_data->id); + stream_data->remaining = + stream->DrainInto(&stream_data->data, &stream_data->count, 16); + stream_data->fin = stream->is_writable() ? 0 : 1; + + Debug(session(), "Selected %" PRId64 " buffers for stream %" PRId64 "%s", + stream_data->count, + stream_data->id, + stream_data->fin == 1 ? " (fin)" : ""); + return 0; +} + bool DefaultApplication::SendStreamData(QuicStream* stream) { ssize_t ndatalen = 0; QuicPathStorage path; Debug(session(), "Default QUIC Application sending stream %" PRId64 " data", stream->GetID()); - std::vector vec; - - // remaining is the total number of bytes stored in the vector - // that are remaining to be serialized. - size_t remaining = stream->DrainInto(&vec); - Debug(stream, "Sending %d bytes of stream data. Still writable? %s", - remaining, - stream->is_writable() ? "yes" : "no"); - - // c and v are used to track the current serialization position - // for each iteration of the for(;;) loop below. - size_t c = vec.size(); - ngtcp2_vec* v = vec.data(); + StreamData stream_data; + stream_data.id = stream->GetID(); + stream_data.user_data = stream; + GetStreamData(&stream_data); // If there is no stream data and we're not sending fin, // Just return without doing anything. - if (c == 0 && stream->is_writable()) { + if (stream_data.count == 0 && !stream_data.fin) { Debug(stream, "There is no stream data to send"); return true; } std::unique_ptr packet = CreateStreamDataPacket(); - size_t packet_offset = 0; + uint8_t* pos = packet->data(); for (;;) { - Debug(stream, "Starting packet serialization. Remaining? %d", remaining); - // If packet was sent on the previous iteration, it will have been reset - if (!packet) + if (!packet) { packet = CreateStreamDataPacket(); + pos = packet->data(); + } - ssize_t nwrite = - ngtcp2_conn_writev_stream( - session()->connection(), - &path.path, - packet->data() + packet_offset, - session()->max_packet_length(), - &ndatalen, - remaining > 0 ? - NGTCP2_WRITE_STREAM_FLAG_MORE : - NGTCP2_WRITE_STREAM_FLAG_NONE, - stream->GetID(), - stream->is_writable() ? 0 : 1, - reinterpret_cast(v), - c, - uv_hrtime()); + ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data); if (nwrite <= 0) { switch (nwrite) { case 0: - // If zero is returned, we've hit congestion limits. We need to stop - // serializing data and try again later to empty the queue once the - // congestion window has expanded. - Debug(stream, "Congestion limit reached"); - return true; + goto congestion_limited; case NGTCP2_ERR_PKT_NUM_EXHAUSTED: // There is a finite number of packets that can be sent // per connection. Once those are exhausted, there's @@ -190,45 +176,30 @@ bool DefaultApplication::SendStreamData(QuicStream* stream) { session()->SilentClose(); return false; case NGTCP2_ERR_STREAM_DATA_BLOCKED: - Debug(stream, "Stream data blocked"); session()->StreamDataBlocked(stream->GetID()); + if (session()->max_data_left() == 0) + goto congestion_limited; return true; case NGTCP2_ERR_STREAM_SHUT_WR: - Debug(stream, "Stream writable side is closed"); + if (UNLIKELY(!BlockStream(stream_data.id))) + return false; return true; case NGTCP2_ERR_STREAM_NOT_FOUND: - Debug(stream, "Stream does not exist"); return true; case NGTCP2_ERR_WRITE_STREAM_MORE: - if (ndatalen > 0) { - remaining -= ndatalen; - Debug(stream, - "%" PRIu64 " stream bytes serialized into packet. %d remaining", - ndatalen, - remaining); - Consume(&v, &c, ndatalen); - stream->Commit(ndatalen); - packet_offset += ndatalen; - } + CHECK_GT(ndatalen, 0); + CHECK(StreamCommit(&stream_data, ndatalen)); + pos += ndatalen; continue; - default: - Debug(stream, "Error writing packet. Code %" PRIu64, nwrite); - session()->set_last_error( - QUIC_ERROR_SESSION, - static_cast(nwrite)); - return false; } + session()->set_last_error(QUIC_ERROR_SESSION, static_cast(nwrite)); + return false; } - if (ndatalen > 0) { - remaining -= ndatalen; - Debug(stream, - "%" PRIu64 " stream bytes serialized into packet. %d remaining", - ndatalen, - remaining); - Consume(&v, &c, ndatalen); - stream->Commit(ndatalen); - } + pos += nwrite; + + if (ndatalen >= 0) + CHECK(StreamCommit(&stream_data, ndatalen)); Debug(stream, "Sending %" PRIu64 " bytes in serialized packet", nwrite); packet->set_length(nwrite); @@ -236,20 +207,44 @@ bool DefaultApplication::SendStreamData(QuicStream* stream) { return false; packet.reset(); - packet_offset = 0; - - if (IsEmpty(v, c)) { - // fin will have been set if all of the data has been - // encoded in the packet and is_writable() returns false. - if (!stream->is_writable()) { - Debug(stream, "Final stream has been sent"); - stream->set_fin_sent(); - } + pos = nullptr; + + if (ShouldSetFin(&stream_data)) + set_stream_fin(stream_data.id); + + if (IsEmpty(stream_data.buf, stream_data.count)) break; - } } return true; + + congestion_limited: + if (pos - packet->data()) { + // Some data was serialized into the packet. We need to send it. + packet->set_length(pos - packet->data()); + Debug(session(), "Congestion limited, but %" PRIu64 " bytes pending.", + packet->length()); + if (!session()->SendPacket(std::move(packet), path)) + return false; + } + return true; +} + +bool DefaultApplication::StreamCommit( + StreamData* stream_data, + size_t datalen) { + QuicStream* stream = static_cast(stream_data->user_data); + stream_data->remaining -= datalen; + Consume(&stream_data->buf, &stream_data->count, datalen); + stream->Commit(datalen); + return true; +} + +bool DefaultApplication::ShouldSetFin(StreamData* stream_data) { + if (!IsEmpty(stream_data->buf, stream_data->count)) + return false; + QuicStream* stream = static_cast(stream_data->user_data); + return !stream->is_writable(); } } // namespace quic diff --git a/src/quic/node_quic_default_application.h b/src/quic/node_quic_default_application.h index 65fad9875e..34efe18972 100644 --- a/src/quic/node_quic_default_application.h +++ b/src/quic/node_quic_default_application.h @@ -36,6 +36,10 @@ class DefaultApplication final : public QuicApplication { bool SendPendingData() override; bool SendStreamData(QuicStream* stream) override; + int GetStreamData(StreamData* stream_data) override; + bool StreamCommit(StreamData* stream_data, size_t datalen) override; + bool ShouldSetFin(StreamData* stream_data) override; + SET_SELF_SIZE(DefaultApplication) SET_MEMORY_INFO_NAME(DefaultApplication) SET_NO_MEMORY_INFO() diff --git a/src/quic/node_quic_http3_application.cc b/src/quic/node_quic_http3_application.cc index 40f143c081..838c341edc 100644 --- a/src/quic/node_quic_http3_application.cc +++ b/src/quic/node_quic_http3_application.cc @@ -437,11 +437,11 @@ void Http3Application::ExtendMaxStreamData( nghttp3_conn_unblock_stream(connection(), stream_id); } -bool Http3Application::StreamCommit(int64_t stream_id, ssize_t datalen) { - CHECK_GT(datalen, 0); +bool Http3Application::StreamCommit(StreamData* stream_data, size_t datalen) { + CHECK_GE(datalen, 0); int err = nghttp3_conn_add_write_offset( connection(), - stream_id, + stream_data->id, datalen); if (err != 0) { session()->set_last_error(QUIC_ERROR_APPLICATION, err); @@ -450,98 +450,109 @@ bool Http3Application::StreamCommit(int64_t stream_id, ssize_t datalen) { return true; } -void Http3Application::set_stream_fin(int64_t stream_id) { - if (!is_control_stream(stream_id)) { - QuicStream* stream = session()->FindStream(stream_id); - if (stream != nullptr) - stream->set_fin_sent(); +int Http3Application::GetStreamData(StreamData* stream_data) { + ssize_t ret = 0; + if (connection() && session()->max_data_left()) { + ret = nghttp3_conn_writev_stream( + connection(), + &stream_data->id, + &stream_data->fin, + reinterpret_cast(stream_data->data), + sizeof(stream_data->data)); + if (ret < 0) + return static_cast(ret); + else + stream_data->remaining = stream_data->count = static_cast(ret); + } + if (stream_data->id > -1) { + Debug(session(), "Selected %" PRId64 " buffers for stream %" PRId64 "%s", + stream_data->count, + stream_data->id, + stream_data->fin == 1 ? " (fin)" : ""); } + return 0; +} + +bool Http3Application::BlockStream(int64_t stream_id) { + int err = nghttp3_conn_block_stream(connection(), stream_id); + if (err != 0) { + session()->set_last_error(QUIC_ERROR_APPLICATION, err); + return false; + } + return true; } bool Http3Application::SendPendingData() { - std::array vec; QuicPathStorage path; int err; std::unique_ptr packet = CreateStreamDataPacket(); - size_t packet_offset = 0; + uint8_t* pos = packet->data(); for (;;) { - int64_t stream_id = -1; - int fin = 0; - ssize_t sveccnt = 0; - - // First, grab the outgoing data from nghttp3 - if (connection() && session()->max_data_left()) { - sveccnt = - nghttp3_conn_writev_stream( - connection(), - &stream_id, - &fin, - vec.data(), - vec.size()); - - if (sveccnt < 0) - return false; + ssize_t ndatalen; + StreamData stream_data; + err = GetStreamData(&stream_data); + if (err < 0) { + session()->set_last_error(QUIC_ERROR_APPLICATION, err); + return false; } - Debug(session(), "Serializing packets for stream id %" PRId64, stream_id); - - ssize_t ndatalen; - nghttp3_vec* v = vec.data(); - size_t vcnt = static_cast(sveccnt); + // If stream_data.id is -1, then we're not serializing any data for any + // specific stream. We still need to process QUIC session packets tho. + if (stream_data.id > -1) + Debug(session(), "Serializing packets for stream id %" PRId64, + stream_data.id); + else + Debug(session(), "Serializing session packets"); - // If packet was sent on previous iteration, it will have been reset. - if (!packet) + // If the packet was sent previously, then packet will have been reset. + if (!packet) { packet = CreateStreamDataPacket(); + pos = packet->data(); + } - // Second, serialize as much of the outgoing data as possible to a - // QUIC packet for transmission. We'll keep iterating until there - // is no more data to transmit. - ssize_t nwrite = - ngtcp2_conn_writev_stream( - session()->connection(), - &path.path, - packet->data() + packet_offset, - session()->max_packet_length(), - &ndatalen, - NGTCP2_WRITE_STREAM_FLAG_MORE, - stream_id, - fin, - reinterpret_cast(v), - vcnt, - uv_hrtime()); - - if (nwrite < 0) { + ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data); + + if (nwrite <= 0) { switch (nwrite) { + case 0: + goto congestion_limited; + case NGTCP2_ERR_PKT_NUM_EXHAUSTED: + // There is a finite number of packets that can be sent + // per connection. Once those are exhausted, there's + // absolutely nothing we can do except immediately + // and silently tear down the QuicSession. This has + // to be silent because we can't even send a + // CONNECTION_CLOSE since even those require a + // packet number. + session()->SilentClose(); + return false; case NGTCP2_ERR_STREAM_DATA_BLOCKED: - session()->StreamDataBlocked(stream_id); + session()->StreamDataBlocked(stream_data.id); if (session()->max_data_left() == 0) - return true; + goto congestion_limited; // Fall through case NGTCP2_ERR_STREAM_SHUT_WR: - err = nghttp3_conn_block_stream(connection(), stream_id); - if (err != 0) { - session()->set_last_error(QUIC_ERROR_APPLICATION, err); + if (UNLIKELY(!BlockStream(stream_data.id))) return false; - } + continue; + case NGTCP2_ERR_STREAM_NOT_FOUND: continue; case NGTCP2_ERR_WRITE_STREAM_MORE: - if (ndatalen > 0) { - CHECK(StreamCommit(stream_id, ndatalen)); - packet_offset += ndatalen; - } + CHECK_GT(ndatalen, 0); + CHECK(StreamCommit(&stream_data, ndatalen)); + pos += ndatalen; continue; } - // session()->set_last_error(QUIC_ERROR_APPLICATION, nwrite); + session()->set_last_error(QUIC_ERROR_SESSION, static_cast(nwrite)); return false; } - if (nwrite == 0) - return true; // Congestion limited + pos += nwrite; - if (ndatalen > 0) - CHECK(StreamCommit(stream_id, ndatalen)); + if (ndatalen >= 0) + CHECK(StreamCommit(&stream_data, ndatalen)); Debug(session(), "Sending %" PRIu64 " bytes in serialized packet", nwrite); packet->set_length(nwrite); @@ -549,12 +560,28 @@ bool Http3Application::SendPendingData() { return false; packet.reset(); - packet_offset = 0; + pos = nullptr; - if (fin) - set_stream_fin(stream_id); + if (ShouldSetFin(&stream_data)) + set_stream_fin(stream_data.id); } return true; + + congestion_limited: + // We are either congestion limited or done. + if (pos - packet->data()) { + // Some data was serialized into the packet. We need to send it. + packet->set_length(pos - packet->data()); + Debug(session(), "Congestion limited, but %" PRIu64 " bytes pending.", + packet->length()); + if (!session()->SendPacket(std::move(packet), path)) + return false; + } + return true; +} + +bool Http3Application::ShouldSetFin(StreamData* stream_data) { + return !is_control_stream(stream_data->id) && stream_data->fin == 1; } bool Http3Application::SendStreamData(QuicStream* stream) { diff --git a/src/quic/node_quic_http3_application.h b/src/quic/node_quic_http3_application.h index 80d0fc987c..4c3f3305df 100644 --- a/src/quic/node_quic_http3_application.h +++ b/src/quic/node_quic_http3_application.h @@ -139,8 +139,11 @@ class Http3Application final : bool CreateAndBindControlStream(); bool CreateAndBindQPackStreams(); - bool StreamCommit(int64_t stream_id, ssize_t datalen); - void set_stream_fin(int64_t stream_id); + int GetStreamData(StreamData* stream_data) override; + + bool BlockStream(int64_t stream_id) override; + bool StreamCommit(StreamData* stream_data, size_t datalen) override; + bool ShouldSetFin(StreamData* data) override; ssize_t H3ReadData( int64_t stream_id, diff --git a/src/quic/node_quic_session-inl.h b/src/quic/node_quic_session-inl.h index 46d1aa76a2..44c5ef8ea6 100644 --- a/src/quic/node_quic_session-inl.h +++ b/src/quic/node_quic_session-inl.h @@ -138,6 +138,35 @@ bool QuicCryptoContext::SetupInitialKey(const ngtcp2_cid* dcid) { QuicApplication::QuicApplication(QuicSession* session) : session_(session) {} +void QuicApplication::set_stream_fin(int64_t stream_id) { + QuicStream* stream = session()->FindStream(stream_id); + if (stream != nullptr) + stream->set_fin_sent(); +} + +ssize_t QuicApplication::WriteVStream( + QuicPathStorage* path, + uint8_t* buf, + ssize_t* ndatalen, + const StreamData& stream_data) { + CHECK_LE(stream_data.count, 16); + return ngtcp2_conn_writev_stream( + session()->connection(), + &path->path, + buf, + session()->max_packet_length(), + ndatalen, + stream_data.remaining > 0 ? + NGTCP2_WRITE_STREAM_FLAG_MORE : + NGTCP2_WRITE_STREAM_FLAG_NONE, + stream_data.id, + stream_data.fin, + stream_data.buf, + stream_data.count, + uv_hrtime() + ); +} + std::unique_ptr QuicApplication::CreateStreamDataPacket() { return QuicPacket::Create( "stream data", diff --git a/src/quic/node_quic_session.h b/src/quic/node_quic_session.h index d8df1b39f9..13a7db969b 100644 --- a/src/quic/node_quic_session.h +++ b/src/quic/node_quic_session.h @@ -523,6 +523,7 @@ class QuicApplication : public MemoryRetainer { virtual bool SubmitTrailers( int64_t stream_id, v8::Local headers) { return false; } + virtual bool BlockStream(int64_t id) { return true; } inline Environment* env() const; @@ -533,10 +534,33 @@ class QuicApplication : public MemoryRetainer { QuicSession* session() const { return session_; } bool needs_init() const { return needs_init_; } void set_init_done() { needs_init_ = false; } + inline void set_stream_fin(int64_t stream_id); void set_max_header_pairs(size_t max) { max_header_pairs_ = max; } void set_max_header_length(size_t max) { max_header_length_ = max; } inline std::unique_ptr CreateStreamDataPacket(); + struct StreamData { + size_t count = 0; + size_t remaining = 0; + int64_t id = -1; + int fin = 0; + ngtcp2_vec data[16] {}; + ngtcp2_vec* buf = nullptr; + void* user_data = nullptr; + uint8_t* pos = nullptr; + StreamData() { buf = data; } + }; + + virtual int GetStreamData(StreamData* data) = 0; + virtual bool StreamCommit(StreamData* data, size_t datalen) = 0; + virtual bool ShouldSetFin(StreamData* data) = 0; + + inline ssize_t WriteVStream( + QuicPathStorage* path, + uint8_t* buf, + ssize_t* ndatalen, + const StreamData& stream_data); + private: QuicSession* session_; bool needs_init_ = true; From 11e029c7d4bff4b9b646c12ed219025acb086c30 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 8 Jan 2020 10:41:48 -0800 Subject: [PATCH 2/8] quic: use unordered_map for streams --- src/quic/node_quic_session.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/quic/node_quic_session.h b/src/quic/node_quic_session.h index 13a7db969b..7d8ebc58b6 100644 --- a/src/quic/node_quic_session.h +++ b/src/quic/node_quic_session.h @@ -22,7 +22,7 @@ #include #include -#include +#include #include namespace node { @@ -35,6 +35,8 @@ class QuicPacket; class QuicStream; class QuicHeader; +using StreamsMap = std::unordered_map>; + enum class QlogMode { kDisabled, kEnabled @@ -871,9 +873,7 @@ class QuicSession : public AsyncWrap, int set_session(SSL_SESSION* session); bool set_session(v8::Local buffer); - const std::map>& streams() const { - return streams_; - } + const StreamsMap& streams() const { return streams_; } // ResetStream will cause ngtcp2 to queue a // RESET_STREAM and STOP_SENDING frame, as appropriate, @@ -1364,7 +1364,7 @@ class QuicSession : public AsyncWrap, std::unique_ptr conn_closebuf_; - std::map> streams_; + StreamsMap streams_; AliasedFloat64Array state_; From 575e0f7a4feadc3684f474b5ddb206d0c2fa1799 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 8 Jan 2020 14:06:17 -0800 Subject: [PATCH 3/8] quic: limit the number of packets sent per iteration (http3) --- src/quic/node_quic_default_application.cc | 8 ++++---- src/quic/node_quic_default_application.h | 2 +- src/quic/node_quic_http3_application.cc | 14 +++++++++++--- src/quic/node_quic_http3_application.h | 2 +- src/quic/node_quic_session.h | 2 +- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/quic/node_quic_default_application.cc b/src/quic/node_quic_default_application.cc index 914e9ccbb1..9a8d18c753 100644 --- a/src/quic/node_quic_default_application.cc +++ b/src/quic/node_quic_default_application.cc @@ -209,7 +209,7 @@ bool DefaultApplication::SendStreamData(QuicStream* stream) { packet.reset(); pos = nullptr; - if (ShouldSetFin(&stream_data)) + if (ShouldSetFin(stream_data)) set_stream_fin(stream_data.id); if (IsEmpty(stream_data.buf, stream_data.count)) @@ -240,10 +240,10 @@ bool DefaultApplication::StreamCommit( return true; } -bool DefaultApplication::ShouldSetFin(StreamData* stream_data) { - if (!IsEmpty(stream_data->buf, stream_data->count)) +bool DefaultApplication::ShouldSetFin(const StreamData& stream_data) { + if (!IsEmpty(stream_data.buf, stream_data.count)) return false; - QuicStream* stream = static_cast(stream_data->user_data); + QuicStream* stream = static_cast(stream_data.user_data); return !stream->is_writable(); } diff --git a/src/quic/node_quic_default_application.h b/src/quic/node_quic_default_application.h index 34efe18972..5b30abe328 100644 --- a/src/quic/node_quic_default_application.h +++ b/src/quic/node_quic_default_application.h @@ -38,7 +38,7 @@ class DefaultApplication final : public QuicApplication { int GetStreamData(StreamData* stream_data) override; bool StreamCommit(StreamData* stream_data, size_t datalen) override; - bool ShouldSetFin(StreamData* stream_data) override; + bool ShouldSetFin(const StreamData& stream_data) override; SET_SELF_SIZE(DefaultApplication) SET_MEMORY_INFO_NAME(DefaultApplication) diff --git a/src/quic/node_quic_http3_application.cc b/src/quic/node_quic_http3_application.cc index 838c341edc..e22e3fc8b6 100644 --- a/src/quic/node_quic_http3_application.cc +++ b/src/quic/node_quic_http3_application.cc @@ -483,8 +483,11 @@ bool Http3Application::BlockStream(int64_t stream_id) { } bool Http3Application::SendPendingData() { + // The maximum number of packets to send per call + static constexpr size_t MAX_PACKETS = 16; QuicPathStorage path; int err; + size_t packets_sent = 0; std::unique_ptr packet = CreateStreamDataPacket(); uint8_t* pos = packet->data(); @@ -562,8 +565,11 @@ bool Http3Application::SendPendingData() { packet.reset(); pos = nullptr; - if (ShouldSetFin(&stream_data)) + if (ShouldSetFin(stream_data)) set_stream_fin(stream_data.id); + + if (++packets_sent == MAX_PACKETS) + break; } return true; @@ -580,8 +586,10 @@ bool Http3Application::SendPendingData() { return true; } -bool Http3Application::ShouldSetFin(StreamData* stream_data) { - return !is_control_stream(stream_data->id) && stream_data->fin == 1; +bool Http3Application::ShouldSetFin(const StreamData& stream_data) { + return stream_data.id > -1 && + !is_control_stream(stream_data.id) && + stream_data.fin == 1; } bool Http3Application::SendStreamData(QuicStream* stream) { diff --git a/src/quic/node_quic_http3_application.h b/src/quic/node_quic_http3_application.h index 4c3f3305df..645fe4142a 100644 --- a/src/quic/node_quic_http3_application.h +++ b/src/quic/node_quic_http3_application.h @@ -143,7 +143,7 @@ class Http3Application final : bool BlockStream(int64_t stream_id) override; bool StreamCommit(StreamData* stream_data, size_t datalen) override; - bool ShouldSetFin(StreamData* data) override; + bool ShouldSetFin(const StreamData& data) override; ssize_t H3ReadData( int64_t stream_id, diff --git a/src/quic/node_quic_session.h b/src/quic/node_quic_session.h index 7d8ebc58b6..6106d4f1ab 100644 --- a/src/quic/node_quic_session.h +++ b/src/quic/node_quic_session.h @@ -555,7 +555,7 @@ class QuicApplication : public MemoryRetainer { virtual int GetStreamData(StreamData* data) = 0; virtual bool StreamCommit(StreamData* data, size_t datalen) = 0; - virtual bool ShouldSetFin(StreamData* data) = 0; + virtual bool ShouldSetFin(const StreamData& data) = 0; inline ssize_t WriteVStream( QuicPathStorage* path, From 41914cfe2c80f362f7ca07831ace7ef4ba31d035 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 8 Jan 2020 14:22:56 -0800 Subject: [PATCH 4/8] quic: GetID() -> id() --- src/quic/node_quic_default_application.cc | 6 +++--- src/quic/node_quic_http3_application.cc | 2 +- src/quic/node_quic_session-inl.h | 4 ++-- src/quic/node_quic_session.cc | 6 +++--- src/quic/node_quic_stream.cc | 14 +++++++------- src/quic/node_quic_stream.h | 2 +- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/quic/node_quic_default_application.cc b/src/quic/node_quic_default_application.cc index 9a8d18c753..1ba89a0ea2 100644 --- a/src/quic/node_quic_default_application.cc +++ b/src/quic/node_quic_default_application.cc @@ -135,10 +135,10 @@ bool DefaultApplication::SendStreamData(QuicStream* stream) { ssize_t ndatalen = 0; QuicPathStorage path; Debug(session(), "Default QUIC Application sending stream %" PRId64 " data", - stream->GetID()); + stream->id()); StreamData stream_data; - stream_data.id = stream->GetID(); + stream_data.id = stream->id(); stream_data.user_data = stream; GetStreamData(&stream_data); @@ -176,7 +176,7 @@ bool DefaultApplication::SendStreamData(QuicStream* stream) { session()->SilentClose(); return false; case NGTCP2_ERR_STREAM_DATA_BLOCKED: - session()->StreamDataBlocked(stream->GetID()); + session()->StreamDataBlocked(stream->id()); if (session()->max_data_left() == 0) goto congestion_limited; return true; diff --git a/src/quic/node_quic_http3_application.cc b/src/quic/node_quic_http3_application.cc index e22e3fc8b6..450382a2f3 100644 --- a/src/quic/node_quic_http3_application.cc +++ b/src/quic/node_quic_http3_application.cc @@ -594,7 +594,7 @@ bool Http3Application::ShouldSetFin(const StreamData& stream_data) { bool Http3Application::SendStreamData(QuicStream* stream) { // Data is available now, so resume the stream. - nghttp3_conn_resume_stream(connection(), stream->GetID()); + nghttp3_conn_resume_stream(connection(), stream->id()); return SendPendingData(); } diff --git a/src/quic/node_quic_session-inl.h b/src/quic/node_quic_session-inl.h index 44c5ef8ea6..ecb89d3611 100644 --- a/src/quic/node_quic_session-inl.h +++ b/src/quic/node_quic_session-inl.h @@ -221,10 +221,10 @@ void QuicSession::ExtendMaxStreamsBidi(uint64_t max_streams) { // Extends the stream-level flow control by the given number of bytes. void QuicSession::ExtendStreamOffset(QuicStream* stream, size_t amount) { Debug(this, "Extending max stream %" PRId64 " offset by %" PRId64 " bytes", - stream->GetID(), amount); + stream->id(), amount); ngtcp2_conn_extend_max_stream_offset( connection(), - stream->GetID(), + stream->id(), amount); } diff --git a/src/quic/node_quic_session.cc b/src/quic/node_quic_session.cc index 0f71c115f0..f4b33bbb8d 100644 --- a/src/quic/node_quic_session.cc +++ b/src/quic/node_quic_session.cc @@ -466,7 +466,7 @@ void JSQuicSessionListener::OnStreamReady(BaseObjectPtr stream) { Environment* env = session()->env(); Local argv[] = { stream->object(), - Number::New(env->isolate(), static_cast(stream->GetID())) + Number::New(env->isolate(), static_cast(stream->id())) }; // Grab a shared pointer to this to prevent the QuicSession @@ -1423,8 +1423,8 @@ void QuicSession::AddToSocket(QuicSocket* socket) { // streams added must be removed before the QuicSession instance is freed. void QuicSession::AddStream(BaseObjectPtr stream) { DCHECK(!is_flag_set(QUICSESSION_FLAG_GRACEFUL_CLOSING)); - Debug(this, "Adding stream %" PRId64 " to session.", stream->GetID()); - streams_.emplace(stream->GetID(), stream); + Debug(this, "Adding stream %" PRId64 " to session.", stream->id()); + streams_.emplace(stream->id(), stream); // Update tracking statistics for the number of streams associated with // this session. diff --git a/src/quic/node_quic_stream.cc b/src/quic/node_quic_stream.cc index 35336510b6..ceef98e76c 100644 --- a/src/quic/node_quic_stream.cc +++ b/src/quic/node_quic_stream.cc @@ -94,7 +94,7 @@ QuicStream::QuicStream( } std::string QuicStream::diagnostic_name() const { - return std::string("QuicStream ") + std::to_string(GetID()) + + return std::string("QuicStream ") + std::to_string(stream_id_) + " (" + std::to_string(static_cast(get_async_id())) + ", " + session_->diagnostic_name() + ")"; } @@ -374,7 +374,7 @@ void QuicStream::ResetStream(uint64_t app_error_code) { // abandoned. set_read_close(); set_write_close(); - session_->ResetStream(GetID(), app_error_code); + session_->ResetStream(stream_id_, app_error_code); } BaseObjectPtr QuicStream::New( @@ -425,7 +425,7 @@ void QuicStream::EndHeaders() { // Upon completion of a block of headers, convert the // vector of Header objects into an array of name+value // pairs, then call the on_stream_headers function. - session()->application()->StreamHeaders(GetID(), headers_kind_, headers_); + session()->application()->StreamHeaders(stream_id_, headers_kind_, headers_); headers_.clear(); } @@ -438,15 +438,15 @@ void QuicStream::MemoryInfo(MemoryTracker* tracker) const { } bool QuicStream::SubmitInformation(v8::Local headers) { - return session_->SubmitInformation(GetID(), headers); + return session_->SubmitInformation(stream_id_, headers); } bool QuicStream::SubmitHeaders(v8::Local headers, uint32_t flags) { - return session_->SubmitHeaders(GetID(), headers, flags); + return session_->SubmitHeaders(stream_id_, headers, flags); } bool QuicStream::SubmitTrailers(v8::Local headers) { - return session_->SubmitTrailers(GetID(), headers); + return session_->SubmitTrailers(stream_id_, headers); } // JavaScript API @@ -454,7 +454,7 @@ namespace { void QuicStreamGetID(const FunctionCallbackInfo& args) { QuicStream* stream; ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder()); - args.GetReturnValue().Set(static_cast(stream->GetID())); + args.GetReturnValue().Set(static_cast(stream->id())); } void OpenUnidirectionalStream(const FunctionCallbackInfo& args) { diff --git a/src/quic/node_quic_stream.h b/src/quic/node_quic_stream.h index e3dfee2a9a..ceac4b8f63 100644 --- a/src/quic/node_quic_stream.h +++ b/src/quic/node_quic_stream.h @@ -209,7 +209,7 @@ class QuicStream : public AsyncWrap, public StreamBase { QUIC_STREAM_CLIENT; } - int64_t GetID() const { return stream_id_; } + int64_t id() const { return stream_id_; } inline bool is_destroyed() const { return flags_ & QUICSTREAM_FLAG_DESTROYED; From c85e0d7c622b190802312458dc1bb39a5ffe69cc Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 8 Jan 2020 15:25:46 -0800 Subject: [PATCH 5/8] quic: cleaning up QuicStream --- node.gyp | 1 + src/quic/node_quic.cc | 2 +- src/quic/node_quic_default_application.cc | 4 +- src/quic/node_quic_http3_application.cc | 13 +- src/quic/node_quic_http3_application.h | 8 +- src/quic/node_quic_session-inl.h | 2 +- src/quic/node_quic_session.cc | 33 +- src/quic/node_quic_session.h | 16 + src/quic/node_quic_stream-inl.h | 228 ++++++++++++++ src/quic/node_quic_stream.cc | 315 ++++++++----------- src/quic/node_quic_stream.h | 359 +++++++++------------- 11 files changed, 550 insertions(+), 431 deletions(-) create mode 100644 src/quic/node_quic_stream-inl.h diff --git a/node.gyp b/node.gyp index bbfa1cd211..09ad9a58c8 100644 --- a/node.gyp +++ b/node.gyp @@ -853,6 +853,7 @@ 'src/quic/node_quic_session-inl.h', 'src/quic/node_quic_socket.h', 'src/quic/node_quic_stream.h', + 'src/quic/node_quic_stream-inl.h', 'src/quic/node_quic_util.h', 'src/quic/node_quic_util-inl.h', 'src/quic/node_quic_state.h', diff --git a/src/quic/node_quic.cc b/src/quic/node_quic.cc index 3975b05381..044d7eddec 100644 --- a/src/quic/node_quic.cc +++ b/src/quic/node_quic.cc @@ -9,7 +9,7 @@ #include "node_quic_crypto.h" #include "node_quic_session-inl.h" #include "node_quic_socket.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_state.h" #include "node_quic_util-inl.h" #include "node_sockaddr-inl.h" diff --git a/src/quic/node_quic_default_application.cc b/src/quic/node_quic_default_application.cc index 1ba89a0ea2..64970752bf 100644 --- a/src/quic/node_quic_default_application.cc +++ b/src/quic/node_quic_default_application.cc @@ -3,7 +3,7 @@ #include "node_quic_default_application.h" #include "node_quic_session-inl.h" #include "node_quic_socket.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_util-inl.h" #include "node_sockaddr-inl.h" #include @@ -68,7 +68,7 @@ void DefaultApplication::AcknowledgeStreamData( // It's possible that the stream has already been destroyed and // removed. If so, just silently ignore the ack if (stream != nullptr) - stream->AckedDataOffset(offset, datalen); + stream->Acknowledge(offset, datalen); } bool DefaultApplication::SendPendingData() { diff --git a/src/quic/node_quic_http3_application.cc b/src/quic/node_quic_http3_application.cc index 450382a2f3..d251fc8386 100644 --- a/src/quic/node_quic_http3_application.cc +++ b/src/quic/node_quic_http3_application.cc @@ -5,7 +5,7 @@ #include "node_quic_http3_application.h" #include "node_quic_session-inl.h" #include "node_quic_socket.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_util-inl.h" #include "node_sockaddr-inl.h" #include "node_http_common-inl.h" @@ -133,6 +133,11 @@ size_t Http3Header::length() const { return name().length() + value().length(); } +void Http3Header::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("name", name_); + tracker->TrackField("value", value_); +} + namespace { template inline void SetConfig(Environment* env, int idx, t* val) { @@ -427,6 +432,10 @@ void Http3Application::StreamReset( QuicApplication::StreamReset(stream_id, final_size, app_error_code); } +void Http3Application::ResumeStream(int64_t stream_id) { + nghttp3_conn_resume_stream(connection(), stream_id); +} + void Http3Application::ExtendMaxStreamsRemoteUni(uint64_t max_streams) { nghttp3_conn_set_max_client_streams_bidi(connection(), max_streams); } @@ -640,7 +649,7 @@ void Http3Application::H3AckedStreamData( size_t datalen) { QuicStream* stream = session()->FindStream(stream_id); if (stream) { - stream->AckedDataOffset(0, datalen); + stream->Acknowledge(0, datalen); nghttp3_conn_resume_stream(connection(), stream_id); } } diff --git a/src/quic/node_quic_http3_application.h b/src/quic/node_quic_http3_application.h index 645fe4142a..598d3a8cb0 100644 --- a/src/quic/node_quic_http3_application.h +++ b/src/quic/node_quic_http3_application.h @@ -7,7 +7,7 @@ #include "node_http_common.h" #include "node_mem.h" #include "node_quic_session.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_util.h" #include "v8.h" #include @@ -67,6 +67,10 @@ class Http3Header : public QuicHeader { size_t length() const override; + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(Http3Header) + SET_SELF_SIZE(Http3Header) + private: int32_t token_ = -1; Http3RcBufferPointer name_; @@ -104,6 +108,8 @@ class Http3Application final : uint64_t final_size, uint64_t app_error_code) override; + void ResumeStream(int64_t stream_id) override; + void ExtendMaxStreamsRemoteUni(uint64_t max_streams) override; void ExtendMaxStreamData(int64_t stream_id, uint64_t max_data) override; diff --git a/src/quic/node_quic_session-inl.h b/src/quic/node_quic_session-inl.h index ecb89d3611..aebd3c939e 100644 --- a/src/quic/node_quic_session-inl.h +++ b/src/quic/node_quic_session-inl.h @@ -9,7 +9,7 @@ #include "node_quic_crypto.h" #include "node_quic_session.h" #include "node_quic_socket.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include #include diff --git a/src/quic/node_quic_session.cc b/src/quic/node_quic_session.cc index f4b33bbb8d..ddc4a26992 100644 --- a/src/quic/node_quic_session.cc +++ b/src/quic/node_quic_session.cc @@ -15,7 +15,7 @@ #include "node_quic_buffer-inl.h" #include "node_quic_crypto.h" #include "node_quic_socket.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_state.h" #include "node_quic_util-inl.h" #include "node_quic_default_application.h" @@ -1429,13 +1429,13 @@ void QuicSession::AddStream(BaseObjectPtr stream) { // Update tracking statistics for the number of streams associated with // this session. switch (stream->origin()) { - case QuicStream::QuicStreamOrigin::QUIC_STREAM_CLIENT: + case QuicStreamOrigin::QUIC_STREAM_CLIENT: if (is_server()) IncrementStat(1, &session_stats_, &session_stats::streams_in_count); else IncrementStat(1, &session_stats_, &session_stats::streams_out_count); break; - case QuicStream::QuicStreamOrigin::QUIC_STREAM_SERVER: + case QuicStreamOrigin::QUIC_STREAM_SERVER: if (is_server()) IncrementStat(1, &session_stats_, &session_stats::streams_out_count); else @@ -1443,10 +1443,10 @@ void QuicSession::AddStream(BaseObjectPtr stream) { } IncrementStat(1, &session_stats_, &session_stats::streams_out_count); switch (stream->direction()) { - case QuicStream::QuicStreamDirection::QUIC_STREAM_BIRECTIONAL: + case QuicStreamDirection::QUIC_STREAM_BIRECTIONAL: IncrementStat(1, &session_stats_, &session_stats::bidi_stream_count); break; - case QuicStream::QuicStreamDirection::QUIC_STREAM_UNIDIRECTIONAL: + case QuicStreamDirection::QUIC_STREAM_UNIDIRECTIONAL: IncrementStat(1, &session_stats_, &session_stats::uni_stream_count); break; } @@ -2178,6 +2178,8 @@ bool QuicSession::set_socket(QuicSocket* socket, bool nat_rebinding) { if (socket == nullptr || socket == socket_.get()) return true; + SendSessionScope send(this); + // Step 1: Add this Session to the given Socket AddToSocket(socket); @@ -2204,25 +2206,16 @@ bool QuicSession::set_socket(QuicSocket* socket, bool nat_rebinding) { } } - SendPendingData(); return true; } -void QuicSession::ResetStream(int64_t stream_id, uint64_t code) { - // First, update the internal ngtcp2 state of the given stream - // and schedule the STOP_SENDING and RESET_STREAM frames as - // appropriate. - CHECK_EQ( - ngtcp2_conn_shutdown_stream( - connection(), - stream_id, - code), 0); +void QuicSession::ResumeStream(int64_t stream_id) { + application()->ResumeStream(stream_id); +} - // If ShutdownStream is called outside of an ngtcp2 callback, - // we need to trigger SendPendingData manually to cause the - // RESET_STREAM and STOP_SENDING frames to be transmitted. - if (!Ngtcp2CallbackScope::InNgtcp2CallbackScope(this)) - SendPendingData(); +void QuicSession::ResetStream(int64_t stream_id, uint64_t code) { + SendSessionScope scope(this); + CHECK_EQ(ngtcp2_conn_shutdown_stream(connection(), stream_id, code), 0); } // Silent Close must start with the JavaScript side, which must diff --git a/src/quic/node_quic_session.h b/src/quic/node_quic_session.h index 6106d4f1ab..e23e0fdbf5 100644 --- a/src/quic/node_quic_session.h +++ b/src/quic/node_quic_session.h @@ -511,6 +511,7 @@ class QuicApplication : public MemoryRetainer { int64_t stream_id, uint64_t app_error_code); virtual void StreamOpen(int64_t stream_id) {} + virtual void ResumeStream(int64_t stream_id) {} virtual void StreamReset( int64_t stream_id, uint64_t final_size, @@ -906,6 +907,8 @@ class QuicSession : public AsyncWrap, int64_t stream_id, uint64_t error_code = NGTCP2_APP_NOERROR); + void ResumeStream(int64_t stream_id); + // Submits informational headers to the QUIC Application // implementation. If headers are not supported, false // will be returned. Otherwise, returns true @@ -990,6 +993,19 @@ class QuicSession : public AsyncWrap, // Report that the stream data is flow control blocked inline void StreamDataBlocked(int64_t stream_id); + class SendSessionScope { + public: + explicit SendSessionScope(QuicSession* session) : session_(session) {} + + ~SendSessionScope() { + if (!Ngtcp2CallbackScope::InNgtcp2CallbackScope(session_)) + session_->SendPendingData(); + } + + private: + QuicSession* session_; + }; + // Tracks whether or not we are currently within an ngtcp2 callback // function. Certain ngtcp2 APIs are not supposed to be called when // within a callback. We use this as a gate to check. diff --git a/src/quic/node_quic_stream-inl.h b/src/quic/node_quic_stream-inl.h new file mode 100644 index 0000000000..8d8943bc29 --- /dev/null +++ b/src/quic/node_quic_stream-inl.h @@ -0,0 +1,228 @@ +#ifndef SRC_NODE_QUIC_STREAM_INL_H_ +#define SRC_NODE_QUIC_STREAM_INL_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "node_quic_session.h" +#include "node_quic_stream.h" +#include "node_quic_buffer-inl.h" + +namespace node { +namespace quic { + +QuicStreamDirection QuicStream::direction() const { + return stream_id_ & 0b10 ? + QUIC_STREAM_UNIDIRECTIONAL : + QUIC_STREAM_BIRECTIONAL; +} + +QuicStreamOrigin QuicStream::origin() const { + return stream_id_ & 0b01 ? + QUIC_STREAM_SERVER : + QUIC_STREAM_CLIENT; +} + +bool QuicStream::is_destroyed() const { + return flags_ & QUICSTREAM_FLAG_DESTROYED; +} + +bool QuicStream::has_received_fin() const { + return flags_ & QUICSTREAM_FLAG_FIN; +} + +bool QuicStream::has_sent_fin() const { + return flags_ & QUICSTREAM_FLAG_FIN_SENT; +} + +bool QuicStream::was_ever_writable() const { + if (direction() == QUIC_STREAM_UNIDIRECTIONAL) { + return session_->is_server() ? + origin() == QUIC_STREAM_SERVER : + origin() == QUIC_STREAM_CLIENT; + } + return true; +} + +bool QuicStream::is_writable() const { + if (flags_ & QUICSTREAM_FLAG_WRITE_CLOSED) + return false; + + return was_ever_writable(); +} + +bool QuicStream::was_ever_readable() const { + if (direction() == QUIC_STREAM_UNIDIRECTIONAL) { + return session_->is_server() ? + origin() == QUIC_STREAM_CLIENT : + origin() == QUIC_STREAM_SERVER; + } + + return true; +} + +bool QuicStream::is_readable() const { + if (flags_ & QUICSTREAM_FLAG_READ_CLOSED) + return false; + + return was_ever_readable(); +} + +bool QuicStream::is_read_started() const { + return flags_ & QUICSTREAM_FLAG_READ_STARTED; +} + +bool QuicStream::is_read_paused() const { + return flags_ & QUICSTREAM_FLAG_READ_PAUSED; +} + +bool QuicStream::IsAlive() { + return !is_destroyed() && !IsClosing(); +} + +bool QuicStream::IsClosing() { + return !is_writable() && !is_readable(); +} + +void QuicStream::set_fin_sent() { + CHECK(!is_writable()); + flags_ |= QUICSTREAM_FLAG_FIN_SENT; +} + +bool QuicStream::is_write_finished() const { + return has_sent_fin() && streambuf_.length() == 0; +} + +void QuicStream::IncrementAvailableOutboundLength(size_t amount) { + available_outbound_length_ += amount; +} + +void QuicStream::DecrementAvailableOutboundLength(size_t amount) { + available_outbound_length_ -= amount; +} + +void QuicStream::set_fin_received() { + flags_ |= QUICSTREAM_FLAG_FIN; + set_read_close(); +} + +void QuicStream::set_write_close() { + flags_ |= QUICSTREAM_FLAG_WRITE_CLOSED; +} + +void QuicStream::set_read_close() { + flags_ |= QUICSTREAM_FLAG_READ_CLOSED; +} + +void QuicStream::set_read_start() { + flags_ |= QUICSTREAM_FLAG_READ_STARTED; +} + +void QuicStream::set_read_pause() { + flags_ |= QUICSTREAM_FLAG_READ_PAUSED; +} + +void QuicStream::set_read_resume() { + flags_ &= QUICSTREAM_FLAG_READ_PAUSED; +} + +void QuicStream::set_destroyed() { + flags_ |= QUICSTREAM_FLAG_DESTROYED; +} + +bool QuicStream::SubmitInformation(v8::Local headers) { + return session_->SubmitInformation(stream_id_, headers); +} + +bool QuicStream::SubmitHeaders(v8::Local headers, uint32_t flags) { + return session_->SubmitHeaders(stream_id_, headers, flags); +} + +bool QuicStream::SubmitTrailers(v8::Local headers) { + return session_->SubmitTrailers(stream_id_, headers); +} + +void QuicStream::EndHeaders() { + Debug(this, "End Headers"); + // Upon completion of a block of headers, convert the + // vector of Header objects into an array of name+value + // pairs, then call the on_stream_headers function. + session()->application()->StreamHeaders(stream_id_, headers_kind_, headers_); + headers_.clear(); +} + +void QuicStream::set_headers_kind(QuicStreamHeadersKind kind) { + headers_kind_ = kind; +} + +void QuicStream::BeginHeaders(QuicStreamHeadersKind kind) { + Debug(this, "Beginning Headers"); + // Upon start of a new block of headers, ensure that any + // previously collected ones are cleaned up. + headers_.clear(); + set_headers_kind(kind); +} + +void QuicStream::Commit(ssize_t amount) { + CHECK(!is_destroyed()); + CHECK_GE(amount, 0); + streambuf_.Seek(amount); +} + +int QuicStream::ReadStart() { + CHECK(!is_destroyed()); + CHECK(is_readable()); + set_read_start(); + set_read_resume(); + IncrementStat( + inbound_consumed_data_while_paused_, + &stream_stats_, + &stream_stats::max_offset); + session_->ExtendStreamOffset(this, inbound_consumed_data_while_paused_); + return 0; +} + +int QuicStream::ReadStop() { + CHECK(!is_destroyed()); + CHECK(is_readable()); + set_read_pause(); + return 0; +} + +void QuicStream::ResetStream(uint64_t app_error_code) { + // On calling shutdown, the stream will no longer be + // readable or writable, all any pending data in the + // streambuf_ will be canceled, and all data pending + // to be acknowledged at the ngtcp2 level will be + // abandoned. + set_read_close(); + set_write_close(); + session_->ResetStream(stream_id_, app_error_code); +} + +template +size_t QuicStream::DrainInto( + std::vector* vec, + size_t max_count) { + CHECK(!is_destroyed()); + size_t length = 0; + streambuf_.DrainInto(vec, &length, max_count); + return length; +} + +template +size_t QuicStream::DrainInto( + T* vec, + size_t* count, + size_t max_count) { + CHECK(!is_destroyed()); + size_t length = 0; + streambuf_.DrainInto(vec, count, &length, max_count); + return length; +} + +} // namespace quic +} // namespace node + +#endif // NODE_WANT_INTERNALS + +#endif // SRC_NODE_QUIC_STREAM_INL_H_ diff --git a/src/quic/node_quic_stream.cc b/src/quic/node_quic_stream.cc index ceef98e76c..54e0bf315e 100644 --- a/src/quic/node_quic_stream.cc +++ b/src/quic/node_quic_stream.cc @@ -6,10 +6,9 @@ #include "node_internals.h" #include "stream_base-inl.h" #include "node_quic_session-inl.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_socket.h" #include "node_quic_util-inl.h" -#include "node_sockaddr-inl.h" #include "v8.h" #include "uv.h" @@ -32,6 +31,15 @@ using v8::Value; namespace quic { +namespace { +size_t get_length(uv_buf_t* bufs, size_t nbufs) { + size_t len = 0; + for (size_t n = 0; n < nbufs; n++) + len += bufs[n].len; + return len; +} +} // namespace + QuicStream::QuicStream( QuicSession* sess, Local wrap, @@ -93,6 +101,52 @@ QuicStream::QuicStream( &stream_stats::max_offset); } +// Acknowledge is called when ngtcp2 has received an acknowledgement +// for one or more stream frames for this QuicStream. This will cause +// data stored in the streambuf_ outbound queue to be consumed and may +// result in the JavaScript callback for the write to be invoked. +void QuicStream::Acknowledge(uint64_t offset, size_t datalen) { + if (is_destroyed()) + return; + + // ngtcp2 guarantees that offset must always be greater + // than the previously received offset, but let's just + // make sure that holds. + CHECK_GE(offset, max_offset_ack_); + max_offset_ack_ = offset; + + Debug(this, "Acknowledging %d bytes", datalen); + + // Consumes the given number of bytes in the buffer. This may + // have the side-effect of causing the onwrite callback to be + // invoked if a complete chunk of buffered data has been acknowledged. + streambuf_.Consume(datalen); + + uint64_t now = uv_hrtime(); + if (stream_stats_.stream_acked_at > 0) + data_rx_ack_->Record(now - stream_stats_.stream_acked_at); + stream_stats_.stream_acked_at = now; +} + +// While not all QUIC applications will support headers, QuicStream +// includes basic, generic support for storing them. +bool QuicStream::AddHeader(std::unique_ptr header) { + Debug(this, "Header Added"); + size_t len = header->length(); + QuicApplication* app = session()->application(); + // We cannot add the header if we've either reached + // * the max number of header pairs or + // * the max number of header bytes + if (headers_.size() == app->max_header_pairs() || + current_headers_length_ + len > app->max_header_length()) { + return false; + } + + current_headers_length_ += header->length(); + headers_.emplace_back(std::move(header)); + return true; +} + std::string QuicStream::diagnostic_name() const { return std::string("QuicStream ") + std::to_string(stream_id_) + " (" + std::to_string(static_cast(get_async_id())) + @@ -139,21 +193,18 @@ void QuicStream::Destroy() { int QuicStream::DoShutdown(ShutdownWrap* req_wrap) { if (is_destroyed()) return UV_EPIPE; - Debug(this, "Shutdown writable side"); - // Do nothing if the stream was already shutdown. Specifically, - // we should not attempt to send anything on the QuicSession - if (!is_writable()) - return UV_EPIPE; - stream_stats_.closing_at = uv_hrtime(); - set_write_close(); - // If we're not currently within an ngtcp2 callback, then we need to - // tell the QuicSession to initiate serialization and sending of any - // pending frames. - if (!QuicSession::Ngtcp2CallbackScope::InNgtcp2CallbackScope(session_.get())) - session_->SendStreamData(this); + QuicSession::SendSessionScope send_scope(session()); + + if (is_writable()) { + Debug(this, "Shutdown writable side"); + stream_stats_.closing_at = uv_hrtime(); + set_write_close(); + session()->ResumeStream(stream_id_); + } - return 1; + req_wrap->Done(0); + return 0; } int QuicStream::DoWrite( @@ -170,101 +221,83 @@ int QuicStream::DoWrite( return 0; } + // Nothing to write. + size_t length = get_length(bufs, nbufs); + if (length == 0) { + req_wrap->Done(0); + return 0; + } + + QuicSession::SendSessionScope send_scope(session()); + + Debug(this, "Queuing %" PRIu64 " bytes of data from %d buffers", + length, nbufs); + IncrementStat( + static_cast(length), + &stream_stats_, + &stream_stats::bytes_sent); + BaseObjectPtr strong_ref{req_wrap->GetAsyncWrap()}; // The list of buffers will be appended onto streambuf_ without - // copying. Those will remain in that buffer until the serialized + // copying. Those will remain in the buffer until the serialized // stream frames are acknowledged. - uint64_t length = - streambuf_.Push( - bufs, - nbufs, - [req_wrap, strong_ref](int status) { - // This callback function will be invoked once this - // complete batch of buffers has been acknowledged - // by the peer. This will have the side effect of - // blocking additional pending writes from the - // javascript side, so writing data to the stream - // will be throttled by how quickly the peer is - // able to acknowledge stream packets. This is good - // in the sense of providing back-pressure, but - // also means that writes will be significantly - // less performant unless written in batches. - req_wrap->Done(status); - }); - Debug(this, "Queuing %" PRIu64 " bytes of data from %d buffers", - length, nbufs); - IncrementStat(length, &stream_stats_, &stream_stats::bytes_sent); + // This callback function will be invoked once this + // complete batch of buffers has been acknowledged + // by the peer. This will have the side effect of + // blocking additional pending writes from the + // javascript side, so writing data to the stream + // will be throttled by how quickly the peer is + // able to acknowledge stream packets. This is good + // in the sense of providing back-pressure, but + // also means that writes will be significantly + // less performant unless written in batches. + streambuf_.Push( + bufs, + nbufs, + [req_wrap, strong_ref](int status) { + req_wrap->Done(status); + }); stream_stats_.stream_sent_at = uv_hrtime(); - // If we're not within an ngtcp2 callback, go ahead and send - // the pending stream data. Otherwise, the data will be flushed - // once the ngtcp2 callback scope exits and all streams with - // data pending are flushed. - if (!QuicSession::Ngtcp2CallbackScope::InNgtcp2CallbackScope(session_.get())) - session_->SendStreamData(this); + session()->ResumeStream(stream_id_); // IncrementAvailableOutboundLength(len); return 0; } -// AckedDataOffset is called when ngtcp2 has received an acknowledgement -// for one or more stream frames for this QuicStream. This will cause -// data stored in the streambuf_ outbound queue to be consumed and may -// result in the JavaScript callback for the write to be invoked. -void QuicStream::AckedDataOffset(uint64_t offset, size_t datalen) { - if (is_destroyed()) - return; - - // ngtcp2 guarantees that offset must always be greater - // than the previously received offset, but let's just - // make sure that holds. - CHECK_GE(offset, max_offset_ack_); - max_offset_ack_ = offset; - - Debug(this, "Acknowledging %d bytes", datalen); - - // Consumes the given number of bytes in the buffer. This may - // have the side-effect of causing the onwrite callback to be - // invoked if a complete chunk of buffered data has been acknowledged. - streambuf_.Consume(datalen); +void QuicStream::IncrementStats(size_t datalen) { + uint64_t len = static_cast(datalen); + IncrementStat(len, &stream_stats_, &stream_stats::bytes_received); uint64_t now = uv_hrtime(); - if (stream_stats_.stream_acked_at > 0) - data_rx_ack_->Record(now - stream_stats_.stream_acked_at); - stream_stats_.stream_acked_at = now; -} - -void QuicStream::Commit(ssize_t amount) { - CHECK(!is_destroyed()); - streambuf_.Seek(amount); -} - -inline void QuicStream::IncrementAvailableOutboundLength(size_t amount) { - available_outbound_length_ += amount; -} - -inline void QuicStream::DecrementAvailableOutboundLength(size_t amount) { - available_outbound_length_ -= amount; + if (stream_stats_.stream_received_at > 0) + data_rx_rate_->Record(now - stream_stats_.stream_received_at); + stream_stats_.stream_received_at = now; + data_rx_size_->Record(len); } -int QuicStream::ReadStart() { - CHECK(!is_destroyed()); - CHECK(is_readable()); - set_read_start(); - set_read_resume(); - IncrementStat( - inbound_consumed_data_while_paused_, - &stream_stats_, - &stream_stats::max_offset); - session_->ExtendStreamOffset(this, inbound_consumed_data_while_paused_); - return 0; +void QuicStream::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("buffer", &streambuf_); + tracker->TrackField("data_rx_rate", data_rx_rate_); + tracker->TrackField("data_rx_size", data_rx_size_); + tracker->TrackField("data_rx_ack", data_rx_ack_); + tracker->TrackField("stats_buffer", stats_buffer_); + tracker->TrackField("headers", headers_); } -int QuicStream::ReadStop() { - CHECK(!is_destroyed()); - CHECK(is_readable()); - set_read_pause(); - return 0; +BaseObjectPtr QuicStream::New( + QuicSession* session, + int64_t stream_id) { + Local obj; + if (!session->env() + ->quicserverstream_constructor_template() + ->NewInstance(session->env()->context()).ToLocal(&obj)) { + return {}; + } + BaseObjectPtr stream = + MakeDetachedBaseObject(session, obj, stream_id); + session->AddStream(stream); + return stream; } // Passes chunks of data on to the JavaScript side as soon as they are @@ -355,100 +388,6 @@ void QuicStream::ReceiveData( } } -inline void QuicStream::IncrementStats(size_t datalen) { - uint64_t len = static_cast(datalen); - IncrementStat(len, &stream_stats_, &stream_stats::bytes_received); - - uint64_t now = uv_hrtime(); - if (stream_stats_.stream_received_at > 0) - data_rx_rate_->Record(now - stream_stats_.stream_received_at); - stream_stats_.stream_received_at = now; - data_rx_size_->Record(len); -} - -void QuicStream::ResetStream(uint64_t app_error_code) { - // On calling shutdown, the stream will no longer be - // readable or writable, all any pending data in the - // streambuf_ will be canceled, and all data pending - // to be acknowledged at the ngtcp2 level will be - // abandoned. - set_read_close(); - set_write_close(); - session_->ResetStream(stream_id_, app_error_code); -} - -BaseObjectPtr QuicStream::New( - QuicSession* session, int64_t stream_id) { - Local obj; - if (!session->env() - ->quicserverstream_constructor_template() - ->NewInstance(session->env()->context()).ToLocal(&obj)) { - return {}; - } - BaseObjectPtr stream = - MakeDetachedBaseObject(session, obj, stream_id); - session->AddStream(stream); - return stream; -} - -void QuicStream::BeginHeaders(QuicStreamHeadersKind kind) { - Debug(this, "Beginning Headers"); - // Upon start of a new block of headers, ensure that any - // previously collected ones are cleaned up. - headers_.clear(); - set_headers_kind(kind); -} - -void QuicStream::set_headers_kind(QuicStreamHeadersKind kind) { - headers_kind_ = kind; -} - -bool QuicStream::AddHeader(std::unique_ptr header) { - Debug(this, "Header Added"); - size_t len = header->length(); - QuicApplication* app = session()->application(); - // We cannot add the header if we've either reached - // * the max number of header pairs or - // * the max number of header bytes - if (headers_.size() == app->max_header_pairs() || - current_headers_length_ + len > app->max_header_length()) { - return false; - } - - current_headers_length_ += header->length(); - headers_.emplace_back(std::move(header)); - return true; -} - -void QuicStream::EndHeaders() { - Debug(this, "End Headers"); - // Upon completion of a block of headers, convert the - // vector of Header objects into an array of name+value - // pairs, then call the on_stream_headers function. - session()->application()->StreamHeaders(stream_id_, headers_kind_, headers_); - headers_.clear(); -} - -void QuicStream::MemoryInfo(MemoryTracker* tracker) const { - tracker->TrackField("buffer", &streambuf_); - tracker->TrackField("data_rx_rate", data_rx_rate_); - tracker->TrackField("data_rx_size", data_rx_size_); - tracker->TrackField("data_rx_ack", data_rx_ack_); - tracker->TrackField("stats_buffer", stats_buffer_); -} - -bool QuicStream::SubmitInformation(v8::Local headers) { - return session_->SubmitInformation(stream_id_, headers); -} - -bool QuicStream::SubmitHeaders(v8::Local headers, uint32_t flags) { - return session_->SubmitHeaders(stream_id_, headers, flags); -} - -bool QuicStream::SubmitTrailers(v8::Local headers) { - return session_->SubmitTrailers(stream_id_, headers); -} - // JavaScript API namespace { void QuicStreamGetID(const FunctionCallbackInfo& args) { diff --git a/src/quic/node_quic_stream.h b/src/quic/node_quic_stream.h index ceac4b8f63..76396516a7 100644 --- a/src/quic/node_quic_stream.h +++ b/src/quic/node_quic_stream.h @@ -54,7 +54,7 @@ enum QuicStreamStatsIdx : int { // different internal representations for a header name+value // pair. QuicApplication implementations that support headers // per stream must create a specialization of the Header class. -class QuicHeader { +class QuicHeader : public MemoryRetainer { public: QuicHeader() {} @@ -69,6 +69,55 @@ class QuicHeader { virtual size_t length() const = 0; }; +enum QuicStreamStates : uint32_t { + // QuicStream is fully open. Readable and Writable + QUICSTREAM_FLAG_INITIAL = 0x0, + + // QuicStream Read State is closed because a final stream frame + // has been received from the peer or the QuicStream is unidirectional + // outbound only (i.e. it was never readable) + QUICSTREAM_FLAG_READ_CLOSED = 0x1, + + // QuicStream Write State is closed. There may still be data + // in the outbound queue waiting to be serialized or acknowledged. + // No additional data may be added to the queue, however, and a + // final stream packet will be sent once all of the data in the + // queue has been serialized. + QUICSTREAM_FLAG_WRITE_CLOSED = 0x2, + + // JavaScript side has switched into flowing mode (Readable side) + QUICSTREAM_FLAG_READ_STARTED = 0x4, + + // JavaScript side has paused the flow of data (Readable side) + QUICSTREAM_FLAG_READ_PAUSED = 0x8, + + // QuicStream has received a final stream frame (Readable side) + QUICSTREAM_FLAG_FIN = 0x10, + + // QuicStream has sent a final stream frame (Writable side) + QUICSTREAM_FLAG_FIN_SENT = 0x20, + + // QuicStream has been destroyed + QUICSTREAM_FLAG_DESTROYED = 0x40 +}; + +enum QuicStreamDirection { + // The QuicStream is readable and writable in both directions + QUIC_STREAM_BIRECTIONAL, + + // The QuicStream is writable and readable in only one direction. + // The direction depends on the QuicStreamOrigin. + QUIC_STREAM_UNIDIRECTIONAL +}; + +enum QuicStreamOrigin { + // The QuicStream was created by the server. + QUIC_STREAM_SERVER, + + // The QuicStream was created by the client. + QUIC_STREAM_CLIENT +}; + // QuicStream's are simple data flows that, fortunately, do not // require much. They may be: // @@ -137,183 +186,96 @@ class QuicHeader { // ngtcp2 level. class QuicStream : public AsyncWrap, public StreamBase { public: - enum QuicStreamStates : uint32_t { - // QuicStream is fully open. Readable and Writable - QUICSTREAM_FLAG_INITIAL = 0x0, - - // QuicStream Read State is closed because a final stream frame - // has been received from the peer or the QuicStream is unidirectional - // outbound only (i.e. it was never readable) - QUICSTREAM_FLAG_READ_CLOSED = 0x1, - - // QuicStream Write State is closed. There may still be data - // in the outbound queue waiting to be serialized or acknowledged. - // No additional data may be added to the queue, however, and a - // final stream packet will be sent once all of the data in the - // queue has been serialized. - QUICSTREAM_FLAG_WRITE_CLOSED = 0x2, - - // JavaScript side has switched into flowing mode (Readable side) - QUICSTREAM_FLAG_READ_STARTED = 0x4, - - // JavaScript side has paused the flow of data (Readable side) - QUICSTREAM_FLAG_READ_PAUSED = 0x8, - - // QuicStream has received a final stream frame (Readable side) - QUICSTREAM_FLAG_FIN = 0x10, - - // QuicStream has sent a final stream frame (Writable side) - QUICSTREAM_FLAG_FIN_SENT = 0x20, - - // QuicStream has been destroyed - QUICSTREAM_FLAG_DESTROYED = 0x40 - }; - - enum QuicStreamDirection { - // The QuicStream is readable and writable in both directions - QUIC_STREAM_BIRECTIONAL, - - // The QuicStream is writable and readable in only one direction. - // The direction depends on the QuicStreamOrigin. - QUIC_STREAM_UNIDIRECTIONAL - }; - - enum QuicStreamOrigin { - // The QuicStream was created by the server. - QUIC_STREAM_SERVER, - - // The QuicStream was created by the client. - QUIC_STREAM_CLIENT - }; - - static void Initialize( Environment* env, v8::Local target, v8::Local context); static BaseObjectPtr New( - QuicSession* session, int64_t stream_id); + QuicSession* session, + int64_t stream_id); + + QuicStream( + QuicSession* session, + v8::Local target, + int64_t stream_id); std::string diagnostic_name() const override; - inline QuicStreamDirection direction() const { - return stream_id_ & 0b10 ? - QUIC_STREAM_UNIDIRECTIONAL : - QUIC_STREAM_BIRECTIONAL; - } + int64_t id() const { return stream_id_; } + QuicSession* session() const { return session_.get(); } - inline QuicStreamOrigin origin() const { - return stream_id_ & 0b01 ? - QUIC_STREAM_SERVER : - QUIC_STREAM_CLIENT; - } + // A QuicStream can be either uni- or bi-directional. + inline QuicStreamDirection direction() const; - int64_t id() const { return stream_id_; } + // A QuicStream can be initiated by either the client + // or the server. + inline QuicStreamOrigin origin() const; - inline bool is_destroyed() const { - return flags_ & QUICSTREAM_FLAG_DESTROYED; - } + // The QuicStream has been destroyed and is no longer usable. + inline bool is_destroyed() const; // The QUICSTREAM_FLAG_FIN flag will be set only when a final stream // frame has been received from the peer. - inline bool has_received_fin() const { - return flags_ & QUICSTREAM_FLAG_FIN; - } + inline bool has_received_fin() const; // The QUICSTREAM_FLAG_FIN_SENT flag will be set only when a final // stream frame has been transmitted to the peer. Once sent, no // additional data may be transmitted to the peer. If has_sent_fin() // is set, is_writable() can be assumed to be false. - inline bool has_sent_fin() const { - return flags_ & QUICSTREAM_FLAG_FIN_SENT; - } + inline bool has_sent_fin() const; // WasEverWritable returns true if it is a bidirectional stream, // or a Unidirectional stream originating from the local peer. // If was_ever_writable() is false, then no stream frames should // ever be sent from the local peer, including final stream frames. - inline bool was_ever_writable() const { - if (direction() == QUIC_STREAM_UNIDIRECTIONAL) { - return session_->is_server() ? - origin() == QUIC_STREAM_SERVER : - origin() == QUIC_STREAM_CLIENT; - } - return true; - } + inline bool was_ever_writable() const; // A QuicStream will not be writable if: // - The QUICSTREAM_FLAG_WRITE_CLOSED flag is set or // - It is a Unidirectional stream originating from the peer - inline bool is_writable() const { - if (flags_ & QUICSTREAM_FLAG_WRITE_CLOSED) - return false; - - return was_ever_writable(); - } + inline bool is_writable() const; // WasEverReadable returns true if it is a bidirectional stream, // or a Unidirectional stream originating from the remote // peer. - inline bool was_ever_readable() const { - if (direction() == QUIC_STREAM_UNIDIRECTIONAL) { - return session_->is_server() ? - origin() == QUIC_STREAM_CLIENT : - origin() == QUIC_STREAM_SERVER; - } - - return true; - } + inline bool was_ever_readable() const; // A QuicStream will not be readable if: // - The QUICSTREAM_FLAG_READ_CLOSED flag is set or // - It is a Unidirectional stream originating from the local peer. - inline bool is_readable() const { - if (flags_ & QUICSTREAM_FLAG_READ_CLOSED) - return false; - - return was_ever_readable(); - } - - inline bool is_read_started() const { - return flags_ & QUICSTREAM_FLAG_READ_STARTED; - } + inline bool is_readable() const; - inline bool is_read_paused() const { - return flags_ & QUICSTREAM_FLAG_READ_PAUSED; - } + // True if reading from this QuicStream has ever initiated. + inline bool is_read_started() const; - bool IsAlive() override { - return !is_destroyed() && !IsClosing(); - } - - bool IsClosing() override { - return !is_writable() && !is_readable(); - } + // True if reading from this QuicStream is currently paused. + inline bool is_read_paused() const; // Records the fact that a final stream frame has been // serialized and sent to the peer. There still may be // unacknowledged data in the outbound queue, but no // additional frames may be sent for the stream other // than reset stream. - inline void set_fin_sent() { - CHECK(!is_writable()); - flags_ |= QUICSTREAM_FLAG_FIN_SENT; - } + inline void set_fin_sent(); // IsWriteFinished will return true if a final stream frame // has been sent and all data has been acknowledged (the // send buffer is empty). - inline bool is_write_finished() { - return has_sent_fin() && streambuf_.length() == 0; - } + inline bool is_write_finished() const; - QuicSession* session() const { return session_.get(); } + // Specifies the kind of headers currently being processed. + inline void set_headers_kind(QuicStreamHeadersKind kind); - virtual void AckedDataOffset(uint64_t offset, size_t datalen); + // Marks the given data range as having been acknowledged. + // This means that the data range may be released from + // memory. + void Acknowledge(uint64_t offset, size_t datalen); - virtual void Destroy(); + // Destroy the QuicStream and render it no longer usable. + void Destroy(); + // Buffers chunks of data to be written to the QUIC connection. int DoWrite( WriteWrap* req_wrap, uv_buf_t* bufs, @@ -323,118 +285,83 @@ class QuicStream : public AsyncWrap, public StreamBase { inline void IncrementAvailableOutboundLength(size_t amount); inline void DecrementAvailableOutboundLength(size_t amount); - virtual void ReceiveData( - int fin, - const uint8_t* data, - size_t datalen, - uint64_t offset); - - bool SubmitInformation(v8::Local headers); - bool SubmitHeaders(v8::Local headers, uint32_t flags); - bool SubmitTrailers(v8::Local headers); - - // Required for StreamBase - int ReadStart() override; - - // Required for StreamBase - int ReadStop() override; - - // Required for StreamBase - int DoShutdown(ShutdownWrap* req_wrap) override; + // Returns false if the header cannot be added. This will + // typically only happen if a maximimum number of headers + // has been reached. + bool AddHeader(std::unique_ptr header); - void ResetStream(uint64_t app_error_code); + // Some QUIC applications support headers, others do not. + // The following methods allow consistent handling of + // headers at the QuicStream level regardless of the + // protocol. For applications that do not support headers, + // these are simply not used. + inline void BeginHeaders( + QuicStreamHeadersKind kind = QUICSTREAM_HEADERS_KIND_NONE); - void Commit(ssize_t amount); + // Indicates an amount of unacknowledged data that has been + // submitted to the QUIC connection. + inline void Commit(ssize_t amount); template inline size_t DrainInto( std::vector* vec, - size_t max_count = MAX_VECTOR_COUNT) { - CHECK(!is_destroyed()); - size_t length = 0; - streambuf_.DrainInto(vec, &length, max_count); - return length; - } + size_t max_count = MAX_VECTOR_COUNT); template inline size_t DrainInto( T* vec, size_t* count, - size_t max_count = MAX_VECTOR_COUNT) { - CHECK(!is_destroyed()); - size_t length = 0; - streambuf_.DrainInto(vec, count, &length, max_count); - return length; - } + size_t max_count = MAX_VECTOR_COUNT); - AsyncWrap* GetAsyncWrap() override { return this; } + inline void EndHeaders(); - // Some QUIC applications support headers, others do not. - // The following methods allow consistent handling of - // headers at the QuicStream level regardless of the - // protocol. For applications that do not support headers, - // these are simply not used. - void BeginHeaders(QuicStreamHeadersKind kind = QUICSTREAM_HEADERS_KIND_NONE); + // Passes a chunk of data on to the QuicStream listener. + void ReceiveData( + int fin, + const uint8_t* data, + size_t datalen, + uint64_t offset); - // Returns false if the header cannot be added. This will - // typically only happen if a maximimum number of headers - // has been reached. - bool AddHeader(std::unique_ptr header); - void EndHeaders(); + // Resets the QUIC stream, sending a signal to the peer that + // no additional data will be transmitted for this stream. + inline void ResetStream(uint64_t app_error_code = 0); - // Sets the kind of headers currently being processed. - void set_headers_kind(QuicStreamHeadersKind kind); + // Submits informational headers. Returns false if headers are not + // supported on the underlying QuicApplication. + inline bool SubmitInformation(v8::Local headers); - void MemoryInfo(MemoryTracker* tracker) const override; + // Submits initial headers. Returns false if headers are not + // supported on the underlying QuicApplication. + inline bool SubmitHeaders(v8::Local headers, uint32_t flags); + + // Submits trailing headers. Returns false if headers are not + // supported on the underlying QuicApplication. + inline bool SubmitTrailers(v8::Local headers); + // Required for StreamBase + inline bool IsAlive() override; + inline bool IsClosing() override; + inline int ReadStart() override; + inline int ReadStop() override; + int DoShutdown(ShutdownWrap* req_wrap) override; + + AsyncWrap* GetAsyncWrap() override { return this; } + + // Required for MemoryRetainer + void MemoryInfo(MemoryTracker* tracker) const override; SET_MEMORY_INFO_NAME(QuicStream) SET_SELF_SIZE(QuicStream) - QuicStream( - QuicSession* session, - v8::Local target, - int64_t stream_id); - private: - // Called only when a final stream frame has been received from - // the peer. This has the side effect of marking the readable - // side of the stream closed. No additional data will be received - // on this QuicStream. - inline void set_fin_received() { - flags_ |= QUICSTREAM_FLAG_FIN; - set_read_close(); - } - - // SetWriteClose is called either when the QuicStream is created - // and is unidirectional from the peer, or when DoShutdown is called. - // This will indicate that the writable side of the QuicStream is - // closed and that no data will be pushed to the outbound queue. - inline void set_write_close() { - flags_ |= QUICSTREAM_FLAG_WRITE_CLOSED; - } - - // Called when no additional data can be received on the QuicStream. - inline void set_read_close() { - flags_ |= QUICSTREAM_FLAG_READ_CLOSED; - } - - inline void set_read_start() { - flags_ |= QUICSTREAM_FLAG_READ_STARTED; - } - - inline void set_read_pause() { - flags_ |= QUICSTREAM_FLAG_READ_PAUSED; - } - - inline void set_read_resume() { - flags_ &= QUICSTREAM_FLAG_READ_PAUSED; - } - - inline void set_destroyed() { - flags_ |= QUICSTREAM_FLAG_DESTROYED; - } - - inline void IncrementStats(size_t datalen); + inline void set_fin_received(); + inline void set_write_close(); + inline void set_read_close(); + inline void set_read_start(); + inline void set_read_pause(); + inline void set_read_resume(); + inline void set_destroyed(); + + void IncrementStats(size_t datalen); BaseObjectWeakPtr session_; int64_t stream_id_; From 3d5e0c482cef0ea2bc8f4dd317e99715b4d1975f Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 8 Jan 2020 16:42:47 -0800 Subject: [PATCH 6/8] quic: cleaning up Http3Application --- src/quic/node_quic_http3_application.cc | 83 ++++++++++++------------- src/quic/node_quic_http3_application.h | 30 ++++----- 2 files changed, 56 insertions(+), 57 deletions(-) diff --git a/src/quic/node_quic_http3_application.cc b/src/quic/node_quic_http3_application.cc index d251fc8386..f05bde00ac 100644 --- a/src/quic/node_quic_http3_application.cc +++ b/src/quic/node_quic_http3_application.cc @@ -609,7 +609,7 @@ bool Http3Application::SendStreamData(QuicStream* stream) { // This is where nghttp3 pulls the data from the outgoing // buffer to prepare it to be sent on the QUIC stream. -ssize_t Http3Application::H3ReadData( +ssize_t Http3Application::ReadData( int64_t stream_id, nghttp3_vec* vec, size_t veccnt, @@ -644,17 +644,17 @@ ssize_t Http3Application::H3ReadData( } // Outgoing data is retained in memory until it is acknowledged. -void Http3Application::H3AckedStreamData( +void Http3Application::AckedStreamData( int64_t stream_id, size_t datalen) { QuicStream* stream = session()->FindStream(stream_id); - if (stream) { + if (stream != nullptr) { stream->Acknowledge(0, datalen); - nghttp3_conn_resume_stream(connection(), stream_id); + ResumeStream(stream_id); } } -void Http3Application::H3StreamClose( +void Http3Application::StreamClosed( int64_t stream_id, uint64_t app_error_code) { session()->listener()->OnStreamClose(stream_id, app_error_code); @@ -662,7 +662,7 @@ void Http3Application::H3StreamClose( QuicStream* Http3Application::FindOrCreateStream(int64_t stream_id) { QuicStream* stream = session()->FindStream(stream_id); - if (!stream) { + if (stream == nullptr) { if (session()->is_gracefully_closing()) { nghttp3_conn_close_stream(connection(), stream_id, NGTCP2_ERR_CLOSING); return nullptr; @@ -674,22 +674,22 @@ QuicStream* Http3Application::FindOrCreateStream(int64_t stream_id) { return stream; } -void Http3Application::H3ReceiveData( +void Http3Application::ReceiveData( int64_t stream_id, const uint8_t* data, size_t datalen) { QuicStream* stream = FindOrCreateStream(stream_id); - if (stream) + if (stream != nullptr) stream->ReceiveData(0, data, datalen, 0); } -void Http3Application::H3DeferredConsume( +void Http3Application::DeferredConsume( int64_t stream_id, size_t consumed) { - H3ReceiveData(stream_id, nullptr, consumed); + ReceiveData(stream_id, nullptr, consumed); } -void Http3Application::H3BeginHeaders( +void Http3Application::BeginHeaders( int64_t stream_id, QuicStreamHeadersKind kind) { QuicStream* stream = FindOrCreateStream(stream_id); @@ -702,7 +702,7 @@ void Http3Application::H3BeginHeaders( // by the QuicStream until stream->EndHeaders() is called, during which // the collected headers are converted to an array and passed off to // the javascript side. -bool Http3Application::H3ReceiveHeader( +bool Http3Application::ReceiveHeader( int64_t stream_id, int32_t token, nghttp3_rcbuf* name, @@ -713,7 +713,7 @@ bool Http3Application::H3ReceiveHeader( if (!IsZeroLengthHeader(name, value)) { Debug(session(), "Receiving header for stream %" PRId64, stream_id); QuicStream* stream = session()->FindStream(stream_id); - if (stream) { + if (stream != nullptr) { if (token == NGHTTP3_QPACK_TOKEN__STATUS) { nghttp3_vec vec = nghttp3_rcbuf_get_buf(value); if (vec.base[0] == '1') @@ -728,22 +728,22 @@ bool Http3Application::H3ReceiveHeader( return true; } -void Http3Application::H3EndHeaders(int64_t stream_id) { +void Http3Application::EndHeaders(int64_t stream_id) { Debug(session(), "Ending header block for stream %" PRId64, stream_id); QuicStream* stream = session()->FindStream(stream_id); - if (stream) + if (stream != nullptr) stream->EndHeaders(); } // TODO(@jasnell): Implement Push Promise Support -int Http3Application::H3BeginPushPromise( +int Http3Application::BeginPushPromise( int64_t stream_id, int64_t push_id) { return 0; } // TODO(@jasnell): Implement Push Promise Support -bool Http3Application::H3ReceivePushPromise( +bool Http3Application::ReceivePushPromise( int64_t stream_id, int64_t push_id, int32_t token, @@ -754,37 +754,35 @@ bool Http3Application::H3ReceivePushPromise( } // TODO(@jasnell): Implement Push Promise Support -int Http3Application::H3EndPushPromise( +int Http3Application::EndPushPromise( int64_t stream_id, int64_t push_id) { return 0; } // TODO(@jasnell): Implement Push Promise Support -void Http3Application::H3CancelPush( +void Http3Application::CancelPush( int64_t push_id, int64_t stream_id) { } // TODO(@jasnell): Implement Push Promise Support -int Http3Application::H3PushStream( +int Http3Application::PushStream( int64_t push_id, int64_t stream_id) { return 0; } -void Http3Application::H3SendStopSending( +void Http3Application::SendStopSending( int64_t stream_id, uint64_t app_error_code) { session()->ResetStream(stream_id, app_error_code); } -int Http3Application::H3EndStream( - int64_t stream_id) { - QuicStream* stream = FindOrCreateStream(stream_id); - if (stream) +void Http3Application::EndStream(int64_t stream_id) { + QuicStream* stream = session()->FindStream(stream_id); + if (stream != nullptr) stream->ReceiveData(1, nullptr, 0, 0); - return 0; } const nghttp3_conn_callbacks Http3Application::callbacks_[2] = { @@ -837,7 +835,7 @@ int Http3Application::OnAckedStreamData( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3AckedStreamData(stream_id, datalen); + app->AckedStreamData(stream_id, datalen); return 0; } @@ -848,7 +846,7 @@ int Http3Application::OnStreamClose( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3StreamClose(stream_id, app_error_code); + app->StreamClosed(stream_id, app_error_code); return 0; } @@ -860,7 +858,7 @@ int Http3Application::OnReceiveData( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3ReceiveData(stream_id, data, datalen); + app->ReceiveData(stream_id, data, datalen); return 0; } @@ -871,7 +869,7 @@ int Http3Application::OnDeferredConsume( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3DeferredConsume(stream_id, consumed); + app->DeferredConsume(stream_id, consumed); return 0; } @@ -881,7 +879,7 @@ int Http3Application::OnBeginHeaders( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3BeginHeaders(stream_id); + app->BeginHeaders(stream_id); return 0; } @@ -891,7 +889,7 @@ int Http3Application::OnBeginTrailers( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3BeginHeaders(stream_id, QUICSTREAM_HEADERS_KIND_TRAILING); + app->BeginHeaders(stream_id, QUICSTREAM_HEADERS_KIND_TRAILING); return 0; } @@ -907,7 +905,7 @@ int Http3Application::OnReceiveHeader( Http3Application* app = static_cast(conn_user_data); // TODO(@jasnell): Need to determine the appropriate response code here // for when the header is not going to be accepted. - return app->H3ReceiveHeader(stream_id, token, name, value, flags) ? + return app->ReceiveHeader(stream_id, token, name, value, flags) ? 0 : NGHTTP3_ERR_CALLBACK_FAILURE; } @@ -917,7 +915,7 @@ int Http3Application::OnEndHeaders( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3EndHeaders(stream_id); + app->EndHeaders(stream_id); return 0; } @@ -928,7 +926,7 @@ int Http3Application::OnBeginPushPromise( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3BeginPushPromise(stream_id, push_id); + return app->BeginPushPromise(stream_id, push_id); } int Http3Application::OnReceivePushPromise( @@ -942,7 +940,7 @@ int Http3Application::OnReceivePushPromise( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3ReceivePushPromise( + return app->ReceivePushPromise( stream_id, push_id, token, @@ -958,7 +956,7 @@ int Http3Application::OnEndPushPromise( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3EndPushPromise(stream_id, push_id); + return app->EndPushPromise(stream_id, push_id); } int Http3Application::OnCancelPush( @@ -968,7 +966,7 @@ int Http3Application::OnCancelPush( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3CancelPush(push_id, stream_id); + app->CancelPush(push_id, stream_id); return 0; } @@ -979,7 +977,7 @@ int Http3Application::OnSendStopSending( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3SendStopSending(stream_id, app_error_code); + app->SendStopSending(stream_id, app_error_code); return 0; } @@ -989,7 +987,7 @@ int Http3Application::OnPushStream( int64_t stream_id, void* conn_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3PushStream(push_id, stream_id); + return app->PushStream(push_id, stream_id); } int Http3Application::OnEndStream( @@ -998,7 +996,8 @@ int Http3Application::OnEndStream( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3EndStream(stream_id); + app->EndStream(stream_id); + return 0; } ssize_t Http3Application::OnReadData( @@ -1010,7 +1009,7 @@ ssize_t Http3Application::OnReadData( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3ReadData(stream_id, vec, veccnt, pflags); + return app->ReadData(stream_id, vec, veccnt, pflags); } } // namespace quic } // namespace node diff --git a/src/quic/node_quic_http3_application.h b/src/quic/node_quic_http3_application.h index 598d3a8cb0..a4274da984 100644 --- a/src/quic/node_quic_http3_application.h +++ b/src/quic/node_quic_http3_application.h @@ -151,39 +151,39 @@ class Http3Application final : bool StreamCommit(StreamData* stream_data, size_t datalen) override; bool ShouldSetFin(const StreamData& data) override; - ssize_t H3ReadData( + ssize_t ReadData( int64_t stream_id, nghttp3_vec* vec, size_t veccnt, uint32_t* pflags); - void H3AckedStreamData(int64_t stream_id, size_t datalen); - void H3StreamClose(int64_t stream_id, uint64_t app_error_code); - void H3ReceiveData(int64_t stream_id, const uint8_t* data, size_t datalen); - void H3DeferredConsume(int64_t stream_id, size_t consumed); - void H3BeginHeaders( + void AckedStreamData(int64_t stream_id, size_t datalen); + void StreamClosed(int64_t stream_id, uint64_t app_error_code); + void ReceiveData(int64_t stream_id, const uint8_t* data, size_t datalen); + void DeferredConsume(int64_t stream_id, size_t consumed); + void BeginHeaders( int64_t stream_id, QuicStreamHeadersKind kind = QUICSTREAM_HEADERS_KIND_NONE); - bool H3ReceiveHeader( + bool ReceiveHeader( int64_t stream_id, int32_t token, nghttp3_rcbuf* name, nghttp3_rcbuf* value, uint8_t flags); - void H3EndHeaders(int64_t stream_id); - int H3BeginPushPromise(int64_t stream_id, int64_t push_id); - bool H3ReceivePushPromise( + void EndHeaders(int64_t stream_id); + int BeginPushPromise(int64_t stream_id, int64_t push_id); + bool ReceivePushPromise( int64_t stream_id, int64_t push_id, int32_t token, nghttp3_rcbuf* name, nghttp3_rcbuf* value, uint8_t flags); - int H3EndPushPromise(int64_t stream_id, int64_t push_id); - void H3CancelPush(int64_t push_id, int64_t stream_id); - void H3SendStopSending(int64_t stream_id, uint64_t app_error_code); - int H3PushStream(int64_t push_id, int64_t stream_id); - int H3EndStream(int64_t stream_id); + int EndPushPromise(int64_t stream_id, int64_t push_id); + void CancelPush(int64_t push_id, int64_t stream_id); + void SendStopSending(int64_t stream_id, uint64_t app_error_code); + int PushStream(int64_t push_id, int64_t stream_id); + void EndStream(int64_t stream_id); bool is_control_stream(int64_t stream_id) const { return stream_id == control_stream_id_ || From 69c68abc9f00fc526b9ce25a1167e5062f7659b0 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 8 Jan 2020 17:24:59 -0800 Subject: [PATCH 7/8] quic: refactor SendPendingData For DefaultApplication, use a linked list to only iterate through QuicStream instances that have data to send. Use a single unified implementation of SendPendingData for QuicApplication. --- src/quic/node_quic_default_application.cc | 246 +++++++--------------- src/quic/node_quic_default_application.h | 20 +- src/quic/node_quic_http3_application.cc | 116 +--------- src/quic/node_quic_http3_application.h | 3 - src/quic/node_quic_session.cc | 141 ++++++++++--- src/quic/node_quic_session.h | 19 +- src/quic/node_quic_stream-inl.h | 10 + src/quic/node_quic_stream.h | 10 + 8 files changed, 238 insertions(+), 327 deletions(-) diff --git a/src/quic/node_quic_default_application.cc b/src/quic/node_quic_default_application.cc index 64970752bf..4665c24f33 100644 --- a/src/quic/node_quic_default_application.cc +++ b/src/quic/node_quic_default_application.cc @@ -13,16 +13,69 @@ namespace node { namespace quic { +namespace { +void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) { + ngtcp2_vec* v = *pvec; + size_t cnt = *pcnt; + + for (; cnt > 0; --cnt, ++v) { + if (v->len > len) { + v->len -= len; + v->base += len; + break; + } + len -= v->len; + } + + *pvec = v; + *pcnt = cnt; +} + +int IsEmpty(const ngtcp2_vec* vec, size_t cnt) { + size_t i; + for (i = 0; i < cnt && vec[i].len == 0; ++i) {} + return i == cnt; +} +} // anonymous namespace + DefaultApplication::DefaultApplication( QuicSession* session) : QuicApplication(session) {} bool DefaultApplication::Initialize() { - if (!needs_init()) - return false; - Debug(session(), "Default QUIC Application Initialized"); - set_init_done(); - return true; + if (needs_init()) { + Debug(session(), "Default QUIC Application Initialized"); + set_init_done(); + } + return needs_init(); +} + +void DefaultApplication::ScheduleStream(int64_t stream_id) { + QuicStream* stream = session()->FindStream(stream_id); + Debug(session(), "Scheduling stream %" PRIu64, stream_id); + if (stream != nullptr) + stream->Schedule(&stream_queue_); +} + +void DefaultApplication::UnscheduleStream(int64_t stream_id) { + QuicStream* stream = session()->FindStream(stream_id); + Debug(session(), "Unscheduling stream %" PRIu64, stream_id); + if (stream != nullptr) + stream->Unschedule(); +} + +void DefaultApplication::StreamClose( + int64_t stream_id, + uint64_t app_error_code) { + if (app_error_code == 0) + app_error_code = NGTCP2_APP_NOERROR; + UnscheduleStream(stream_id); + QuicApplication::StreamClose(stream_id, app_error_code); +} + +void DefaultApplication::ResumeStream(int64_t stream_id) { + Debug(session(), "Stream %" PRId64 " has data to send"); + ScheduleStream(stream_id); } bool DefaultApplication::ReceiveStreamData( @@ -59,71 +112,29 @@ bool DefaultApplication::ReceiveStreamData( return true; } -void DefaultApplication::AcknowledgeStreamData( - int64_t stream_id, - uint64_t offset, - size_t datalen) { - QuicStream* stream = session()->FindStream(stream_id); - Debug(session(), "Default QUIC Application acknowledging stream data"); - // It's possible that the stream has already been destroyed and - // removed. If so, just silently ignore the ack - if (stream != nullptr) - stream->Acknowledge(offset, datalen); -} - -bool DefaultApplication::SendPendingData() { - // Right now this iterates through the streams in the order they - // were created. Later, we might want to implement a prioritization - // scheme to allow higher priority streams to be serialized first. - // Prioritization is left entirely up to the application layer in QUIC. - // HTTP/3, for instance, drops prioritization entirely. - Debug(session(), "Default QUIC Application sending pending data"); - for (const auto& stream : session()->streams()) { - if (!SendStreamData(stream.second.get())) - return false; - - // Check to make sure QuicSession state did not change in this iteration - if (session()->is_in_draining_period() || - session()->is_in_closing_period() || - session()->is_destroyed()) { - break; - } - } - - return true; -} - -namespace { -void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) { - ngtcp2_vec* v = *pvec; - size_t cnt = *pcnt; - - for (; cnt > 0; --cnt, ++v) { - if (v->len > len) { - v->len -= len; - v->base += len; - break; - } - len -= v->len; - } - - *pvec = v; - *pcnt = cnt; -} - -int IsEmpty(const ngtcp2_vec* vec, size_t cnt) { - size_t i; - for (i = 0; i < cnt && vec[i].len == 0; ++i) {} - return i == cnt; -} -} // anonymous namespace - int DefaultApplication::GetStreamData(StreamData* stream_data) { - QuicStream* stream = session()->FindStream(stream_data->id); + QuicStream* stream = stream_queue_.PopFront(); + // If stream is nullptr, there are no streams with data pending. + if (stream == nullptr) + return 0; + stream_data->remaining = - stream->DrainInto(&stream_data->data, &stream_data->count, 16); + stream->DrainInto( + &stream_data->data, + &stream_data->count, + MAX_VECTOR_COUNT); + + stream_data->user_data = stream; + stream_data->id = stream->id(); stream_data->fin = stream->is_writable() ? 0 : 1; + // Schedule the stream again only if there is data to write. There + // might not actually be any more data to write but we can't know + // that yet as it depends entirely on how much data actually gets + // serialized by ngtcp2. + if (stream_data->count > 0) + ScheduleStream(stream->id()); + Debug(session(), "Selected %" PRId64 " buffers for stream %" PRId64 "%s", stream_data->count, stream_data->id, @@ -131,109 +142,11 @@ int DefaultApplication::GetStreamData(StreamData* stream_data) { return 0; } -bool DefaultApplication::SendStreamData(QuicStream* stream) { - ssize_t ndatalen = 0; - QuicPathStorage path; - Debug(session(), "Default QUIC Application sending stream %" PRId64 " data", - stream->id()); - - StreamData stream_data; - stream_data.id = stream->id(); - stream_data.user_data = stream; - GetStreamData(&stream_data); - - // If there is no stream data and we're not sending fin, - // Just return without doing anything. - if (stream_data.count == 0 && !stream_data.fin) { - Debug(stream, "There is no stream data to send"); - return true; - } - - std::unique_ptr packet = CreateStreamDataPacket(); - uint8_t* pos = packet->data(); - - for (;;) { - // If packet was sent on the previous iteration, it will have been reset - if (!packet) { - packet = CreateStreamDataPacket(); - pos = packet->data(); - } - - ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data); - - if (nwrite <= 0) { - switch (nwrite) { - case 0: - goto congestion_limited; - case NGTCP2_ERR_PKT_NUM_EXHAUSTED: - // There is a finite number of packets that can be sent - // per connection. Once those are exhausted, there's - // absolutely nothing we can do except immediately - // and silently tear down the QuicSession. This has - // to be silent because we can't even send a - // CONNECTION_CLOSE since even those require a - // packet number. - session()->SilentClose(); - return false; - case NGTCP2_ERR_STREAM_DATA_BLOCKED: - session()->StreamDataBlocked(stream->id()); - if (session()->max_data_left() == 0) - goto congestion_limited; - return true; - case NGTCP2_ERR_STREAM_SHUT_WR: - if (UNLIKELY(!BlockStream(stream_data.id))) - return false; - return true; - case NGTCP2_ERR_STREAM_NOT_FOUND: - return true; - case NGTCP2_ERR_WRITE_STREAM_MORE: - CHECK_GT(ndatalen, 0); - CHECK(StreamCommit(&stream_data, ndatalen)); - pos += ndatalen; - continue; - } - session()->set_last_error(QUIC_ERROR_SESSION, static_cast(nwrite)); - return false; - } - - pos += nwrite; - - if (ndatalen >= 0) - CHECK(StreamCommit(&stream_data, ndatalen)); - - Debug(stream, "Sending %" PRIu64 " bytes in serialized packet", nwrite); - packet->set_length(nwrite); - if (!session()->SendPacket(std::move(packet), path)) - return false; - - packet.reset(); - pos = nullptr; - - if (ShouldSetFin(stream_data)) - set_stream_fin(stream_data.id); - - if (IsEmpty(stream_data.buf, stream_data.count)) - break; - } - - return true; - - congestion_limited: - if (pos - packet->data()) { - // Some data was serialized into the packet. We need to send it. - packet->set_length(pos - packet->data()); - Debug(session(), "Congestion limited, but %" PRIu64 " bytes pending.", - packet->length()); - if (!session()->SendPacket(std::move(packet), path)) - return false; - } - return true; -} - bool DefaultApplication::StreamCommit( StreamData* stream_data, size_t datalen) { QuicStream* stream = static_cast(stream_data->user_data); + CHECK_NOT_NULL(stream); stream_data->remaining -= datalen; Consume(&stream_data->buf, &stream_data->count, datalen); stream->Commit(datalen); @@ -241,7 +154,8 @@ bool DefaultApplication::StreamCommit( } bool DefaultApplication::ShouldSetFin(const StreamData& stream_data) { - if (!IsEmpty(stream_data.buf, stream_data.count)) + if (stream_data.user_data == nullptr || + !IsEmpty(stream_data.buf, stream_data.count)) return false; QuicStream* stream = static_cast(stream_data.user_data); return !stream->is_writable(); diff --git a/src/quic/node_quic_default_application.h b/src/quic/node_quic_default_application.h index 5b30abe328..b12c617c6c 100644 --- a/src/quic/node_quic_default_application.h +++ b/src/quic/node_quic_default_application.h @@ -3,8 +3,10 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS +#include "node_quic_stream.h" #include "node_quic_session.h" #include "node_quic_util.h" +#include "util.h" #include "v8.h" namespace node { @@ -28,21 +30,23 @@ class DefaultApplication final : public QuicApplication { const uint8_t* data, size_t datalen, uint64_t offset) override; - void AcknowledgeStreamData( - int64_t stream_id, - uint64_t offset, - size_t datalen) override; - - bool SendPendingData() override; - bool SendStreamData(QuicStream* stream) override; int GetStreamData(StreamData* stream_data) override; - bool StreamCommit(StreamData* stream_data, size_t datalen) override; + + void ResumeStream(int64_t stream_id) override; + void StreamClose(int64_t stream_id, uint64_t app_error_code) override; bool ShouldSetFin(const StreamData& stream_data) override; + bool StreamCommit(StreamData* stream_data, size_t datalen) override; SET_SELF_SIZE(DefaultApplication) SET_MEMORY_INFO_NAME(DefaultApplication) SET_NO_MEMORY_INFO() + + private: + void ScheduleStream(int64_t stream_id); + void UnscheduleStream(int64_t stream_id); + + QuicStream::Queue stream_queue_; }; } // namespace quic diff --git a/src/quic/node_quic_http3_application.cc b/src/quic/node_quic_http3_application.cc index f05bde00ac..b54f44a182 100644 --- a/src/quic/node_quic_http3_application.cc +++ b/src/quic/node_quic_http3_application.cc @@ -491,122 +491,12 @@ bool Http3Application::BlockStream(int64_t stream_id) { return true; } -bool Http3Application::SendPendingData() { - // The maximum number of packets to send per call - static constexpr size_t MAX_PACKETS = 16; - QuicPathStorage path; - int err; - size_t packets_sent = 0; - - std::unique_ptr packet = CreateStreamDataPacket(); - uint8_t* pos = packet->data(); - - for (;;) { - ssize_t ndatalen; - StreamData stream_data; - err = GetStreamData(&stream_data); - if (err < 0) { - session()->set_last_error(QUIC_ERROR_APPLICATION, err); - return false; - } - - // If stream_data.id is -1, then we're not serializing any data for any - // specific stream. We still need to process QUIC session packets tho. - if (stream_data.id > -1) - Debug(session(), "Serializing packets for stream id %" PRId64, - stream_data.id); - else - Debug(session(), "Serializing session packets"); - - // If the packet was sent previously, then packet will have been reset. - if (!packet) { - packet = CreateStreamDataPacket(); - pos = packet->data(); - } - - ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data); - - if (nwrite <= 0) { - switch (nwrite) { - case 0: - goto congestion_limited; - case NGTCP2_ERR_PKT_NUM_EXHAUSTED: - // There is a finite number of packets that can be sent - // per connection. Once those are exhausted, there's - // absolutely nothing we can do except immediately - // and silently tear down the QuicSession. This has - // to be silent because we can't even send a - // CONNECTION_CLOSE since even those require a - // packet number. - session()->SilentClose(); - return false; - case NGTCP2_ERR_STREAM_DATA_BLOCKED: - session()->StreamDataBlocked(stream_data.id); - if (session()->max_data_left() == 0) - goto congestion_limited; - // Fall through - case NGTCP2_ERR_STREAM_SHUT_WR: - if (UNLIKELY(!BlockStream(stream_data.id))) - return false; - continue; - case NGTCP2_ERR_STREAM_NOT_FOUND: - continue; - case NGTCP2_ERR_WRITE_STREAM_MORE: - CHECK_GT(ndatalen, 0); - CHECK(StreamCommit(&stream_data, ndatalen)); - pos += ndatalen; - continue; - } - session()->set_last_error(QUIC_ERROR_SESSION, static_cast(nwrite)); - return false; - } - - pos += nwrite; - - if (ndatalen >= 0) - CHECK(StreamCommit(&stream_data, ndatalen)); - - Debug(session(), "Sending %" PRIu64 " bytes in serialized packet", nwrite); - packet->set_length(nwrite); - if (!session()->SendPacket(std::move(packet), path)) - return false; - - packet.reset(); - pos = nullptr; - - if (ShouldSetFin(stream_data)) - set_stream_fin(stream_data.id); - - if (++packets_sent == MAX_PACKETS) - break; - } - return true; - - congestion_limited: - // We are either congestion limited or done. - if (pos - packet->data()) { - // Some data was serialized into the packet. We need to send it. - packet->set_length(pos - packet->data()); - Debug(session(), "Congestion limited, but %" PRIu64 " bytes pending.", - packet->length()); - if (!session()->SendPacket(std::move(packet), path)) - return false; - } - return true; -} - bool Http3Application::ShouldSetFin(const StreamData& stream_data) { return stream_data.id > -1 && !is_control_stream(stream_data.id) && stream_data.fin == 1; } -bool Http3Application::SendStreamData(QuicStream* stream) { - // Data is available now, so resume the stream. - nghttp3_conn_resume_stream(connection(), stream->id()); - return SendPendingData(); -} - // This is where nghttp3 pulls the data from the outgoing // buffer to prepare it to be sent on the QUIC stream. ssize_t Http3Application::ReadData( @@ -647,11 +537,7 @@ ssize_t Http3Application::ReadData( void Http3Application::AckedStreamData( int64_t stream_id, size_t datalen) { - QuicStream* stream = session()->FindStream(stream_id); - if (stream != nullptr) { - stream->Acknowledge(0, datalen); - ResumeStream(stream_id); - } + Acknowledge(stream_id, 0, datalen); } void Http3Application::StreamClosed( diff --git a/src/quic/node_quic_http3_application.h b/src/quic/node_quic_http3_application.h index a4274da984..7483d760cc 100644 --- a/src/quic/node_quic_http3_application.h +++ b/src/quic/node_quic_http3_application.h @@ -126,9 +126,6 @@ class Http3Application final : int64_t stream_id, v8::Local headers) override; - bool SendPendingData() override; - bool SendStreamData(QuicStream* stream) override; - // Implementation for mem::NgLibMemoryManager void CheckAllocatedSize(size_t previous_size) const; void IncreaseAllocatedSize(size_t size); diff --git a/src/quic/node_quic_session.cc b/src/quic/node_quic_session.cc index ddc4a26992..a430b49e47 100644 --- a/src/quic/node_quic_session.cc +++ b/src/quic/node_quic_session.cc @@ -1098,6 +1098,121 @@ void QuicCryptoContext::WriteHandshake( handshake_[level].Push(std::move(buffer)); } +void QuicApplication::Acknowledge( + int64_t stream_id, + uint64_t offset, + size_t datalen) { + QuicStream* stream = session()->FindStream(stream_id); + if (stream != nullptr) { + stream->Acknowledge(offset, datalen); + ResumeStream(stream_id); + } +} + +bool QuicApplication::SendPendingData() { + // The maximum number of packets to send per call + static constexpr size_t MAX_PACKETS = 16; + QuicPathStorage path; + std::unique_ptr packet; + uint8_t* pos = nullptr; + size_t packets_sent = 0; + int err; + + for (;;) { + ssize_t ndatalen; + StreamData stream_data; + err = GetStreamData(&stream_data); + if (err < 0) { + session()->set_last_error(QUIC_ERROR_APPLICATION, err); + return false; + } + + // If stream_data.id is -1, then we're not serializing any data for any + // specific stream. We still need to process QUIC session packets tho. + if (stream_data.id > -1) + Debug(session(), "Serializing packets for stream id %" PRId64, + stream_data.id); + else + Debug(session(), "Serializing session packets"); + + // If the packet was sent previously, then packet will have been reset. + if (!packet) { + packet = CreateStreamDataPacket(); + pos = packet->data(); + } + + ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data); + + if (nwrite <= 0) { + switch (nwrite) { + case 0: + goto congestion_limited; + case NGTCP2_ERR_PKT_NUM_EXHAUSTED: + // There is a finite number of packets that can be sent + // per connection. Once those are exhausted, there's + // absolutely nothing we can do except immediately + // and silently tear down the QuicSession. This has + // to be silent because we can't even send a + // CONNECTION_CLOSE since even those require a + // packet number. + session()->SilentClose(); + return false; + case NGTCP2_ERR_STREAM_DATA_BLOCKED: + session()->StreamDataBlocked(stream_data.id); + if (session()->max_data_left() == 0) + goto congestion_limited; + // Fall through + case NGTCP2_ERR_STREAM_SHUT_WR: + if (UNLIKELY(!BlockStream(stream_data.id))) + return false; + continue; + case NGTCP2_ERR_STREAM_NOT_FOUND: + continue; + case NGTCP2_ERR_WRITE_STREAM_MORE: + CHECK_GT(ndatalen, 0); + CHECK(StreamCommit(&stream_data, ndatalen)); + pos += ndatalen; + continue; + } + session()->set_last_error(QUIC_ERROR_SESSION, static_cast(nwrite)); + return false; + } + + pos += nwrite; + + if (ndatalen >= 0) + CHECK(StreamCommit(&stream_data, ndatalen)); + + Debug(session(), "Sending %" PRIu64 " bytes in serialized packet", nwrite); + packet->set_length(nwrite); + if (!session()->SendPacket(std::move(packet), path)) + return false; + packet.reset(); + pos = nullptr; + MaybeSetFin(stream_data); + if (++packets_sent == MAX_PACKETS) + break; + } + return true; + + congestion_limited: + // We are either congestion limited or done. + if (pos - packet->data()) { + // Some data was serialized into the packet. We need to send it. + packet->set_length(pos - packet->data()); + Debug(session(), "Congestion limited, but %" PRIu64 " bytes pending.", + packet->length()); + if (!session()->SendPacket(std::move(packet), path)) + return false; + } + return true; +} + +void QuicApplication::MaybeSetFin(const StreamData& stream_data) { + if (ShouldSetFin(stream_data)) + set_stream_fin(stream_data.id); +} + void QuicApplication::StreamHeaders( int64_t stream_id, int kind, @@ -2040,32 +2155,6 @@ void QuicSession::UsePreferredAddressStrategy( } } -// Sends buffered stream data. -bool QuicSession::SendStreamData(QuicStream* stream) { - // Because SendStreamData calls ngtcp2_conn_writev_streams, - // it is not permitted to be called while we are running within - // an ngtcp2 callback function. - CHECK(!Ngtcp2CallbackScope::InNgtcp2CallbackScope(this)); - - // No stream data may be serialized and sent if: - // - the QuicSession is destroyed - // - the QuicStream was never writable, - // - a final stream frame has already been sent, - // - the QuicSession is in the draining period, - // - the QuicSession is in the closing period, or - // - we are blocked from sending any data because of flow control - if (is_destroyed() || - !stream->was_ever_writable() || - stream->has_sent_fin() || - is_in_draining_period() || - is_in_closing_period() || - max_data_left() == 0) { - return true; - } - - return application_->SendStreamData(stream); -} - // Passes a serialized packet to the associated QuicSocket. bool QuicSession::SendPacket(std::unique_ptr packet) { CHECK(!is_flag_set(QUICSESSION_FLAG_DESTROYED)); diff --git a/src/quic/node_quic_session.h b/src/quic/node_quic_session.h index e23e0fdbf5..e2c162e0cf 100644 --- a/src/quic/node_quic_session.h +++ b/src/quic/node_quic_session.h @@ -497,12 +497,12 @@ class QuicApplication : public MemoryRetainer { virtual void AcknowledgeStreamData( int64_t stream_id, uint64_t offset, - size_t datalen) = 0; + size_t datalen) { Acknowledge(stream_id, offset, datalen); } + virtual bool BlockStream(int64_t id) { return true; } virtual void ExtendMaxStreamsRemoteUni(uint64_t max_streams) {} virtual void ExtendMaxStreamsRemoteBidi(uint64_t max_streams) {} virtual void ExtendMaxStreamData(int64_t stream_id, uint64_t max_data) {} - virtual bool SendPendingData() = 0; - virtual bool SendStreamData(QuicStream* stream) = 0; + virtual void ResumeStream(int64_t stream_id) {} virtual void StreamHeaders( int64_t stream_id, int kind, @@ -511,7 +511,6 @@ class QuicApplication : public MemoryRetainer { int64_t stream_id, uint64_t app_error_code); virtual void StreamOpen(int64_t stream_id) {} - virtual void ResumeStream(int64_t stream_id) {} virtual void StreamReset( int64_t stream_id, uint64_t final_size, @@ -526,10 +525,10 @@ class QuicApplication : public MemoryRetainer { virtual bool SubmitTrailers( int64_t stream_id, v8::Local headers) { return false; } - virtual bool BlockStream(int64_t id) { return true; } inline Environment* env() const; + bool SendPendingData(); size_t max_header_pairs() const { return max_header_pairs_; } size_t max_header_length() const { return max_header_length_; } @@ -547,13 +546,17 @@ class QuicApplication : public MemoryRetainer { size_t remaining = 0; int64_t id = -1; int fin = 0; - ngtcp2_vec data[16] {}; + ngtcp2_vec data[MAX_VECTOR_COUNT] {}; ngtcp2_vec* buf = nullptr; void* user_data = nullptr; uint8_t* pos = nullptr; StreamData() { buf = data; } }; + void Acknowledge( + int64_t stream_id, + uint64_t offset, + size_t datalen); virtual int GetStreamData(StreamData* data) = 0; virtual bool StreamCommit(StreamData* data, size_t datalen) = 0; virtual bool ShouldSetFin(const StreamData& data) = 0; @@ -565,6 +568,7 @@ class QuicApplication : public MemoryRetainer { const StreamData& stream_data); private: + void MaybeSetFin(const StreamData& stream_data); QuicSession* session_; bool needs_init_ = true; size_t max_header_pairs_ = 0; @@ -850,9 +854,6 @@ class QuicSession : public AsyncWrap, // Causes pending ngtcp2 frames to be serialized and sent void SendPendingData(); - // Causes pending QuicStream data to be serialized and sent - bool SendStreamData(QuicStream* stream); - inline bool SendPacket( std::unique_ptr packet, const ngtcp2_path_storage& path); diff --git a/src/quic/node_quic_stream-inl.h b/src/quic/node_quic_stream-inl.h index 8d8943bc29..530fddc3c8 100644 --- a/src/quic/node_quic_stream-inl.h +++ b/src/quic/node_quic_stream-inl.h @@ -220,6 +220,16 @@ size_t QuicStream::DrainInto( return length; } +void QuicStream::Schedule(Queue* queue) { + if (!stream_queue_.IsEmpty()) // Already scheduled? + return; + queue->PushBack(this); +} + +void QuicStream::Unschedule() { + stream_queue_.Remove(); +} + } // namespace quic } // namespace node diff --git a/src/quic/node_quic_stream.h b/src/quic/node_quic_stream.h index 76396516a7..a0b7a767c7 100644 --- a/src/quic/node_quic_stream.h +++ b/src/quic/node_quic_stream.h @@ -9,6 +9,7 @@ #include "histogram-inl.h" #include "node_quic_util.h" #include "stream_base-inl.h" +#include "util-inl.h" #include "v8.h" #include @@ -18,6 +19,7 @@ namespace node { namespace quic { class QuicSession; +class QuicApplication; enum QuicStreamHeaderFlags : uint32_t { // No flags @@ -420,6 +422,14 @@ class QuicStream : public AsyncWrap, public StreamBase { BaseObjectPtr data_rx_ack_; AliasedBigUint64Array stats_buffer_; + + ListNode stream_queue_; + + public: + // Linked List of QuicStream objects + using Queue = ListHead; + inline void Schedule(Queue* queue); + inline void Unschedule(); }; } // namespace quic From 867d1c37370cccac5836abf5cf67aa951d2e3c56 Mon Sep 17 00:00:00 2001 From: James M Snell Date: Thu, 9 Jan 2020 10:48:53 -0800 Subject: [PATCH 8/8] [Squash] nits --- src/quic/node_quic_default_application.cc | 14 +++++----- src/quic/node_quic_session-inl.h | 2 +- src/quic/node_quic_session.cc | 4 +-- src/quic/node_quic_session.h | 3 +-- src/quic/node_quic_stream-inl.h | 28 -------------------- src/quic/node_quic_stream.cc | 31 +++++++++++++++++++++-- src/quic/node_quic_stream.h | 8 +++--- 7 files changed, 43 insertions(+), 47 deletions(-) diff --git a/src/quic/node_quic_default_application.cc b/src/quic/node_quic_default_application.cc index 4665c24f33..a151796b93 100644 --- a/src/quic/node_quic_default_application.cc +++ b/src/quic/node_quic_default_application.cc @@ -124,7 +124,7 @@ int DefaultApplication::GetStreamData(StreamData* stream_data) { &stream_data->count, MAX_VECTOR_COUNT); - stream_data->user_data = stream; + stream_data->stream.reset(stream); stream_data->id = stream->id(); stream_data->fin = stream->is_writable() ? 0 : 1; @@ -133,7 +133,7 @@ int DefaultApplication::GetStreamData(StreamData* stream_data) { // that yet as it depends entirely on how much data actually gets // serialized by ngtcp2. if (stream_data->count > 0) - ScheduleStream(stream->id()); + stream->Schedule(&stream_queue_); Debug(session(), "Selected %" PRId64 " buffers for stream %" PRId64 "%s", stream_data->count, @@ -145,20 +145,18 @@ int DefaultApplication::GetStreamData(StreamData* stream_data) { bool DefaultApplication::StreamCommit( StreamData* stream_data, size_t datalen) { - QuicStream* stream = static_cast(stream_data->user_data); - CHECK_NOT_NULL(stream); + CHECK(stream_data->stream); stream_data->remaining -= datalen; Consume(&stream_data->buf, &stream_data->count, datalen); - stream->Commit(datalen); + stream_data->stream->Commit(datalen); return true; } bool DefaultApplication::ShouldSetFin(const StreamData& stream_data) { - if (stream_data.user_data == nullptr || + if (!stream_data.stream || !IsEmpty(stream_data.buf, stream_data.count)) return false; - QuicStream* stream = static_cast(stream_data.user_data); - return !stream->is_writable(); + return !stream_data.stream->is_writable(); } } // namespace quic diff --git a/src/quic/node_quic_session-inl.h b/src/quic/node_quic_session-inl.h index aebd3c939e..62fda5fd57 100644 --- a/src/quic/node_quic_session-inl.h +++ b/src/quic/node_quic_session-inl.h @@ -149,7 +149,7 @@ ssize_t QuicApplication::WriteVStream( uint8_t* buf, ssize_t* ndatalen, const StreamData& stream_data) { - CHECK_LE(stream_data.count, 16); + CHECK_LE(stream_data.count, MAX_VECTOR_COUNT); return ngtcp2_conn_writev_stream( session()->connection(), &path->path, diff --git a/src/quic/node_quic_session.cc b/src/quic/node_quic_session.cc index a430b49e47..8b44247fcf 100644 --- a/src/quic/node_quic_session.cc +++ b/src/quic/node_quic_session.cc @@ -1111,7 +1111,7 @@ void QuicApplication::Acknowledge( bool QuicApplication::SendPendingData() { // The maximum number of packets to send per call - static constexpr size_t MAX_PACKETS = 16; + static constexpr size_t kMaxPackets = 16; QuicPathStorage path; std::unique_ptr packet; uint8_t* pos = nullptr; @@ -1190,7 +1190,7 @@ bool QuicApplication::SendPendingData() { packet.reset(); pos = nullptr; MaybeSetFin(stream_data); - if (++packets_sent == MAX_PACKETS) + if (++packets_sent == kMaxPackets) break; } return true; diff --git a/src/quic/node_quic_session.h b/src/quic/node_quic_session.h index e2c162e0cf..e2d893cb00 100644 --- a/src/quic/node_quic_session.h +++ b/src/quic/node_quic_session.h @@ -548,8 +548,7 @@ class QuicApplication : public MemoryRetainer { int fin = 0; ngtcp2_vec data[MAX_VECTOR_COUNT] {}; ngtcp2_vec* buf = nullptr; - void* user_data = nullptr; - uint8_t* pos = nullptr; + BaseObjectPtr stream; StreamData() { buf = data; } }; diff --git a/src/quic/node_quic_stream-inl.h b/src/quic/node_quic_stream-inl.h index 530fddc3c8..9981ead8df 100644 --- a/src/quic/node_quic_stream-inl.h +++ b/src/quic/node_quic_stream-inl.h @@ -75,14 +75,6 @@ bool QuicStream::is_read_paused() const { return flags_ & QUICSTREAM_FLAG_READ_PAUSED; } -bool QuicStream::IsAlive() { - return !is_destroyed() && !IsClosing(); -} - -bool QuicStream::IsClosing() { - return !is_writable() && !is_readable(); -} - void QuicStream::set_fin_sent() { CHECK(!is_writable()); flags_ |= QUICSTREAM_FLAG_FIN_SENT; @@ -168,26 +160,6 @@ void QuicStream::Commit(ssize_t amount) { streambuf_.Seek(amount); } -int QuicStream::ReadStart() { - CHECK(!is_destroyed()); - CHECK(is_readable()); - set_read_start(); - set_read_resume(); - IncrementStat( - inbound_consumed_data_while_paused_, - &stream_stats_, - &stream_stats::max_offset); - session_->ExtendStreamOffset(this, inbound_consumed_data_while_paused_); - return 0; -} - -int QuicStream::ReadStop() { - CHECK(!is_destroyed()); - CHECK(is_readable()); - set_read_pause(); - return 0; -} - void QuicStream::ResetStream(uint64_t app_error_code) { // On calling shutdown, the stream will no longer be // readable or writable, all any pending data in the diff --git a/src/quic/node_quic_stream.cc b/src/quic/node_quic_stream.cc index 54e0bf315e..d9dc24f2ef 100644 --- a/src/quic/node_quic_stream.cc +++ b/src/quic/node_quic_stream.cc @@ -203,8 +203,7 @@ int QuicStream::DoShutdown(ShutdownWrap* req_wrap) { session()->ResumeStream(stream_id_); } - req_wrap->Done(0); - return 0; + return 1; } int QuicStream::DoWrite( @@ -265,6 +264,34 @@ int QuicStream::DoWrite( return 0; } +bool QuicStream::IsAlive() { + return !is_destroyed() && !IsClosing(); +} + +bool QuicStream::IsClosing() { + return !is_writable() && !is_readable(); +} + +int QuicStream::ReadStart() { + CHECK(!is_destroyed()); + CHECK(is_readable()); + set_read_start(); + set_read_resume(); + IncrementStat( + inbound_consumed_data_while_paused_, + &stream_stats_, + &stream_stats::max_offset); + session_->ExtendStreamOffset(this, inbound_consumed_data_while_paused_); + return 0; +} + +int QuicStream::ReadStop() { + CHECK(!is_destroyed()); + CHECK(is_readable()); + set_read_pause(); + return 0; +} + void QuicStream::IncrementStats(size_t datalen) { uint64_t len = static_cast(datalen); IncrementStat(len, &stream_stats_, &stream_stats::bytes_received); diff --git a/src/quic/node_quic_stream.h b/src/quic/node_quic_stream.h index a0b7a767c7..6853925ae5 100644 --- a/src/quic/node_quic_stream.h +++ b/src/quic/node_quic_stream.h @@ -341,10 +341,10 @@ class QuicStream : public AsyncWrap, public StreamBase { inline bool SubmitTrailers(v8::Local headers); // Required for StreamBase - inline bool IsAlive() override; - inline bool IsClosing() override; - inline int ReadStart() override; - inline int ReadStop() override; + bool IsAlive() override; + bool IsClosing() override; + int ReadStart() override; + int ReadStop() override; int DoShutdown(ShutdownWrap* req_wrap) override; AsyncWrap* GetAsyncWrap() override { return this; }