Skip to content

Commit

Permalink
Array-ptr based read async api
Browse files Browse the repository at this point in the history
  • Loading branch information
mikea committed Jun 24, 2024
1 parent 0a8af1d commit feccda6
Show file tree
Hide file tree
Showing 16 changed files with 77 additions and 84 deletions.
6 changes: 3 additions & 3 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ apple_support_dependencies()

http_archive(
name = "capnp-cpp",
integrity = "sha256-6bAIGw5ciWkS16cgJC6vxd2LmE31+p1XkhnfPsiklDQ=",
strip_prefix = "capnproto-capnproto-3765f4c/c++",
integrity = "sha256-iJVvugPmmCiWSuspzod1UQqI5pMYUe/ut8bzvte7ifI=",
strip_prefix = "capnproto-capnproto-4f73201/c++",
type = "tgz",
urls = ["https://github.com/capnproto/capnproto/tarball/3765f4c0c8839398ea59831c13b3bf19035aa69f"],
urls = ["https://github.com/capnproto/capnproto/tarball/4f7320140c42d1d13ef7a6b36ce09a364439b7e7"],
)

http_archive(
Expand Down
8 changes: 4 additions & 4 deletions src/workerd/api/blob.c++
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,11 @@ public:
// The minBytes argument is ignored in this implementation of tryRead.
// The buffer must be kept alive by the caller until the returned promise is fulfilled.
// The returned promise is fulfilled with the actual number of bytes read.
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
size_t amount = kj::min(maxBytes, unread.size());
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
size_t amount = kj::min(buffer.size(), unread.size());
if (amount > 0) {
memcpy(buffer, unread.begin(), amount);
unread = unread.slice(amount, unread.size());
buffer.first(amount).copyFrom(unread.first(amount));
unread = unread.slice(amount);
}
return amount;
}
Expand Down
8 changes: 4 additions & 4 deletions src/workerd/api/http.c++
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,10 @@ public:
: unread(buffer.view),
ownBytes(kj::mv(buffer.ownBytes)) {}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
size_t amount = kj::min(maxBytes, unread.size());
memcpy(buffer, unread.begin(), amount);
unread = unread.slice(amount, unread.size());
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
size_t amount = kj::min(buffer.size(), unread.size());
buffer.first(amount).copyFrom(unread.first(amount));
unread = unread.slice(amount);
return amount;
}

