From 745a9b7d4dcd3c9b17f41a6d8f7b1d54ce02f97c Mon Sep 17 00:00:00 2001 From: James M Snell Date: Mon, 8 May 2023 16:14:20 -0700 Subject: [PATCH] quic: add additional implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add QUIC_SESSION and QUIC_STREAM to AsyncWrap * update definitions in quic/bindingdata.h * fixup minor discrepancies in cid.h/cid.cc * add convenience operator in struct Path * fixup defs.h macro definitions * fixups in quic/preferredaddress.h/cc * fixups in src/quic/tokens.h/cc * fixups in quic/transportparams.h/cc * fixups in quic/tlscontext.h/cc * add quic/streams.h/cc placeholder * add quic session/application implementation PR-URL: https://github.com/nodejs/node/pull/47927 Reviewed-By: Stephen Belanger Reviewed-By: Tobias Nießen Reviewed-By: Yagiz Nizipli --- node.gyp | 8 + src/async_wrap.h | 2 + src/quic/application.cc | 449 ++++ src/quic/application.h | 144 ++ src/quic/bindingdata.h | 67 +- src/quic/cid.cc | 7 + src/quic/cid.h | 8 +- src/quic/data.h | 1 + src/quic/defs.h | 16 +- src/quic/endpoint.cc | 116 +- src/quic/endpoint.h | 63 +- src/quic/packet.cc | 60 +- src/quic/preferredaddress.cc | 20 + src/quic/preferredaddress.h | 7 + src/quic/session.cc | 2180 +++++++++++++++++ src/quic/session.h | 375 ++- src/quic/streams.cc | 63 + src/quic/streams.h | 82 + src/quic/tlscontext.cc | 19 +- src/quic/tlscontext.h | 5 +- src/quic/tokens.cc | 6 + src/quic/tokens.h | 7 +- src/quic/transportparams.cc | 61 +- src/quic/transportparams.h | 29 +- test/sequential/test-async-wrap-getasyncid.js | 2 + 25 files changed, 3595 insertions(+), 202 deletions(-) create mode 100644 src/quic/application.cc create mode 100644 src/quic/application.h create mode 100644 src/quic/session.cc create mode 100644 src/quic/streams.cc create mode 100644 src/quic/streams.h diff --git a/node.gyp b/node.gyp index d57ab5f21c575e..0db48c18dc1186 100644 --- a/node.gyp +++ b/node.gyp @@ -340,23 +340,31 @@ 'src/node_crypto.h', ], 'node_quic_sources': [ + 'src/quic/application.cc', 'src/quic/bindingdata.cc', 'src/quic/cid.cc', 'src/quic/data.cc', + 'src/quic/endpoint.cc', 'src/quic/logstream.cc', 'src/quic/packet.cc', 'src/quic/preferredaddress.cc', + 'src/quic/session.cc', 'src/quic/sessionticket.cc', + 'src/quic/streams.cc', 'src/quic/tlscontext.cc', 'src/quic/tokens.cc', 'src/quic/transportparams.cc', + 'src/quic/application.h', 'src/quic/bindingdata.h', 'src/quic/cid.h', 'src/quic/data.h', + 'src/quic/endpoint.h', 'src/quic/logstream.h', 'src/quic/packet.h', 'src/quic/preferredaddress.h', + 'src/quic/session.h', 'src/quic/sessionticket.h', + 'src/quic/streams.h', 'src/quic/tlscontext.h', 'src/quic/tokens.h', 'src/quic/transportparams.h', diff --git a/src/async_wrap.h b/src/async_wrap.h index c91896295a6cf9..01e981aa671a23 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -63,6 +63,8 @@ namespace node { V(QUIC_ENDPOINT) \ V(QUIC_LOGSTREAM) \ V(QUIC_PACKET) \ + V(QUIC_SESSION) \ + V(QUIC_STREAM) \ V(QUIC_UDP) \ V(SHUTDOWNWRAP) \ V(SIGNALWRAP) \ diff --git a/src/quic/application.cc b/src/quic/application.cc new file mode 100644 index 00000000000000..b7ba7fcf86b1ca --- /dev/null +++ b/src/quic/application.cc @@ -0,0 +1,449 @@ +#include "uv.h" +#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC + +#include +#include +#include "application.h" +#include "defs.h" +#include "endpoint.h" +#include "packet.h" +#include "session.h" + +namespace node { + +using v8::Just; +using v8::Local; +using v8::Maybe; +using v8::Nothing; +using v8::Object; +using v8::Value; + +namespace quic { + +struct Session::Application::StreamData final { + // The actual number of vectors in the struct, up to kMaxVectorCount. + size_t count = 0; + size_t remaining = 0; + // The stream identifier. If this is a negative value then no stream is + // identified. + int64_t id = -1; + int fin = 0; + ngtcp2_vec data[kMaxVectorCount]{}; + ngtcp2_vec* buf = data; + BaseObjectPtr stream; +}; + +const Session::Application_Options Session::Application_Options::kDefault = {}; + +Maybe Session::Application_Options::From( + Environment* env, Local value) { + if (value.IsEmpty() || !value->IsObject()) { + THROW_ERR_INVALID_ARG_TYPE(env, "options must be an object"); + return Nothing(); + } + + auto& state = BindingData::Get(env); + auto params = value.As(); + Application_Options options; + +#define SET(name) \ + SetOption( \ + env, &options, params, state.name##_string()) + + if (!SET(max_header_pairs) || !SET(max_header_length) || + !SET(max_field_section_size) || !SET(qpack_max_dtable_capacity) || + !SET(qpack_encoder_max_dtable_capacity) || !SET(qpack_blocked_streams)) { + return Nothing(); + } + +#undef SET + + return Just(options); +} + +Session::Application::Application(Session* session, const Options& options) + : session_(session) {} + +bool Session::Application::Start() { + // By default there is nothing to do. Specific implementations may + // override to perform more actions. + return true; +} + +void Session::Application::AcknowledgeStreamData(Stream* stream, + size_t datalen) { + DCHECK_NOT_NULL(stream); + stream->Acknowledge(datalen); +} + +void Session::Application::BlockStream(int64_t id) { + auto stream = session().FindStream(id); + if (stream) stream->Blocked(); +} + +bool Session::Application::CanAddHeader(size_t current_count, + size_t current_headers_length, + size_t this_header_length) { + // By default headers are not supported. + return false; +} + +bool Session::Application::SendHeaders(const Stream& stream, + HeadersKind kind, + const v8::Local& headers, + HeadersFlags flags) { + // By default do nothing. + return false; +} + +void Session::Application::ResumeStream(int64_t id) { + // By default do nothing. +} + +void Session::Application::ExtendMaxStreams(EndpointLabel label, + Direction direction, + uint64_t max_streams) { + // By default do nothing. +} + +void Session::Application::ExtendMaxStreamData(Stream* stream, + uint64_t max_data) { + // By default do nothing. +} + +void Session::Application::CollectSessionTicketAppData( + SessionTicket::AppData* app_data) const { + // By default do nothing. +} + +SessionTicket::AppData::Status +Session::Application::ExtractSessionTicketAppData( + const SessionTicket::AppData& app_data, + SessionTicket::AppData::Source::Flag flag) { + // By default we do not have any application data to retrieve. + return flag == SessionTicket::AppData::Source::Flag::STATUS_RENEW + ? SessionTicket::AppData::Status::TICKET_USE_RENEW + : SessionTicket::AppData::Status::TICKET_USE; +} + +void Session::Application::SetStreamPriority(const Stream& stream, + StreamPriority priority, + StreamPriorityFlags flags) { + // By default do nothing. +} + +StreamPriority Session::Application::GetStreamPriority(const Stream& stream) { + return StreamPriority::DEFAULT; +} + +BaseObjectPtr Session::Application::CreateStreamDataPacket() { + return Packet::Create(env(), + session_->endpoint_.get(), + session_->remote_address_, + ngtcp2_conn_get_max_udp_payload_size(*session_), + "stream data"); +} + +void Session::Application::StreamClose(Stream* stream, QuicError error) { + stream->Destroy(error); +} + +void Session::Application::StreamStopSending(Stream* stream, QuicError error) { + DCHECK_NOT_NULL(stream); + stream->ReceiveStopSending(error); +} + +void Session::Application::StreamReset(Stream* stream, + uint64_t final_size, + QuicError error) { + stream->ReceiveStreamReset(final_size, error); +} + +void Session::Application::SendPendingData() { + PathStorage path; + + BaseObjectPtr packet; + uint8_t* pos = nullptr; + int err = 0; + + size_t maxPacketCount = std::min(static_cast(64000), + ngtcp2_conn_get_send_quantum(*session_)); + size_t packetSendCount = 0; + + const auto updateTimer = [&] { + ngtcp2_conn_update_pkt_tx_time(*session_, uv_hrtime()); + session_->UpdateTimer(); + }; + + const auto congestionLimited = [&](auto packet) { + auto len = pos - ngtcp2_vec(*packet).base; + // We are either congestion limited or done. + if (len) { + // Some data was serialized into the packet. We need to send it. + packet->Truncate(len); + session_->Send(std::move(packet), path); + } + + updateTimer(); + }; + + for (;;) { + ssize_t ndatalen; + StreamData stream_data; + + err = GetStreamData(&stream_data); + + if (err < 0) { + session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL); + return session_->Close(Session::CloseMethod::SILENT); + } + + if (!packet) { + packet = CreateStreamDataPacket(); + if (!packet) { + session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL); + return session_->Close(Session::CloseMethod::SILENT); + } + pos = ngtcp2_vec(*packet).base; + } + + ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data); + + if (nwrite <= 0) { + switch (nwrite) { + case 0: + if (stream_data.id >= 0) ResumeStream(stream_data.id); + return congestionLimited(std::move(packet)); + case NGTCP2_ERR_STREAM_DATA_BLOCKED: { + session().StreamDataBlocked(stream_data.id); + if (session().max_data_left() == 0) { + if (stream_data.id >= 0) ResumeStream(stream_data.id); + return congestionLimited(std::move(packet)); + } + CHECK_LE(ndatalen, 0); + continue; + } + case NGTCP2_ERR_STREAM_SHUT_WR: { + // Indicates that the writable side of the stream has been closed + // locally or the stream is being reset. In either case, we can't send + // any stream data! + CHECK_GE(stream_data.id, 0); + // We need to notify the stream that the writable side has been closed + // and no more outbound data can be sent. + CHECK_LE(ndatalen, 0); + auto stream = session_->FindStream(stream_data.id); + if (stream) stream->End(); + continue; + } + case NGTCP2_ERR_WRITE_MORE: { + CHECK_GT(ndatalen, 0); + if (!StreamCommit(&stream_data, ndatalen)) return session_->Close(); + pos += ndatalen; + continue; + } + } + + packet->Done(UV_ECANCELED); + session_->last_error_ = QuicError::ForNgtcp2Error(nwrite); + return session_->Close(Session::CloseMethod::SILENT); + } + + pos += nwrite; + if (ndatalen > 0 && !StreamCommit(&stream_data, ndatalen)) { + // Since we are closing the session here, we don't worry about updating + // the pkt tx time. The failed StreamCommit should have updated the + // last_error_ appropriately. + packet->Done(UV_ECANCELED); + return session_->Close(Session::CloseMethod::SILENT); + } + + if (stream_data.id >= 0 && ndatalen < 0) ResumeStream(stream_data.id); + + packet->Truncate(nwrite); + session_->Send(std::move(packet), path); + + pos = nullptr; + + if (++packetSendCount == maxPacketCount) { + break; + } + } + + updateTimer(); +} + +ssize_t Session::Application::WriteVStream(PathStorage* path, + uint8_t* buf, + ssize_t* ndatalen, + const StreamData& stream_data) { + CHECK_LE(stream_data.count, kMaxVectorCount); + uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_NONE; + if (stream_data.remaining > 0) flags |= NGTCP2_WRITE_STREAM_FLAG_MORE; + if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN; + ssize_t ret = + ngtcp2_conn_writev_stream(*session_, + &path->path, + nullptr, + buf, + ngtcp2_conn_get_max_udp_payload_size(*session_), + ndatalen, + flags, + stream_data.id, + stream_data.buf, + stream_data.count, + uv_hrtime()); + return ret; +} + +// The DefaultApplication is the default implementation of Session::Application +// that is used for all unrecognized ALPN identifiers. +class DefaultApplication final : public Session::Application { + public: + // Marked NOLINT because the cpp linter gets confused about this using + // statement not being sorted with the using v8 statements at the top + // of the namespace. + using Application::Application; // NOLINT + + bool ReceiveStreamData(Stream* stream, + const uint8_t* data, + size_t datalen, + Stream::ReceiveDataFlags flags) override { + DCHECK_NOT_NULL(stream); + if (!stream->is_destroyed()) stream->ReceiveData(data, datalen, flags); + return true; + } + + int GetStreamData(StreamData* stream_data) override { + DCHECK_NOT_NULL(stream_data); + // If the queue is empty, there aren't any streams with data yet + if (stream_queue_.IsEmpty()) return 0; + + const auto get_length = [](auto vec, size_t count) { + CHECK_NOT_NULL(vec); + size_t len = 0; + for (size_t n = 0; n < count; n++) len += vec[n].len; + return len; + }; + + Stream* stream = stream_queue_.PopFront(); + CHECK_NOT_NULL(stream); + stream_data->stream.reset(stream); + stream_data->id = stream->id(); + auto next = + [&](int status, const ngtcp2_vec* data, size_t count, bob::Done done) { + switch (status) { + case bob::Status::STATUS_BLOCK: + // Fall through + case bob::Status::STATUS_WAIT: + return; + case bob::Status::STATUS_EOS: + stream_data->fin = 1; + } + + stream_data->count = count; + + if (count > 0) { + stream->Schedule(&stream_queue_); + stream_data->remaining = get_length(data, count); + } else { + stream_data->remaining = 0; + } + + // Not calling done here because we defer committing + // the data until after we're sure it's written. + }; + + if (LIKELY(!stream->is_eos())) { + int ret = stream->Pull(std::move(next), + bob::Options::OPTIONS_SYNC, + stream_data->data, + arraysize(stream_data->data), + kMaxVectorCount); + switch (ret) { + case bob::Status::STATUS_EOS: + stream_data->fin = 1; + break; + } + } else { + stream_data->fin = 1; + } + + return 0; + } + + void ResumeStream(int64_t id) override { ScheduleStream(id); } + + bool ShouldSetFin(const StreamData& stream_data) override { + auto const is_empty = [](auto vec, size_t cnt) { + size_t i; + for (i = 0; i < cnt && vec[i].len == 0; ++i) { + } + return i == cnt; + }; + + return stream_data.stream && is_empty(stream_data.buf, stream_data.count); + } + + bool StreamCommit(StreamData* stream_data, size_t datalen) override { + DCHECK_NOT_NULL(stream_data); + const auto consume = [](ngtcp2_vec** pvec, size_t* pcnt, size_t len) { + ngtcp2_vec* v = *pvec; + size_t cnt = *pcnt; + + for (; cnt > 0; --cnt, ++v) { + if (v->len > len) { + v->len -= len; + v->base += len; + break; + } + len -= v->len; + } + + *pvec = v; + *pcnt = cnt; + }; + + CHECK(stream_data->stream); + stream_data->remaining -= datalen; + consume(&stream_data->buf, &stream_data->count, datalen); + stream_data->stream->Commit(datalen); + return true; + } + + SET_SELF_SIZE(DefaultApplication) + SET_MEMORY_INFO_NAME(DefaultApplication) + SET_NO_MEMORY_INFO() + + private: + void ScheduleStream(int64_t id) { + auto stream = session().FindStream(id); + if (stream && !stream->is_destroyed()) { + stream->Schedule(&stream_queue_); + } + } + + void UnscheduleStream(int64_t id) { + auto stream = session().FindStream(id); + if (stream && !stream->is_destroyed()) stream->Unschedule(); + } + + Stream::Queue stream_queue_; +}; + +std::unique_ptr Session::select_application() { + // if (config.options.crypto_options.alpn == NGHTTP3_ALPN_H3) + // return std::make_unique(session, + // config.options.application_options); + + // In the future, we may end up supporting additional QUIC protocols. As they + // are added, extend the cases here to create and return them. + + return std::make_unique( + this, config_.options.application_options); +} + +} // namespace quic +} // namespace node + +#endif // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC diff --git a/src/quic/application.h b/src/quic/application.h new file mode 100644 index 00000000000000..af64d7ffca026e --- /dev/null +++ b/src/quic/application.h @@ -0,0 +1,144 @@ +#pragma once + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS +#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC + +#include "bindingdata.h" +#include "session.h" +#include "sessionticket.h" +#include "streams.h" + +namespace node { +namespace quic { + +// An Application implements the ALPN-protocol specific semantics on behalf +// of a QUIC Session. +class Session::Application : public MemoryRetainer { + public: + using Options = Session::Application_Options; + + Application(Session* session, const Options& options); + Application(const Application&) = delete; + Application(Application&&) = delete; + Application& operator=(const Application&) = delete; + Application& operator=(Application&&) = delete; + + virtual bool Start(); + + // Session will forward all received stream data immediately on to the + // Application. The only additional processing the Session does is to + // automatically adjust the session-level flow control window. It is up to + // the Application to do the same for the Stream-level flow control. + virtual bool ReceiveStreamData(Stream* stream, + const uint8_t* data, + size_t datalen, + Stream::ReceiveDataFlags flags) = 0; + + // Session will forward all data acknowledgements for a stream to the + // Application. + virtual void AcknowledgeStreamData(Stream* stream, size_t datalen); + + // Called to determine if a Header can be added to this application. + // Applications that do not support headers will always return false. + virtual bool CanAddHeader(size_t current_count, + size_t current_headers_length, + size_t this_header_length); + + // Called to mark the identified stream as being blocked. Not all + // Application types will support blocked streams, and those that do will do + // so differently. + virtual void BlockStream(int64_t id); + + // Called when the session determines that there is outbound data available + // to send for the given stream. + virtual void ResumeStream(int64_t id); + + // Called when the Session determines that the maximum number of + // remotely-initiated unidirectional streams has been extended. Not all + // Application types will require this notification so the default is to do + // nothing. + virtual void ExtendMaxStreams(EndpointLabel label, + Direction direction, + uint64_t max_streams); + + // Called when the Session determines that the flow control window for the + // given stream has been expanded. Not all Application types will require + // this notification so the default is to do nothing. + virtual void ExtendMaxStreamData(Stream* stream, uint64_t max_data); + + // Different Applications may wish to set some application data in the + // session ticket (e.g. http/3 would set server settings in the application + // data). By default, there's nothing to set. + virtual void CollectSessionTicketAppData( + SessionTicket::AppData* app_data) const; + + // Different Applications may set some application data in the session + // ticket (e.g. http/3 would set server settings in the application data). + // By default, there's nothing to get. + virtual SessionTicket::AppData::Status ExtractSessionTicketAppData( + const SessionTicket::AppData& app_data, + SessionTicket::AppData::Source::Flag flag); + + // Notifies the Application that the identified stream has been closed. + virtual void StreamClose(Stream* stream, QuicError error = QuicError()); + + // Notifies the Application that the identified stream has been reset. + virtual void StreamReset(Stream* stream, + uint64_t final_size, + QuicError error); + + // Notifies the Application that the identified stream should stop sending. + virtual void StreamStopSending(Stream* stream, QuicError error); + + // Submits an outbound block of headers for the given stream. Not all + // Application types will support headers, in which case this function + // should return false. + virtual bool SendHeaders(const Stream& stream, + HeadersKind kind, + const v8::Local& headers, + HeadersFlags flags = HeadersFlags::NONE); + + // Signals to the Application that it should serialize and transmit any + // pending session and stream packets it has accumulated. + void SendPendingData(); + + // Set the priority level of the stream if supported by the application. Not + // all applications support priorities, in which case this function is a + // non-op. + virtual void SetStreamPriority( + const Stream& stream, + StreamPriority priority = StreamPriority::DEFAULT, + StreamPriorityFlags flags = StreamPriorityFlags::NONE); + + // Get the priority level of the stream if supported by the application. Not + // all applications support priorities, in which case this function returns + // the default stream priority. + virtual StreamPriority GetStreamPriority(const Stream& stream); + + protected: + inline Environment* env() const { return session_->env(); } + inline Session& session() { return *session_; } + + BaseObjectPtr CreateStreamDataPacket(); + + struct StreamData; + + virtual int GetStreamData(StreamData* data) = 0; + virtual bool StreamCommit(StreamData* data, size_t datalen) = 0; + virtual bool ShouldSetFin(const StreamData& data) = 0; + + // Write the given stream_data into the buffer. + ssize_t WriteVStream(PathStorage* path, + uint8_t* buf, + ssize_t* ndatalen, + const StreamData& stream_data); + + private: + Session* session_; +}; + +} // namespace quic +} // namespace node + +#endif // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/quic/bindingdata.h b/src/quic/bindingdata.h index a5bfe57d8ea6e6..2b33642b7ad76a 100644 --- a/src/quic/bindingdata.h +++ b/src/quic/bindingdata.h @@ -27,10 +27,55 @@ enum class Side { SERVER = NGTCP2_CRYPTO_SIDE_SERVER, }; +enum class EndpointLabel { + LOCAL, + REMOTE, +}; + +enum class Direction { + BIDIRECTIONAL, + UNIDIRECTIONAL, +}; + +enum class HeadersKind { + HINTS, + INITIAL, + TRAILING, +}; + +enum class HeadersFlags { + NONE, + TERMINAL, +}; + +enum class StreamPriority { + DEFAULT = NGHTTP3_DEFAULT_URGENCY, + LOW = NGHTTP3_URGENCY_LOW, + HIGH = NGHTTP3_URGENCY_HIGH, +}; + +enum class StreamPriorityFlags { + NONE, + NON_INCREMENTAL, +}; + +enum class PathValidationResult : uint8_t { + SUCCESS = NGTCP2_PATH_VALIDATION_RESULT_SUCCESS, + FAILURE = NGTCP2_PATH_VALIDATION_RESULT_FAILURE, + ABORTED = NGTCP2_PATH_VALIDATION_RESULT_ABORTED, +}; + +enum class DatagramStatus { + ACKNOWLEDGED, + LOST, +}; + +constexpr uint64_t NGTCP2_APP_NOERROR = 65280; constexpr size_t kDefaultMaxPacketLength = NGTCP2_MAX_UDP_PAYLOAD_SIZE; constexpr size_t kMaxSizeT = std::numeric_limits::max(); constexpr uint64_t kMaxSafeJsInteger = 9007199254740991; constexpr auto kSocketAddressInfoTimeout = 60 * NGTCP2_SECONDS; +constexpr size_t kMaxVectorCount = 16; // ============================================================================ @@ -51,7 +96,6 @@ constexpr auto kSocketAddressInfoTimeout = 60 * NGTCP2_SECONDS; V(endpoint_close, EndpointClose) \ V(session_new, SessionNew) \ V(session_close, SessionClose) \ - V(session_error, SessionError) \ V(session_datagram, SessionDatagram) \ V(session_datagram_status, SessionDatagramStatus) \ V(session_handshake, SessionHandshake) \ @@ -68,10 +112,13 @@ constexpr auto kSocketAddressInfoTimeout = 60 * NGTCP2_SECONDS; // The various JS strings the implementation uses. #define QUIC_STRINGS(V) \ + V(aborted, "aborted") \ + V(acknowledged, "acknowledged") \ V(ack_delay_exponent, "ackDelayExponent") \ V(active_connection_id_limit, "activeConnectionIDLimit") \ V(address_lru_size, "addressLRUSize") \ V(alpn, "alpn") \ + V(application_options, "application") \ V(ca, "ca") \ V(certs, "certs") \ V(cc_algorithm, "cc") \ @@ -82,6 +129,7 @@ constexpr auto kSocketAddressInfoTimeout = 60 * NGTCP2_SECONDS; V(enable_tls_trace, "tlsTrace") \ V(endpoint, "Endpoint") \ V(endpoint_udp, "Endpoint::UDP") \ + V(failure, "failure") \ V(groups, "groups") \ V(hostname, "hostname") \ V(http3_alpn, &NGHTTP3_ALPN_H3[1]) \ @@ -95,15 +143,25 @@ constexpr auto kSocketAddressInfoTimeout = 60 * NGTCP2_SECONDS; V(keylog, "keylog") \ V(keys, "keys") \ V(logstream, "LogStream") \ + V(lost, "lost") \ V(max_ack_delay, "maxAckDelay") \ V(max_connections_per_host, "maxConnectionsPerHost") \ V(max_connections_total, "maxConnectionsTotal") \ V(max_datagram_frame_size, "maxDatagramFrameSize") \ + V(max_field_section_size, "maxFieldSectionSize") \ + V(max_header_length, "maxHeaderLength") \ + V(max_header_pairs, "maxHeaderPairs") \ V(max_idle_timeout, "maxIdleTimeout") \ V(max_payload_size, "maxPayloadSize") \ V(max_retries, "maxRetries") \ V(max_stateless_resets, "maxStatelessResetsPerHost") \ + V(min_version, "minVersion") \ V(packetwrap, "PacketWrap") \ + V(preferred_address_strategy, "preferredAddressPolicy") \ + V(qlog, "qlog") \ + V(qpack_blocked_streams, "qpackBlockedStreams") \ + V(qpack_encoder_max_dtable_capacity, "qpackEncoderMaxDTableCapacity") \ + V(qpack_max_dtable_capacity, "qpackMaxDTableCapacity") \ V(reject_unauthorized, "rejectUnauthorized") \ V(retry_token_expiration, "retryTokenExpiration") \ V(request_peer_certificate, "requestPeerCertificate") \ @@ -112,15 +170,19 @@ constexpr auto kSocketAddressInfoTimeout = 60 * NGTCP2_SECONDS; V(session, "Session") \ V(session_id_ctx, "sessionIDContext") \ V(stream, "Stream") \ + V(success, "success") \ + V(tls_options, "tls") \ V(token_expiration, "tokenExpiration") \ V(token_secret, "tokenSecret") \ + V(transport_params, "transportParams") \ V(tx_loss, "txDiagnosticLoss") \ V(udp_receive_buffer_size, "udpReceiveBufferSize") \ V(udp_send_buffer_size, "udpSendBufferSize") \ V(udp_ttl, "udpTTL") \ V(unacknowledged_packet_threshold, "unacknowledgedPacketThreshold") \ V(validate_address, "validateAddress") \ - V(verify_hostname_identity, "verifyHostnameIdentity") + V(verify_hostname_identity, "verifyHostnameIdentity") \ + V(version, "version") // ============================================================================= // The BindingState object holds state for the internalBinding('quic') binding @@ -247,6 +309,7 @@ struct CallbackScope final : public CallbackScopeBase { BaseObjectPtr ref; explicit CallbackScope(const T* ptr) : CallbackScopeBase(ptr->env()), ref(ptr) {} + explicit CallbackScope(T* ptr) : CallbackScopeBase(ptr->env()), ref(ptr) {} }; } // namespace quic diff --git a/src/quic/cid.cc b/src/quic/cid.cc index 019896104fb63b..e23c9357340ec6 100644 --- a/src/quic/cid.cc +++ b/src/quic/cid.cc @@ -34,6 +34,13 @@ CID::CID(const CID& other) : ptr_(&cid_) { ngtcp2_cid_init(&cid_, other.ptr_->data, other.ptr_->datalen); } +CID& CID::operator=(const CID& other) { + CHECK_NOT_NULL(other.ptr_); + ptr_ = &cid_; + ngtcp2_cid_init(&cid_, other.ptr_->data, other.ptr_->datalen); + return *this; +} + bool CID::operator==(const CID& other) const noexcept { if (this == &other || (length() == 0 && other.length() == 0)) return true; if (length() != other.length()) return false; diff --git a/src/quic/cid.h b/src/quic/cid.h index bfd6eb47c9ff9b..a3aa0e750f303a 100644 --- a/src/quic/cid.h +++ b/src/quic/cid.h @@ -52,6 +52,8 @@ class CID final : public MemoryRetainer { CID(const CID& other); CID(CID&& other) = delete; + CID& operator=(const CID& other); + struct Hash final { size_t operator()(const CID& cid) const; }; @@ -86,12 +88,14 @@ class CID final : public MemoryRetainer { static CID kInvalid; - private: // The default constructor creates an empty, zero-length CID. // Zero-length CIDs are not usable. We use them as a placeholder - // for a missing or empty CID value. + // for a missing or empty CID value. This is public only because + // it is required for the CID::Map implementation. It should not + // be used. Use kInvalid instead. CID(); + private: ngtcp2_cid cid_; const ngtcp2_cid* ptr_; diff --git a/src/quic/data.h b/src/quic/data.h index 2ccd7d7a8156f2..ef103254104ec3 100644 --- a/src/quic/data.h +++ b/src/quic/data.h @@ -16,6 +16,7 @@ namespace quic { struct Path final : public ngtcp2_path { Path(const SocketAddress& local, const SocketAddress& remote); + inline operator ngtcp2_path*() { return this; } }; struct PathStorage final : public ngtcp2_path_storage { diff --git a/src/quic/defs.h b/src/quic/defs.h index d7123d16f11cab..65ebe812efa1b7 100644 --- a/src/quic/defs.h +++ b/src/quic/defs.h @@ -9,6 +9,10 @@ namespace node { namespace quic { +#define NGTCP2_SUCCESS 0 +#define NGTCP2_ERR(V) (V != NGTCP2_SUCCESS) +#define NGTCP2_OK(V) (V == NGTCP2_SUCCESS) + template bool SetOption(Environment* env, Opt* options, @@ -96,14 +100,16 @@ uint64_t GetStat(Stats* stats) { IncrementStat(stats_.Data(), amt); #define STAT_RECORD_TIMESTAMP(Type, name) \ RecordTimestampStat(stats_.Data()); -#define STAT_SET(Type, name, val) \ - SetStat(stats_.Data(), val); -#define STAT_GET(Type, name) GetStat(stats_.Data()); +#define STAT_SET(Type, name, val) SetStat(stats_.Data(), val) +#define STAT_GET(Type, name) GetStat(stats_.Data()) #define STAT_FIELD(_, name) uint64_t name; -#define STAT_STRUCT(name) \ - struct Stats final { \ +#define STAT_STRUCT(klass, name) \ + struct klass::Stats final { \ name##_STATS(STAT_FIELD) \ }; +#define JS_METHOD(name) \ + static void name(const v8::FunctionCallbackInfo& args) + } // namespace quic } // namespace node diff --git a/src/quic/endpoint.cc b/src/quic/endpoint.cc index 4b29de39a8df0d..bc47cfb26be9e2 100644 --- a/src/quic/endpoint.cc +++ b/src/quic/endpoint.cc @@ -12,6 +12,7 @@ #include #include #include +#include "application.h" #include "defs.h" namespace node { @@ -35,6 +36,45 @@ using v8::Value; namespace quic { +#define ENDPOINT_STATE(V) \ + /* Bound to the UDP port */ \ + V(BOUND, bound, uint8_t) \ + /* Receiving packets on the UDP port */ \ + V(RECEIVING, receiving, uint8_t) \ + /* Listening as a QUIC server */ \ + V(LISTENING, listening, uint8_t) \ + /* In the process of closing down */ \ + V(CLOSING, closing, uint8_t) \ + /* In the process of closing down, waiting for pending send callbacks */ \ + V(WAITING_FOR_CALLBACKS, waiting_for_callbacks, uint8_t) \ + /* Temporarily paused serving new initial requests */ \ + V(BUSY, busy, uint8_t) \ + /* The number of pending send callbacks */ \ + V(PENDING_CALLBACKS, pending_callbacks, size_t) + +#define ENDPOINT_STATS(V) \ + V(CREATED_AT, created_at) \ + V(DESTROYED_AT, destroyed_at) \ + V(BYTES_RECEIVED, bytes_received) \ + V(BYTES_SENT, bytes_sent) \ + V(PACKETS_RECEIVED, packets_received) \ + V(PACKETS_SENT, packets_sent) \ + V(SERVER_SESSIONS, server_sessions) \ + V(CLIENT_SESSIONS, client_sessions) \ + V(SERVER_BUSY_COUNT, server_busy_count) \ + V(RETRY_COUNT, retry_count) \ + V(VERSION_NEGOTIATION_COUNT, version_negotiation_count) \ + V(STATELESS_RESET_COUNT, stateless_reset_count) \ + V(IMMEDIATE_CLOSE_COUNT, immediate_close_count) + +struct Endpoint::State { +#define V(_, name, type) type name; + ENDPOINT_STATE(V) +#undef V +}; + +STAT_STRUCT(Endpoint, ENDPOINT) + // ============================================================================ namespace { @@ -258,7 +298,7 @@ class Endpoint::UDP::Impl final : public HandleWrap { return; } - impl->endpoint_->Receive(uv_buf_t{buf->base, static_cast(nread)}, + impl->endpoint_->Receive(uv_buf_init(buf->base, static_cast(nread)), SocketAddress(addr)); } @@ -414,6 +454,15 @@ Local Endpoint::GetConstructorTemplate(Environment* env) { void Endpoint::Initialize(Environment* env, Local target) { SetMethod(env->context(), target, "createEndpoint", CreateEndpoint); +#define V(name, _) IDX_STATS_ENDPOINT_##name, + enum EndpointStatsIdx { ENDPOINT_STATS(V) IDX_STATS_ENDPOINT_COUNT }; +#undef V + +#define V(name, key, __) \ + auto IDX_STATE_ENDPOINT_##name = offsetof(Endpoint::State, key); + ENDPOINT_STATE(V) +#undef V + #define V(name, _) NODE_DEFINE_CONSTANT(target, IDX_STATS_ENDPOINT_##name); ENDPOINT_STATS(V) #undef V @@ -483,25 +532,16 @@ void Endpoint::MarkAsBusy(bool on) { state_->busy = on ? 1 : 0; } -Maybe Endpoint::GenerateNewToken( - uint32_t version, const SocketAddress& remote_address) { - if (is_closed() || is_closing()) { - THROW_ERR_INVALID_STATE(env(), - "Endpoint is closed. Unable to create token."); - return Nothing(); - } - - return Just(RegularToken(version, remote_address, options_.token_secret)); +RegularToken Endpoint::GenerateNewToken(uint32_t version, + const SocketAddress& remote_address) { + DCHECK(!is_closed() && !is_closing()); + return RegularToken(version, remote_address, options_.token_secret); } -Maybe Endpoint::GenerateNewStatelessResetToken( +StatelessResetToken Endpoint::GenerateNewStatelessResetToken( uint8_t* token, const CID& cid) const { - if (is_closed() || is_closing()) { - THROW_ERR_INVALID_STATE(env(), - "Endpoint is closed. Unable to create token."); - return Nothing(); - } - return Just(StatelessResetToken(token, options_.reset_token_secret, cid)); + DCHECK(!is_closed() && !is_closing()); + return StatelessResetToken(token, options_.reset_token_secret, cid); } void Endpoint::AddSession(const CID& cid, BaseObjectPtr session) { @@ -708,33 +748,18 @@ void Endpoint::Listen(const Session::Options& options) { BaseObjectPtr Endpoint::Connect( const SocketAddress& remote_address, const Session::Options& options, - std::optional sessionTicket) { - // TODO(@jasnell): Implement as part of Session... + std::optional session_ticket) { // If starting fails, the endpoint will be destroyed. if (!Start()) return BaseObjectPtr(); - // auto config = Session::Config( - // Side::CLIENT, - // *this, - // // For client sessions, we always generate a random intial CID for the - // // server. This is generally just a throwaway. The server will generate - // // it's own CID and send that back to us. - // CIDFactory::random().Generate(NGTCP2_MIN_INITIAL_DCIDLEN), - // local_address(), - // remote_address); - - // if (options.qlog) config.EnableQLog(); - - // config.session_ticket = sessionTicket; + auto config = Session::Config( + *this, options, local_address(), remote_address, session_ticket); - // auto session = - // Session::Create(BaseObjectPtr(this), config, options); - // if (!session) return BaseObjectPtr(); - - // session->set_wrapped(); - - // auto on_exit = OnScopeLeave([&] { session->SendPendingData(); }); + auto session = Session::Create(BaseObjectPtr(this), config); + if (!session) return BaseObjectPtr(); + session->set_wrapped(); + session->application().SendPendingData(); return BaseObjectPtr(); } @@ -806,8 +831,7 @@ void Endpoint::Receive(const uv_buf_t& buf, const auto accept = [&](const Session::Config& config, Store&& store) { if (is_closed() || is_closing() || !is_listening()) return false; - auto session = Session::Create( - BaseObjectPtr(this), config, server_options_.value()); + auto session = Session::Create(BaseObjectPtr(this), config); return session ? session->Receive(std::move(store), config.local_address, @@ -873,11 +897,11 @@ void Endpoint::Receive(const uv_buf_t& buf, // *this* session will use as it's outbound dcid. auto config = Session::Config(Side::SERVER, *this, - scid, + server_options_.value(), + version, local_address, remote_address, - version, - version, + scid, dcid); // The this point, the config.scid and config.dcid represent *our* views of @@ -941,8 +965,8 @@ void Endpoint::Receive(const uv_buf_t& buf, // The ocid is the original dcid that was encoded into the // original retry packet sent to the client. We use it for // validation. - config.ocid.emplace(ocid.value()); - config.retry_scid.emplace(dcid); + config.ocid = ocid.value(); + config.retry_scid = dcid; break; } case RegularToken::kTokenMagic: { diff --git a/src/quic/endpoint.h b/src/quic/endpoint.h index d02a2a3d9a9360..700630c2244420 100644 --- a/src/quic/endpoint.h +++ b/src/quic/endpoint.h @@ -21,50 +21,11 @@ namespace node { namespace quic { -#define ENDPOINT_STATS(V) \ - V(CREATED_AT, created_at) \ - V(DESTROYED_AT, destroyed_at) \ - V(BYTES_RECEIVED, bytes_received) \ - V(BYTES_SENT, bytes_sent) \ - V(PACKETS_RECEIVED, packets_received) \ - V(PACKETS_SENT, packets_sent) \ - V(SERVER_SESSIONS, server_sessions) \ - V(CLIENT_SESSIONS, client_sessions) \ - V(SERVER_BUSY_COUNT, server_busy_count) \ - V(RETRY_COUNT, retry_count) \ - V(VERSION_NEGOTIATION_COUNT, version_negotiation_count) \ - V(STATELESS_RESET_COUNT, stateless_reset_count) \ - V(IMMEDIATE_CLOSE_COUNT, immediate_close_count) - -#define ENDPOINT_STATE(V) \ - /* Bound to the UDP port */ \ - V(BOUND, bound, uint8_t) \ - /* Receiving packets on the UDP port */ \ - V(RECEIVING, receiving, uint8_t) \ - /* Listening as a QUIC server */ \ - V(LISTENING, listening, uint8_t) \ - /* In the process of closing down */ \ - V(CLOSING, closing, uint8_t) \ - /* In the process of closing down, waiting for pending send callbacks */ \ - V(WAITING_FOR_CALLBACKS, waiting_for_callbacks, uint8_t) \ - /* Temporarily paused serving new initial requests */ \ - V(BUSY, busy, uint8_t) \ - /* The number of pending send callbacks */ \ - V(PENDING_CALLBACKS, pending_callbacks, size_t) - // An Endpoint encapsulates the UDP local port binding and is responsible for // sending and receiving QUIC packets. A single endpoint can act as both a QUIC // client and server simultaneously. class Endpoint final : public AsyncWrap, public Packet::Listener { public: - STAT_STRUCT(ENDPOINT) - - struct State final { -#define V(_, name, type) type name; - ENDPOINT_STATE(V) -#undef V - }; - static constexpr size_t DEFAULT_MAX_CONNECTIONS = std::min(kMaxSizeT, static_cast(kMaxSafeJsInteger)); static constexpr size_t DEFAULT_MAX_CONNECTIONS_PER_HOST = 100; @@ -222,9 +183,6 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { inline const Options& options() const { return options_; } - inline const Stats& stats() const { - return *stats_.Data(); - } // While the busy flag is set, the Endpoint will reject all initial packets // with a SERVER_BUSY response. This allows us to build a circuit breaker @@ -233,12 +191,12 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { void MarkAsBusy(bool on = true); // Use the endpoint's token secret to generate a new token. - v8::Maybe GenerateNewToken(uint32_t version, - const SocketAddress& remote_address); + RegularToken GenerateNewToken(uint32_t version, + const SocketAddress& remote_address); // Use the endpoint's reset token secret to generate a new stateless reset. - v8::Maybe GenerateNewStatelessResetToken( - uint8_t* token, const CID& cid) const; + StatelessResetToken GenerateNewStatelessResetToken(uint8_t* token, + const CID& cid) const; void AddSession(const CID& cid, BaseObjectPtr session); void RemoveSession(const CID& cid); @@ -310,16 +268,10 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { SET_MEMORY_INFO_NAME(Endpoint) SET_SELF_SIZE(Endpoint) - private: -#define V(name, _) IDX_STATS_ENDPOINT_##name, - enum EndpointStatsIdx { ENDPOINT_STATS(V) IDX_STATS_ENDPOINT_COUNT }; -#undef V - -#define V(name, key, __) \ - IDX_STATE_ENDPOINT_##name = OffsetOf(&Endpoint::State::key), - enum EndpointStateIdx { ENDPOINT_STATE(V) }; -#undef V + struct Stats; + struct State; + private: class UDP final : public MemoryRetainer { public: explicit UDP(Endpoint* endpoint); @@ -478,6 +430,7 @@ class Endpoint final : public AsyncWrap, public Packet::Listener { friend class UDP; friend class Packet; + friend class Session; }; } // namespace quic diff --git a/src/quic/packet.cc b/src/quic/packet.cc index 2277110aff56b8..a25bd9e78180bd 100644 --- a/src/quic/packet.cc +++ b/src/quic/packet.cc @@ -199,11 +199,12 @@ int Packet::Send(uv_udp_t* handle, BaseObjectPtr ref) { } void Packet::Done(int status) { - DCHECK_NOT_NULL(listener_); - listener_->PacketDone(status); + if (listener_ != nullptr) { + listener_->PacketDone(status); + } + listener_ = nullptr; handle_.reset(); data_.reset(); - listener_ = nullptr; Reset(); // As a performance optimization, we add this packet to a freelist @@ -261,7 +262,10 @@ BaseObjectPtr Packet::CreateRetryPacket( path_descriptor.dcid, vec.base, vec.len); - if (nwrite <= 0) return BaseObjectPtr(); + if (nwrite <= 0) { + packet->Done(UV_ECANCELED); + return BaseObjectPtr(); + } packet->Truncate(static_cast(nwrite)); return packet; } @@ -272,13 +276,16 @@ BaseObjectPtr Packet::CreateConnectionClosePacket( const SocketAddress& destination, ngtcp2_conn* conn, const QuicError& error) { - auto packet = Packet::Create( + auto packet = Create( env, listener, destination, kDefaultMaxPacketLength, "connection close"); ngtcp2_vec vec = *packet; ssize_t nwrite = ngtcp2_conn_write_connection_close( conn, nullptr, nullptr, vec.base, vec.len, error, uv_hrtime()); - if (nwrite < 0) return BaseObjectPtr(); + if (nwrite < 0) { + packet->Done(UV_ECANCELED); + return BaseObjectPtr(); + } packet->Truncate(static_cast(nwrite)); return packet; } @@ -288,11 +295,11 @@ BaseObjectPtr Packet::CreateImmediateConnectionClosePacket( Listener* listener, const PathDescriptor& path_descriptor, const QuicError& reason) { - auto packet = Packet::Create(env, - listener, - path_descriptor.remote_address, - kDefaultMaxPacketLength, - "immediate connection close (endpoint)"); + auto packet = Create(env, + listener, + path_descriptor.remote_address, + kDefaultMaxPacketLength, + "immediate connection close (endpoint)"); ngtcp2_vec vec = *packet; ssize_t nwrite = ngtcp2_crypto_write_connection_close( vec.base, @@ -305,7 +312,10 @@ BaseObjectPtr Packet::CreateImmediateConnectionClosePacket( // there is one in the QuicError nullptr, 0); - if (nwrite <= 0) return BaseObjectPtr(); + if (nwrite <= 0) { + packet->Done(UV_ECANCELED); + return BaseObjectPtr(); + } packet->Truncate(static_cast(nwrite)); return packet; } @@ -329,16 +339,17 @@ BaseObjectPtr Packet::CreateStatelessResetPacket( uint8_t random[kRandlen]; CHECK(crypto::CSPRNG(random, kRandlen).is_ok()); - auto packet = Packet::Create(env, - listener, - path_descriptor.remote_address, - kDefaultMaxPacketLength, - "stateless reset"); + auto packet = Create(env, + listener, + path_descriptor.remote_address, + kDefaultMaxPacketLength, + "stateless reset"); ngtcp2_vec vec = *packet; ssize_t nwrite = ngtcp2_pkt_write_stateless_reset( vec.base, pktlen, token, random, kRandlen); if (nwrite <= static_cast(kMinStatelessResetLen)) { + packet->Done(UV_ECANCELED); return BaseObjectPtr(); } @@ -377,11 +388,11 @@ BaseObjectPtr Packet::CreateVersionNegotiationPacket( size_t pktlen = path_descriptor.dcid.length() + path_descriptor.scid.length() + (sizeof(sv)) + 7; - auto packet = Packet::Create(env, - listener, - path_descriptor.remote_address, - kDefaultMaxPacketLength, - "version negotiation"); + auto packet = Create(env, + listener, + path_descriptor.remote_address, + kDefaultMaxPacketLength, + "version negotiation"); ngtcp2_vec vec = *packet; ssize_t nwrite = @@ -394,7 +405,10 @@ BaseObjectPtr Packet::CreateVersionNegotiationPacket( path_descriptor.scid.length(), sv, arraysize(sv)); - if (nwrite <= 0) return BaseObjectPtr(); + if (nwrite <= 0) { + packet->Done(UV_ECANCELED); + return BaseObjectPtr(); + } packet->Truncate(static_cast(nwrite)); return packet; } diff --git a/src/quic/preferredaddress.cc b/src/quic/preferredaddress.cc index 180241cf272aa8..138dbf47c46a07 100644 --- a/src/quic/preferredaddress.cc +++ b/src/quic/preferredaddress.cc @@ -15,6 +15,7 @@ using v8::Just; using v8::Local; using v8::Maybe; using v8::Nothing; +using v8::Uint32; using v8::Value; namespace quic { @@ -153,6 +154,25 @@ void PreferredAddress::Set(ngtcp2_transport_params* params, // Any other value is just ignored. } +Maybe PreferredAddress::tryGetPolicy( + Environment* env, Local value) { + if (value->IsNumber()) { + auto val = value.As()->Value(); + if (val == static_cast(Policy::IGNORE_PREFERRED_ADDRESS)) + return Just(Policy::IGNORE_PREFERRED_ADDRESS); + if (val == static_cast(Policy::USE_PREFERRED_ADDRESS)) + return Just(Policy::USE_PREFERRED_ADDRESS); + } + return Nothing(); +} + +void PreferredAddress::Initialize(Environment* env, + v8::Local target) { + NODE_DEFINE_CONSTANT(target, QUIC_PREFERRED_ADDRESS_IGNORE); + NODE_DEFINE_CONSTANT(target, QUIC_PREFERRED_ADDRESS_USE); + NODE_DEFINE_CONSTANT(target, DEFAULT_PREFERRED_ADDRESS_POLICY); +} + } // namespace quic } // namespace node diff --git a/src/quic/preferredaddress.h b/src/quic/preferredaddress.h index 6be468fac2cd08..e29f4021883bd8 100644 --- a/src/quic/preferredaddress.h +++ b/src/quic/preferredaddress.h @@ -25,12 +25,19 @@ class PreferredAddress final { USE_PREFERRED_ADDRESS, }; + static v8::Maybe tryGetPolicy(Environment* env, + v8::Local value); + // The QUIC_* constants are expected to be exported out to be used on // the JavaScript side of the API. static constexpr uint32_t QUIC_PREFERRED_ADDRESS_USE = static_cast(Policy::USE_PREFERRED_ADDRESS); static constexpr uint32_t QUIC_PREFERRED_ADDRESS_IGNORE = static_cast(Policy::IGNORE_PREFERRED_ADDRESS); + static constexpr uint32_t DEFAULT_PREFERRED_ADDRESS_POLICY = + static_cast(Policy::USE_PREFERRED_ADDRESS); + + static void Initialize(Environment* env, v8::Local target); static v8::Maybe GetPolicy(Environment* env, v8::Local value); diff --git a/src/quic/session.cc b/src/quic/session.cc new file mode 100644 index 00000000000000..dd5a992d55275c --- /dev/null +++ b/src/quic/session.cc @@ -0,0 +1,2180 @@ +#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC + +#include "session.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "application.h" +#include "bindingdata.h" +#include "cid.h" +#include "data.h" +#include "defs.h" +#include "endpoint.h" +#include "logstream.h" +#include "packet.h" +#include "preferredaddress.h" +#include "sessionticket.h" +#include "streams.h" +#include "tlscontext.h" +#include "transportparams.h" + +namespace node { + +using v8::Array; +using v8::ArrayBuffer; +using v8::ArrayBufferView; +using v8::BigInt; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::HandleScope; +using v8::Integer; +using v8::Just; +using v8::Local; +using v8::Maybe; +using v8::Nothing; +using v8::Object; +using v8::PropertyAttribute; +using v8::String; +using v8::Uint32; +using v8::Value; + +namespace quic { + +#define SESSION_STATE(V) \ + /* Set if the JavaScript wrapper has a path-validation event listener */ \ + V(PATH_VALIDATION, path_validation, uint8_t) \ + /* Set if the JavaScript wrapper has a version-negotiation event listener */ \ + V(VERSION_NEGOTIATION, version_negotiation, uint8_t) \ + /* Set if the JavaScript wrapper has a datagram event listener */ \ + V(DATAGRAM, datagram, uint8_t) \ + /* Set if the JavaScript wrapper has a session-ticket event listener */ \ + V(SESSION_TICKET, session_ticket, uint8_t) \ + V(CLOSING, closing, uint8_t) \ + V(GRACEFUL_CLOSE, graceful_close, uint8_t) \ + V(SILENT_CLOSE, silent_close, uint8_t) \ + V(STATELESS_RESET, stateless_reset, uint8_t) \ + V(DESTROYED, destroyed, uint8_t) \ + V(HANDSHAKE_COMPLETED, handshake_completed, uint8_t) \ + V(HANDSHAKE_CONFIRMED, handshake_confirmed, uint8_t) \ + V(STREAM_OPEN_ALLOWED, stream_open_allowed, uint8_t) \ + /* A Session is wrapped if it has been passed out to JS */ \ + V(WRAPPED, wrapped, uint8_t) \ + V(LAST_DATAGRAM_ID, last_datagram_id, uint64_t) + +#define SESSION_STATS(V) \ + V(CREATED_AT, created_at) \ + V(CLOSING_AT, closing_at) \ + V(DESTROYED_AT, destroyed_at) \ + V(HANDSHAKE_COMPLETED_AT, handshake_completed_at) \ + V(HANDSHAKE_CONFIRMED_AT, handshake_confirmed_at) \ + V(GRACEFUL_CLOSING_AT, graceful_closing_at) \ + V(BYTES_RECEIVED, bytes_received) \ + V(BYTES_SENT, bytes_sent) \ + V(BIDI_IN_STREAM_COUNT, bidi_in_stream_count) \ + V(BIDI_OUT_STREAM_COUNT, bidi_out_stream_count) \ + V(UNI_IN_STREAM_COUNT, uni_in_stream_count) \ + V(UNI_OUT_STREAM_COUNT, uni_out_stream_count) \ + V(KEYUPDATE_COUNT, keyupdate_count) \ + V(LOSS_RETRANSMIT_COUNT, loss_retransmit_count) \ + V(MAX_BYTES_IN_FLIGHT, max_bytes_in_flight) \ + V(BYTES_IN_FLIGHT, bytes_in_flight) \ + V(BLOCK_COUNT, block_count) \ + V(CONGESTION_RECOVERY_START_TS, congestion_recovery_start_ts) \ + V(CWND, cwnd) \ + V(DELIVERY_RATE_SEC, delivery_rate_sec) \ + V(FIRST_RTT_SAMPLE_TS, first_rtt_sample_ts) \ + V(INITIAL_RTT, initial_rtt) \ + V(LAST_TX_PKT_TS, last_tx_pkt_ts) \ + V(LATEST_RTT, latest_rtt) \ + V(LOSS_DETECTION_TIMER, loss_detection_timer) \ + V(LOSS_TIME, loss_time) \ + V(MAX_UDP_PAYLOAD_SIZE, max_udp_payload_size) \ + V(MIN_RTT, min_rtt) \ + V(PTO_COUNT, pto_count) \ + V(RTTVAR, rttvar) \ + V(SMOOTHED_RTT, smoothed_rtt) \ + V(SSTHRESH, ssthresh) \ + V(DATAGRAMS_RECEIVED, datagrams_received) \ + V(DATAGRAMS_SENT, datagrams_sent) \ + V(DATAGRAMS_ACKNOWLEDGED, datagrams_acknowledged) \ + V(DATAGRAMS_LOST, datagrams_lost) + +#define SESSION_JS_METHODS(V) \ + V(DoDestroy, destroy, false) \ + V(GetRemoteAddress, getRemoteAddress, true) \ + V(GetCertificate, getCertificate, true) \ + V(GetEphemeralKeyInfo, getEphemeralKey, true) \ + V(GetPeerCertificate, getPeerCertificate, true) \ + V(GracefulClose, gracefulClose, false) \ + V(SilentClose, silentClose, false) \ + V(UpdateKey, updateKey, false) \ + V(DoOpenStream, openStream, false) \ + V(DoSendDatagram, sendDatagram, false) + +struct Session::State { +#define V(_, name, type) type name; + SESSION_STATE(V) +#undef V +}; + +STAT_STRUCT(Session, SESSION) + +// ============================================================================ +// Used to conditionally trigger sending an explicit connection +// close. If there are multiple MaybeCloseConnectionScope in the +// stack, the determination of whether to send the close will be +// done once the final scope is closed. +struct Session::MaybeCloseConnectionScope final { + Session* session; + bool silent = false; + MaybeCloseConnectionScope(Session* session_, bool silent_) + : session(session_), + silent(silent_ || session->connection_close_depth_ > 0) { + session->connection_close_depth_++; + } + MaybeCloseConnectionScope(const MaybeCloseConnectionScope&) = delete; + MaybeCloseConnectionScope(MaybeCloseConnectionScope&&) = delete; + MaybeCloseConnectionScope& operator=(const MaybeCloseConnectionScope&) = + delete; + MaybeCloseConnectionScope& operator=(MaybeCloseConnectionScope&&) = delete; + ~MaybeCloseConnectionScope() { + // We only want to trigger the sending the connection close if ... + // a) Silent is not explicitly true at this scope. + // b) We're not within the scope of an ngtcp2 callback, and + // c) We are not already in a closing or draining period. + if (--session->connection_close_depth_ == 0 && !silent && + session->can_send_packets()) { + session->SendConnectionClose(); + } + } +}; + +// ============================================================================ +// Used to conditionally trigger sending of any pending data the session may +// be holding onto. If there are multiple SendPendingDataScope in the stack, +// the determination of whether to send the data will be done once the final +// scope is closed. + +Session::SendPendingDataScope::SendPendingDataScope(Session* session) + : session(session) { + session->send_scope_depth_++; +} + +Session::SendPendingDataScope::SendPendingDataScope( + const BaseObjectPtr& session) + : SendPendingDataScope(session.get()) {} + +Session::SendPendingDataScope::~SendPendingDataScope() { + if (--session->send_scope_depth_ == 0 && session->can_send_packets()) { + session->application().SendPendingData(); + } +} + +// ============================================================================ + +namespace { +// Qlog is a JSON-based logging format that is being standardized for low-level +// debug logging of QUIC connections and dataflows. The qlog output is generated +// optionally by ngtcp2 for us. The on_qlog_write callback is registered with +// ngtcp2 to emit the qlog information. Every Session will have it's own qlog +// stream. +void on_qlog_write(void* user_data, + uint32_t flags, + const void* data, + size_t len) { + static_cast(user_data)->HandleQlog(flags, data, len); +} + +// Forwards detailed(verbose) debugging information from ngtcp2. Enabled using +// the NODE_DEBUG_NATIVE=NGTCP2_DEBUG category. +void ngtcp2_debug_log(void* user_data, const char* fmt, ...) { + va_list ap; + va_start(ap, fmt); + std::string format(fmt, strlen(fmt) + 1); + format[strlen(fmt)] = '\n'; + // Debug() does not work with the va_list here. So we use vfprintf + // directly instead. Ngtcp2DebugLog is only enabled when the debug + // category is enabled. + vfprintf(stderr, format.c_str(), ap); + va_end(ap); +} + +template +bool SetOption(Environment* env, + Opt* options, + const v8::Local& object, + const v8::Local& name) { + Local value; + if (!object->Get(env->context(), name).ToLocal(&value)) return false; + if (!value->IsUndefined()) { + DCHECK(value->IsNumber()); + options->*member = value.As()->Value(); + } + return true; +} + +template +bool SetOption(Environment* env, + Opt* options, + const v8::Local& object, + const v8::Local& name) { + Local value; + if (!object->Get(env->context(), name).ToLocal(&value)) return false; + // If the policy specified is invalid, we will just ignore it. + auto maybePolicy = PreferredAddress::tryGetPolicy(env, value); + if (!maybePolicy.IsJust()) return false; + options->*member = maybePolicy.FromJust(); + return true; +} + +template +bool SetOption(Environment* env, + Opt* options, + const v8::Local& object, + const v8::Local& name) { + Local value; + if (!object->Get(env->context(), name).ToLocal(&value)) return false; + auto maybeOptions = TLSContext::Options::From(env, value); + if (!maybeOptions.IsJust()) return false; + options->*member = maybeOptions.FromJust(); + return true; +} + +template +bool SetOption(Environment* env, + Opt* options, + const v8::Local& object, + const v8::Local& name) { + Local value; + if (!object->Get(env->context(), name).ToLocal(&value)) return false; + auto maybeOptions = Session::Application_Options::From(env, value); + if (!maybeOptions.IsJust()) return false; + options->*member = maybeOptions.FromJust(); + return true; +} + +template +bool SetOption(Environment* env, + Opt* options, + const v8::Local& object, + const v8::Local& name) { + Local value; + if (!object->Get(env->context(), name).ToLocal(&value)) return false; + auto maybeOptions = TransportParams::Options::From(env, value); + if (!maybeOptions.IsJust()) return false; + options->*member = maybeOptions.FromJust(); + return true; +} + +} // namespace + +// ============================================================================ + +Session::Config::Config(Side side, + const Endpoint& endpoint, + const Options& options, + uint32_t version, + const SocketAddress& local_address, + const SocketAddress& remote_address, + const CID& dcid, + const CID& scid, + std::optional session_ticket, + const CID& ocid) + : side(side), + options(options), + version(version), + local_address(local_address), + remote_address(remote_address), + dcid(dcid), + scid(scid), + ocid(ocid), + session_ticket(session_ticket) { + ngtcp2_settings_default(&settings); + settings.initial_ts = uv_hrtime(); + + if (options.qlog) { + if (ocid) settings.qlog.odcid = ocid; + settings.qlog.write = on_qlog_write; + } + + if (endpoint.env()->enabled_debug_list()->enabled( + DebugCategory::NGTCP2_DEBUG)) { + settings.log_printf = ngtcp2_debug_log; + } + + // We pull parts of the settings for the session from the endpoint options. + auto& config = endpoint.options(); + settings.cc_algo = config.cc_algorithm; + settings.max_udp_payload_size = config.max_payload_size; + if (config.unacknowledged_packet_threshold > 0) { + settings.ack_thresh = config.unacknowledged_packet_threshold; + } +} + +Session::Config::Config(const Endpoint& endpoint, + const Options& options, + const SocketAddress& local_address, + const SocketAddress& remote_address, + std::optional session_ticket, + const CID& ocid) + : Config(Side::CLIENT, + endpoint, + options, + options.version, + local_address, + remote_address, + CID::Factory::random().Generate(NGTCP2_MIN_INITIAL_DCIDLEN), + options.cid_factory->Generate(), + session_ticket, + ocid) {} + +void Session::Config::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("options", options); + tracker->TrackField("local_address", local_address); + tracker->TrackField("remote_address", remote_address); + tracker->TrackField("dcid", dcid); + tracker->TrackField("scid", scid); + tracker->TrackField("ocid", ocid); + tracker->TrackField("retry_scid", retry_scid); + if (session_ticket.has_value()) + tracker->TrackField("session_ticket", session_ticket.value()); +} + +// ============================================================================ + +Maybe Session::Options::From(Environment* env, + Local value) { + if (value.IsEmpty() || !value->IsObject()) { + THROW_ERR_INVALID_ARG_TYPE(env, "options must be an object"); + return Nothing(); + } + + auto& state = BindingData::Get(env); + auto params = value.As(); + Options options; + +#define SET(name) \ + SetOption( \ + env, &options, params, state.name##_string()) + + if (!SET(version) || !SET(min_version) || !SET(preferred_address_strategy) || + !SET(transport_params) || !SET(tls_options) || + !SET(application_options) || !SET(qlog)) { + return Nothing(); + } + +#undef SET + + // TODO(@jasnell): Later we will also support setting the CID::Factory. + // For now, we're just using the default random factory. + + return Just(options); +} + +void Session::Options::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("transport_params", transport_params); + tracker->TrackField("crypto_options", tls_options); + tracker->TrackField("application_options", application_options); + tracker->TrackField("cid_factory_ref", cid_factory_ref); +} + +// ============================================================================ + +bool Session::HasInstance(Environment* env, Local value) { + return GetConstructorTemplate(env)->HasInstance(value); +} + +BaseObjectPtr Session::Create(BaseObjectPtr endpoint, + const Config& config) { + Local obj; + if (!GetConstructorTemplate(endpoint->env()) + ->InstanceTemplate() + ->NewInstance(endpoint->env()->context()) + .ToLocal(&obj)) { + return BaseObjectPtr(); + } + + return MakeDetachedBaseObject(std::move(endpoint), obj, config); +} + +Session::Session(BaseObjectPtr endpoint, + v8::Local object, + const Config& config) + : AsyncWrap(endpoint->env(), object, AsyncWrap::PROVIDER_QUIC_SESSION), + stats_(env()->isolate()), + state_(env()->isolate()), + config_(config), + connection_(InitConnection()), + tls_context_(env(), config_.side, this, config_.options.tls_options), + application_(select_application()), + local_address_(config.local_address), + remote_address_(config.remote_address), + timer_(env(), + [this, self = BaseObjectPtr(this)] { OnTimeout(); }) { + MakeWeak(); + timer_.Unref(); + + application().ExtendMaxStreams(EndpointLabel::LOCAL, + Direction::BIDIRECTIONAL, + TransportParams::DEFAULT_MAX_STREAMS_BIDI); + application().ExtendMaxStreams(EndpointLabel::LOCAL, + Direction::UNIDIRECTIONAL, + TransportParams::DEFAULT_MAX_STREAMS_UNI); + + const auto defineProperty = [&](auto name, auto value) { + object + ->DefineOwnProperty( + env()->context(), name, value, PropertyAttribute::ReadOnly) + .Check(); + }; + + defineProperty(env()->state_string(), state_.GetArrayBuffer()); + defineProperty(env()->stats_string(), stats_.GetArrayBuffer()); + + auto& state = BindingData::Get(env()); + + if (UNLIKELY(config_.options.qlog)) { + qlog_stream_ = LogStream::Create(env()); + if (qlog_stream_) + defineProperty(state.qlog_string(), qlog_stream_->object()); + } + + if (UNLIKELY(config_.options.tls_options.keylog)) { + keylog_stream_ = LogStream::Create(env()); + if (keylog_stream_) + defineProperty(state.keylog_string(), keylog_stream_->object()); + } + + // We index the Session by our local CID (the scid) and dcid (the peer's cid) + endpoint_->AddSession(config_.scid, BaseObjectPtr(this)); + endpoint_->AssociateCID(config_.dcid, config_.scid); + + tls_context_.Start(); + + UpdateDataStats(); +} + +Session::~Session() { + if (conn_closebuf_) { + conn_closebuf_->Done(0); + } + if (qlog_stream_) { + env()->SetImmediate( + [ptr = std::move(qlog_stream_)](Environment*) { ptr->End(); }); + } + if (keylog_stream_) { + env()->SetImmediate( + [ptr = std::move(keylog_stream_)](Environment*) { ptr->End(); }); + } + DCHECK(streams_.empty()); +} + +Session::operator ngtcp2_conn*() const { + return connection_.get(); +} + +uint32_t Session::version() const { + return config_.version; +} + +Endpoint& Session::endpoint() const { + return *endpoint_; +} + +TLSContext& Session::tls_context() { + return tls_context_; +} + +Session::Application& Session::application() { + return *application_; +} + +const SocketAddress& Session::remote_address() const { + return remote_address_; +} + +const SocketAddress& Session::local_address() const { + return local_address_; +} + +bool Session::is_closing() const { + return state_->closing; +} + +bool Session::is_graceful_closing() const { + return state_->graceful_close; +} + +bool Session::is_silent_closing() const { + return state_->silent_close; +} + +bool Session::is_destroyed() const { + return state_->destroyed; +} + +bool Session::is_server() const { + return config_.side == Side::SERVER; +} + +std::string Session::diagnostic_name() const { + const auto get_type = [&] { return is_server() ? "server" : "client"; }; + + return std::string("Session (") + get_type() + "," + + std::to_string(env()->thread_id()) + ":" + + std::to_string(static_cast(get_async_id())) + ")"; +} + +const Session::Config& Session::config() const { + return config_; +} + +const Session::Options& Session::options() const { + return config_.options; +} + +void Session::HandleQlog(uint32_t flags, const void* data, size_t len) { + if (qlog()) { + // Fun fact... ngtcp2 does not emit the final qlog statement until the + // ngtcp2_conn object is destroyed. Ideally, destroying is explicit, but + // sometimes the Session object can be garbage collected without being + // explicitly destroyed. During those times, we cannot call out to + // JavaScript. Because we don't know for sure if we're in in a GC when this + // is called, it is safer to just defer writes to immediate, and to keep it + // consistent, let's just always defer (this is not performance sensitive so + // the deferring is fine). + std::vector buffer(len); + memcpy(buffer.data(), data, len); + env()->SetImmediate( + [ptr = qlog(), buffer = std::move(buffer), flags](Environment*) { + ptr->Emit(buffer.data(), + buffer.size(), + flags & NGTCP2_QLOG_WRITE_FLAG_FIN + ? LogStream::EmitOption::FIN + : LogStream::EmitOption::NONE); + }); + } +} + +BaseObjectPtr Session::qlog() const { + return qlog_stream_; +} + +BaseObjectPtr Session::keylog() const { + return keylog_stream_; +} + +TransportParams Session::GetLocalTransportParams() const { + DCHECK(!is_destroyed()); + return TransportParams(TransportParams::Type::ENCRYPTED_EXTENSIONS, + ngtcp2_conn_get_local_transport_params(*this)); +} + +TransportParams Session::GetRemoteTransportParams() const { + DCHECK(!is_destroyed()); + return TransportParams(TransportParams::Type::ENCRYPTED_EXTENSIONS, + ngtcp2_conn_get_remote_transport_params(*this)); +} + +void Session::SetLastError(QuicError&& error) { + last_error_ = std::move(error); +} + +void Session::Close(Session::CloseMethod method) { + if (is_destroyed()) return; + switch (method) { + case CloseMethod::DEFAULT: + return DoClose(); + case CloseMethod::SILENT: + return DoClose(true); + case CloseMethod::GRACEFUL: + if (is_graceful_closing()) return; + // If there are no open streams, then we can close just immediately and + // not worry about waiting around for the right moment. + if (streams_.empty()) return DoClose(); + state_->graceful_close = 1; + STAT_RECORD_TIMESTAMP(Stats, graceful_closing_at); + return; + } + UNREACHABLE(); +} + +void Session::Destroy() { + if (is_destroyed()) return; + + // The DoClose() method should have already been called. + DCHECK(state_->closing); + + // We create a copy of the streams because they will remove themselves + // from streams_ as they are cleaning up, causing the iterator to be + // invalidated. + auto streams = streams_; + for (auto& stream : streams) stream.second->Destroy(last_error_); + DCHECK(streams_.empty()); + + STAT_RECORD_TIMESTAMP(Stats, destroyed_at); + state_->closing = 0; + state_->graceful_close = 0; + + timer_.Stop(); + + // The Session instances are kept alive using a in the Endpoint. Removing the + // Session from the Endpoint will free that pointer, allowing the Session to + // be deconstructed once the stack unwinds and any remaining + // BaseObjectPtr instances fall out of scope. + + std::vector cids(ngtcp2_conn_get_num_scid(*this)); + std::vector tokens(ngtcp2_conn_get_num_active_dcid(*this)); + ngtcp2_conn_get_scid(*this, cids.data()); + ngtcp2_conn_get_active_dcid(*this, tokens.data()); + + endpoint_->DisassociateCID(config_.dcid); + endpoint_->DisassociateCID(config_.preferred_address_cid); + + for (auto cid : cids) endpoint_->DisassociateCID(CID(&cid)); + + for (auto token : tokens) { + if (token.token_present) + endpoint_->DisassociateStatelessResetToken( + StatelessResetToken(token.token)); + } + + state_->destroyed = 1; + + BaseObjectPtr endpoint = std::move(endpoint_); + + endpoint->RemoveSession(config_.scid); +} + +bool Session::Receive(Store&& store, + const SocketAddress& local_address, + const SocketAddress& remote_address) { + DCHECK(!is_destroyed()); + + const auto receivePacket = [&](ngtcp2_path* path, ngtcp2_vec vec) { + DCHECK(!is_destroyed()); + + uint64_t now = uv_hrtime(); + ngtcp2_pkt_info pi{}; // Not used but required. + int err = ngtcp2_conn_read_pkt(*this, path, &pi, vec.base, vec.len, now); + switch (err) { + case 0: { + // Return true so we send after receiving. + return true; + } + case NGTCP2_ERR_DRAINING: { + // Connection has entered the draining state, no further data should be + // sent. This happens when the remote peer has sent a CONNECTION_CLOSE. + return false; + } + case NGTCP2_ERR_CRYPTO: { + // Crypto error happened! Set the last error to the tls alert + last_error_ = QuicError::ForTlsAlert(ngtcp2_conn_get_tls_alert(*this)); + Close(); + return false; + } + case NGTCP2_ERR_RETRY: { + // This should only ever happen on the server. We have to sent a path + // validation challenge in the form of a RETRY packet to the peer and + // drop the connection. + DCHECK(is_server()); + endpoint_->SendRetry(PathDescriptor{ + version(), + config_.dcid, + config_.scid, + local_address_, + remote_address_, + }); + Close(CloseMethod::SILENT); + return false; + } + case NGTCP2_ERR_DROP_CONN: { + // There's nothing else to do but drop the connection state. + Close(CloseMethod::SILENT); + return false; + } + } + // Shouldn't happen but just in case. + last_error_ = QuicError::ForNgtcp2Error(err); + Close(); + return false; + }; + + auto update_stats = OnScopeLeave([&] { UpdateDataStats(); }); + remote_address_ = remote_address; + Path path(local_address, remote_address_); + STAT_INCREMENT_N(Stats, bytes_received, store.length()); + if (receivePacket(&path, store)) application().SendPendingData(); + + if (!is_destroyed()) UpdateTimer(); + + return true; +} + +void Session::Send(BaseObjectPtr packet) { + // Sending a Packet is generally best effort. If we're not in a state + // where we can send a packet, it's ok to drop it on the floor. The + // packet loss mechanisms will cause the packet data to be resent later + // if appropriate (and possible). + DCHECK(!is_destroyed()); + DCHECK(!is_in_draining_period()); + + if (can_send_packets() && packet->length() > 0) { + STAT_INCREMENT_N(Stats, bytes_sent, packet->length()); + endpoint_->Send(std::move(packet)); + return; + } + + packet->Done(packet->length() > 0 ? UV_ECANCELED : 0); +} + +void Session::Send(BaseObjectPtr packet, const PathStorage& path) { + UpdatePath(path); + Send(std::move(packet)); +} + +uint64_t Session::SendDatagram(Store&& data) { + auto tp = ngtcp2_conn_get_remote_transport_params(*this); + uint64_t max_datagram_size = tp->max_datagram_frame_size; + if (max_datagram_size == 0 || data.length() > max_datagram_size) { + // Datagram is too large. + return 0; + } + + BaseObjectPtr packet; + uint8_t* pos = nullptr; + int accepted = 0; + ngtcp2_vec vec = data; + PathStorage path; + int flags = NGTCP2_WRITE_DATAGRAM_FLAG_MORE; + uint64_t did = state_->last_datagram_id + 1; + + // Let's give it a max number of attempts to send the datagram + static const int kMaxAttempts = 16; + int attempts = 0; + + for (;;) { + if (!packet) { + packet = Packet::Create(env(), + endpoint_.get(), + remote_address_, + ngtcp2_conn_get_max_udp_payload_size(*this), + "datagram"); + if (!packet) { + last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL); + Close(CloseMethod::SILENT); + return 0; + } + pos = ngtcp2_vec(*packet).base; + } + + ssize_t nwrite = ngtcp2_conn_writev_datagram(*this, + &path.path, + nullptr, + pos, + packet->length(), + &accepted, + flags, + did, + &vec, + 1, + uv_hrtime()); + + if (nwrite < 0) { + // Nothing was written to the packet. + switch (nwrite) { + case 0: { + // We cannot send data because of congestion control or the data will + // not fit. Since datagrams are best effort, we are going to abandon + // the attempt and just return. + CHECK_EQ(accepted, 0); + packet->Done(UV_ECANCELED); + return 0; + } + case NGTCP2_ERR_WRITE_MORE: { + // We keep on looping! Keep on sending! + continue; + } + case NGTCP2_ERR_INVALID_STATE: { + // The remote endpoint does not want to accept datagrams. That's ok, + // just return 0. + packet->Done(UV_ECANCELED); + return 0; + } + case NGTCP2_ERR_INVALID_ARGUMENT: { + // The datagram is too large. That should have been caught above but + // that's ok. We'll just abandon the attempt and return. + packet->Done(UV_ECANCELED); + return 0; + } + } + packet->Done(UV_ECANCELED); + last_error_ = QuicError::ForNgtcp2Error(nwrite); + Close(CloseMethod::SILENT); + return 0; + } + + // In this case, a complete packet was written and we need to send it along. + // Note that this doesn't mean that the packet actually contains the + // datagram! We'll check that next by checking the accepted value. + packet->Truncate(nwrite); + Send(std::move(packet)); + ngtcp2_conn_update_pkt_tx_time(*this, uv_hrtime()); + + if (accepted != 0) { + // Yay! The datagram was accepted into the packet we just sent and we can + // return the datagram ID. + STAT_INCREMENT(Stats, datagrams_sent); + STAT_INCREMENT_N(Stats, bytes_sent, vec.len); + state_->last_datagram_id = did; + return did; + } + + // We sent a packet, but it wasn't the datagram packet. That can happen. + // Let's loop around and try again. + if (++attempts == kMaxAttempts) { + // Too many attempts to send the datagram. + break; + } + } + + return 0; +} + +void Session::UpdatePath(const PathStorage& storage) { + remote_address_.Update(storage.path.remote.addr, storage.path.remote.addrlen); + local_address_.Update(storage.path.local.addr, storage.path.local.addrlen); +} + +BaseObjectPtr Session::FindStream(int64_t id) const { + auto it = streams_.find(id); + return it == std::end(streams_) ? BaseObjectPtr() : it->second; +} + +BaseObjectPtr Session::CreateStream(int64_t id) { + if (!can_create_streams()) return BaseObjectPtr(); + auto stream = Stream::Create(this, id); + if (stream) AddStream(stream); + return stream; +} + +BaseObjectPtr Session::OpenStream(Direction direction) { + if (!can_create_streams()) return BaseObjectPtr(); + int64_t id; + switch (direction) { + case Direction::BIDIRECTIONAL: + if (ngtcp2_conn_open_bidi_stream(*this, &id, nullptr) == 0) + return CreateStream(id); + break; + case Direction::UNIDIRECTIONAL: + if (ngtcp2_conn_open_uni_stream(*this, &id, nullptr) == 0) + return CreateStream(id); + break; + } + return BaseObjectPtr(); +} + +void Session::AddStream(const BaseObjectPtr& stream) { + ngtcp2_conn_set_stream_user_data(*this, stream->id(), stream.get()); + streams_[stream->id()] = stream; + + // Update tracking statistics for the number of streams associated with this + // session. + switch (stream->origin()) { + case Side::CLIENT: { + if (is_server()) { + switch (stream->direction()) { + case Direction::BIDIRECTIONAL: + STAT_INCREMENT(Stats, bidi_in_stream_count); + break; + case Direction::UNIDIRECTIONAL: + STAT_INCREMENT(Stats, uni_in_stream_count); + break; + } + } else { + switch (stream->direction()) { + case Direction::BIDIRECTIONAL: + STAT_INCREMENT(Stats, bidi_out_stream_count); + break; + case Direction::UNIDIRECTIONAL: + STAT_INCREMENT(Stats, uni_out_stream_count); + break; + } + } + break; + } + case Side::SERVER: { + if (is_server()) { + switch (stream->direction()) { + case Direction::BIDIRECTIONAL: + STAT_INCREMENT(Stats, bidi_out_stream_count); + break; + case Direction::UNIDIRECTIONAL: + STAT_INCREMENT(Stats, uni_out_stream_count); + break; + } + } else { + switch (stream->direction()) { + case Direction::BIDIRECTIONAL: + STAT_INCREMENT(Stats, bidi_in_stream_count); + break; + case Direction::UNIDIRECTIONAL: + STAT_INCREMENT(Stats, uni_in_stream_count); + break; + } + } + break; + } + } +} + +void Session::RemoveStream(int64_t id) { + // ngtcp2 does not extend the max streams count automatically except in very + // specific conditions, none of which apply once we've gotten this far. We + // need to manually extend when a remote peer initiated stream is removed. + if (!is_in_draining_period() && !is_in_closing_period() && + !state_->silent_close && + !ngtcp2_conn_is_local_stream(connection_.get(), id)) { + if (ngtcp2_is_bidi_stream(id)) + ngtcp2_conn_extend_max_streams_bidi(connection_.get(), 1); + else + ngtcp2_conn_extend_max_streams_uni(connection_.get(), 1); + } + + // Frees the persistent reference to the Stream object, allowing it to be gc'd + // any time after the JS side releases it's own reference. + streams_.erase(id); + ngtcp2_conn_set_stream_user_data(*this, id, nullptr); +} + +void Session::ResumeStream(int64_t id) { + SendPendingDataScope send_scope(this); + application_->ResumeStream(id); +} + +void Session::ShutdownStream(int64_t id, QuicError error) { + SendPendingDataScope send_scope(this); + ngtcp2_conn_shutdown_stream(*this, + id, + error.type() == QuicError::Type::APPLICATION + ? error.code() + : NGTCP2_APP_NOERROR); +} + +void Session::StreamDataBlocked(int64_t id) { + STAT_INCREMENT(Stats, block_count); + application_->BlockStream(id); +} + +void Session::ShutdownStreamWrite(int64_t id, QuicError code) { + SendPendingDataScope send_scope(this); + ngtcp2_conn_shutdown_stream_write(*this, + id, + code.type() == QuicError::Type::APPLICATION + ? code.code() + : NGTCP2_APP_NOERROR); +} + +void Session::CollectSessionTicketAppData( + SessionTicket::AppData* app_data) const { + application_->CollectSessionTicketAppData(app_data); +} + +SessionTicket::AppData::Status Session::ExtractSessionTicketAppData( + const SessionTicket::AppData& app_data, + SessionTicket::AppData::Source::Flag flag) { + return application_->ExtractSessionTicketAppData(app_data, flag); +} + +void Session::MemoryInfo(MemoryTracker* tracker) const { + tracker->TrackField("config", config_); + tracker->TrackField("endpoint", endpoint_); + tracker->TrackField("streams", streams_); + tracker->TrackField("local_address", local_address_); + tracker->TrackField("remote_address", remote_address_); + tracker->TrackField("application", application_); + tracker->TrackField("tls_context", tls_context_); + tracker->TrackField("timer", timer_); + tracker->TrackField("conn_closebuf", conn_closebuf_); + tracker->TrackField("qlog_stream", qlog_stream_); + tracker->TrackField("keylog_stream", keylog_stream_); +} + +bool Session::is_in_closing_period() const { + return ngtcp2_conn_is_in_closing_period(*this); +} + +bool Session::is_in_draining_period() const { + return ngtcp2_conn_is_in_draining_period(*this); +} + +bool Session::wants_session_ticket() const { + return state_->session_ticket == 1; +} + +void Session::SetStreamOpenAllowed() { + // TODO(@jasnell): Might remove this. May not be needed + state_->stream_open_allowed = 1; +} + +bool Session::can_send_packets() const { + // We can send packets if we're not in the middle of a ngtcp2 callback, + // we're not destroyed, we're not in a draining or closing period, and + // endpoint is set. + return !NgTcp2CallbackScope::in_ngtcp2_callback(env()) && !is_destroyed() && + !is_in_draining_period() && !is_in_closing_period() && endpoint_; +} + +bool Session::can_create_streams() const { + return !state_->destroyed && !state_->graceful_close && !state_->closing && + !is_in_closing_period() && !is_in_draining_period(); +} + +uint64_t Session::max_data_left() const { + return ngtcp2_conn_get_max_data_left(*this); +} + +uint64_t Session::max_local_streams_uni() const { + return ngtcp2_conn_get_max_local_streams_uni(*this); +} + +uint64_t Session::max_local_streams_bidi() const { + return ngtcp2_conn_get_local_transport_params(*this) + ->initial_max_streams_bidi; +} + +void Session::set_wrapped() { + state_->wrapped = 1; +} + +void Session::DoClose(bool silent) { + DCHECK(!is_destroyed()); + // Once Close has been called, we cannot re-enter + if (state_->closing == 1) return; + state_->closing = 1; + state_->silent_close = silent ? 1 : 0; + STAT_RECORD_TIMESTAMP(Stats, closing_at); + + // Iterate through all of the known streams and close them. The streams + // will remove themselves from the Session as soon as they are closed. + // Note: we create a copy because the streams will remove themselves + // while they are cleaning up which will invalidate the iterator. + auto streams = streams_; + for (auto& stream : streams) stream.second->Destroy(last_error_); + DCHECK(streams.empty()); + + // If the state has not been passed out to JavaScript yet, we can skip closing + // entirely and drop directly out to Destroy. + if (!state_->wrapped) return Destroy(); + + // If we're not running within a ngtcp2 callback scope, schedule a + // CONNECTION_CLOSE to be sent. If we are within a ngtcp2 callback scope, + // sending the CONNECTION_CLOSE will be deferred. + { MaybeCloseConnectionScope close_scope(this, silent); } + + // We emit a close callback so that the JavaScript side can clean up anything + // it needs to clean up before destroying. It's the JavaScript side's + // responsibility to call destroy() when ready. + EmitClose(); +} + +void Session::ExtendStreamOffset(int64_t id, size_t amount) { + ngtcp2_conn_extend_max_stream_offset(*this, id, amount); +} + +void Session::ExtendOffset(size_t amount) { + ngtcp2_conn_extend_max_offset(*this, amount); +} + +void Session::UpdateDataStats() { + if (state_->destroyed) return; + ngtcp2_conn_stat stat; + ngtcp2_conn_get_conn_stat(*this, &stat); + STAT_SET(Stats, bytes_in_flight, stat.bytes_in_flight); + STAT_SET( + Stats, congestion_recovery_start_ts, stat.congestion_recovery_start_ts); + STAT_SET(Stats, cwnd, stat.cwnd); + STAT_SET(Stats, delivery_rate_sec, stat.delivery_rate_sec); + STAT_SET(Stats, first_rtt_sample_ts, stat.first_rtt_sample_ts); + STAT_SET(Stats, initial_rtt, stat.initial_rtt); + STAT_SET( + Stats, last_tx_pkt_ts, reinterpret_cast(stat.last_tx_pkt_ts)); + STAT_SET(Stats, latest_rtt, stat.latest_rtt); + STAT_SET(Stats, loss_detection_timer, stat.loss_detection_timer); + STAT_SET(Stats, loss_time, reinterpret_cast(stat.loss_time)); + STAT_SET(Stats, max_udp_payload_size, stat.max_udp_payload_size); + STAT_SET(Stats, min_rtt, stat.min_rtt); + STAT_SET(Stats, pto_count, stat.pto_count); + STAT_SET(Stats, rttvar, stat.rttvar); + STAT_SET(Stats, smoothed_rtt, stat.smoothed_rtt); + STAT_SET(Stats, ssthresh, stat.ssthresh); + STAT_SET( + Stats, + max_bytes_in_flight, + std::max(STAT_GET(Stats, max_bytes_in_flight), stat.bytes_in_flight)); +} + +void Session::SendConnectionClose() { + DCHECK(!NgTcp2CallbackScope::in_ngtcp2_callback(env())); + if (is_destroyed() || is_in_draining_period() || state_->silent_close) return; + + auto on_exit = OnScopeLeave([this] { UpdateTimer(); }); + + switch (config_.side) { + case Side::SERVER: { + if (!is_in_closing_period() && !StartClosingPeriod()) { + Close(CloseMethod::SILENT); + } else { + DCHECK(conn_closebuf_); + Send(conn_closebuf_->Clone()); + } + return; + } + case Side::CLIENT: { + Path path(local_address_, remote_address_); + auto packet = Packet::Create(env(), + endpoint_.get(), + remote_address_, + kDefaultMaxPacketLength, + "immediate connection close (client)"); + ngtcp2_vec vec = *packet; + ssize_t nwrite = ngtcp2_conn_write_connection_close( + *this, &path, nullptr, vec.base, vec.len, last_error_, uv_hrtime()); + + if (UNLIKELY(nwrite < 0)) { + packet->Done(UV_ECANCELED); + last_error_ = QuicError::ForNgtcp2Error(NGTCP2_INTERNAL_ERROR); + Close(CloseMethod::SILENT); + } else { + packet->Truncate(nwrite); + Send(std::move(packet)); + } + return; + } + } + UNREACHABLE(); +} + +void Session::OnTimeout() { + HandleScope scope(env()->isolate()); + if (is_destroyed()) return; + + int ret = ngtcp2_conn_handle_expiry(*this, uv_hrtime()); + if (NGTCP2_OK(ret) && !is_in_closing_period() && !is_in_draining_period() && + env()->can_call_into_js()) { + SendPendingDataScope send_scope(this); + return; + } + + last_error_ = QuicError::ForNgtcp2Error(ret); + Close(CloseMethod::SILENT); +} + +void Session::UpdateTimer() { + // Both uv_hrtime and ngtcp2_conn_get_expiry return nanosecond units. + uint64_t expiry = ngtcp2_conn_get_expiry(*this); + uint64_t now = uv_hrtime(); + + if (expiry <= now) { + // The timer has already expired. + return OnTimeout(); + } + + auto timeout = (expiry - now) / NGTCP2_MILLISECONDS; + + // If timeout is zero here, it means our timer is less than a millisecond + // off from expiry. Let's bump the timer to 1. + timer_.Update(timeout == 0 ? 1 : timeout); +} + +bool Session::StartClosingPeriod() { + if (is_in_closing_period()) return true; + if (is_destroyed()) return false; + + conn_closebuf_ = Packet::CreateConnectionClosePacket( + env(), endpoint_.get(), remote_address_, *this, last_error_); + + // If we were unable to create a connection close packet, we're in trouble. + // Set the internal error and return false so that the session will be + // silently closed. + if (!conn_closebuf_) { + last_error_ = QuicError::ForNgtcp2Error(NGTCP2_INTERNAL_ERROR); + return false; + } + + return true; +} + +void Session::DatagramStatus(uint64_t datagramId, quic::DatagramStatus status) { + switch (status) { + case quic::DatagramStatus::ACKNOWLEDGED: + STAT_INCREMENT(Stats, datagrams_acknowledged); + break; + case quic::DatagramStatus::LOST: + STAT_INCREMENT(Stats, datagrams_lost); + break; + } + EmitDatagramStatus(datagramId, status); +} + +void Session::DatagramReceived(const uint8_t* data, + size_t datalen, + DatagramReceivedFlags flag) { + // If there is nothing watching for the datagram on the JavaScript side, + // we just drop it on the floor. + if (state_->datagram == 0 || datalen == 0) return; + + auto backing = ArrayBuffer::NewBackingStore(env()->isolate(), datalen); + memcpy(backing->Data(), data, datalen); + STAT_INCREMENT(Stats, datagrams_received); + STAT_INCREMENT_N(Stats, bytes_received, datalen); + EmitDatagram(Store(std::move(backing), datalen), flag); +} + +bool Session::GenerateNewConnectionId(ngtcp2_cid* cid, + size_t len, + uint8_t* token) { + CID cid_ = config_.options.cid_factory->Generate(len); + StatelessResetToken new_token( + token, endpoint_->options().reset_token_secret, cid_); + endpoint_->AssociateCID(cid_, config_.scid); + endpoint_->AssociateStatelessResetToken(new_token, this); + return true; +} + +bool Session::HandshakeCompleted() { + if (state_->handshake_completed) return false; + state_->handshake_completed = true; + STAT_RECORD_TIMESTAMP(Stats, handshake_completed_at); + + if (!tls_context_.early_data_was_accepted()) + ngtcp2_conn_early_data_rejected(*this); + + // When in a server session, handshake completed == handshake confirmed. + if (is_server()) { + HandshakeConfirmed(); + + if (!endpoint().is_closed() && !endpoint().is_closing()) { + auto token = endpoint().GenerateNewToken(version(), remote_address_); + ngtcp2_vec vec = token; + if (NGTCP2_ERR(ngtcp2_conn_submit_new_token(*this, vec.base, vec.len))) { + // Submitting the new token failed... In this case we're going to + // fail because submitting the new token should only fail if we + // ran out of memory or some other unrecoverable state. + return false; + } + } + } + + EmitHandshakeComplete(); + + return true; +} + +void Session::HandshakeConfirmed() { + if (state_->handshake_confirmed) return; + state_->handshake_confirmed = true; + STAT_RECORD_TIMESTAMP(Stats, handshake_confirmed_at); +} + +void Session::SelectPreferredAddress(PreferredAddress* preferredAddress) { + if (config_.options.preferred_address_strategy == + PreferredAddress::Policy::IGNORE_PREFERRED_ADDRESS) { + return; + } + + auto local_address = endpoint_->local_address(); + int family = local_address.family(); + + switch (family) { + case AF_INET: { + auto ipv4 = preferredAddress->ipv4(); + if (ipv4.has_value()) { + if (ipv4->address.empty() || ipv4->port == 0) return; + SocketAddress::New(AF_INET, + std::string(ipv4->address).c_str(), + ipv4->port, + &remote_address_); + preferredAddress->Use(ipv4.value()); + } + break; + } + case AF_INET6: { + auto ipv6 = preferredAddress->ipv6(); + if (ipv6.has_value()) { + if (ipv6->address.empty() || ipv6->port == 0) return; + SocketAddress::New(AF_INET, + std::string(ipv6->address).c_str(), + ipv6->port, + &remote_address_); + preferredAddress->Use(ipv6.value()); + } + break; + } + } +} + +CID Session::new_cid(size_t len) const { + return config_.options.cid_factory->Generate(len); +} + +// JavaScript callouts + +void Session::EmitClose(const QuicError& error) { + DCHECK(!is_destroyed()); + if (!env()->can_call_into_js()) return Destroy(); + + CallbackScope cb_scope(this); + Local argv[] = { + Integer::New(env()->isolate(), static_cast(error.type())), + BigInt::NewFromUnsigned(env()->isolate(), error.code()), + Undefined(env()->isolate()), + }; + if (error.reason().length() > 0 && + !ToV8Value(env()->context(), error.reason()).ToLocal(&argv[2])) { + return; + } + MakeCallback( + BindingData::Get(env()).session_close_callback(), arraysize(argv), argv); +} + +void Session::EmitDatagram(Store&& datagram, DatagramReceivedFlags flag) { + DCHECK(!is_destroyed()); + if (!env()->can_call_into_js()) return; + + CallbackScope cbv_scope(this); + + Local argv[] = {datagram.ToUint8Array(env()), + v8::Boolean::New(env()->isolate(), flag.early)}; + + MakeCallback(BindingData::Get(env()).session_datagram_callback(), + arraysize(argv), + argv); +} + +void Session::EmitDatagramStatus(uint64_t id, quic::DatagramStatus status) { + DCHECK(!is_destroyed()); + if (!env()->can_call_into_js()) return; + + CallbackScope cb_scope(this); + auto& state = BindingData::Get(env()); + + const auto status_to_string = [&] { + switch (status) { + case quic::DatagramStatus::ACKNOWLEDGED: + return state.acknowledged_string(); + case quic::DatagramStatus::LOST: + return state.lost_string(); + } + UNREACHABLE(); + }; + + Local argv[] = {BigInt::NewFromUnsigned(env()->isolate(), id), + status_to_string()}; + MakeCallback(state.session_datagram_status_callback(), arraysize(argv), argv); +} + +void Session::EmitHandshakeComplete() { + DCHECK(!is_destroyed()); + if (!env()->can_call_into_js()) return; + + CallbackScope cb_scope(this); + + auto isolate = env()->isolate(); + + static constexpr auto kServerName = 0; + static constexpr auto kSelectedAlpn = 1; + static constexpr auto kCipherName = 2; + static constexpr auto kCipherVersion = 3; + static constexpr auto kValidationErrorReason = 4; + static constexpr auto kValidationErrorCode = 5; + + Local argv[] = { + Undefined(isolate), // The negotiated server name + Undefined(isolate), // The selected alpn + Undefined(isolate), // Cipher name + Undefined(isolate), // Cipher version + Undefined(isolate), // Validation error reason + Undefined(isolate), // Validation error code + v8::Boolean::New(isolate, tls_context_.early_data_was_accepted())}; + + int err = tls_context_.VerifyPeerIdentity(); + + if (err != X509_V_OK && (!crypto::GetValidationErrorReason(env(), err) + .ToLocal(&argv[kValidationErrorReason]) || + !crypto::GetValidationErrorCode(env(), err) + .ToLocal(&argv[kValidationErrorCode]))) { + return; + } + + if (!ToV8Value(env()->context(), tls_context_.servername()) + .ToLocal(&argv[kServerName]) || + !ToV8Value(env()->context(), tls_context_.alpn()) + .ToLocal(&argv[kSelectedAlpn]) || + tls_context_.cipher_name(env()).ToLocal(&argv[kCipherName]) || + !tls_context_.cipher_version(env()).ToLocal(&argv[kCipherVersion])) { + return; + } + + MakeCallback(BindingData::Get(env()).session_handshake_callback(), + arraysize(argv), + argv); +} + +void Session::EmitPathValidation(PathValidationResult result, + PathValidationFlags flags, + const SocketAddress& local_address, + const SocketAddress& remote_address) { + DCHECK(!is_destroyed()); + if (!env()->can_call_into_js()) return; + if (LIKELY(state_->path_validation == 0)) return; + + auto isolate = env()->isolate(); + CallbackScope cb_scope(this); + auto& state = BindingData::Get(env()); + + const auto resultToString = [&] { + switch (result) { + case PathValidationResult::ABORTED: + return state.aborted_string(); + case PathValidationResult::FAILURE: + return state.failure_string(); + case PathValidationResult::SUCCESS: + return state.success_string(); + } + UNREACHABLE(); + }; + + Local argv[4] = { + resultToString(), + SocketAddressBase::Create(env(), + std::make_shared(local_address)) + ->object(), + SocketAddressBase::Create(env(), + std::make_shared(remote_address)) + ->object(), + v8::Boolean::New(isolate, flags.preferredAddress)}; + + MakeCallback(state.session_path_validation_callback(), arraysize(argv), argv); +} + +void Session::EmitSessionTicket(Store&& ticket) { + DCHECK(!is_destroyed()); + if (!env()->can_call_into_js()) return; + + // If there is nothing listening for the session ticket, don't bother + // emitting. + if (LIKELY(state_->session_ticket == 0)) return; + + CallbackScope cb_scope(this); + + auto remote_transport_params = GetRemoteTransportParams(); + Store transport_params; + if (remote_transport_params) + transport_params = remote_transport_params.Encode(env()); + + SessionTicket session_ticket(std::move(ticket), std::move(transport_params)); + Local argv; + if (session_ticket.encode(env()).ToLocal(&argv)) + MakeCallback(BindingData::Get(env()).session_ticket_callback(), 1, &argv); +} + +void Session::EmitStream(BaseObjectPtr stream) { + if (is_destroyed()) return; + if (!env()->can_call_into_js()) return; + CallbackScope cb_scope(this); + Local arg = stream->object(); + + MakeCallback(BindingData::Get(env()).stream_created_callback(), 1, &arg); +} + +void Session::EmitVersionNegotiation(const ngtcp2_pkt_hd& hd, + const uint32_t* sv, + size_t nsv) { + DCHECK(!is_destroyed()); + DCHECK(!is_server()); + if (!env()->can_call_into_js()) return; + + auto isolate = env()->isolate(); + const auto to_integer = [&](uint32_t version) { + return Integer::NewFromUnsigned(isolate, version); + }; + + CallbackScope cb_scope(this); + + // version() is the version that was actually configured for this session. + + // versions are the versions requested by the peer. + MaybeStackBuffer, 5> versions; + versions.AllocateSufficientStorage(nsv); + for (size_t n = 0; n < nsv; n++) versions[n] = to_integer(sv[n]); + + // supported are the versons we acutually support expressed as a range. + // The first value is the minimum version, the second is the maximum. + Local supported[] = {to_integer(config_.options.min_version), + to_integer(config_.options.version)}; + + Local argv[] = {// The version configured for this session. + to_integer(version()), + // The versions requested. + Array::New(isolate, versions.out(), nsv), + // The versions we actually support. + Array::New(isolate, supported, arraysize(supported))}; + + MakeCallback(BindingData::Get(env()).session_version_negotiation_callback(), + arraysize(argv), + argv); +} + +void Session::EmitKeylog(const char* line) { + if (!env()->can_call_into_js()) return; + if (keylog_stream_) { + env()->SetImmediate([ptr = keylog_stream_, data = std::string(line) + "\n"]( + Environment* env) { ptr->Emit(data); }); + } +} + +// ============================================================================ +// ngtcp2 static callback functions + +#define NGTCP2_CALLBACK_SCOPE(name) \ + auto name = Impl::From(conn, user_data); \ + if (UNLIKELY(name->is_destroyed())) return NGTCP2_ERR_CALLBACK_FAILURE; \ + NgTcp2CallbackScope scope(session->env()); + +struct Session::Impl { + static Session* From(ngtcp2_conn* conn, void* user_data) { + DCHECK_NOT_NULL(user_data); + auto session = static_cast(user_data); + DCHECK_EQ(conn, session->connection_.get()); + return session; + } + + static void DoDestroy(const FunctionCallbackInfo& args) { + Session* session; + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + session->Destroy(); + } + + static void GetRemoteAddress(const FunctionCallbackInfo& args) { + auto env = Environment::GetCurrent(args); + Session* session; + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + auto address = session->remote_address(); + args.GetReturnValue().Set( + SocketAddressBase::Create(env, std::make_shared(address)) + ->object()); + } + + static void GetCertificate(const FunctionCallbackInfo& args) { + auto env = Environment::GetCurrent(args); + Session* session; + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + Local ret; + if (session->tls_context().cert(env).ToLocal(&ret)) + args.GetReturnValue().Set(ret); + } + + static void GetEphemeralKeyInfo(const FunctionCallbackInfo& args) { + auto env = Environment::GetCurrent(args); + Session* session; + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + Local ret; + if (!session->is_server() && + session->tls_context().ephemeral_key(env).ToLocal(&ret)) + args.GetReturnValue().Set(ret); + } + + static void GetPeerCertificate(const FunctionCallbackInfo& args) { + auto env = Environment::GetCurrent(args); + Session* session; + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + Local ret; + if (session->tls_context().peer_cert(env).ToLocal(&ret)) + args.GetReturnValue().Set(ret); + } + + static void GracefulClose(const FunctionCallbackInfo& args) { + Session* session; + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + session->Close(Session::CloseMethod::GRACEFUL); + } + + static void SilentClose(const FunctionCallbackInfo& args) { + // This is exposed for testing purposes only! + Session* session; + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + session->Close(Session::CloseMethod::SILENT); + } + + static void UpdateKey(const FunctionCallbackInfo& args) { + Session* session; + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + // Initiating a key update may fail if it is done too early (either + // before the TLS handshake has been confirmed or while a previous + // key update is being processed). When it fails, InitiateKeyUpdate() + // will return false. + args.GetReturnValue().Set(session->tls_context().InitiateKeyUpdate()); + } + + static void DoOpenStream(const FunctionCallbackInfo& args) { + Session* session; + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + DCHECK(args[0]->IsUint32()); + auto direction = static_cast(args[0].As()->Value()); + BaseObjectPtr stream = session->OpenStream(direction); + + if (stream) args.GetReturnValue().Set(stream->object()); + } + + static void DoSendDatagram(const FunctionCallbackInfo& args) { + auto env = Environment::GetCurrent(args); + Session* session; + ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + DCHECK(args[0]->IsArrayBufferView()); + args.GetReturnValue().Set(BigInt::New( + env->isolate(), + session->SendDatagram(Store(args[0].As())))); + } + + static int on_acknowledge_stream_data_offset(ngtcp2_conn* conn, + int64_t stream_id, + uint64_t offset, + uint64_t datalen, + void* user_data, + void* stream_user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->application().AcknowledgeStreamData(Stream::From(stream_user_data), + datalen); + return NGTCP2_SUCCESS; + } + + static int on_acknowledge_datagram(ngtcp2_conn* conn, + uint64_t dgram_id, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->DatagramStatus(dgram_id, quic::DatagramStatus::ACKNOWLEDGED); + return NGTCP2_SUCCESS; + } + + static int on_cid_status(ngtcp2_conn* conn, + int type, + uint64_t seq, + const ngtcp2_cid* cid, + const uint8_t* token, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + std::optional maybe_reset_token; + if (token != nullptr) maybe_reset_token.emplace(token); + auto& endpoint = session->endpoint(); + switch (type) { + case NGTCP2_CONNECTION_ID_STATUS_TYPE_ACTIVATE: { + endpoint.AssociateCID(session->config_.scid, CID(cid)); + if (token != nullptr) { + endpoint.AssociateStatelessResetToken(StatelessResetToken(token), + session); + } + break; + } + case NGTCP2_CONNECTION_ID_STATUS_TYPE_DEACTIVATE: { + endpoint.DisassociateCID(CID(cid)); + if (token != nullptr) { + endpoint.DisassociateStatelessResetToken(StatelessResetToken(token)); + } + break; + } + } + return NGTCP2_SUCCESS; + } + + static int on_extend_max_remote_streams_bidi(ngtcp2_conn* conn, + uint64_t max_streams, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->application().ExtendMaxStreams( + EndpointLabel::REMOTE, Direction::BIDIRECTIONAL, max_streams); + return NGTCP2_SUCCESS; + } + + static int on_extend_max_remote_streams_uni(ngtcp2_conn* conn, + uint64_t max_streams, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->application().ExtendMaxStreams( + EndpointLabel::REMOTE, Direction::UNIDIRECTIONAL, max_streams); + return NGTCP2_SUCCESS; + } + + static int on_extend_max_streams_bidi(ngtcp2_conn* conn, + uint64_t max_streams, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->application().ExtendMaxStreams( + EndpointLabel::LOCAL, Direction::BIDIRECTIONAL, max_streams); + return NGTCP2_SUCCESS; + } + + static int on_extend_max_streams_uni(ngtcp2_conn* conn, + uint64_t max_streams, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->application().ExtendMaxStreams( + EndpointLabel::LOCAL, Direction::UNIDIRECTIONAL, max_streams); + return NGTCP2_SUCCESS; + } + + static int on_extend_max_stream_data(ngtcp2_conn* conn, + int64_t stream_id, + uint64_t max_data, + void* user_data, + void* stream_user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->application().ExtendMaxStreamData(Stream::From(stream_user_data), + max_data); + return NGTCP2_SUCCESS; + } + + static int on_get_new_cid(ngtcp2_conn* conn, + ngtcp2_cid* cid, + uint8_t* token, + size_t cidlen, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + return session->GenerateNewConnectionId(cid, cidlen, token) + ? NGTCP2_SUCCESS + : NGTCP2_ERR_CALLBACK_FAILURE; + } + + static int on_get_path_challenge_data(ngtcp2_conn* conn, + uint8_t* data, + void* user_data) { + CHECK(crypto::CSPRNG(data, NGTCP2_PATH_CHALLENGE_DATALEN).is_ok()); + return NGTCP2_SUCCESS; + } + + static int on_handshake_completed(ngtcp2_conn* conn, void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + return session->HandshakeCompleted() ? NGTCP2_SUCCESS + : NGTCP2_ERR_CALLBACK_FAILURE; + } + + static int on_handshake_confirmed(ngtcp2_conn* conn, void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->HandshakeConfirmed(); + return NGTCP2_SUCCESS; + } + + static int on_lost_datagram(ngtcp2_conn* conn, + uint64_t dgram_id, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->DatagramStatus(dgram_id, quic::DatagramStatus::LOST); + return NGTCP2_SUCCESS; + } + + static int on_path_validation(ngtcp2_conn* conn, + uint32_t flags, + const ngtcp2_path* path, + ngtcp2_path_validation_result res, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + bool flag_preferred_address = + flags & NGTCP2_PATH_VALIDATION_FLAG_PREFERRED_ADDR; + session->EmitPathValidation(static_cast(res), + PathValidationFlags{flag_preferred_address}, + SocketAddress(path->local.addr), + SocketAddress(path->remote.addr)); + return NGTCP2_SUCCESS; + } + + static int on_receive_crypto_data(ngtcp2_conn* conn, + ngtcp2_crypto_level crypto_level, + uint64_t offset, + const uint8_t* data, + size_t datalen, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + return session->tls_context().Receive(crypto_level, offset, data, datalen); + } + + static int on_receive_datagram(ngtcp2_conn* conn, + uint32_t flags, + const uint8_t* data, + size_t datalen, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + DatagramReceivedFlags f; + f.early = flags & NGTCP2_DATAGRAM_FLAG_EARLY; + session->DatagramReceived(data, datalen, f); + return NGTCP2_SUCCESS; + } + + static int on_receive_new_token(ngtcp2_conn* conn, + const ngtcp2_vec* token, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + // We currently do nothing with this callback. + return NGTCP2_SUCCESS; + } + + static int on_receive_rx_key(ngtcp2_conn* conn, + ngtcp2_crypto_level level, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + + if (!session->is_server() && level == NGTCP2_CRYPTO_LEVEL_APPLICATION) { + if (!session->application().Start()) return NGTCP2_ERR_CALLBACK_FAILURE; + } + return NGTCP2_SUCCESS; + } + + static int on_receive_stateless_reset(ngtcp2_conn* conn, + const ngtcp2_pkt_stateless_reset* sr, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->state_->stateless_reset = 1; + return NGTCP2_SUCCESS; + } + + static int on_receive_stream_data(ngtcp2_conn* conn, + uint32_t flags, + int64_t stream_id, + uint64_t offset, + const uint8_t* data, + size_t datalen, + void* user_data, + void* stream_user_data) { + NGTCP2_CALLBACK_SCOPE(session) + Stream::ReceiveDataFlags f; + f.early = flags & NGTCP2_STREAM_DATA_FLAG_EARLY; + f.fin = flags & NGTCP2_STREAM_DATA_FLAG_FIN; + + if (stream_user_data == nullptr) { + // We have an implicitly created stream. + auto stream = session->CreateStream(stream_id); + if (stream) { + session->EmitStream(stream); + session->application().ReceiveStreamData( + stream.get(), data, datalen, f); + } else { + return ngtcp2_conn_shutdown_stream( + *session, stream_id, NGTCP2_APP_NOERROR) == 0 + ? NGTCP2_SUCCESS + : NGTCP2_ERR_CALLBACK_FAILURE; + } + } else { + session->application().ReceiveStreamData( + Stream::From(stream_user_data), data, datalen, f); + } + return NGTCP2_SUCCESS; + } + + static int on_receive_tx_key(ngtcp2_conn* conn, + ngtcp2_crypto_level level, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + if (session->is_server() && level == NGTCP2_CRYPTO_LEVEL_APPLICATION) { + if (!session->application().Start()) return NGTCP2_ERR_CALLBACK_FAILURE; + } + return NGTCP2_SUCCESS; + } + + static int on_receive_version_negotiation(ngtcp2_conn* conn, + const ngtcp2_pkt_hd* hd, + const uint32_t* sv, + size_t nsv, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->EmitVersionNegotiation(*hd, sv, nsv); + return NGTCP2_SUCCESS; + } + + static int on_remove_connection_id(ngtcp2_conn* conn, + const ngtcp2_cid* cid, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->endpoint().DisassociateCID(CID(cid)); + return NGTCP2_SUCCESS; + } + + static int on_select_preferred_address(ngtcp2_conn* conn, + ngtcp2_path* dest, + const ngtcp2_preferred_addr* paddr, + void* user_data) { + NGTCP2_CALLBACK_SCOPE(session) + PreferredAddress preferred_address(dest, paddr); + session->SelectPreferredAddress(&preferred_address); + return NGTCP2_SUCCESS; + } + + static int on_stream_close(ngtcp2_conn* conn, + uint32_t flags, + int64_t stream_id, + uint64_t app_error_code, + void* user_data, + void* stream_user_data) { + NGTCP2_CALLBACK_SCOPE(session) + if (flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET) { + session->application().StreamClose( + Stream::From(stream_user_data), + QuicError::ForApplication(app_error_code)); + } else { + session->application().StreamClose(Stream::From(stream_user_data)); + } + return NGTCP2_SUCCESS; + } + + static int on_stream_open(ngtcp2_conn* conn, + int64_t stream_id, + void* user_data) { + // We currently do nothing with this callback. That is because we + // implicitly create streams when we receive data on them. + return NGTCP2_SUCCESS; + } + + static int on_stream_reset(ngtcp2_conn* conn, + int64_t stream_id, + uint64_t final_size, + uint64_t app_error_code, + void* user_data, + void* stream_user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->application().StreamReset( + Stream::From(stream_user_data), + final_size, + QuicError::ForApplication(app_error_code)); + return NGTCP2_SUCCESS; + } + + static int on_stream_stop_sending(ngtcp2_conn* conn, + int64_t stream_id, + uint64_t app_error_code, + void* user_data, + void* stream_user_data) { + NGTCP2_CALLBACK_SCOPE(session) + session->application().StreamStopSending( + Stream::From(stream_user_data), + QuicError::ForApplication(app_error_code)); + return NGTCP2_SUCCESS; + } + + static void on_rand(uint8_t* dest, + size_t destlen, + const ngtcp2_rand_ctx* rand_ctx) { + CHECK(crypto::CSPRNG(dest, destlen).is_ok()); + } + + static constexpr ngtcp2_callbacks CLIENT = { + ngtcp2_crypto_client_initial_cb, + nullptr, + on_receive_crypto_data, + on_handshake_completed, + on_receive_version_negotiation, + ngtcp2_crypto_encrypt_cb, + ngtcp2_crypto_decrypt_cb, + ngtcp2_crypto_hp_mask_cb, + on_receive_stream_data, + on_acknowledge_stream_data_offset, + on_stream_open, + on_stream_close, + on_receive_stateless_reset, + ngtcp2_crypto_recv_retry_cb, + on_extend_max_streams_bidi, + on_extend_max_streams_uni, + on_rand, + on_get_new_cid, + on_remove_connection_id, + ngtcp2_crypto_update_key_cb, + on_path_validation, + on_select_preferred_address, + on_stream_reset, + on_extend_max_remote_streams_bidi, + on_extend_max_remote_streams_uni, + on_extend_max_stream_data, + on_cid_status, + on_handshake_confirmed, + on_receive_new_token, + ngtcp2_crypto_delete_crypto_aead_ctx_cb, + ngtcp2_crypto_delete_crypto_cipher_ctx_cb, + on_receive_datagram, + on_acknowledge_datagram, + on_lost_datagram, + on_get_path_challenge_data, + on_stream_stop_sending, + ngtcp2_crypto_version_negotiation_cb, + on_receive_rx_key, + on_receive_tx_key}; + + static constexpr ngtcp2_callbacks SERVER = { + nullptr, + ngtcp2_crypto_recv_client_initial_cb, + on_receive_crypto_data, + on_handshake_completed, + nullptr, + ngtcp2_crypto_encrypt_cb, + ngtcp2_crypto_decrypt_cb, + ngtcp2_crypto_hp_mask_cb, + on_receive_stream_data, + on_acknowledge_stream_data_offset, + on_stream_open, + on_stream_close, + on_receive_stateless_reset, + nullptr, + on_extend_max_streams_bidi, + on_extend_max_streams_uni, + on_rand, + on_get_new_cid, + on_remove_connection_id, + ngtcp2_crypto_update_key_cb, + on_path_validation, + nullptr, + on_stream_reset, + on_extend_max_remote_streams_bidi, + on_extend_max_remote_streams_uni, + on_extend_max_stream_data, + on_cid_status, + nullptr, + nullptr, + ngtcp2_crypto_delete_crypto_aead_ctx_cb, + ngtcp2_crypto_delete_crypto_cipher_ctx_cb, + on_receive_datagram, + on_acknowledge_datagram, + on_lost_datagram, + on_get_path_challenge_data, + on_stream_stop_sending, + ngtcp2_crypto_version_negotiation_cb, + on_receive_rx_key, + on_receive_tx_key}; +}; + +#undef NGTCP2_CALLBACK_SCOPE + +Local Session::GetConstructorTemplate(Environment* env) { + auto& state = BindingData::Get(env); + auto tmpl = state.session_constructor_template(); + if (tmpl.IsEmpty()) { + auto isolate = env->isolate(); + tmpl = NewFunctionTemplate(isolate, IllegalConstructor); + tmpl->SetClassName(state.session_string()); + tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env)); + tmpl->InstanceTemplate()->SetInternalFieldCount( + Session::kInternalFieldCount); +#define V(name, key, no_side_effect) \ + if (no_side_effect) { \ + SetProtoMethodNoSideEffect(isolate, tmpl, #key, Impl::name); \ + } else { \ + SetProtoMethod(isolate, tmpl, #key, Impl::name); \ + } + + SESSION_JS_METHODS(V) + +#undef V + state.set_session_constructor_template(tmpl); + } + return tmpl; +} + +void Session::RegisterExternalReferences(ExternalReferenceRegistry* registry) { +#define V(name, _, __) registry->Register(Impl::name); + SESSION_JS_METHODS(V) +#undef V +} + +Session::QuicConnectionPointer Session::InitConnection() { + ngtcp2_conn* conn; + Path path(local_address_, remote_address_); + TransportParams::Config tp_config( + config_.side, config_.ocid, config_.retry_scid); + TransportParams transport_params(tp_config, config_.options.transport_params); + transport_params.GenerateSessionTokens(this); + switch (config_.side) { + case Side::SERVER: { + CHECK_EQ(ngtcp2_conn_server_new(&conn, + config_.dcid, + config_.scid, + path, + config_.version, + &Impl::SERVER, + &config_.settings, + transport_params, + &allocator_, + this), + 0); + return QuicConnectionPointer(conn); + } + case Side::CLIENT: { + CHECK_EQ(ngtcp2_conn_client_new(&conn, + config_.dcid, + config_.scid, + path, + config_.version, + &Impl::CLIENT, + &config_.settings, + transport_params, + &allocator_, + this), + 0); + if (config_.session_ticket.has_value()) + tls_context_.MaybeSetEarlySession(config_.session_ticket.value()); + return QuicConnectionPointer(conn); + } + } + UNREACHABLE(); +} + +void Session::Initialize(Environment* env, Local target) { + // Make sure the Session constructor template is initialized. + USE(GetConstructorTemplate(env)); + + TransportParams::Initialize(env, target); + PreferredAddress::Initialize(env, target); + + static constexpr uint32_t STREAM_DIRECTION_BIDIRECTIONAL = + static_cast(Direction::BIDIRECTIONAL); + static constexpr uint32_t STREAM_DIRECTION_UNIDIRECTIONAL = + static_cast(Direction::UNIDIRECTIONAL); + + NODE_DEFINE_CONSTANT(target, STREAM_DIRECTION_BIDIRECTIONAL); + NODE_DEFINE_CONSTANT(target, STREAM_DIRECTION_UNIDIRECTIONAL); + NODE_DEFINE_CONSTANT(target, DEFAULT_MAX_HEADER_LIST_PAIRS); + NODE_DEFINE_CONSTANT(target, DEFAULT_MAX_HEADER_LENGTH); + + constexpr auto QUIC_PROTO_MAX = NGTCP2_PROTO_VER_MAX; + constexpr auto QUIC_PROTO_MIN = NGTCP2_PROTO_VER_MIN; + NODE_DEFINE_CONSTANT(target, QUIC_PROTO_MAX); + NODE_DEFINE_CONSTANT(target, QUIC_PROTO_MIN); + +#define V(name, _) IDX_STATS_SESSION_##name, + enum SessionStatsIdx { SESSION_STATS(V) IDX_STATS_SESSION_COUNT }; +#undef V + +#define V(name, key, __) \ + auto IDX_STATE_SESSION_##name = offsetof(Session::State, key); + SESSION_STATE(V) +#undef V + +#define V(name, _) NODE_DEFINE_CONSTANT(target, IDX_STATS_SESSION_##name); + SESSION_STATS(V) + NODE_DEFINE_CONSTANT(target, IDX_STATS_SESSION_COUNT); +#undef V +#define V(name, _, __) NODE_DEFINE_CONSTANT(target, IDX_STATE_SESSION_##name); + SESSION_STATE(V) +#undef V +} + +} // namespace quic +} // namespace node + +#endif // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC diff --git a/src/quic/session.h b/src/quic/session.h index 6fdd7d584327d6..21af8d801d550e 100644 --- a/src/quic/session.h +++ b/src/quic/session.h @@ -4,50 +4,244 @@ #if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC #include +#include +#include #include +#include +#include #include +#include +#include #include #include "bindingdata.h" #include "cid.h" #include "data.h" +#include "defs.h" +#include "logstream.h" +#include "packet.h" +#include "preferredaddress.h" +#include "sessionticket.h" +#include "streams.h" +#include "tlscontext.h" +#include "transportparams.h" namespace node { namespace quic { class Endpoint; -// TODO(@jasnell): This is a placeholder definition of Session that -// includes only the pieces needed by Endpoint right now. The full -// Session definition will be provided separately. -class Session final : public AsyncWrap { +// A Session represents one half of a persistent connection between two QUIC +// peers. Every Session is established first by performing a TLS handshake in +// which the client sends an initial packet to the server containing a TLS +// client hello. Once the TLS handshake has been completed, the Session can be +// used to open one or more Streams for the actual data flow back and forth. +// +// While client and server Sessions are created in slightly different ways, +// their lifecycles are generally identical: +// +// A Session is either acting as a Client or as a Server. +// +// Client Sessions are always created using Endpoint::Connect() +// +// Server Sessions are always created by an Endpoint receiving a valid initial +// request received from a remote client. +// +// As soon as Sessions of either type are created, they will immediately start +// working through the TLS handshake to establish the crypographic keys used to +// secure the communication. Once those keys are established, the Session can be +// used to open Streams. Based on how the Session is configured, any number of +// Streams can exist concurrently on a single Session. +class Session final : public AsyncWrap, private SessionTicket::AppData::Source { public: - struct Config { + // For simplicity, we use the same Application::Options struct for all + // Application types. This may change in the future. Not all of the options + // are going to be relevant for all Application types. + struct Application_Options final : public MemoryRetainer { + // The maximum number of header pairs permitted for a Stream. + // Only relevant if the selected application supports headers. + uint64_t max_header_pairs = DEFAULT_MAX_HEADER_LIST_PAIRS; + + // The maximum total number of header bytes (including header + // name and value) permitted for a Stream. + // Only relevant if the selected application supports headers. + uint64_t max_header_length = DEFAULT_MAX_HEADER_LENGTH; + + // HTTP/3 specific options. + uint64_t max_field_section_size = 0; + uint64_t qpack_max_dtable_capacity = 0; + uint64_t qpack_encoder_max_dtable_capacity = 0; + uint64_t qpack_blocked_streams = 0; + + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(Application::Options) + SET_SELF_SIZE(Options) + + static v8::Maybe From(Environment* env, + v8::Local value); + + static const Application_Options kDefault; + }; + + // An Application implements the ALPN-protocol specific semantics on behalf + // of a QUIC Session. + class Application; + + // The options used to configure a session. Most of these deal directly with + // the transport parameters that are exchanged with the remote peer during + // handshake. + struct Options final : public MemoryRetainer { + // The QUIC protocol version requested for the session. + uint32_t version = NGTCP2_PROTO_VER_MAX; + + // Te minimum QUIC protocol version supported by this session. + uint32_t min_version = NGTCP2_PROTO_VER_MIN; + + // By default a client session will use the preferred address advertised by + // the the server. This option is only relevant for client sessions. + PreferredAddress::Policy preferred_address_strategy = + PreferredAddress::Policy::USE_PREFERRED_ADDRESS; + + TransportParams::Options transport_params = + TransportParams::Options::kDefault; + TLSContext::Options tls_options = TLSContext::Options::kDefault; + Application_Options application_options = Application_Options::kDefault; + + // A reference to the CID::Factory used to generate CID instances + // for this session. + const CID::Factory* cid_factory = &CID::Factory::random(); + // If the CID::Factory is a base object, we keep a reference to it + // so that it cannot be garbage collected. + BaseObjectPtr cid_factory_ref = BaseObjectPtr(); + + // When true, QLog output will be enabled for the session. + bool qlog = false; + + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(Session::Options) + SET_SELF_SIZE(Options) + + static v8::Maybe From(Environment* env, + v8::Local value); + }; + + // The additional configuration settings used to create a specific session. + // while the Options above can be used to configure multiple sessions, a + // single Config is used to create a single session, which is why they are + // kept separate. + struct Config final : MemoryRetainer { + // Is the Session acting as a client or a server? + Side side; + + // The options to use for this session. + Options options; + + // The actual QUIC version identified for this session. + uint32_t version; + SocketAddress local_address; SocketAddress remote_address; - std::optional ocid = std::nullopt; - std::optional retry_scid = std::nullopt; + + // The destination CID, identifying the remote peer. + CID dcid = CID::kInvalid; + + // The source CID, identifying this session. + CID scid = CID::kInvalid; + + // Used only by client sessions to identify the original DCID + // used to initiate the connection. + CID ocid = CID::kInvalid; + CID retry_scid = CID::kInvalid; + CID preferred_address_cid = CID::kInvalid; + + // If this is a client session, the session_ticket is used to resume + // a TLS session using a previously established session ticket. + std::optional session_ticket = std::nullopt; + + ngtcp2_settings settings = {}; + operator ngtcp2_settings*() { return &settings; } + operator const ngtcp2_settings*() const { return &settings; } Config(Side side, const Endpoint& endpoint, + const Options& options, + uint32_t version, + const SocketAddress& local_address, + const SocketAddress& remote_address, + const CID& dcid, const CID& scid, + std::optional session_ticket = std::nullopt, + const CID& ocid = CID::kInvalid); + + Config(const Endpoint& endpoint, + const Options& options, const SocketAddress& local_address, const SocketAddress& remote_address, - uint32_t min_quic_version, - uint32_t max_quic_version, + std::optional session_ticket = std::nullopt, const CID& ocid = CID::kInvalid); - }; - struct Options : public MemoryRetainer { - SET_NO_MEMORY_INFO() - SET_MEMORY_INFO_NAME(Session::Options) - SET_SELF_SIZE(Options) - static v8::Maybe From(Environment* env, - v8::Local value); + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(Session::Config) + SET_SELF_SIZE(Config) }; + static bool HasInstance(Environment* env, v8::Local value); + static v8::Local GetConstructorTemplate( + Environment* env); + static void Initialize(Environment* env, v8::Local target); + static void RegisterExternalReferences(ExternalReferenceRegistry* registry); + static BaseObjectPtr Create(BaseObjectPtr endpoint, - const Config& config, - const Options& options); + const Config& config); + + // Really should be private but MakeDetachedBaseObject needs visibility. + Session(BaseObjectPtr endpoint, + v8::Local object, + const Config& config); + ~Session() override; + + uint32_t version() const; + Endpoint& endpoint() const; + TLSContext& tls_context(); + Application& application(); + const Config& config() const; + const Options& options() const; + const SocketAddress& remote_address() const; + const SocketAddress& local_address() const; + + bool is_closing() const; + bool is_graceful_closing() const; + bool is_silent_closing() const; + bool is_destroyed() const; + bool is_server() const; + + std::string diagnostic_name() const override; + + // Use the configured CID::Factory to generate a new CID. + CID new_cid(size_t len = CID::kMaxLength) const; + + void HandleQlog(uint32_t flags, const void* data, size_t len); + + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(Session) + SET_SELF_SIZE(Session) + + struct State; + struct Stats; + + private: + struct Impl; + struct MaybeCloseConnectionScope; + + using StreamsMap = std::unordered_map>; + using QuicConnectionPointer = DeleteFnPtr; + + struct PathValidationFlags { + bool preferredAddress = false; + }; + + struct DatagramReceivedFlags { + bool early = false; + }; enum class CloseMethod { // Roundtrip through JavaScript, causing all currently opened streams @@ -68,23 +262,148 @@ class Session final : public AsyncWrap { GRACEFUL }; - Session(Environment* env, v8::Local object); - void Close(CloseMethod method = CloseMethod::DEFAULT); + void Destroy(); + bool Receive(Store&& store, const SocketAddress& local_address, const SocketAddress& remote_address); - bool is_destroyed() const; - bool is_server() const; - // The session is "wrapped" if it has been passed out to JavaScript - // via the New Session event or returned by the connect method. + void Send(BaseObjectPtr packet); + void Send(BaseObjectPtr packet, const PathStorage& path); + uint64_t SendDatagram(Store&& data); + + BaseObjectPtr FindStream(int64_t id) const; + BaseObjectPtr CreateStream(int64_t id); + BaseObjectPtr OpenStream(Direction direction); + void AddStream(const BaseObjectPtr& stream); + void RemoveStream(int64_t id); + void ResumeStream(int64_t id); + void ShutdownStream(int64_t id, QuicError error); + void StreamDataBlocked(int64_t id); + void ShutdownStreamWrite(int64_t id, QuicError code = QuicError()); + + struct SendPendingDataScope { + Session* session; + explicit SendPendingDataScope(Session* session); + explicit SendPendingDataScope(const BaseObjectPtr& session); + SendPendingDataScope(const SendPendingDataScope&) = delete; + SendPendingDataScope(SendPendingDataScope&&) = delete; + SendPendingDataScope& operator=(const SendPendingDataScope&) = delete; + SendPendingDataScope& operator=(SendPendingDataScope&&) = delete; + ~SendPendingDataScope(); + }; + + operator ngtcp2_conn*() const; + + // Implementation of SessionTicket::AppData::Source + void CollectSessionTicketAppData( + SessionTicket::AppData* app_data) const override; + SessionTicket::AppData::Status ExtractSessionTicketAppData( + const SessionTicket::AppData& app_data, + SessionTicket::AppData::Source::Flag flag) override; + + // Returns true if the Session has entered the closing period after sending a + // CONNECTION_CLOSE. While true, the Session is only permitted to transmit + // CONNECTION_CLOSE frames until either the idle timeout period elapses or + // until the Session is explicitly destroyed. + bool is_in_closing_period() const; + + // Returns true if the Session has received a CONNECTION_CLOSE frame from the + // peer. Once in the draining period, the Session is not permitted to send any + // frames to the peer. The Session will be silently closed after either the + // idle timeout period elapses or until the Session is explicitly destroyed. + bool is_in_draining_period() const; + + // Returns false if the Session is currently in a state where it is unable to + // transmit any packets. + bool can_send_packets() const; + + // Returns false if the Session is currently in a state where it cannot create + // new streams. + bool can_create_streams() const; + uint64_t max_data_left() const; + uint64_t max_local_streams_uni() const; + uint64_t max_local_streams_bidi() const; + BaseObjectPtr qlog() const; + BaseObjectPtr keylog() const; + + bool wants_session_ticket() const; + void SetStreamOpenAllowed(); + void set_wrapped(); - SocketAddress remote_address() const; - SET_NO_MEMORY_INFO() - SET_MEMORY_INFO_NAME(Session) - SET_SELF_SIZE(Session) + void DoClose(bool silent = false); + void ExtendStreamOffset(int64_t id, size_t amount); + void ExtendOffset(size_t amount); + void UpdateDataStats(); + void SendConnectionClose(); + void OnTimeout(); + void UpdateTimer(); + bool StartClosingPeriod(); + + // JavaScript callouts + + void EmitClose(const QuicError& error = QuicError()); + void EmitDatagram(Store&& datagram, DatagramReceivedFlags flag); + void EmitDatagramStatus(uint64_t id, DatagramStatus status); + void EmitHandshakeComplete(); + void EmitKeylog(const char* line); + void EmitPathValidation(PathValidationResult result, + PathValidationFlags flags, + const SocketAddress& local_address, + const SocketAddress& remote_address); + void EmitSessionTicket(Store&& ticket); + void EmitStream(BaseObjectPtr stream); + void EmitVersionNegotiation(const ngtcp2_pkt_hd& hd, + const uint32_t* sv, + size_t nsv); + + void DatagramStatus(uint64_t datagramId, DatagramStatus status); + void DatagramReceived(const uint8_t* data, + size_t datalen, + DatagramReceivedFlags flag); + bool GenerateNewConnectionId(ngtcp2_cid* cid, size_t len, uint8_t* token); + bool HandshakeCompleted(); + void HandshakeConfirmed(); + void SelectPreferredAddress(PreferredAddress* preferredAddress); + TransportParams GetLocalTransportParams() const; + TransportParams GetRemoteTransportParams() const; + void SetLastError(QuicError&& error); + void UpdatePath(const PathStorage& path); + + QuicConnectionPointer InitConnection(); + + std::unique_ptr select_application(); + + AliasedStruct stats_; + AliasedStruct state_; + ngtcp2_mem allocator_; + Config config_; + QuicConnectionPointer connection_; + BaseObjectPtr endpoint_; + TLSContext tls_context_; + std::unique_ptr application_; + SocketAddress local_address_; + SocketAddress remote_address_; + StreamsMap streams_; + TimerWrapHandle timer_; + size_t send_scope_depth_ = 0; + size_t connection_close_depth_ = 0; + QuicError last_error_; + BaseObjectPtr conn_closebuf_; + BaseObjectPtr qlog_stream_; + BaseObjectPtr keylog_stream_; + + friend class Application; + friend class DefaultApplication; + friend class Endpoint; + friend struct Impl; + friend struct MaybeCloseConnectionScope; + friend struct SendPendingDataScope; + friend class Stream; + friend class TLSContext; + friend class TransportParams; }; } // namespace quic diff --git a/src/quic/streams.cc b/src/quic/streams.cc new file mode 100644 index 00000000000000..ae208e0eb0007a --- /dev/null +++ b/src/quic/streams.cc @@ -0,0 +1,63 @@ +#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC +#include "streams.h" +#include +#include +#include +#include +#include +#include +#include "session.h" + +namespace node { +namespace quic { + +Stream::Stream(BaseObjectPtr session, v8::Local obj) + : AsyncWrap(session->env(), obj, AsyncWrap::PROVIDER_QUIC_STREAM) { + MakeWeak(); +} + +Stream* Stream::From(void* stream_user_data) { + DCHECK_NOT_NULL(stream_user_data); + return static_cast(stream_user_data); +} + +BaseObjectPtr Stream::Create(Session* session, int64_t id) { + return BaseObjectPtr(); +} + +int64_t Stream::id() const { + return 0; +} +Side Stream::origin() const { + return Side::CLIENT; +} +Direction Stream::direction() const { + return Direction::BIDIRECTIONAL; +} + +bool Stream::is_destroyed() const { + return false; +} +bool Stream::is_eos() const { + return false; +} + +void Stream::Acknowledge(size_t datalen) {} +void Stream::Blocked() {} +void Stream::Commit(size_t datalen) {} +void Stream::End() {} +void Stream::Destroy(QuicError error) {} + +void Stream::ReceiveData(const uint8_t* data, + size_t len, + ReceiveDataFlags flags) {} +void Stream::ReceiveStopSending(QuicError error) {} +void Stream::ReceiveStreamReset(uint64_t final_size, QuicError error) {} + +void Stream::Schedule(Stream::Queue* queue) {} +void Stream::Unschedule() {} + +} // namespace quic +} // namespace node + +#endif // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC diff --git a/src/quic/streams.h b/src/quic/streams.h new file mode 100644 index 00000000000000..a97efa9acba6f7 --- /dev/null +++ b/src/quic/streams.h @@ -0,0 +1,82 @@ +#pragma once + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS +#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC + +#include +#include +#include +#include +#include "bindingdata.h" +#include "data.h" + +namespace node { +namespace quic { + +class Session; + +using Ngtcp2Source = bob::SourceImpl; + +// TODO(@jasnell): This is currently a placeholder for the actual definition. +class Stream : public AsyncWrap, public Ngtcp2Source { + public: + static Stream* From(void* stream_user_data); + + static BaseObjectPtr Create(Session* session, int64_t id); + + Stream(BaseObjectPtr session, v8::Local obj); + + int64_t id() const; + Side origin() const; + Direction direction() const; + + bool is_destroyed() const; + bool is_eos() const; + + void Acknowledge(size_t datalen); + void Blocked(); + void Commit(size_t datalen); + void End(); + void Destroy(QuicError error); + + struct ReceiveDataFlags final { + // Identifies the final chunk of data that the peer will send for the + // stream. + bool fin = false; + // Indicates that this chunk of data was received in a 0RTT packet before + // the TLS handshake completed, suggesting that is is not as secure and + // could be replayed by an attacker. + bool early = false; + }; + + void ReceiveData(const uint8_t* data, size_t len, ReceiveDataFlags flags); + void ReceiveStopSending(QuicError error); + void ReceiveStreamReset(uint64_t final_size, QuicError error); + + SET_NO_MEMORY_INFO() + SET_MEMORY_INFO_NAME(Stream) + SET_SELF_SIZE(Stream) + + ListNode stream_queue_; + + public: + // The Queue/Schedule/Unschedule here are part of the mechanism used to + // determine which streams have data to send on the session. When a stream + // potentially has data available, it will be scheduled in the Queue. Then, + // when the Session::Application starts sending pending data, it will check + // the queue to see if there are streams waiting. If there are, it will grab + // one and check to see if there is data to send. When a stream does not have + // data to send (such as when it is initially created or is using an async + // source that is still waiting for data to be pushed) it will not appear in + // the queue. + using Queue = ListHead; + + void Schedule(Queue* queue); + void Unschedule(); +}; + +} // namespace quic +} // namespace node + +#endif // HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/quic/tlscontext.cc b/src/quic/tlscontext.cc index ec14a732bc16fc..171f6e1d259bc7 100644 --- a/src/quic/tlscontext.cc +++ b/src/quic/tlscontext.cc @@ -7,10 +7,12 @@ #include #include #include +#include #include #include #include "bindingdata.h" #include "defs.h" +#include "session.h" #include "transportparams.h" namespace node { @@ -27,17 +29,7 @@ using v8::Value; namespace quic { -// TODO(@jasnell): This session class is just a placeholder. -// The real session impl will be added in a separate commit. -class Session { - public: - operator ngtcp2_conn*() { return nullptr; } - void EmitKeylog(const char* line) const {} - void EmitSessionTicket(Store&& store) {} - void SetStreamOpenAllowed() {} - bool is_destroyed() const { return false; } - bool wants_session_ticket() const { return false; } -}; +const TLSContext::Options TLSContext::Options::kDefault = {}; namespace { constexpr size_t kMaxAlpnLen = 255; @@ -407,7 +399,8 @@ void TLSContext::Keylog(const char* line) const { int TLSContext::Receive(ngtcp2_crypto_level crypto_level, uint64_t offset, - const ngtcp2_vec& vec) { + const uint8_t* data, + size_t datalen) { // ngtcp2 provides an implementation of this in // ngtcp2_crypto_recv_crypto_data_cb but given that we are using the // implementation specific error codes below, we can't use it. @@ -417,7 +410,7 @@ int TLSContext::Receive(ngtcp2_crypto_level crypto_level, // Internally, this passes the handshake data off to openssl for processing. // The handshake may or may not complete. int ret = ngtcp2_crypto_read_write_crypto_data( - *session_, crypto_level, vec.base, vec.len); + *session_, crypto_level, data, datalen); switch (ret) { case 0: diff --git a/src/quic/tlscontext.h b/src/quic/tlscontext.h index 588c3e7f2517fd..2de6c016a267af 100644 --- a/src/quic/tlscontext.h +++ b/src/quic/tlscontext.h @@ -89,6 +89,8 @@ class TLSContext final : public MemoryRetainer { SET_MEMORY_INFO_NAME(CryptoContext::Options) SET_SELF_SIZE(Options) + static const Options kDefault; + static v8::Maybe From(Environment* env, v8::Local value); }; @@ -118,7 +120,8 @@ class TLSContext final : public MemoryRetainer { // chunk, we move the TLS handshake further along until it is complete. int Receive(ngtcp2_crypto_level crypto_level, uint64_t offset, - const ngtcp2_vec& vec); + const uint8_t* data, + size_t datalen); v8::MaybeLocal cert(Environment* env) const; v8::MaybeLocal peer_cert(Environment* env) const; diff --git a/src/quic/tokens.cc b/src/quic/tokens.cc index f47aa45c25414f..8c9ced0a4aae70 100644 --- a/src/quic/tokens.cc +++ b/src/quic/tokens.cc @@ -213,6 +213,8 @@ RetryToken::operator const ngtcp2_vec*() const { return &ptr_; } +RegularToken::RegularToken() : buf_(), ptr_(ngtcp2_vec{nullptr, 0}) {} + RegularToken::RegularToken(uint32_t version, const SocketAddress& address, const TokenSecret& token_secret) @@ -225,6 +227,10 @@ RegularToken::RegularToken(const uint8_t* token, size_t size) DCHECK_IMPLIES(token == nullptr, size = 0); } +RegularToken::operator bool() const { + return ptr_.base != nullptr && ptr_.len > 0; +} + bool RegularToken::Validate(uint32_t version, const SocketAddress& addr, const TokenSecret& token_secret, diff --git a/src/quic/tokens.h b/src/quic/tokens.h index 00c2bf81233e69..21f611b7bbfb93 100644 --- a/src/quic/tokens.h +++ b/src/quic/tokens.h @@ -75,6 +75,8 @@ class StatelessResetToken final : public MemoryRetainer { public: static constexpr int kStatelessTokenLen = NGTCP2_STATELESS_RESET_TOKENLEN; + StatelessResetToken(); + // Generates a stateless reset token using HKDF with the cid and token secret // as input. The token secret is either provided by user code when an Endpoint // is created or is generated randomly. @@ -121,7 +123,6 @@ class StatelessResetToken final : public MemoryRetainer { static StatelessResetToken kInvalid; private: - StatelessResetToken(); operator const char*() const; const uint8_t* ptr_; @@ -211,6 +212,8 @@ class RegularToken final : public MemoryRetainer { static constexpr uint64_t QUIC_MIN_REGULARTOKEN_EXPIRATION = 1 * NGTCP2_SECONDS; + RegularToken(); + // Generates a new retry token. RegularToken(uint32_t version, const SocketAddress& address, @@ -229,6 +232,8 @@ class RegularToken final : public MemoryRetainer { operator const ngtcp2_vec&() const; operator const ngtcp2_vec*() const; + operator bool() const; + SET_NO_MEMORY_INFO() SET_MEMORY_INFO_NAME(RetryToken) SET_SELF_SIZE(RetryToken) diff --git a/src/quic/transportparams.cc b/src/quic/transportparams.cc index 52f73e5125605e..3ea7a3d6d268ae 100644 --- a/src/quic/transportparams.cc +++ b/src/quic/transportparams.cc @@ -8,6 +8,8 @@ #include #include "bindingdata.h" #include "defs.h" +#include "endpoint.h" +#include "session.h" #include "tokens.h" namespace node { @@ -21,6 +23,9 @@ using v8::Object; using v8::Value; namespace quic { + +const TransportParams::Options TransportParams::Options::kDefault = {}; + TransportParams::Config::Config(Side side, const CID& ocid, const CID& retry_scid) @@ -55,6 +60,17 @@ Maybe TransportParams::Options::From( return Just(options); } +void TransportParams::Options::MemoryInfo(MemoryTracker* tracker) const { + if (preferred_address_ipv4.has_value()) { + tracker->TrackField("preferred_address_ipv4", + preferred_address_ipv4.value()); + } + if (preferred_address_ipv6.has_value()) { + tracker->TrackField("preferred_address_ipv6", + preferred_address_ipv6.value()); + } +} + TransportParams::TransportParams(Type type) : type_(type), ptr_(¶ms_) {} TransportParams::TransportParams(Type type, const ngtcp2_transport_params* ptr) @@ -168,26 +184,33 @@ void TransportParams::SetPreferredAddress(const SocketAddress& address) { UNREACHABLE(); } -void TransportParams::GenerateStatelessResetToken( - const TokenSecret& token_secret, const CID& cid) { +void TransportParams::GenerateSessionTokens(Session* session) { + if (session->is_server()) { + GenerateStatelessResetToken(session->endpoint(), session->config_.scid); + GeneratePreferredAddressToken(session); + } +} + +void TransportParams::GenerateStatelessResetToken(const Endpoint& endpoint, + const CID& cid) { DCHECK(ptr_ == ¶ms_); DCHECK(cid); params_.stateless_reset_token_present = 1; - - StatelessResetToken token(params_.stateless_reset_token, token_secret, cid); + endpoint.GenerateNewStatelessResetToken(params_.stateless_reset_token, cid); } -CID TransportParams::GeneratePreferredAddressToken(const Session& session) { +void TransportParams::GeneratePreferredAddressToken(Session* session) { DCHECK(ptr_ == ¶ms_); - // DCHECK(pscid); - // TODO(@jasnell): To be implemented when Session is implemented - // *pscid = session->cid_factory_.Generate(); - // params_.preferred_address.cid = *pscid; - // session->endpoint_->AssociateStatelessResetToken( - // session->endpoint().GenerateNewStatelessResetToken( - // params_.preferred_address.stateless_reset_token, *pscid), - // session); - return CID::kInvalid; + if (params_.preferred_address_present) { + session->config_.preferred_address_cid = session->new_cid(); + params_.preferred_address.cid = session->config_.preferred_address_cid; + auto& endpoint = session->endpoint(); + endpoint.AssociateStatelessResetToken( + endpoint.GenerateNewStatelessResetToken( + params_.preferred_address.stateless_reset_token, + session->config_.preferred_address_cid), + session); + } } TransportParams::Type TransportParams::type() const { @@ -212,6 +235,16 @@ const QuicError& TransportParams::error() const { return error_; } +void TransportParams::Initialize(Environment* env, + v8::Local target) { + NODE_DEFINE_CONSTANT(target, DEFAULT_MAX_STREAM_DATA); + NODE_DEFINE_CONSTANT(target, DEFAULT_MAX_DATA); + NODE_DEFINE_CONSTANT(target, DEFAULT_MAX_IDLE_TIMEOUT); + NODE_DEFINE_CONSTANT(target, DEFAULT_MAX_STREAMS_BIDI); + NODE_DEFINE_CONSTANT(target, DEFAULT_MAX_STREAMS_UNI); + NODE_DEFINE_CONSTANT(target, DEFAULT_ACTIVE_CONNECTION_ID_LIMIT); +} + } // namespace quic } // namespace node diff --git a/src/quic/transportparams.h b/src/quic/transportparams.h index 7808b1b6c189d2..1269f11fbbbf1c 100644 --- a/src/quic/transportparams.h +++ b/src/quic/transportparams.h @@ -4,6 +4,7 @@ #if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC #include +#include #include #include #include @@ -28,9 +29,9 @@ class TransportParams final { ENCRYPTED_EXTENSIONS = NGTCP2_TRANSPORT_PARAMS_TYPE_ENCRYPTED_EXTENSIONS, }; - static constexpr uint64_t DEFAULT_MAX_STREAM_DATA_BIDI_LOCAL = 256 * 1024; - static constexpr uint64_t DEFAULT_MAX_STREAM_DATA_BIDI_REMOTE = 256 * 1024; - static constexpr uint64_t DEFAULT_MAX_STREAM_DATA_UNI = 256 * 1024; + static void Initialize(Environment* env, v8::Local target); + + static constexpr uint64_t DEFAULT_MAX_STREAM_DATA = 256 * 1024; static constexpr uint64_t DEFAULT_MAX_DATA = 1 * 1024 * 1024; static constexpr uint64_t DEFAULT_MAX_IDLE_TIMEOUT = 10; // seconds static constexpr uint64_t DEFAULT_MAX_STREAMS_BIDI = 100; @@ -46,7 +47,7 @@ class TransportParams final { const CID& retry_scid = CID::kInvalid); }; - struct Options { + struct Options : public MemoryRetainer { // Set only on server Sessions, the preferred address communicates the IP // address and port that the server would prefer the client to use when // communicating with it. See the QUIC specification for more detail on how @@ -57,19 +58,17 @@ class TransportParams final { // The initial size of the flow control window of locally initiated streams. // This is the maximum number of bytes that the *remote* endpoint can send // when the connection is started. - uint64_t initial_max_stream_data_bidi_local = - DEFAULT_MAX_STREAM_DATA_BIDI_LOCAL; + uint64_t initial_max_stream_data_bidi_local = DEFAULT_MAX_STREAM_DATA; // The initial size of the flow control window of remotely initiated // streams. This is the maximum number of bytes that the remote endpoint can // send when the connection is started. - uint64_t initial_max_stream_data_bidi_remote = - DEFAULT_MAX_STREAM_DATA_BIDI_REMOTE; + uint64_t initial_max_stream_data_bidi_remote = DEFAULT_MAX_STREAM_DATA; // The initial size of the flow control window of remotely initiated // unidirectional streams. This is the maximum number of bytes that the // remote endpoint can send when the connection is started. - uint64_t initial_max_stream_data_uni = DEFAULT_MAX_STREAM_DATA_UNI; + uint64_t initial_max_stream_data_uni = DEFAULT_MAX_STREAM_DATA; // The initial size of the session-level flow control window. uint64_t initial_max_data = DEFAULT_MAX_DATA; @@ -111,6 +110,12 @@ class TransportParams final { // connection migration. bool disable_active_migration = false; + static const Options kDefault; + + void MemoryInfo(MemoryTracker* tracker) const override; + SET_MEMORY_INFO_NAME(TransportParams::Options) + SET_SELF_SIZE(Options) + static v8::Maybe From(Environment* env, v8::Local value); }; @@ -129,9 +134,9 @@ class TransportParams final { // operator will return false. TransportParams(Type type, const ngtcp2_vec& buf); - void GenerateStatelessResetToken(const TokenSecret& token_secret, - const CID& cid); - CID GeneratePreferredAddressToken(const Session& session); + void GenerateSessionTokens(Session* session); + void GenerateStatelessResetToken(const Endpoint& endpoint, const CID& cid); + void GeneratePreferredAddressToken(Session* session); void SetPreferredAddress(const SocketAddress& address); Type type() const; diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js index 33a5bc1ed91a5e..eb5bf1453683a9 100644 --- a/test/sequential/test-async-wrap-getasyncid.js +++ b/test/sequential/test-async-wrap-getasyncid.js @@ -70,6 +70,8 @@ const { getSystemErrorName } = require('util'); delete providers.QUIC_PACKET; delete providers.QUIC_UDP; delete providers.QUIC_ENDPOINT; + delete providers.QUIC_SESSION; + delete providers.QUIC_STREAM; const objKeys = Object.keys(providers); if (objKeys.length > 0)