Skip to content

Commit

Permalink
Preparations for kv-example (pass rsm to raft) (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
resetius authored Mar 3, 2024
1 parent b731456 commit 4b6fb8a
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 8 deletions.
15 changes: 12 additions & 3 deletions examples/kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,25 @@ struct TReadKv: public TCommandRequest {
char Data[0];
};

struct TResultValue: public TCommandResponse {
uint16_t ValSize;
char Data[0];
};

TMessageHolder<TMessage> TKv::Read(TMessageHolder<TCommandRequest> message) {
auto readKv = message.Cast<TReadKv>();
std::string_view k(readKv->Data, readKv->KeySize);
auto it = H.find(std::string(k));
if (it == H.end()) {
// TODO
auto res = NewHoldedMessage<TResultValue>(sizeof(TResultValue));
res->ValSize = -1;
return res;
} else {
// TODO
auto res = NewHoldedMessage<TResultValue>(sizeof(TResultValue)+it->second.size());
res->ValSize = it->second.size();
memcpy(res->Data, it->second.data(), res->ValSize);
return res;
}
return {};
}

void TKv::Write(TMessageHolder<TLogEntry> message) {
Expand Down
3 changes: 2 additions & 1 deletion server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ int main(int argc, char** argv) {
std::cerr << "Host not found\n"; return 1;
}

auto raft = std::make_shared<TRaft>(myHost.Id, nodes);
std::shared_ptr<IRsm> rsm = std::make_shared<TDummyRsm>();
auto raft = std::make_shared<TRaft>(rsm, myHost.Id, nodes);
TPoller::TSocket socket(NNet::TAddress{myHost.Address, myHost.Port}, loop.Poller());
socket.Bind();
socket.Listen();
Expand Down
22 changes: 20 additions & 2 deletions src/raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,23 @@ static uint32_t rand_(uint32_t* seed) {

} // namespace

TMessageHolder<TMessage> TDummyRsm::Read(TMessageHolder<TCommandRequest> message)
{
return {};
}

void TDummyRsm::Write(TMessageHolder<TLogEntry> message)
{ }

TMessageHolder<TLogEntry> TDummyRsm::Prepare(TMessageHolder<TCommandRequest> command, uint64_t term)
{
auto dataSize = command->Len - sizeof(TCommandRequest);
auto entry = NewHoldedMessage<TLogEntry>(sizeof(TLogEntry)+dataSize);
memcpy(entry->Data, command->Data, dataSize);
entry->Term = term;
return entry;
}

TVolatileState& TVolatileState::SetElectionDue(ITimeSource::Time due) {
ElectionDue = due;
return *this;
Expand Down Expand Up @@ -95,8 +112,9 @@ TVolatileState& TVolatileState::SetCommitIndex(int index)
return *this;
}

TRaft::TRaft(int node, const TNodeDict& nodes)
: Id(node)
TRaft::TRaft(std::shared_ptr<IRsm> rsm, int node, const TNodeDict& nodes)
: Rsm(rsm)
, Id(node)
, Nodes(nodes)
, MinVotes((nodes.size()+2+nodes.size()%2)/2)
, Npeers(nodes.size())
Expand Down
9 changes: 8 additions & 1 deletion src/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ struct IRsm {
virtual TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message, uint64_t term) = 0;
};

struct TDummyRsm: public IRsm {
TMessageHolder<TMessage> Read(TMessageHolder<TCommandRequest> message) override;
void Write(TMessageHolder<TLogEntry> message) override;
TMessageHolder<TLogEntry> Prepare(TMessageHolder<TCommandRequest> message, uint64_t term) override;
};

using TNodeDict = std::unordered_map<uint32_t, std::shared_ptr<INode>>;

struct TState {
Expand Down Expand Up @@ -80,7 +86,7 @@ enum class EState: int {

class TRaft {
public:
TRaft(int node, const TNodeDict& nodes);
TRaft(std::shared_ptr<IRsm> rsm, int node, const TNodeDict& nodes);

void Process(ITimeSource::Time now, TMessageHolder<TMessage> message, const std::shared_ptr<INode>& replyTo = {});
void ProcessTimeout(ITimeSource::Time now);
Expand Down Expand Up @@ -140,6 +146,7 @@ class TRaft {
void ProcessWaiting();
ITimeSource::Time MakeElection(ITimeSource::Time now);

std::shared_ptr<IRsm> Rsm;
uint32_t Id;
TNodeDict Nodes;
int MinVotes;
Expand Down
3 changes: 2 additions & 1 deletion test/test_raft.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ std::shared_ptr<TRaft> MakeRaft(
const OnSendFunc& sendFunc = {},
int count = 3)
{
std::shared_ptr<IRsm> rsm = std::make_shared<TDummyRsm>();
TNodeDict nodes;
for (int i = 2; i <= count; i++) {
nodes[i] = std::make_shared<TFakeNode>(sendFunc);
}
return std::make_shared<TRaft>(1, nodes);
return std::make_shared<TRaft>(std::move(rsm), 1, nodes);
}

template<typename T=TMessage>
Expand Down

0 comments on commit 4b6fb8a

Please sign in to comment.