Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
shilangyu committed Nov 12, 2023
1 parent 42b6673 commit 10d0f1d
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 25 deletions.
19 changes: 19 additions & 0 deletions src/include/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,22 @@ struct OwnedSlice : public Slice<T> {
OwnedSlice(OwnedSlice&&) = delete;
OwnedSlice& operator=(OwnedSlice&&) = delete;
};

/// @brief Returns the amount of encoded bytes.
template <typename T>
inline auto encode_integer_little_endian(const T value, std::uint8_t* buffer)
-> std::size_t {
for (size_t i = 0; i < sizeof(T); i++) {
buffer[i] = (value >> (8 * i)) & 0xff;
}
return sizeof(T);
}

template <typename T>
inline auto decode_integer_little_endian(const std::uint8_t* buffer) -> T {
T value = 0;
for (size_t i = 0; i < sizeof(T); i++) {
value |= static_cast<T>(buffer[i]) << (8 * i);
}
return value;
}
20 changes: 9 additions & 11 deletions src/include/perfect_link.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ class PerfectLink {
/// @brief The type used to store ID of a process.
using ProcessIdType = std::uint8_t;

/// @brief The type used to store ID of a message.
using MessageIdType = std::uint32_t;

static constexpr std::uint8_t MAX_MESSAGE_COUNT_IN_PACKET = 8;
static constexpr ProcessIdType MAX_PROCESSES = 128;

PerfectLink(const ProcessIdType id);

Expand Down Expand Up @@ -59,9 +63,6 @@ class PerfectLink {
auto send(const in_addr_t host, const in_port_t port, Data... datas) -> void;

private:
/// @brief The type used to store ID of a message.
using MessageIdType = std::uint32_t;

/// @brief The type used to store the size of data.
using MessageSizeType = std::uint16_t;

Expand Down Expand Up @@ -94,11 +95,11 @@ class PerfectLink {
};

/// @brief Bound socket file descriptor. None if no bind was performed.
std::optional<int> _sock_fd = std::nullopt;
std::optional<int> _sock_fd;
/// @brief Current sequence number of messages.
MessageIdType _seq_nr = 1;
/// @brief Map of sent messages that have not yet sent back an ACK.
std::unordered_map<MessageIdType, PendingMessage> _pending_for_ack = {};
std::unordered_map<MessageIdType, PendingMessage> _pending_for_ack;
std::mutex _pending_for_ack_mutex;
std::condition_variable _pending_for_ack_cv;
/// @brief A map of messages that have been delivered.
Expand Down Expand Up @@ -144,17 +145,14 @@ inline auto PerfectLink::_prepare_message(const MessageIdType seq_nr,
// message = [is_ack, ...seq_nr, ...process_id, ...[data_length, ...data]]
std::array<uint8_t, MAX_MESSAGE_SIZE> message;
message[0] = static_cast<uint8_t>(is_ack);
for (size_t i = 0; i < sizeof(MessageIdType); i++) {
message[i + 1] = (seq_nr >> (8 * i)) & 0xff;
}
encode_integer_little_endian(seq_nr, message.data() + 1);
message[1 + sizeof(MessageIdType)] = _id;
auto offset = 1 + sizeof(MessageIdType) + sizeof(ProcessIdType);

if constexpr (sizeof...(Data) > 0) {
for (const auto& [data, length] : {datas...}) {
for (size_t i = 0; i < sizeof(MessageSizeType); i++) {
message[offset++] = (length >> (8 * i)) & 0xff;
}
offset += encode_integer_little_endian(
static_cast<MessageSizeType>(length), message.data() + offset);
std::memcpy(message.data() + offset, data, length);
offset += length;
}
Expand Down
46 changes: 46 additions & 0 deletions src/include/uniform_reliable_broadcast.hpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#pragma once

#include <netinet/in.h>
#include <bitset>
#include <tuple>
#include <unordered_map>
#include <vector>
#include "best_effort_broadcast.hpp"
#include "perfect_link.hpp"
Expand Down Expand Up @@ -41,5 +43,49 @@ class UniformReliableBroadcast {
auto broadcast(Data... datas) -> void;

private:
/// @brief A broadcasted message is identified by its source process and a
/// message ID for that process.
using MessageIdType =
std::tuple<PerfectLink::ProcessIdType, PerfectLink::MessageIdType>;

/// @brief Hash function for `MessageIdType`.
struct hash_message_id_type {
size_t operator()(const MessageIdType& arg) const noexcept {
return std::get<0>(arg) ^ std::get<1>(arg);
}
};

BestEffortBroadcast _link;
std::unordered_map<MessageIdType,
std::bitset<PerfectLink::MAX_PROCESSES>,
hash_message_id_type>
_acknowledged;
};

// template <typename... Data, class, class>
// auto UniformReliableBroadcast::broadcast(Data... datas) -> void {}

/*
upon event <Init>
delivered = ∅
forward = ∅
ack[] = ∅
upon event <urb, Broadcast | m>
forward = forward ∪ {[self, m]}
trigger <beb, Broadcast | [Data, self, m]>
upon event <beb, Delivered | pi, [Data, pj, m]>
ack[m] = ack[m] ∪ {pi}
if [pj, m] not in forward
forward = forward ∪ {[pj, m]}
trigger <beb, Broadcast | [Data, pj, m]>
function candeliver(m)
return |ack[m]| > N/2
upon event ([pj, m] in forward) such that candeliver(m) and <m not in delivered>
delivered = delivered ∪ {m}
trigger <urb, Delivered | pj, m>
*/
10 changes: 4 additions & 6 deletions src/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
#include <mutex>
#include <thread>
#include <vector>
#include "best_effort_broadcast.hpp"
#include "common.hpp"
#include "parser.hpp"
#include "perfect_link.hpp"
#include "uniform_reliable_broadcast.hpp"

using SendType = std::uint32_t;
using Delivered = std::tuple<PerfectLink::ProcessIdType, SendType>;
Expand Down Expand Up @@ -156,11 +156,9 @@ int main(int argc, char** argv) {
std::array<uint8_t, pack * sizeof(SendType)> msg;
for (SendType n = pack; n <= m; n += 8) {
logger.set_sent_amount(n);
for (size_t j = 1; j <= pack; j++) {
for (size_t i = 0; i < sizeof(SendType); i++) {
msg[(j - 1) * sizeof(SendType) + i] =
((n - pack + j) >> (i * 8)) & 0xff;
}
for (std::uint8_t j = 1; j <= pack; j++) {
encode_integer_little_endian<SendType>(
n - pack + j, msg.data() + ((j - 1) * sizeof(SendType)));
}

link.broadcast(
Expand Down
13 changes: 5 additions & 8 deletions src/src/perfect_link.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,15 @@ inline auto PerfectLink::_decode_message(
std::vector<Slice<uint8_t>>& data_buffer)
-> std::tuple<bool, MessageIdType, ProcessIdType> {
bool is_ack = static_cast<bool>(message[0]);
MessageIdType seq_nr = 0;
for (size_t i = 0; i < sizeof(MessageIdType); i++) {
seq_nr |= static_cast<MessageIdType>(message[i + 1]) << (8 * i);
}
auto seq_nr = decode_integer_little_endian<MessageIdType>(message.data() + 1);
ProcessIdType process_id = message[1 + sizeof(MessageIdType)];

data_buffer.clear();
auto offset = 1 + sizeof(MessageIdType) + sizeof(ProcessIdType);
while (offset < message_size) {
size_t length = 0;
for (size_t i = 0; i < sizeof(MessageSizeType); i++) {
length |= static_cast<size_t>(message[offset++]) << (8 * i);
}
auto length = decode_integer_little_endian<size_t>(message.data() + offset);
offset += sizeof(length);

data_buffer.emplace_back(message.data() + offset, length);
offset += length;
}
Expand Down Expand Up @@ -103,6 +99,7 @@ auto PerfectLink::listen(ListenCallback callback) -> void {
}

if (message_size < 0 && errno == EAGAIN) {
// TODO: consider scoping resends to a single process
// timed out, resend messages without ACKs
std::lock_guard<std::mutex> guard(_pending_for_ack_mutex);
for (auto& [seq_nr, pending] : _pending_for_ack) {
Expand Down

0 comments on commit 10d0f1d

Please sign in to comment.