Skip to content

Commit

Permalink
Minor fixes (#2218)
Browse files Browse the repository at this point in the history
* no cache
no 4 Mb limit

Signed-off-by: iceseer <iceseer@gmail.com>
Signed-off-by: Alexander Lednev <57529355+iceseer@users.noreply.github.com>

* proposal response parsing refactoring

Signed-off-by: iceseer <iceseer@gmail.com>
Signed-off-by: Alexander Lednev <57529355+iceseer@users.noreply.github.com>

* configuration refactoring

Signed-off-by: iceseer <iceseer@gmail.com>
Signed-off-by: Alexander Lednev <57529355+iceseer@users.noreply.github.com>

* yac stream

Signed-off-by: iceseer <iceseer@gmail.com>
Signed-off-by: Alexander Lednev <57529355+iceseer@users.noreply.github.com>

* block fake peer test for MAC OS

Signed-off-by: iceseer <iceseer@gmail.com>
Signed-off-by: Alexander Lednev <57529355+iceseer@users.noreply.github.com>
  • Loading branch information
iceseer committed Jun 28, 2022
1 parent 3b38c98 commit e584bf9
Show file tree
Hide file tree
Showing 16 changed files with 255 additions and 225 deletions.
1 change: 1 addition & 0 deletions .github/TESTS_ALLOWED_TO_FAIL
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
system_irohad_test
fake_peer_example_test
get_engine_receipts_test
integration_add_peer_test
integration_remove_peer_test
28 changes: 20 additions & 8 deletions irohad/consensus/yac/transport/impl/consensus_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,39 @@ ServiceImpl::ServiceImpl(logger::LoggerPtr log,
std::function<void(std::vector<VoteMessage>)> callback)
: callback_(std::move(callback)), log_(std::move(log)) {}

grpc::Status ServiceImpl::SendState(
grpc::Status ServiceImpl::HandleState(
::grpc::ServerContext *context,
const ::iroha::consensus::yac::proto::State *request,
::google::protobuf::Empty *response) {
::iroha::consensus::yac::proto::State &request) {
std::vector<VoteMessage> state;
for (const auto &pb_vote : request->votes()) {
if (auto vote = PbConverters::deserializeVote(pb_vote, log_)) {
for (const auto &pb_vote : request.votes())
if (auto vote = PbConverters::deserializeVote(pb_vote, log_))
state.push_back(*vote);
}
}

if (state.empty()) {
log_->info("Received an empty votes collection");
return grpc::Status::CANCELLED;
}

if (not sameKeys(state)) {
log_->info("Votes are statelessly invalid: proposal rounds are different");
return grpc::Status::CANCELLED;
}

log_->info("Received votes[size={}] from {}", state.size(), context->peer());

callback_(std::move(state));
return grpc::Status::OK;
}

grpc::Status ServiceImpl::SendState(
::grpc::ServerContext *context,
::grpc::ServerReader< ::iroha::consensus::yac::proto::State> *reader,
::google::protobuf::Empty *response) {
::iroha::consensus::yac::proto::State request;

grpc::Status status = grpc::Status::OK;
while (status.ok() && reader->Read(&request)) {
status = HandleState(context, request);
}

return status;
}
14 changes: 10 additions & 4 deletions irohad/consensus/yac/transport/impl/consensus_service_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,19 @@ namespace iroha::consensus::yac {
* Naming is confusing, because this is rpc call that
* perform on another machine;
*/
grpc::Status SendState(::grpc::ServerContext *context,
const ::iroha::consensus::yac::proto::State *request,
::google::protobuf::Empty *response) override;
grpc::Status SendState(
::grpc::ServerContext *context,
::grpc::ServerReader< ::iroha::consensus::yac::proto::State> *reader,
::google::protobuf::Empty *response) override;

/**
* Handles state;
*/
grpc::Status HandleState(::grpc::ServerContext *context,
::iroha::consensus::yac::proto::State &request);

private:
std::function<void(std::vector<VoteMessage>)> callback_;

logger::LoggerPtr log_;
};
} // namespace iroha::consensus::yac
Expand Down
75 changes: 54 additions & 21 deletions irohad/consensus/yac/transport/impl/network_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,78 @@ void NetworkImpl::sendState(const shared_model::interface::Peer &to,
*pb_vote = PbConverters::serializeVote(vote);
}

auto maybe_client = client_factory_->createClient(to);
if (expected::hasError(maybe_client)) {
log_->error(
"Could not send state to {}: {}", to, maybe_client.assumeError());
auto stream_writer = stubs_.exclusiveAccess(
[&](auto &stubs) -> std::shared_ptr<::grpc::ClientWriterInterface<
::iroha::consensus::yac::proto::State>> {
auto const it = stubs.find(to.pubkey());
if (it == stubs.end() || std::get<0>(it->second) != to.address()) {
if (it != stubs.end()) {
// clear all
std::get<3>(it->second)->WritesDone();
stubs.erase(to.pubkey());
}

auto maybe_client = client_factory_->createClient(to);
if (expected::hasError(maybe_client)) {
log_->error("Could not send state to {}: {}",
to,
maybe_client.assumeError());
return nullptr;
}

std::unique_ptr<proto::Yac::StubInterface> client =
std::move(maybe_client).assumeValue();

auto context = std::make_unique<grpc::ClientContext>();
context->set_wait_for_ready(true);
context->set_deadline(std::chrono::system_clock::now()
+ std::chrono::seconds(5));

auto response = std::make_unique<::google::protobuf::Empty>();
std::shared_ptr<::grpc::ClientWriterInterface<
::iroha::consensus::yac::proto::State>>
writer = client->SendState(context.get(), response.get());

stubs[to.pubkey()] = std::make_tuple(std::string{to.address()},
std::move(client),
std::move(context),
writer,
std::move(response));
return writer;
}

return std::get<3>(it->second);
});

if (!stream_writer)
return;
}
std::shared_ptr<decltype(maybe_client)::ValueInnerType::element_type> client =
std::move(maybe_client).assumeValue();

log_->debug("Propagating votes for {}, size={} to {}",
state.front().hash.vote_round,
state.size(),
to);
getSubscription()->dispatcher()->add(
getSubscription()->dispatcher()->kExecuteInPool,
[request(std::move(request)),
client(std::move(client)),
[peer{to.pubkey()},
request(std::move(request)),
wstream_writer(utils::make_weak(stream_writer)),
log(utils::make_weak(log_)),
log_sending_msg(fmt::format("Send votes bundle[size={}] for {} to {}",
state.size(),
state.front().hash.vote_round,
to))] {
auto maybe_log = log.lock();
if (not maybe_log) {
auto stream_writer = wstream_writer.lock();

if (!maybe_log || !stream_writer) {
return;
}
grpc::ClientContext context;
context.set_wait_for_ready(true);
context.set_deadline(std::chrono::system_clock::now()
+ std::chrono::seconds(5));
google::protobuf::Empty response;

maybe_log->info(log_sending_msg);
auto status = client->SendState(&context, request, &response);
if (not status.ok()) {
maybe_log->warn(
"RPC failed: {} {}", context.peer(), status.error_message());
if (!stream_writer->Write(request)) {
maybe_log->warn("RPC failed: {}", peer);
return;
} else {
maybe_log->info("RPC succeeded: {}", context.peer());
}
maybe_log->info("RPC succeeded: {}", peer);
});
}
14 changes: 14 additions & 0 deletions irohad/consensus/yac/transport/impl/network_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
#include <memory>
#include <mutex>

#include "common/common.hpp"
#include "consensus/yac/vote_message.hpp"
#include "interfaces/common_objects/peer.hpp"
#include "logger/logger_fwd.hpp"
#include "network/impl/client_factory.hpp"

Expand Down Expand Up @@ -40,6 +42,18 @@ namespace iroha::consensus::yac {
* Yac stub creator
*/
std::unique_ptr<ClientFactory> client_factory_;
google::protobuf::Empty response_;

using StubData = std::tuple<shared_model::interface::types::AddressType,
std::unique_ptr<proto::Yac::StubInterface>,
std::unique_ptr<grpc::ClientContext>,
std::shared_ptr<::grpc::ClientWriterInterface<
::iroha::consensus::yac::proto::State>>,
std::unique_ptr<::google::protobuf::Empty>>;

utils::ReadWriteObject<std::unordered_map<std::string, StubData>,
std::mutex>
stubs_;

std::mutex stop_mutex_;
bool stop_requested_{false};
Expand Down
9 changes: 2 additions & 7 deletions irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ static constexpr iroha::consensus::yac::ConsistencyModel

static constexpr uint32_t kStaleStreamMaxRoundsDefault = 2;
static constexpr uint32_t kMstExpirationTimeDefault = 1440;
static constexpr uint32_t kMaxRoundsDelayDefault = 3000;
static constexpr uint32_t kProposalDelayMultiplier = 2;

/**
* Configuring iroha daemon
Expand Down Expand Up @@ -745,9 +743,7 @@ Irohad::RunResult Irohad::initOrderingGate() {
ordering_gate = ordering_init->initOrderingGate(
config_.max_proposal_size,
config_.getMaxpProposalPack(),
std::chrono::milliseconds(
config_.proposal_creation_timeout.value_or(kMaxRoundsDelayDefault)
* kProposalDelayMultiplier),
std::chrono::milliseconds(config_.getProposalDelay()),
transaction_factory,
batch_parser,
transaction_batch_factory_,
Expand All @@ -756,8 +752,7 @@ Irohad::RunResult Irohad::initOrderingGate() {
persistent_cache,
log_manager_->getChild("Ordering"),
inter_peer_client_factory_,
std::chrono::milliseconds(
config_.proposal_creation_timeout.value_or(kMaxRoundsDelayDefault)),
std::chrono::milliseconds(config_.getProposalCreationTimeout()),
config_.syncing_mode);
log_->info("[Init] => init ordering gate - [{}]",
logger::boolRepr(bool(ordering_gate)));
Expand Down
8 changes: 8 additions & 0 deletions irohad/main/iroha_conf_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -788,3 +788,11 @@ iroha::expected::Result<IrohadConfig, std::string> parse_iroha_config(
return e.what();
};
}

uint32_t IrohadConfig::getProposalDelay() const {
return getProposalCreationTimeout() * 2ul;
}

uint32_t IrohadConfig::getProposalCreationTimeout() const {
return proposal_creation_timeout.value_or(3000ul);
}
3 changes: 3 additions & 0 deletions irohad/main/iroha_conf_loader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ struct IrohadConfig {
std::optional<shared_model::interface::types::PeerList> initial_peers;
boost::optional<UtilityService> utility_service;

// getters
uint32_t getMaxpProposalPack() const;
uint32_t getProposalDelay() const;
uint32_t getProposalCreationTimeout() const;

// This is a part of cryto providers feature:
// https://github.com/MBoldyrev/iroha/tree/feature/hsm-utimaco.
Expand Down
3 changes: 3 additions & 0 deletions irohad/main/server_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <chrono>

#include "logger/logger.hpp"
#include "network/channel_constants.hpp"
#include "network/impl/tls_credentials.hpp"

using namespace iroha::network;
Expand Down Expand Up @@ -61,6 +62,8 @@ iroha::expected::Result<int, std::string> ServerRunner::run() {

builder.AddChannelArgument(GRPC_ARG_ALLOW_REUSEPORT, reuse_ ? 1 : 0);
builder.AddListeningPort(server_address_, credentials_, &selected_port);
builder.SetMaxReceiveMessageSize(kMaxMessageSize);
builder.SetMaxSendMessageSize(kMaxMessageSize);

for (auto &service : services_) {
builder.RegisterService(service.get());
Expand Down
16 changes: 16 additions & 0 deletions irohad/network/channel_constants.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_CHANNEL_CONSTANTS_HPP
#define IROHA_CHANNEL_CONSTANTS_HPP

namespace iroha::network {

/// Determines maximum packet size can be sent via grpc
static constexpr int kMaxMessageSize = 128 * 1024 * 1024;

} // namespace iroha::network

#endif // IROHA_CHANNEL_CONSTANTS_HPP
5 changes: 5 additions & 0 deletions irohad/network/impl/channel_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
#include <fmt/core.h>
#include <boost/algorithm/string/join.hpp>
#include <boost/range/adaptor/transformed.hpp>

#include "common/bind.hpp"
#include "interfaces/common_objects/peer.hpp"
#include "network/channel_constants.hpp"

using namespace iroha::expected;
using namespace iroha::network;
Expand Down Expand Up @@ -106,6 +108,9 @@ class ChannelFactory::ChannelArgumentsProvider {
args_ = detail::makeInterPeerChannelArguments(service_names_,
*maybe_params_.value());
}

args_.SetMaxSendMessageSize(kMaxMessageSize);
args_.SetMaxReceiveMessageSize(kMaxMessageSize);
return args_;
}

Expand Down
2 changes: 0 additions & 2 deletions irohad/ordering/impl/on_demand_ordering_gate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ void OnDemandOrderingGate::processRoundSwitch(RoundSwitch const &event) {
forLocalOS(&OnDemandOrderingService::onCollaborationOutcome,
event.next_round);

this->sendCachedTransactions();

if (!syncing_mode_) {
assert(ordering_service_);
assert(network_client_);
Expand Down
Loading

0 comments on commit e584bf9

Please sign in to comment.