Skip to content

Commit

Permalink
Add fifo ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
shilangyu committed Nov 26, 2023
1 parent 910f94b commit d2d4618
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 16 deletions.
14 changes: 11 additions & 3 deletions src/include/uniform_reliable_broadcast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ class UniformReliableBroadcast {
const PerfectLink::ProcessIdType id,
const BestEffortBroadcast::AvailableProcesses processes);

using ListenCallback =
std::function<auto(PerfectLink::ProcessIdType process_id,
PerfectLink::MessageIdType seq_nr,
OwnedSlice<std::uint8_t>& data)
->void>;

/// @brief Binds this broadcast link to a host and port. Once done cannot be
/// done again.
auto bind(const in_addr_t host, const in_port_t port) -> void;
Expand All @@ -32,7 +38,7 @@ class UniformReliableBroadcast {
/// messages. Receives ACKs and resends messages with missing ACKs. Thread
/// safe.
/// @param callback Function that will be called when a message is delivered.
auto listen(PerfectLink::ListenCallback callback) -> void;
auto listen(ListenCallback callback) -> void;

/// @brief Broadcasts a message to all processes. The data has to be smaller
/// than about 64KiB. Sending is possible only after performing a bind. At
Expand All @@ -47,6 +53,8 @@ class UniformReliableBroadcast {
/// @brief Id of this process.
inline auto id() const -> PerfectLink::ProcessIdType { return _link.id(); }

static constexpr PerfectLink::MessageIdType INITIAL_SEQ_NR = 1;

private:
/// @brief Amount of in-flight broadcast messages of this process.
static constexpr std::size_t MAX_IN_FLIGHT = 1;
Expand All @@ -70,7 +78,7 @@ class UniformReliableBroadcast {
std::mutex _acknowledged_mutex;

/// @brief Current sequence number of messages.
PerfectLink::MessageIdType _seq_nr = 1;
PerfectLink::MessageIdType _seq_nr = INITIAL_SEQ_NR;

Semaphore _send_semaphore{MAX_IN_FLIGHT};
};
Expand All @@ -97,7 +105,7 @@ auto UniformReliableBroadcast::broadcast(Data... datas) -> void {
std::lock_guard lock(_acknowledged_mutex);
// add map entry to indicate this message is pending
_acknowledged.try_emplace(message_id);
_seq_nr += 1;
_seq_nr += static_cast<PerfectLink::MessageIdType>(sizeof...(Data));
}

_link.broadcast(
Expand Down
88 changes: 79 additions & 9 deletions src/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <csignal>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <thread>
#include <vector>
Expand Down Expand Up @@ -116,6 +117,81 @@ static auto map_hosts(std::vector<Parser::Host> hosts)
return result;
}

/// Filters delivers to make sure they are FIFO. Specialized for the data we use
/// to save needless allocations.
struct FifoBroadcast {
FifoBroadcast(const PerfectLink::ProcessIdType id,
const BestEffortBroadcast::AvailableProcesses processes)
: _link(id, processes) {}

using ListenCallback = std::function<
auto(PerfectLink::ProcessIdType process_id, SendType msg)->void>;

auto bind(const in_addr_t host, const in_port_t port) -> void {
_link.bind(host, port);
}

template <typename... Data,
class = std::enable_if_t<
are_equal<PerfectLink::MessageData, Data...>::value>,
class = std::enable_if_t<
(sizeof...(Data) <= PerfectLink::MAX_MESSAGE_COUNT_IN_PACKET)>>
auto broadcast(Data... datas) -> void {
_link.broadcast(datas...);
}

/// @brief NOT thread safe.
auto listen(ListenCallback callback) -> void {
_link.listen([&](auto process_id, auto seq_nr, auto& data) {
SendType msg = 0;
for (size_t i = 0; i < sizeof(SendType); i++) {
msg |= static_cast<SendType>(data[i]) << (i * 8);
}

auto& buffer = _buffered[process_id - 1];

if (buffer.next_seq_nr == seq_nr) {
callback(process_id, msg);
buffer.next_seq_nr += 1;
// deliver all next messages
for (; !buffer.buffer.empty(); buffer.buffer.pop()) {
auto [top_seq_nr, top_msg] = buffer.buffer.top();
if (top_seq_nr != buffer.next_seq_nr) {
break;
}
callback(process_id, top_msg);
buffer.next_seq_nr += 1;
}
} else {
buffer.buffer.emplace(seq_nr, msg);
}
});
}

private:
struct BufferedMessages {
struct BufferedMessage {
BufferedMessage(const PerfectLink::MessageIdType seq_nr,
const SendType msg)
: seq_nr(seq_nr), msg(msg) {}
PerfectLink::MessageIdType seq_nr;
SendType msg;

friend auto operator<(BufferedMessage const& left,
BufferedMessage const& right) -> bool {
return left.seq_nr > right.seq_nr;
}
};

// min heap
std::priority_queue<BufferedMessage> buffer;
PerfectLink::MessageIdType next_seq_nr =
UniformReliableBroadcast::INITIAL_SEQ_NR;
};
UniformReliableBroadcast _link;
std::array<BufferedMessages, PerfectLink::MAX_PROCESSES> _buffered;
};

int main(int argc, char** argv) {
perror_check<sig_t>([] { return std::signal(SIGTERM, stop); },
[](auto res) { return res == SIG_ERR; },
Expand All @@ -132,7 +208,7 @@ int main(int argc, char** argv) {
logger.open(parser.outputPath());

// create broadcast link and bind
UniformReliableBroadcast link{parser.id(), map_hosts(parser.hosts())};
FifoBroadcast link{parser.id(), map_hosts(parser.hosts())};
if (auto myHost = parser.hostById(parser.id()); myHost.has_value()) {
link.bind(myHost.value().ip, myHost.value().port);
} else {
Expand All @@ -144,14 +220,8 @@ int main(int argc, char** argv) {

// listen for deliveries
auto listen_handle = std::thread([&] {
link.listen([](auto process_id, auto& data) {
SendType msg = 0;
for (size_t i = 0; i < sizeof(SendType); i++) {
msg |= static_cast<SendType>(data[i]) << (i * 8);
}

logger.deliver(process_id, msg);
});
link.listen(
[](auto process_id, auto msg) { logger.deliver(process_id, msg); });
});

// pack 8 datas in one message
Expand Down
11 changes: 7 additions & 4 deletions src/src/uniform_reliable_broadcast.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ auto UniformReliableBroadcast::bind(const in_addr_t host, const in_port_t port)
_link.bind(host, port);
}

auto UniformReliableBroadcast::listen(PerfectLink::ListenCallback callback)
-> void {
auto UniformReliableBroadcast::listen(ListenCallback callback) -> void {
_link.listen_batch([&](auto process_id, auto& metadata, auto& datas) {
MessageIdType message_id = 0;
for (size_t i = 0; i < sizeof(MessageIdType); i++) {
Expand All @@ -38,19 +37,23 @@ auto UniformReliableBroadcast::listen(PerfectLink::ListenCallback callback)
_acknowledged_mutex.unlock();

if (should_deliver) {
// extract original process author id and deliver the batch
// extract original process author id and seq_nr
PerfectLink::ProcessIdType author_id =
static_cast<PerfectLink::ProcessIdType>(
message_id &
static_cast<MessageIdType>(
std::numeric_limits<PerfectLink::ProcessIdType>::max()));
PerfectLink::MessageIdType seq_nr =
static_cast<PerfectLink::MessageIdType>(
(message_id >> (8 * sizeof(PerfectLink::ProcessIdType))));
// if we are delivering our own broadcast, inform semaphore
if (author_id == id()) {
_send_semaphore.release();
}
for (auto& data : datas) {
OwnedSlice owned = data;
callback(author_id, owned);
callback(author_id, seq_nr, owned);
seq_nr += 1;
}
}

Expand Down

0 comments on commit d2d4618

Please sign in to comment.