Skip to content

Commit

Permalink
update wire.h to be the same as tsubakuro
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Jul 5, 2024
1 parent 8aa0e18 commit 9701edd
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions src/tateyama/transport/wire.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,15 +383,18 @@ class unidirectional_message_wire : public simple_wire<message_header> {
*/
message_header peep(const char* base) {
while (true) {
bool termination_requested = termination_requested_.load();
bool onetime_notification = onetime_notification_.load();
std::atomic_thread_fence(std::memory_order_acq_rel);
if(stored() >= message_header::size) {
copy_header(base);
return header_received_;
}
if (termination_requested_.load()) {
if (termination_requested) {
termination_requested_.store(false);
return {message_header::terminate_request, 0};
}
if (onetime_notification_.load()) {
if (onetime_notification) {
throw std::runtime_error("received shutdown request from outside the communication partner");
}
boost::interprocess::scoped_lock lock(m_mutex_);
Expand Down Expand Up @@ -473,10 +476,12 @@ class unidirectional_response_wire : public simple_wire<response_header> {
}

while (true) {
bool closed_shutdown = closed_.load() || shutdown_.load();
std::atomic_thread_fence(std::memory_order_acq_rel);
if(stored() >= response_header::size) {
break;
}
if (closed_.load() || shutdown_.load()) {
if (closed_shutdown) {
header_received_ = response_header(0, 0, 0);
return header_received_;
}
Expand Down Expand Up @@ -841,13 +846,15 @@ class unidirectional_simple_wires {
boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(timeout))),
#endif
[this, &active_wire](){
bool eor = is_eor();
std::atomic_thread_fence(std::memory_order_acq_rel);
for (auto&& wire: unidirectional_simple_wires_) {
if (wire.has_record()) {
active_wire = &wire;
return true;
}
}
return is_eor();
return eor;
})) {
wait_for_record_ = false;
throw std::runtime_error("record has not been received within the specified time");
Expand Down Expand Up @@ -896,10 +903,7 @@ class unidirectional_simple_wires {
* used by client
*/
[[nodiscard]] bool is_eor() const {
if (!eor_) {
return false;
}
return std::all_of(unidirectional_simple_wires_.begin(), unidirectional_simple_wires_.end(), [](const unidirectional_simple_wire& wire) { return !wire.has_record(); });
return eor_;
}

private:
Expand Down

0 comments on commit 9701edd

Please sign in to comment.