Skip to content

Commit

Permalink
Refactor WebTransport Implementation
Browse files Browse the repository at this point in the history
Summary:
This is a big diff, but I think most of the change is moving code from HTTPTransaction to WebTransportImpl.  The purpose is to make the general implementation of webtransport available for re-use by a subsequent `QuicWebTransport` component, which will implement the WebTransport API on a raw QUIC connection, rather than within HTTP.  This allows future protocols (eg: moqt) to be implemented against the WebTransport interface in a fashion that is agnostic to WT over HTTP or raw QUIC transports.

The most significant functional change is in how read and write callbacks for webtransport streams are handled.  Previously, HQSession had a single read and write callback per CONNECT stream for any webtransport streams that were under there.  This required another hashtable to dispatch the callbacks to the proper webtransport stream objects.

Since we already had to allocate an object per webtransport stream, I changed it to directly implement the new `quic::StreamWriteCallback` and `quic::StreamWriteCallback`.  This *does* pull in one quic dependency into the HTTPTransaction chain.  It's a bit unfortunate, but all other implementations lead to an extra hashtable indirection, object allocation, or both.

The WebTransportImpl class requires a TransportProvider, which implements the basic reading/writing of streams and datagrams and a SessionProvider, which for now handles only stream closure.  HTTPTransaction implements SessionProvider while HTTPTransaction::Transport implements TransportProvider.

Reviewed By: hanidamlaj

Differential Revision: D56782852

fbshipit-source-id: 18506bb1f3a75f3f7afe00bde621f0be99bc8c05
  • Loading branch information
afrind authored and facebook-github-bot committed Oct 22, 2024
1 parent a17d926 commit 7e4a032
Show file tree
Hide file tree
Showing 11 changed files with 785 additions and 601 deletions.
1 change: 1 addition & 0 deletions proxygen/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ add_library(
http/structuredheaders/StructuredHeadersDecoder.cpp
http/structuredheaders/StructuredHeadersEncoder.cpp
http/structuredheaders/StructuredHeadersUtilities.cpp
http/webtransport/WebTransportImpl.cpp
pools/generators/FileServerListGenerator.cpp
pools/generators/ServerListGenerator.cpp
sampling/Sampling.cpp
Expand Down
70 changes: 20 additions & 50 deletions proxygen/lib/http/session/HQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3585,9 +3585,8 @@ void HQSession::dispatchUniWTStream(quic::StreamId streamID,
if (!parent) {
return;
}
auto streamTransport = static_cast<HQStreamTransport*>(parent);
sock_->setReadCallback(streamID, streamTransport->getWTReadCallback());
parent->txn_.onWebTransportUniStream(streamID);
auto handle = parent->txn_.onWebTransportUniStream(streamID);
sock_->setReadCallback(streamID, handle);
}

// Peer initiated Bidi WT streams
Expand All @@ -3605,9 +3604,8 @@ void HQSession::dispatchBidiWTStream(HTTPCodec::StreamID streamID,
return;
}

auto streamTransport = static_cast<HQStreamTransport*>(parent);
sock_->setReadCallback(streamID, streamTransport->getWTReadCallback());
parent->txn_.onWebTransportBidiStream(streamID);
auto handle = parent->txn_.onWebTransportBidiStream(streamID);
sock_->setReadCallback(streamID, handle.readHandle);
}

// Methods specific to StreamTransport subclasses
Expand Down Expand Up @@ -3786,10 +3784,11 @@ uint16_t HQSession::HQStreamTransport::getDatagramSizeLimit() const noexcept {
return session_.sock_->getDatagramSizeLimit() - kMaxDatagramHeaderSize;
}

