From f9b2494f00007ee2b57c452d1f5c2fcfa7b091b3 Mon Sep 17 00:00:00 2001
From: Felipe Oliveira Carvalho
Date: Fri, 22 Mar 2024 20:54:16 -0300
Subject: [PATCH] Split UpdateAllocatedBytes into two functions

---
 cpp/src/arrow/memory_pool.cc  | 12 ++++----
 cpp/src/arrow/memory_pool.h   | 57 ++++++++++++++++++-----------------
 cpp/src/arrow/stl_allocator.h |  6 ++--
 3 files changed, 39 insertions(+), 36 deletions(-)

diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index bfae006dc1b10..2f8ce3a6fa8c7 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -472,7 +472,7 @@ class BaseMemoryPoolImpl : public MemoryPool {
     }
 #endif
 
-    stats_.UpdateAllocatedBytes(size);
+    stats_.DidAllocateBytes(size);
     return Status::OK();
   }
 
@@ -494,7 +494,7 @@ class BaseMemoryPoolImpl : public MemoryPool {
     }
 #endif
 
-    stats_.UpdateAllocatedBytes(new_size - old_size);
+    stats_.DidReallocateBytes(old_size, new_size);
     return Status::OK();
   }
 
@@ -509,7 +509,7 @@ class BaseMemoryPoolImpl : public MemoryPool {
 #endif
 
     Allocator::DeallocateAligned(buffer, size, alignment);
-    stats_.UpdateAllocatedBytes(-size);
+    stats_.DidFreeBytes(size);
   }
 
   void ReleaseUnused() override { Allocator::ReleaseUnused(); }
@@ -761,20 +761,20 @@ class ProxyMemoryPool::ProxyMemoryPoolImpl {
 
   Status Allocate(int64_t size, int64_t alignment, uint8_t** out) {
     RETURN_NOT_OK(pool_->Allocate(size, alignment, out));
-    stats_.UpdateAllocatedBytes(size);
+    stats_.DidAllocateBytes(size);
     return Status::OK();
   }
 
   Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
                     uint8_t** ptr) {
     RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, alignment, ptr));
-    stats_.UpdateAllocatedBytes(new_size - old_size);
+    stats_.DidReallocateBytes(old_size, new_size);
     return Status::OK();
   }
 
   void Free(uint8_t* buffer, int64_t size, int64_t alignment) {
     pool_->Free(buffer, size, alignment);
-    stats_.UpdateAllocatedBytes(-size);
+    stats_.DidFreeBytes(size);
   }
 
   int64_t bytes_allocated() const { return stats_.bytes_allocated(); }
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 3df63e6350c19..96db42de1e048 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -64,37 +64,40 @@ class alignas(64) MemoryPoolStats {
 
   int64_t num_allocations() const { return num_allocs_.load(std::memory_order_acquire); }
 
-  template
-  inline void UpdateAllocatedBytes(int64_t diff) {
-    // max_memory_ is monotonically increasing, so we can use a relaxed load
-    // before the read-modify-write. Issue the load before everything else.
+  inline void DidAllocateBytes(int64_t size) {
+    // Issue the load before everything else. max_memory_ is monotonically increasing,
+    // so we can use a relaxed load before the read-modify-write.
     auto max_memory = max_memory_.load(std::memory_order_relaxed);
-    auto old_bytes_allocated =
-        bytes_allocated_.fetch_add(diff, std::memory_order_acq_rel);
-    if constexpr (IsFree) {
-      assert(diff <= 0);
+    const auto old_bytes_allocated =
+        bytes_allocated_.fetch_add(size, std::memory_order_acq_rel);
+    // Issue store operations on values that we don't depend on to proceed
+    // with execution. When done, max_memory and old_bytes_allocated have
+    // a higher chance of being available on CPU registers. This also has the
+    // nice side-effect of putting 3 atomic stores close to each other in the
+    // instruction stream.
+    total_allocated_bytes_.fetch_add(size, std::memory_order_acq_rel);
+    num_allocs_.fetch_add(1, std::memory_order_acq_rel);
+
+    // If other threads are updating max_memory_ concurrently we leave the loop without
+    // updating knowing that it already reached a value even higher than ours.
+    const auto allocated = old_bytes_allocated + size;
+    while (max_memory < allocated && !max_memory_.compare_exchange_weak(
+                                         /*expected=*/max_memory, /*desired=*/allocated,
+                                         std::memory_order_acq_rel)) {
+    }
+  }
+
+  inline void DidReallocateBytes(int64_t old_size, int64_t new_size) {
+    if (new_size > old_size) {
+      DidAllocateBytes(new_size - old_size);
     } else {
-      // Issue store operations on values that we don't depend on to proceed
-      // with execution. When done, max_memory and old_bytes_allocated have
-      // a higher chance of being available on CPU registers. This also has the
-      // nice side-effect of putting 3 atomic stores close to each other in the
-      // instruction stream.
-      //
-      // Reallocations might expand/contract the allocation in place or create a new
-      // allocation, copy, and free the previous allocation. We can't really know,
-      // so we just represent reallocations the same way we represent allocations.
-      total_allocated_bytes_.fetch_add(diff, std::memory_order_acq_rel);
-      num_allocs_.fetch_add(1, std::memory_order_acq_rel);
-
-      // If other threads are updating max_memory_ concurrently we leave the loop without
-      // updating knowing that it already reached a value even higher than ours.
-      const auto allocated = old_bytes_allocated + diff;
-      while (max_memory < allocated && !max_memory_.compare_exchange_weak(
-                                           /*expected=*/max_memory, /*desired=*/allocated,
-                                           std::memory_order_acq_rel)) {
-      }
+      DidFreeBytes(old_size - new_size);
     }
   }
+
+  inline void DidFreeBytes(int64_t size) {
+    bytes_allocated_.fetch_sub(size, std::memory_order_acq_rel);
+  }
 };
 
 }  // namespace internal
diff --git a/cpp/src/arrow/stl_allocator.h b/cpp/src/arrow/stl_allocator.h
index 9f51bd03411ca..82e6aaa8772b9 100644
--- a/cpp/src/arrow/stl_allocator.h
+++ b/cpp/src/arrow/stl_allocator.h
@@ -110,7 +110,7 @@ class STLMemoryPool : public MemoryPool {
     } catch (std::bad_alloc& e) {
       return Status::OutOfMemory(e.what());
     }
-    stats_.UpdateAllocatedBytes(size);
+    stats_.DidAllocateBytes(size);
     return Status::OK();
   }
 
@@ -124,13 +124,13 @@ class STLMemoryPool : public MemoryPool {
     }
     memcpy(*ptr, old_ptr, std::min(old_size, new_size));
     alloc_.deallocate(old_ptr, old_size);
-    stats_.UpdateAllocatedBytes(new_size - old_size);
+    stats_.DidReallocateBytes(old_size, new_size);
     return Status::OK();
   }
 
   void Free(uint8_t* buffer, int64_t size, int64_t /*alignment*/) override {
     alloc_.deallocate(buffer, size);
-    stats_.UpdateAllocatedBytes(-size);
+    stats_.DidFreeBytes(size);
   }
 
   int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }