Skip to content

Commit

Permalink
quic: add streams implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed May 29, 2023
1 parent 04f2976 commit b6ba7a4
Show file tree
Hide file tree
Showing 6 changed files with 1,188 additions and 42 deletions.
7 changes: 3 additions & 4 deletions src/dataqueue/queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ class DataQueueImpl final : public DataQueue,

void NotifyBackpressure(size_t amount) {
if (idempotent_) return;
for (auto listener : backpressure_listeners_)
listener->EntryRead(amount);
for (auto listener : backpressure_listeners_) listener->EntryRead(amount);
}

bool has_backpressure_listeners() const {
Expand Down Expand Up @@ -457,8 +456,8 @@ class NonIdempotentDataQueueReader final
return;
}

// If there is a backpressure listener, lets report on how much data was
// actually read.
// If there is a backpressure listener, lets report on how much data
// was actually read.
if (data_queue_->has_backpressure_listeners()) {
// How much did we actually read?
size_t read = 0;
Expand Down
11 changes: 5 additions & 6 deletions src/quic/application.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "node_bob.h"
#include "uv.h"
#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC

Expand Down Expand Up @@ -79,7 +80,7 @@ void Session::Application::AcknowledgeStreamData(Stream* stream,

void Session::Application::BlockStream(int64_t id) {
auto stream = session().FindStream(id);
if (stream) stream->Blocked();
if (stream) stream->EmitBlocked();
}

bool Session::Application::CanAddHeader(size_t current_count,
Expand Down Expand Up @@ -233,7 +234,7 @@ void Session::Application::SendPendingData() {
// and no more outbound data can be sent.
CHECK_LE(ndatalen, 0);
auto stream = session_->FindStream(stream_data.id);
if (stream) stream->End();
if (stream) stream->EndWritable();
continue;
}
case NGTCP2_ERR_WRITE_MORE: {
Expand Down Expand Up @@ -360,10 +361,8 @@ class DefaultApplication final : public Session::Application {
stream_data->data,
arraysize(stream_data->data),
kMaxVectorCount);
switch (ret) {
case bob::Status::STATUS_EOS:
stream_data->fin = 1;
break;
if (ret == bob::Status::STATUS_EOS) {
stream_data->fin = 1;
}
} else {
stream_data->fin = 1;
Expand Down
8 changes: 6 additions & 2 deletions src/quic/bindingdata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,12 @@ CallbackScopeBase::CallbackScopeBase(Environment* env)
: env(env), context_scope(env->context()), try_catch(env->isolate()) {}

CallbackScopeBase::~CallbackScopeBase() {
if (try_catch.HasCaught() && !try_catch.HasTerminated()) {
errors::TriggerUncaughtException(env->isolate(), try_catch);
if (try_catch.HasCaught()) {
if (!try_catch.HasTerminated() && env->can_call_into_js()) {
errors::TriggerUncaughtException(env->isolate(), try_catch);
} else {
try_catch.ReThrow();
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/quic/bindingdata.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ constexpr size_t kMaxVectorCount = 16;
V(session_version_negotiation, SessionVersionNegotiation) \
V(session_path_validation, SessionPathValidation) \
V(stream_close, StreamClose) \
V(stream_error, StreamError) \
V(stream_created, StreamCreated) \
V(stream_reset, StreamReset) \
V(stream_headers, StreamHeaders) \
Expand Down Expand Up @@ -304,6 +303,8 @@ struct CallbackScopeBase {
~CallbackScopeBase();
};

// Maintains a strong reference to BaseObject type ptr to keep it alive during
// a MakeCallback during which it might be destroyed.
template <typename T>
struct CallbackScope final : public CallbackScopeBase {
BaseObjectPtr<T> ref;
Expand Down
Loading

0 comments on commit b6ba7a4

Please sign in to comment.