diff --git a/node.gyp b/node.gyp index bbfa1cd211..09ad9a58c8 100644 --- a/node.gyp +++ b/node.gyp @@ -853,6 +853,7 @@ 'src/quic/node_quic_session-inl.h', 'src/quic/node_quic_socket.h', 'src/quic/node_quic_stream.h', + 'src/quic/node_quic_stream-inl.h', 'src/quic/node_quic_util.h', 'src/quic/node_quic_util-inl.h', 'src/quic/node_quic_state.h', diff --git a/src/quic/node_quic.cc b/src/quic/node_quic.cc index 3975b05381..044d7eddec 100644 --- a/src/quic/node_quic.cc +++ b/src/quic/node_quic.cc @@ -9,7 +9,7 @@ #include "node_quic_crypto.h" #include "node_quic_session-inl.h" #include "node_quic_socket.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_state.h" #include "node_quic_util-inl.h" #include "node_sockaddr-inl.h" diff --git a/src/quic/node_quic_default_application.cc b/src/quic/node_quic_default_application.cc index 08494aa6ca..a151796b93 100644 --- a/src/quic/node_quic_default_application.cc +++ b/src/quic/node_quic_default_application.cc @@ -3,7 +3,7 @@ #include "node_quic_default_application.h" #include "node_quic_session-inl.h" #include "node_quic_socket.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_util-inl.h" #include "node_sockaddr-inl.h" #include @@ -13,16 +13,69 @@ namespace node { namespace quic { +namespace { +void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) { + ngtcp2_vec* v = *pvec; + size_t cnt = *pcnt; + + for (; cnt > 0; --cnt, ++v) { + if (v->len > len) { + v->len -= len; + v->base += len; + break; + } + len -= v->len; + } + + *pvec = v; + *pcnt = cnt; +} + +int IsEmpty(const ngtcp2_vec* vec, size_t cnt) { + size_t i; + for (i = 0; i < cnt && vec[i].len == 0; ++i) {} + return i == cnt; +} +} // anonymous namespace + DefaultApplication::DefaultApplication( QuicSession* session) : QuicApplication(session) {} bool DefaultApplication::Initialize() { - if (!needs_init()) - return false; - Debug(session(), "Default QUIC Application Initialized"); - set_init_done(); - return true; + if (needs_init()) { + Debug(session(), "Default QUIC Application Initialized"); + set_init_done(); + } + return needs_init(); +} + +void DefaultApplication::ScheduleStream(int64_t stream_id) { + QuicStream* stream = session()->FindStream(stream_id); + Debug(session(), "Scheduling stream %" PRIu64, stream_id); + if (stream != nullptr) + stream->Schedule(&stream_queue_); +} + +void DefaultApplication::UnscheduleStream(int64_t stream_id) { + QuicStream* stream = session()->FindStream(stream_id); + Debug(session(), "Unscheduling stream %" PRIu64, stream_id); + if (stream != nullptr) + stream->Unschedule(); +} + +void DefaultApplication::StreamClose( + int64_t stream_id, + uint64_t app_error_code) { + if (app_error_code == 0) + app_error_code = NGTCP2_APP_NOERROR; + UnscheduleStream(stream_id); + QuicApplication::StreamClose(stream_id, app_error_code); +} + +void DefaultApplication::ResumeStream(int64_t stream_id) { + Debug(session(), "Stream %" PRId64 " has data to send"); + ScheduleStream(stream_id); } bool DefaultApplication::ReceiveStreamData( @@ -59,197 +112,51 @@ bool DefaultApplication::ReceiveStreamData( return true; } -void DefaultApplication::AcknowledgeStreamData( - int64_t stream_id, - uint64_t offset, - size_t datalen) { - QuicStream* stream = session()->FindStream(stream_id); - Debug(session(), "Default QUIC Application acknowledging stream data"); - // It's possible that the stream has already been destroyed and - // removed. If so, just silently ignore the ack - if (stream != nullptr) - stream->AckedDataOffset(offset, datalen); +int DefaultApplication::GetStreamData(StreamData* stream_data) { + QuicStream* stream = stream_queue_.PopFront(); + // If stream is nullptr, there are no streams with data pending. + if (stream == nullptr) + return 0; + + stream_data->remaining = + stream->DrainInto( + &stream_data->data, + &stream_data->count, + MAX_VECTOR_COUNT); + + stream_data->stream.reset(stream); + stream_data->id = stream->id(); + stream_data->fin = stream->is_writable() ? 0 : 1; + + // Schedule the stream again only if there is data to write. There + // might not actually be any more data to write but we can't know + // that yet as it depends entirely on how much data actually gets + // serialized by ngtcp2. + if (stream_data->count > 0) + stream->Schedule(&stream_queue_); + + Debug(session(), "Selected %" PRId64 " buffers for stream %" PRId64 "%s", + stream_data->count, + stream_data->id, + stream_data->fin == 1 ? " (fin)" : ""); + return 0; } -bool DefaultApplication::SendPendingData() { - // Right now this iterates through the streams in the order they - // were created. Later, we might want to implement a prioritization - // scheme to allow higher priority streams to be serialized first. - // Prioritization is left entirely up to the application layer in QUIC. - // HTTP/3, for instance, drops prioritization entirely. - Debug(session(), "Default QUIC Application sending pending data"); - for (const auto& stream : session()->streams()) { - if (!SendStreamData(stream.second.get())) - return false; - - // Check to make sure QuicSession state did not change in this iteration - if (session()->is_in_draining_period() || - session()->is_in_closing_period() || - session()->is_destroyed()) { - break; - } - } - +bool DefaultApplication::StreamCommit( + StreamData* stream_data, + size_t datalen) { + CHECK(stream_data->stream); + stream_data->remaining -= datalen; + Consume(&stream_data->buf, &stream_data->count, datalen); + stream_data->stream->Commit(datalen); return true; } -namespace { -void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) { - ngtcp2_vec* v = *pvec; - size_t cnt = *pcnt; - - for (; cnt > 0; --cnt, ++v) { - if (v->len > len) { - v->len -= len; - v->base += len; - break; - } - len -= v->len; - } - - *pvec = v; - *pcnt = cnt; -} - -int IsEmpty(const ngtcp2_vec* vec, size_t cnt) { - size_t i; - for (i = 0; i < cnt && vec[i].len == 0; ++i) {} - return i == cnt; -} -} // anonymous namespace - -bool DefaultApplication::SendStreamData(QuicStream* stream) { - ssize_t ndatalen = 0; - QuicPathStorage path; - Debug(session(), "Default QUIC Application sending stream %" PRId64 " data", - stream->GetID()); - - std::vector vec; - - // remaining is the total number of bytes stored in the vector - // that are remaining to be serialized. - size_t remaining = stream->DrainInto(&vec); - Debug(stream, "Sending %d bytes of stream data. Still writable? %s", - remaining, - stream->is_writable() ? "yes" : "no"); - - // c and v are used to track the current serialization position - // for each iteration of the for(;;) loop below. - size_t c = vec.size(); - ngtcp2_vec* v = vec.data(); - - // If there is no stream data and we're not sending fin, - // Just return without doing anything. - if (c == 0 && stream->is_writable()) { - Debug(stream, "There is no stream data to send"); - return true; - } - - std::unique_ptr packet = CreateStreamDataPacket(); - size_t packet_offset = 0; - - for (;;) { - Debug(stream, "Starting packet serialization. Remaining? %d", remaining); - - // If packet was sent on the previous iteration, it will have been reset - if (!packet) - packet = CreateStreamDataPacket(); - - ssize_t nwrite = - ngtcp2_conn_writev_stream( - session()->connection(), - &path.path, - packet->data() + packet_offset, - session()->max_packet_length(), - &ndatalen, - remaining > 0 ? - NGTCP2_WRITE_STREAM_FLAG_MORE : - NGTCP2_WRITE_STREAM_FLAG_NONE, - stream->GetID(), - stream->is_writable() ? 0 : 1, - reinterpret_cast(v), - c, - uv_hrtime()); - - if (nwrite <= 0) { - switch (nwrite) { - case 0: - // If zero is returned, we've hit congestion limits. We need to stop - // serializing data and try again later to empty the queue once the - // congestion window has expanded. - Debug(stream, "Congestion limit reached"); - return true; - case NGTCP2_ERR_PKT_NUM_EXHAUSTED: - // There is a finite number of packets that can be sent - // per connection. Once those are exhausted, there's - // absolutely nothing we can do except immediately - // and silently tear down the QuicSession. This has - // to be silent because we can't even send a - // CONNECTION_CLOSE since even those require a - // packet number. - session()->SilentClose(); - return false; - case NGTCP2_ERR_STREAM_DATA_BLOCKED: - Debug(stream, "Stream data blocked"); - session()->StreamDataBlocked(stream->GetID()); - return true; - case NGTCP2_ERR_STREAM_SHUT_WR: - Debug(stream, "Stream writable side is closed"); - return true; - case NGTCP2_ERR_STREAM_NOT_FOUND: - Debug(stream, "Stream does not exist"); - return true; - case NGTCP2_ERR_WRITE_STREAM_MORE: - if (ndatalen > 0) { - remaining -= ndatalen; - Debug(stream, - "%" PRIu64 " stream bytes serialized into packet. %d remaining", - ndatalen, - remaining); - Consume(&v, &c, ndatalen); - stream->Commit(ndatalen); - packet_offset += ndatalen; - } - continue; - default: - Debug(stream, "Error writing packet. Code %" PRIu64, nwrite); - session()->set_last_error( - QUIC_ERROR_SESSION, - static_cast(nwrite)); - return false; - } - } - - if (ndatalen > 0) { - remaining -= ndatalen; - Debug(stream, - "%" PRIu64 " stream bytes serialized into packet. %d remaining", - ndatalen, - remaining); - Consume(&v, &c, ndatalen); - stream->Commit(ndatalen); - } - - Debug(stream, "Sending %" PRIu64 " bytes in serialized packet", nwrite); - packet->set_length(nwrite); - if (!session()->SendPacket(std::move(packet), path)) - return false; - - packet.reset(); - packet_offset = 0; - - if (IsEmpty(v, c)) { - // fin will have been set if all of the data has been - // encoded in the packet and is_writable() returns false. - if (!stream->is_writable()) { - Debug(stream, "Final stream has been sent"); - stream->set_fin_sent(); - } - break; - } - } - - return true; +bool DefaultApplication::ShouldSetFin(const StreamData& stream_data) { + if (!stream_data.stream || + !IsEmpty(stream_data.buf, stream_data.count)) + return false; + return !stream_data.stream->is_writable(); } } // namespace quic diff --git a/src/quic/node_quic_default_application.h b/src/quic/node_quic_default_application.h index 65fad9875e..b12c617c6c 100644 --- a/src/quic/node_quic_default_application.h +++ b/src/quic/node_quic_default_application.h @@ -3,8 +3,10 @@ #if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS +#include "node_quic_stream.h" #include "node_quic_session.h" #include "node_quic_util.h" +#include "util.h" #include "v8.h" namespace node { @@ -28,17 +30,23 @@ class DefaultApplication final : public QuicApplication { const uint8_t* data, size_t datalen, uint64_t offset) override; - void AcknowledgeStreamData( - int64_t stream_id, - uint64_t offset, - size_t datalen) override; - bool SendPendingData() override; - bool SendStreamData(QuicStream* stream) override; + int GetStreamData(StreamData* stream_data) override; + + void ResumeStream(int64_t stream_id) override; + void StreamClose(int64_t stream_id, uint64_t app_error_code) override; + bool ShouldSetFin(const StreamData& stream_data) override; + bool StreamCommit(StreamData* stream_data, size_t datalen) override; SET_SELF_SIZE(DefaultApplication) SET_MEMORY_INFO_NAME(DefaultApplication) SET_NO_MEMORY_INFO() + + private: + void ScheduleStream(int64_t stream_id); + void UnscheduleStream(int64_t stream_id); + + QuicStream::Queue stream_queue_; }; } // namespace quic diff --git a/src/quic/node_quic_http3_application.cc b/src/quic/node_quic_http3_application.cc index 40f143c081..b54f44a182 100644 --- a/src/quic/node_quic_http3_application.cc +++ b/src/quic/node_quic_http3_application.cc @@ -5,7 +5,7 @@ #include "node_quic_http3_application.h" #include "node_quic_session-inl.h" #include "node_quic_socket.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_util-inl.h" #include "node_sockaddr-inl.h" #include "node_http_common-inl.h" @@ -133,6 +133,11 @@ size_t Http3Header::length() const { return name().length() + value().length(); } +void Http3Header::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("name", name_); + tracker->TrackField("value", value_); +} + namespace { template inline void SetConfig(Environment* env, int idx, t* val) { @@ -427,6 +432,10 @@ void Http3Application::StreamReset( QuicApplication::StreamReset(stream_id, final_size, app_error_code); } +void Http3Application::ResumeStream(int64_t stream_id) { + nghttp3_conn_resume_stream(connection(), stream_id); +} + void Http3Application::ExtendMaxStreamsRemoteUni(uint64_t max_streams) { nghttp3_conn_set_max_client_streams_bidi(connection(), max_streams); } @@ -437,11 +446,11 @@ void Http3Application::ExtendMaxStreamData( nghttp3_conn_unblock_stream(connection(), stream_id); } -bool Http3Application::StreamCommit(int64_t stream_id, ssize_t datalen) { - CHECK_GT(datalen, 0); +bool Http3Application::StreamCommit(StreamData* stream_data, size_t datalen) { + CHECK_GE(datalen, 0); int err = nghttp3_conn_add_write_offset( connection(), - stream_id, + stream_data->id, datalen); if (err != 0) { session()->set_last_error(QUIC_ERROR_APPLICATION, err); @@ -450,122 +459,47 @@ bool Http3Application::StreamCommit(int64_t stream_id, ssize_t datalen) { return true; } -void Http3Application::set_stream_fin(int64_t stream_id) { - if (!is_control_stream(stream_id)) { - QuicStream* stream = session()->FindStream(stream_id); - if (stream != nullptr) - stream->set_fin_sent(); +int Http3Application::GetStreamData(StreamData* stream_data) { + ssize_t ret = 0; + if (connection() && session()->max_data_left()) { + ret = nghttp3_conn_writev_stream( + connection(), + &stream_data->id, + &stream_data->fin, + reinterpret_cast(stream_data->data), + sizeof(stream_data->data)); + if (ret < 0) + return static_cast(ret); + else + stream_data->remaining = stream_data->count = static_cast(ret); } + if (stream_data->id > -1) { + Debug(session(), "Selected %" PRId64 " buffers for stream %" PRId64 "%s", + stream_data->count, + stream_data->id, + stream_data->fin == 1 ? " (fin)" : ""); + } + return 0; } -bool Http3Application::SendPendingData() { - std::array vec; - QuicPathStorage path; - int err; - - std::unique_ptr packet = CreateStreamDataPacket(); - size_t packet_offset = 0; - - for (;;) { - int64_t stream_id = -1; - int fin = 0; - ssize_t sveccnt = 0; - - // First, grab the outgoing data from nghttp3 - if (connection() && session()->max_data_left()) { - sveccnt = - nghttp3_conn_writev_stream( - connection(), - &stream_id, - &fin, - vec.data(), - vec.size()); - - if (sveccnt < 0) - return false; - } - - Debug(session(), "Serializing packets for stream id %" PRId64, stream_id); - - ssize_t ndatalen; - nghttp3_vec* v = vec.data(); - size_t vcnt = static_cast(sveccnt); - - // If packet was sent on previous iteration, it will have been reset. - if (!packet) - packet = CreateStreamDataPacket(); - - // Second, serialize as much of the outgoing data as possible to a - // QUIC packet for transmission. We'll keep iterating until there - // is no more data to transmit. - ssize_t nwrite = - ngtcp2_conn_writev_stream( - session()->connection(), - &path.path, - packet->data() + packet_offset, - session()->max_packet_length(), - &ndatalen, - NGTCP2_WRITE_STREAM_FLAG_MORE, - stream_id, - fin, - reinterpret_cast(v), - vcnt, - uv_hrtime()); - - if (nwrite < 0) { - switch (nwrite) { - case NGTCP2_ERR_STREAM_DATA_BLOCKED: - session()->StreamDataBlocked(stream_id); - if (session()->max_data_left() == 0) - return true; - // Fall through - case NGTCP2_ERR_STREAM_SHUT_WR: - err = nghttp3_conn_block_stream(connection(), stream_id); - if (err != 0) { - session()->set_last_error(QUIC_ERROR_APPLICATION, err); - return false; - } - continue; - case NGTCP2_ERR_WRITE_STREAM_MORE: - if (ndatalen > 0) { - CHECK(StreamCommit(stream_id, ndatalen)); - packet_offset += ndatalen; - } - continue; - } - // session()->set_last_error(QUIC_ERROR_APPLICATION, nwrite); - return false; - } - - if (nwrite == 0) - return true; // Congestion limited - - if (ndatalen > 0) - CHECK(StreamCommit(stream_id, ndatalen)); - - Debug(session(), "Sending %" PRIu64 " bytes in serialized packet", nwrite); - packet->set_length(nwrite); - if (!session()->SendPacket(std::move(packet), path)) - return false; - - packet.reset(); - packet_offset = 0; - - if (fin) - set_stream_fin(stream_id); +bool Http3Application::BlockStream(int64_t stream_id) { + int err = nghttp3_conn_block_stream(connection(), stream_id); + if (err != 0) { + session()->set_last_error(QUIC_ERROR_APPLICATION, err); + return false; } return true; } -bool Http3Application::SendStreamData(QuicStream* stream) { - // Data is available now, so resume the stream. - nghttp3_conn_resume_stream(connection(), stream->GetID()); - return SendPendingData(); +bool Http3Application::ShouldSetFin(const StreamData& stream_data) { + return stream_data.id > -1 && + !is_control_stream(stream_data.id) && + stream_data.fin == 1; } // This is where nghttp3 pulls the data from the outgoing // buffer to prepare it to be sent on the QUIC stream. -ssize_t Http3Application::H3ReadData( +ssize_t Http3Application::ReadData( int64_t stream_id, nghttp3_vec* vec, size_t veccnt, @@ -600,17 +534,13 @@ ssize_t Http3Application::H3ReadData( } // Outgoing data is retained in memory until it is acknowledged. -void Http3Application::H3AckedStreamData( +void Http3Application::AckedStreamData( int64_t stream_id, size_t datalen) { - QuicStream* stream = session()->FindStream(stream_id); - if (stream) { - stream->AckedDataOffset(0, datalen); - nghttp3_conn_resume_stream(connection(), stream_id); - } + Acknowledge(stream_id, 0, datalen); } -void Http3Application::H3StreamClose( +void Http3Application::StreamClosed( int64_t stream_id, uint64_t app_error_code) { session()->listener()->OnStreamClose(stream_id, app_error_code); @@ -618,7 +548,7 @@ void Http3Application::H3StreamClose( QuicStream* Http3Application::FindOrCreateStream(int64_t stream_id) { QuicStream* stream = session()->FindStream(stream_id); - if (!stream) { + if (stream == nullptr) { if (session()->is_gracefully_closing()) { nghttp3_conn_close_stream(connection(), stream_id, NGTCP2_ERR_CLOSING); return nullptr; @@ -630,22 +560,22 @@ QuicStream* Http3Application::FindOrCreateStream(int64_t stream_id) { return stream; } -void Http3Application::H3ReceiveData( +void Http3Application::ReceiveData( int64_t stream_id, const uint8_t* data, size_t datalen) { QuicStream* stream = FindOrCreateStream(stream_id); - if (stream) + if (stream != nullptr) stream->ReceiveData(0, data, datalen, 0); } -void Http3Application::H3DeferredConsume( +void Http3Application::DeferredConsume( int64_t stream_id, size_t consumed) { - H3ReceiveData(stream_id, nullptr, consumed); + ReceiveData(stream_id, nullptr, consumed); } -void Http3Application::H3BeginHeaders( +void Http3Application::BeginHeaders( int64_t stream_id, QuicStreamHeadersKind kind) { QuicStream* stream = FindOrCreateStream(stream_id); @@ -658,7 +588,7 @@ void Http3Application::H3BeginHeaders( // by the QuicStream until stream->EndHeaders() is called, during which // the collected headers are converted to an array and passed off to // the javascript side. -bool Http3Application::H3ReceiveHeader( +bool Http3Application::ReceiveHeader( int64_t stream_id, int32_t token, nghttp3_rcbuf* name, @@ -669,7 +599,7 @@ bool Http3Application::H3ReceiveHeader( if (!IsZeroLengthHeader(name, value)) { Debug(session(), "Receiving header for stream %" PRId64, stream_id); QuicStream* stream = session()->FindStream(stream_id); - if (stream) { + if (stream != nullptr) { if (token == NGHTTP3_QPACK_TOKEN__STATUS) { nghttp3_vec vec = nghttp3_rcbuf_get_buf(value); if (vec.base[0] == '1') @@ -684,22 +614,22 @@ bool Http3Application::H3ReceiveHeader( return true; } -void Http3Application::H3EndHeaders(int64_t stream_id) { +void Http3Application::EndHeaders(int64_t stream_id) { Debug(session(), "Ending header block for stream %" PRId64, stream_id); QuicStream* stream = session()->FindStream(stream_id); - if (stream) + if (stream != nullptr) stream->EndHeaders(); } // TODO(@jasnell): Implement Push Promise Support -int Http3Application::H3BeginPushPromise( +int Http3Application::BeginPushPromise( int64_t stream_id, int64_t push_id) { return 0; } // TODO(@jasnell): Implement Push Promise Support -bool Http3Application::H3ReceivePushPromise( +bool Http3Application::ReceivePushPromise( int64_t stream_id, int64_t push_id, int32_t token, @@ -710,37 +640,35 @@ bool Http3Application::H3ReceivePushPromise( } // TODO(@jasnell): Implement Push Promise Support -int Http3Application::H3EndPushPromise( +int Http3Application::EndPushPromise( int64_t stream_id, int64_t push_id) { return 0; } // TODO(@jasnell): Implement Push Promise Support -void Http3Application::H3CancelPush( +void Http3Application::CancelPush( int64_t push_id, int64_t stream_id) { } // TODO(@jasnell): Implement Push Promise Support -int Http3Application::H3PushStream( +int Http3Application::PushStream( int64_t push_id, int64_t stream_id) { return 0; } -void Http3Application::H3SendStopSending( +void Http3Application::SendStopSending( int64_t stream_id, uint64_t app_error_code) { session()->ResetStream(stream_id, app_error_code); } -int Http3Application::H3EndStream( - int64_t stream_id) { - QuicStream* stream = FindOrCreateStream(stream_id); - if (stream) +void Http3Application::EndStream(int64_t stream_id) { + QuicStream* stream = session()->FindStream(stream_id); + if (stream != nullptr) stream->ReceiveData(1, nullptr, 0, 0); - return 0; } const nghttp3_conn_callbacks Http3Application::callbacks_[2] = { @@ -793,7 +721,7 @@ int Http3Application::OnAckedStreamData( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3AckedStreamData(stream_id, datalen); + app->AckedStreamData(stream_id, datalen); return 0; } @@ -804,7 +732,7 @@ int Http3Application::OnStreamClose( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3StreamClose(stream_id, app_error_code); + app->StreamClosed(stream_id, app_error_code); return 0; } @@ -816,7 +744,7 @@ int Http3Application::OnReceiveData( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3ReceiveData(stream_id, data, datalen); + app->ReceiveData(stream_id, data, datalen); return 0; } @@ -827,7 +755,7 @@ int Http3Application::OnDeferredConsume( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3DeferredConsume(stream_id, consumed); + app->DeferredConsume(stream_id, consumed); return 0; } @@ -837,7 +765,7 @@ int Http3Application::OnBeginHeaders( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3BeginHeaders(stream_id); + app->BeginHeaders(stream_id); return 0; } @@ -847,7 +775,7 @@ int Http3Application::OnBeginTrailers( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3BeginHeaders(stream_id, QUICSTREAM_HEADERS_KIND_TRAILING); + app->BeginHeaders(stream_id, QUICSTREAM_HEADERS_KIND_TRAILING); return 0; } @@ -863,7 +791,7 @@ int Http3Application::OnReceiveHeader( Http3Application* app = static_cast(conn_user_data); // TODO(@jasnell): Need to determine the appropriate response code here // for when the header is not going to be accepted. - return app->H3ReceiveHeader(stream_id, token, name, value, flags) ? + return app->ReceiveHeader(stream_id, token, name, value, flags) ? 0 : NGHTTP3_ERR_CALLBACK_FAILURE; } @@ -873,7 +801,7 @@ int Http3Application::OnEndHeaders( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3EndHeaders(stream_id); + app->EndHeaders(stream_id); return 0; } @@ -884,7 +812,7 @@ int Http3Application::OnBeginPushPromise( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3BeginPushPromise(stream_id, push_id); + return app->BeginPushPromise(stream_id, push_id); } int Http3Application::OnReceivePushPromise( @@ -898,7 +826,7 @@ int Http3Application::OnReceivePushPromise( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3ReceivePushPromise( + return app->ReceivePushPromise( stream_id, push_id, token, @@ -914,7 +842,7 @@ int Http3Application::OnEndPushPromise( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3EndPushPromise(stream_id, push_id); + return app->EndPushPromise(stream_id, push_id); } int Http3Application::OnCancelPush( @@ -924,7 +852,7 @@ int Http3Application::OnCancelPush( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3CancelPush(push_id, stream_id); + app->CancelPush(push_id, stream_id); return 0; } @@ -935,7 +863,7 @@ int Http3Application::OnSendStopSending( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - app->H3SendStopSending(stream_id, app_error_code); + app->SendStopSending(stream_id, app_error_code); return 0; } @@ -945,7 +873,7 @@ int Http3Application::OnPushStream( int64_t stream_id, void* conn_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3PushStream(push_id, stream_id); + return app->PushStream(push_id, stream_id); } int Http3Application::OnEndStream( @@ -954,7 +882,8 @@ int Http3Application::OnEndStream( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3EndStream(stream_id); + app->EndStream(stream_id); + return 0; } ssize_t Http3Application::OnReadData( @@ -966,7 +895,7 @@ ssize_t Http3Application::OnReadData( void* conn_user_data, void* stream_user_data) { Http3Application* app = static_cast(conn_user_data); - return app->H3ReadData(stream_id, vec, veccnt, pflags); + return app->ReadData(stream_id, vec, veccnt, pflags); } } // namespace quic } // namespace node diff --git a/src/quic/node_quic_http3_application.h b/src/quic/node_quic_http3_application.h index 80d0fc987c..7483d760cc 100644 --- a/src/quic/node_quic_http3_application.h +++ b/src/quic/node_quic_http3_application.h @@ -7,7 +7,7 @@ #include "node_http_common.h" #include "node_mem.h" #include "node_quic_session.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_util.h" #include "v8.h" #include @@ -67,6 +67,10 @@ class Http3Header : public QuicHeader { size_t length() const override; + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(Http3Header) + SET_SELF_SIZE(Http3Header) + private: int32_t token_ = -1; Http3RcBufferPointer name_; @@ -104,6 +108,8 @@ class Http3Application final : uint64_t final_size, uint64_t app_error_code) override; + void ResumeStream(int64_t stream_id) override; + void ExtendMaxStreamsRemoteUni(uint64_t max_streams) override; void ExtendMaxStreamData(int64_t stream_id, uint64_t max_data) override; @@ -120,9 +126,6 @@ class Http3Application final : int64_t stream_id, v8::Local headers) override; - bool SendPendingData() override; - bool SendStreamData(QuicStream* stream) override; - // Implementation for mem::NgLibMemoryManager void CheckAllocatedSize(size_t previous_size) const; void IncreaseAllocatedSize(size_t size); @@ -139,42 +142,45 @@ class Http3Application final : bool CreateAndBindControlStream(); bool CreateAndBindQPackStreams(); - bool StreamCommit(int64_t stream_id, ssize_t datalen); - void set_stream_fin(int64_t stream_id); + int GetStreamData(StreamData* stream_data) override; + + bool BlockStream(int64_t stream_id) override; + bool StreamCommit(StreamData* stream_data, size_t datalen) override; + bool ShouldSetFin(const StreamData& data) override; - ssize_t H3ReadData( + ssize_t ReadData( int64_t stream_id, nghttp3_vec* vec, size_t veccnt, uint32_t* pflags); - void H3AckedStreamData(int64_t stream_id, size_t datalen); - void H3StreamClose(int64_t stream_id, uint64_t app_error_code); - void H3ReceiveData(int64_t stream_id, const uint8_t* data, size_t datalen); - void H3DeferredConsume(int64_t stream_id, size_t consumed); - void H3BeginHeaders( + void AckedStreamData(int64_t stream_id, size_t datalen); + void StreamClosed(int64_t stream_id, uint64_t app_error_code); + void ReceiveData(int64_t stream_id, const uint8_t* data, size_t datalen); + void DeferredConsume(int64_t stream_id, size_t consumed); + void BeginHeaders( int64_t stream_id, QuicStreamHeadersKind kind = QUICSTREAM_HEADERS_KIND_NONE); - bool H3ReceiveHeader( + bool ReceiveHeader( int64_t stream_id, int32_t token, nghttp3_rcbuf* name, nghttp3_rcbuf* value, uint8_t flags); - void H3EndHeaders(int64_t stream_id); - int H3BeginPushPromise(int64_t stream_id, int64_t push_id); - bool H3ReceivePushPromise( + void EndHeaders(int64_t stream_id); + int BeginPushPromise(int64_t stream_id, int64_t push_id); + bool ReceivePushPromise( int64_t stream_id, int64_t push_id, int32_t token, nghttp3_rcbuf* name, nghttp3_rcbuf* value, uint8_t flags); - int H3EndPushPromise(int64_t stream_id, int64_t push_id); - void H3CancelPush(int64_t push_id, int64_t stream_id); - void H3SendStopSending(int64_t stream_id, uint64_t app_error_code); - int H3PushStream(int64_t push_id, int64_t stream_id); - int H3EndStream(int64_t stream_id); + int EndPushPromise(int64_t stream_id, int64_t push_id); + void CancelPush(int64_t push_id, int64_t stream_id); + void SendStopSending(int64_t stream_id, uint64_t app_error_code); + int PushStream(int64_t push_id, int64_t stream_id); + void EndStream(int64_t stream_id); bool is_control_stream(int64_t stream_id) const { return stream_id == control_stream_id_ || diff --git a/src/quic/node_quic_session-inl.h b/src/quic/node_quic_session-inl.h index 46d1aa76a2..62fda5fd57 100644 --- a/src/quic/node_quic_session-inl.h +++ b/src/quic/node_quic_session-inl.h @@ -9,7 +9,7 @@ #include "node_quic_crypto.h" #include "node_quic_session.h" #include "node_quic_socket.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include #include @@ -138,6 +138,35 @@ bool QuicCryptoContext::SetupInitialKey(const ngtcp2_cid* dcid) { QuicApplication::QuicApplication(QuicSession* session) : session_(session) {} +void QuicApplication::set_stream_fin(int64_t stream_id) { + QuicStream* stream = session()->FindStream(stream_id); + if (stream != nullptr) + stream->set_fin_sent(); +} + +ssize_t QuicApplication::WriteVStream( + QuicPathStorage* path, + uint8_t* buf, + ssize_t* ndatalen, + const StreamData& stream_data) { + CHECK_LE(stream_data.count, MAX_VECTOR_COUNT); + return ngtcp2_conn_writev_stream( + session()->connection(), + &path->path, + buf, + session()->max_packet_length(), + ndatalen, + stream_data.remaining > 0 ? + NGTCP2_WRITE_STREAM_FLAG_MORE : + NGTCP2_WRITE_STREAM_FLAG_NONE, + stream_data.id, + stream_data.fin, + stream_data.buf, + stream_data.count, + uv_hrtime() + ); +} + std::unique_ptr QuicApplication::CreateStreamDataPacket() { return QuicPacket::Create( "stream data", @@ -192,10 +221,10 @@ void QuicSession::ExtendMaxStreamsBidi(uint64_t max_streams) { // Extends the stream-level flow control by the given number of bytes. void QuicSession::ExtendStreamOffset(QuicStream* stream, size_t amount) { Debug(this, "Extending max stream %" PRId64 " offset by %" PRId64 " bytes", - stream->GetID(), amount); + stream->id(), amount); ngtcp2_conn_extend_max_stream_offset( connection(), - stream->GetID(), + stream->id(), amount); } diff --git a/src/quic/node_quic_session.cc b/src/quic/node_quic_session.cc index 0f71c115f0..8b44247fcf 100644 --- a/src/quic/node_quic_session.cc +++ b/src/quic/node_quic_session.cc @@ -15,7 +15,7 @@ #include "node_quic_buffer-inl.h" #include "node_quic_crypto.h" #include "node_quic_socket.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_state.h" #include "node_quic_util-inl.h" #include "node_quic_default_application.h" @@ -466,7 +466,7 @@ void JSQuicSessionListener::OnStreamReady(BaseObjectPtr stream) { Environment* env = session()->env(); Local argv[] = { stream->object(), - Number::New(env->isolate(), static_cast(stream->GetID())) + Number::New(env->isolate(), static_cast(stream->id())) }; // Grab a shared pointer to this to prevent the QuicSession @@ -1098,6 +1098,121 @@ void QuicCryptoContext::WriteHandshake( handshake_[level].Push(std::move(buffer)); } +void QuicApplication::Acknowledge( + int64_t stream_id, + uint64_t offset, + size_t datalen) { + QuicStream* stream = session()->FindStream(stream_id); + if (stream != nullptr) { + stream->Acknowledge(offset, datalen); + ResumeStream(stream_id); + } +} + +bool QuicApplication::SendPendingData() { + // The maximum number of packets to send per call + static constexpr size_t kMaxPackets = 16; + QuicPathStorage path; + std::unique_ptr packet; + uint8_t* pos = nullptr; + size_t packets_sent = 0; + int err; + + for (;;) { + ssize_t ndatalen; + StreamData stream_data; + err = GetStreamData(&stream_data); + if (err < 0) { + session()->set_last_error(QUIC_ERROR_APPLICATION, err); + return false; + } + + // If stream_data.id is -1, then we're not serializing any data for any + // specific stream. We still need to process QUIC session packets tho. + if (stream_data.id > -1) + Debug(session(), "Serializing packets for stream id %" PRId64, + stream_data.id); + else + Debug(session(), "Serializing session packets"); + + // If the packet was sent previously, then packet will have been reset. + if (!packet) { + packet = CreateStreamDataPacket(); + pos = packet->data(); + } + + ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data); + + if (nwrite <= 0) { + switch (nwrite) { + case 0: + goto congestion_limited; + case NGTCP2_ERR_PKT_NUM_EXHAUSTED: + // There is a finite number of packets that can be sent + // per connection. Once those are exhausted, there's + // absolutely nothing we can do except immediately + // and silently tear down the QuicSession. This has + // to be silent because we can't even send a + // CONNECTION_CLOSE since even those require a + // packet number. + session()->SilentClose(); + return false; + case NGTCP2_ERR_STREAM_DATA_BLOCKED: + session()->StreamDataBlocked(stream_data.id); + if (session()->max_data_left() == 0) + goto congestion_limited; + // Fall through + case NGTCP2_ERR_STREAM_SHUT_WR: + if (UNLIKELY(!BlockStream(stream_data.id))) + return false; + continue; + case NGTCP2_ERR_STREAM_NOT_FOUND: + continue; + case NGTCP2_ERR_WRITE_STREAM_MORE: + CHECK_GT(ndatalen, 0); + CHECK(StreamCommit(&stream_data, ndatalen)); + pos += ndatalen; + continue; + } + session()->set_last_error(QUIC_ERROR_SESSION, static_cast(nwrite)); + return false; + } + + pos += nwrite; + + if (ndatalen >= 0) + CHECK(StreamCommit(&stream_data, ndatalen)); + + Debug(session(), "Sending %" PRIu64 " bytes in serialized packet", nwrite); + packet->set_length(nwrite); + if (!session()->SendPacket(std::move(packet), path)) + return false; + packet.reset(); + pos = nullptr; + MaybeSetFin(stream_data); + if (++packets_sent == kMaxPackets) + break; + } + return true; + + congestion_limited: + // We are either congestion limited or done. + if (pos - packet->data()) { + // Some data was serialized into the packet. We need to send it. + packet->set_length(pos - packet->data()); + Debug(session(), "Congestion limited, but %" PRIu64 " bytes pending.", + packet->length()); + if (!session()->SendPacket(std::move(packet), path)) + return false; + } + return true; +} + +void QuicApplication::MaybeSetFin(const StreamData& stream_data) { + if (ShouldSetFin(stream_data)) + set_stream_fin(stream_data.id); +} + void QuicApplication::StreamHeaders( int64_t stream_id, int kind, @@ -1423,19 +1538,19 @@ void QuicSession::AddToSocket(QuicSocket* socket) { // streams added must be removed before the QuicSession instance is freed. void QuicSession::AddStream(BaseObjectPtr stream) { DCHECK(!is_flag_set(QUICSESSION_FLAG_GRACEFUL_CLOSING)); - Debug(this, "Adding stream %" PRId64 " to session.", stream->GetID()); - streams_.emplace(stream->GetID(), stream); + Debug(this, "Adding stream %" PRId64 " to session.", stream->id()); + streams_.emplace(stream->id(), stream); // Update tracking statistics for the number of streams associated with // this session. switch (stream->origin()) { - case QuicStream::QuicStreamOrigin::QUIC_STREAM_CLIENT: + case QuicStreamOrigin::QUIC_STREAM_CLIENT: if (is_server()) IncrementStat(1, &session_stats_, &session_stats::streams_in_count); else IncrementStat(1, &session_stats_, &session_stats::streams_out_count); break; - case QuicStream::QuicStreamOrigin::QUIC_STREAM_SERVER: + case QuicStreamOrigin::QUIC_STREAM_SERVER: if (is_server()) IncrementStat(1, &session_stats_, &session_stats::streams_out_count); else @@ -1443,10 +1558,10 @@ void QuicSession::AddStream(BaseObjectPtr stream) { } IncrementStat(1, &session_stats_, &session_stats::streams_out_count); switch (stream->direction()) { - case QuicStream::QuicStreamDirection::QUIC_STREAM_BIRECTIONAL: + case QuicStreamDirection::QUIC_STREAM_BIRECTIONAL: IncrementStat(1, &session_stats_, &session_stats::bidi_stream_count); break; - case QuicStream::QuicStreamDirection::QUIC_STREAM_UNIDIRECTIONAL: + case QuicStreamDirection::QUIC_STREAM_UNIDIRECTIONAL: IncrementStat(1, &session_stats_, &session_stats::uni_stream_count); break; } @@ -2040,32 +2155,6 @@ void QuicSession::UsePreferredAddressStrategy( } } -// Sends buffered stream data. -bool QuicSession::SendStreamData(QuicStream* stream) { - // Because SendStreamData calls ngtcp2_conn_writev_streams, - // it is not permitted to be called while we are running within - // an ngtcp2 callback function. - CHECK(!Ngtcp2CallbackScope::InNgtcp2CallbackScope(this)); - - // No stream data may be serialized and sent if: - // - the QuicSession is destroyed - // - the QuicStream was never writable, - // - a final stream frame has already been sent, - // - the QuicSession is in the draining period, - // - the QuicSession is in the closing period, or - // - we are blocked from sending any data because of flow control - if (is_destroyed() || - !stream->was_ever_writable() || - stream->has_sent_fin() || - is_in_draining_period() || - is_in_closing_period() || - max_data_left() == 0) { - return true; - } - - return application_->SendStreamData(stream); -} - // Passes a serialized packet to the associated QuicSocket. bool QuicSession::SendPacket(std::unique_ptr packet) { CHECK(!is_flag_set(QUICSESSION_FLAG_DESTROYED)); @@ -2178,6 +2267,8 @@ bool QuicSession::set_socket(QuicSocket* socket, bool nat_rebinding) { if (socket == nullptr || socket == socket_.get()) return true; + SendSessionScope send(this); + // Step 1: Add this Session to the given Socket AddToSocket(socket); @@ -2204,25 +2295,16 @@ bool QuicSession::set_socket(QuicSocket* socket, bool nat_rebinding) { } } - SendPendingData(); return true; } -void QuicSession::ResetStream(int64_t stream_id, uint64_t code) { - // First, update the internal ngtcp2 state of the given stream - // and schedule the STOP_SENDING and RESET_STREAM frames as - // appropriate. - CHECK_EQ( - ngtcp2_conn_shutdown_stream( - connection(), - stream_id, - code), 0); +void QuicSession::ResumeStream(int64_t stream_id) { + application()->ResumeStream(stream_id); +} - // If ShutdownStream is called outside of an ngtcp2 callback, - // we need to trigger SendPendingData manually to cause the - // RESET_STREAM and STOP_SENDING frames to be transmitted. - if (!Ngtcp2CallbackScope::InNgtcp2CallbackScope(this)) - SendPendingData(); +void QuicSession::ResetStream(int64_t stream_id, uint64_t code) { + SendSessionScope scope(this); + CHECK_EQ(ngtcp2_conn_shutdown_stream(connection(), stream_id, code), 0); } // Silent Close must start with the JavaScript side, which must diff --git a/src/quic/node_quic_session.h b/src/quic/node_quic_session.h index d8df1b39f9..e2d893cb00 100644 --- a/src/quic/node_quic_session.h +++ b/src/quic/node_quic_session.h @@ -22,7 +22,7 @@ #include #include -#include +#include #include namespace node { @@ -35,6 +35,8 @@ class QuicPacket; class QuicStream; class QuicHeader; +using StreamsMap = std::unordered_map>; + enum class QlogMode { kDisabled, kEnabled @@ -495,12 +497,12 @@ class QuicApplication : public MemoryRetainer { virtual void AcknowledgeStreamData( int64_t stream_id, uint64_t offset, - size_t datalen) = 0; + size_t datalen) { Acknowledge(stream_id, offset, datalen); } + virtual bool BlockStream(int64_t id) { return true; } virtual void ExtendMaxStreamsRemoteUni(uint64_t max_streams) {} virtual void ExtendMaxStreamsRemoteBidi(uint64_t max_streams) {} virtual void ExtendMaxStreamData(int64_t stream_id, uint64_t max_data) {} - virtual bool SendPendingData() = 0; - virtual bool SendStreamData(QuicStream* stream) = 0; + virtual void ResumeStream(int64_t stream_id) {} virtual void StreamHeaders( int64_t stream_id, int kind, @@ -526,6 +528,7 @@ class QuicApplication : public MemoryRetainer { inline Environment* env() const; + bool SendPendingData(); size_t max_header_pairs() const { return max_header_pairs_; } size_t max_header_length() const { return max_header_length_; } @@ -533,11 +536,38 @@ class QuicApplication : public MemoryRetainer { QuicSession* session() const { return session_; } bool needs_init() const { return needs_init_; } void set_init_done() { needs_init_ = false; } + inline void set_stream_fin(int64_t stream_id); void set_max_header_pairs(size_t max) { max_header_pairs_ = max; } void set_max_header_length(size_t max) { max_header_length_ = max; } inline std::unique_ptr CreateStreamDataPacket(); + struct StreamData { + size_t count = 0; + size_t remaining = 0; + int64_t id = -1; + int fin = 0; + ngtcp2_vec data[MAX_VECTOR_COUNT] {}; + ngtcp2_vec* buf = nullptr; + BaseObjectPtr stream; + StreamData() { buf = data; } + }; + + void Acknowledge( + int64_t stream_id, + uint64_t offset, + size_t datalen); + virtual int GetStreamData(StreamData* data) = 0; + virtual bool StreamCommit(StreamData* data, size_t datalen) = 0; + virtual bool ShouldSetFin(const StreamData& data) = 0; + + inline ssize_t WriteVStream( + QuicPathStorage* path, + uint8_t* buf, + ssize_t* ndatalen, + const StreamData& stream_data); + private: + void MaybeSetFin(const StreamData& stream_data); QuicSession* session_; bool needs_init_ = true; size_t max_header_pairs_ = 0; @@ -823,9 +853,6 @@ class QuicSession : public AsyncWrap, // Causes pending ngtcp2 frames to be serialized and sent void SendPendingData(); - // Causes pending QuicStream data to be serialized and sent - bool SendStreamData(QuicStream* stream); - inline bool SendPacket( std::unique_ptr packet, const ngtcp2_path_storage& path); @@ -847,9 +874,7 @@ class QuicSession : public AsyncWrap, int set_session(SSL_SESSION* session); bool set_session(v8::Local buffer); - const std::map>& streams() const { - return streams_; - } + const StreamsMap& streams() const { return streams_; } // ResetStream will cause ngtcp2 to queue a // RESET_STREAM and STOP_SENDING frame, as appropriate, @@ -882,6 +907,8 @@ class QuicSession : public AsyncWrap, int64_t stream_id, uint64_t error_code = NGTCP2_APP_NOERROR); + void ResumeStream(int64_t stream_id); + // Submits informational headers to the QUIC Application // implementation. If headers are not supported, false // will be returned. Otherwise, returns true @@ -966,6 +993,19 @@ class QuicSession : public AsyncWrap, // Report that the stream data is flow control blocked inline void StreamDataBlocked(int64_t stream_id); + class SendSessionScope { + public: + explicit SendSessionScope(QuicSession* session) : session_(session) {} + + ~SendSessionScope() { + if (!Ngtcp2CallbackScope::InNgtcp2CallbackScope(session_)) + session_->SendPendingData(); + } + + private: + QuicSession* session_; + }; + // Tracks whether or not we are currently within an ngtcp2 callback // function. Certain ngtcp2 APIs are not supposed to be called when // within a callback. We use this as a gate to check. @@ -1340,7 +1380,7 @@ class QuicSession : public AsyncWrap, std::unique_ptr conn_closebuf_; - std::map> streams_; + StreamsMap streams_; AliasedFloat64Array state_; diff --git a/src/quic/node_quic_stream-inl.h b/src/quic/node_quic_stream-inl.h new file mode 100644 index 0000000000..9981ead8df --- /dev/null +++ b/src/quic/node_quic_stream-inl.h @@ -0,0 +1,210 @@ +#ifndef SRC_NODE_QUIC_STREAM_INL_H_ +#define SRC_NODE_QUIC_STREAM_INL_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "node_quic_session.h" +#include "node_quic_stream.h" +#include "node_quic_buffer-inl.h" + +namespace node { +namespace quic { + +QuicStreamDirection QuicStream::direction() const { + return stream_id_ & 0b10 ? + QUIC_STREAM_UNIDIRECTIONAL : + QUIC_STREAM_BIRECTIONAL; +} + +QuicStreamOrigin QuicStream::origin() const { + return stream_id_ & 0b01 ? + QUIC_STREAM_SERVER : + QUIC_STREAM_CLIENT; +} + +bool QuicStream::is_destroyed() const { + return flags_ & QUICSTREAM_FLAG_DESTROYED; +} + +bool QuicStream::has_received_fin() const { + return flags_ & QUICSTREAM_FLAG_FIN; +} + +bool QuicStream::has_sent_fin() const { + return flags_ & QUICSTREAM_FLAG_FIN_SENT; +} + +bool QuicStream::was_ever_writable() const { + if (direction() == QUIC_STREAM_UNIDIRECTIONAL) { + return session_->is_server() ? + origin() == QUIC_STREAM_SERVER : + origin() == QUIC_STREAM_CLIENT; + } + return true; +} + +bool QuicStream::is_writable() const { + if (flags_ & QUICSTREAM_FLAG_WRITE_CLOSED) + return false; + + return was_ever_writable(); +} + +bool QuicStream::was_ever_readable() const { + if (direction() == QUIC_STREAM_UNIDIRECTIONAL) { + return session_->is_server() ? + origin() == QUIC_STREAM_CLIENT : + origin() == QUIC_STREAM_SERVER; + } + + return true; +} + +bool QuicStream::is_readable() const { + if (flags_ & QUICSTREAM_FLAG_READ_CLOSED) + return false; + + return was_ever_readable(); +} + +bool QuicStream::is_read_started() const { + return flags_ & QUICSTREAM_FLAG_READ_STARTED; +} + +bool QuicStream::is_read_paused() const { + return flags_ & QUICSTREAM_FLAG_READ_PAUSED; +} + +void QuicStream::set_fin_sent() { + CHECK(!is_writable()); + flags_ |= QUICSTREAM_FLAG_FIN_SENT; +} + +bool QuicStream::is_write_finished() const { + return has_sent_fin() && streambuf_.length() == 0; +} + +void QuicStream::IncrementAvailableOutboundLength(size_t amount) { + available_outbound_length_ += amount; +} + +void QuicStream::DecrementAvailableOutboundLength(size_t amount) { + available_outbound_length_ -= amount; +} + +void QuicStream::set_fin_received() { + flags_ |= QUICSTREAM_FLAG_FIN; + set_read_close(); +} + +void QuicStream::set_write_close() { + flags_ |= QUICSTREAM_FLAG_WRITE_CLOSED; +} + +void QuicStream::set_read_close() { + flags_ |= QUICSTREAM_FLAG_READ_CLOSED; +} + +void QuicStream::set_read_start() { + flags_ |= QUICSTREAM_FLAG_READ_STARTED; +} + +void QuicStream::set_read_pause() { + flags_ |= QUICSTREAM_FLAG_READ_PAUSED; +} + +void QuicStream::set_read_resume() { + flags_ &= QUICSTREAM_FLAG_READ_PAUSED; +} + +void QuicStream::set_destroyed() { + flags_ |= QUICSTREAM_FLAG_DESTROYED; +} + +bool QuicStream::SubmitInformation(v8::Local headers) { + return session_->SubmitInformation(stream_id_, headers); +} + +bool QuicStream::SubmitHeaders(v8::Local headers, uint32_t flags) { + return session_->SubmitHeaders(stream_id_, headers, flags); +} + +bool QuicStream::SubmitTrailers(v8::Local headers) { + return session_->SubmitTrailers(stream_id_, headers); +} + +void QuicStream::EndHeaders() { + Debug(this, "End Headers"); + // Upon completion of a block of headers, convert the + // vector of Header objects into an array of name+value + // pairs, then call the on_stream_headers function. + session()->application()->StreamHeaders(stream_id_, headers_kind_, headers_); + headers_.clear(); +} + +void QuicStream::set_headers_kind(QuicStreamHeadersKind kind) { + headers_kind_ = kind; +} + +void QuicStream::BeginHeaders(QuicStreamHeadersKind kind) { + Debug(this, "Beginning Headers"); + // Upon start of a new block of headers, ensure that any + // previously collected ones are cleaned up. + headers_.clear(); + set_headers_kind(kind); +} + +void QuicStream::Commit(ssize_t amount) { + CHECK(!is_destroyed()); + CHECK_GE(amount, 0); + streambuf_.Seek(amount); +} + +void QuicStream::ResetStream(uint64_t app_error_code) { + // On calling shutdown, the stream will no longer be + // readable or writable, all any pending data in the + // streambuf_ will be canceled, and all data pending + // to be acknowledged at the ngtcp2 level will be + // abandoned. + set_read_close(); + set_write_close(); + session_->ResetStream(stream_id_, app_error_code); +} + +template +size_t QuicStream::DrainInto( + std::vector* vec, + size_t max_count) { + CHECK(!is_destroyed()); + size_t length = 0; + streambuf_.DrainInto(vec, &length, max_count); + return length; +} + +template +size_t QuicStream::DrainInto( + T* vec, + size_t* count, + size_t max_count) { + CHECK(!is_destroyed()); + size_t length = 0; + streambuf_.DrainInto(vec, count, &length, max_count); + return length; +} + +void QuicStream::Schedule(Queue* queue) { + if (!stream_queue_.IsEmpty()) // Already scheduled? + return; + queue->PushBack(this); +} + +void QuicStream::Unschedule() { + stream_queue_.Remove(); +} + +} // namespace quic +} // namespace node + +#endif // NODE_WANT_INTERNALS + +#endif // SRC_NODE_QUIC_STREAM_INL_H_ diff --git a/src/quic/node_quic_stream.cc b/src/quic/node_quic_stream.cc index 35336510b6..d9dc24f2ef 100644 --- a/src/quic/node_quic_stream.cc +++ b/src/quic/node_quic_stream.cc @@ -6,10 +6,9 @@ #include "node_internals.h" #include "stream_base-inl.h" #include "node_quic_session-inl.h" -#include "node_quic_stream.h" +#include "node_quic_stream-inl.h" #include "node_quic_socket.h" #include "node_quic_util-inl.h" -#include "node_sockaddr-inl.h" #include "v8.h" #include "uv.h" @@ -32,6 +31,15 @@ using v8::Value; namespace quic { +namespace { +size_t get_length(uv_buf_t* bufs, size_t nbufs) { + size_t len = 0; + for (size_t n = 0; n < nbufs; n++) + len += bufs[n].len; + return len; +} +} // namespace + QuicStream::QuicStream( QuicSession* sess, Local wrap, @@ -93,8 +101,54 @@ QuicStream::QuicStream( &stream_stats::max_offset); } +// Acknowledge is called when ngtcp2 has received an acknowledgement +// for one or more stream frames for this QuicStream. This will cause +// data stored in the streambuf_ outbound queue to be consumed and may +// result in the JavaScript callback for the write to be invoked. +void QuicStream::Acknowledge(uint64_t offset, size_t datalen) { + if (is_destroyed()) + return; + + // ngtcp2 guarantees that offset must always be greater + // than the previously received offset, but let's just + // make sure that holds. + CHECK_GE(offset, max_offset_ack_); + max_offset_ack_ = offset; + + Debug(this, "Acknowledging %d bytes", datalen); + + // Consumes the given number of bytes in the buffer. This may + // have the side-effect of causing the onwrite callback to be + // invoked if a complete chunk of buffered data has been acknowledged. + streambuf_.Consume(datalen); + + uint64_t now = uv_hrtime(); + if (stream_stats_.stream_acked_at > 0) + data_rx_ack_->Record(now - stream_stats_.stream_acked_at); + stream_stats_.stream_acked_at = now; +} + +// While not all QUIC applications will support headers, QuicStream +// includes basic, generic support for storing them. +bool QuicStream::AddHeader(std::unique_ptr header) { + Debug(this, "Header Added"); + size_t len = header->length(); + QuicApplication* app = session()->application(); + // We cannot add the header if we've either reached + // * the max number of header pairs or + // * the max number of header bytes + if (headers_.size() == app->max_header_pairs() || + current_headers_length_ + len > app->max_header_length()) { + return false; + } + + current_headers_length_ += header->length(); + headers_.emplace_back(std::move(header)); + return true; +} + std::string QuicStream::diagnostic_name() const { - return std::string("QuicStream ") + std::to_string(GetID()) + + return std::string("QuicStream ") + std::to_string(stream_id_) + " (" + std::to_string(static_cast(get_async_id())) + ", " + session_->diagnostic_name() + ")"; } @@ -139,19 +193,15 @@ void QuicStream::Destroy() { int QuicStream::DoShutdown(ShutdownWrap* req_wrap) { if (is_destroyed()) return UV_EPIPE; - Debug(this, "Shutdown writable side"); - // Do nothing if the stream was already shutdown. Specifically, - // we should not attempt to send anything on the QuicSession - if (!is_writable()) - return UV_EPIPE; - stream_stats_.closing_at = uv_hrtime(); - set_write_close(); - // If we're not currently within an ngtcp2 callback, then we need to - // tell the QuicSession to initiate serialization and sending of any - // pending frames. - if (!QuicSession::Ngtcp2CallbackScope::InNgtcp2CallbackScope(session_.get())) - session_->SendStreamData(this); + QuicSession::SendSessionScope send_scope(session()); + + if (is_writable()) { + Debug(this, "Shutdown writable side"); + stream_stats_.closing_at = uv_hrtime(); + set_write_close(); + session()->ResumeStream(stream_id_); + } return 1; } @@ -170,81 +220,56 @@ int QuicStream::DoWrite( return 0; } + // Nothing to write. + size_t length = get_length(bufs, nbufs); + if (length == 0) { + req_wrap->Done(0); + return 0; + } + + QuicSession::SendSessionScope send_scope(session()); + + Debug(this, "Queuing %" PRIu64 " bytes of data from %d buffers", + length, nbufs); + IncrementStat( + static_cast(length), + &stream_stats_, + &stream_stats::bytes_sent); + BaseObjectPtr strong_ref{req_wrap->GetAsyncWrap()}; // The list of buffers will be appended onto streambuf_ without - // copying. Those will remain in that buffer until the serialized + // copying. Those will remain in the buffer until the serialized // stream frames are acknowledged. - uint64_t length = - streambuf_.Push( - bufs, - nbufs, - [req_wrap, strong_ref](int status) { - // This callback function will be invoked once this - // complete batch of buffers has been acknowledged - // by the peer. This will have the side effect of - // blocking additional pending writes from the - // javascript side, so writing data to the stream - // will be throttled by how quickly the peer is - // able to acknowledge stream packets. This is good - // in the sense of providing back-pressure, but - // also means that writes will be significantly - // less performant unless written in batches. - req_wrap->Done(status); - }); - Debug(this, "Queuing %" PRIu64 " bytes of data from %d buffers", - length, nbufs); - IncrementStat(length, &stream_stats_, &stream_stats::bytes_sent); + // This callback function will be invoked once this + // complete batch of buffers has been acknowledged + // by the peer. This will have the side effect of + // blocking additional pending writes from the + // javascript side, so writing data to the stream + // will be throttled by how quickly the peer is + // able to acknowledge stream packets. This is good + // in the sense of providing back-pressure, but + // also means that writes will be significantly + // less performant unless written in batches. + streambuf_.Push( + bufs, + nbufs, + [req_wrap, strong_ref](int status) { + req_wrap->Done(status); + }); stream_stats_.stream_sent_at = uv_hrtime(); - // If we're not within an ngtcp2 callback, go ahead and send - // the pending stream data. Otherwise, the data will be flushed - // once the ngtcp2 callback scope exits and all streams with - // data pending are flushed. - if (!QuicSession::Ngtcp2CallbackScope::InNgtcp2CallbackScope(session_.get())) - session_->SendStreamData(this); + session()->ResumeStream(stream_id_); // IncrementAvailableOutboundLength(len); return 0; } -// AckedDataOffset is called when ngtcp2 has received an acknowledgement -// for one or more stream frames for this QuicStream. This will cause -// data stored in the streambuf_ outbound queue to be consumed and may -// result in the JavaScript callback for the write to be invoked. -void QuicStream::AckedDataOffset(uint64_t offset, size_t datalen) { - if (is_destroyed()) - return; - - // ngtcp2 guarantees that offset must always be greater - // than the previously received offset, but let's just - // make sure that holds. - CHECK_GE(offset, max_offset_ack_); - max_offset_ack_ = offset; - - Debug(this, "Acknowledging %d bytes", datalen); - - // Consumes the given number of bytes in the buffer. This may - // have the side-effect of causing the onwrite callback to be - // invoked if a complete chunk of buffered data has been acknowledged. - streambuf_.Consume(datalen); - - uint64_t now = uv_hrtime(); - if (stream_stats_.stream_acked_at > 0) - data_rx_ack_->Record(now - stream_stats_.stream_acked_at); - stream_stats_.stream_acked_at = now; -} - -void QuicStream::Commit(ssize_t amount) { - CHECK(!is_destroyed()); - streambuf_.Seek(amount); +bool QuicStream::IsAlive() { + return !is_destroyed() && !IsClosing(); } -inline void QuicStream::IncrementAvailableOutboundLength(size_t amount) { - available_outbound_length_ += amount; -} - -inline void QuicStream::DecrementAvailableOutboundLength(size_t amount) { - available_outbound_length_ -= amount; +bool QuicStream::IsClosing() { + return !is_writable() && !is_readable(); } int QuicStream::ReadStart() { @@ -267,6 +292,41 @@ int QuicStream::ReadStop() { return 0; } +void QuicStream::IncrementStats(size_t datalen) { + uint64_t len = static_cast(datalen); + IncrementStat(len, &stream_stats_, &stream_stats::bytes_received); + + uint64_t now = uv_hrtime(); + if (stream_stats_.stream_received_at > 0) + data_rx_rate_->Record(now - stream_stats_.stream_received_at); + stream_stats_.stream_received_at = now; + data_rx_size_->Record(len); +} + +void QuicStream::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("buffer", &streambuf_); + tracker->TrackField("data_rx_rate", data_rx_rate_); + tracker->TrackField("data_rx_size", data_rx_size_); + tracker->TrackField("data_rx_ack", data_rx_ack_); + tracker->TrackField("stats_buffer", stats_buffer_); + tracker->TrackField("headers", headers_); +} + +BaseObjectPtr QuicStream::New( + QuicSession* session, + int64_t stream_id) { + Local obj; + if (!session->env() + ->quicserverstream_constructor_template() + ->NewInstance(session->env()->context()).ToLocal(&obj)) { + return {}; + } + BaseObjectPtr stream = + MakeDetachedBaseObject(session, obj, stream_id); + session->AddStream(stream); + return stream; +} + // Passes chunks of data on to the JavaScript side as soon as they are // received but only if we're still readable. The caller of this must have a // HandleScope. @@ -355,106 +415,12 @@ void QuicStream::ReceiveData( } } -inline void QuicStream::IncrementStats(size_t datalen) { - uint64_t len = static_cast(datalen); - IncrementStat(len, &stream_stats_, &stream_stats::bytes_received); - - uint64_t now = uv_hrtime(); - if (stream_stats_.stream_received_at > 0) - data_rx_rate_->Record(now - stream_stats_.stream_received_at); - stream_stats_.stream_received_at = now; - data_rx_size_->Record(len); -} - -void QuicStream::ResetStream(uint64_t app_error_code) { - // On calling shutdown, the stream will no longer be - // readable or writable, all any pending data in the - // streambuf_ will be canceled, and all data pending - // to be acknowledged at the ngtcp2 level will be - // abandoned. - set_read_close(); - set_write_close(); - session_->ResetStream(GetID(), app_error_code); -} - -BaseObjectPtr QuicStream::New( - QuicSession* session, int64_t stream_id) { - Local obj; - if (!session->env() - ->quicserverstream_constructor_template() - ->NewInstance(session->env()->context()).ToLocal(&obj)) { - return {}; - } - BaseObjectPtr stream = - MakeDetachedBaseObject(session, obj, stream_id); - session->AddStream(stream); - return stream; -} - -void QuicStream::BeginHeaders(QuicStreamHeadersKind kind) { - Debug(this, "Beginning Headers"); - // Upon start of a new block of headers, ensure that any - // previously collected ones are cleaned up. - headers_.clear(); - set_headers_kind(kind); -} - -void QuicStream::set_headers_kind(QuicStreamHeadersKind kind) { - headers_kind_ = kind; -} - -bool QuicStream::AddHeader(std::unique_ptr header) { - Debug(this, "Header Added"); - size_t len = header->length(); - QuicApplication* app = session()->application(); - // We cannot add the header if we've either reached - // * the max number of header pairs or - // * the max number of header bytes - if (headers_.size() == app->max_header_pairs() || - current_headers_length_ + len > app->max_header_length()) { - return false; - } - - current_headers_length_ += header->length(); - headers_.emplace_back(std::move(header)); - return true; -} - -void QuicStream::EndHeaders() { - Debug(this, "End Headers"); - // Upon completion of a block of headers, convert the - // vector of Header objects into an array of name+value - // pairs, then call the on_stream_headers function. - session()->application()->StreamHeaders(GetID(), headers_kind_, headers_); - headers_.clear(); -} - -void QuicStream::MemoryInfo(MemoryTracker* tracker) const { - tracker->TrackField("buffer", &streambuf_); - tracker->TrackField("data_rx_rate", data_rx_rate_); - tracker->TrackField("data_rx_size", data_rx_size_); - tracker->TrackField("data_rx_ack", data_rx_ack_); - tracker->TrackField("stats_buffer", stats_buffer_); -} - -bool QuicStream::SubmitInformation(v8::Local headers) { - return session_->SubmitInformation(GetID(), headers); -} - -bool QuicStream::SubmitHeaders(v8::Local headers, uint32_t flags) { - return session_->SubmitHeaders(GetID(), headers, flags); -} - -bool QuicStream::SubmitTrailers(v8::Local headers) { - return session_->SubmitTrailers(GetID(), headers); -} - // JavaScript API namespace { void QuicStreamGetID(const FunctionCallbackInfo& args) { QuicStream* stream; ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder()); - args.GetReturnValue().Set(static_cast(stream->GetID())); + args.GetReturnValue().Set(static_cast(stream->id())); } void OpenUnidirectionalStream(const FunctionCallbackInfo& args) { diff --git a/src/quic/node_quic_stream.h b/src/quic/node_quic_stream.h index e3dfee2a9a..6853925ae5 100644 --- a/src/quic/node_quic_stream.h +++ b/src/quic/node_quic_stream.h @@ -9,6 +9,7 @@ #include "histogram-inl.h" #include "node_quic_util.h" #include "stream_base-inl.h" +#include "util-inl.h" #include "v8.h" #include @@ -18,6 +19,7 @@ namespace node { namespace quic { class QuicSession; +class QuicApplication; enum QuicStreamHeaderFlags : uint32_t { // No flags @@ -54,7 +56,7 @@ enum QuicStreamStatsIdx : int { // different internal representations for a header name+value // pair. QuicApplication implementations that support headers // per stream must create a specialization of the Header class. -class QuicHeader { +class QuicHeader : public MemoryRetainer { public: QuicHeader() {} @@ -69,6 +71,55 @@ class QuicHeader { virtual size_t length() const = 0; }; +enum QuicStreamStates : uint32_t { + // QuicStream is fully open. Readable and Writable + QUICSTREAM_FLAG_INITIAL = 0x0, + + // QuicStream Read State is closed because a final stream frame + // has been received from the peer or the QuicStream is unidirectional + // outbound only (i.e. it was never readable) + QUICSTREAM_FLAG_READ_CLOSED = 0x1, + + // QuicStream Write State is closed. There may still be data + // in the outbound queue waiting to be serialized or acknowledged. + // No additional data may be added to the queue, however, and a + // final stream packet will be sent once all of the data in the + // queue has been serialized. + QUICSTREAM_FLAG_WRITE_CLOSED = 0x2, + + // JavaScript side has switched into flowing mode (Readable side) + QUICSTREAM_FLAG_READ_STARTED = 0x4, + + // JavaScript side has paused the flow of data (Readable side) + QUICSTREAM_FLAG_READ_PAUSED = 0x8, + + // QuicStream has received a final stream frame (Readable side) + QUICSTREAM_FLAG_FIN = 0x10, + + // QuicStream has sent a final stream frame (Writable side) + QUICSTREAM_FLAG_FIN_SENT = 0x20, + + // QuicStream has been destroyed + QUICSTREAM_FLAG_DESTROYED = 0x40 +}; + +enum QuicStreamDirection { + // The QuicStream is readable and writable in both directions + QUIC_STREAM_BIRECTIONAL, + + // The QuicStream is writable and readable in only one direction. + // The direction depends on the QuicStreamOrigin. + QUIC_STREAM_UNIDIRECTIONAL +}; + +enum QuicStreamOrigin { + // The QuicStream was created by the server. + QUIC_STREAM_SERVER, + + // The QuicStream was created by the client. + QUIC_STREAM_CLIENT +}; + // QuicStream's are simple data flows that, fortunately, do not // require much. They may be: // @@ -137,183 +188,96 @@ class QuicHeader { // ngtcp2 level. class QuicStream : public AsyncWrap, public StreamBase { public: - enum QuicStreamStates : uint32_t { - // QuicStream is fully open. Readable and Writable - QUICSTREAM_FLAG_INITIAL = 0x0, - - // QuicStream Read State is closed because a final stream frame - // has been received from the peer or the QuicStream is unidirectional - // outbound only (i.e. it was never readable) - QUICSTREAM_FLAG_READ_CLOSED = 0x1, - - // QuicStream Write State is closed. There may still be data - // in the outbound queue waiting to be serialized or acknowledged. - // No additional data may be added to the queue, however, and a - // final stream packet will be sent once all of the data in the - // queue has been serialized. - QUICSTREAM_FLAG_WRITE_CLOSED = 0x2, - - // JavaScript side has switched into flowing mode (Readable side) - QUICSTREAM_FLAG_READ_STARTED = 0x4, - - // JavaScript side has paused the flow of data (Readable side) - QUICSTREAM_FLAG_READ_PAUSED = 0x8, - - // QuicStream has received a final stream frame (Readable side) - QUICSTREAM_FLAG_FIN = 0x10, - - // QuicStream has sent a final stream frame (Writable side) - QUICSTREAM_FLAG_FIN_SENT = 0x20, - - // QuicStream has been destroyed - QUICSTREAM_FLAG_DESTROYED = 0x40 - }; - - enum QuicStreamDirection { - // The QuicStream is readable and writable in both directions - QUIC_STREAM_BIRECTIONAL, - - // The QuicStream is writable and readable in only one direction. - // The direction depends on the QuicStreamOrigin. - QUIC_STREAM_UNIDIRECTIONAL - }; - - enum QuicStreamOrigin { - // The QuicStream was created by the server. - QUIC_STREAM_SERVER, - - // The QuicStream was created by the client. - QUIC_STREAM_CLIENT - }; - - static void Initialize( Environment* env, v8::Local target, v8::Local context); static BaseObjectPtr New( - QuicSession* session, int64_t stream_id); + QuicSession* session, + int64_t stream_id); + + QuicStream( + QuicSession* session, + v8::Local target, + int64_t stream_id); std::string diagnostic_name() const override; - inline QuicStreamDirection direction() const { - return stream_id_ & 0b10 ? - QUIC_STREAM_UNIDIRECTIONAL : - QUIC_STREAM_BIRECTIONAL; - } + int64_t id() const { return stream_id_; } + QuicSession* session() const { return session_.get(); } - inline QuicStreamOrigin origin() const { - return stream_id_ & 0b01 ? - QUIC_STREAM_SERVER : - QUIC_STREAM_CLIENT; - } + // A QuicStream can be either uni- or bi-directional. + inline QuicStreamDirection direction() const; - int64_t GetID() const { return stream_id_; } + // A QuicStream can be initiated by either the client + // or the server. + inline QuicStreamOrigin origin() const; - inline bool is_destroyed() const { - return flags_ & QUICSTREAM_FLAG_DESTROYED; - } + // The QuicStream has been destroyed and is no longer usable. + inline bool is_destroyed() const; // The QUICSTREAM_FLAG_FIN flag will be set only when a final stream // frame has been received from the peer. - inline bool has_received_fin() const { - return flags_ & QUICSTREAM_FLAG_FIN; - } + inline bool has_received_fin() const; // The QUICSTREAM_FLAG_FIN_SENT flag will be set only when a final // stream frame has been transmitted to the peer. Once sent, no // additional data may be transmitted to the peer. If has_sent_fin() // is set, is_writable() can be assumed to be false. - inline bool has_sent_fin() const { - return flags_ & QUICSTREAM_FLAG_FIN_SENT; - } + inline bool has_sent_fin() const; // WasEverWritable returns true if it is a bidirectional stream, // or a Unidirectional stream originating from the local peer. // If was_ever_writable() is false, then no stream frames should // ever be sent from the local peer, including final stream frames. - inline bool was_ever_writable() const { - if (direction() == QUIC_STREAM_UNIDIRECTIONAL) { - return session_->is_server() ? - origin() == QUIC_STREAM_SERVER : - origin() == QUIC_STREAM_CLIENT; - } - return true; - } + inline bool was_ever_writable() const; // A QuicStream will not be writable if: // - The QUICSTREAM_FLAG_WRITE_CLOSED flag is set or // - It is a Unidirectional stream originating from the peer - inline bool is_writable() const { - if (flags_ & QUICSTREAM_FLAG_WRITE_CLOSED) - return false; - - return was_ever_writable(); - } + inline bool is_writable() const; // WasEverReadable returns true if it is a bidirectional stream, // or a Unidirectional stream originating from the remote // peer. - inline bool was_ever_readable() const { - if (direction() == QUIC_STREAM_UNIDIRECTIONAL) { - return session_->is_server() ? - origin() == QUIC_STREAM_CLIENT : - origin() == QUIC_STREAM_SERVER; - } - - return true; - } + inline bool was_ever_readable() const; // A QuicStream will not be readable if: // - The QUICSTREAM_FLAG_READ_CLOSED flag is set or // - It is a Unidirectional stream originating from the local peer. - inline bool is_readable() const { - if (flags_ & QUICSTREAM_FLAG_READ_CLOSED) - return false; - - return was_ever_readable(); - } + inline bool is_readable() const; - inline bool is_read_started() const { - return flags_ & QUICSTREAM_FLAG_READ_STARTED; - } + // True if reading from this QuicStream has ever initiated. + inline bool is_read_started() const; - inline bool is_read_paused() const { - return flags_ & QUICSTREAM_FLAG_READ_PAUSED; - } - - bool IsAlive() override { - return !is_destroyed() && !IsClosing(); - } - - bool IsClosing() override { - return !is_writable() && !is_readable(); - } + // True if reading from this QuicStream is currently paused. + inline bool is_read_paused() const; // Records the fact that a final stream frame has been // serialized and sent to the peer. There still may be // unacknowledged data in the outbound queue, but no // additional frames may be sent for the stream other // than reset stream. - inline void set_fin_sent() { - CHECK(!is_writable()); - flags_ |= QUICSTREAM_FLAG_FIN_SENT; - } + inline void set_fin_sent(); // IsWriteFinished will return true if a final stream frame // has been sent and all data has been acknowledged (the // send buffer is empty). - inline bool is_write_finished() { - return has_sent_fin() && streambuf_.length() == 0; - } + inline bool is_write_finished() const; - QuicSession* session() const { return session_.get(); } + // Specifies the kind of headers currently being processed. + inline void set_headers_kind(QuicStreamHeadersKind kind); - virtual void AckedDataOffset(uint64_t offset, size_t datalen); + // Marks the given data range as having been acknowledged. + // This means that the data range may be released from + // memory. + void Acknowledge(uint64_t offset, size_t datalen); - virtual void Destroy(); + // Destroy the QuicStream and render it no longer usable. + void Destroy(); + // Buffers chunks of data to be written to the QUIC connection. int DoWrite( WriteWrap* req_wrap, uv_buf_t* bufs, @@ -323,118 +287,83 @@ class QuicStream : public AsyncWrap, public StreamBase { inline void IncrementAvailableOutboundLength(size_t amount); inline void DecrementAvailableOutboundLength(size_t amount); - virtual void ReceiveData( - int fin, - const uint8_t* data, - size_t datalen, - uint64_t offset); - - bool SubmitInformation(v8::Local headers); - bool SubmitHeaders(v8::Local headers, uint32_t flags); - bool SubmitTrailers(v8::Local headers); - - // Required for StreamBase - int ReadStart() override; - - // Required for StreamBase - int ReadStop() override; - - // Required for StreamBase - int DoShutdown(ShutdownWrap* req_wrap) override; + // Returns false if the header cannot be added. This will + // typically only happen if a maximimum number of headers + // has been reached. + bool AddHeader(std::unique_ptr header); - void ResetStream(uint64_t app_error_code); + // Some QUIC applications support headers, others do not. + // The following methods allow consistent handling of + // headers at the QuicStream level regardless of the + // protocol. For applications that do not support headers, + // these are simply not used. + inline void BeginHeaders( + QuicStreamHeadersKind kind = QUICSTREAM_HEADERS_KIND_NONE); - void Commit(ssize_t amount); + // Indicates an amount of unacknowledged data that has been + // submitted to the QUIC connection. + inline void Commit(ssize_t amount); template inline size_t DrainInto( std::vector* vec, - size_t max_count = MAX_VECTOR_COUNT) { - CHECK(!is_destroyed()); - size_t length = 0; - streambuf_.DrainInto(vec, &length, max_count); - return length; - } + size_t max_count = MAX_VECTOR_COUNT); template inline size_t DrainInto( T* vec, size_t* count, - size_t max_count = MAX_VECTOR_COUNT) { - CHECK(!is_destroyed()); - size_t length = 0; - streambuf_.DrainInto(vec, count, &length, max_count); - return length; - } + size_t max_count = MAX_VECTOR_COUNT); - AsyncWrap* GetAsyncWrap() override { return this; } + inline void EndHeaders(); - // Some QUIC applications support headers, others do not. - // The following methods allow consistent handling of - // headers at the QuicStream level regardless of the - // protocol. For applications that do not support headers, - // these are simply not used. - void BeginHeaders(QuicStreamHeadersKind kind = QUICSTREAM_HEADERS_KIND_NONE); + // Passes a chunk of data on to the QuicStream listener. + void ReceiveData( + int fin, + const uint8_t* data, + size_t datalen, + uint64_t offset); - // Returns false if the header cannot be added. This will - // typically only happen if a maximimum number of headers - // has been reached. - bool AddHeader(std::unique_ptr header); - void EndHeaders(); + // Resets the QUIC stream, sending a signal to the peer that + // no additional data will be transmitted for this stream. + inline void ResetStream(uint64_t app_error_code = 0); - // Sets the kind of headers currently being processed. - void set_headers_kind(QuicStreamHeadersKind kind); + // Submits informational headers. Returns false if headers are not + // supported on the underlying QuicApplication. + inline bool SubmitInformation(v8::Local headers); - void MemoryInfo(MemoryTracker* tracker) const override; + // Submits initial headers. Returns false if headers are not + // supported on the underlying QuicApplication. + inline bool SubmitHeaders(v8::Local headers, uint32_t flags); + + // Submits trailing headers. Returns false if headers are not + // supported on the underlying QuicApplication. + inline bool SubmitTrailers(v8::Local headers); + + // Required for StreamBase + bool IsAlive() override; + bool IsClosing() override; + int ReadStart() override; + int ReadStop() override; + int DoShutdown(ShutdownWrap* req_wrap) override; + AsyncWrap* GetAsyncWrap() override { return this; } + + // Required for MemoryRetainer + void MemoryInfo(MemoryTracker* tracker) const override; SET_MEMORY_INFO_NAME(QuicStream) SET_SELF_SIZE(QuicStream) - QuicStream( - QuicSession* session, - v8::Local target, - int64_t stream_id); - private: - // Called only when a final stream frame has been received from - // the peer. This has the side effect of marking the readable - // side of the stream closed. No additional data will be received - // on this QuicStream. - inline void set_fin_received() { - flags_ |= QUICSTREAM_FLAG_FIN; - set_read_close(); - } - - // SetWriteClose is called either when the QuicStream is created - // and is unidirectional from the peer, or when DoShutdown is called. - // This will indicate that the writable side of the QuicStream is - // closed and that no data will be pushed to the outbound queue. - inline void set_write_close() { - flags_ |= QUICSTREAM_FLAG_WRITE_CLOSED; - } - - // Called when no additional data can be received on the QuicStream. - inline void set_read_close() { - flags_ |= QUICSTREAM_FLAG_READ_CLOSED; - } - - inline void set_read_start() { - flags_ |= QUICSTREAM_FLAG_READ_STARTED; - } - - inline void set_read_pause() { - flags_ |= QUICSTREAM_FLAG_READ_PAUSED; - } - - inline void set_read_resume() { - flags_ &= QUICSTREAM_FLAG_READ_PAUSED; - } - - inline void set_destroyed() { - flags_ |= QUICSTREAM_FLAG_DESTROYED; - } - - inline void IncrementStats(size_t datalen); + inline void set_fin_received(); + inline void set_write_close(); + inline void set_read_close(); + inline void set_read_start(); + inline void set_read_pause(); + inline void set_read_resume(); + inline void set_destroyed(); + + void IncrementStats(size_t datalen); BaseObjectWeakPtr session_; int64_t stream_id_; @@ -493,6 +422,14 @@ class QuicStream : public AsyncWrap, public StreamBase { BaseObjectPtr data_rx_ack_; AliasedBigUint64Array stats_buffer_; + + ListNode stream_queue_; + + public: + // Linked List of QuicStream objects + using Queue = ListHead; + inline void Schedule(Queue* queue); + inline void Unschedule(); }; } // namespace quic