From 90901c2617d3c166a5931b64fc35d05036ac351c Mon Sep 17 00:00:00 2001
From: Felix Hanau
Date: Mon, 29 Jul 2024 20:01:14 -0400
Subject: [PATCH] Reduce excessive copying in Compression Streams API

In the Compression Streams API, introduce a helper class that shifts
output data only when doing so would significantly reduce the size of
the output vector. As a result, the leading bytes of the output vector
become invalid after partial reads, which requires careful tracking
but avoids moving the output data on every read from the buffer.
---
 src/workerd/api/streams/compression.c++ | 79 ++++++++++++++++++++-----
 1 file changed, 64 insertions(+), 15 deletions(-)

diff --git a/src/workerd/api/streams/compression.c++ b/src/workerd/api/streams/compression.c++
index 71455d69e0c..53e2959060a 100644
--- a/src/workerd/api/streams/compression.c++
+++ b/src/workerd/api/streams/compression.c++
@@ -135,6 +135,60 @@ private:
   ContextFlags strictCompression;
 };
 
+// Buffer class based on std::vector that lazily erases data that has been read from it, to avoid
+// excessive copying when reading a larger amount of buffered data in small chunks. valid_size_
+// tracks the amount of data that has not been read back yet.
+class LazyBuffer {
+public:
+  LazyBuffer() : valid_size_(0) {}
+
+  // Return a chunk of data and mark it as invalid. The returned chunk remains valid until data
+  // is shifted or cleared, or the destructor is called. maybeShift() should be called after the
+  // returned data has been processed.
+  kj::ArrayPtr<kj::byte> take(size_t read_size) {
+    KJ_ASSERT(read_size <= valid_size_);
+    kj::ArrayPtr<kj::byte> chunk = kj::arrayPtr(&output[output.size() - valid_size_], read_size);
+    valid_size_ -= read_size;
+    return chunk;
+  }
+
+  // Shift the output only if doing so reduces the vector size by at least 1 KiB and by at least
+  // 1/8 of its size, to avoid copying for small reads.
+  void maybeShift() {
+    size_t unusedSpace = output.size() - valid_size_;
+    if (unusedSpace >= 1024 && unusedSpace >= (output.size() >> 3)) {
+      // Shift the buffer to erase data that has already been read. valid_size_ stays the same.
+      output.erase(output.begin(), output.begin() + unusedSpace);
+    }
+  }
+
+  void write(kj::ArrayPtr<const kj::byte> chunk) {
+    std::copy(chunk.begin(), chunk.end(), std::back_inserter(output));
+    valid_size_ += chunk.size();
+  }
+
+  void clear() {
+    output.clear();
+    valid_size_ = 0;
+  }
+
+  // For convenience, provide the size of the valid data that has not been read back yet. This
+  // may be smaller than the size of the internal vector, which is not relevant for the stream
+  // implementation.
+  size_t size() {
+    return valid_size_;
+  }
+
+  // As with size(), the buffer is considered empty if there is no valid data remaining.
+  bool empty() {
+    return valid_size_ == 0;
+  }
+
+private:
+  std::vector<kj::byte> output;
+  size_t valid_size_;
+};
+
 // Uncompressed data goes in. Compressed data comes out.
 template <Context context>
 class CompressionStreamImpl: public kj::Refcounted,
@@ -246,9 +300,8 @@ private:
   kj::Promise<size_t> tryReadInternal(kj::ArrayPtr<kj::byte> dest, size_t minBytes) {
     const auto copyIntoBuffer = [this](kj::ArrayPtr<kj::byte> dest) {
       auto maxBytesToCopy = kj::min(dest.size(), output.size());
-      auto src = &output[0];
-      memcpy(dest.begin(), src, maxBytesToCopy);
-      output.erase(output.begin(), output.begin() + maxBytesToCopy);
+      dest.first(maxBytesToCopy).copyFrom(output.take(maxBytesToCopy));
+      output.maybeShift();
       return maxBytesToCopy;
     };
 
@@ -305,19 +358,16 @@ private:
 
       // Output has been produced, copy it to result buffer and continue loop to call pumpOnce
       // again.
-      std::copy(result.buffer.begin(), result.buffer.end(), std::back_inserter(output));
+      output.write(result.buffer);
     }
     KJ_UNREACHABLE;
   }
 
   // Fulfill as many pending reads as we can from the output buffer.
   kj::Promise<void> maybeFulfillRead() {
-    auto remaining = output.size();
-    auto source = output.begin();
-
     // If there are pending reads and data to be read, we'll loop through
     // the pending reads and fulfill them as much as possible.
-    while (!pendingReads.empty() && remaining > 0) {
+    while (!pendingReads.empty() && output.size() > 0) {
       auto& pending = pendingReads.front();
 
       if (!pending.promise->isWaiting()) {
@@ -340,12 +390,11 @@ private:
       }
 
       // The pending read is still viable so determine how much we can copy in.
-      auto amountToCopy = kj::min(pending.buffer.size() - pending.filled, remaining);
-      std::copy(source, source + amountToCopy, pending.buffer.begin() + pending.filled);
-      source += amountToCopy;
+      auto amountToCopy = kj::min(pending.buffer.size() - pending.filled, output.size());
+      kj::ArrayPtr<kj::byte> chunk = output.take(amountToCopy);
+      pending.buffer.slice(pending.filled, pending.filled + amountToCopy).copyFrom(chunk);
       pending.filled += amountToCopy;
-      remaining -= amountToCopy;
-      output.erase(output.begin(), source);
+      output.maybeShift();
 
       // If we've met the minimum bytes requirement for the pending read, fulfill
       // the read promise.
@@ -358,7 +407,7 @@ private:
 
       // If we reached this point in the loop, remaining must be 0 so that we
       // don't keep iterating through on the same pending read.
-      KJ_ASSERT(remaining == 0);
+      KJ_ASSERT(output.empty());
     }
 
     if (state.template is<Ended>() && !pendingReads.empty()) {
@@ -387,7 +436,7 @@ private:
 
   Context context;
   kj::Canceler canceler;
-  std::vector<kj::byte> output;
+  LazyBuffer output;
   std::deque<PendingRead> pendingReads;
 };
 } // namespace
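
The technique above generalizes beyond workerd: consumed bytes are left at the
front of the vector and erased only once they amount to at least 1 KiB and at
least 1/8 of the vector. Because the unused prefix must reach 1/8 of the vector
before a shift, each shift moves at most seven live bytes per byte reclaimed,
so total copying stays roughly linear in the data volume instead of growing
with the number of small reads. The following standalone sketch restates the
buffer using only the C++ standard library; the names (LazyByteBuffer,
validSize) are illustrative stand-ins for the kj-based types in the patch, not
workerd APIs.

// Standalone sketch (not workerd code) of the lazy-shift buffer idea.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

class LazyByteBuffer {
public:
  // Hand back a pointer to the next n unread bytes and mark them consumed.
  // The pointer stays valid until write(), maybeShift(), or destruction.
  const uint8_t* take(size_t n) {
    assert(n <= validSize);
    const uint8_t* chunk = buf.data() + (buf.size() - validSize);
    validSize -= n;
    return chunk;
  }

  // Erase consumed bytes only when doing so reclaims at least 1 KiB and at
  // least 1/8 of the vector; this amortizes the O(n) front-erase over many
  // small reads instead of paying it on every read.
  void maybeShift() {
    size_t consumed = buf.size() - validSize;
    if (consumed >= 1024 && consumed >= (buf.size() >> 3)) {
      buf.erase(buf.begin(), buf.begin() + consumed);
    }
  }

  void write(const uint8_t* data, size_t n) {
    buf.insert(buf.end(), data, data + n);
    validSize += n;
  }

  size_t size() const { return validSize; }  // unread bytes only
  bool empty() const { return validSize == 0; }

private:
  std::vector<uint8_t> buf;  // may keep already-consumed bytes at the front
  size_t validSize = 0;      // length of the unread suffix of buf
};

int main() {
  LazyByteBuffer out;
  uint8_t data[4096] = {};
  out.write(data, sizeof(data));

  // Drain in 256-byte chunks, as a stream consumer might.
  while (!out.empty()) {
    size_t n = out.size() < 256 ? out.size() : 256;
    const uint8_t* chunk = out.take(n);
    (void)chunk;       // ... process chunk before the next take/shift ...
    out.maybeShift();  // shifts only occasionally, not on every read
  }
  std::printf("drained, unread bytes remaining: %zu\n", out.size());
  return 0;
}

Compared with erasing on every read (the removed memcpy/erase path), draining
n buffered bytes in k-byte chunks drops from roughly n^2/(2k) bytes moved to
O(n), at the cost of holding at most the already-consumed prefix in memory
until the next shift.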