Skip to content

Commit

Permalink
Feature/proposal request optimization v2 (#1971)
Browse files Browse the repository at this point in the history
* bloom filter

Signed-off-by: iceseer <iceseer@gmail.com>
Signed-off-by: Alexander Lednev <57529355+iceseer@users.noreply.github.com>

* transport with BF support

Signed-off-by: iceseer <iceseer@gmail.com>
Signed-off-by: Alexander Lednev <57529355+iceseer@users.noreply.github.com>

* txs diff complete

Signed-off-by: iceseer <iceseer@gmail.com>
Signed-off-by: Alexander Lednev <57529355+iceseer@users.noreply.github.com>
  • Loading branch information
iceseer committed Apr 8, 2022
1 parent 6f25e15 commit 9c60b40
Show file tree
Hide file tree
Showing 45 changed files with 984 additions and 249 deletions.
3 changes: 3 additions & 0 deletions .github/TESTS_ALLOWED_TO_FAIL
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
system_irohad_test
integration_add_peer_test
regression_regression_test
integration_remove_peer_test
2 changes: 1 addition & 1 deletion irohad/ametsuchi/impl/postgres_specific_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,7 @@ namespace iroha {
for (const auto &row : range) {
iroha::ametsuchi::apply(
row,
[this, &peers](
[&peers](
auto &peer_key, auto &address, auto &tls_certificate) {
if (peer_key and address) {
peers.push_back(
Expand Down
8 changes: 3 additions & 5 deletions irohad/main/impl/on_demand_ordering_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ auto createNotificationFactory(
std::make_unique<iroha::network::ClientFactoryImpl<
iroha::ordering::transport::OnDemandOsClientGrpcFactory::Service>>(
std::move(client_factory)),
[](iroha::ordering::ProposalEvent event) {
iroha::getSubscription()->notify(iroha::EventTypes::kOnProposalResponse,
std::move(event));
},
std::move(os_execution_keepers));
}

Expand Down Expand Up @@ -87,14 +83,16 @@ auto OnDemandOrderingInit::createGate(
size_t max_number_of_transactions,
const logger::LoggerManagerTreePtr &ordering_log_manager,
bool syncing_mode) {
return std::make_shared<OnDemandOrderingGate>(
auto og = std::make_shared<OnDemandOrderingGate>(
std::move(ordering_service),
std::move(network_client),
std::move(proposal_factory),
std::move(tx_cache),
max_number_of_transactions,
ordering_log_manager->getChild("Gate")->getLogger(),
syncing_mode);
og->initialize();
return og;
}

auto OnDemandOrderingInit::createService(
Expand Down
6 changes: 6 additions & 0 deletions irohad/main/subscription_fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
namespace iroha {
enum SubscriptionEngineHandlers {
kYac = 0,
kRequestProposal,
kVoteProcess,
kProposalProcessing,
kMetrics,
kNotifications,
//---------------
Expand Down Expand Up @@ -38,10 +41,13 @@ namespace iroha {
kOnTxsEnoughForProposal,
kOnPackProposal,
kOnProposalResponse,
kOnProposalResponseFailed,
kOnTransactionResponse,
kOnConsensusGateEvent,
kSendBatchComplete,

kRemoteProposalDiff,

// RDB
kOnRdbStats,

Expand Down
7 changes: 7 additions & 0 deletions irohad/ordering/impl/batches_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "common/common.hpp"
#include "consensus/round.hpp"
#include "ordering/ordering_types.hpp"

namespace shared_model::interface {
class TransactionBatch;
Expand Down Expand Up @@ -187,9 +188,11 @@ namespace iroha::ordering {
size_t requested_tx_amount,
std::vector<std::shared_ptr<shared_model::interface::Transaction>>
&collection,
BloomFilter256 &bf,
IsProcessedFunc &&is_processed) {
collection.clear();
collection.reserve(requested_tx_amount);
bf.clear();

std::unique_lock lock(batches_cache_cs_);
uint32_t depth_counter = 0ul;
Expand All @@ -204,10 +207,14 @@ namespace iroha::ordering {
return false;
}

for (auto &tx : batch->transactions())
tx->storeBatchHash(batch->reducedHash());

collection.insert(std::end(collection),
std::begin(batch->transactions()),
std::end(batch->transactions()));

bf.set(batch->reducedHash());
used_batches_cache_.insert(batch);
return true;
});
Expand Down
19 changes: 10 additions & 9 deletions irohad/ordering/impl/on_demand_connection_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,19 @@ void OnDemandConnectionManager::onBatchesToWholeNetwork(

void OnDemandConnectionManager::onRequestProposal(
consensus::Round round,
std::optional<std::shared_ptr<const shared_model::interface::Proposal>>
ref_proposal) {
std::optional<
std::pair<std::shared_ptr<shared_model::interface::Proposal const>,
BloomFilter256>> proposal) {
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
if (stop_requested_.load(std::memory_order_relaxed)) {
if (stop_requested_.load(std::memory_order_relaxed))
return;
}

log_->debug("onRequestProposal, {}", round);

if (auto &connection = connections_.peers[kIssuer]) {
(*connection)->onRequestProposal(round, std::move(ref_proposal));
}
assert(!proposal || proposal.value().first);
log_->debug("onRequestProposal, {} : {}",
round,
proposal ? proposal.value().first->toString() : "NULL_OPT");
if (auto &connection = connections_.peers[kIssuer])
(*connection)->onRequestProposal(round, std::move(proposal));
}

void OnDemandConnectionManager::initializeConnections(
Expand Down
7 changes: 4 additions & 3 deletions irohad/ordering/impl/on_demand_connection_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ namespace iroha {
void onBatches(CollectionType batches) override;
void onBatchesToWholeNetwork(CollectionType batches) override;
std::chrono::milliseconds getRequestDelay() const override;

void onRequestProposal(
consensus::Round round,
std::optional<
std::shared_ptr<const shared_model::interface::Proposal>>
ref_proposal) override;
std::optional<std::pair<
std::shared_ptr<shared_model::interface::Proposal const>,
BloomFilter256>> proposal) override;

/**
* Initialize corresponding peers in connections_ using factory_
Expand Down
26 changes: 26 additions & 0 deletions irohad/ordering/impl/on_demand_ordering_gate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "interfaces/iroha_internal/transaction_batch_impl.hpp"
#include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp"
#include "logger/logger.hpp"
#include "main/subscription.hpp"
#include "ordering/impl/on_demand_common.hpp"
#include "validators/field_validator.hpp"

Expand All @@ -41,7 +42,32 @@ OnDemandOrderingGate::OnDemandOrderingGate(
tx_cache_(std::move(tx_cache)),
syncing_mode_(syncing_mode) {}

void OnDemandOrderingGate::initialize() {
failed_proposal_response_ =
SubscriberCreator<bool, ProposalEvent>::template create<
EventTypes::kOnProposalResponseFailed>(
SubscriptionEngineHandlers::kYac,
[_w_this{weak_from_this()}](auto, auto ev) {
if (auto _this = _w_this.lock()) {
std::shared_lock<std::shared_timed_mutex> stop_lock(
_this->stop_mutex_);
if (_this->stop_requested_) {
_this->log_->warn(
"Not doing anything because stop was requested.");
return;
}

if (!_this->syncing_mode_) {
assert(_this->network_client_);
_this->network_client_->onRequestProposal(ev.round,
std::nullopt);
}
}
});
}

OnDemandOrderingGate::~OnDemandOrderingGate() {
failed_proposal_response_->unsubscribe();
stop();
}

Expand Down
10 changes: 9 additions & 1 deletion irohad/ordering/impl/on_demand_ordering_gate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@

#include "network/ordering_gate.hpp"

#include <memory>
#include <shared_mutex>

#include "interfaces/common_objects/types.hpp"
#include "interfaces/iroha_internal/proposal.hpp"
#include "interfaces/iroha_internal/unsafe_proposal_factory.hpp"
#include "logger/logger_fwd.hpp"
#include "main/subscription.hpp"
#include "ordering/impl/on_demand_common.hpp"
#include "ordering/impl/round_switch.hpp"
#include "ordering/on_demand_ordering_service.hpp"
Expand All @@ -30,7 +32,9 @@ namespace iroha {
* Ordering gate which requests proposals from the ordering service
* votes for proposals, and passes committed proposals to the pipeline
*/
class OnDemandOrderingGate : public network::OrderingGate {
class OnDemandOrderingGate
: public network::OrderingGate,
public std::enable_shared_from_this<OnDemandOrderingGate> {
public:
OnDemandOrderingGate(
std::shared_ptr<OnDemandOrderingService> ordering_service,
Expand All @@ -44,6 +48,8 @@ namespace iroha {

~OnDemandOrderingGate() override;

void initialize();

void propagateBatch(
std::shared_ptr<shared_model::interface::TransactionBatch> batch)
override;
Expand Down Expand Up @@ -96,6 +102,8 @@ namespace iroha {
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache_;
consensus::Round current_round_;
std::shared_ptr<const LedgerState> current_ledger_state_;
std::shared_ptr<iroha::BaseSubscriber<bool, ProposalEvent>>
failed_proposal_response_;

std::shared_timed_mutex stop_mutex_;
bool stop_requested_{false};
Expand Down
Loading

0 comments on commit 9c60b40

Please sign in to comment.