Reduce excessive copying in Compression Streams API
In the Compression Streams API, introduce a helper class that only shifts output
data when doing so would reduce the output vector's size significantly. As a
result, the leading bytes of the output vector become invalid when only parts of
it are read, which requires careful tracking but avoids having to move the
output data on every read from the buffer.
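To make the savings concrete, consider a hypothetical workload (not from the commit): a 64 KiB output buffer drained in 4 KiB reads. Erasing the consumed prefix after every read moves the remaining 60 KiB, then 56 KiB, and so on, roughly 480 KiB of copying just to hand back 64 KiB of data. Under the lazy scheme, a shift only happens once at least 1 KiB and at least one eighth of the vector is dead space, so each shift discards a constant fraction of the vector and the total copying stays proportional to the amount of data produced.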
fhanau committed Jul 31, 2024
1 parent 939dddd commit 90901c2
Showing 1 changed file with 64 additions and 15 deletions.
src/workerd/api/streams/compression.c++ (64 additions, 15 deletions)
@@ -135,6 +135,60 @@ private:
   ContextFlags strictCompression;
 };
 
+// Buffer class based on std::vector that lazily erases data that has been read from it, to avoid
+// excessive copying when a large amount of buffered data is read back in small chunks. valid_size_
+// tracks the amount of data that has not been read back yet.
+class LazyBuffer {
+public:
+  LazyBuffer() : valid_size_(0) {}
+
+  // Return a chunk of data and mark it as invalid. The returned chunk remains valid until the data
+  // is shifted, the buffer is cleared, or the destructor runs. maybeShift() should be called after
+  // the returned data has been processed.
+  kj::ArrayPtr<byte> take(size_t read_size) {
+    KJ_ASSERT(read_size <= valid_size_);
+    kj::ArrayPtr<byte> chunk = kj::arrayPtr(&output[output.size() - valid_size_], read_size);
+    valid_size_ -= read_size;
+    return chunk;
+  }
+
+  // Shift the output only if doing so reduces the vector size by at least 1 KiB and by at least
+  // 1/8 of its size, to avoid copying after small reads.
+  void maybeShift() {
+    size_t unusedSpace = output.size() - valid_size_;
+    if (unusedSpace >= 1024 && unusedSpace >= (output.size() >> 3)) {
+      // Shift the buffer to erase data that has already been read; valid_size_ stays the same.
+      output.erase(output.begin(), output.begin() + unusedSpace);
+    }
+  }
+
+  void write(kj::ArrayPtr<const byte> chunk) {
+    std::copy(chunk.begin(), chunk.end(), std::back_inserter(output));
+    valid_size_ += chunk.size();
+  }
+
+  void clear() {
+    output.clear();
+    valid_size_ = 0;
+  }
+
+  // For convenience, report the size of the valid data that has not been read back yet. This may
+  // be smaller than the size of the internal vector, which is not relevant to the stream
+  // implementation.
+  size_t size() {
+    return valid_size_;
+  }
+
+  // As with size(), the buffer is considered empty if there is no valid data remaining.
+  bool empty() {
+    return valid_size_ == 0;
+  }
+
+private:
+  std::vector<kj::byte> output;
+  size_t valid_size_;
+};
+
 // Uncompressed data goes in. Compressed data comes out.
 template <Context::Mode mode>
 class CompressionStreamImpl: public kj::Refcounted,
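For readers who want to experiment with the pattern in isolation, here is a minimal standalone sketch of the same idea using only the standard library. SimpleLazyBuffer is a hypothetical stand-in for the LazyBuffer above, not code from the repository; the thresholds mirror maybeShift() in the diff.

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <utility>
    #include <vector>

    // Consumed bytes stay in the vector until maybeShift() decides that
    // compacting is worth the copy. The unread bytes are always the trailing
    // valid_ bytes of the vector.
    class SimpleLazyBuffer {
    public:
      // Returns a view of the next n unread bytes and marks them consumed.
      // The pointer is valid until the next write(), maybeShift(), or clear().
      std::pair<const uint8_t*, size_t> take(size_t n) {
        assert(n <= valid_);
        const uint8_t* p = data_.data() + (data_.size() - valid_);
        valid_ -= n;
        return {p, n};
      }

      // Compact only when at least 1 KiB and 1/8 of the vector is dead space.
      // Returns true if a copy actually happened.
      bool maybeShift() {
        size_t unused = data_.size() - valid_;
        if (unused >= 1024 && unused >= (data_.size() >> 3)) {
          data_.erase(data_.begin(), data_.begin() + unused);
          return true;
        }
        return false;
      }

      void write(const uint8_t* p, size_t n) {
        data_.insert(data_.end(), p, p + n);
        valid_ += n;
      }

      size_t size() const { return valid_; }

    private:
      std::vector<uint8_t> data_;
      size_t valid_ = 0;
    };

    int main() {
      SimpleLazyBuffer buf;
      std::vector<uint8_t> input(64 * 1024, 0xAB);
      buf.write(input.data(), input.size());

      // Drain in 1 KiB chunks and count how often the buffer is compacted.
      size_t reads = 0, shifts = 0;
      while (buf.size() > 0) {
        auto chunk = buf.take(1024);
        (void)chunk;  // a real caller would copy these bytes out here
        if (buf.maybeShift()) shifts++;
        reads++;
      }
      // An eager erase-per-read buffer would copy on all 64 reads; here only
      // a fraction of the reads trigger a shift.
      std::printf("reads=%zu shifts=%zu\n", reads, shifts);
    }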
@@ -246,9 +300,8 @@ private:
   kj::Promise<size_t> tryReadInternal(kj::ArrayPtr<kj::byte> dest, size_t minBytes) {
     const auto copyIntoBuffer = [this](kj::ArrayPtr<kj::byte> dest) {
       auto maxBytesToCopy = kj::min(dest.size(), output.size());
-      auto src = &output[0];
-      memcpy(dest.begin(), src, maxBytesToCopy);
-      output.erase(output.begin(), output.begin() + maxBytesToCopy);
+      dest.first(maxBytesToCopy).copyFrom(output.take(maxBytesToCopy));
+      output.maybeShift();
       return maxBytesToCopy;
     };
 
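As a rough standalone analogue of the updated copyIntoBuffer lambda (hypothetical names, std::span in place of kj::ArrayPtr, and any buffer exposing the take()/maybeShift()/size() trio, such as the sketch above):

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <span>

    // Copy up to dest.size() unread bytes out of the buffer, then give the
    // buffer a chance to compact lazily.
    template <typename Buffer>
    size_t copyIntoBuffer(std::span<uint8_t> dest, Buffer& output) {
      size_t maxBytesToCopy = std::min(dest.size(), output.size());
      auto [src, n] = output.take(maxBytesToCopy);
      std::copy_n(src, n, dest.begin());  // dest.first(n).copyFrom(chunk) in kj terms
      output.maybeShift();
      return maxBytesToCopy;
    }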
@@ -305,19 +358,16 @@ private:
 
       // Output has been produced, copy it to result buffer and continue loop to call pumpOnce
       // again.
-      std::copy(result.buffer.begin(), result.buffer.end(), std::back_inserter(output));
+      output.write(result.buffer);
     }
     KJ_UNREACHABLE;
   }
 
   // Fulfill as many pending reads as we can from the output buffer.
   kj::Promise<void> maybeFulfillRead() {
-    auto remaining = output.size();
-    auto source = output.begin();
-
     // If there are pending reads and data to be read, we'll loop through
     // the pending reads and fulfill them as much as possible.
-    while (!pendingReads.empty() && remaining > 0) {
+    while (!pendingReads.empty() && output.size() > 0) {
       auto& pending = pendingReads.front();
 
       if (!pending.promise->isWaiting()) {
@@ -340,12 +390,11 @@
       }
 
       // The pending read is still viable so determine how much we can copy in.
-      auto amountToCopy = kj::min(pending.buffer.size() - pending.filled, remaining);
-      std::copy(source, source + amountToCopy, pending.buffer.begin() + pending.filled);
-      source += amountToCopy;
+      auto amountToCopy = kj::min(pending.buffer.size() - pending.filled, output.size());
+      kj::ArrayPtr<byte> chunk = output.take(amountToCopy);
+      pending.buffer.slice(pending.filled, pending.filled + amountToCopy).copyFrom(chunk);
       pending.filled += amountToCopy;
-      remaining -= amountToCopy;
-      output.erase(output.begin(), source);
+      output.maybeShift();
 
       // If we've met the minimum bytes requirement for the pending read, fulfill
       // the read promise.
@@ -358,7 +407,7 @@
 
       // If we reached this point in the loop, remaining must be 0 so that we
       // don't keep iterating through on the same pending read.
-      KJ_ASSERT(remaining == 0);
+      KJ_ASSERT(output.empty());
    }
 
    if (state.template is<Ended>() && !pendingReads.empty()) {
@@ -387,7 +436,7 @@ private:
   Context context;
 
   kj::Canceler canceler;
-  std::vector<kj::byte> output;
+  LazyBuffer output;
   std::deque<PendingRead> pendingReads;
 };
 } // namespace