diff --git a/quantum/impl/quantum_coroutine_pool_allocator_impl.h b/quantum/impl/quantum_coroutine_pool_allocator_impl.h index 701f1fa..8703774 100644 --- a/quantum/impl/quantum_coroutine_pool_allocator_impl.h +++ b/quantum/impl/quantum_coroutine_pool_allocator_impl.h @@ -16,7 +16,15 @@ //NOTE: DO NOT INCLUDE DIRECTLY #include #include -#include +#include +#include +#include + +#if defined(_WIN32) && !defined(__CYGWIN__) + //TODO: Windows headers for memory mapping and page protection +#else + #include +#endif #if defined(BOOST_USE_VALGRIND) #include @@ -31,34 +39,48 @@ namespace quantum { template CoroutinePoolAllocator::CoroutinePoolAllocator(index_type size) : _size(size), - _blocks(new Header*[size]), - _freeBlocks(new index_type[size]), - _freeBlockIndex(size-1), + _blocks(nullptr), + _freeBlocks(nullptr), + _freeBlockIndex(size-1), //point to the last element _numHeapAllocatedBlocks(0), - _stackSize(std::min(std::max(traits::default_size(), traits::minimum_size()), traits::maximum_size())) + _stackSize(std::min(std::max(traits::default_size(), + traits::minimum_size()), + traits::maximum_size())) { - if (!_blocks || !_freeBlocks) { + if (_size == 0) + { + throw std::runtime_error("Invalid coroutine allocator pool size"); + } + _freeBlocks = new index_type[size]; + if (!_freeBlocks) + { throw std::bad_alloc(); } - if (_size == 0) { - throw std::runtime_error("Invalid coroutine allocator pool size"); + _blocks = new uint8_t*[size]; + if (!_blocks) + { + delete[] _freeBlocks; + throw std::bad_alloc(); } - //pre-allocate all the coroutine stack blocks - for (index_type i = 0; i < size; ++i) { - _blocks[i] = reinterpret_cast(new char[_stackSize]); - if (!_blocks[i]) { + //pre-allocate all the coroutine stack blocks and protect the last stack page to + //track coroutine stack overflows. + for (size_t i = 0; i < size; ++i) + { + _blocks[i] = allocateCoroutine(ProtectMemPage::On); + if (!_blocks[i]) + { + deallocateBlocks(i); throw std::bad_alloc(); } - _blocks[i]->_pos = i; //mark position + //set the block position + header(_blocks[i])->_pos = i; } //initialize the free block list - for (index_type i = 0; i < size; ++i) { - _freeBlocks[i] = i; - } + std::iota(_freeBlocks, _freeBlocks + size, 0); } template -CoroutinePoolAllocator::CoroutinePoolAllocator(CoroutinePoolAllocator&& other) +CoroutinePoolAllocator::CoroutinePoolAllocator(CoroutinePoolAllocator&& other) noexcept { *this = other; } @@ -82,17 +104,62 @@ CoroutinePoolAllocator& CoroutinePoolAllocator::oper template CoroutinePoolAllocator::~CoroutinePoolAllocator() { - for (size_t i = 0; i < _size; ++i) { - delete[] (char*)_blocks[i]; + deallocateBlocks(_size); +} + +template +void CoroutinePoolAllocator::deallocateBlocks(size_t pos) +{ + for (size_t j = 0; j < pos; ++j) + { + deallocateCoroutine(_blocks[j]); } delete[] _blocks; delete[] _freeBlocks; } +template +uint8_t* CoroutinePoolAllocator::allocateCoroutine(ProtectMemPage protect) const +{ +#if defined(_WIN32) && !defined(__CYGWIN__) + return new uint8_t[_stackSize]; +#else + uint8_t* block = (uint8_t*)mmap(nullptr, + _stackSize, + PROT_WRITE | PROT_READ | PROT_EXEC, + MAP_ANONYMOUS | MAP_PRIVATE, + -1, //invalid fd + 0); //no offset + if (block == MAP_FAILED) + { + return nullptr; + } + //Add protection to the lowest page + if ((protect == ProtectMemPage::On) && + mprotect(block, traits::page_size(), PROT_NONE) != 0) + { + munmap(block, _stackSize); //free region + return nullptr; + } + return block; +#endif +} + +template +int CoroutinePoolAllocator::deallocateCoroutine(uint8_t* block) const +{ + assert(block); +#if defined(_WIN32) && !defined(__CYGWIN__) + delete[] block; + return 0; +#else + return munmap(block, _stackSize); +#endif +} + template boost::context::stack_context CoroutinePoolAllocator::allocate() { - boost::context::stack_context ctx; - Header* block = nullptr; + uint8_t* block = nullptr; { SpinLock::Guard lock(_spinlock); if (!isEmpty()) @@ -100,28 +167,32 @@ boost::context::stack_context CoroutinePoolAllocator::allocate() { block = _blocks[_freeBlocks[_freeBlockIndex--]]; } } - if (!block) { - // Use heap allocation - block = (Header*)new char[_stackSize]; - if (!block) { + if (!block) + { + //Do not protect last memory page for performance reasons + block = allocateCoroutine(ProtectMemPage::Off); + if (!block) + { throw std::bad_alloc(); } - block->_pos = -1; //mark position as non-managed + header(block)->_pos = -1; //mark position as non-managed SpinLock::Guard lock(_spinlock); ++_numHeapAllocatedBlocks; } - char* block_start = reinterpret_cast(block) + sizeof(Header); + //populate stack context + boost::context::stack_context ctx; ctx.size = _stackSize - sizeof(Header); - ctx.sp = block_start + ctx.size; - #if defined(BOOST_USE_VALGRIND) - ctx.valgrind_stack_id = VALGRIND_STACK_REGISTER(ctx.sp, block_start); - #endif + ctx.sp = block + ctx.size; +#if defined(BOOST_USE_VALGRIND) + ctx.valgrind_stack_id = VALGRIND_STACK_REGISTER(ctx.sp, block); +#endif return ctx; } template void CoroutinePoolAllocator::deallocate(const boost::context::stack_context& ctx) { - if (!ctx.sp) { + if (!ctx.sp) + { return; } #if defined(BOOST_USE_VALGRIND) @@ -129,16 +200,24 @@ void CoroutinePoolAllocator::deallocate(const boost::context::stac #endif int bi = blockIndex(ctx); assert(bi >= -1 && bi < _size); //guard against coroutine stack overflow or corruption - if (isManaged(ctx)) { + if (isManaged(ctx)) + { //find index of the block SpinLock::Guard lock(_spinlock); _freeBlocks[++_freeBlockIndex] = bi; } - else { - delete[] (char*)getHeader(ctx); - SpinLock::Guard lock(_spinlock); - --_numHeapAllocatedBlocks; - assert(_numHeapAllocatedBlocks >= 0); + else + { + //Unlink coroutine stack + { + SpinLock::Guard lock(_spinlock); + --_numHeapAllocatedBlocks; + assert(_numHeapAllocatedBlocks >= 0); + } + if (deallocateCoroutine(stackEnd(ctx)) != 0) + { + throw std::runtime_error("Bad de-allocation"); + } } } @@ -168,9 +247,23 @@ bool CoroutinePoolAllocator::isEmpty() const template typename CoroutinePoolAllocator::Header* -CoroutinePoolAllocator::getHeader(const boost::context::stack_context& ctx) const +CoroutinePoolAllocator::header(const boost::context::stack_context& ctx) const +{ + return reinterpret_cast(ctx.sp); +} + +template +typename CoroutinePoolAllocator::Header* +CoroutinePoolAllocator::header(uint8_t* block) const +{ + return reinterpret_cast(block + _stackSize - sizeof(Header)); +} + +template +uint8_t* +CoroutinePoolAllocator::stackEnd(const boost::context::stack_context& ctx) const { - return reinterpret_cast(reinterpret_cast(ctx.sp) - ctx.size - sizeof(Header)); + return static_cast(ctx.sp) - ctx.size; } template @@ -182,7 +275,7 @@ bool CoroutinePoolAllocator::isManaged(const boost::context::stack template int CoroutinePoolAllocator::blockIndex(const boost::context::stack_context& ctx) const { - return getHeader(ctx)->_pos; + return header(ctx)->_pos; } }} diff --git a/quantum/impl/quantum_task_queue_impl.h b/quantum/impl/quantum_task_queue_impl.h index 4066502..3d1ae2b 100644 --- a/quantum/impl/quantum_task_queue_impl.h +++ b/quantum/impl/quantum_task_queue_impl.h @@ -92,7 +92,7 @@ TaskQueue::~TaskQueue() inline void TaskQueue::pinToCore(int coreId) { -#ifdef _WIN32 +#if defined(_WIN32) && !defined(__CYGWIN__) SetThreadAffinityMask(_thread->native_handle(), 1 << coreId); #else int cpuSetSize = sizeof(cpu_set_t); diff --git a/quantum/quantum_coroutine_pool_allocator.h b/quantum/quantum_coroutine_pool_allocator.h index 53e4898..9e2ad86 100644 --- a/quantum/quantum_coroutine_pool_allocator.h +++ b/quantum/quantum_coroutine_pool_allocator.h @@ -46,9 +46,9 @@ struct CoroutinePoolAllocator typedef STACK_TRAITS traits; //------------------------------- Methods ---------------------------------- - CoroutinePoolAllocator(index_type size); + explicit CoroutinePoolAllocator(index_type size); CoroutinePoolAllocator(const this_type&) = delete; - CoroutinePoolAllocator(this_type&&); + CoroutinePoolAllocator(this_type&&) noexcept; CoroutinePoolAllocator& operator=(const this_type&) = delete; CoroutinePoolAllocator& operator=(this_type&&); virtual ~CoroutinePoolAllocator(); @@ -65,14 +65,19 @@ struct CoroutinePoolAllocator struct Header { int _pos; }; - + enum class ProtectMemPage { On, Off }; int blockIndex(const boost::context::stack_context& ctx) const; bool isManaged(const boost::context::stack_context& ctx) const; - Header* getHeader(const boost::context::stack_context& ctx) const; + Header* header(const boost::context::stack_context& ctx) const; + Header* header(uint8_t* block) const; + uint8_t* stackEnd(const boost::context::stack_context& ctx) const; + void deallocateBlocks(size_t pos); + uint8_t* allocateCoroutine(ProtectMemPage protect) const; + int deallocateCoroutine(uint8_t*) const; //------------------------------- Members ---------------------------------- index_type _size; - Header** _blocks; + uint8_t** _blocks; index_type* _freeBlocks; ssize_t _freeBlockIndex; size_t _numHeapAllocatedBlocks; @@ -85,7 +90,8 @@ struct CoroutinePoolAllocatorProxy { typedef std::false_type default_constructor; - CoroutinePoolAllocatorProxy(uint16_t size) : _alloc(new CoroutinePoolAllocator(size)) + explicit CoroutinePoolAllocatorProxy(uint16_t size) : + _alloc(new CoroutinePoolAllocator(size)) { if (!_alloc) { throw std::bad_alloc(); diff --git a/quantum/quantum_dispatcher_core.h b/quantum/quantum_dispatcher_core.h index b7f3d08..381b747 100644 --- a/quantum/quantum_dispatcher_core.h +++ b/quantum/quantum_dispatcher_core.h @@ -23,7 +23,7 @@ #include #include #include -#ifdef _WIN32 +#if defined(_WIN32) && !defined(__CYGWIN__) #include #else #include diff --git a/tests/quantum_sequencer_tests.cpp b/tests/quantum_sequencer_tests.cpp index 7f0933c..f2fbe71 100644 --- a/tests/quantum_sequencer_tests.cpp +++ b/tests/quantum_sequencer_tests.cpp @@ -133,7 +133,7 @@ TEST_P(SequencerTest, BasicTaskOrder) { using namespace Bloomberg::quantum; - const int taskCount = 100; + const int taskCount = 2000; const int sequenceKeyCount = 3; SequencerTestData testData; SequencerTestData::SequenceKeyMap sequenceKeys;