Skip to content

Commit

Permalink
Merge pull request #138 from accelerated/mmap-alloc
Browse files Browse the repository at this point in the history
Alloc with MMAP
  • Loading branch information
Alex Damian authored Nov 23, 2020
2 parents b80f812 + eae2870 commit 92a4a06
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 50 deletions.
175 changes: 134 additions & 41 deletions quantum/impl/quantum_coroutine_pool_allocator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@
//NOTE: DO NOT INCLUDE DIRECTLY
#include <type_traits>
#include <algorithm>
#include <assert.h>
#include <cassert>
#include <algorithm>
#include <cstring>

#if defined(_WIN32) && !defined(__CYGWIN__)
//TODO: Windows headers for memory mapping and page protection
#else
#include <sys/mman.h>
#endif

#if defined(BOOST_USE_VALGRIND)
#include <valgrind/valgrind.h>
Expand All @@ -31,34 +39,48 @@ namespace quantum {
template <typename STACK_TRAITS>
CoroutinePoolAllocator<STACK_TRAITS>::CoroutinePoolAllocator(index_type size) :
_size(size),
_blocks(new Header*[size]),
_freeBlocks(new index_type[size]),
_freeBlockIndex(size-1),
_blocks(nullptr),
_freeBlocks(nullptr),
_freeBlockIndex(size-1), //point to the last element
_numHeapAllocatedBlocks(0),
_stackSize(std::min(std::max(traits::default_size(), traits::minimum_size()), traits::maximum_size()))
_stackSize(std::min(std::max(traits::default_size(),
traits::minimum_size()),
traits::maximum_size()))
{
if (!_blocks || !_freeBlocks) {
if (_size == 0)
{
throw std::runtime_error("Invalid coroutine allocator pool size");
}
_freeBlocks = new index_type[size];
if (!_freeBlocks)
{
throw std::bad_alloc();
}
if (_size == 0) {
throw std::runtime_error("Invalid coroutine allocator pool size");
_blocks = new uint8_t*[size];
if (!_blocks)
{
delete[] _freeBlocks;
throw std::bad_alloc();
}
//pre-allocate all the coroutine stack blocks
for (index_type i = 0; i < size; ++i) {
_blocks[i] = reinterpret_cast<Header*>(new char[_stackSize]);
if (!_blocks[i]) {
//pre-allocate all the coroutine stack blocks and protect the last stack page to
//track coroutine stack overflows.
for (size_t i = 0; i < size; ++i)
{
_blocks[i] = allocateCoroutine(ProtectMemPage::On);
if (!_blocks[i])
{
deallocateBlocks(i);
throw std::bad_alloc();
}
_blocks[i]->_pos = i; //mark position
//set the block position
header(_blocks[i])->_pos = i;
}
//initialize the free block list
for (index_type i = 0; i < size; ++i) {
_freeBlocks[i] = i;
}
std::iota(_freeBlocks, _freeBlocks + size, 0);
}

template <typename STACK_TRAITS>
CoroutinePoolAllocator<STACK_TRAITS>::CoroutinePoolAllocator(CoroutinePoolAllocator<STACK_TRAITS>&& other)
CoroutinePoolAllocator<STACK_TRAITS>::CoroutinePoolAllocator(CoroutinePoolAllocator<STACK_TRAITS>&& other) noexcept
{
*this = other;
}
Expand All @@ -82,63 +104,120 @@ CoroutinePoolAllocator<STACK_TRAITS>& CoroutinePoolAllocator<STACK_TRAITS>::oper
template <typename STACK_TRAITS>
CoroutinePoolAllocator<STACK_TRAITS>::~CoroutinePoolAllocator()
{
for (size_t i = 0; i < _size; ++i) {
delete[] (char*)_blocks[i];
deallocateBlocks(_size);
}

template <typename STACK_TRAITS>
void CoroutinePoolAllocator<STACK_TRAITS>::deallocateBlocks(size_t pos)
{
for (size_t j = 0; j < pos; ++j)
{
deallocateCoroutine(_blocks[j]);
}
delete[] _blocks;
delete[] _freeBlocks;
}

template <typename STACK_TRAITS>
uint8_t* CoroutinePoolAllocator<STACK_TRAITS>::allocateCoroutine(ProtectMemPage protect) const
{
#if defined(_WIN32) && !defined(__CYGWIN__)
return new uint8_t[_stackSize];
#else
uint8_t* block = (uint8_t*)mmap(nullptr,
_stackSize,
PROT_WRITE | PROT_READ | PROT_EXEC,
MAP_ANONYMOUS | MAP_PRIVATE,
-1, //invalid fd
0); //no offset
if (block == MAP_FAILED)
{
return nullptr;
}
//Add protection to the lowest page
if ((protect == ProtectMemPage::On) &&
mprotect(block, traits::page_size(), PROT_NONE) != 0)
{
munmap(block, _stackSize); //free region
return nullptr;
}
return block;
#endif
}

template <typename STACK_TRAITS>
int CoroutinePoolAllocator<STACK_TRAITS>::deallocateCoroutine(uint8_t* block) const
{
assert(block);
#if defined(_WIN32) && !defined(__CYGWIN__)
delete[] block;
return 0;
#else
return munmap(block, _stackSize);
#endif
}

template <typename STACK_TRAITS>
boost::context::stack_context CoroutinePoolAllocator<STACK_TRAITS>::allocate() {
boost::context::stack_context ctx;
Header* block = nullptr;
uint8_t* block = nullptr;
{
SpinLock::Guard lock(_spinlock);
if (!isEmpty())
{
block = _blocks[_freeBlocks[_freeBlockIndex--]];
}
}
if (!block) {
// Use heap allocation
block = (Header*)new char[_stackSize];
if (!block) {
if (!block)
{
//Do not protect last memory page for performance reasons
block = allocateCoroutine(ProtectMemPage::Off);
if (!block)
{
throw std::bad_alloc();
}
block->_pos = -1; //mark position as non-managed
header(block)->_pos = -1; //mark position as non-managed
SpinLock::Guard lock(_spinlock);
++_numHeapAllocatedBlocks;
}
char* block_start = reinterpret_cast<char*>(block) + sizeof(Header);
//populate stack context
boost::context::stack_context ctx;
ctx.size = _stackSize - sizeof(Header);
ctx.sp = block_start + ctx.size;
#if defined(BOOST_USE_VALGRIND)
ctx.valgrind_stack_id = VALGRIND_STACK_REGISTER(ctx.sp, block_start);
#endif
ctx.sp = block + ctx.size;
#if defined(BOOST_USE_VALGRIND)
ctx.valgrind_stack_id = VALGRIND_STACK_REGISTER(ctx.sp, block);
#endif
return ctx;
}

template <typename STACK_TRAITS>
void CoroutinePoolAllocator<STACK_TRAITS>::deallocate(const boost::context::stack_context& ctx) {
if (!ctx.sp) {
if (!ctx.sp)
{
return;
}
#if defined(BOOST_USE_VALGRIND)
VALGRIND_STACK_DEREGISTER(ctx.valgrind_stack_id);
#endif
int bi = blockIndex(ctx);
assert(bi >= -1 && bi < _size); //guard against coroutine stack overflow or corruption
if (isManaged(ctx)) {
if (isManaged(ctx))
{
//find index of the block
SpinLock::Guard lock(_spinlock);
_freeBlocks[++_freeBlockIndex] = bi;
}
else {
delete[] (char*)getHeader(ctx);
SpinLock::Guard lock(_spinlock);
--_numHeapAllocatedBlocks;
assert(_numHeapAllocatedBlocks >= 0);
else
{
//Unlink coroutine stack
{
SpinLock::Guard lock(_spinlock);
--_numHeapAllocatedBlocks;
assert(_numHeapAllocatedBlocks >= 0);
}
if (deallocateCoroutine(stackEnd(ctx)) != 0)
{
throw std::runtime_error("Bad de-allocation");
}
}
}

Expand Down Expand Up @@ -168,9 +247,23 @@ bool CoroutinePoolAllocator<STACK_TRAITS>::isEmpty() const

template <typename STACK_TRAITS>
typename CoroutinePoolAllocator<STACK_TRAITS>::Header*
CoroutinePoolAllocator<STACK_TRAITS>::getHeader(const boost::context::stack_context& ctx) const
CoroutinePoolAllocator<STACK_TRAITS>::header(const boost::context::stack_context& ctx) const
{
return reinterpret_cast<Header*>(ctx.sp);
}

template <typename STACK_TRAITS>
typename CoroutinePoolAllocator<STACK_TRAITS>::Header*
CoroutinePoolAllocator<STACK_TRAITS>::header(uint8_t* block) const
{
return reinterpret_cast<Header*>(block + _stackSize - sizeof(Header));
}

template <typename STACK_TRAITS>
uint8_t*
CoroutinePoolAllocator<STACK_TRAITS>::stackEnd(const boost::context::stack_context& ctx) const
{
return reinterpret_cast<Header*>(reinterpret_cast<char*>(ctx.sp) - ctx.size - sizeof(Header));
return static_cast<uint8_t*>(ctx.sp) - ctx.size;
}

template <typename STACK_TRAITS>
Expand All @@ -182,7 +275,7 @@ bool CoroutinePoolAllocator<STACK_TRAITS>::isManaged(const boost::context::stack
template <typename STACK_TRAITS>
int CoroutinePoolAllocator<STACK_TRAITS>::blockIndex(const boost::context::stack_context& ctx) const
{
return getHeader(ctx)->_pos;
return header(ctx)->_pos;
}

}}
2 changes: 1 addition & 1 deletion quantum/impl/quantum_task_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ TaskQueue::~TaskQueue()
inline
void TaskQueue::pinToCore(int coreId)
{
#ifdef _WIN32
#if defined(_WIN32) && !defined(__CYGWIN__)
SetThreadAffinityMask(_thread->native_handle(), 1 << coreId);
#else
int cpuSetSize = sizeof(cpu_set_t);
Expand Down
18 changes: 12 additions & 6 deletions quantum/quantum_coroutine_pool_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ struct CoroutinePoolAllocator
typedef STACK_TRAITS traits;

//------------------------------- Methods ----------------------------------
CoroutinePoolAllocator(index_type size);
explicit CoroutinePoolAllocator(index_type size);
CoroutinePoolAllocator(const this_type&) = delete;
CoroutinePoolAllocator(this_type&&);
CoroutinePoolAllocator(this_type&&) noexcept;
CoroutinePoolAllocator& operator=(const this_type&) = delete;
CoroutinePoolAllocator& operator=(this_type&&);
virtual ~CoroutinePoolAllocator();
Expand All @@ -65,14 +65,19 @@ struct CoroutinePoolAllocator
struct Header {
int _pos;
};

enum class ProtectMemPage { On, Off };
int blockIndex(const boost::context::stack_context& ctx) const;
bool isManaged(const boost::context::stack_context& ctx) const;
Header* getHeader(const boost::context::stack_context& ctx) const;
Header* header(const boost::context::stack_context& ctx) const;
Header* header(uint8_t* block) const;
uint8_t* stackEnd(const boost::context::stack_context& ctx) const;
void deallocateBlocks(size_t pos);
uint8_t* allocateCoroutine(ProtectMemPage protect) const;
int deallocateCoroutine(uint8_t*) const;

//------------------------------- Members ----------------------------------
index_type _size;
Header** _blocks;
uint8_t** _blocks;
index_type* _freeBlocks;
ssize_t _freeBlockIndex;
size_t _numHeapAllocatedBlocks;
Expand All @@ -85,7 +90,8 @@ struct CoroutinePoolAllocatorProxy
{
typedef std::false_type default_constructor;

CoroutinePoolAllocatorProxy(uint16_t size) : _alloc(new CoroutinePoolAllocator<STACK_TRAITS>(size))
explicit CoroutinePoolAllocatorProxy(uint16_t size) :
_alloc(new CoroutinePoolAllocator<STACK_TRAITS>(size))
{
if (!_alloc) {
throw std::bad_alloc();
Expand Down
2 changes: 1 addition & 1 deletion quantum/quantum_dispatcher_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <thread>
#include <functional>
#include <algorithm>
#ifdef _WIN32
#if defined(_WIN32) && !defined(__CYGWIN__)
#include <winbase.h>
#else
#include <pthread.h>
Expand Down
2 changes: 1 addition & 1 deletion tests/quantum_sequencer_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ TEST_P(SequencerTest, BasicTaskOrder)
{
using namespace Bloomberg::quantum;

const int taskCount = 100;
const int taskCount = 2000;
const int sequenceKeyCount = 3;
SequencerTestData testData;
SequencerTestData::SequenceKeyMap sequenceKeys;
Expand Down

0 comments on commit 92a4a06

Please sign in to comment.