diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 1fae2645bce8..c2b557cd5957 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -19,6 +19,7 @@ Minor Behavior Changes revert to the legacy path canonicalizer, enable the runtime flag `envoy.reloadable_features.remove_forked_chromium_url`. * oauth filter: added the optional parameter :ref:`auth_scopes ` with default value of 'user' if not provided. Enables this value to be overridden in the Authorization request to the OAuth provider. +* perf: allow reading more bytes per operation from raw sockets to improve performance. * tcp: setting NODELAY in the base connection class. This should have no effect for TCP or HTTP proxying, but may improve throughput in other areas. This behavior can be temporarily reverted by setting `envoy.reloadable_features.always_nodelay` to false. * upstream: host weight changes now cause a full load balancer rebuild as opposed to happening atomically inline. 
This change has been made to support load balancer pre-computation of data diff --git a/include/envoy/buffer/BUILD b/include/envoy/buffer/BUILD index 499ec8605a2e..9367bedb85ec 100644 --- a/include/envoy/buffer/BUILD +++ b/include/envoy/buffer/BUILD @@ -16,6 +16,7 @@ envoy_cc_library( ], deps = [ "//include/envoy/api:os_sys_calls_interface", + "//source/common/common:assert_lib", "//source/common/common:byte_order_lib", "//source/common/common:utility_lib", ], diff --git a/include/envoy/buffer/buffer.h b/include/envoy/buffer/buffer.h index d9404c47bc82..a4bdfc9459da 100644 --- a/include/envoy/buffer/buffer.h +++ b/include/envoy/buffer/buffer.h @@ -10,6 +10,7 @@ #include "envoy/common/platform.h" #include "envoy/common/pure.h" +#include "common/common/assert.h" #include "common/common/byte_order.h" #include "common/common/utility.h" @@ -72,6 +73,18 @@ class SliceData { using SliceDataPtr = std::unique_ptr; +class Reservation; +class ReservationSingleSlice; + +// Base class for an object to manage the ownership for slices in a `Reservation` or +// `ReservationSingleSlice`. +class ReservationSlicesOwner { +public: + virtual ~ReservationSlicesOwner() = default; +}; + +using ReservationSlicesOwnerPtr = std::unique_ptr; + /** * A basic buffer abstraction. */ @@ -129,16 +142,6 @@ class Instance { */ virtual void prepend(Instance& data) PURE; - /** - * Commit a set of slices originally obtained from reserve(). The number of slices should match - * the number obtained from reserve(). The size of each slice can also be altered. Commit must - * occur once following a reserve() without any mutating operations in between other than to the - * iovecs len_ fields. - * @param iovecs supplies the array of slices to commit. - * @param num_iovecs supplies the size of the slices array. - */ - virtual void commit(RawSlice* iovecs, uint64_t num_iovecs) PURE; - /** * Copy out a section of the buffer. * @param start supplies the buffer index to start copying from. 
@@ -202,13 +205,22 @@ class Instance { virtual void move(Instance& rhs, uint64_t length) PURE; /** - * Reserve space in the buffer. - * @param length supplies the amount of space to reserve. - * @param iovecs supplies the slices to fill with reserved memory. - * @param num_iovecs supplies the size of the slices array. - * @return the number of iovecs used to reserve the space. + * Reserve space in the buffer for reading into. The amount of space reserved is determined + * based on buffer settings and performance considerations. + * @return a `Reservation`, on which `commit()` can be called, or which can + * be destructed to discard any resources in the `Reservation`. */ - virtual uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) PURE; + virtual Reservation reserveForRead() PURE; + + /** + * Reserve space in the buffer in a single slice. + * @param length the exact length of the reservation. + * @param separate_slice specifies whether the reserved space must be in a separate slice + * from any other data in this buffer. + * @return a `ReservationSingleSlice` which has exactly one slice in it. + */ + virtual ReservationSingleSlice reserveSingleSlice(uint64_t length, + bool separate_slice = false) PURE; /** * Search for an occurrence of data within the buffer. @@ -414,6 +426,17 @@ class Instance { * the low watermark. */ virtual bool highWatermarkTriggered() const PURE; + +private: + friend Reservation; + friend ReservationSingleSlice; + + /** + * Called by a `Reservation` to commit `length` bytes of the + * reservation. + */ + virtual void commit(uint64_t length, absl::Span slices, + ReservationSlicesOwnerPtr slices_owner) PURE; }; using InstancePtr = std::unique_ptr; @@ -441,5 +464,140 @@ class WatermarkFactory { using WatermarkFactoryPtr = std::unique_ptr; using WatermarkFactorySharedPtr = std::shared_ptr; +/** + * Holds an in-progress addition to a buffer. 
+ * + * @note For performance reasons, this class is passed by value to + * avoid an extra allocation, so it cannot have any virtual methods. + */ +class Reservation final { +public: + Reservation(Reservation&&) = default; + ~Reservation() = default; + + /** + * @return an array of `RawSlice` of length `numSlices()`. + */ + RawSlice* slices() { return slices_.data(); } + const RawSlice* slices() const { return slices_.data(); } + + /** + * @return the number of slices present. + */ + uint64_t numSlices() const { return slices_.size(); } + + /** + * @return the total length of the Reservation. + */ + uint64_t length() const { return length_; } + + /** + * Commits some or all of the data in the reservation. + * @param length supplies the number of bytes to commit. This must be + * less than or equal to the size of the `Reservation`. + * + * @note No other methods should be called on the object after `commit()` is called. + */ + void commit(uint64_t length) { + ENVOY_BUG(length <= length_, "commit() length must be <= size of the Reservation"); + ASSERT(length == 0 || !slices_.empty(), + "Reservation.commit() called on empty Reservation; possible double-commit()."); + buffer_.commit(length, absl::MakeSpan(slices_), std::move(slices_owner_)); + length_ = 0; + slices_.clear(); + ASSERT(slices_owner_ == nullptr); + } + + // Tuned to allow reads of 128k, using 16k slices. + static constexpr uint32_t MAX_SLICES_ = 8; + +private: + Reservation(Instance& buffer) : buffer_(buffer) {} + + // The buffer that created this `Reservation`. + Instance& buffer_; + + // The combined length of all slices in the Reservation. + uint64_t length_; + + // The RawSlices in the reservation, usable by operations such as `::readv()`. + absl::InlinedVector slices_; + + // An owner that can be set by the creator of the `Reservation` to free slices upon + // destruction. + ReservationSlicesOwnerPtr slices_owner_; + +public: + // The following are for use only by implementations of Buffer. 
Because c++ + // doesn't allow inheritance of friendship, these are just trying to make + // misuse easy to spot in a code review. + static Reservation bufferImplUseOnlyConstruct(Instance& buffer) { return Reservation(buffer); } + decltype(slices_)& bufferImplUseOnlySlices() { return slices_; } + ReservationSlicesOwnerPtr& bufferImplUseOnlySlicesOwner() { return slices_owner_; } + void bufferImplUseOnlySetLength(uint64_t length) { length_ = length; } +}; + +/** + * Holds an in-progress addition to a buffer, holding only a single slice. + * + * @note For performance reasons, this class is passed by value to + * avoid an extra allocation, so it cannot have any virtual methods. + */ +class ReservationSingleSlice final { +public: + ReservationSingleSlice(ReservationSingleSlice&&) = default; + ~ReservationSingleSlice() = default; + + /** + * @return the slice in the Reservation. + */ + RawSlice slice() const { return slice_; } + + /** + * @return the total length of the Reservation. + */ + uint64_t length() const { return slice_.len_; } + + /** + * Commits some or all of the data in the reservation. + * @param length supplies the number of bytes to commit. This must be + * less than or equal to the size of the `Reservation`. + * + * @note No other methods should be called on the object after `commit()` is called. + */ + void commit(uint64_t length) { + ENVOY_BUG(length <= slice_.len_, "commit() length must be <= size of the Reservation"); + ASSERT(length == 0 || slice_.mem_ != nullptr, + "Reservation.commit() called on empty Reservation; possible double-commit()."); + buffer_.commit(length, absl::MakeSpan(&slice_, 1), std::move(slice_owner_)); + slice_ = {nullptr, 0}; + ASSERT(slice_owner_ == nullptr); + } + +private: + ReservationSingleSlice(Instance& buffer) : buffer_(buffer) {} + + // The buffer that created this `Reservation`. + Instance& buffer_; + + // The RawSlice in the reservation, usable by anything needing the raw pointer + // and length to read into. 
+ RawSlice slice_{}; + + // An owner that can be set by the creator of the `ReservationSingleSlice` to free the slice upon + // destruction. + ReservationSlicesOwnerPtr slice_owner_; + +public: + // The following are for use only by implementations of Buffer. Because c++ + // doesn't allow inheritance of friendship, these are just trying to make + // misuse easy to spot in a code review. + static ReservationSingleSlice bufferImplUseOnlyConstruct(Instance& buffer) { + return ReservationSingleSlice(buffer); + } + RawSlice& bufferImplUseOnlySlice() { return slice_; } + ReservationSlicesOwnerPtr& bufferImplUseOnlySliceOwner() { return slice_owner_; } +}; + } // namespace Buffer } // namespace Envoy diff --git a/include/envoy/network/io_handle.h b/include/envoy/network/io_handle.h index de530474592a..ab5289507fdc 100644 --- a/include/envoy/network/io_handle.h +++ b/include/envoy/network/io_handle.h @@ -70,11 +70,13 @@ class IoHandle { /** * Read from a io handle directly into buffer. * @param buffer supplies the buffer to read into. - * @param max_length supplies the maximum length to read. + * @param max_length supplies the maximum length to read. A value of absl::nullopt means to read + * as much data as possible, within the constraints of available buffer size. * @return a IoCallUint64Result with err_ = nullptr and rc_ = the number of bytes * read if successful, or err_ = some IoError for failure. If call failed, rc_ shouldn't be used. */ - virtual Api::IoCallUint64Result read(Buffer::Instance& buffer, uint64_t max_length) PURE; + virtual Api::IoCallUint64Result read(Buffer::Instance& buffer, + absl::optional max_length) PURE; /** * Write the data in slices out. 
diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 3d2b27483466..420dd9cd1d2c 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -18,6 +18,8 @@ namespace { constexpr uint64_t CopyThreshold = 512; } // namespace +thread_local absl::InlinedVector Slice::free_list_; + void OwnedImpl::addImpl(const void* data, uint64_t size) { const char* src = static_cast(data); bool new_slice_needed = slices_.empty(); @@ -81,47 +83,6 @@ void OwnedImpl::prepend(Instance& data) { other.postProcess(); } -void OwnedImpl::commit(RawSlice* iovecs, uint64_t num_iovecs) { - if (num_iovecs == 0) { - return; - } - if (slices_.empty()) { - return; - } - // Find the slices in the buffer that correspond to the iovecs: - // First, scan backward from the end of the buffer to find the last slice containing - // any content. Reservations are made from the end of the buffer, and out-of-order commits - // aren't supported, so any slices before this point cannot match the iovecs being committed. - ssize_t slice_index = static_cast(slices_.size()) - 1; - while (slice_index >= 0 && slices_[slice_index].dataSize() == 0) { - slice_index--; - } - if (slice_index < 0) { - // There was no slice containing any data, so rewind the iterator at the first slice. - slice_index = 0; - } - - // Next, scan forward and attempt to match the slices against iovecs. - uint64_t num_slices_committed = 0; - while (num_slices_committed < num_iovecs) { - if (slices_[slice_index].commit(iovecs[num_slices_committed])) { - length_ += iovecs[num_slices_committed].len_; - num_slices_committed++; - } - slice_index++; - if (slice_index == static_cast(slices_.size())) { - break; - } - } - - // In case an extra slice was reserved, remove empty slices from the end of the buffer. 
- while (!slices_.empty() && slices_.back().dataSize() == 0) { - slices_.pop_back(); - } - - ASSERT(num_slices_committed > 0); -} - void OwnedImpl::copyOut(size_t start, uint64_t size, void* data) const { uint64_t bytes_to_skip = start; uint8_t* dest = static_cast(data); @@ -339,55 +300,119 @@ void OwnedImpl::move(Instance& rhs, uint64_t length) { other.postProcess(); } -uint64_t OwnedImpl::reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) { - if (num_iovecs == 0 || length == 0) { - return 0; +Reservation OwnedImpl::reserveForRead() { + return reserveWithMaxLength(default_read_reservation_size_); +} + +Reservation OwnedImpl::reserveWithMaxLength(uint64_t max_length) { + Reservation reservation = Reservation::bufferImplUseOnlyConstruct(*this); + if (max_length == 0) { + return reservation; } + + // Remove any empty slices at the end. + while (!slices_.empty() && slices_.back().dataSize() == 0) { + slices_.pop_back(); + } + + uint64_t bytes_remaining = max_length; + uint64_t reserved = 0; + auto& reservation_slices = reservation.bufferImplUseOnlySlices(); + auto slices_owner = std::make_unique(); + // Check whether there are any empty slices with reservable space at the end of the buffer. - size_t first_reservable_slice = slices_.size(); - while (first_reservable_slice > 0) { - if (slices_[first_reservable_slice - 1].reservableSize() == 0) { - break; - } - first_reservable_slice--; - if (slices_[first_reservable_slice].dataSize() != 0) { - // There is some content in this slice, so anything in front of it is non-reservable. + uint64_t reservable_size = slices_.empty() ? 
0 : slices_.back().reservableSize(); + if (reservable_size >= max_length || reservable_size >= (Slice::default_slice_size_ / 8)) { + auto& last_slice = slices_.back(); + const uint64_t reservation_size = std::min(last_slice.reservableSize(), bytes_remaining); + auto slice = last_slice.reserve(reservation_size); + reservation_slices.push_back(slice); + slices_owner->owned_slices_.emplace_back(Slice()); + bytes_remaining -= slice.len_; + reserved += slice.len_; + } + + while (bytes_remaining != 0 && reservation_slices.size() < reservation.MAX_SLICES_) { + const uint64_t size = Slice::default_slice_size_; + + // If the next slice would go over the desired size, and the amount already reserved is already + // at least one full slice in size, stop allocating slices. This prevents returning a + // reservation larger than requested, which could go above the watermark limits for a watermark + // buffer, unless the size would be very small (less than 1 full slice). + if (size > bytes_remaining && reserved >= size) { break; } + + Slice slice(size, slices_owner->free_list_); + const auto raw_slice = slice.reserve(size); + reservation_slices.push_back(raw_slice); + slices_owner->owned_slices_.emplace_back(std::move(slice)); + bytes_remaining -= std::min(raw_slice.len_, bytes_remaining); + reserved += raw_slice.len_; } - // Having found the sequence of reservable slices at the back of the buffer, reserve - // as much space as possible from each one. - uint64_t num_slices_used = 0; - uint64_t bytes_remaining = length; - size_t slice_index = first_reservable_slice; - while (slice_index < slices_.size() && bytes_remaining != 0 && num_slices_used < num_iovecs) { - auto& slice = slices_[slice_index]; - const uint64_t reservation_size = std::min(slice.reservableSize(), bytes_remaining); - if (num_slices_used + 1 == num_iovecs && reservation_size < bytes_remaining) { - // There is only one iovec left, and this next slice does not have enough space to - // complete the reservation. 
Stop iterating, with last one iovec still unpopulated, - // so the code following this loop can allocate a new slice to hold the rest of the - // reservation. - break; - } - iovecs[num_slices_used] = slice.reserve(reservation_size); - bytes_remaining -= iovecs[num_slices_used].len_; - num_slices_used++; - slice_index++; + ASSERT(reservation_slices.size() == slices_owner->owned_slices_.size()); + reservation.bufferImplUseOnlySlicesOwner() = std::move(slices_owner); + reservation.bufferImplUseOnlySetLength(reserved); + + return reservation; +} + +ReservationSingleSlice OwnedImpl::reserveSingleSlice(uint64_t length, bool separate_slice) { + ReservationSingleSlice reservation = ReservationSingleSlice::bufferImplUseOnlyConstruct(*this); + if (length == 0) { + return reservation; + } + + // Remove any empty slices at the end. + while (!slices_.empty() && slices_.back().dataSize() == 0) { + slices_.pop_back(); + } + + auto& reservation_slice = reservation.bufferImplUseOnlySlice(); + auto slice_owner = std::make_unique(); + + // Check whether there are any empty slices with reservable space at the end of the buffer. + uint64_t reservable_size = + (separate_slice || slices_.empty()) ? 0 : slices_.back().reservableSize(); + if (reservable_size >= length) { + reservation_slice = slices_.back().reserve(length); + } else { + Slice slice(length); + reservation_slice = slice.reserve(length); + slice_owner->owned_slice_ = std::move(slice); } - // If needed, allocate one more slice at the end to provide the remainder of the reservation. 
- if (bytes_remaining != 0) { - slices_.emplace_back(bytes_remaining); - iovecs[num_slices_used] = slices_.back().reserve(bytes_remaining); - bytes_remaining -= iovecs[num_slices_used].len_; - num_slices_used++; + reservation.bufferImplUseOnlySliceOwner() = std::move(slice_owner); + + return reservation; +} + +void OwnedImpl::commit(uint64_t length, absl::Span slices, + ReservationSlicesOwnerPtr slices_owner_base) { + if (length == 0) { + return; } - ASSERT(num_slices_used <= num_iovecs); - ASSERT(bytes_remaining == 0); - return num_slices_used; + ASSERT(dynamic_cast(slices_owner_base.get()) != nullptr); + std::unique_ptr slices_owner( + static_cast(slices_owner_base.release())); + + absl::Span owned_slices = slices_owner->ownedSlices(); + ASSERT(slices.size() == owned_slices.size()); + + uint64_t bytes_remaining = length; + for (uint32_t i = 0; i < slices.size() && bytes_remaining > 0; i++) { + Slice& owned_slice = owned_slices[i]; + if (owned_slice.data() != nullptr) { + slices_.emplace_back(std::move(owned_slice)); + } + slices[i].len_ = std::min(slices[i].len_, bytes_remaining); + bool success = slices_.back().commit(slices[i]); + ASSERT(success); + length_ += slices[i].len_; + bytes_remaining -= slices[i].len_; + } } ssize_t OwnedImpl::search(const void* data, uint64_t size, size_t start, size_t length) const { diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index c557107df050..b217d82195ed 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -33,6 +33,16 @@ namespace Buffer { class Slice { public: using Reservation = RawSlice; + using StoragePtr = std::unique_ptr; + + static constexpr uint32_t free_list_max_ = Buffer::Reservation::MAX_SLICES_; + using FreeListType = absl::InlinedVector; + class FreeListReference { + private: + FreeListReference(FreeListType& free_list) : free_list_(free_list) {} + FreeListType& free_list_; + friend class Slice; + }; /** * Create an empty Slice with 0 
capacity. @@ -44,9 +54,9 @@ class Slice { * @param min_capacity number of bytes of space the slice should have. Actual capacity is rounded * up to the next multiple of 4kb. */ - Slice(uint64_t min_capacity) - : capacity_(sliceSize(min_capacity)), storage_(new uint8_t[capacity_]), base_(storage_.get()), - data_(0), reservable_(0) {} + Slice(uint64_t min_capacity, absl::optional free_list = absl::nullopt) + : capacity_(sliceSize(min_capacity)), storage_(newStorage(capacity_, free_list)), + base_(storage_.get()), data_(0), reservable_(0) {} /** * Create an immutable Slice that refers to an external buffer fragment. @@ -77,6 +87,7 @@ class Slice { if (this != &rhs) { callAndClearDrainTrackers(); + freeStorage(std::move(storage_), capacity_); storage_ = std::move(rhs.storage_); drain_trackers_ = std::move(rhs.drain_trackers_); base_ = rhs.base_; @@ -93,7 +104,15 @@ class Slice { return *this; } - ~Slice() { callAndClearDrainTrackers(); } + ~Slice() { + callAndClearDrainTrackers(); + freeStorage(std::move(storage_), capacity_); + } + + void freeStorage(FreeListReference free_list) { + callAndClearDrainTrackers(); + freeStorage(std::move(storage_), capacity_, free_list); + } /** * @return true if the data in the slice is mutable @@ -285,6 +304,10 @@ class Slice { drain_trackers_.clear(); } + static constexpr uint32_t default_slice_size_ = 16384; + + static FreeListReference freeList() { return FreeListReference(free_list_); } + protected: /** * Compute a slice size big enough to hold a specified amount of data. 
@@ -297,13 +320,56 @@ class Slice { return num_pages * PageSize; } + static StoragePtr newStorage(uint64_t capacity, absl::optional free_list_opt) { + ASSERT(sliceSize(default_slice_size_) == default_slice_size_, + "default_slice_size_ incompatible with sliceSize()"); + ASSERT(sliceSize(capacity) == capacity, + "newStorage should only be called on values returned from sliceSize()"); + ASSERT(!free_list_opt.has_value() || &free_list_opt->free_list_ == &free_list_); + + StoragePtr storage; + if (capacity == default_slice_size_ && free_list_opt.has_value()) { + FreeListType& free_list = free_list_opt->free_list_; + if (!free_list.empty()) { + storage = std::move(free_list.back()); + ASSERT(storage != nullptr); + ASSERT(free_list.back() == nullptr); + free_list.pop_back(); + return storage; + } + } + + storage.reset(new uint8_t[capacity]); + return storage; + } + + static void freeStorage(StoragePtr storage, uint64_t capacity, + absl::optional free_list_opt = absl::nullopt) { + if (storage == nullptr) { + return; + } + + if (capacity == default_slice_size_ && free_list_opt.has_value()) { + FreeListType& free_list = free_list_opt->free_list_; + if (free_list.size() < free_list_max_) { + free_list.emplace_back(std::move(storage)); + ASSERT(storage == nullptr); + return; + } + } + + storage.reset(); + } + + static thread_local FreeListType free_list_; + /** Length of the byte array that base_ points to. This is also the offset in bytes from the start * of the slice to the end of the Reservable section. */ uint64_t capacity_; /** Backing storage for mutable slices which own their own storage. This storage should never be * accessed directly; access base_ instead. */ - std::unique_ptr storage_; + StoragePtr storage_; /** Start of the slice. Points to storage_ iff the slice owns its own storage. 
*/ uint8_t* base_{nullptr}; @@ -319,6 +385,8 @@ class Slice { std::list> drain_trackers_; }; +class OwnedImpl; + class SliceDataImpl : public SliceData { public: explicit SliceDataImpl(Slice&& slice) : slice_(std::move(slice)) {} @@ -330,6 +398,7 @@ class SliceDataImpl : public SliceData { } private: + friend OwnedImpl; Slice slice_; }; @@ -573,7 +642,6 @@ class OwnedImpl : public LibEventInstance { void add(const Instance& data) override; void prepend(absl::string_view data) override; void prepend(Instance& data) override; - void commit(RawSlice* iovecs, uint64_t num_iovecs) override; void copyOut(size_t start, uint64_t size, void* data) const override; void drain(uint64_t size) override; RawSliceVector getRawSlices(absl::optional max_slices = absl::nullopt) const override; @@ -583,7 +651,8 @@ class OwnedImpl : public LibEventInstance { void* linearize(uint32_t size) override; void move(Instance& rhs) override; void move(Instance& rhs, uint64_t length) override; - uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override; + Reservation reserveForRead() override; + ReservationSingleSlice reserveSingleSlice(uint64_t length, bool separate_slice = false) override; ssize_t search(const void* data, uint64_t size, size_t start, size_t length) const override; bool startsWith(absl::string_view data) const override; std::string toString() const override; @@ -618,6 +687,25 @@ class OwnedImpl : public LibEventInstance { */ std::vector describeSlicesForTest() const; + /** + * Create a reservation for reading with a non-default length. Used in benchmark tests. + */ + Reservation reserveForReadWithLengthForTest(uint64_t length) { + return reserveWithMaxLength(length); + } + +protected: + static constexpr uint64_t default_read_reservation_size_ = + Reservation::MAX_SLICES_ * Slice::default_slice_size_; + + /** + * Create a reservation with a maximum length. 
+ */ + Reservation reserveWithMaxLength(uint64_t max_length); + + void commit(uint64_t length, absl::Span slices, + ReservationSlicesOwnerPtr slices_owner) override; + private: /** * @param rhs another buffer @@ -641,6 +729,32 @@ class OwnedImpl : public LibEventInstance { /** Sum of the dataSize of all slices. */ OverflowDetectingUInt64 length_; + + struct OwnedImplReservationSlicesOwner : public ReservationSlicesOwner { + virtual absl::Span ownedSlices() PURE; + }; + + struct OwnedImplReservationSlicesOwnerMultiple : public OwnedImplReservationSlicesOwner { + // Optimization: get the thread_local freeList() once per Reservation, outside the loop. + OwnedImplReservationSlicesOwnerMultiple() : free_list_(Slice::freeList()) {} + + ~OwnedImplReservationSlicesOwnerMultiple() override { + while (!owned_slices_.empty()) { + owned_slices_.back().freeStorage(free_list_); + owned_slices_.pop_back(); + } + } + absl::Span ownedSlices() override { return absl::MakeSpan(owned_slices_); } + + Slice::FreeListReference free_list_; + absl::InlinedVector owned_slices_; + }; + + struct OwnedImplReservationSlicesOwnerSingle : public OwnedImplReservationSlicesOwner { + absl::Span ownedSlices() override { return absl::MakeSpan(&owned_slice_, 1); } + + Slice owned_slice_; + }; }; using BufferFragmentPtr = std::unique_ptr; diff --git a/source/common/buffer/watermark_buffer.cc b/source/common/buffer/watermark_buffer.cc index 0503266085b7..f7d95e5183f4 100644 --- a/source/common/buffer/watermark_buffer.cc +++ b/source/common/buffer/watermark_buffer.cc @@ -31,8 +31,9 @@ void WatermarkBuffer::prepend(Instance& data) { checkHighAndOverflowWatermarks(); } -void WatermarkBuffer::commit(RawSlice* iovecs, uint64_t num_iovecs) { - OwnedImpl::commit(iovecs, num_iovecs); +void WatermarkBuffer::commit(uint64_t length, absl::Span slices, + ReservationSlicesOwnerPtr slices_owner) { + OwnedImpl::commit(length, slices, std::move(slices_owner)); checkHighAndOverflowWatermarks(); } @@ -57,10 +58,27 @@ 
SliceDataPtr WatermarkBuffer::extractMutableFrontSlice() { return result; } -uint64_t WatermarkBuffer::reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) { - uint64_t bytes_reserved = OwnedImpl::reserve(length, iovecs, num_iovecs); - checkHighAndOverflowWatermarks(); - return bytes_reserved; +// Adjust the reservation size based on space available before hitting +// the high watermark to avoid overshooting by a lot and thus violating the limits +// the watermark is imposing. +Reservation WatermarkBuffer::reserveForRead() { + constexpr auto preferred_length = default_read_reservation_size_; + uint64_t adjusted_length = preferred_length; + + if (high_watermark_ > 0 && preferred_length > 0) { + const uint64_t current_length = OwnedImpl::length(); + if (current_length >= high_watermark_) { + // Always allow a read of at least some data. The API doesn't allow returning + // a zero-length reservation. + adjusted_length = Slice::default_slice_size_; + } else { + const uint64_t available_length = high_watermark_ - current_length; + adjusted_length = IntUtil::roundUpToMultiple(available_length, Slice::default_slice_size_); + adjusted_length = std::min(adjusted_length, preferred_length); + } + } + + return OwnedImpl::reserveWithMaxLength(adjusted_length); } void WatermarkBuffer::appendSliceForTest(const void* data, uint64_t size) { diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index a84f20c21d5d..9150cdaf54f9 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -30,12 +30,11 @@ class WatermarkBuffer : public OwnedImpl { void add(const Instance& data) override; void prepend(absl::string_view data) override; void prepend(Instance& data) override; - void commit(RawSlice* iovecs, uint64_t num_iovecs) override; void drain(uint64_t size) override; void move(Instance& rhs) override; void move(Instance& rhs, uint64_t length) override; SliceDataPtr extractMutableFrontSlice() 
override; - uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override; + Reservation reserveForRead() override; void postProcess() override { checkLowWatermark(); } void appendSliceForTest(const void* data, uint64_t size) override; void appendSliceForTest(absl::string_view data) override; @@ -52,6 +51,9 @@ class WatermarkBuffer : public OwnedImpl { void checkLowWatermark(); private: + void commit(uint64_t length, absl::Span slices, + ReservationSlicesOwnerPtr slices_owner) override; + std::function below_low_watermark_; std::function above_high_watermark_; std::function above_overflow_watermark_; diff --git a/source/common/common/utility.h b/source/common/common/utility.h index c995fed2db38..84acc287b0f3 100644 --- a/source/common/common/utility.h +++ b/source/common/common/utility.h @@ -179,6 +179,24 @@ class DateUtil { static uint64_t nowToMilliseconds(TimeSource& time_source); }; +/** + * Utility routines for working with integers. + */ +class IntUtil { +public: + /** + * Round `val` up to the next multiple. Examples: + * roundUpToMultiple(3, 8) -> 8 + * roundUpToMultiple(9, 8) -> 16 + * roundUpToMultiple(8, 8) -> 8 + */ + static uint64_t roundUpToMultiple(uint64_t val, uint32_t multiple) { + ASSERT(multiple > 0); + ASSERT((val + multiple) >= val, "Unsigned overflow"); + return ((val + multiple - 1) / multiple) * multiple; + } +}; + /** * Utility routines for working with strings. */ @@ -619,8 +637,8 @@ template struct TrieLookupTable { } /** - * Finds the entry associated with the longest prefix. Complexity is O(min(longest key prefix, key - * length)) + * Finds the entry associated with the longest prefix. Complexity is O(min(longest key prefix, + * key length)). * @param key the key used to find. * @return the value matching the longest prefix based on the key. 
*/ diff --git a/source/common/grpc/common.cc b/source/common/grpc/common.cc index 8b7551d8bea2..549be3dbbc72 100644 --- a/source/common/grpc/common.cc +++ b/source/common/grpc/common.cc @@ -131,11 +131,9 @@ Buffer::InstancePtr Common::serializeToGrpcFrame(const Protobuf::Message& messag Buffer::InstancePtr body(new Buffer::OwnedImpl()); const uint32_t size = message.ByteSize(); const uint32_t alloc_size = size + 5; - Buffer::RawSlice iovec; - body->reserve(alloc_size, &iovec, 1); - ASSERT(iovec.len_ >= alloc_size); - iovec.len_ = alloc_size; - uint8_t* current = reinterpret_cast(iovec.mem_); + auto reservation = body->reserveSingleSlice(alloc_size); + ASSERT(reservation.slice().len_ >= alloc_size); + uint8_t* current = reinterpret_cast(reservation.slice().mem_); *current++ = 0; // flags const uint32_t nsize = htonl(size); std::memcpy(current, reinterpret_cast(&nsize), sizeof(uint32_t)); @@ -143,22 +141,20 @@ Buffer::InstancePtr Common::serializeToGrpcFrame(const Protobuf::Message& messag Protobuf::io::ArrayOutputStream stream(current, size, -1); Protobuf::io::CodedOutputStream codec_stream(&stream); message.SerializeWithCachedSizes(&codec_stream); - body->commit(&iovec, 1); + reservation.commit(alloc_size); return body; } Buffer::InstancePtr Common::serializeMessage(const Protobuf::Message& message) { auto body = std::make_unique(); const uint32_t size = message.ByteSize(); - Buffer::RawSlice iovec; - body->reserve(size, &iovec, 1); - ASSERT(iovec.len_ >= size); - iovec.len_ = size; - uint8_t* current = reinterpret_cast(iovec.mem_); + auto reservation = body->reserveSingleSlice(size); + ASSERT(reservation.slice().len_ >= size); + uint8_t* current = reinterpret_cast(reservation.slice().mem_); Protobuf::io::ArrayOutputStream stream(current, size, -1); Protobuf::io::CodedOutputStream codec_stream(&stream); message.SerializeWithCachedSizes(&codec_stream); - body->commit(&iovec, 1); + reservation.commit(size); return body; } diff --git 
a/source/common/http/http2/metadata_encoder.cc b/source/common/http/http2/metadata_encoder.cc index 5e5a6970a872..ac1c06af4f36 100644 --- a/source/common/http/http2/metadata_encoder.cc +++ b/source/common/http/http2/metadata_encoder.cc @@ -62,18 +62,16 @@ bool MetadataEncoder::createHeaderBlockUsingNghttp2(const MetadataMap& metadata_ ENVOY_LOG(error, "Payload size {} exceeds the max bound.", buflen); return false; } - Buffer::RawSlice iovec; - payload_.reserve(buflen, &iovec, 1); - ASSERT(iovec.len_ >= buflen); + auto reservation = payload_.reserveSingleSlice(buflen); + ASSERT(reservation.slice().len_ >= buflen); // Creates payload using nghttp2. - uint8_t* buf = reinterpret_cast(iovec.mem_); + uint8_t* buf = reinterpret_cast(reservation.slice().mem_); const ssize_t result = nghttp2_hd_deflate_hd(deflater_.get(), buf, buflen, nva.begin(), nvlen); RELEASE_ASSERT(result > 0, fmt::format("Failed to deflate metadata payload, with result {}.", result)); - iovec.len_ = result; - payload_.commit(&iovec, 1); + reservation.commit(result); return true; } diff --git a/source/common/network/io_socket_handle_impl.cc b/source/common/network/io_socket_handle_impl.cc index 508ce6400aac..b71a2273c96f 100644 --- a/source/common/network/io_socket_handle_impl.cc +++ b/source/common/network/io_socket_handle_impl.cc @@ -108,21 +108,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::readv(uint64_t max_length, Buffer::R return result; } -Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer, uint64_t max_length) { +Api::IoCallUint64Result IoSocketHandleImpl::read(Buffer::Instance& buffer, + absl::optional max_length_opt) { + const uint64_t max_length = max_length_opt.value_or(UINT64_MAX); if (max_length == 0) { return Api::ioCallUint64ResultNoError(); } - constexpr uint64_t MaxSlices = 2; - Buffer::RawSlice slices[MaxSlices]; - const uint64_t num_slices = buffer.reserve(max_length, slices, MaxSlices); - Api::IoCallUint64Result result = readv(max_length, slices, 
num_slices); + Buffer::Reservation reservation = buffer.reserveForRead(); + Api::IoCallUint64Result result = readv(std::min(reservation.length(), max_length), + reservation.slices(), reservation.numSlices()); uint64_t bytes_to_commit = result.ok() ? result.rc_ : 0; ASSERT(bytes_to_commit <= max_length); - for (uint64_t i = 0; i < num_slices; i++) { - slices[i].len_ = std::min(slices[i].len_, static_cast(bytes_to_commit)); - bytes_to_commit -= slices[i].len_; - } - buffer.commit(slices, num_slices); + reservation.commit(bytes_to_commit); // Emulated edge events need to registered if the socket operation did not complete // because the socket would block. diff --git a/source/common/network/io_socket_handle_impl.h b/source/common/network/io_socket_handle_impl.h index 41603976a736..7fbb2c39d274 100644 --- a/source/common/network/io_socket_handle_impl.h +++ b/source/common/network/io_socket_handle_impl.h @@ -33,7 +33,8 @@ class IoSocketHandleImpl : public IoHandle, protected Logger::Loggable max_length) override; Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override; diff --git a/source/common/network/raw_buffer_socket.cc b/source/common/network/raw_buffer_socket.cc index 65a79a1cb770..96f75a1c419b 100644 --- a/source/common/network/raw_buffer_socket.cc +++ b/source/common/network/raw_buffer_socket.cc @@ -18,8 +18,7 @@ IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) { uint64_t bytes_read = 0; bool end_stream = false; do { - // 16K read is arbitrary. TODO(mattklein123) PERF: Tune the read size. 
- Api::IoCallUint64Result result = callbacks_->ioHandle().read(buffer, 16384); + Api::IoCallUint64Result result = callbacks_->ioHandle().read(buffer, absl::nullopt); if (result.ok()) { ENVOY_CONN_LOG(trace, "read returns: {}", callbacks_->connection(), result.rc_); diff --git a/source/common/network/utility.cc b/source/common/network/utility.cc index 61685068e650..14de3cdc92be 100644 --- a/source/common/network/utility.cc +++ b/source/common/network/utility.cc @@ -101,21 +101,14 @@ Api::IoCallUint64Result receiveMessage(uint64_t max_packet_size, Buffer::Instanc IoHandle::RecvMsgOutput& output, IoHandle& handle, const Address::Instance& local_address) { - Buffer::RawSlice slice; - const uint64_t num_slices = buffer->reserve(max_packet_size, &slice, 1); - ASSERT(num_slices == 1u); + auto reservation = buffer->reserveSingleSlice(max_packet_size); + Buffer::RawSlice slice = reservation.slice(); + Api::IoCallUint64Result result = handle.recvmsg(&slice, 1, local_address.ip()->port(), output); - Api::IoCallUint64Result result = - handle.recvmsg(&slice, num_slices, local_address.ip()->port(), output); - - if (!result.ok()) { - return result; + if (result.ok()) { + reservation.commit(std::min(max_packet_size, result.rc_)); } - // Adjust memory length and commit slice to buffer - slice.len_ = std::min(slice.len_, static_cast(result.rc_)); - buffer->commit(&slice, 1); - return result; } @@ -619,16 +612,27 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, } if (handle.supportsMmsg()) { - const uint32_t num_packets_per_mmsg_call = 16u; - const uint32_t num_slices_per_packet = 1u; - absl::FixedArray buffers(num_packets_per_mmsg_call); + const auto max_packet_size = udp_packet_processor.maxPacketSize(); + + // Buffer::ReservationSingleSlice is always passed by value, and can only be constructed + // by Buffer::Instance::reserve(), so this is needed to keep a fixed array + // in which all elements are legally constructed. 
+ struct BufferAndReservation { + BufferAndReservation(uint64_t max_packet_size) + : buffer_(std::make_unique()), + reservation_(buffer_->reserveSingleSlice(max_packet_size, true)) {} + + Buffer::InstancePtr buffer_; + Buffer::ReservationSingleSlice reservation_; + }; + constexpr uint32_t num_packets_per_mmsg_call = 16u; + constexpr uint32_t num_slices_per_packet = 1u; + absl::InlinedVector buffers; RawSliceArrays slices(num_packets_per_mmsg_call, absl::FixedArray(num_slices_per_packet)); - for (uint32_t i = 0; i < num_packets_per_mmsg_call; ++i) { - buffers[i] = std::make_unique(); - const uint64_t num_slices = buffers[i]->reserve(udp_packet_processor.maxPacketSize(), - slices[i].data(), num_slices_per_packet); - ASSERT(num_slices == num_slices_per_packet); + for (uint32_t i = 0; i < num_packets_per_mmsg_call; i++) { + buffers.push_back(max_packet_size); + slices[i][0] = buffers[i].reservation_.slice(); } IoHandle::RecvMsgOutput output(num_packets_per_mmsg_call, packets_dropped); @@ -650,11 +654,9 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle, ENVOY_LOG_MISC(debug, "Receive a packet with {} bytes from {}", msg_len, output.msg_[i].peer_address_->asString()); - // Adjust used memory length and commit slice to buffer - slice->len_ = std::min(slice->len_, static_cast(msg_len)); - buffers[i]->commit(slice, 1); + buffers[i].reservation_.commit(std::min(max_packet_size, msg_len)); - passPayloadToProcessor(msg_len, std::move(buffers[i]), output.msg_[i].peer_address_, + passPayloadToProcessor(msg_len, std::move(buffers[i].buffer_), output.msg_[i].peer_address_, output.msg_[i].local_address_, udp_packet_processor, receive_time); } return result; diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_client_stream.cc b/source/extensions/quic_listeners/quiche/envoy_quic_client_stream.cc index d296af0bd503..d57840cda4e4 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_client_stream.cc +++ 
b/source/extensions/quic_listeners/quiche/envoy_quic_client_stream.cc @@ -188,12 +188,7 @@ void EnvoyQuicClientStream::OnBodyAvailable() { int num_regions = GetReadableRegions(&iov, 1); ASSERT(num_regions > 0); size_t bytes_read = iov.iov_len; - Buffer::RawSlice slice; - buffer->reserve(bytes_read, &slice, 1); - ASSERT(slice.len_ >= bytes_read); - slice.len_ = bytes_read; - memcpy(slice.mem_, iov.iov_base, iov.iov_len); - buffer->commit(&slice, 1); + buffer->add(iov.iov_base, bytes_read); MarkConsumed(bytes_read); } ASSERT(buffer->length() == 0 || !end_stream_decoded_); diff --git a/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc b/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc index 311966d60ce6..42dde7450303 100644 --- a/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc +++ b/source/extensions/quic_listeners/quiche/envoy_quic_server_stream.cc @@ -182,12 +182,7 @@ void EnvoyQuicServerStream::OnBodyAvailable() { int num_regions = GetReadableRegions(&iov, 1); ASSERT(num_regions > 0); size_t bytes_read = iov.iov_len; - Buffer::RawSlice slice; - buffer->reserve(bytes_read, &slice, 1); - ASSERT(slice.len_ >= bytes_read); - slice.len_ = bytes_read; - memcpy(slice.mem_, iov.iov_base, iov.iov_len); - buffer->commit(&slice, 1); + buffer->add(iov.iov_base, bytes_read); MarkConsumed(bytes_read); } diff --git a/source/extensions/quic_listeners/quiche/platform/quic_mem_slice_storage_impl.cc b/source/extensions/quic_listeners/quiche/platform/quic_mem_slice_storage_impl.cc index 1c2c63b39e56..5105ec6815ab 100644 --- a/source/extensions/quic_listeners/quiche/platform/quic_mem_slice_storage_impl.cc +++ b/source/extensions/quic_listeners/quiche/platform/quic_mem_slice_storage_impl.cc @@ -30,19 +30,14 @@ QuicMemSliceStorageImpl::QuicMemSliceStorageImpl(const iovec* iov, int iov_count size_t io_offset = 0; while (io_offset < write_len) { size_t slice_len = std::min(write_len - io_offset, max_slice_len); - 
Envoy::Buffer::RawSlice slice; - // Populate a temporary buffer instance and then move it to |buffer_|. This is necessary because - // consecutive reserve/commit can return addresses in same slice which violates the restriction - // of |max_slice_len| when ToSpan() is called. - Envoy::Buffer::OwnedImpl buffer; - uint16_t num_slice = buffer.reserve(slice_len, &slice, 1); - ASSERT(num_slice == 1); - QuicUtils::CopyToBuffer(iov, iov_count, io_offset, slice_len, static_cast(slice.mem_)); + + // Use a separate slice so that we do not violate the restriction of |max_slice_len| when + // ToSpan() is called. + auto reservation = buffer_.reserveSingleSlice(slice_len, true); + QuicUtils::CopyToBuffer(iov, iov_count, io_offset, slice_len, + static_cast(reservation.slice().mem_)); io_offset += slice_len; - // OwnedImpl may return a slice longer than needed, trim it to requested length. - slice.len_ = slice_len; - buffer.commit(&slice, num_slice); - buffer_.move(buffer); + reservation.commit(slice_len); } } diff --git a/source/extensions/quic_listeners/quiche/quic_io_handle_wrapper.h b/source/extensions/quic_listeners/quiche/quic_io_handle_wrapper.h index fc273df6038a..a2f66d52e7a4 100644 --- a/source/extensions/quic_listeners/quiche/quic_io_handle_wrapper.h +++ b/source/extensions/quic_listeners/quiche/quic_io_handle_wrapper.h @@ -29,7 +29,8 @@ class QuicIoHandleWrapper : public Network::IoHandle { } return io_handle_.readv(max_length, slices, num_slice); } - Api::IoCallUint64Result read(Buffer::Instance& buffer, uint64_t max_length) override { + Api::IoCallUint64Result read(Buffer::Instance& buffer, + absl::optional max_length) override { if (closed_) { return Api::IoCallUint64Result(0, Api::IoErrorPtr(new Network::IoSocketError(EBADF), Network::IoSocketError::deleteIoError)); diff --git a/source/extensions/stat_sinks/common/statsd/statsd.cc b/source/extensions/stat_sinks/common/statsd/statsd.cc index e1be68c6fd33..a1ad60cf1607 100644 --- 
a/source/extensions/stat_sinks/common/statsd/statsd.cc +++ b/source/extensions/stat_sinks/common/statsd/statsd.cc @@ -188,12 +188,12 @@ TcpStatsdSink::TlsSink::~TlsSink() { void TcpStatsdSink::TlsSink::beginFlush(bool expect_empty_buffer) { ASSERT(!expect_empty_buffer || buffer_.length() == 0); ASSERT(current_slice_mem_ == nullptr); + ASSERT(!current_buffer_reservation_.has_value()); - uint64_t num_iovecs = buffer_.reserve(FLUSH_SLICE_SIZE_BYTES, &current_buffer_slice_, 1); - ASSERT(num_iovecs == 1); + current_buffer_reservation_.emplace(buffer_.reserveSingleSlice(FLUSH_SLICE_SIZE_BYTES)); - ASSERT(current_buffer_slice_.len_ >= FLUSH_SLICE_SIZE_BYTES); - current_slice_mem_ = reinterpret_cast<char*>(current_buffer_slice_.mem_); + ASSERT(current_buffer_reservation_->slice().len_ >= FLUSH_SLICE_SIZE_BYTES); + current_slice_mem_ = reinterpret_cast<char*>(current_buffer_reservation_->slice().mem_); } void TcpStatsdSink::TlsSink::commonFlush(const std::string& name, uint64_t value, char stat_type) { @@ -201,7 +201,7 @@ void TcpStatsdSink::TlsSink::commonFlush(const std::string& name, uint64_t value // 36 > 1 ("." 
after prefix) + 1 (":" after name) + 4 (postfix chars, e.g., "|ms\n") + 30 for // number (bigger than it will ever be) const uint32_t max_size = name.size() + parent_.getPrefix().size() + 36; - if (current_buffer_slice_.len_ - usedBuffer() < max_size) { + if (current_buffer_reservation_->slice().len_ - usedBuffer() < max_size) { endFlush(false); beginFlush(false); } @@ -234,8 +234,9 @@ void TcpStatsdSink::TlsSink::flushGauge(const std::string& name, uint64_t value) void TcpStatsdSink::TlsSink::endFlush(bool do_write) { ASSERT(current_slice_mem_ != nullptr); - current_buffer_slice_.len_ = usedBuffer(); - buffer_.commit(&current_buffer_slice_, 1); + ASSERT(current_buffer_reservation_.has_value()); + current_buffer_reservation_->commit(usedBuffer()); + current_buffer_reservation_.reset(); current_slice_mem_ = nullptr; if (do_write) { write(buffer_); @@ -309,7 +310,8 @@ void TcpStatsdSink::TlsSink::write(Buffer::Instance& buffer) { uint64_t TcpStatsdSink::TlsSink::usedBuffer() const { ASSERT(current_slice_mem_ != nullptr); - return current_slice_mem_ - reinterpret_cast<char*>(current_buffer_slice_.mem_); + ASSERT(current_buffer_reservation_.has_value()); + return current_slice_mem_ - reinterpret_cast<char*>(current_buffer_reservation_->slice().mem_); } } // namespace Statsd diff --git a/source/extensions/stat_sinks/common/statsd/statsd.h b/source/extensions/stat_sinks/common/statsd/statsd.h index b7eb8bfac627..6abc53bc0661 100644 --- a/source/extensions/stat_sinks/common/statsd/statsd.h +++ b/source/extensions/stat_sinks/common/statsd/statsd.h @@ -135,7 +135,7 @@ class TcpStatsdSink : public Stats::Sink { Event::Dispatcher& dispatcher_; Network::ClientConnectionPtr connection_; Buffer::OwnedImpl buffer_; - Buffer::RawSlice current_buffer_slice_; + absl::optional<Buffer::ReservationSingleSlice> current_buffer_reservation_; char* current_slice_mem_{}; }; diff --git a/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc b/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc index d82ae4c45453..952d24e4284f 
100644 --- a/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc +++ b/source/extensions/tracers/lightstep/lightstep_tracer_impl.cc @@ -24,12 +24,10 @@ namespace Lightstep { static void serializeGrpcMessage(const lightstep::BufferChain& buffer_chain, Buffer::Instance& body) { auto size = buffer_chain.num_bytes(); - Buffer::RawSlice iovec; - body.reserve(size, &iovec, 1); - ASSERT(iovec.len_ >= size); - iovec.len_ = size; - buffer_chain.CopyOut(static_cast(iovec.mem_), size); - body.commit(&iovec, 1); + auto reservation = body.reserveSingleSlice(size); + ASSERT(reservation.slice().len_ >= size); + buffer_chain.CopyOut(static_cast(reservation.slice().mem_), size); + reservation.commit(size); Grpc::Common::prependGrpcFrameHeader(body); } diff --git a/source/extensions/transport_sockets/tls/ssl_socket.cc b/source/extensions/transport_sockets/tls/ssl_socket.cc index 7c005f47c700..dbcb87543c7c 100644 --- a/source/extensions/transport_sockets/tls/ssl_socket.cc +++ b/source/extensions/transport_sockets/tls/ssl_socket.cc @@ -94,16 +94,13 @@ SslSocket::ReadResult SslSocket::sslReadIntoSlice(Buffer::RawSlice& slice) { ASSERT(static_cast(rc) <= remaining); mem += rc; remaining -= rc; - result.commit_slice_ = true; + result.bytes_read_ += rc; } else { result.error_ = absl::make_optional(rc); break; } } - if (result.commit_slice_) { - slice.len_ -= remaining; - } return result; } @@ -123,17 +120,11 @@ Network::IoResult SslSocket::doRead(Buffer::Instance& read_buffer) { PostIoAction action = PostIoAction::KeepOpen; uint64_t bytes_read = 0; while (keep_reading) { - // We use 2 slices here so that we can use the remainder of an existing buffer chain element - // if there is extra space. 16K read is arbitrary and can be tuned later. 
- Buffer::RawSlice slices[2]; - uint64_t slices_to_commit = 0; - uint64_t num_slices = read_buffer.reserve(16384, slices, 2); - for (uint64_t i = 0; i < num_slices; i++) { - auto result = sslReadIntoSlice(slices[i]); - if (result.commit_slice_) { - slices_to_commit++; - bytes_read += slices[i].len_; - } + uint64_t bytes_read_this_iteration = 0; + Buffer::Reservation reservation = read_buffer.reserveForRead(); + for (uint64_t i = 0; i < reservation.numSlices(); i++) { + auto result = sslReadIntoSlice(reservation.slices()[i]); + bytes_read_this_iteration += result.bytes_read_; if (result.error_.has_value()) { keep_reading = false; int err = SSL_get_error(rawSsl(), result.error_.value()); @@ -165,13 +156,13 @@ Network::IoResult SslSocket::doRead(Buffer::Instance& read_buffer) { } } - if (slices_to_commit > 0) { - read_buffer.commit(slices, slices_to_commit); - if (callbacks_->shouldDrainReadBuffer()) { - callbacks_->setTransportSocketIsReadable(); - keep_reading = false; - } + reservation.commit(bytes_read_this_iteration); + if (bytes_read_this_iteration > 0 && callbacks_->shouldDrainReadBuffer()) { + callbacks_->setTransportSocketIsReadable(); + keep_reading = false; } + + bytes_read += bytes_read_this_iteration; } ENVOY_CONN_LOG(trace, "ssl read {} bytes", callbacks_->connection(), bytes_read); diff --git a/source/extensions/transport_sockets/tls/ssl_socket.h b/source/extensions/transport_sockets/tls/ssl_socket.h index 47b13970026f..cb15d82fa935 100644 --- a/source/extensions/transport_sockets/tls/ssl_socket.h +++ b/source/extensions/transport_sockets/tls/ssl_socket.h @@ -76,7 +76,7 @@ class SslSocket : public Network::TransportSocket, private: struct ReadResult { - bool commit_slice_{}; + uint64_t bytes_read_{0}; absl::optional error_; }; ReadResult sslReadIntoSlice(Buffer::RawSlice& slice); diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index 8fa5a2098dcc..f5db5b6e75ab 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ 
b/test/common/buffer/buffer_fuzz.cc @@ -105,11 +105,6 @@ class StringBuffer : public Buffer::Instance { src.size_ = 0; } - void commit(Buffer::RawSlice* iovecs, uint64_t num_iovecs) override { - FUZZ_ASSERT(num_iovecs == 1); - size_ += iovecs[0].len_; - } - void copyOut(size_t start, uint64_t size, void* data) const override { ::memcpy(data, this->start() + start, size); } @@ -146,12 +141,34 @@ class StringBuffer : public Buffer::Instance { src.size_ -= length; } - uint64_t reserve(uint64_t length, Buffer::RawSlice* iovecs, uint64_t num_iovecs) override { - FUZZ_ASSERT(num_iovecs > 0); + Buffer::Reservation reserveForRead() override { + auto reservation = Buffer::Reservation::bufferImplUseOnlyConstruct(*this); + Buffer::RawSlice slice; + slice.mem_ = mutableEnd(); + slice.len_ = data_.size() - (start_ + size_); + reservation.bufferImplUseOnlySlices().push_back(slice); + reservation.bufferImplUseOnlySetLength(slice.len_); + + return reservation; + } + + Buffer::ReservationSingleSlice reserveSingleSlice(uint64_t length, bool separate_slice) override { + ASSERT(!separate_slice); + FUZZ_ASSERT(start_ + size_ + length <= data_.size()); + + auto reservation = Buffer::ReservationSingleSlice::bufferImplUseOnlyConstruct(*this); + Buffer::RawSlice slice; + slice.mem_ = mutableEnd(); + slice.len_ = length; + reservation.bufferImplUseOnlySlice() = slice; + + return reservation; + } + + void commit(uint64_t length, absl::Span, + Buffer::ReservationSlicesOwnerPtr) override { + size_ += length; FUZZ_ASSERT(start_ + size_ + length <= data_.size()); - iovecs[0].mem_ = mutableEnd(); - iovecs[0].len_ = length; - return 1; } ssize_t search(const void* data, uint64_t size, size_t start, size_t length) const override { @@ -257,31 +274,20 @@ uint32_t bufferAction(Context& ctxt, char insert_value, uint32_t max_alloc, Buff if (reserve_length == 0) { break; } - constexpr uint32_t reserve_slices = 16; - Buffer::RawSlice slices[reserve_slices]; - const uint32_t allocated_slices = 
target_buffer.reserve(reserve_length, slices, reserve_slices); - uint32_t allocated_length = 0; - for (uint32_t i = 0; i < allocated_slices; ++i) { - ::memset(slices[i].mem_, insert_value, slices[i].len_); - allocated_length += slices[i].len_; - } - FUZZ_ASSERT(reserve_length <= allocated_length); - const uint32_t target_length = - std::min(reserve_length, action.reserve_commit().commit_length()); - uint32_t shrink_length = allocated_length; - int32_t shrink_slice = allocated_slices - 1; - while (shrink_length > target_length) { - FUZZ_ASSERT(shrink_slice >= 0); - const uint32_t available = slices[shrink_slice].len_; - const uint32_t remainder = shrink_length - target_length; - if (available >= remainder) { - slices[shrink_slice].len_ -= remainder; - break; + if (reserve_length < 16384) { + auto reservation = target_buffer.reserveSingleSlice(reserve_length); + ::memset(reservation.slice().mem_, insert_value, reservation.slice().len_); + reservation.commit( + std::min(action.reserve_commit().commit_length(), reservation.length())); + } else { + Buffer::Reservation reservation = target_buffer.reserveForRead(); + for (uint32_t i = 0; i < reservation.numSlices(); ++i) { + ::memset(reservation.slices()[i].mem_, insert_value, reservation.slices()[i].len_); } - shrink_length -= available; - slices[shrink_slice--].len_ = 0; + const uint32_t target_length = + std::min(reservation.length(), action.reserve_commit().commit_length()); + reservation.commit(target_length); } - target_buffer.commit(slices, allocated_slices); break; } case test::common::buffer::Action::kCopyOut: { diff --git a/test/common/buffer/buffer_speed_test.cc b/test/common/buffer/buffer_speed_test.cc index 49240c69f356..653a5bea770a 100644 --- a/test/common/buffer/buffer_speed_test.cc +++ b/test/common/buffer/buffer_speed_test.cc @@ -16,6 +16,7 @@ void deleteFragment(const void*, size_t, const Buffer::BufferFragmentImpl* self) static void bufferCreateEmpty(benchmark::State& state) { uint64_t length = 0; for 
(auto _ : state) { + UNREFERENCED_PARAMETER(_); Buffer::OwnedImpl buffer; length += buffer.length(); } @@ -29,6 +30,7 @@ static void bufferCreate(benchmark::State& state) { const absl::string_view input(data); uint64_t length = 0; for (auto _ : state) { + UNREFERENCED_PARAMETER(_); Buffer::OwnedImpl buffer(input); length += buffer.length(); } @@ -42,6 +44,7 @@ static void bufferAddSmallIncrement(benchmark::State& state) { const absl::string_view input(data); Buffer::OwnedImpl buffer; for (auto _ : state) { + UNREFERENCED_PARAMETER(_); buffer.add(input); if (buffer.length() >= MaxBufferLength) { // Keep the test's memory usage from growing too large. @@ -62,6 +65,7 @@ static void bufferAddString(benchmark::State& state) { const absl::string_view input(data); Buffer::OwnedImpl buffer(input); for (auto _ : state) { + UNREFERENCED_PARAMETER(_); buffer.add(data); if (buffer.length() >= MaxBufferLength) { buffer.drain(buffer.length()); @@ -79,6 +83,7 @@ static void bufferAddBuffer(benchmark::State& state) { const Buffer::OwnedImpl to_add(data); Buffer::OwnedImpl buffer(input); for (auto _ : state) { + UNREFERENCED_PARAMETER(_); buffer.add(to_add); if (buffer.length() >= MaxBufferLength) { buffer.drain(buffer.length()); @@ -94,6 +99,7 @@ static void bufferPrependString(benchmark::State& state) { const absl::string_view input(data); Buffer::OwnedImpl buffer(input); for (auto _ : state) { + UNREFERENCED_PARAMETER(_); buffer.prepend(data); if (buffer.length() >= MaxBufferLength) { buffer.drain(buffer.length()); @@ -109,6 +115,7 @@ static void bufferPrependBuffer(benchmark::State& state) { const absl::string_view input(data); Buffer::OwnedImpl buffer(input); for (auto _ : state) { + UNREFERENCED_PARAMETER(_); // The prepend method removes the content from its source buffer. To populate a new source // buffer every time without the overhead of a copy, we use an BufferFragment that references // (and never deletes) an external string. 
@@ -144,6 +151,7 @@ static void bufferDrain(benchmark::State& state) { size_t drain_cycle = 0; for (auto _ : state) { + UNREFERENCED_PARAMETER(_); buffer.add(to_add); buffer.drain(drain_size[drain_cycle]); drain_cycle++; @@ -159,6 +167,7 @@ static void bufferDrainSmallIncrement(benchmark::State& state) { const absl::string_view input(data); Buffer::OwnedImpl buffer(input); for (auto _ : state) { + UNREFERENCED_PARAMETER(_); buffer.drain(state.range(0)); if (buffer.length() == 0) { buffer.add(input); @@ -175,6 +184,7 @@ static void bufferMove(benchmark::State& state) { Buffer::OwnedImpl buffer1(input); Buffer::OwnedImpl buffer2(input); for (auto _ : state) { + UNREFERENCED_PARAMETER(_); buffer1.move(buffer2); // now buffer1 has 2 copies of the input, and buffer2 is empty. buffer2.move(buffer1, input.size()); // now buffer1 and buffer2 are the same size. } @@ -192,6 +202,7 @@ static void bufferMovePartial(benchmark::State& state) { Buffer::OwnedImpl buffer1(input); Buffer::OwnedImpl buffer2(input); for (auto _ : state) { + UNREFERENCED_PARAMETER(_); while (buffer2.length() != 0) { buffer1.move(buffer2, 1); } @@ -207,42 +218,45 @@ BENCHMARK(bufferMovePartial)->Arg(1)->Arg(4096)->Arg(16384)->Arg(65536); static void bufferReserveCommit(benchmark::State& state) { Buffer::OwnedImpl buffer; for (auto _ : state) { - constexpr uint64_t NumSlices = 2; - Buffer::RawSlice slices[NumSlices]; - uint64_t slices_used = buffer.reserve(state.range(0), slices, NumSlices); - uint64_t bytes_to_commit = 0; - for (uint64_t i = 0; i < slices_used; i++) { - bytes_to_commit += static_cast(slices[i].len_); - } - buffer.commit(slices, slices_used); + UNREFERENCED_PARAMETER(_); + auto size = state.range(0); + Buffer::Reservation reservation = buffer.reserveForReadWithLengthForTest(size); + reservation.commit(reservation.length()); if (buffer.length() >= MaxBufferLength) { buffer.drain(buffer.length()); } } benchmark::DoNotOptimize(buffer.length()); } 
-BENCHMARK(bufferReserveCommit)->Arg(1)->Arg(4096)->Arg(16384)->Arg(65536); +BENCHMARK(bufferReserveCommit) + ->Arg(1) + ->Arg(4 * 1024) + ->Arg(16 * 1024) + ->Arg(64 * 1024) + ->Arg(128 * 1024); // Test the reserve+commit cycle, for the common case where the reserved space is // only partially used (and therefore the commit size is smaller than the reservation size). static void bufferReserveCommitPartial(benchmark::State& state) { Buffer::OwnedImpl buffer; for (auto _ : state) { - constexpr uint64_t NumSlices = 2; - Buffer::RawSlice slices[NumSlices]; - uint64_t slices_used = buffer.reserve(state.range(0), slices, NumSlices); - ASSERT(slices_used > 0); + UNREFERENCED_PARAMETER(_); + auto size = state.range(0); + Buffer::Reservation reservation = buffer.reserveForReadWithLengthForTest(size); // Commit one byte from the first slice and nothing from any subsequent slice. - uint64_t bytes_to_commit = 1; - slices[0].len_ = bytes_to_commit; - buffer.commit(slices, 1); + reservation.commit(1); if (buffer.length() >= MaxBufferLength) { buffer.drain(buffer.length()); } } benchmark::DoNotOptimize(buffer.length()); } -BENCHMARK(bufferReserveCommitPartial)->Arg(1)->Arg(4096)->Arg(16384)->Arg(65536); +BENCHMARK(bufferReserveCommitPartial) + ->Arg(1) + ->Arg(4 * 1024) + ->Arg(16 * 1024) + ->Arg(64 * 1024) + ->Arg(128 * 1024); // Test the linearization of a buffer in the best case where the data is in one slice. 
static void bufferLinearizeSimple(benchmark::State& state) { @@ -250,6 +264,7 @@ static void bufferLinearizeSimple(benchmark::State& state) { const absl::string_view input(data); Buffer::OwnedImpl buffer; for (auto _ : state) { + UNREFERENCED_PARAMETER(_); buffer.drain(buffer.length()); auto fragment = std::make_unique(input.data(), input.size(), deleteFragment); @@ -267,6 +282,7 @@ static void bufferLinearizeGeneral(benchmark::State& state) { const absl::string_view input(data); Buffer::OwnedImpl buffer; for (auto _ : state) { + UNREFERENCED_PARAMETER(_); buffer.drain(buffer.length()); do { auto fragment = @@ -291,6 +307,7 @@ static void bufferSearch(benchmark::State& state) { Buffer::OwnedImpl buffer(input); ssize_t result = 0; for (auto _ : state) { + UNREFERENCED_PARAMETER(_); result += buffer.search(Pattern.c_str(), Pattern.length(), 0, 0); } benchmark::DoNotOptimize(result); @@ -314,6 +331,7 @@ static void bufferSearchPartialMatch(benchmark::State& state) { Buffer::OwnedImpl buffer(input); ssize_t result = 0; for (auto _ : state) { + UNREFERENCED_PARAMETER(_); result += buffer.search(Pattern.c_str(), Pattern.length(), 0, 0); } benchmark::DoNotOptimize(result); @@ -333,6 +351,7 @@ static void bufferStartsWith(benchmark::State& state) { Buffer::OwnedImpl buffer(input); ssize_t result = 0; for (auto _ : state) { + UNREFERENCED_PARAMETER(_); if (!buffer.startsWith({Pattern.c_str(), Pattern.length()})) { result++; } @@ -356,6 +375,7 @@ static void bufferStartsWithMatch(benchmark::State& state) { Buffer::OwnedImpl buffer(input); ssize_t result = 0; for (auto _ : state) { + UNREFERENCED_PARAMETER(_); if (buffer.startsWith({Prefix.c_str(), Prefix.length()})) { result++; } diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index c23fca2602f9..52930e8adabc 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -24,17 +24,6 @@ class OwnedImplTest : public testing::Test { bool 
release_callback_called_ = false; protected: - static void clearReservation(Buffer::RawSlice* iovecs, uint64_t num_iovecs, OwnedImpl& buffer) { - for (uint64_t i = 0; i < num_iovecs; i++) { - iovecs[i].len_ = 0; - } - buffer.commit(iovecs, num_iovecs); - } - - static void commitReservation(Buffer::RawSlice* iovecs, uint64_t num_iovecs, OwnedImpl& buffer) { - buffer.commit(iovecs, num_iovecs); - } - static void expectSlices(std::vector> buffer_list, OwnedImpl& buffer) { const auto& buffer_slices = buffer.describeSlicesForTest(); ASSERT_EQ(buffer_list.size(), buffer_slices.size()); @@ -794,79 +783,88 @@ TEST_F(OwnedImplTest, ReserveCommit) { { Buffer::OwnedImpl buffer; - // A zero-byte reservation should fail. - static constexpr uint64_t NumIovecs = 16; - Buffer::RawSlice iovecs[NumIovecs]; - uint64_t num_reserved = buffer.reserve(0, iovecs, NumIovecs); - EXPECT_EQ(0, num_reserved); - clearReservation(iovecs, num_reserved, buffer); + // A zero-byte reservation should return an empty reservation. + { + auto reservation = buffer.reserveSingleSlice(0); + EXPECT_EQ(0, reservation.slice().len_); + EXPECT_EQ(0, reservation.length()); + } EXPECT_EQ(0, buffer.length()); // Test and commit a small reservation. This should succeed. - num_reserved = buffer.reserve(1, iovecs, NumIovecs); - EXPECT_EQ(1, num_reserved); - // The implementation might provide a bigger reservation than requested. - EXPECT_LE(1, iovecs[0].len_); - iovecs[0].len_ = 1; - commitReservation(iovecs, num_reserved, buffer); + { + auto reservation = buffer.reserveForRead(); + reservation.commit(1); + } EXPECT_EQ(1, buffer.length()); // Request a reservation that fits in the remaining space at the end of the last slice. 
- num_reserved = buffer.reserve(1, iovecs, NumIovecs); - EXPECT_EQ(1, num_reserved); - EXPECT_LE(1, iovecs[0].len_); - iovecs[0].len_ = 1; - const void* slice1 = iovecs[0].mem_; - clearReservation(iovecs, num_reserved, buffer); + const void* slice1; + { + auto reservation = buffer.reserveSingleSlice(1); + EXPECT_EQ(1, reservation.slice().len_); + slice1 = reservation.slice().mem_; + } // Request a reservation that is too large to fit in the remaining space at the end of // the last slice, and allow the buffer to use only one slice. This should result in the // creation of a new slice within the buffer. - num_reserved = buffer.reserve(4096, iovecs, 1); - EXPECT_EQ(1, num_reserved); - EXPECT_NE(slice1, iovecs[0].mem_); - clearReservation(iovecs, num_reserved, buffer); + { + auto reservation = buffer.reserveSingleSlice(16384); + EXPECT_EQ(16384, reservation.slice().len_); + EXPECT_NE(slice1, reservation.slice().mem_); + } // Request the same size reservation, but allow the buffer to use multiple slices. This // should result in the buffer creating a second slice and splitting the reservation between the // last two slices. - num_reserved = buffer.reserve(4096, iovecs, NumIovecs); - EXPECT_EQ(2, num_reserved); - EXPECT_EQ(slice1, iovecs[0].mem_); - clearReservation(iovecs, num_reserved, buffer); - - // Request a reservation that too big to fit in the existing slices. This should result - // in the creation of a third slice. 
- expectSlices({{1, 4095, 4096}}, buffer); - buffer.reserve(4096, iovecs, NumIovecs); - expectSlices({{1, 4095, 4096}, {0, 4096, 4096}}, buffer); - const void* slice2 = iovecs[1].mem_; - num_reserved = buffer.reserve(8192, iovecs, NumIovecs); - expectSlices({{1, 4095, 4096}, {0, 4096, 4096}, {0, 4096, 4096}}, buffer); - EXPECT_EQ(3, num_reserved); - EXPECT_EQ(slice1, iovecs[0].mem_); - EXPECT_EQ(slice2, iovecs[1].mem_); - clearReservation(iovecs, num_reserved, buffer); + { + expectSlices({{1, 16383, 16384}}, buffer); + auto reservation = buffer.reserveForRead(); + EXPECT_GE(reservation.numSlices(), 2); + EXPECT_GE(reservation.length(), 32767); + EXPECT_EQ(slice1, reservation.slices()[0].mem_); + EXPECT_EQ(16383, reservation.slices()[0].len_); + EXPECT_EQ(16384, reservation.slices()[1].len_); + } // Append a fragment to the buffer, and then request a small reservation. The buffer // should make a new slice to satisfy the reservation; it cannot safely use any of // the previously seen slices, because they are no longer at the end of the buffer. 
- expectSlices({{1, 4095, 4096}}, buffer); - buffer.addBufferFragment(fragment); - EXPECT_EQ(13, buffer.length()); - num_reserved = buffer.reserve(1, iovecs, NumIovecs); - expectSlices({{1, 4095, 4096}, {12, 0, 12}, {0, 4096, 4096}}, buffer); - EXPECT_EQ(1, num_reserved); - EXPECT_NE(slice1, iovecs[0].mem_); - commitReservation(iovecs, num_reserved, buffer); + { + expectSlices({{1, 16383, 16384}}, buffer); + buffer.addBufferFragment(fragment); + EXPECT_EQ(13, buffer.length()); + auto reservation = buffer.reserveForRead(); + EXPECT_NE(slice1, reservation.slices()[0].mem_); + reservation.commit(1); + expectSlices({{1, 16383, 16384}, {12, 0, 12}, {1, 16383, 16384}}, buffer); + } EXPECT_EQ(14, buffer.length()); } + + { + Buffer::OwnedImpl buffer; + uint64_t default_reservation_length; + uint64_t default_slice_length; + { + auto reservation = buffer.reserveForRead(); + default_reservation_length = reservation.length(); + default_slice_length = reservation.slices()[0].len_; + reservation.commit(default_slice_length / 2); + } + { + // Test that the Reservation size is capped at the available space in the Reservation + // inline storage, including using the end of a previous slice, no matter how big the request + // is. + auto reservation = buffer.reserveForReadWithLengthForTest(UINT64_MAX); + EXPECT_EQ(reservation.length(), default_reservation_length - (default_slice_length / 2)); + } + } } TEST_F(OwnedImplTest, ReserveCommitReuse) { Buffer::OwnedImpl buffer; - static constexpr uint64_t NumIovecs = 2; - Buffer::RawSlice iovecs[NumIovecs]; // Reserve 8KB and commit all but a few bytes of it, to ensure that // the last slice of the buffer can hold part but not all of the @@ -874,108 +872,121 @@ TEST_F(OwnedImplTest, ReserveCommitReuse) { // allocate more than the requested 8KB. In case the implementation // uses a power-of-two allocator, the subsequent reservations all // request 16KB. 
- uint64_t num_reserved = buffer.reserve(8200, iovecs, NumIovecs); - EXPECT_EQ(1, num_reserved); - iovecs[0].len_ = 8000; - buffer.commit(iovecs, 1); + { + auto reservation = buffer.reserveSingleSlice(8200); + reservation.commit(8000); + } EXPECT_EQ(8000, buffer.length()); - // Reserve 16KB. The resulting reservation should span 2 slices. + // Reserve some space. The resulting reservation should span 2 slices. // Commit part of the first slice and none of the second slice. - num_reserved = buffer.reserve(16384, iovecs, NumIovecs); - EXPECT_EQ(2, num_reserved); - const void* first_slice = iovecs[0].mem_; - iovecs[0].len_ = 1; - expectSlices({{8000, 4288, 12288}, {0, 12288, 12288}}, buffer); - buffer.commit(iovecs, 1); + const void* first_slice; + { + expectSlices({{8000, 4288, 12288}}, buffer); + auto reservation = buffer.reserveForRead(); + + // No additional slices are added to the buffer until `commit()` is called + // on the reservation. + expectSlices({{8000, 4288, 12288}}, buffer); + first_slice = reservation.slices()[0].mem_; + + EXPECT_GE(reservation.numSlices(), 2); + reservation.commit(1); + } EXPECT_EQ(8001, buffer.length()); - EXPECT_EQ(first_slice, iovecs[0].mem_); // The second slice is now released because there's nothing in the second slice. expectSlices({{8001, 4287, 12288}}, buffer); - // Reserve 16KB again. - num_reserved = buffer.reserve(16384, iovecs, NumIovecs); - expectSlices({{8001, 4287, 12288}, {0, 12288, 12288}}, buffer); - EXPECT_EQ(2, num_reserved); - EXPECT_EQ(static_cast(first_slice) + 1, - static_cast(iovecs[0].mem_)); + // Reserve again. + { + auto reservation = buffer.reserveForRead(); + EXPECT_GE(reservation.numSlices(), 2); + EXPECT_EQ(static_cast(first_slice) + 1, + static_cast(reservation.slices()[0].mem_)); + } + expectSlices({{8001, 4287, 12288}}, buffer); } -TEST_F(OwnedImplTest, ReserveReuse) { +// Test behavior when the size to commit() is larger than the reservation. 
+TEST_F(OwnedImplTest, ReserveOverCommit) { Buffer::OwnedImpl buffer; - static constexpr uint64_t NumIovecs = 2; - Buffer::RawSlice iovecs[NumIovecs]; - - // Reserve some space and leave it uncommitted. - uint64_t num_reserved = buffer.reserve(8200, iovecs, NumIovecs); - EXPECT_EQ(1, num_reserved); - const void* first_slice = iovecs[0].mem_; - - // Reserve more space and verify that it begins with the same slice from the last reservation. - num_reserved = buffer.reserve(16384, iovecs, NumIovecs); - EXPECT_EQ(2, num_reserved); - EXPECT_EQ(first_slice, iovecs[0].mem_); - const void* second_slice = iovecs[1].mem_; - - // Repeat the last reservation and verify that it yields the same slices. - num_reserved = buffer.reserve(16384, iovecs, NumIovecs); - EXPECT_EQ(2, num_reserved); - EXPECT_EQ(first_slice, iovecs[0].mem_); - EXPECT_EQ(second_slice, iovecs[1].mem_); - expectSlices({{0, 12288, 12288}, {0, 4096, 4096}}, buffer); - - // Request a larger reservation, verify that the second entry is replaced with a block with a - // larger size. - num_reserved = buffer.reserve(30000, iovecs, NumIovecs); - const void* third_slice = iovecs[1].mem_; - EXPECT_EQ(2, num_reserved); - EXPECT_EQ(first_slice, iovecs[0].mem_); - EXPECT_EQ(12288, iovecs[0].len_); - EXPECT_NE(second_slice, iovecs[1].mem_); - EXPECT_EQ(30000 - iovecs[0].len_, iovecs[1].len_); - expectSlices({{0, 12288, 12288}, {0, 4096, 4096}, {0, 20480, 20480}}, buffer); - - // Repeating a the reservation request for a smaller block returns the previous entry. - num_reserved = buffer.reserve(16384, iovecs, NumIovecs); - EXPECT_EQ(2, num_reserved); - EXPECT_EQ(first_slice, iovecs[0].mem_); - EXPECT_EQ(second_slice, iovecs[1].mem_); - expectSlices({{0, 12288, 12288}, {0, 4096, 4096}, {0, 20480, 20480}}, buffer); - - // Repeat the larger reservation notice that it doesn't match the prior reservation for 30000 - // bytes. 
- num_reserved = buffer.reserve(30000, iovecs, NumIovecs); - EXPECT_EQ(2, num_reserved); - EXPECT_EQ(first_slice, iovecs[0].mem_); - EXPECT_EQ(12288, iovecs[0].len_); - EXPECT_NE(second_slice, iovecs[1].mem_); - EXPECT_NE(third_slice, iovecs[1].mem_); - EXPECT_EQ(30000 - iovecs[0].len_, iovecs[1].len_); - expectSlices({{0, 12288, 12288}, {0, 4096, 4096}, {0, 20480, 20480}, {0, 20480, 20480}}, buffer); - - // Commit the most recent reservation and verify the representation. - buffer.commit(iovecs, num_reserved); - expectSlices({{12288, 0, 12288}, {0, 4096, 4096}, {0, 20480, 20480}, {17712, 2768, 20480}}, - buffer); + auto reservation = buffer.reserveForRead(); + const auto reservation_length = reservation.length(); + const auto excess_length = reservation_length + 1; +#ifdef NDEBUG + reservation.commit(excess_length); + + // The length should be the Reservation length, not the value passed to commit. + EXPECT_EQ(reservation_length, buffer.length()); +#else + EXPECT_DEATH( + reservation.commit(excess_length), + "length <= length_. Details: commit\\(\\) length must be <= size of the Reservation"); +#endif +} - // Do another reservation. - num_reserved = buffer.reserve(16384, iovecs, NumIovecs); - EXPECT_EQ(2, num_reserved); - expectSlices({{12288, 0, 12288}, - {0, 4096, 4096}, - {0, 20480, 20480}, - {17712, 2768, 20480}, - {0, 16384, 16384}}, - buffer); +// Test behavior when the size to commit() is larger than the reservation. +TEST_F(OwnedImplTest, ReserveSingleOverCommit) { + Buffer::OwnedImpl buffer; + auto reservation = buffer.reserveSingleSlice(10); + const auto reservation_length = reservation.length(); + const auto excess_length = reservation_length + 1; +#ifdef NDEBUG + reservation.commit(excess_length); + + // The length should be the Reservation length, not the value passed to commit. + EXPECT_EQ(reservation_length, buffer.length()); +#else + EXPECT_DEATH( + reservation.commit(excess_length), + "length <= slice_.len_. 
Details: commit\\(\\) length must be <= size of the Reservation"); +#endif +} - // And commit. - buffer.commit(iovecs, num_reserved); - expectSlices({{12288, 0, 12288}, - {0, 4096, 4096}, - {0, 20480, 20480}, - {20480, 0, 20480}, - {13616, 2768, 16384}}, - buffer); +// Test functionality of the `freelist` (a performance optimization) +TEST_F(OwnedImplTest, SliceFreeList) { + Buffer::OwnedImpl b1, b2; + std::vector slices; + { + auto r = b1.reserveForRead(); + for (auto& slice : absl::MakeSpan(r.slices(), r.numSlices())) { + slices.push_back(slice.mem_); + } + r.commit(1); + EXPECT_EQ(slices[0], b1.getRawSlices()[0].mem_); + } + + { + auto r = b2.reserveForRead(); + EXPECT_EQ(slices[1], r.slices()[0].mem_); + r.commit(1); + EXPECT_EQ(slices[1], b2.getRawSlices()[0].mem_); + } + + b1.drain(1); + EXPECT_EQ(0, b1.getRawSlices().size()); + { + auto r = b2.reserveForRead(); + // slices()[0] is the partially used slice that is already part of this buffer. + EXPECT_EQ(slices[2], r.slices()[1].mem_); + } + { + auto r = b1.reserveForRead(); + EXPECT_EQ(slices[2], r.slices()[0].mem_); + } + { + // This causes an underflow in the `freelist` on creation, and overflows it on deletion. + auto r1 = b1.reserveForRead(); + auto r2 = b2.reserveForRead(); + for (auto& r1_slice : absl::MakeSpan(r1.slices(), r1.numSlices())) { + // r1 reservation does not contain the slice that is a part of b2. + EXPECT_NE(r1_slice.mem_, b2.getRawSlices()[0].mem_); + for (auto& r2_slice : absl::MakeSpan(r2.slices(), r2.numSlices())) { + // The two reservations do not share any slices. 
+ EXPECT_NE(r1_slice.mem_, r2_slice.mem_); + } + } + } } TEST_F(OwnedImplTest, Search) { @@ -1139,13 +1150,9 @@ TEST_F(OwnedImplTest, ReserveZeroCommit) { buf.addBufferFragment(frag); buf.prepend("bbbbb"); buf.add(""); - constexpr uint32_t reserve_slices = 16; - Buffer::RawSlice slices[reserve_slices]; - const uint32_t allocated_slices = buf.reserve(1280, slices, reserve_slices); - for (uint32_t i = 0; i < allocated_slices; ++i) { - slices[i].len_ = 0; - } - buf.commit(slices, allocated_slices); + expectSlices({{5, 0, 4096}, {0, 0, 0}}, buf); + { auto reservation = buf.reserveSingleSlice(1280); } + expectSlices({{5, 0, 4096}}, buf); os_fd_t pipe_fds[2] = {0, 0}; auto& os_sys_calls = Api::OsSysCallsSingleton::get(); #ifdef WIN32 @@ -1166,7 +1173,7 @@ TEST_F(OwnedImplTest, ReserveZeroCommit) { ASSERT_EQ(os_sys_calls.close(pipe_fds[1]).rc_, 0); ASSERT_EQ(previous_length, buf.search(data.data(), rc, previous_length, 0)); EXPECT_EQ("bbbbb", buf.toString().substr(0, 5)); - expectSlices({{5, 0, 4096}, {1953, 2143, 4096}}, buf); + expectSlices({{5, 0, 4096}, {1953, 14431, 16384}}, buf); } TEST_F(OwnedImplTest, ReadReserveAndCommit) { diff --git a/test/common/buffer/watermark_buffer_test.cc b/test/common/buffer/watermark_buffer_test.cc index 1ce8149f3dac..b007625b2cd8 100644 --- a/test/common/buffer/watermark_buffer_test.cc +++ b/test/common/buffer/watermark_buffer_test.cc @@ -121,13 +121,20 @@ TEST_F(WatermarkBufferTest, PrependBuffer) { TEST_F(WatermarkBufferTest, Commit) { buffer_.add(TEN_BYTES, 10); EXPECT_EQ(0, times_high_watermark_called_); - RawSlice out; - buffer_.reserve(10, &out, 1); - memcpy(out.mem_, &TEN_BYTES[0], 10); - out.len_ = 10; - buffer_.commit(&out, 1); + { + auto reservation = buffer_.reserveForRead(); + reservation.commit(10); + } EXPECT_EQ(1, times_high_watermark_called_); EXPECT_EQ(20, buffer_.length()); + + { + auto reservation = buffer_.reserveSingleSlice(10); + reservation.commit(10); + } + // Buffer is already above high watermark, so it won't 
be called a second time. + EXPECT_EQ(1, times_high_watermark_called_); + EXPECT_EQ(30, buffer_.length()); } TEST_F(WatermarkBufferTest, Drain) { @@ -474,10 +481,8 @@ TEST_F(WatermarkBufferTest, OverflowWatermarkDisabledOnVeryHighValue) { const uint32_t segment_denominator = 128; const uint32_t big_segment_len = std::numeric_limits::max() / segment_denominator + 1; for (uint32_t i = 0; i < segment_denominator; ++i) { - Buffer::RawSlice iovecs[2]; - uint64_t num_reserved = buffer1.reserve(big_segment_len, iovecs, 2); - EXPECT_GE(num_reserved, 1); - buffer1.commit(iovecs, num_reserved); + auto reservation = buffer1.reserveSingleSlice(big_segment_len); + reservation.commit(big_segment_len); } EXPECT_GT(buffer1.length(), std::numeric_limits::max()); EXPECT_LT(buffer1.length(), high_watermark_threshold * overflow_multiplier); @@ -486,12 +491,9 @@ TEST_F(WatermarkBufferTest, OverflowWatermarkDisabledOnVeryHighValue) { // Reserve and commit additional space on the buffer beyond the expected // high_watermark_threshold * overflow_multiplier threshold. 
- // Adding high_watermark_threshold * overflow_multiplier - buffer1.length() + 1 bytes - Buffer::RawSlice iovecs[2]; - uint64_t num_reserved = buffer1.reserve( - high_watermark_threshold * overflow_multiplier - buffer1.length() + 1, iovecs, 2); - EXPECT_GE(num_reserved, 1); - buffer1.commit(iovecs, num_reserved); + const uint64_t size = high_watermark_threshold * overflow_multiplier - buffer1.length() + 1; + auto reservation = buffer1.reserveSingleSlice(size); + reservation.commit(size); EXPECT_EQ(buffer1.length(), high_watermark_threshold * overflow_multiplier + 1); EXPECT_EQ(1, high_watermark_buffer1); EXPECT_EQ(0, overflow_watermark_buffer1); diff --git a/test/common/common/utility_test.cc b/test/common/common/utility_test.cc index 48b82909bfce..348a0ae75221 100644 --- a/test/common/common/utility_test.cc +++ b/test/common/common/utility_test.cc @@ -29,6 +29,26 @@ using testing::Not; namespace Envoy { +TEST(IntUtil, roundUpToMultiple) { + // Round up to non-power-of-2 + EXPECT_EQ(3, IntUtil::roundUpToMultiple(1, 3)); + EXPECT_EQ(3, IntUtil::roundUpToMultiple(3, 3)); + EXPECT_EQ(6, IntUtil::roundUpToMultiple(4, 3)); + EXPECT_EQ(6, IntUtil::roundUpToMultiple(5, 3)); + EXPECT_EQ(6, IntUtil::roundUpToMultiple(6, 3)); + EXPECT_EQ(21, IntUtil::roundUpToMultiple(20, 3)); + EXPECT_EQ(21, IntUtil::roundUpToMultiple(21, 3)); + + // Round up to power-of-2 + EXPECT_EQ(0, IntUtil::roundUpToMultiple(0, 4)); + EXPECT_EQ(4, IntUtil::roundUpToMultiple(3, 4)); + EXPECT_EQ(4, IntUtil::roundUpToMultiple(4, 4)); + EXPECT_EQ(8, IntUtil::roundUpToMultiple(5, 4)); + EXPECT_EQ(8, IntUtil::roundUpToMultiple(8, 4)); + EXPECT_EQ(24, IntUtil::roundUpToMultiple(21, 4)); + EXPECT_EQ(24, IntUtil::roundUpToMultiple(24, 4)); +} + TEST(StringUtil, strtoull) { uint64_t out; const char* rest; diff --git a/test/common/http/http2/hpack_fuzz_test.cc b/test/common/http/http2/hpack_fuzz_test.cc index 49fa83db3dbc..cd090faaaabe 100644 --- a/test/common/http/http2/hpack_fuzz_test.cc +++ 
b/test/common/http/http2/hpack_fuzz_test.cc @@ -41,20 +41,17 @@ Buffer::OwnedImpl encodeHeaders(nghttp2_hd_deflater* deflater, // Estimate the upper bound const size_t buflen = nghttp2_hd_deflate_bound(deflater, input_nv.data(), input_nv.size()); - Buffer::RawSlice iovec; Buffer::OwnedImpl payload; - payload.reserve(buflen, &iovec, 1); - ASSERT(iovec.len_ >= buflen); + auto reservation = payload.reserveSingleSlice(buflen); // Encode using nghttp2 - uint8_t* buf = reinterpret_cast(iovec.mem_); + uint8_t* buf = reinterpret_cast(reservation.slice().mem_); ASSERT(input_nv.data() != nullptr); const ssize_t result = nghttp2_hd_deflate_hd(deflater, buf, buflen, input_nv.data(), input_nv.size()); ASSERT(result >= 0, absl::StrCat("Failed to decode with result ", result)); - iovec.len_ = result; - payload.commit(&iovec, 1); + reservation.commit(result); return payload; } diff --git a/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc b/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc index b2c81d77de1f..f9ff8b482a6d 100644 --- a/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc +++ b/test/extensions/filters/network/postgres_proxy/postgres_decoder_test.cc @@ -519,7 +519,6 @@ class FakeBuffer : public Buffer::Instance { MOCK_METHOD(void, add, (const Instance&), (override)); MOCK_METHOD(void, prepend, (absl::string_view), (override)); MOCK_METHOD(void, prepend, (Instance&), (override)); - MOCK_METHOD(void, commit, (Buffer::RawSlice*, uint64_t), (override)); MOCK_METHOD(void, copyOut, (size_t, uint64_t, void*), (const, override)); MOCK_METHOD(void, drain, (uint64_t), (override)); MOCK_METHOD(Buffer::RawSliceVector, getRawSlices, (absl::optional), (const, override)); @@ -529,7 +528,11 @@ class FakeBuffer : public Buffer::Instance { MOCK_METHOD(void*, linearize, (uint32_t), (override)); MOCK_METHOD(void, move, (Instance&), (override)); MOCK_METHOD(void, move, (Instance&, uint64_t), (override)); - 
MOCK_METHOD(uint64_t, reserve, (uint64_t, Buffer::RawSlice*, uint64_t), (override)); + MOCK_METHOD(Buffer::Reservation, reserveForRead, (), (override)); + MOCK_METHOD(Buffer::ReservationSingleSlice, reserveSingleSlice, (uint64_t, bool), (override)); + MOCK_METHOD(void, commit, + (uint64_t, absl::Span, Buffer::ReservationSlicesOwnerPtr), + (override)); MOCK_METHOD(ssize_t, search, (const void*, uint64_t, size_t, size_t), (const, override)); MOCK_METHOD(bool, startsWith, (absl::string_view), (const, override)); MOCK_METHOD(std::string, toString, (), (const, override)); diff --git a/test/extensions/transport_sockets/tls/ssl_socket_test.cc b/test/extensions/transport_sockets/tls/ssl_socket_test.cc index e080412caa3e..0de35c9b8b65 100644 --- a/test/extensions/transport_sockets/tls/ssl_socket_test.cc +++ b/test/extensions/transport_sockets/tls/ssl_socket_test.cc @@ -4887,13 +4887,9 @@ class SslReadBufferLimitTest : public SslSocketTest { for (uint32_t i = 0; i < num_writes; i++) { Buffer::OwnedImpl data(std::string(write_size, 'a')); - // Incredibly contrived way of making sure that the write buffer has an empty chain in it. 
if (reserve_write_space) { - Buffer::RawSlice iovecs[2]; - EXPECT_EQ(2UL, data.reserve(16384, iovecs, 2)); - iovecs[0].len_ = 0; - iovecs[1].len_ = 0; - data.commit(iovecs, 2); + data.appendSliceForTest(absl::string_view()); + ASSERT_EQ(0, data.describeSlicesForTest().back().data); } client_connection_->write(data, false); diff --git a/test/extensions/transport_sockets/tls/tls_throughput_benchmark.cc b/test/extensions/transport_sockets/tls/tls_throughput_benchmark.cc index 19ec111bebad..f1e90313b0ca 100644 --- a/test/extensions/transport_sockets/tls/tls_throughput_benchmark.cc +++ b/test/extensions/transport_sockets/tls/tls_throughput_benchmark.cc @@ -33,36 +33,30 @@ static void handleSslError(SSL* ssl, int err, bool is_server) { } static void appendSlice(Buffer::Instance& buffer, uint32_t size) { - Buffer::RawSlice slice; std::string data(size, 'a'); RELEASE_ASSERT(data.size() <= 16384, "short_slice_size can't be larger than full slice"); // A 16kb request currently has inline metadata, which makes it 16384+8. This gets rounded up // to the next page size. Request enough that there is no extra space, to ensure that this results // in a new slice. - buffer.reserve(16384, &slice, 1); + auto reservation = buffer.reserveSingleSlice(16384); - memcpy(slice.mem_, data.data(), data.size()); - slice.len_ = data.size(); - buffer.commit(&slice, 1); + memcpy(reservation.slice().mem_, data.data(), data.size()); + reservation.commit(data.size()); } // If move_slices is true, add full-sized slices using move similar to how HTTP codecs move data // from the filter chain buffer to the output buffer. Else, append full-sized slices directly to the // output buffer like socket read would do. -static void addFullSlices(Buffer::Instance& output_buffer, int num_slices, bool move_slices) { +static void addFullSlices(Buffer::Instance& output_buffer, unsigned num_slices, bool move_slices) { Buffer::OwnedImpl tmp_buf; Buffer::Instance* buffer = move_slices ? 
&tmp_buf : &output_buffer; - for (int i = 0; i < num_slices; i++) { - auto start_size = buffer->length(); - Buffer::RawSlice slices[2]; - auto num_slices = buffer->reserve(16384, slices, 2); - for (unsigned i = 0; i < num_slices; i++) { - memset(slices[i].mem_, 'a', slices[i].len_); - } - buffer->commit(slices, num_slices); - RELEASE_ASSERT(buffer->length() - start_size == 16384, "correct reserve/commit"); + const auto initial_slices = buffer->getRawSlices().size(); + while ((buffer->getRawSlices().size() - initial_slices) < num_slices) { + Buffer::Reservation reservation = buffer->reserveForRead(); + memset(reservation.slices()[0].mem_, 'a', reservation.slices()[0].len_); + reservation.commit(reservation.slices()[0].len_); } if (move_slices) { diff --git a/test/integration/socket_interface_integration_test.cc b/test/integration/socket_interface_integration_test.cc index bcb54fb2bfd0..235867e57739 100644 --- a/test/integration/socket_interface_integration_test.cc +++ b/test/integration/socket_interface_integration_test.cc @@ -136,13 +136,13 @@ TEST_P(SocketInterfaceIntegrationTest, UdpSendToInternalAddressWithSocketInterfa local_valid_address, nullptr); Buffer::OwnedImpl buffer; - Buffer::RawSlice iovec; - buffer.reserve(100, &iovec, 1); + auto reservation = buffer.reserveSingleSlice(100); + auto slice = reservation.slice(); auto result = - socket->ioHandle().sendmsg(&iovec, 1, 0, local_valid_address->ip(), *peer_internal_address); + socket->ioHandle().sendmsg(&slice, 1, 0, local_valid_address->ip(), *peer_internal_address); ASSERT_FALSE(result.ok()); ASSERT_EQ(result.err_->getErrorCode(), Api::IoError::IoErrorCode::NoSupport); } } // namespace -} // namespace Envoy \ No newline at end of file +} // namespace Envoy diff --git a/test/mocks/network/io_handle.h b/test/mocks/network/io_handle.h index 132f314048d1..c154269585cc 100644 --- a/test/mocks/network/io_handle.h +++ b/test/mocks/network/io_handle.h @@ -24,7 +24,8 @@ class MockIoHandle : public IoHandle { 
MOCK_METHOD(bool, isOpen, (), (const)); MOCK_METHOD(Api::IoCallUint64Result, readv, (uint64_t max_length, Buffer::RawSlice* slices, uint64_t num_slice)); - MOCK_METHOD(Api::IoCallUint64Result, read, (Buffer::Instance & buffer, uint64_t max_length)); + MOCK_METHOD(Api::IoCallUint64Result, read, + (Buffer::Instance & buffer, absl::optional max_length)); MOCK_METHOD(Api::IoCallUint64Result, writev, (const Buffer::RawSlice* slices, uint64_t num_slice)); MOCK_METHOD(Api::IoCallUint64Result, write, (Buffer::Instance & buffer));