Skip to content

Commit

Permalink
updated wire.h to be the same as tateyama
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Jun 28, 2024
1 parent 50e7f83 commit 2580741
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 18 deletions.
52 changes: 35 additions & 17 deletions src/tateyama/transport/wire.h
Original file line number Diff line number Diff line change
Expand Up @@ -1004,23 +1004,34 @@ class connection_queue
index_queue(std::size_t size, boost::interprocess::managed_shared_memory::segment_manager* mgr) : queue_(mgr), capacity_(size) {
queue_.resize(capacity_);
}
void fill() {
void fill(std::uint8_t admin_slots) {
for (std::size_t i = 0; i < capacity_; i++) {
queue_.at(i) = i;
}
pushed_.store(capacity_);
pushed_.store(capacity_ - admin_slots);
}
void push(std::size_t len) {
void push(std::size_t sid, std::size_t admin_slots = 0) {
boost::interprocess::scoped_lock lock(mutex_);
queue_.at(index(pushed_.load())) = len;
queue_.at(index(pushed_.load() + admin_slots)) = sid;
pushed_.fetch_add(1);
std::atomic_thread_fence(std::memory_order_acq_rel);
condition_.notify_one();
}
[[nodiscard]] std::size_t try_pop() {
auto current = poped_.load();
while (true) {
if (pushed_.load() == current) {
if (pushed_.load() <= current) {
throw std::runtime_error("no request available");
}
if (poped_.compare_exchange_strong(current, current + 1)) {
return queue_.at(index(current));
}
}
}
[[nodiscard]] std::size_t try_pop(std::uint8_t admin_slots) {
auto current = poped_.load();
while (true) {
if ((pushed_.load() + admin_slots) <= current) {
throw std::runtime_error("no request available");
}
if (poped_.compare_exchange_strong(current, current + 1)) {
Expand Down Expand Up @@ -1129,8 +1140,9 @@ class connection_queue
/**
* @brief Construct a new object.
*/
connection_queue(std::size_t n, boost::interprocess::managed_shared_memory::segment_manager* mgr) : q_free_(n, mgr), q_requested_(n, mgr), v_requested_(n, mgr) {
q_free_.fill();
connection_queue(std::size_t n, boost::interprocess::managed_shared_memory::segment_manager* mgr, std::uint8_t as_n)
: q_free_(n + as_n, mgr), q_requested_(n + as_n, mgr), v_requested_(n + as_n, mgr), admin_slots_(as_n) {
q_free_.fill(as_n);
}
~connection_queue() = default;

Expand All @@ -1143,12 +1155,17 @@ class connection_queue
connection_queue& operator = (connection_queue&&) = delete;

std::size_t request() {
auto rid = q_free_.try_pop();
q_requested_.push(rid);
return rid;
auto sid = q_free_.try_pop();
q_requested_.push(sid);
return sid;
}
std::size_t request_admin() {
auto sid = q_free_.try_pop(admin_slots_);
q_requested_.push(sid);
return sid;
}
std::size_t wait(std::size_t rid, std::int64_t timeout = 0) {
auto& entry = v_requested_.at(rid);
std::size_t wait(std::size_t sid, std::int64_t timeout = 0) {
auto& entry = v_requested_.at(sid);
try {
auto rtnv = entry.wait(timeout);
entry.reuse();
Expand All @@ -1158,8 +1175,8 @@ class connection_queue
throw ex;
}
}
bool check(std::size_t rid) {
return v_requested_.at(rid).check();
bool check(std::size_t sid) {
return v_requested_.at(sid).check();
}
std::size_t listen() {
if (q_requested_.wait(terminate_)) {
Expand All @@ -1179,10 +1196,10 @@ class connection_queue
void reject(std::size_t sid) {
q_requested_.pop();
v_requested_.at(sid).reject();
q_free_.push(sid);
q_free_.push(sid, admin_slots_);
}
void disconnect(std::size_t rid) {
q_free_.push(rid);
void disconnect(std::size_t sid) {
q_free_.push(sid, admin_slots_);
}

// for terminate
Expand All @@ -1209,6 +1226,7 @@ class connection_queue
boost::interprocess::vector<element, element_allocator> v_requested_;

std::atomic_bool terminate_{false};
std::uint8_t admin_slots_;
boost::interprocess::interprocess_semaphore s_terminated_{0};

std::size_t session_id_{};
Expand Down
2 changes: 1 addition & 1 deletion test/ogawayama/stub/server_wires_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ class connection_container
managed_shared_memory_ =
std::make_unique<boost::interprocess::managed_shared_memory>(boost::interprocess::create_only, name_.c_str(), request_queue_size);
managed_shared_memory_->destroy<connection_queue>(connection_queue::name);
connection_queue_ = managed_shared_memory_->construct<connection_queue>(connection_queue::name)(n, managed_shared_memory_->get_segment_manager());
connection_queue_ = managed_shared_memory_->construct<connection_queue>(connection_queue::name)(n, managed_shared_memory_->get_segment_manager(), 1);
}
catch(const boost::interprocess::interprocess_exception& ex) {
std::abort(); // FIXME
Expand Down

0 comments on commit 2580741

Please sign in to comment.