bool HQSession::HQStreamTransport::sendDatagram(
folly::Expected<folly::Unit, WebTransport::ErrorCode>
HQSession::HQStreamTransport::sendDatagram(
std::unique_ptr<folly::IOBuf> datagram) {
if (!streamId_.hasValue() || !session_.datagramEnabled_) {
return false;
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
// Prepend the H3 Datagram header to the datagram payload
// HTTP/3 Datagram {
Expand All @@ -3803,14 +3802,14 @@ bool HQSession::HQStreamTransport::sendDatagram(
auto streamIdRes = quic::encodeQuicInteger(
streamId_.value() / 4, [&](auto val) { appender.writeBE(val); });
if (streamIdRes.hasError()) {
return false;
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
if (!txn_.isWebTransportConnectStream()) {
// Always use context-id = 0 for now
auto ctxIdRes =
quic::encodeQuicInteger(0, [&](auto val) { appender.writeBE(val); });
if (ctxIdRes.hasError()) {
return false;
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
}
VLOG(4) << "Sending datagram for streamId=" << streamId_.value()
Expand All @@ -3821,9 +3820,9 @@ bool HQSession::HQStreamTransport::sendDatagram(
auto writeRes = session_.sock_->writeDatagram(queue.move());
if (writeRes.hasError()) {
LOG(ERROR) << "Failed to send datagram for streamId=" << streamId_.value();
return false;
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
return true;
return folly::unit;
}

folly::Expected<HTTPCodec::StreamID, WebTransport::ErrorCode>
Expand All @@ -3843,7 +3842,6 @@ HQSession::HQStreamTransport::newWebTransportBidiStream() {
return folly::makeUnexpected(
WebTransport::ErrorCode::STREAM_CREATION_ERROR);
}
session_.sock_->setReadCallback(*id, getWTReadCallback());
return *id;
}

Expand All @@ -3867,9 +3865,13 @@ HQSession::HQStreamTransport::newWebTransportUniStream() {
return *id;
}

folly::Expected<HTTPTransaction::Transport::FCState, WebTransport::ErrorCode>
folly::Expected<WebTransportImpl::TransportProvider::FCState,
WebTransport::ErrorCode>
HQSession::HQStreamTransport::sendWebTransportStreamData(
HTTPCodec::StreamID id, std::unique_ptr<folly::IOBuf> data, bool eof) {
HTTPCodec::StreamID id,
std::unique_ptr<folly::IOBuf> data,
bool eof,
quic::StreamWriteCallback* writeCallback) {
auto res = session_.sock_->writeChain(id, std::move(data), eof);
if (res.hasError()) {
LOG(ERROR) << "Failed to write WT stream data";
Expand All @@ -3881,11 +3883,11 @@ HQSession::HQStreamTransport::sendWebTransportStreamData(
return folly::makeUnexpected(WebTransport::ErrorCode::SEND_ERROR);
}
if (!eof && flowControl->sendWindowAvailable == 0) {
session_.sock_->notifyPendingWriteOnStream(id, getWTWriteCallback());
session_.sock_->notifyPendingWriteOnStream(id, writeCallback);
VLOG(4) << "Closing fc window";
return HTTPTransaction::Transport::FCState::BLOCKED;
return WebTransportImpl::TransportProvider::FCState::BLOCKED;
} else {
return HTTPTransaction::Transport::FCState::UNBLOCKED;
return WebTransportImpl::TransportProvider::FCState::UNBLOCKED;
}
}

Expand Down Expand Up @@ -3949,38 +3951,6 @@ HQSession::HQStreamTransport::stopReadingWebTransportIngress(
return folly::unit;
}

void HQSession::HQStreamTransport::WTReadCallback::readAvailable(
quic::StreamId id) noexcept {
auto readRes = session_.sock_->read(id, 65535);
if (readRes.hasError()) {
LOG(ERROR) << "Got synchronous read error=" << readRes.error();
readError(id, quic::QuicError(readRes.error(), "sync read error"));
return;
}
quic::Buf data = std::move(readRes.value().first);
bool eof = readRes.value().second;
if (eof) {
session_.sock_->setReadCallback(id, nullptr);
}
txn_.onWebTransportStreamIngress(id, std::move(data), eof);
}

void HQSession::HQStreamTransport::WTReadCallback::readError(
quic::StreamId id, quic::QuicError error) noexcept {
auto quicAppErrorCode = error.code.asApplicationErrorCode();
if (quicAppErrorCode) {
auto appErrorCode = WebTransport::toApplicationErrorCode(*quicAppErrorCode);
if (appErrorCode) {
txn_.onWebTransportStreamError(id, *appErrorCode);
return;
}
}
// any other error
txn_.onWebTransportStreamError(id, WebTransport::kInternalError);

session_.sock_->setReadCallback(id, nullptr);
}

std::ostream& operator<<(std::ostream& os, const HQSession& session) {
session.describe(os);
return os;
Expand Down
75 changes: 27 additions & 48 deletions proxygen/lib/http/session/HQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -1821,7 +1821,8 @@ class HQSession
std::unique_ptr<HTTPMessage> /* promise */) override;

uint16_t getDatagramSizeLimit() const noexcept override;
bool sendDatagram(std::unique_ptr<folly::IOBuf> datagram) override;
folly::Expected<folly::Unit, WebTransport::ErrorCode> sendDatagram(
std::unique_ptr<folly::IOBuf> datagram) override;

[[nodiscard]] bool supportsWebTransport() const override {
return session_.supportsWebTransport();
Expand All @@ -1831,11 +1832,12 @@ class HQSession
folly::Expected<HTTPCodec::StreamID, WebTransport::ErrorCode>
newWebTransportUniStream() override;

folly::Expected<HTTPTransaction::Transport::FCState,
folly::Expected<WebTransportImpl::TransportProvider::FCState,
WebTransport::ErrorCode>
sendWebTransportStreamData(HTTPCodec::StreamID /*id*/,
std::unique_ptr<folly::IOBuf> /*data*/,
bool /*eof*/) override;
bool /*eof*/,
quic::StreamWriteCallback* wcb) override;

folly::Expected<folly::Unit, WebTransport::ErrorCode>
resetWebTransportEgress(HTTPCodec::StreamID /*id*/,
Expand All @@ -1845,6 +1847,28 @@ class HQSession
setWebTransportStreamPriority(HTTPCodec::StreamID /*id*/,
HTTPPriority pri) override;

folly::Expected<std::pair<std::unique_ptr<folly::IOBuf>, bool>,
WebTransport::ErrorCode>
readWebTransportData(HTTPCodec::StreamID id, size_t max) override {
auto res = session_.sock_->read(id, max);
if (res) {
return std::move(res.value());
} else {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
initiateReadOnBidiStream(HTTPCodec::StreamID id,
quic::StreamReadCallback* readCallback) override {
auto res = session_.sock_->setReadCallback(id, readCallback);
if (res) {
return folly::unit;
} else {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
}

folly::Expected<folly::Unit, WebTransport::ErrorCode>
pauseWebTransportIngress(HTTPCodec::StreamID /*id*/) override;

Expand All @@ -1855,51 +1879,6 @@ class HQSession
stopReadingWebTransportIngress(HTTPCodec::StreamID /*id*/,
uint32_t /*errorCode*/) override;

class WTWriteCallback : public quic::QuicSocket::WriteCallback {
public:
explicit WTWriteCallback(HTTPTransaction& txn) : txn_(txn) {
}

void onStreamWriteReady(quic::StreamId id, uint64_t) noexcept override {
VLOG(4) << "onStreamWriteReady id=" << id;
txn_.onWebTransportEgressReady(id);
}

private:
HTTPTransaction& txn_;
};

class WTReadCallback : public quic::QuicSocket::ReadCallback {
public:
explicit WTReadCallback(HTTPTransaction& txn, HQSession& session)
: txn_(txn), session_(session) {
}

void readAvailable(quic::StreamId id) noexcept override;

void readError(quic::StreamId id,
quic::QuicError error) noexcept override;

private:
HTTPTransaction& txn_;
HQSession& session_;
};

std::unique_ptr<WTWriteCallback> wtWriteCallback_;
std::unique_ptr<WTReadCallback> wtReadCallback_;

WTWriteCallback* getWTWriteCallback() {
if (!wtWriteCallback_) {
wtWriteCallback_ = std::make_unique<WTWriteCallback>(txn_);
}
return wtWriteCallback_.get();
}
WTReadCallback* getWTReadCallback() {
if (!wtReadCallback_) {
wtReadCallback_ = std::make_unique<WTReadCallback>(txn_, session_);
}
return wtReadCallback_.get();
}
}; // HQStreamTransport

#ifdef _MSC_VER
Expand Down
Loading

0 comments on commit 7e4a032

Please sign in to comment.