Split UpdateAllocatedBytes into two functions
felipecrv committed Mar 25, 2024
1 parent 0382341 commit f9b2494
Showing 3 changed files with 39 additions and 36 deletions.

cpp/src/arrow/memory_pool.cc: 6 additions, 6 deletions

@@ -472,7 +472,7 @@ class BaseMemoryPoolImpl : public MemoryPool {
     }
 #endif
 
-    stats_.UpdateAllocatedBytes(size);
+    stats_.DidAllocateBytes(size);
     return Status::OK();
   }
 
@@ -494,7 +494,7 @@
     }
 #endif
 
-    stats_.UpdateAllocatedBytes(new_size - old_size);
+    stats_.DidReallocateBytes(old_size, new_size);
     return Status::OK();
   }
 
@@ -509,7 +509,7 @@
 #endif
     Allocator::DeallocateAligned(buffer, size, alignment);
 
-    stats_.UpdateAllocatedBytes</*IsFree=*/true>(-size);
+    stats_.DidFreeBytes(size);
   }
 
   void ReleaseUnused() override { Allocator::ReleaseUnused(); }
 
@@ -761,20 +761,20 @@ class ProxyMemoryPool::ProxyMemoryPoolImpl {
 
   Status Allocate(int64_t size, int64_t alignment, uint8_t** out) {
     RETURN_NOT_OK(pool_->Allocate(size, alignment, out));
-    stats_.UpdateAllocatedBytes(size);
+    stats_.DidAllocateBytes(size);
     return Status::OK();
   }
 
   Status Reallocate(int64_t old_size, int64_t new_size, int64_t alignment,
                     uint8_t** ptr) {
     RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, alignment, ptr));
-    stats_.UpdateAllocatedBytes(new_size - old_size);
+    stats_.DidReallocateBytes(old_size, new_size);
     return Status::OK();
   }
 
   void Free(uint8_t* buffer, int64_t size, int64_t alignment) {
     pool_->Free(buffer, size, alignment);
-    stats_.UpdateAllocatedBytes</*IsFree=*/true>(-size);
+    stats_.DidFreeBytes(size);
   }
 
   int64_t bytes_allocated() const { return stats_.bytes_allocated(); }
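Taken together, the memory_pool.cc changes are a mechanical migration from the sign-encoded API to the named one. The contrast below is a summary sketch, not code from the commit; stats stands in for the stats_ member of either pool implementation:

// Before: one entry point, with the kind of event encoded in the sign of the
// argument and a template flag.
stats.UpdateAllocatedBytes(size);                    // allocation
stats.UpdateAllocatedBytes(new_size - old_size);     // reallocation
stats.UpdateAllocatedBytes</*IsFree=*/true>(-size);  // free

// After: one named function per event; no sign conventions at call sites.
stats.DidAllocateBytes(size);
stats.DidReallocateBytes(old_size, new_size);
stats.DidFreeBytes(size);
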
cpp/src/arrow/memory_pool.h: 30 additions, 27 deletions

@@ -64,37 +64,40 @@ class alignas(64) MemoryPoolStats {
 
   int64_t num_allocations() const { return num_allocs_.load(std::memory_order_acquire); }
 
-  template <bool IsFree = false>
-  inline void UpdateAllocatedBytes(int64_t diff) {
-    // max_memory_ is monotonically increasing, so we can use a relaxed load
-    // before the read-modify-write. Issue the load before everything else.
+  inline void DidAllocateBytes(int64_t size) {
+    // Issue the load before everything else. max_memory_ is monotonically increasing,
+    // so we can use a relaxed load before the read-modify-write.
     auto max_memory = max_memory_.load(std::memory_order_relaxed);
-    auto old_bytes_allocated =
-        bytes_allocated_.fetch_add(diff, std::memory_order_acq_rel);
-    if constexpr (IsFree) {
-      assert(diff <= 0);
+    const auto old_bytes_allocated =
+        bytes_allocated_.fetch_add(size, std::memory_order_acq_rel);
+    // Issue store operations on values that we don't depend on to proceed
+    // with execution. When done, max_memory and old_bytes_allocated have
+    // a higher chance of being available on CPU registers. This also has the
+    // nice side-effect of putting 3 atomic stores close to each other in the
+    // instruction stream.
+    total_allocated_bytes_.fetch_add(size, std::memory_order_acq_rel);
+    num_allocs_.fetch_add(1, std::memory_order_acq_rel);
+
+    // If other threads are updating max_memory_ concurrently we leave the loop without
+    // updating knowing that it already reached a value even higher than ours.
+    const auto allocated = old_bytes_allocated + size;
+    while (max_memory < allocated && !max_memory_.compare_exchange_weak(
+                                         /*expected=*/max_memory, /*desired=*/allocated,
+                                         std::memory_order_acq_rel)) {
+    }
+  }
+
+  inline void DidReallocateBytes(int64_t old_size, int64_t new_size) {
+    if (new_size > old_size) {
+      DidAllocateBytes(new_size - old_size);
     } else {
-      // Issue store operations on values that we don't depend on to proceed
-      // with execution. When done, max_memory and old_bytes_allocated have
-      // a higher chance of being available on CPU registers. This also has the
-      // nice side-effect of putting 3 atomic stores close to each other in the
-      // instruction stream.
-      total_allocated_bytes_.fetch_add(diff, std::memory_order_acq_rel);
-      num_allocs_.fetch_add(1, std::memory_order_acq_rel);
-
-      // If other threads are updating max_memory_ concurrently we leave the loop without
-      // updating knowing that it already reached a value even higher than ours.
-      const auto allocated = old_bytes_allocated + diff;
-      while (max_memory < allocated && !max_memory_.compare_exchange_weak(
-                                           /*expected=*/max_memory, /*desired=*/allocated,
-                                           std::memory_order_acq_rel)) {
-      }
+      // Reallocations might expand/contract the allocation in place or create a new
+      // allocation, copy, and free the previous allocation. We can't really know,
+      // so we just represent reallocations the same way we represent allocations.
+      DidFreeBytes(old_size - new_size);
     }
   }
+
+  inline void DidFreeBytes(int64_t size) {
+    bytes_allocated_.fetch_sub(size, std::memory_order_acq_rel);
+  }
 };
 
 }  // namespace internal
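The header diff above is the heart of the commit. The following is a minimal compilable sketch of the same accounting scheme; StatsSketch and its main() are illustrative stand-ins rather than Arrow code, though the atomics and memory orders mirror the diff:

#include <atomic>
#include <cstdint>
#include <iostream>

class StatsSketch {
 public:
  void DidAllocateBytes(int64_t size) {
    // A relaxed load is enough: max_memory_ only grows, so a stale value
    // merely makes the CAS loop below start from a lower bound.
    auto max_memory = max_memory_.load(std::memory_order_relaxed);
    const auto old_bytes_allocated =
        bytes_allocated_.fetch_add(size, std::memory_order_acq_rel);
    total_allocated_bytes_.fetch_add(size, std::memory_order_acq_rel);
    num_allocs_.fetch_add(1, std::memory_order_acq_rel);
    // Monotonic-max update: on failure, compare_exchange_weak reloads the
    // competing value into max_memory; once that value covers this
    // allocation, the loop exits without storing anything.
    const auto allocated = old_bytes_allocated + size;
    while (max_memory < allocated &&
           !max_memory_.compare_exchange_weak(max_memory, allocated,
                                              std::memory_order_acq_rel)) {
    }
  }

  void DidReallocateBytes(int64_t old_size, int64_t new_size) {
    // Growth is accounted like a fresh allocation, shrinkage like a free.
    if (new_size > old_size) {
      DidAllocateBytes(new_size - old_size);
    } else {
      DidFreeBytes(old_size - new_size);
    }
  }

  void DidFreeBytes(int64_t size) {
    bytes_allocated_.fetch_sub(size, std::memory_order_acq_rel);
  }

  int64_t bytes_allocated() const {
    return bytes_allocated_.load(std::memory_order_acquire);
  }
  int64_t max_memory() const {
    return max_memory_.load(std::memory_order_acquire);
  }

 private:
  std::atomic<int64_t> bytes_allocated_{0};
  std::atomic<int64_t> max_memory_{0};
  std::atomic<int64_t> total_allocated_bytes_{0};
  std::atomic<int64_t> num_allocs_{0};
};

int main() {
  StatsSketch stats;
  stats.DidAllocateBytes(256);
  stats.DidReallocateBytes(/*old_size=*/256, /*new_size=*/64);  // shrink by 192
  stats.DidFreeBytes(64);
  std::cout << stats.bytes_allocated() << " " << stats.max_memory() << "\n";
  // Prints "0 256": current usage returns to zero, the peak stays at 256.
  return 0;
}

One observable consequence of the split, visible in the diff itself: a shrinking reallocation now goes through DidFreeBytes and only decrements bytes_allocated_, whereas the old signed-diff path also bumped num_allocs_ and applied the negative diff to total_allocated_bytes_.
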
cpp/src/arrow/stl_allocator.h: 3 additions, 3 deletions

@@ -110,7 +110,7 @@ class STLMemoryPool : public MemoryPool {
     } catch (std::bad_alloc& e) {
       return Status::OutOfMemory(e.what());
     }
-    stats_.UpdateAllocatedBytes(size);
+    stats_.DidAllocateBytes(size);
     return Status::OK();
   }
 
@@ -124,13 +124,13 @@
     }
     memcpy(*ptr, old_ptr, std::min(old_size, new_size));
     alloc_.deallocate(old_ptr, old_size);
-    stats_.UpdateAllocatedBytes(new_size - old_size);
+    stats_.DidReallocateBytes(old_size, new_size);
     return Status::OK();
   }
 
   void Free(uint8_t* buffer, int64_t size, int64_t /*alignment*/) override {
     alloc_.deallocate(buffer, size);
-    stats_.UpdateAllocatedBytes</*IsFree=*/true>(-size);
+    stats_.DidFreeBytes(size);
   }
 
   int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }
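For context on why DidReallocateBytes cannot know whether memory moved: STLMemoryPool::Reallocate, shown in the hunk above, always allocates a new block, copies, and frees the old one, while other allocators may resize in place. Below is a standalone sketch of that allocate-copy-free pattern, using std::allocator<uint8_t> directly and a plain bool in place of arrow::Status; the free function here is illustrative, not Arrow's API:

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <new>

bool ReallocateSketch(std::allocator<uint8_t>& alloc, int64_t old_size,
                      int64_t new_size, uint8_t** ptr) {
  uint8_t* old_ptr = *ptr;
  try {
    *ptr = alloc.allocate(static_cast<size_t>(new_size));
  } catch (const std::bad_alloc&) {
    return false;  // the old allocation stays valid on failure
  }
  // Keep the overlapping prefix, then release the old block. Even though this
  // always performs a fresh allocation plus a free, the stats record it as a
  // single net change via DidReallocateBytes(old_size, new_size).
  std::memcpy(*ptr, old_ptr, static_cast<size_t>(std::min(old_size, new_size)));
  alloc.deallocate(old_ptr, static_cast<size_t>(old_size));
  return true;
}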
