Skip to content

Commit

Permalink
Merge pull request #1394 from lucteo/system_context-cxx-replaceabilit…
Browse files Browse the repository at this point in the history
…y-api

Define "replaceability API" in terms of C++ interfaces.
  • Loading branch information
ericniebler authored Sep 4, 2024
2 parents 8bc7c7f + 0228b4d commit 6f47fbb
Show file tree
Hide file tree
Showing 7 changed files with 521 additions and 485 deletions.
240 changes: 129 additions & 111 deletions include/exec/__detail/__system_context_default_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,97 @@
*/
#pragma once

#include "__system_context_if.h"
#include "__system_context_replaceability_api.hpp"
#include "stdexec/execution.hpp"
#include "exec/static_thread_pool.hpp"

namespace exec::__system_context_default_impl {
using namespace stdexec::tags;
using system_context_replaceability::receiver;
using system_context_replaceability::bulk_item_receiver;
using system_context_replaceability::storage;
using system_context_replaceability::system_scheduler;
using system_context_replaceability::__system_context_replaceability;

using __pool_scheduler_t = decltype(std::declval<exec::static_thread_pool>().get_scheduler());

/// Receiver that calls the callback when the operation completes.
template <class _Sender>
struct __operation;

/*
Storage needed for a backend operation-state:
schedule:
- __recv::__r_ (receiver*) -- 8
- __recv::__op_ (__operation*) -- 8
- __operation::__inner_op_ (stdexec::connect_result_t<_Sender, __recv<_Sender>>) -- 56 (when connected with an empty receiver)
- __operation::__on_heap_ (bool) -- optimized away
---------------------
Total: 72; extra 16 bytes compared to internal operation state.
extra for bulk:
- __recv::__r_ (receiver*) -- 8
- __recv::__op_ (__operation*) -- 8
- __operation::__inner_op_ (stdexec::connect_result_t<_Sender, __recv<_Sender>>) -- 128 (when connected with an empty receiver & fun)
- __operation::__on_heap_ (bool) -- optimized away
- __bulk_functor::__r_ (bulk_item_receiver*) - 8
---------------------
Total: 152; extra 24 bytes compared to internal operation state.
[*] sizes taken on an Apple M2 Pro arm64 arch. They may differ on other architectures, or with different implementations.
*/

template <class _Sender>
struct __recv {
using receiver_concept = stdexec::receiver_t;

/// The callback to be called.
__exec_system_context_completion_callback_t __cb_;

/// The data to be passed to the callback.
void* __data_;
//! The operation state on the frontend.
receiver* __r_;

/// The owning operation state, to be destructed when the operation completes.
//! The parent operation state that we will destroy when we complete.
__operation<_Sender>* __op_;

void set_value() noexcept {
__cb_(__data_, 0, nullptr);
auto __op = __op_;
auto __r = __r_;
__op->__destruct(); // destroys the operation, including `this`.
__r->set_value();
// Note: when calling a completion signal, the parent operation might complete, making the
// static storage passed to this operation invalid. Thus, we need to ensure that we are not
// using the operation state after the completion signal.
}

void set_error(std::exception_ptr __ptr) noexcept {
__cb_(__data_, 2, *reinterpret_cast<void**>(&__ptr));
auto __op = __op_;
auto __r = __r_;
__op->__destruct(); // destroys the operation, including `this`.
__r->set_error(__ptr);
}

void set_stopped() noexcept {
__cb_(__data_, 1, nullptr);
auto __op = __op_;
auto __r = __r_;
__op->__destruct(); // destroys the operation, including `this`.
__r->set_stopped();
}
};

/// Ensure that `__storage` is aligned to `__alignment`. Shrinks the storage, if needed, to match desired alignment.
inline storage __ensure_alignment(storage __storage, size_t __alignment) noexcept {
auto __pn = reinterpret_cast<uintptr_t>(__storage.__data);
if (__pn % __alignment == 0) {
return __storage;
} else if (__storage.__size < __alignment) {
return {nullptr, 0};
} else {
auto __new_pn = (__pn + __alignment - 1) & ~(__alignment - 1);
return {
reinterpret_cast<void*>(__new_pn),
static_cast<uint32_t>(__storage.__size - (__new_pn - __pn))};
}
}

template <typename _Sender>
struct __operation {
/// The inner operation state, that results out of connecting the underlying sender with the receiver.
Expand All @@ -62,19 +114,21 @@ namespace exec::__system_context_default_impl {
bool __on_heap_;

/// Try to construct the operation in the preallocated memory if it fits, otherwise allocate a new operation.
static __operation* __construct_maybe_alloc(
void* __preallocated,
size_t __psize,
_Sender __sndr,
__exec_system_context_completion_callback_t __cb,
void* __data) {
if (__preallocated == nullptr || __psize < sizeof(__operation)) {
return new __operation(std::move(__sndr), __cb, __data, true);
static __operation*
__construct_maybe_alloc(storage __storage, receiver* __completion, _Sender __sndr) {
__storage = __ensure_alignment(__storage, alignof(__operation));
if (__storage.__data == nullptr || __storage.__size < sizeof(__operation)) {
return new __operation(std::move(__sndr), __completion, true);
} else {
return new (__preallocated) __operation(std::move(__sndr), __cb, __data, false);
return new (__storage.__data) __operation(std::move(__sndr), __completion, false);
}
}

//! Starts the operation that will schedule work on the system scheduler.
void start() noexcept {
stdexec::start(__inner_op_);
}

/// Destructs the operation; frees any allocated memory.
void __destruct() {
if (__on_heap_) {
Expand All @@ -85,40 +139,27 @@ namespace exec::__system_context_default_impl {
}

private:
__operation(
_Sender __sndr,
__exec_system_context_completion_callback_t __cb,
void* __data,
bool __on_heap)
: __inner_op_(stdexec::connect(std::move(__sndr), __recv<_Sender>{__cb, __data, this}))
__operation(_Sender __sndr, receiver* __completion, bool __on_heap)
: __inner_op_(stdexec::connect(std::move(__sndr), __recv<_Sender>{__completion, this}))
, __on_heap_(__on_heap) {
}
};

struct __system_scheduler_impl : __exec_system_scheduler_interface {
explicit __system_scheduler_impl(exec::static_thread_pool& __pool)
: __pool_scheduler_{__pool.get_scheduler()} {
__forward_progress_guarantee = 1; // parallel
__schedule_operation_size = sizeof(__schedule_operation_t),
__schedule_operation_alignment = alignof(__schedule_operation_t),
__schedule = __schedule_impl;
__destruct_schedule_operation = __destruct_schedule_operation_impl;
__bulk_schedule_operation_size = sizeof(__bulk_schedule_operation_t),
__bulk_schedule_operation_alignment = alignof(__bulk_schedule_operation_t),
__bulk_schedule = __bulk_schedule_impl;
__destruct_bulk_schedule_operation = __destruct_bulk_schedule_operation_impl;
struct __system_scheduler_impl : system_scheduler {
__system_scheduler_impl()
: __pool_scheduler_(__pool_.get_scheduler()) {
}

private:
/// Scheduler from the underlying thread pool.
/// The underlying thread pool.
exec::static_thread_pool __pool_;
__pool_scheduler_t __pool_scheduler_;

//! Functor called by the `bulk` operation; sends a `start` signal to the frontend.
struct __bulk_functor {
__exec_system_context_bulk_item_callback_t __cb_item_;
void* __data_;
bulk_item_receiver* __r_;

void operator()(unsigned long __idx) const noexcept {
__cb_item_(__data_, __idx);
__r_->start(static_cast<uint32_t>(__idx));
}
};

Expand All @@ -127,78 +168,36 @@ namespace exec::__system_context_default_impl {

using __bulk_schedule_operation_t = __operation<decltype(stdexec::bulk(
stdexec::schedule(std::declval<__pool_scheduler_t>()),
std::declval<unsigned long>(),
std::declval<uint32_t>(),
std::declval<__bulk_functor>()))>;

static void* __schedule_impl(
__exec_system_scheduler_interface* __self,
void* __preallocated,
uint32_t __psize,
__exec_system_context_completion_callback_t __cb,
void* __data) noexcept {

auto __this = static_cast<__system_scheduler_impl*>(__self);
auto __sndr = stdexec::schedule(__this->__pool_scheduler_);
auto __os = __schedule_operation_t::__construct_maybe_alloc(
__preallocated, __psize, std::move(__sndr), __cb, __data);
stdexec::start(__os->__inner_op_);
return __os;
}

static void __destruct_schedule_operation_impl(
__exec_system_scheduler_interface* /*__self*/,
void* __operation) noexcept {
auto __op = static_cast<__schedule_operation_t*>(__operation);
__op->__destruct();
}

static void* __bulk_schedule_impl(
__exec_system_scheduler_interface* __self,
void* __preallocated,
uint32_t __psize,
__exec_system_context_completion_callback_t __cb,
__exec_system_context_bulk_item_callback_t __cb_item,
void* __data,
unsigned long __size) noexcept {

auto __this = static_cast<__system_scheduler_impl*>(__self);
auto __sndr = stdexec::bulk(
stdexec::schedule(__this->__pool_scheduler_), __size, __bulk_functor{__cb_item, __data});
auto __os = __bulk_schedule_operation_t::__construct_maybe_alloc(
__preallocated, __psize, std::move(__sndr), __cb, __data);
stdexec::start(__os->__inner_op_);
return __os;
}

static void __destruct_bulk_schedule_operation_impl(
__exec_system_scheduler_interface* /*__self*/,
void* __operation) noexcept {
auto __op = static_cast<__bulk_schedule_operation_t*>(__operation);
__op->__destruct();
}
};

/// Default implementation of a system context, based on `static_thread_pool`
struct __system_context_impl : __exec_system_context_interface {
__system_context_impl() {
__version = 202402;
__get_scheduler = __get_scheduler_impl;
public:
void schedule(storage __storage, receiver* __r) noexcept override {
try {
auto __sndr = stdexec::schedule(__pool_scheduler_);
auto __os =
__schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr));
__os->start();
} catch (std::exception& __e) {
__r->set_error(std::current_exception());
}
}

private:
/// The underlying thread pool.
exec::static_thread_pool __pool_{};

/// The system scheduler implementation.
__system_scheduler_impl __scheduler_{__pool_};

static __exec_system_scheduler_interface*
__get_scheduler_impl(__exec_system_context_interface* __self) noexcept {
return &static_cast<__system_context_impl*>(__self)->__scheduler_;
void
bulk_schedule(uint32_t __size, storage __storage, bulk_item_receiver* __r) noexcept override {
try {
auto __sndr =
stdexec::bulk(stdexec::schedule(__pool_scheduler_), __size, __bulk_functor{__r});
auto __os =
__bulk_schedule_operation_t::__construct_maybe_alloc(__storage, __r, std::move(__sndr));
__os->start();
} catch (std::exception& __e) {
__r->set_error(std::current_exception());
}
}
};

/// Keeps track of the object implementing the system context interface.
/// Keeps track of the object implementing the system context interfaces.
struct __instance_holder {

/// Get the only instance of this class.
Expand All @@ -208,22 +207,41 @@ namespace exec::__system_context_default_impl {
}

/// Get the currently selected system context object.
__exec_system_context_interface* __get_current_instance() const noexcept {
system_scheduler* __get_current_instance() const noexcept {
return __current_instance_;
}

/// Allows changing the currently selected system context object; used for testing.
void __set_current_instance(__exec_system_context_interface* __instance) noexcept {
void __set_current_instance(system_scheduler* __instance) noexcept {
__current_instance_ = __instance;
}

private:
__instance_holder() {
static __system_context_impl __default_instance_;
static __system_scheduler_impl __default_instance_;
__current_instance_ = &__default_instance_;
}

__exec_system_context_interface* __current_instance_;
system_scheduler* __current_instance_;
};

struct __system_context_replaceability_impl : __system_context_replaceability {
//! Globally replaces the system scheduler backend.
//! This needs to be called within `main()` and before the system scheduler is accessed.
void __set_system_scheduler(system_scheduler* __backend) noexcept override {
__instance_holder::__singleton().__set_current_instance(__backend);
}
};

void* __default_query_system_context_interface(const __uuid& __id) noexcept {
if (__id == system_scheduler::__interface_identifier) {
return __instance_holder::__singleton().__get_current_instance();
} else if (__id == __system_context_replaceability::__interface_identifier) {
static __system_context_replaceability_impl __impl;
return &__impl;
}

return nullptr;
}

} // namespace exec::__system_context_default_impl
19 changes: 6 additions & 13 deletions include/exec/__detail/__system_context_default_impl_entry.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,12 @@ STDEXEC_PRAGMA_PUSH()
STDEXEC_PRAGMA_IGNORE_GNU("-Wattributes") // warning: inline function '[...]' declared weak

/// Gets the default system context implementation.
extern "C" STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE((weak))
__exec_system_context_interface*
__get_exec_system_context_impl() {
return exec::__system_context_default_impl::__instance_holder::__singleton()
.__get_current_instance();
}

/// Sets the default system context implementation.
extern "C" STDEXEC_SYSTEM_CONTEXT_INLINE STDEXEC_ATTRIBUTE((weak))
void
__set_exec_system_context_impl(__exec_system_context_interface* __instance) {
return exec::__system_context_default_impl::__instance_holder::__singleton()
.__set_current_instance(__instance);
extern
STDEXEC_SYSTEM_CONTEXT_INLINE
STDEXEC_ATTRIBUTE((weak))
void*
__query_system_context_interface(const __uuid& __id) noexcept {
return exec::__system_context_default_impl::__default_query_system_context_interface(__id);
}

STDEXEC_PRAGMA_POP()
Loading

0 comments on commit 6f47fbb

Please sign in to comment.