Skip to content

Commit

Permalink
new async read api
Browse files Browse the repository at this point in the history
  • Loading branch information
mikea committed Jun 21, 2024
1 parent c0c63c3 commit 7cae47a
Show file tree
Hide file tree
Showing 26 changed files with 493 additions and 552 deletions.
4 changes: 2 additions & 2 deletions c++/src/capnp/compat/byte-stream-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ namespace {
kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
if (expected.size() == 0) return kj::READY_NOW;

auto buffer = kj::heapArray<char>(expected.size());
auto buffer = kj::heapArray<byte>(expected.size());

auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
auto promise = in.tryRead(buffer, 1);
return promise.then([&in,expected,buffer=kj::mv(buffer)](size_t amount) {
if (amount == 0) {
KJ_FAIL_ASSERT("expected data never sent", expected);
Expand Down
4 changes: 2 additions & 2 deletions c++/src/capnp/compat/byte-stream.c++
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ public:
}
}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
kj::Promise<size_t> tryRead(kj::ArrayPtr<byte> buffer, size_t minBytes) override {
// If this is called, it means the tryPumpFrom() in probeForShorterPath() eventually invoked
// code that tries to read manually from the source. We don't know what this code is doing
// exactly, but we do know for sure that the endpoint is not a KjToCapnpStreamAdapter, so
Expand Down Expand Up @@ -1113,7 +1113,7 @@ private:

WriteRequestAndBuffer wrab = { kj::mv(req), kj::mv(buffer) };

return input.tryRead(wrab.buffer.get().begin(), 1, size)
return input.tryRead(wrab.buffer.get().first(size), 1)
.then([this, &input, completed, remaining, size, wrab = kj::mv(wrab)]
(size_t actual) mutable -> kj::Promise<uint64_t> {
if (actual == 0) {
Expand Down
12 changes: 5 additions & 7 deletions c++/src/capnp/compat/http-over-capnp-perf-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,15 @@ public:
}
}

kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner.read(buffer, minBytes, maxBytes)
.then([this](size_t n) {
kj::Promise<size_t> read(kj::ArrayPtr<byte> buffer, size_t minBytes) override {
return inner.read(buffer, minBytes).then([this](size_t n) {
++readCount;
readBytes += n;
return n;
});
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner.tryRead(buffer, minBytes, maxBytes)
.then([this](size_t n) {
kj::Promise<size_t> tryRead(kj::ArrayPtr<byte> buffer, size_t minBytes) override {
return inner.tryRead(buffer, minBytes).then([this](size_t n) {
++readCount;
readBytes += n;
return n;
Expand Down Expand Up @@ -209,7 +207,7 @@ public:
NullInputStream(kj::Maybe<size_t> expectedLength = size_t(0))
: expectedLength(expectedLength) {}

kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
kj::Promise<size_t> tryRead(kj::ArrayPtr<byte> buffer, size_t minBytes) override {
return size_t(0);
}

Expand Down
23 changes: 11 additions & 12 deletions c++/src/capnp/compat/http-over-capnp-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ KJ_TEST("KJ and RPC HTTP method enums match") {
kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
if (expected.size() == 0) return kj::READY_NOW;

auto buffer = kj::heapArray<char>(expected.size());
auto buffer = kj::heapArray<byte>(expected.size());

auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
auto promise = in.tryRead(buffer, 1);
return promise.then([&in,expected,buffer=kj::mv(buffer)](size_t amount) {
if (amount == 0) {
KJ_FAIL_ASSERT("expected data never sent", expected);
Expand Down Expand Up @@ -720,7 +720,7 @@ public:
}

kj::Promise<void> manualPumpLoop(kj::ArrayPtr<byte> buffer, kj::AsyncIoStream& io) {
return io.tryRead(buffer.begin(), 1, buffer.size()).then(
return io.tryRead(buffer, 1).then(
[this,&io,buffer](size_t amount) mutable -> kj::Promise<void> {
if (amount == 0) { return kj::READY_NOW; }
return io.write(buffer.first(amount)).then([this,&io,buffer]() mutable -> kj::Promise<void> {
Expand Down Expand Up @@ -817,11 +817,11 @@ KJ_TEST("HTTP-over-Cap'n-Proto Connect with close") {
KJ_ASSERT(status.statusCode == 200);
KJ_ASSERT(status.statusText == "OK"_kj);

auto buf = kj::heapArray<char>(4);
return io->tryRead(buf.begin(), 4, 4).then(
auto buf = kj::heapArray<byte>(4);
return io->tryRead(buf, 4).then(
[buf = kj::mv(buf), io = kj::mv(io)](size_t count) mutable {
KJ_ASSERT(count == 4, "Expecting the stream to read 4 chars.");
return io->tryRead(buf.begin(), 1, 1).then(
return io->tryRead(buf.first(1), 1).then(
[buf = kj::mv(buf)](size_t count) mutable {
KJ_ASSERT(count == 0, "Expecting the stream to get disconnected.");
}).attach(kj::mv(io));
Expand Down Expand Up @@ -886,8 +886,8 @@ KJ_TEST("HTTP-over-Cap'n-Proto Connect Reject") {

paf.promise.then(
[](auto body) mutable {
auto buf = kj::heapArray<char>(5);
return body->tryRead(buf.begin(), 5, 5).then(
auto buf = kj::heapArray<byte>(5);
return body->tryRead(buf, 5).then(
[buf = kj::mv(buf), body = kj::mv(body)](size_t count) mutable {
KJ_ASSERT(count == 5, "Expecting the stream to read 5 chars.");
});
Expand All @@ -897,9 +897,8 @@ KJ_TEST("HTTP-over-Cap'n-Proto Connect Reject") {
}

kj::Promise<void> expectEnd(kj::AsyncInputStream& in) {
static char buffer;

auto promise = in.tryRead(&buffer, 1, 1);
static byte buffer[1];
auto promise = in.tryRead(buffer, 1);
return promise.then([](size_t amount) {
KJ_ASSERT(amount == 0, "expected EOF");
});
Expand Down Expand Up @@ -971,7 +970,7 @@ KJ_TEST("HTTP-over-Cap'n-Proto Connect with startTls") {
return KJ_ASSERT_NONNULL(*tlsStarter)("example.com").then([io = kj::mv(io)]() mutable {
return io->write("hello"_kjb).then([io = kj::mv(io)]() mutable {
auto buffer = kj::heapArray<byte>(5);
return io->tryRead(buffer.begin(), 5, 5).then(
return io->tryRead(buffer, 5).then(
[io = kj::mv(io), buffer = kj::mv(buffer)](size_t) mutable {
io->shutdownWrite();
return expectEnd(*io).attach(kj::mv(io));
Expand Down
8 changes: 4 additions & 4 deletions c++/src/capnp/rpc-twoparty-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -629,11 +629,11 @@ public:
MockSndbufStream(kj::Own<AsyncIoStream> inner, size_t& window, size_t& written)
: inner(kj::mv(inner)), window(window), written(written) {}

kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->read(buffer, minBytes, maxBytes);
kj::Promise<size_t> read(kj::ArrayPtr<byte> buffer, size_t minBytes) override {
return inner->read(buffer, minBytes);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
kj::Promise<size_t> tryRead(kj::ArrayPtr<byte> buffer, size_t minBytes) override {
return inner->tryRead(buffer, minBytes);
}
kj::Maybe<uint64_t> tryGetLength() override {
return inner->tryGetLength();
Expand Down
21 changes: 11 additions & 10 deletions c++/src/capnp/serialize-async.c++
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private:

kj::Promise<bool> AsyncMessageReader::read(kj::AsyncInputStream& inputStream,
kj::ArrayPtr<word> scratchSpace) {
return inputStream.tryRead(firstWord, sizeof(firstWord), sizeof(firstWord))
return inputStream.tryRead(kj::arrayPtr(firstWord).asBytes(), sizeof(firstWord))
.then([this,&inputStream,KJ_CPCAP(scratchSpace)](size_t n) mutable -> kj::Promise<bool> {
if (n == 0) {
return false;
Expand All @@ -99,7 +99,7 @@ kj::Promise<bool> AsyncMessageReader::read(kj::AsyncInputStream& inputStream,
kj::Promise<kj::Maybe<size_t>> AsyncMessageReader::readWithFds(
kj::AsyncCapabilityStream& inputStream, kj::ArrayPtr<kj::AutoCloseFd> fds,
kj::ArrayPtr<word> scratchSpace) {
return inputStream.tryReadWithFds(firstWord, sizeof(firstWord), sizeof(firstWord),
return inputStream.tryReadWithFds(kj::arrayPtr(firstWord).asBytes(), sizeof(firstWord),
fds.begin(), fds.size())
.then([this,&inputStream,KJ_CPCAP(scratchSpace)]
(kj::AsyncCapabilityStream::ReadResult result) mutable
Expand Down Expand Up @@ -131,7 +131,7 @@ kj::Promise<void> AsyncMessageReader::readAfterFirstWord(kj::AsyncInputStream& i
if (segmentCount() > 1) {
// Read sizes for all segments except the first. Include padding if necessary.
moreSizes = kj::heapArray<_::WireValue<uint32_t>>(segmentCount() & ~1);
return inputStream.read(moreSizes.begin(), moreSizes.size() * sizeof(moreSizes[0]))
return inputStream.read(moreSizes.asBytes())
.then([this,&inputStream,KJ_CPCAP(scratchSpace)]() mutable {
return readSegments(inputStream, scratchSpace);
});
Expand Down Expand Up @@ -179,7 +179,7 @@ kj::Promise<void> AsyncMessageReader::readSegments(kj::AsyncInputStream& inputSt
}
}

return inputStream.read(scratchSpace.begin(), totalWords * sizeof(word));
return inputStream.read(scratchSpace.first(totalWords).asBytes());
}


Expand Down Expand Up @@ -760,7 +760,7 @@ kj::Promise<kj::Maybe<MessageReaderAndFds>> BufferedMessageStream::tryReadMessag
KJ_DASSERT(minBytes <= maxBytes);

// Read from underlying stream.
return tryReadWithFds(beginAvailable, minBytes, maxBytes,
return tryReadWithFds(kj::arrayPtr(beginAvailable, maxBytes), minBytes,
fdSpace.begin() + fdsSoFar, fdSpace.size() - fdsSoFar)
.then([this,minBytes,fdSpace,fdsSoFar,options,scratchSpace]
(kj::AsyncCapabilityStream::ReadResult result) mutable
Expand Down Expand Up @@ -795,13 +795,14 @@ kj::Promise<kj::Maybe<MessageReaderAndFds>> BufferedMessageStream::readEntireMes

memcpy(msgBuffer.asBytes().begin(), prefix.begin(), prefix.size());

size_t bytesRemaining = msgBuffer.asBytes().size() - prefix.size();
auto remaining = msgBuffer.asBytes().slice(prefix.size());
auto bytesRemaining = remaining.size();

// TODO(perf): If we had scatter-read API support, we could optimistically try to read additional
// bytes into the shared buffer, to save syscalls when a big message is immediately followed
// by small messages.
auto promise = tryReadWithFds(
msgBuffer.asBytes().begin() + prefix.size(), bytesRemaining, bytesRemaining,
remaining, bytesRemaining,
fdSpace.begin() + fdsSoFar, fdSpace.size() - fdsSoFar);
return promise
.then([this, msgBuffer = kj::mv(msgBuffer), fdSpace, fdsSoFar, options, bytesRemaining]
Expand Down Expand Up @@ -840,12 +841,12 @@ kj::Promise<kj::Maybe<MessageReaderAndFds>> BufferedMessageStream::readEntireMes
}

kj::Promise<kj::AsyncCapabilityStream::ReadResult> BufferedMessageStream::tryReadWithFds(
void* buffer, size_t minBytes, size_t maxBytes, kj::AutoCloseFd* fdBuffer, size_t maxFds) {
kj::ArrayPtr<byte> buffer, size_t minBytes, kj::AutoCloseFd* fdBuffer, size_t maxFds) {
KJ_IF_SOME(cs, capStream) {
return cs.tryReadWithFds(buffer, minBytes, maxBytes, fdBuffer, maxFds);
return cs.tryReadWithFds(buffer, minBytes, fdBuffer, maxFds);
} else {
// Regular byte stream, no FDs.
return stream.tryRead(buffer, minBytes, maxBytes)
return stream.tryRead(buffer, minBytes)
.then([](size_t amount) mutable -> kj::AsyncCapabilityStream::ReadResult {
return { amount, 0 };
});
Expand Down
2 changes: 1 addition & 1 deletion c++/src/capnp/serialize-async.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class BufferedMessageStream final: public MessageStream {
// a single array and return it.

kj::Promise<kj::AsyncCapabilityStream::ReadResult> tryReadWithFds(
void* buffer, size_t minBytes, size_t maxBytes, kj::AutoCloseFd* fdBuffer, size_t maxFds);
kj::ArrayPtr<byte> buffer, size_t minBytes, kj::AutoCloseFd* fdBuffer, size_t maxFds);
// Executes AsyncCapabilityStream::tryReadWithFds() on the underlying stream, or falls back to
// AsyncIoStream::tryRead() if it's not a capability stream.

Expand Down
2 changes: 1 addition & 1 deletion c++/src/kj/async-coroutine-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ Promise<void> sendData(Promise<Own<NetworkAddress>> addressPromise) {
Promise<String> receiveDataCoroutine(Own<ConnectionReceiver> listener) {
auto server = co_await listener->accept();
char buffer[4]{};
auto n = co_await server->read(buffer, 3, 4);
auto n = co_await server->read(arrayPtr(buffer).asBytes(), 3);
KJ_EXPECT(3u == n);
co_return heapString(buffer, n);
}
Expand Down
Loading

0 comments on commit 7cae47a

Please sign in to comment.