Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

quic: next iteration #48244

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/dataqueue/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,27 @@ class DataQueueImpl final : public DataQueue,
"entries", entries_, "std::vector<std::unique_ptr<Entry>>");
}

void addBackpressureListener(BackpressureListener* listener) override {
if (idempotent_) return;
DCHECK_NOT_NULL(listener);
backpressure_listeners_.insert(listener);
}

void removeBackpressureListener(BackpressureListener* listener) override {
if (idempotent_) return;
DCHECK_NOT_NULL(listener);
backpressure_listeners_.erase(listener);
}

void NotifyBackpressure(size_t amount) {
if (idempotent_) return;
for (auto& listener : backpressure_listeners_) listener->EntryRead(amount);
}

bool HasBackpressureListeners() const noexcept {
return !backpressure_listeners_.empty();
}

std::shared_ptr<Reader> get_reader() override;
SET_MEMORY_INFO_NAME(DataQueue)
SET_SELF_SIZE(DataQueueImpl)
Expand All @@ -173,6 +194,8 @@ class DataQueueImpl final : public DataQueue,
std::optional<uint64_t> capped_size_ = std::nullopt;
bool locked_to_reader_ = false;

std::unordered_set<BackpressureListener*> backpressure_listeners_;

friend class DataQueue;
friend class IdempotentDataQueueReader;
friend class NonIdempotentDataQueueReader;
Expand Down Expand Up @@ -433,6 +456,17 @@ class NonIdempotentDataQueueReader final
return;
}

// If there is a backpressure listener, lets report on how much data
// was actually read.
if (data_queue_->HasBackpressureListeners()) {
// How much did we actually read?
size_t read = 0;
for (uint64_t n = 0; n < count; n++) {
read += vecs[n].len;
anonrig marked this conversation as resolved.
Show resolved Hide resolved
}
data_queue_->NotifyBackpressure(read);
}

// Now that we have updated this readers state, we can forward
// everything on to the outer next.
std::move(next)(status, vecs, count, std::move(done));
Expand Down
12 changes: 12 additions & 0 deletions src/dataqueue/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ class DataQueue : public MemoryRetainer {
using Done = bob::Done;
};

// A BackpressureListener can be used to receive notifications
// when a non-idempotent DataQueue releases entries as they
// are consumed.
class BackpressureListener {
public:
virtual void EntryRead(size_t amount) = 0;
};

// A DataQueue::Entry represents a logical chunk of data in the queue.
// The entry may or may not represent memory-resident data. It may
// or may not be consumable more than once.
Expand Down Expand Up @@ -285,6 +293,10 @@ class DataQueue : public MemoryRetainer {
// been set, maybeCapRemaining() will return std::nullopt.
virtual std::optional<uint64_t> maybeCapRemaining() const = 0;

// BackpressureListeners only work on non-idempotent DataQueues.
virtual void addBackpressureListener(BackpressureListener* listener) = 0;
virtual void removeBackpressureListener(BackpressureListener* listener) = 0;

static void Initialize(Environment* env, v8::Local<v8::Object> target);
static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
};
Expand Down
11 changes: 5 additions & 6 deletions src/quic/application.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "node_bob.h"
#include "uv.h"
#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC

Expand Down Expand Up @@ -79,7 +80,7 @@ void Session::Application::AcknowledgeStreamData(Stream* stream,

void Session::Application::BlockStream(int64_t id) {
auto stream = session().FindStream(id);
if (stream) stream->Blocked();
if (stream) stream->EmitBlocked();
}

bool Session::Application::CanAddHeader(size_t current_count,
Expand Down Expand Up @@ -233,7 +234,7 @@ void Session::Application::SendPendingData() {
// and no more outbound data can be sent.
CHECK_LE(ndatalen, 0);
auto stream = session_->FindStream(stream_data.id);
if (stream) stream->End();
if (stream) stream->EndWritable();
continue;
}
case NGTCP2_ERR_WRITE_MORE: {
Expand Down Expand Up @@ -360,10 +361,8 @@ class DefaultApplication final : public Session::Application {
stream_data->data,
arraysize(stream_data->data),
kMaxVectorCount);
switch (ret) {
case bob::Status::STATUS_EOS:
stream_data->fin = 1;
break;
if (ret == bob::Status::STATUS_EOS) {
stream_data->fin = 1;
}
} else {
stream_data->fin = 1;
Expand Down
8 changes: 6 additions & 2 deletions src/quic/bindingdata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,12 @@ CallbackScopeBase::CallbackScopeBase(Environment* env)
: env(env), context_scope(env->context()), try_catch(env->isolate()) {}

CallbackScopeBase::~CallbackScopeBase() {
if (try_catch.HasCaught() && !try_catch.HasTerminated()) {
errors::TriggerUncaughtException(env->isolate(), try_catch);
if (try_catch.HasCaught()) {
if (!try_catch.HasTerminated() && env->can_call_into_js()) {
errors::TriggerUncaughtException(env->isolate(), try_catch);
} else {
try_catch.ReThrow();
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/quic/bindingdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ constexpr size_t kMaxVectorCount = 16;
V(session_version_negotiation, SessionVersionNegotiation) \
V(session_path_validation, SessionPathValidation) \
V(stream_close, StreamClose) \
V(stream_error, StreamError) \
V(stream_created, StreamCreated) \
V(stream_reset, StreamReset) \
V(stream_headers, StreamHeaders) \
Expand Down Expand Up @@ -304,6 +303,8 @@ struct CallbackScopeBase {
~CallbackScopeBase();
};

// Maintains a strong reference to BaseObject type ptr to keep it alive during
// a MakeCallback during which it might be destroyed.
template <typename T>
struct CallbackScope final : public CallbackScopeBase {
BaseObjectPtr<T> ref;
Expand Down
Loading