Expand Down
3 changes: 1 addition & 2 deletions src/workerd/api/r2-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ kj::Promise<R2Result> doR2HTTPGetRequest(kj::Own<kj::HttpClient> client,
KJ_REQUIRE(metadataSize >= 0, "R2 metadata size parsed as negative");

auto metadataBuffer = kj::heapArray<char>(metadataSize);
auto metadataReadLength =
co_await stream->tryRead(metadataBuffer.begin(), metadataSize, metadataSize);
auto metadataReadLength = co_await stream->tryRead(metadataBuffer.asBytes(), metadataSize);

KJ_ASSERT(metadataReadLength == metadataBuffer.size(),
"R2 metadata buffer not read fully/overflow?");
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ class WritableStreamSink {

class ReadableStreamSource {
public:
virtual kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) = 0;
virtual kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) = 0;

// The ReadableStreamSource version of pumpTo() has no `amount` parameter, since the Streams spec
// only defines pumping everything.
Expand Down
12 changes: 4 additions & 8 deletions src/workerd/api/streams/compression.c++
Original file line number Diff line number Diff line change
Expand Up @@ -194,23 +194,19 @@ public:

// ReadableStreamSource implementation -------------------------------------------------

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
KJ_ASSERT(minBytes <= maxBytes);
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
KJ_ASSERT(minBytes <= buffer.size());
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(ended, Ended) {
// There might still be data in the output buffer remaining to read.
if (output.empty()) return size_t(0);
return tryReadInternal(
kj::ArrayPtr<kj::byte>(reinterpret_cast<kj::byte*>(buffer), maxBytes),
minBytes);
return tryReadInternal(buffer, minBytes);
}
KJ_CASE_ONEOF(exception, kj::Exception) {
return kj::cp(exception);
}
KJ_CASE_ONEOF(open, Open) {
return tryReadInternal(
kj::ArrayPtr<kj::byte>(reinterpret_cast<kj::byte*>(buffer), maxBytes),
minBytes);
return tryReadInternal(buffer, minBytes);
}
}
KJ_UNREACHABLE;
Expand Down
9 changes: 5 additions & 4 deletions src/workerd/api/streams/internal-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ public:
FooStream() : ptr(&data[0]), remaining_(size) {
KJ_ASSERT(RAND_bytes(data, size) == 1);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) {
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
auto maxBytes = buffer.size();
maxMaxBytesSeen_ = kj::max(maxMaxBytesSeen_, maxBytes);
numreads_++;
if (remaining_ == 0) return (size_t)0;
KJ_ASSERT(minBytes == maxBytes);
KJ_ASSERT(maxBytes <= size);
auto amount = kj::min(remaining_, maxBytes);
memcpy(buffer, ptr, amount);
memcpy(buffer.begin(), ptr, amount);
ptr += amount;
remaining_ -= amount;
return amount;
Expand Down Expand Up @@ -130,10 +131,10 @@ KJ_TEST("zero-length stream") {

class Zero : public ReadableStreamSource {
public:
kj::Promise<size_t> tryRead(void*, size_t, size_t) {
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t) override {
return (size_t)0;
}
kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) {
kj::Maybe<uint64_t> tryGetLength(StreamEncoding encoding) override {
return (size_t)0;
}
};
Expand Down
35 changes: 16 additions & 19 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ kj::Promise<void> pumpTo(ReadableStreamSource& input, WritableStreamSink& output
kj::byte buffer[4096]{};

while (true) {
auto amount = co_await input.tryRead(buffer, 1, kj::size(buffer));
auto amount = co_await input.tryRead(buffer, 1);

if (amount == 0) {
if (end) {
Expand Down Expand Up @@ -150,7 +150,7 @@ private:
// Note that we're passing amountToRead as the *minBytes* here so the tryRead should
// attempt to fill the entire buffer. If it doesn't, the implication is that we read
// everything.
uint64_t amount = co_await input.tryRead(bytes.begin(), amountToRead, amountToRead);
uint64_t amount = co_await input.tryRead(bytes.asBytes(), amountToRead);
KJ_DASSERT(amount <= amountToRead);

runningTotal += amount;
Expand Down Expand Up @@ -252,8 +252,8 @@ public:
explicit TeeAdapter(kj::Own<ReadableStreamSource> inner)
: inner(kj::mv(inner)) {}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
return inner->tryRead(buffer, minBytes);
}

kj::Maybe<uint64_t> tryGetLength() override {
Expand All @@ -269,8 +269,8 @@ public:
explicit TeeBranch(kj::Own<kj::AsyncInputStream> inner)
: inner(kj::mv(inner)) {}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
return inner->tryRead(buffer, minBytes);
}

kj::Promise<DeferredProxy<void>> pumpTo(WritableStreamSink& output, bool end) override {
Expand Down Expand Up @@ -416,9 +416,9 @@ public:
return inner->pumpTo(output, end);
}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
wasRead = true;
return inner->tryRead(buffer, minBytes, maxBytes);
return inner->tryRead(buffer, minBytes);
}

// TODO(someday): we set `wasRead` to avoid warning here, but TeeBranch might still buffer the
Expand Down Expand Up @@ -594,7 +594,7 @@ kj::Maybe<jsg::Promise<ReadResult>> ReadableStreamInternalController::read(
auto bytes = kj::arrayPtr(ptr + byteOffset, byteLength);

auto promise = kj::evalNow([&] {
return readable->tryRead(bytes.begin(), atLeast, bytes.size());
return readable->tryRead(bytes, atLeast);
});
KJ_IF_SOME(readerLock, readState.tryGet<ReaderLocked>()) {
promise = KJ_ASSERT_NONNULL(readerLock.getCanceler())->wrap(kj::mv(promise));
Expand Down Expand Up @@ -799,7 +799,7 @@ kj::Maybe<kj::Own<ReadableStreamSource>> ReadableStreamInternalController::remov
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
class NullSource final: public ReadableStreamSource {
public:
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
return size_t(0);
}

Expand Down Expand Up @@ -2188,32 +2188,29 @@ StreamEncoding ReadableStreamInternalController::getPreferredEncoding() {
}

kj::Promise<size_t> IdentityTransformStreamImpl::tryRead(
void* buffer,
size_t minBytes,
size_t maxBytes) {
kj::ArrayPtr<kj::byte> buffer, size_t minBytes) {
size_t total = 0;
while (total < minBytes) {
// TODO(perf): tryReadInternal was written assuming minBytes would always be 1 but we've now
// introduced an API for user to specify a larger minBytes. For now, this is implemented as a
// naiive loop dispatching to the 1 byte version but would be better to bake it deeper into
// the implementation where it can be more efficient.
auto amount = co_await tryReadInternal(buffer, maxBytes);
KJ_ASSERT(amount <= maxBytes);
auto amount = co_await tryReadInternal(buffer);
KJ_ASSERT(amount <= buffer.size());
if (amount == 0) {
// EOF.
break;
}

total += amount;
buffer = reinterpret_cast<char*>(buffer) + amount;
maxBytes -= amount;
buffer = buffer.slice(amount);
}

co_return total;
}

kj::Promise<size_t> IdentityTransformStreamImpl::tryReadInternal(void* buffer, size_t maxBytes) {
auto promise = readHelper(kj::arrayPtr(static_cast<kj::byte*>(buffer), maxBytes));
kj::Promise<size_t> IdentityTransformStreamImpl::tryReadInternal(kj::ArrayPtr<kj::byte> buffer) {
auto promise = readHelper(buffer);

KJ_IF_SOME(l, limit) {
promise = promise.then([this, &l = l](size_t amount) -> kj::Promise<size_t> {
Expand Down
4 changes: 2 additions & 2 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,9 @@ class IdentityTransformStreamImpl: public kj::Refcounted,

// ReadableStreamSource implementation -------------------------------------------------

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override;
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override;

kj::Promise<size_t> tryReadInternal(void* buffer, size_t maxBytes);
kj::Promise<size_t> tryReadInternal(kj::ArrayPtr<kj::byte> buffer);

kj::Promise<DeferredProxy<void>> pumpTo(WritableStreamSink& output, bool end) override;

Expand Down
8 changes: 4 additions & 4 deletions src/workerd/api/streams/readable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -581,8 +581,8 @@ public:
kj::Maybe<uint64_t> expectedLength)
: inner(kj::mv(inner)), ended(kj::mv(ended)), expectedLength(expectedLength) {}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
size_t result = co_await inner->tryRead(buffer, minBytes, maxBytes);
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
size_t result = co_await inner->tryRead(buffer, minBytes);

KJ_IF_SOME(l, expectedLength) {
KJ_ASSERT(result <= l);
Expand Down Expand Up @@ -629,8 +629,8 @@ public:
NoDeferredProxyReadableStream(kj::Own<ReadableStreamSource> inner, IoContext& ioctx)
: inner(kj::mv(inner)), ioctx(ioctx) {}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
return inner->tryRead(buffer, minBytes);
}

kj::Promise<DeferredProxy<void>> pumpTo(WritableStreamSink& output, bool end) override {
Expand Down
8 changes: 4 additions & 4 deletions src/workerd/api/system-streams.c++
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public:

// Read bytes in identity encoding. If the stream is not already in identity encoding, it will be
// converted to identity encoding via an appropriate stream wrapper.
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override;
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override;

StreamEncoding getPreferredEncoding() override { return encoding; }

Expand Down Expand Up @@ -59,11 +59,11 @@ EncodedAsyncInputStream::EncodedAsyncInputStream(
: inner(kj::mv(inner)), encoding(encoding), ioContext(context) {}

kj::Promise<size_t> EncodedAsyncInputStream::tryRead(
void* buffer, size_t minBytes, size_t maxBytes) {
kj::ArrayPtr<kj::byte> buffer, size_t minBytes) {
ensureIdentityEncoding();

return kj::evalNow([&]() {
return inner->tryRead(buffer, minBytes, maxBytes)
return kj::evalNow([&]() mutable {
return inner->tryRead(buffer, minBytes)
.attach(ioContext.registerPendingEvent());
}).catch_([](kj::Exception&& exception) -> kj::Promise<size_t> {
KJ_IF_SOME(e, translateKjException(exception, {
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/api/util.c++
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ kj::Own<kj::AsyncInputStream> newTeeErrorAdapter(kj::Own<kj::AsyncInputStream> i
public:
explicit Adapter(kj::Own<AsyncInputStream> inner): inner(kj::mv(inner)) {}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return translateTeeErrors([&] {
return inner->tryRead(buffer, minBytes, maxBytes);
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
return translateTeeErrors([&] () mutable {
return inner->tryRead(buffer, minBytes);
});
}

Expand Down
4 changes: 2 additions & 2 deletions src/workerd/io/io-context.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1210,8 +1210,8 @@ public:
fulfiller->fulfill();
}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
return inner->tryRead(buffer, minBytes);
}

kj::Maybe<uint64_t> tryGetLength() override {
Expand Down
14 changes: 7 additions & 7 deletions src/workerd/server/server-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ public:
return false;
}

char c;
auto promise = stream->tryRead(&c, 1, 1);
byte c[1]{};
auto promise = stream->tryRead(c, 1);
if (!promise.poll(ws)) {
// Read didn't complete immediately. We have no data available, but we're not at EOF.
return false;
Expand All @@ -170,7 +170,7 @@ public:
// Oops, the stream had data available and we accidentally read a byte of it. Store that off
// to the side.
KJ_ASSERT(n == 1);
premature = c;
premature = c[0];
return false;
}
}
Expand Down Expand Up @@ -202,7 +202,7 @@ private:
kj::Maybe<char> premature;

kj::String readAllAvailable() {
kj::Vector<char> buffer(256);
kj::Vector<byte> buffer(256);
KJ_IF_SOME(p, premature) {
buffer.add(p);
}
Expand All @@ -213,7 +213,7 @@ private:
size_t pos = buffer.size();
buffer.resize(kj::max(buffer.size() + 256, buffer.capacity()));

auto promise = stream->tryRead(buffer.begin() + pos, 1, buffer.size() - pos);
auto promise = stream->tryRead(buffer.asPtr().slice(pos), 1);
if (!promise.poll(ws)) {
// A tryRead() of 1 byte didn't resolve, there must be no data to read.
buffer.resize(pos);
Expand All @@ -235,7 +235,7 @@ private:
};

buffer.add('\0');
return kj::String(buffer.releaseAsArray());
return kj::String(buffer.releaseAsArray().releaseAsChars());
}

kj::String readWebSocketMessage(size_t maxMessageSize = 1 << 24) {
Expand Down Expand Up @@ -291,7 +291,7 @@ private:
size_t bytesRead = 0;
buffer.resize(buffer.size() + bytesToRead);
while (bytesRead < bytesToRead) {
auto promise = stream->tryRead(buffer.begin() + pos, 1, buffer.size() - pos);
auto promise = stream->tryRead(buffer.asPtr().asBytes().slice(pos), 1);
KJ_REQUIRE(promise.poll(ws), kj::str("No data available while ", what));
// A tryRead() of 1 byte didn't resolve, there must be no data to read.

Expand Down
8 changes: 4 additions & 4 deletions src/workerd/util/abortable.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ class AbortableInputStream final: public kj::AsyncInputStream,
AbortableInputStream(kj::Own<kj::AsyncInputStream> inner, RefcountedCanceler& canceler)
: impl(kj::mv(inner), canceler) {}

kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return impl.wrap<size_t>(&kj::AsyncInputStream::read, buffer, minBytes, maxBytes);
kj::Promise<size_t> read(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
return impl.wrap<size_t>(&kj::AsyncInputStream::read, buffer, minBytes);
}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return impl.wrap(&kj::AsyncInputStream::tryRead, buffer, minBytes, maxBytes);
kj::Promise<size_t> tryRead(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override {
return impl.wrap(&kj::AsyncInputStream::tryRead, buffer, minBytes);
}

kj::Maybe<uint64_t> tryGetLength() override {
Expand Down
Loading

0 comments on commit feccda6

Please sign in to comment.