From feccda6d02cf378d2cad45eb2f892918f2935f48 Mon Sep 17 00:00:00 2001 From: Mike Aizatsky Date: Fri, 21 Jun 2024 08:51:58 -0700 Subject: [PATCH] Array-ptr based read async api Downstream of https://github.com/capnproto/capnproto/pull/2084 --- WORKSPACE | 6 ++-- src/workerd/api/blob.c++ | 8 +++--- src/workerd/api/http.c++ | 8 +++--- src/workerd/api/r2-rpc.c++ | 3 +- src/workerd/api/streams/common.h | 2 +- src/workerd/api/streams/compression.c++ | 12 +++----- src/workerd/api/streams/internal-test.c++ | 9 +++--- src/workerd/api/streams/internal.c++ | 35 +++++++++++------------ src/workerd/api/streams/internal.h | 4 +-- src/workerd/api/streams/readable.c++ | 8 +++--- src/workerd/api/system-streams.c++ | 8 +++--- src/workerd/api/util.c++ | 6 ++-- src/workerd/io/io-context.c++ | 4 +-- src/workerd/server/server-test.c++ | 14 ++++----- src/workerd/util/abortable.h | 8 +++--- src/workerd/util/stream-utils.c++ | 26 ++++++++--------- 16 files changed, 77 insertions(+), 84 deletions(-) diff --git a/WORKSPACE b/WORKSPACE index dffd598efbe..6e50996cc95 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -39,10 +39,10 @@ apple_support_dependencies() http_archive( name = "capnp-cpp", - integrity = "sha256-6bAIGw5ciWkS16cgJC6vxd2LmE31+p1XkhnfPsiklDQ=", - strip_prefix = "capnproto-capnproto-3765f4c/c++", + integrity = "sha256-iJVvugPmmCiWSuspzod1UQqI5pMYUe/ut8bzvte7ifI=", + strip_prefix = "capnproto-capnproto-4f73201/c++", type = "tgz", - urls = ["https://github.com/capnproto/capnproto/tarball/3765f4c0c8839398ea59831c13b3bf19035aa69f"], + urls = ["https://github.com/capnproto/capnproto/tarball/4f7320140c42d1d13ef7a6b36ce09a364439b7e7"], ) http_archive( diff --git a/src/workerd/api/blob.c++ b/src/workerd/api/blob.c++ index 1849a4a52d7..c3e9114cf7e 100644 --- a/src/workerd/api/blob.c++ +++ b/src/workerd/api/blob.c++ @@ -224,11 +224,11 @@ public: // The minBytes argument is ignored in this implementation of tryRead. // The buffer must be kept alive by the caller until the returned promise is fulfilled. // The returned promise is fulfilled with the actual number of bytes read. - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - size_t amount = kj::min(maxBytes, unread.size()); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + size_t amount = kj::min(buffer.size(), unread.size()); if (amount > 0) { - memcpy(buffer, unread.begin(), amount); - unread = unread.slice(amount, unread.size()); + buffer.first(amount).copyFrom(unread.first(amount)); + unread = unread.slice(amount); } return amount; } diff --git a/src/workerd/api/http.c++ b/src/workerd/api/http.c++ index 43bb76102f5..77ff693e547 100644 --- a/src/workerd/api/http.c++ +++ b/src/workerd/api/http.c++ @@ -576,10 +576,10 @@ public: : unread(buffer.view), ownBytes(kj::mv(buffer.ownBytes)) {} - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - size_t amount = kj::min(maxBytes, unread.size()); - memcpy(buffer, unread.begin(), amount); - unread = unread.slice(amount, unread.size()); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + size_t amount = kj::min(buffer.size(), unread.size()); + buffer.first(amount).copyFrom(unread.first(amount)); + unread = unread.slice(amount); return amount; } diff --git a/src/workerd/api/r2-rpc.c++ b/src/workerd/api/r2-rpc.c++ index b9bff2a7e29..b6f13c1b317 100644 --- a/src/workerd/api/r2-rpc.c++ +++ b/src/workerd/api/r2-rpc.c++ @@ -101,8 +101,7 @@ kj::Promise doR2HTTPGetRequest(kj::Own client, KJ_REQUIRE(metadataSize >= 0, "R2 metadata size parsed as negative"); auto metadataBuffer = kj::heapArray(metadataSize); - auto metadataReadLength = - co_await stream->tryRead(metadataBuffer.begin(), metadataSize, metadataSize); + auto metadataReadLength = co_await stream->tryRead(metadataBuffer.asBytes(), metadataSize); KJ_ASSERT(metadataReadLength == metadataBuffer.size(), "R2 metadata buffer not read fully/overflow?"); diff --git a/src/workerd/api/streams/common.h b/src/workerd/api/streams/common.h index 70f31bc90a7..6f562257aa3 100644 --- a/src/workerd/api/streams/common.h +++ b/src/workerd/api/streams/common.h @@ -214,7 +214,7 @@ class WritableStreamSink { class ReadableStreamSource { public: - virtual kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0; + virtual kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) = 0; // The ReadableStreamSource version of pumpTo() has no `amount` parameter, since the Streams spec // only defines pumping everything. diff --git a/src/workerd/api/streams/compression.c++ b/src/workerd/api/streams/compression.c++ index eda12bfe268..9fe8a92f23d 100644 --- a/src/workerd/api/streams/compression.c++ +++ b/src/workerd/api/streams/compression.c++ @@ -194,23 +194,19 @@ public: // ReadableStreamSource implementation ------------------------------------------------- - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - KJ_ASSERT(minBytes <= maxBytes); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + KJ_ASSERT(minBytes <= buffer.size()); KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(ended, Ended) { // There might still be data in the output buffer remaining to read. if (output.empty()) return size_t(0); - return tryReadInternal( - kj::ArrayPtr(reinterpret_cast(buffer), maxBytes), - minBytes); + return tryReadInternal(buffer, minBytes); } KJ_CASE_ONEOF(exception, kj::Exception) { return kj::cp(exception); } KJ_CASE_ONEOF(open, Open) { - return tryReadInternal( - kj::ArrayPtr(reinterpret_cast(buffer), maxBytes), - minBytes); + return tryReadInternal(buffer, minBytes); } } KJ_UNREACHABLE; diff --git a/src/workerd/api/streams/internal-test.c++ b/src/workerd/api/streams/internal-test.c++ index 87d0f58ac27..76352d6b00c 100644 --- a/src/workerd/api/streams/internal-test.c++ +++ b/src/workerd/api/streams/internal-test.c++ @@ -19,14 +19,15 @@ public: FooStream() : ptr(&data[0]), remaining_(size) { KJ_ASSERT(RAND_bytes(data, size) == 1); } - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) { + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + auto maxBytes = buffer.size(); maxMaxBytesSeen_ = kj::max(maxMaxBytesSeen_, maxBytes); numreads_++; if (remaining_ == 0) return (size_t)0; KJ_ASSERT(minBytes == maxBytes); KJ_ASSERT(maxBytes <= size); auto amount = kj::min(remaining_, maxBytes); - memcpy(buffer, ptr, amount); + memcpy(buffer.begin(), ptr, amount); ptr += amount; remaining_ -= amount; return amount; @@ -130,10 +131,10 @@ KJ_TEST("zero-length stream") { class Zero : public ReadableStreamSource { public: - kj::Promise tryRead(void*, size_t, size_t) { + kj::Promise tryRead(kj::ArrayPtr buffer, size_t) override { return (size_t)0; } - kj::Maybe tryGetLength(StreamEncoding encoding) { + kj::Maybe tryGetLength(StreamEncoding encoding) override { return (size_t)0; } }; diff --git a/src/workerd/api/streams/internal.c++ b/src/workerd/api/streams/internal.c++ index 12b8032c6bb..257af192e2c 100644 --- a/src/workerd/api/streams/internal.c++ +++ b/src/workerd/api/streams/internal.c++ @@ -34,7 +34,7 @@ kj::Promise pumpTo(ReadableStreamSource& input, WritableStreamSink& output kj::byte buffer[4096]{}; while (true) { - auto amount = co_await input.tryRead(buffer, 1, kj::size(buffer)); + auto amount = co_await input.tryRead(buffer, 1); if (amount == 0) { if (end) { @@ -150,7 +150,7 @@ private: // Note that we're passing amountToRead as the *minBytes* here so the tryRead should // attempt to fill the entire buffer. If it doesn't, the implication is that we read // everything. - uint64_t amount = co_await input.tryRead(bytes.begin(), amountToRead, amountToRead); + uint64_t amount = co_await input.tryRead(bytes.asBytes(), amountToRead); KJ_DASSERT(amount <= amountToRead); runningTotal += amount; @@ -252,8 +252,8 @@ public: explicit TeeAdapter(kj::Own inner) : inner(kj::mv(inner)) {} - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - return inner->tryRead(buffer, minBytes, maxBytes); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + return inner->tryRead(buffer, minBytes); } kj::Maybe tryGetLength() override { @@ -269,8 +269,8 @@ public: explicit TeeBranch(kj::Own inner) : inner(kj::mv(inner)) {} - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - return inner->tryRead(buffer, minBytes, maxBytes); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + return inner->tryRead(buffer, minBytes); } kj::Promise> pumpTo(WritableStreamSink& output, bool end) override { @@ -416,9 +416,9 @@ public: return inner->pumpTo(output, end); } - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { wasRead = true; - return inner->tryRead(buffer, minBytes, maxBytes); + return inner->tryRead(buffer, minBytes); } // TODO(someday): we set `wasRead` to avoid warning here, but TeeBranch might still buffer the @@ -594,7 +594,7 @@ kj::Maybe> ReadableStreamInternalController::read( auto bytes = kj::arrayPtr(ptr + byteOffset, byteLength); auto promise = kj::evalNow([&] { - return readable->tryRead(bytes.begin(), atLeast, bytes.size()); + return readable->tryRead(bytes, atLeast); }); KJ_IF_SOME(readerLock, readState.tryGet()) { promise = KJ_ASSERT_NONNULL(readerLock.getCanceler())->wrap(kj::mv(promise)); @@ -799,7 +799,7 @@ kj::Maybe> ReadableStreamInternalController::remov KJ_CASE_ONEOF(closed, StreamStates::Closed) { class NullSource final: public ReadableStreamSource { public: - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { return size_t(0); } @@ -2188,32 +2188,29 @@ StreamEncoding ReadableStreamInternalController::getPreferredEncoding() { } kj::Promise IdentityTransformStreamImpl::tryRead( - void* buffer, - size_t minBytes, - size_t maxBytes) { + kj::ArrayPtr buffer, size_t minBytes) { size_t total = 0; while (total < minBytes) { // TODO(perf): tryReadInternal was written assuming minBytes would always be 1 but we've now // introduced an API for user to specify a larger minBytes. For now, this is implemented as a // naiive loop dispatching to the 1 byte version but would be better to bake it deeper into // the implementation where it can be more efficient. - auto amount = co_await tryReadInternal(buffer, maxBytes); - KJ_ASSERT(amount <= maxBytes); + auto amount = co_await tryReadInternal(buffer); + KJ_ASSERT(amount <= buffer.size()); if (amount == 0) { // EOF. break; } total += amount; - buffer = reinterpret_cast(buffer) + amount; - maxBytes -= amount; + buffer = buffer.slice(amount); } co_return total; } -kj::Promise IdentityTransformStreamImpl::tryReadInternal(void* buffer, size_t maxBytes) { - auto promise = readHelper(kj::arrayPtr(static_cast(buffer), maxBytes)); +kj::Promise IdentityTransformStreamImpl::tryReadInternal(kj::ArrayPtr buffer) { + auto promise = readHelper(buffer); KJ_IF_SOME(l, limit) { promise = promise.then([this, &l = l](size_t amount) -> kj::Promise { diff --git a/src/workerd/api/streams/internal.h b/src/workerd/api/streams/internal.h index 1e6eeda475b..ab1fab1949e 100644 --- a/src/workerd/api/streams/internal.h +++ b/src/workerd/api/streams/internal.h @@ -387,9 +387,9 @@ class IdentityTransformStreamImpl: public kj::Refcounted, // ReadableStreamSource implementation ------------------------------------------------- - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override; + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override; - kj::Promise tryReadInternal(void* buffer, size_t maxBytes); + kj::Promise tryReadInternal(kj::ArrayPtr buffer); kj::Promise> pumpTo(WritableStreamSink& output, bool end) override; diff --git a/src/workerd/api/streams/readable.c++ b/src/workerd/api/streams/readable.c++ index 4caace74f11..891d579d67f 100644 --- a/src/workerd/api/streams/readable.c++ +++ b/src/workerd/api/streams/readable.c++ @@ -581,8 +581,8 @@ public: kj::Maybe expectedLength) : inner(kj::mv(inner)), ended(kj::mv(ended)), expectedLength(expectedLength) {} - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - size_t result = co_await inner->tryRead(buffer, minBytes, maxBytes); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + size_t result = co_await inner->tryRead(buffer, minBytes); KJ_IF_SOME(l, expectedLength) { KJ_ASSERT(result <= l); @@ -629,8 +629,8 @@ public: NoDeferredProxyReadableStream(kj::Own inner, IoContext& ioctx) : inner(kj::mv(inner)), ioctx(ioctx) {} - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - return inner->tryRead(buffer, minBytes, maxBytes); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + return inner->tryRead(buffer, minBytes); } kj::Promise> pumpTo(WritableStreamSink& output, bool end) override { diff --git a/src/workerd/api/system-streams.c++ b/src/workerd/api/system-streams.c++ index 8ecafc7b162..e18ae7a5a39 100644 --- a/src/workerd/api/system-streams.c++ +++ b/src/workerd/api/system-streams.c++ @@ -24,7 +24,7 @@ public: // Read bytes in identity encoding. If the stream is not already in identity encoding, it will be // converted to identity encoding via an appropriate stream wrapper. - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override; + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override; StreamEncoding getPreferredEncoding() override { return encoding; } @@ -59,11 +59,11 @@ EncodedAsyncInputStream::EncodedAsyncInputStream( : inner(kj::mv(inner)), encoding(encoding), ioContext(context) {} kj::Promise EncodedAsyncInputStream::tryRead( - void* buffer, size_t minBytes, size_t maxBytes) { + kj::ArrayPtr buffer, size_t minBytes) { ensureIdentityEncoding(); - return kj::evalNow([&]() { - return inner->tryRead(buffer, minBytes, maxBytes) + return kj::evalNow([&]() mutable { + return inner->tryRead(buffer, minBytes) .attach(ioContext.registerPendingEvent()); }).catch_([](kj::Exception&& exception) -> kj::Promise { KJ_IF_SOME(e, translateKjException(exception, { diff --git a/src/workerd/api/util.c++ b/src/workerd/api/util.c++ index 2adb50233a5..150422befe2 100644 --- a/src/workerd/api/util.c++ +++ b/src/workerd/api/util.c++ @@ -119,9 +119,9 @@ kj::Own newTeeErrorAdapter(kj::Own i public: explicit Adapter(kj::Own inner): inner(kj::mv(inner)) {} - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - return translateTeeErrors([&] { - return inner->tryRead(buffer, minBytes, maxBytes); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + return translateTeeErrors([&] () mutable { + return inner->tryRead(buffer, minBytes); }); } diff --git a/src/workerd/io/io-context.c++ b/src/workerd/io/io-context.c++ index 05a0ea1e758..fc275def04e 100644 --- a/src/workerd/io/io-context.c++ +++ b/src/workerd/io/io-context.c++ @@ -1210,8 +1210,8 @@ public: fulfiller->fulfill(); } - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - return inner->tryRead(buffer, minBytes, maxBytes); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + return inner->tryRead(buffer, minBytes); } kj::Maybe tryGetLength() override { diff --git a/src/workerd/server/server-test.c++ b/src/workerd/server/server-test.c++ index df4323bd786..fc90bf80bf3 100644 --- a/src/workerd/server/server-test.c++ +++ b/src/workerd/server/server-test.c++ @@ -156,8 +156,8 @@ public: return false; } - char c; - auto promise = stream->tryRead(&c, 1, 1); + byte c[1]{}; + auto promise = stream->tryRead(c, 1); if (!promise.poll(ws)) { // Read didn't complete immediately. We have no data available, but we're not at EOF. return false; @@ -170,7 +170,7 @@ public: // Oops, the stream had data available and we accidentally read a byte of it. Store that off // to the side. KJ_ASSERT(n == 1); - premature = c; + premature = c[0]; return false; } } @@ -202,7 +202,7 @@ private: kj::Maybe premature; kj::String readAllAvailable() { - kj::Vector buffer(256); + kj::Vector buffer(256); KJ_IF_SOME(p, premature) { buffer.add(p); } @@ -213,7 +213,7 @@ private: size_t pos = buffer.size(); buffer.resize(kj::max(buffer.size() + 256, buffer.capacity())); - auto promise = stream->tryRead(buffer.begin() + pos, 1, buffer.size() - pos); + auto promise = stream->tryRead(buffer.asPtr().slice(pos), 1); if (!promise.poll(ws)) { // A tryRead() of 1 byte didn't resolve, there must be no data to read. buffer.resize(pos); @@ -235,7 +235,7 @@ private: }; buffer.add('\0'); - return kj::String(buffer.releaseAsArray()); + return kj::String(buffer.releaseAsArray().releaseAsChars()); } kj::String readWebSocketMessage(size_t maxMessageSize = 1 << 24) { @@ -291,7 +291,7 @@ private: size_t bytesRead = 0; buffer.resize(buffer.size() + bytesToRead); while (bytesRead < bytesToRead) { - auto promise = stream->tryRead(buffer.begin() + pos, 1, buffer.size() - pos); + auto promise = stream->tryRead(buffer.asPtr().asBytes().slice(pos), 1); KJ_REQUIRE(promise.poll(ws), kj::str("No data available while ", what)); // A tryRead() of 1 byte didn't resolve, there must be no data to read. diff --git a/src/workerd/util/abortable.h b/src/workerd/util/abortable.h index 202c041348e..b2191605191 100644 --- a/src/workerd/util/abortable.h +++ b/src/workerd/util/abortable.h @@ -56,12 +56,12 @@ class AbortableInputStream final: public kj::AsyncInputStream, AbortableInputStream(kj::Own inner, RefcountedCanceler& canceler) : impl(kj::mv(inner), canceler) {} - kj::Promise read(void* buffer, size_t minBytes, size_t maxBytes) override { - return impl.wrap(&kj::AsyncInputStream::read, buffer, minBytes, maxBytes); + kj::Promise read(kj::ArrayPtr buffer, size_t minBytes) override { + return impl.wrap(&kj::AsyncInputStream::read, buffer, minBytes); } - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - return impl.wrap(&kj::AsyncInputStream::tryRead, buffer, minBytes, maxBytes); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + return impl.wrap(&kj::AsyncInputStream::tryRead, buffer, minBytes); } kj::Maybe tryGetLength() override { diff --git a/src/workerd/util/stream-utils.c++ b/src/workerd/util/stream-utils.c++ index 09335a97ddb..dad01333fd9 100644 --- a/src/workerd/util/stream-utils.c++ +++ b/src/workerd/util/stream-utils.c++ @@ -21,7 +21,7 @@ public: return kj::NEVER_DONE; } - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { return kj::constPromise(); } @@ -39,10 +39,10 @@ public: MemoryInputStream(kj::ArrayPtr data) : data(data) { } - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - size_t toRead = kj::min(data.size(), maxBytes); - memcpy(buffer, data.begin(), toRead); - data = data.slice(toRead, data.size()); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + size_t toRead = kj::min(data.size(), buffer.size()); + buffer.first(toRead).copyFrom(data.first(toRead)); + data = data.slice(toRead); return toRead; } @@ -63,11 +63,11 @@ public: } } - kj::Promise read(void* buffer, size_t minBytes, size_t maxBytes) override { - return canceler.wrap(getStream().read(buffer, minBytes, maxBytes)); + kj::Promise read(kj::ArrayPtr buffer, size_t minBytes) override { + return canceler.wrap(getStream().read(buffer, minBytes)); } - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - return canceler.wrap(getStream().tryRead(buffer, minBytes, maxBytes)); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + return canceler.wrap(getStream().tryRead(buffer, minBytes)); } kj::Maybe tryGetLength() override { return getStream().tryGetLength(); @@ -108,11 +108,11 @@ public: // AsyncInputStream - kj::Promise read(void* buffer, size_t minBytes, size_t maxBytes) override { - return canceler.wrap(getStream().read(buffer, minBytes, maxBytes)); + kj::Promise read(kj::ArrayPtr buffer, size_t minBytes) override { + return canceler.wrap(getStream().read(buffer, minBytes)); } - kj::Promise tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { - return canceler.wrap(getStream().tryRead(buffer, minBytes, maxBytes)); + kj::Promise tryRead(kj::ArrayPtr buffer, size_t minBytes) override { + return canceler.wrap(getStream().tryRead(buffer, minBytes)); } kj::Maybe tryGetLength() override { return getStream().tryGetLength();