diff --git a/src/tateyama/transport/wire.h b/src/tateyama/transport/wire.h index ea9ac14..feb23ac 100644 --- a/src/tateyama/transport/wire.h +++ b/src/tateyama/transport/wire.h @@ -1004,15 +1004,15 @@ class connection_queue index_queue(std::size_t size, boost::interprocess::managed_shared_memory::segment_manager* mgr) : queue_(mgr), capacity_(size) { queue_.resize(capacity_); } - void fill() { + void fill(std::uint8_t admin_slots) { for (std::size_t i = 0; i < capacity_; i++) { queue_.at(i) = i; } - pushed_.store(capacity_); + pushed_.store(capacity_ - admin_slots); } - void push(std::size_t len) { + void push(std::size_t sid, std::size_t admin_slots = 0) { boost::interprocess::scoped_lock lock(mutex_); - queue_.at(index(pushed_.load())) = len; + queue_.at(index(pushed_.load() + admin_slots)) = sid; pushed_.fetch_add(1); std::atomic_thread_fence(std::memory_order_acq_rel); condition_.notify_one(); @@ -1020,7 +1020,18 @@ class connection_queue [[nodiscard]] std::size_t try_pop() { auto current = poped_.load(); while (true) { - if (pushed_.load() == current) { + if (pushed_.load() <= current) { + throw std::runtime_error("no request available"); + } + if (poped_.compare_exchange_strong(current, current + 1)) { + return queue_.at(index(current)); + } + } + } + [[nodiscard]] std::size_t try_pop(std::uint8_t admin_slots) { + auto current = poped_.load(); + while (true) { + if ((pushed_.load() + admin_slots) <= current) { throw std::runtime_error("no request available"); } if (poped_.compare_exchange_strong(current, current + 1)) { @@ -1129,8 +1140,9 @@ class connection_queue /** * @brief Construct a new object. */ - connection_queue(std::size_t n, boost::interprocess::managed_shared_memory::segment_manager* mgr) : q_free_(n, mgr), q_requested_(n, mgr), v_requested_(n, mgr) { - q_free_.fill(); + connection_queue(std::size_t n, boost::interprocess::managed_shared_memory::segment_manager* mgr, std::uint8_t as_n) + : q_free_(n + as_n, mgr), q_requested_(n + as_n, mgr), v_requested_(n + as_n, mgr), admin_slots_(as_n) { + q_free_.fill(as_n); } ~connection_queue() = default; @@ -1143,12 +1155,17 @@ class connection_queue connection_queue& operator = (connection_queue&&) = delete; std::size_t request() { - auto rid = q_free_.try_pop(); - q_requested_.push(rid); - return rid; + auto sid = q_free_.try_pop(); + q_requested_.push(sid); + return sid; + } + std::size_t request_admin() { + auto sid = q_free_.try_pop(admin_slots_); + q_requested_.push(sid); + return sid; } - std::size_t wait(std::size_t rid, std::int64_t timeout = 0) { - auto& entry = v_requested_.at(rid); + std::size_t wait(std::size_t sid, std::int64_t timeout = 0) { + auto& entry = v_requested_.at(sid); try { auto rtnv = entry.wait(timeout); entry.reuse(); @@ -1158,8 +1175,8 @@ class connection_queue throw ex; } } - bool check(std::size_t rid) { - return v_requested_.at(rid).check(); + bool check(std::size_t sid) { + return v_requested_.at(sid).check(); } std::size_t listen() { if (q_requested_.wait(terminate_)) { @@ -1179,10 +1196,10 @@ class connection_queue void reject(std::size_t sid) { q_requested_.pop(); v_requested_.at(sid).reject(); - q_free_.push(sid); + q_free_.push(sid, admin_slots_); } - void disconnect(std::size_t rid) { - q_free_.push(rid); + void disconnect(std::size_t sid) { + q_free_.push(sid, admin_slots_); } // for terminate @@ -1209,6 +1226,7 @@ class connection_queue boost::interprocess::vector v_requested_; std::atomic_bool terminate_{false}; + std::uint8_t admin_slots_; boost::interprocess::interprocess_semaphore s_terminated_{0}; std::size_t session_id_{}; diff --git a/test/ogawayama/stub/server_wires_impl.h b/test/ogawayama/stub/server_wires_impl.h index 5d6a8cb..8327990 100644 --- a/test/ogawayama/stub/server_wires_impl.h +++ b/test/ogawayama/stub/server_wires_impl.h @@ -429,7 +429,7 @@ class connection_container managed_shared_memory_ = std::make_unique(boost::interprocess::create_only, name_.c_str(), request_queue_size); managed_shared_memory_->destroy(connection_queue::name); - connection_queue_ = managed_shared_memory_->construct(connection_queue::name)(n, managed_shared_memory_->get_segment_manager()); + connection_queue_ = managed_shared_memory_->construct(connection_queue::name)(n, managed_shared_memory_->get_segment_manager(), 1); } catch(const boost::interprocess::interprocess_exception& ex) { std::abort(); // FIXME