Skip to content
This repository has been archived by the owner on Aug 11, 2020. It is now read-only.

quic: refactoring SendPendingData #275

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,7 @@
'src/quic/node_quic_session-inl.h',
'src/quic/node_quic_socket.h',
'src/quic/node_quic_stream.h',
'src/quic/node_quic_stream-inl.h',
'src/quic/node_quic_util.h',
'src/quic/node_quic_util-inl.h',
'src/quic/node_quic_state.h',
Expand Down
2 changes: 1 addition & 1 deletion src/quic/node_quic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include "node_quic_crypto.h"
#include "node_quic_session-inl.h"
#include "node_quic_socket.h"
#include "node_quic_stream.h"
#include "node_quic_stream-inl.h"
#include "node_quic_state.h"
#include "node_quic_util-inl.h"
#include "node_sockaddr-inl.h"
Expand Down
291 changes: 99 additions & 192 deletions src/quic/node_quic_default_application.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "node_quic_default_application.h"
#include "node_quic_session-inl.h"
#include "node_quic_socket.h"
#include "node_quic_stream.h"
#include "node_quic_stream-inl.h"
#include "node_quic_util-inl.h"
#include "node_sockaddr-inl.h"
#include <ngtcp2/ngtcp2.h>
Expand All @@ -13,16 +13,69 @@
namespace node {
namespace quic {

namespace {
void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) {
ngtcp2_vec* v = *pvec;
size_t cnt = *pcnt;

for (; cnt > 0; --cnt, ++v) {
if (v->len > len) {
v->len -= len;
v->base += len;
break;
}
len -= v->len;
}

*pvec = v;
*pcnt = cnt;
}

int IsEmpty(const ngtcp2_vec* vec, size_t cnt) {
size_t i;
for (i = 0; i < cnt && vec[i].len == 0; ++i) {}
return i == cnt;
}
} // anonymous namespace

DefaultApplication::DefaultApplication(
QuicSession* session) :
QuicApplication(session) {}

bool DefaultApplication::Initialize() {
if (!needs_init())
return false;
Debug(session(), "Default QUIC Application Initialized");
set_init_done();
return true;
if (needs_init()) {
Debug(session(), "Default QUIC Application Initialized");
set_init_done();
}
return needs_init();
}

void DefaultApplication::ScheduleStream(int64_t stream_id) {
QuicStream* stream = session()->FindStream(stream_id);
Debug(session(), "Scheduling stream %" PRIu64, stream_id);
if (stream != nullptr)
stream->Schedule(&stream_queue_);
}

void DefaultApplication::UnscheduleStream(int64_t stream_id) {
QuicStream* stream = session()->FindStream(stream_id);
Debug(session(), "Unscheduling stream %" PRIu64, stream_id);
if (stream != nullptr)
stream->Unschedule();
}

void DefaultApplication::StreamClose(
int64_t stream_id,
uint64_t app_error_code) {
if (app_error_code == 0)
app_error_code = NGTCP2_APP_NOERROR;
UnscheduleStream(stream_id);
QuicApplication::StreamClose(stream_id, app_error_code);
}

void DefaultApplication::ResumeStream(int64_t stream_id) {
Debug(session(), "Stream %" PRId64 " has data to send");
ScheduleStream(stream_id);
}

bool DefaultApplication::ReceiveStreamData(
Expand Down Expand Up @@ -59,197 +112,51 @@ bool DefaultApplication::ReceiveStreamData(
return true;
}

void DefaultApplication::AcknowledgeStreamData(
int64_t stream_id,
uint64_t offset,
size_t datalen) {
QuicStream* stream = session()->FindStream(stream_id);
Debug(session(), "Default QUIC Application acknowledging stream data");
// It's possible that the stream has already been destroyed and
// removed. If so, just silently ignore the ack
if (stream != nullptr)
stream->AckedDataOffset(offset, datalen);
int DefaultApplication::GetStreamData(StreamData* stream_data) {
QuicStream* stream = stream_queue_.PopFront();
// If stream is nullptr, there are no streams with data pending.
if (stream == nullptr)
return 0;

stream_data->remaining =
stream->DrainInto(
&stream_data->data,
&stream_data->count,
MAX_VECTOR_COUNT);

stream_data->stream.reset(stream);
stream_data->id = stream->id();
stream_data->fin = stream->is_writable() ? 0 : 1;

// Schedule the stream again only if there is data to write. There
// might not actually be any more data to write but we can't know
// that yet as it depends entirely on how much data actually gets
// serialized by ngtcp2.
if (stream_data->count > 0)
stream->Schedule(&stream_queue_);

Debug(session(), "Selected %" PRId64 " buffers for stream %" PRId64 "%s",
stream_data->count,
stream_data->id,
stream_data->fin == 1 ? " (fin)" : "");
return 0;
}

bool DefaultApplication::SendPendingData() {
// Right now this iterates through the streams in the order they
// were created. Later, we might want to implement a prioritization
// scheme to allow higher priority streams to be serialized first.
// Prioritization is left entirely up to the application layer in QUIC.
// HTTP/3, for instance, drops prioritization entirely.
Debug(session(), "Default QUIC Application sending pending data");
for (const auto& stream : session()->streams()) {
if (!SendStreamData(stream.second.get()))
return false;

// Check to make sure QuicSession state did not change in this iteration
if (session()->is_in_draining_period() ||
session()->is_in_closing_period() ||
session()->is_destroyed()) {
break;
}
}

bool DefaultApplication::StreamCommit(
StreamData* stream_data,
size_t datalen) {
CHECK(stream_data->stream);
stream_data->remaining -= datalen;
Consume(&stream_data->buf, &stream_data->count, datalen);
stream_data->stream->Commit(datalen);
return true;
}

namespace {
void Consume(ngtcp2_vec** pvec, size_t* pcnt, size_t len) {
ngtcp2_vec* v = *pvec;
size_t cnt = *pcnt;

for (; cnt > 0; --cnt, ++v) {
if (v->len > len) {
v->len -= len;
v->base += len;
break;
}
len -= v->len;
}

*pvec = v;
*pcnt = cnt;
}

int IsEmpty(const ngtcp2_vec* vec, size_t cnt) {
size_t i;
for (i = 0; i < cnt && vec[i].len == 0; ++i) {}
return i == cnt;
}
} // anonymous namespace

bool DefaultApplication::SendStreamData(QuicStream* stream) {
ssize_t ndatalen = 0;
QuicPathStorage path;
Debug(session(), "Default QUIC Application sending stream %" PRId64 " data",
stream->GetID());

std::vector<ngtcp2_vec> vec;

// remaining is the total number of bytes stored in the vector
// that are remaining to be serialized.
size_t remaining = stream->DrainInto(&vec);
Debug(stream, "Sending %d bytes of stream data. Still writable? %s",
remaining,
stream->is_writable() ? "yes" : "no");

// c and v are used to track the current serialization position
// for each iteration of the for(;;) loop below.
size_t c = vec.size();
ngtcp2_vec* v = vec.data();

// If there is no stream data and we're not sending fin,
// Just return without doing anything.
if (c == 0 && stream->is_writable()) {
Debug(stream, "There is no stream data to send");
return true;
}

std::unique_ptr<QuicPacket> packet = CreateStreamDataPacket();
size_t packet_offset = 0;

for (;;) {
Debug(stream, "Starting packet serialization. Remaining? %d", remaining);

// If packet was sent on the previous iteration, it will have been reset
if (!packet)
packet = CreateStreamDataPacket();

ssize_t nwrite =
ngtcp2_conn_writev_stream(
session()->connection(),
&path.path,
packet->data() + packet_offset,
session()->max_packet_length(),
&ndatalen,
remaining > 0 ?
NGTCP2_WRITE_STREAM_FLAG_MORE :
NGTCP2_WRITE_STREAM_FLAG_NONE,
stream->GetID(),
stream->is_writable() ? 0 : 1,
reinterpret_cast<const ngtcp2_vec*>(v),
c,
uv_hrtime());

if (nwrite <= 0) {
switch (nwrite) {
case 0:
// If zero is returned, we've hit congestion limits. We need to stop
// serializing data and try again later to empty the queue once the
// congestion window has expanded.
Debug(stream, "Congestion limit reached");
return true;
case NGTCP2_ERR_PKT_NUM_EXHAUSTED:
// There is a finite number of packets that can be sent
// per connection. Once those are exhausted, there's
// absolutely nothing we can do except immediately
// and silently tear down the QuicSession. This has
// to be silent because we can't even send a
// CONNECTION_CLOSE since even those require a
// packet number.
session()->SilentClose();
return false;
case NGTCP2_ERR_STREAM_DATA_BLOCKED:
Debug(stream, "Stream data blocked");
session()->StreamDataBlocked(stream->GetID());
return true;
case NGTCP2_ERR_STREAM_SHUT_WR:
Debug(stream, "Stream writable side is closed");
return true;
case NGTCP2_ERR_STREAM_NOT_FOUND:
Debug(stream, "Stream does not exist");
return true;
case NGTCP2_ERR_WRITE_STREAM_MORE:
if (ndatalen > 0) {
remaining -= ndatalen;
Debug(stream,
"%" PRIu64 " stream bytes serialized into packet. %d remaining",
ndatalen,
remaining);
Consume(&v, &c, ndatalen);
stream->Commit(ndatalen);
packet_offset += ndatalen;
}
continue;
default:
Debug(stream, "Error writing packet. Code %" PRIu64, nwrite);
session()->set_last_error(
QUIC_ERROR_SESSION,
static_cast<int>(nwrite));
return false;
}
}

if (ndatalen > 0) {
remaining -= ndatalen;
Debug(stream,
"%" PRIu64 " stream bytes serialized into packet. %d remaining",
ndatalen,
remaining);
Consume(&v, &c, ndatalen);
stream->Commit(ndatalen);
}

Debug(stream, "Sending %" PRIu64 " bytes in serialized packet", nwrite);
packet->set_length(nwrite);
if (!session()->SendPacket(std::move(packet), path))
return false;

packet.reset();
packet_offset = 0;

if (IsEmpty(v, c)) {
// fin will have been set if all of the data has been
// encoded in the packet and is_writable() returns false.
if (!stream->is_writable()) {
Debug(stream, "Final stream has been sent");
stream->set_fin_sent();
}
break;
}
}

return true;
bool DefaultApplication::ShouldSetFin(const StreamData& stream_data) {
if (!stream_data.stream ||
!IsEmpty(stream_data.buf, stream_data.count))
return false;
return !stream_data.stream->is_writable();
}

} // namespace quic
Expand Down
20 changes: 14 additions & 6 deletions src/quic/node_quic_default_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "node_quic_stream.h"
#include "node_quic_session.h"
#include "node_quic_util.h"
#include "util.h"
#include "v8.h"

namespace node {
Expand All @@ -28,17 +30,23 @@ class DefaultApplication final : public QuicApplication {
const uint8_t* data,
size_t datalen,
uint64_t offset) override;
void AcknowledgeStreamData(
int64_t stream_id,
uint64_t offset,
size_t datalen) override;

bool SendPendingData() override;
bool SendStreamData(QuicStream* stream) override;
int GetStreamData(StreamData* stream_data) override;

void ResumeStream(int64_t stream_id) override;
void StreamClose(int64_t stream_id, uint64_t app_error_code) override;
bool ShouldSetFin(const StreamData& stream_data) override;
bool StreamCommit(StreamData* stream_data, size_t datalen) override;

SET_SELF_SIZE(DefaultApplication)
SET_MEMORY_INFO_NAME(DefaultApplication)
SET_NO_MEMORY_INFO()

private:
void ScheduleStream(int64_t stream_id);
void UnscheduleStream(int64_t stream_id);

QuicStream::Queue stream_queue_;
};

} // namespace quic
Expand Down
Loading