From 9c60b40065f324a1593ee579588c940d9daaecb2 Mon Sep 17 00:00:00 2001 From: Alexander Lednev <57529355+iceseer@users.noreply.github.com> Date: Fri, 8 Apr 2022 10:21:40 +0300 Subject: [PATCH] Feature/proposal request optimization v2 (#1971) * bloom filter Signed-off-by: iceseer Signed-off-by: Alexander Lednev <57529355+iceseer@users.noreply.github.com> * transport with BF support Signed-off-by: iceseer Signed-off-by: Alexander Lednev <57529355+iceseer@users.noreply.github.com> * txs diff complete Signed-off-by: iceseer Signed-off-by: Alexander Lednev <57529355+iceseer@users.noreply.github.com> --- .github/TESTS_ALLOWED_TO_FAIL | 3 + .../impl/postgres_specific_query_executor.cpp | 2 +- irohad/main/impl/on_demand_ordering_init.cpp | 8 +- irohad/main/subscription_fwd.hpp | 6 + irohad/ordering/impl/batches_cache.hpp | 7 + .../impl/on_demand_connection_manager.cpp | 19 +- .../impl/on_demand_connection_manager.hpp | 7 +- .../ordering/impl/on_demand_ordering_gate.cpp | 26 +++ .../ordering/impl/on_demand_ordering_gate.hpp | 10 +- .../impl/on_demand_ordering_service_impl.cpp | 128 +++++++++-- .../impl/on_demand_ordering_service_impl.hpp | 18 +- .../impl/on_demand_os_client_grpc.cpp | 85 +++++--- .../impl/on_demand_os_client_grpc.hpp | 10 +- .../impl/on_demand_os_server_grpc.cpp | 58 +++-- .../ordering/on_demand_ordering_service.hpp | 13 +- irohad/ordering/on_demand_os_transport.hpp | 8 +- irohad/ordering/ordering_types.hpp | 34 +++ libs/crypto/bloom.hpp | 112 ++++++++++ schema/ordering.proto | 13 +- .../backend/protobuf/impl/proposal.cpp | 4 + .../backend/protobuf/impl/transaction.cpp | 10 + shared_model/backend/protobuf/proposal.hpp | 2 + .../protobuf/proto_proposal_factory.hpp | 10 +- shared_model/backend/protobuf/transaction.hpp | 6 + .../interfaces/iroha_internal/proposal.hpp | 1 + .../iroha_internal/transaction_batch_impl.cpp | 2 + shared_model/interfaces/transaction.hpp | 3 + shared_model/validators/default_validator.hpp | 10 +- .../network/on_demand_os_network_notifier.cpp | 13 +- .../network/on_demand_os_network_notifier.hpp | 4 +- .../integration_test_framework.cpp | 3 +- .../integration_test_framework.hpp | 2 + .../acceptance/fake_peer_example_test.cpp | 62 ------ .../acceptance/fake_peer_fixture.hpp | 3 + test/module/irohad/ordering/CMakeLists.txt | 1 + .../mock_on_demand_os_notification.hpp | 11 +- .../on_demand_connection_manager_test.cpp | 13 +- .../ordering/on_demand_ordering_gate_test.cpp | 35 ++- .../on_demand_os_client_grpc_test.cpp | 33 ++- .../on_demand_os_server_grpc_test.cpp | 204 ++++++++++++++---- .../irohad/ordering/on_demand_os_test.cpp | 10 +- .../module/irohad/ordering/ordering_mocks.hpp | 7 +- test/module/libs/crypto/CMakeLists.txt | 7 + test/module/libs/crypto/bloom_filter_test.cpp | 201 +++++++++++++++++ test/module/shared_model/interface_mocks.hpp | 9 + 45 files changed, 984 insertions(+), 249 deletions(-) create mode 100644 irohad/ordering/ordering_types.hpp create mode 100644 libs/crypto/bloom.hpp create mode 100644 test/module/libs/crypto/bloom_filter_test.cpp diff --git a/.github/TESTS_ALLOWED_TO_FAIL b/.github/TESTS_ALLOWED_TO_FAIL index ef466b3fc03..a67e6a18cb4 100644 --- a/.github/TESTS_ALLOWED_TO_FAIL +++ b/.github/TESTS_ALLOWED_TO_FAIL @@ -1 +1,4 @@ system_irohad_test +integration_add_peer_test +regression_regression_test +integration_remove_peer_test diff --git a/irohad/ametsuchi/impl/postgres_specific_query_executor.cpp b/irohad/ametsuchi/impl/postgres_specific_query_executor.cpp index 87105ddc92c..7e9dd3baab8 100644 --- a/irohad/ametsuchi/impl/postgres_specific_query_executor.cpp +++ b/irohad/ametsuchi/impl/postgres_specific_query_executor.cpp @@ -1498,7 +1498,7 @@ namespace iroha { for (const auto &row : range) { iroha::ametsuchi::apply( row, - [this, &peers]( + [&peers]( auto &peer_key, auto &address, auto &tls_certificate) { if (peer_key and address) { peers.push_back( diff --git a/irohad/main/impl/on_demand_ordering_init.cpp b/irohad/main/impl/on_demand_ordering_init.cpp index 0a4aa4e71a5..4637c9fad0f 100644 --- a/irohad/main/impl/on_demand_ordering_init.cpp +++ b/irohad/main/impl/on_demand_ordering_init.cpp @@ -56,10 +56,6 @@ auto createNotificationFactory( std::make_unique>( std::move(client_factory)), - [](iroha::ordering::ProposalEvent event) { - iroha::getSubscription()->notify(iroha::EventTypes::kOnProposalResponse, - std::move(event)); - }, std::move(os_execution_keepers)); } @@ -87,7 +83,7 @@ auto OnDemandOrderingInit::createGate( size_t max_number_of_transactions, const logger::LoggerManagerTreePtr &ordering_log_manager, bool syncing_mode) { - return std::make_shared( + auto og = std::make_shared( std::move(ordering_service), std::move(network_client), std::move(proposal_factory), @@ -95,6 +91,8 @@ auto OnDemandOrderingInit::createGate( max_number_of_transactions, ordering_log_manager->getChild("Gate")->getLogger(), syncing_mode); + og->initialize(); + return og; } auto OnDemandOrderingInit::createService( diff --git a/irohad/main/subscription_fwd.hpp b/irohad/main/subscription_fwd.hpp index 883a2f58c2f..f699d72fd81 100644 --- a/irohad/main/subscription_fwd.hpp +++ b/irohad/main/subscription_fwd.hpp @@ -11,6 +11,9 @@ namespace iroha { enum SubscriptionEngineHandlers { kYac = 0, + kRequestProposal, + kVoteProcess, + kProposalProcessing, kMetrics, kNotifications, //--------------- @@ -38,10 +41,13 @@ namespace iroha { kOnTxsEnoughForProposal, kOnPackProposal, kOnProposalResponse, + kOnProposalResponseFailed, kOnTransactionResponse, kOnConsensusGateEvent, kSendBatchComplete, + kRemoteProposalDiff, + // RDB kOnRdbStats, diff --git a/irohad/ordering/impl/batches_cache.hpp b/irohad/ordering/impl/batches_cache.hpp index d74340dbef0..4af024b3f1b 100644 --- a/irohad/ordering/impl/batches_cache.hpp +++ b/irohad/ordering/impl/batches_cache.hpp @@ -18,6 +18,7 @@ #include "common/common.hpp" #include "consensus/round.hpp" +#include "ordering/ordering_types.hpp" namespace shared_model::interface { class TransactionBatch; @@ -187,9 +188,11 @@ namespace iroha::ordering { size_t requested_tx_amount, std::vector> &collection, + BloomFilter256 &bf, IsProcessedFunc &&is_processed) { collection.clear(); collection.reserve(requested_tx_amount); + bf.clear(); std::unique_lock lock(batches_cache_cs_); uint32_t depth_counter = 0ul; @@ -204,10 +207,14 @@ namespace iroha::ordering { return false; } + for (auto &tx : batch->transactions()) + tx->storeBatchHash(batch->reducedHash()); + collection.insert(std::end(collection), std::begin(batch->transactions()), std::end(batch->transactions())); + bf.set(batch->reducedHash()); used_batches_cache_.insert(batch); return true; }); diff --git a/irohad/ordering/impl/on_demand_connection_manager.cpp b/irohad/ordering/impl/on_demand_connection_manager.cpp index db23b241845..45e202a722f 100644 --- a/irohad/ordering/impl/on_demand_connection_manager.cpp +++ b/irohad/ordering/impl/on_demand_connection_manager.cpp @@ -77,18 +77,19 @@ void OnDemandConnectionManager::onBatchesToWholeNetwork( void OnDemandConnectionManager::onRequestProposal( consensus::Round round, - std::optional> - ref_proposal) { + std::optional< + std::pair, + BloomFilter256>> proposal) { std::shared_lock lock(mutex_); - if (stop_requested_.load(std::memory_order_relaxed)) { + if (stop_requested_.load(std::memory_order_relaxed)) return; - } - log_->debug("onRequestProposal, {}", round); - - if (auto &connection = connections_.peers[kIssuer]) { - (*connection)->onRequestProposal(round, std::move(ref_proposal)); - } + assert(!proposal || proposal.value().first); + log_->debug("onRequestProposal, {} : {}", + round, + proposal ? proposal.value().first->toString() : "NULL_OPT"); + if (auto &connection = connections_.peers[kIssuer]) + (*connection)->onRequestProposal(round, std::move(proposal)); } void OnDemandConnectionManager::initializeConnections( diff --git a/irohad/ordering/impl/on_demand_connection_manager.hpp b/irohad/ordering/impl/on_demand_connection_manager.hpp index a764ecbf928..fba42be65b3 100644 --- a/irohad/ordering/impl/on_demand_connection_manager.hpp +++ b/irohad/ordering/impl/on_demand_connection_manager.hpp @@ -58,11 +58,12 @@ namespace iroha { void onBatches(CollectionType batches) override; void onBatchesToWholeNetwork(CollectionType batches) override; std::chrono::milliseconds getRequestDelay() const override; + void onRequestProposal( consensus::Round round, - std::optional< - std::shared_ptr> - ref_proposal) override; + std::optional, + BloomFilter256>> proposal) override; /** * Initialize corresponding peers in connections_ using factory_ diff --git a/irohad/ordering/impl/on_demand_ordering_gate.cpp b/irohad/ordering/impl/on_demand_ordering_gate.cpp index 74e7b0847c7..639de7bc03c 100644 --- a/irohad/ordering/impl/on_demand_ordering_gate.cpp +++ b/irohad/ordering/impl/on_demand_ordering_gate.cpp @@ -20,6 +20,7 @@ #include "interfaces/iroha_internal/transaction_batch_impl.hpp" #include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp" #include "logger/logger.hpp" +#include "main/subscription.hpp" #include "ordering/impl/on_demand_common.hpp" #include "validators/field_validator.hpp" @@ -41,7 +42,32 @@ OnDemandOrderingGate::OnDemandOrderingGate( tx_cache_(std::move(tx_cache)), syncing_mode_(syncing_mode) {} +void OnDemandOrderingGate::initialize() { + failed_proposal_response_ = + SubscriberCreator::template create< + EventTypes::kOnProposalResponseFailed>( + SubscriptionEngineHandlers::kYac, + [_w_this{weak_from_this()}](auto, auto ev) { + if (auto _this = _w_this.lock()) { + std::shared_lock stop_lock( + _this->stop_mutex_); + if (_this->stop_requested_) { + _this->log_->warn( + "Not doing anything because stop was requested."); + return; + } + + if (!_this->syncing_mode_) { + assert(_this->network_client_); + _this->network_client_->onRequestProposal(ev.round, + std::nullopt); + } + } + }); +} + OnDemandOrderingGate::~OnDemandOrderingGate() { + failed_proposal_response_->unsubscribe(); stop(); } diff --git a/irohad/ordering/impl/on_demand_ordering_gate.hpp b/irohad/ordering/impl/on_demand_ordering_gate.hpp index 966f09a25dd..a1a355829f6 100644 --- a/irohad/ordering/impl/on_demand_ordering_gate.hpp +++ b/irohad/ordering/impl/on_demand_ordering_gate.hpp @@ -8,12 +8,14 @@ #include "network/ordering_gate.hpp" +#include #include #include "interfaces/common_objects/types.hpp" #include "interfaces/iroha_internal/proposal.hpp" #include "interfaces/iroha_internal/unsafe_proposal_factory.hpp" #include "logger/logger_fwd.hpp" +#include "main/subscription.hpp" #include "ordering/impl/on_demand_common.hpp" #include "ordering/impl/round_switch.hpp" #include "ordering/on_demand_ordering_service.hpp" @@ -30,7 +32,9 @@ namespace iroha { * Ordering gate which requests proposals from the ordering service * votes for proposals, and passes committed proposals to the pipeline */ - class OnDemandOrderingGate : public network::OrderingGate { + class OnDemandOrderingGate + : public network::OrderingGate, + public std::enable_shared_from_this { public: OnDemandOrderingGate( std::shared_ptr ordering_service, @@ -44,6 +48,8 @@ namespace iroha { ~OnDemandOrderingGate() override; + void initialize(); + void propagateBatch( std::shared_ptr batch) override; @@ -96,6 +102,8 @@ namespace iroha { std::shared_ptr tx_cache_; consensus::Round current_round_; std::shared_ptr current_ledger_state_; + std::shared_ptr> + failed_proposal_response_; std::shared_timed_mutex stop_mutex_; bool stop_requested_{false}; diff --git a/irohad/ordering/impl/on_demand_ordering_service_impl.cpp b/irohad/ordering/impl/on_demand_ordering_service_impl.cpp index 5edfe34eacd..50dc45c2b56 100644 --- a/irohad/ordering/impl/on_demand_ordering_service_impl.cpp +++ b/irohad/ordering/impl/on_demand_ordering_service_impl.cpp @@ -5,6 +5,7 @@ #include "ordering/impl/on_demand_ordering_service_impl.hpp" +#include #include #include @@ -15,13 +16,53 @@ #include "datetime/time.hpp" #include "interfaces/iroha_internal/proposal.hpp" #include "interfaces/iroha_internal/transaction_batch.hpp" +#include "interfaces/iroha_internal/transaction_batch_impl.hpp" +#include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp" #include "interfaces/transaction.hpp" #include "logger/logger.hpp" #include "main/subscription.hpp" +#include "ordering/ordering_types.hpp" #include "subscription/scheduler_impl.hpp" using iroha::ordering::OnDemandOrderingServiceImpl; +namespace { + auto parseProposal( + shared_model::interface::types::TransactionsCollectionType const &txs) { + shared_model::interface::types::SharedTxsCollectionType transactions; + for (auto const &transaction : txs) + transactions.push_back(clone(transaction)); + + return shared_model::interface::TransactionBatchParserImpl().parseBatches( + transactions); + } + + void uploadBatches( + iroha::ordering::BatchesCache::BatchesSetType &batches, + shared_model::interface::types::TransactionsCollectionType const &txs) { + auto batch_txs = parseProposal(txs); + for (auto &txs : batch_txs) { + batches.insert( + std::make_shared( + std::move(txs))); + } + } + + void uploadBatchesWithFilter( + iroha::ordering::BloomFilter256 const &bf, + iroha::ordering::BatchesCache::BatchesSetType &batches, + shared_model::interface::types::TransactionsCollectionType const &txs) { + auto batch_txs = parseProposal(txs); + for (auto &txs : batch_txs) { + auto batch = + std::make_shared( + std::move(txs)); + if (bf.test(batch->reducedHash())) + batches.insert(batch); + } + } +} // namespace + OnDemandOrderingServiceImpl::OnDemandOrderingServiceImpl( size_t transaction_limit, std::shared_ptr @@ -33,7 +74,66 @@ OnDemandOrderingServiceImpl::OnDemandOrderingServiceImpl( number_of_proposals_(number_of_proposals), proposal_factory_(std::move(proposal_factory)), tx_cache_(std::move(tx_cache)), - log_(std::move(log)) {} + log_(std::move(log)) { + remote_proposal_observer_ = + SubscriberCreator::template create< + iroha::EventTypes::kRemoteProposalDiff>( + iroha::SubscriptionEngineHandlers::kProposalProcessing, + [this]( + auto, + auto ev) { /// TODO(iceseer): remove `this` from lambda context + BatchesCache::BatchesSetType batches; + uploadBatches(batches, ev.remote->transactions()); + + if (ev.bloom_filter.size() == BloomFilter256::kBytesCount) { + BloomFilter256 bf; + bf.store(ev.bloom_filter); + uploadBatchesWithFilter(bf, batches, ev.local->transactions()); + } + + std::vector> + collection; + for (auto const &batch : batches) { + collection.insert(std::end(collection), + std::begin(batch->transactions()), + std::end(batch->transactions())); + } + if (auto result = + tryCreateProposal(ev.round, collection, ev.created_time); + result + && result.value()->hash() + == shared_model::crypto::Hash(ev.remote_proposal_hash)) { + log_->debug("Local correct proposal: {}, while remote {}", + result.value()->hash(), + shared_model::crypto::Hash(ev.remote_proposal_hash)); + iroha::getSubscription()->notify( + iroha::EventTypes::kOnProposalResponse, + ProposalEvent{std::move(result).value(), ev.round}); + } else { + if (result) + log_->debug( + "Local incorrect proposal: {}\nwhile remote {}\nremote " + "proposal: {}\nlocal proposal: {}", + result.value()->hash(), + shared_model::crypto::Hash(ev.remote_proposal_hash), + *ev.remote, + **result); + else + log_->debug( + "Local proposal was not created while remote hash " + "{}\nremote proposal: {}", + shared_model::crypto::Hash(ev.remote_proposal_hash), + *ev.remote); + iroha::getSubscription()->notify( + iroha::EventTypes::kOnProposalResponseFailed, + ProposalEvent{std::nullopt, ev.round}); + } + }); +} + +OnDemandOrderingServiceImpl::~OnDemandOrderingServiceImpl() { + remote_proposal_observer_->unsubscribe(); +} // -------------------------| OnDemandOrderingService |------------------------- @@ -85,7 +185,7 @@ void OnDemandOrderingServiceImpl::forCachedBatches( batches_cache_.forCachedBatches(f); } -std::optional> +OnDemandOrderingServiceImpl::PackedProposalData OnDemandOrderingServiceImpl::waitForLocalProposal( consensus::Round const &round, std::chrono::milliseconds const &delay) { if (!hasProposal(round) && !hasEnoughBatchesInCache()) { @@ -124,12 +224,10 @@ OnDemandOrderingServiceImpl::waitForLocalProposal( return onRequestProposal(round); } -std::optional> +OnDemandOrderingServiceImpl::PackedProposalData OnDemandOrderingServiceImpl::onRequestProposal(consensus::Round round) { log_->debug("Requesting a proposal for round {}", round); - std::optional< - std::shared_ptr> - result; + OnDemandOrderingServiceImpl::PackedProposalData result; do { std::lock_guard lock(proposals_mutex_); auto it = proposal_map_.find(round); @@ -146,10 +244,12 @@ OnDemandOrderingServiceImpl::onRequestProposal(consensus::Round round) { if (is_current_round_or_next2) { result = packNextProposals(round); + proposal_map_.emplace(round, result); getSubscription()->notify(EventTypes::kOnPackProposal, round); } } while (false); - log_->debug("uploadProposal, {}, {}returning a proposal.", + + log_->debug("uploadProposal, {}, {} returning a proposal.", round, result ? "" : "NOT "); return result; @@ -173,25 +273,27 @@ OnDemandOrderingServiceImpl::tryCreateProposal( proposal = std::nullopt; log_->debug("No transactions to create a proposal for {}", round); } - - assert(proposal_map_.find(round) == proposal_map_.end()); - proposal_map_.emplace(round, proposal); return proposal; } -std::optional> +OnDemandOrderingServiceImpl::PackedProposalData OnDemandOrderingServiceImpl::packNextProposals(const consensus::Round &round) { auto now = iroha::time::now(); std::vector> txs; + BloomFilter256 bf; + if (!isEmptyBatchesCache()) batches_cache_.getTransactions( - transaction_limit_, txs, [&](auto const &batch) { + transaction_limit_, txs, bf, [&](auto const &batch) { assert(batch); return batchAlreadyProcessed(*batch); }); log_->debug("Packed proposal contains: {} transactions.", txs.size()); - return tryCreateProposal(round, txs, now); + if (auto result = tryCreateProposal(round, txs, now)) + return std::make_pair(std::move(result).value(), bf); + + return std::nullopt; } void OnDemandOrderingServiceImpl::tryErase( diff --git a/irohad/ordering/impl/on_demand_ordering_service_impl.hpp b/irohad/ordering/impl/on_demand_ordering_service_impl.hpp index 59738d3bd91..46bd7aa6f83 100644 --- a/irohad/ordering/impl/on_demand_ordering_service_impl.hpp +++ b/irohad/ordering/impl/on_demand_ordering_service_impl.hpp @@ -16,6 +16,7 @@ #include "logger/logger_fwd.hpp" #include "ordering/impl/batches_cache.hpp" // TODO 2019-03-15 andrei: IR-403 Separate BatchHashEquality and MstState +#include "main/subscription.hpp" #include "ordering/impl/on_demand_common.hpp" namespace iroha { @@ -26,8 +27,7 @@ namespace iroha { namespace detail { using ProposalMapType = std::map>>; + OnDemandOrderingService::PackedProposalData>; } // namespace detail class OnDemandOrderingServiceImpl : public OnDemandOrderingService { @@ -50,12 +50,13 @@ namespace iroha { logger::LoggerPtr log, size_t number_of_proposals = 3); + ~OnDemandOrderingServiceImpl() override; + // --------------------- | OnDemandOrderingService |_--------------------- void onBatches(CollectionType batches) override; - std::optional> onRequestProposal( - consensus::Round round) override; + PackedProposalData onRequestProposal(consensus::Round round) override; void onCollaborationOutcome(consensus::Round round) override; @@ -69,7 +70,7 @@ namespace iroha { void processReceivedProposal(CollectionType batches) override; - std::optional> waitForLocalProposal( + PackedProposalData waitForLocalProposal( consensus::Round const &round, std::chrono::milliseconds const &delay) override; @@ -78,8 +79,7 @@ namespace iroha { * Packs new proposals and creates new rounds * Note: method is not thread-safe */ - std::optional> - packNextProposals(const consensus::Round &round); + PackedProposalData packNextProposals(const consensus::Round &round); using TransactionsCollectionType = std::vector>; @@ -158,6 +158,10 @@ namespace iroha { * Current round */ consensus::Round current_round_; + + std::shared_ptr< + iroha::BaseSubscriber> + remote_proposal_observer_; }; } // namespace ordering } // namespace iroha diff --git a/irohad/ordering/impl/on_demand_os_client_grpc.cpp b/irohad/ordering/impl/on_demand_os_client_grpc.cpp index 749fe7dd994..9e5474ce4a8 100644 --- a/irohad/ordering/impl/on_demand_os_client_grpc.cpp +++ b/irohad/ordering/impl/on_demand_os_client_grpc.cpp @@ -9,6 +9,8 @@ #include "backend/protobuf/transaction.hpp" #include "interfaces/common_objects/peer.hpp" #include "interfaces/iroha_internal/transaction_batch.hpp" +#include "interfaces/iroha_internal/transaction_batch_impl.hpp" +#include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp" #include "logger/logger.hpp" #include "main/subscription.hpp" #include "network/impl/client_factory.hpp" @@ -73,7 +75,6 @@ OnDemandOsClientGrpc::OnDemandOsClientGrpc( std::function time_provider, std::chrono::milliseconds proposal_request_timeout, logger::LoggerPtr log, - std::function callback, std::shared_ptr os_execution_keepers, std::string peer_name) : log_(std::move(log)), @@ -81,7 +82,6 @@ OnDemandOsClientGrpc::OnDemandOsClientGrpc( proposal_factory_(std::move(proposal_factory)), time_provider_(std::move(time_provider)), proposal_request_timeout_(proposal_request_timeout), - callback_(std::move(callback)), os_execution_keepers_(std::move(os_execution_keepers)), peer_name_(std::move(peer_name)) { assert(os_execution_keepers_); @@ -156,8 +156,9 @@ std::chrono::milliseconds OnDemandOsClientGrpc::getRequestDelay() const { void OnDemandOsClientGrpc::onRequestProposal( consensus::Round round, - std::optional> - ref_proposal) { + std::optional< + std::pair, + BloomFilter256>> ref_proposal) { // Cancel an unfinished request if (auto maybe_context = context_.lock()) { maybe_context->TryCancel(); @@ -169,9 +170,9 @@ void OnDemandOsClientGrpc::onRequestProposal( request.mutable_round()->set_block_round(round.block_round); request.mutable_round()->set_reject_round(round.reject_round); if (ref_proposal.has_value()) - request.set_ref_proposal_hash( - std::string((char *)ref_proposal.value()->hash().blob().data(), - ref_proposal.value()->hash().blob().size())); + request.set_bloom_filter( + std::string(ref_proposal.value().second.load().data(), + ref_proposal.value().second.load().size())); getSubscription()->dispatcher()->add( getSubscription()->dispatcher()->kExecuteInPool, @@ -183,14 +184,15 @@ void OnDemandOsClientGrpc::onRequestProposal( request(std::move(request)), stub(utils::make_weak(stub_)), log(utils::make_weak(log_)), - proposal_factory(utils::make_weak(proposal_factory_)), - callback(callback_)] { + proposal_factory(utils::make_weak(proposal_factory_))] { auto maybe_stub = stub.lock(); auto maybe_log = log.lock(); auto maybe_proposal_factory = proposal_factory.lock(); if (not(maybe_stub and maybe_log and maybe_proposal_factory)) { return; } + + /// make request context->set_deadline(time_provider() + proposal_request_timeout); proto::ProposalResponse response; maybe_log->info("Requesting proposal"); @@ -199,27 +201,61 @@ void OnDemandOsClientGrpc::onRequestProposal( if (not status.ok()) { maybe_log->warn( "RPC failed: {} {}", context->peer(), status.error_message()); - callback({std::nullopt, round}); + iroha::getSubscription()->notify( + iroha::EventTypes::kOnProposalResponse, + ProposalEvent{std::nullopt, round}); return; } else { maybe_log->info("RPC succeeded(RequestingProposal): {}", context->peer()); } - switch (response.optional_proposal_case()) { - case proto::ProposalResponse::kSameProposalHash: { - callback({std::move(ref_proposal), round}); - } break; - case proto::ProposalResponse::kProposal: { - auto proposal_result = + if (!response.has_proposal_hash()) { + maybe_log->info("Remote node {} has no proposal.", context->peer()); + iroha::getSubscription()->notify( + iroha::EventTypes::kOnProposalResponse, + ProposalEvent{std::nullopt, round}); + return; + } + + /// parse request + std::shared_ptr + remote_proposal; + if (auto proposal_result = maybe_proposal_factory->build(response.proposal()); - if (expected::hasError(proposal_result)) { - maybe_log->info("{}", proposal_result.assumeError().error); - callback({std::nullopt, round}); - } else - callback({std::move(proposal_result).assumeValue(), round}); - } break; - default: { callback({std::nullopt, round}); } break; + expected::hasError(proposal_result)) { + maybe_log->warn("{}", proposal_result.assumeError().error); + iroha::getSubscription()->notify( + iroha::EventTypes::kOnProposalResponse, + ProposalEvent{std::nullopt, round}); + return; + } else + remote_proposal = std::move(proposal_result).assumeValue(); + + /// merge if has local proposal or process directly if not + if (ref_proposal.has_value()) { + std::shared_ptr + local_proposal; + local_proposal = ref_proposal.value().first; + + iroha::getSubscription()->notify( + iroha::EventTypes::kRemoteProposalDiff, + RemoteProposalDownloadedEvent{ + local_proposal, + remote_proposal, + response.bloom_filter(), + response.proposal_hash(), + round, + remote_proposal ? remote_proposal->createdTime() : 0ull}); + } else if (!remote_proposal->transactions().empty()) + iroha::getSubscription()->notify( + iroha::EventTypes::kOnProposalResponse, + ProposalEvent{std::move(remote_proposal), round}); + else { + maybe_log->info("Transactions sequence in proposal is empty"); + iroha::getSubscription()->notify( + iroha::EventTypes::kOnProposalResponse, + ProposalEvent{std::nullopt, round}); } }); } @@ -230,14 +266,12 @@ OnDemandOsClientGrpcFactory::OnDemandOsClientGrpcFactory( OnDemandOsClientGrpc::TimeoutType proposal_request_timeout, logger::LoggerPtr client_log, std::unique_ptr client_factory, - std::function callback, std::shared_ptr os_execution_keepers) : proposal_factory_(std::move(proposal_factory)), time_provider_(time_provider), proposal_request_timeout_(proposal_request_timeout), client_log_(std::move(client_log)), client_factory_(std::move(client_factory)), - callback_(callback), os_execution_keepers_(std::move(os_execution_keepers)) { assert(os_execution_keepers_); } @@ -253,7 +287,6 @@ OnDemandOsClientGrpcFactory::create(const shared_model::interface::Peer &to) { time_provider_, proposal_request_timeout_, client_log_, - callback_, os_execution_keepers_, to.pubkey()); }; diff --git a/irohad/ordering/impl/on_demand_os_client_grpc.hpp b/irohad/ordering/impl/on_demand_os_client_grpc.hpp index 7ed298d9f7f..bd8bbb8fd19 100644 --- a/irohad/ordering/impl/on_demand_os_client_grpc.hpp +++ b/irohad/ordering/impl/on_demand_os_client_grpc.hpp @@ -49,7 +49,6 @@ namespace iroha { std::function time_provider, std::chrono::milliseconds proposal_request_timeout, logger::LoggerPtr log, - std::function callback, std::shared_ptr os_execution_keepers, std::string peer_name); @@ -60,9 +59,9 @@ namespace iroha { void onRequestProposal( consensus::Round round, - std::optional< - std::shared_ptr> - ref_proposal) override; + std::optional, + BloomFilter256>> proposal) override; std::chrono::milliseconds getRequestDelay() const override; @@ -72,7 +71,6 @@ namespace iroha { std::shared_ptr proposal_factory_; std::function time_provider_; std::chrono::milliseconds proposal_request_timeout_; - std::function callback_; std::weak_ptr context_; std::shared_ptr os_execution_keepers_; std::string peer_name_; @@ -90,7 +88,6 @@ namespace iroha { OnDemandOsClientGrpc::TimeoutType proposal_request_timeout, logger::LoggerPtr client_log, std::unique_ptr client_factory, - std::function callback, std::shared_ptr os_execution_keepers); iroha::expected::Result, std::string> @@ -104,7 +101,6 @@ namespace iroha { std::chrono::milliseconds proposal_request_timeout_; logger::LoggerPtr client_log_; std::unique_ptr client_factory_; - std::function callback_; std::shared_ptr os_execution_keepers_; }; diff --git a/irohad/ordering/impl/on_demand_os_server_grpc.cpp b/irohad/ordering/impl/on_demand_os_server_grpc.cpp index 93632c5d8c6..5fdb23c059a 100644 --- a/irohad/ordering/impl/on_demand_os_server_grpc.cpp +++ b/irohad/ordering/impl/on_demand_os_server_grpc.cpp @@ -7,6 +7,7 @@ #include "backend/protobuf/deserialize_repeated_transactions.hpp" #include "backend/protobuf/proposal.hpp" +#include "backend/protobuf/transaction.hpp" #include "interfaces/iroha_internal/parse_and_create_batches.hpp" #include "interfaces/iroha_internal/transaction_batch.hpp" #include "logger/logger.hpp" @@ -70,28 +71,43 @@ grpc::Status OnDemandOsServerGrpc::RequestProposal( log_->info("Received RequestProposal for {} from {}", round, context->peer()); auto maybe_proposal = ordering_service_->waitForLocalProposal(round, delay_); if (maybe_proposal.has_value()) { - if (request->has_ref_proposal_hash() - && maybe_proposal.value()->hash() - == shared_model::crypto::Hash(request->ref_proposal_hash())) - response->set_same_proposal_hash(request->ref_proposal_hash()); - else - *response->mutable_proposal() = - static_cast( - maybe_proposal->get()) - ->getTransport(); - } + auto const &[sptr_proposal, bf_local] = maybe_proposal.value(); + response->set_bloom_filter(bf_local.load().data(), bf_local.load().size()); + response->set_proposal_hash(sptr_proposal->hash().blob().data(), + sptr_proposal->hash().blob().size()); + + log_->debug( + "OS proposal: {}\nproposal: {}", sptr_proposal->hash(), *sptr_proposal); - log_->debug( - "Responding for {} with {}: our proposal {}", - round, - request->has_ref_proposal_hash() ? request->ref_proposal_hash() - : "NO REFERENCE PROPOSAL HASH", - response->optional_proposal_case() == response->kProposal - ? fmt::format("has DIFFERENT hash {}, sending full proposal", - maybe_proposal.value()->hash().hex()) - : response->optional_proposal_case() == response->kSameProposalHash - ? "has SAME hash, sending only hash" - : "is EMPTY"); + auto const &proto_proposal = + static_cast(sptr_proposal.get()) + ->getTransport(); + if (!request->has_bloom_filter() + || request->bloom_filter().size() != BloomFilter256::kBytesCount) { + log_->info("Response with full {} txs proposal.", + sptr_proposal->transactions().size()); + *response->mutable_proposal() = proto_proposal; + } else { + response->mutable_proposal()->set_created_time( + proto_proposal.created_time()); + response->mutable_proposal()->set_height(proto_proposal.height()); + BloomFilter256 bf_remote; + bf_remote.store(std::string_view(request->bloom_filter())); + + assert((size_t)proto_proposal.transactions().size() + == sptr_proposal->transactions().size()); + for (size_t ix = 0; ix < sptr_proposal->transactions().size(); ++ix) { + assert(sptr_proposal->transactions()[ix].getBatchHash()); + if (!bf_remote.test(sptr_proposal->transactions()[(int)ix] + .getBatchHash() + .value())) { + auto *tx_dst = + response->mutable_proposal()->mutable_transactions()->Add(); + *tx_dst = proto_proposal.transactions()[(int)ix]; + } + } + } + } return ::grpc::Status::OK; } diff --git a/irohad/ordering/on_demand_ordering_service.hpp b/irohad/ordering/on_demand_ordering_service.hpp index a6509a1d9fc..b7923ba94f4 100644 --- a/irohad/ordering/on_demand_ordering_service.hpp +++ b/irohad/ordering/on_demand_ordering_service.hpp @@ -12,6 +12,7 @@ #include "consensus/round.hpp" #include "cryptography/hash.hpp" #include "interfaces/iroha_internal/transaction_batch.hpp" +#include "ordering/ordering_types.hpp" namespace shared_model { namespace interface { @@ -48,6 +49,9 @@ namespace iroha { std::set, shared_model::interface::BatchHashLess>; + using PackedProposalData = std::optional< + std::pair, BloomFilter256>>; + /** * Type of stored transaction batches */ @@ -65,8 +69,7 @@ namespace iroha { */ virtual void onBatches(CollectionType batches) = 0; - virtual std::optional> - onRequestProposal(consensus::Round round) = 0; + virtual PackedProposalData onRequestProposal(consensus::Round round) = 0; using HashesSetType = std::unordered_set> - waitForLocalProposal(consensus::Round const &round, - std::chrono::milliseconds const &delay) = 0; + virtual PackedProposalData waitForLocalProposal( + consensus::Round const &round, + std::chrono::milliseconds const &delay) = 0; /** * Method to get betches under lock diff --git a/irohad/ordering/on_demand_os_transport.hpp b/irohad/ordering/on_demand_os_transport.hpp index a331807594b..35a8a272e96 100644 --- a/irohad/ordering/on_demand_os_transport.hpp +++ b/irohad/ordering/on_demand_os_transport.hpp @@ -15,6 +15,7 @@ #include "consensus/round.hpp" #include "interfaces/iroha_internal/proposal.hpp" #include "interfaces/iroha_internal/transaction_batch.hpp" +#include "ordering/ordering_types.hpp" namespace shared_model { namespace interface { @@ -60,12 +61,13 @@ namespace iroha { * Callback on request about proposal * @param round - number of collaboration round. * Calculated as block_height + 1 + * @param proposal data with Bloom filter */ virtual void onRequestProposal( consensus::Round round, - std::optional< - std::shared_ptr> - ref_proposal) = 0; + std::optional, + BloomFilter256>> proposal) = 0; /** * @return delay proposal to wait for. diff --git a/irohad/ordering/ordering_types.hpp b/irohad/ordering/ordering_types.hpp new file mode 100644 index 00000000000..7ef03700f9e --- /dev/null +++ b/irohad/ordering/ordering_types.hpp @@ -0,0 +1,34 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef IROHA_ORDERING_TYPES_HPP +#define IROHA_ORDERING_TYPES_HPP + +#include "consensus/round.hpp" +#include "crypto/bloom.hpp" +#include "interfaces/iroha_internal/proposal.hpp" + +namespace iroha::ordering { + + using BloomFilter256 = shared_model::crypto::BloomFilter< + shared_model::crypto::Hash, + 256, + shared_model::crypto::Iroha2BloomHasher64<0, 32>, + shared_model::crypto::Iroha2BloomHasher64<1, 32>, + shared_model::crypto::Iroha2BloomHasher64<2, 32>, + shared_model::crypto::Iroha2BloomHasher64<3, 32>>; + + struct RemoteProposalDownloadedEvent { + std::shared_ptr local; + std::shared_ptr remote; + std::string bloom_filter; + std::string remote_proposal_hash; + consensus::Round round; + shared_model::interface::types::TimestampType created_time; + }; + +} // namespace iroha::ordering + +#endif // IROHA_ORDERING_TYPES_HPP diff --git a/libs/crypto/bloom.hpp b/libs/crypto/bloom.hpp new file mode 100644 index 00000000000..a8427693ec2 --- /dev/null +++ b/libs/crypto/bloom.hpp @@ -0,0 +1,112 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef IROHA_CRYPTO_BLOOM_HPP +#define IROHA_CRYPTO_BLOOM_HPP + +#include +#include +#include + +#include "common/mem_operations.hpp" +#include "cryptography/hash.hpp" + +namespace shared_model::crypto { + + template + class Iroha2BloomHasher64 { + static_assert(kIndex * sizeof(uint64_t) < kSize, "Unexpected size."); + static_assert(kSize % sizeof(uint64_t) == 0, "Inconsistent size."); + + public: + static auto pack8(shared_model::crypto::Hash const &hash) { + auto const input = *(((uint64_t *)&hash.blob()[0]) + kIndex); + auto const pack1 = (input >> 32) ^ input; + auto const pack2 = (pack1 >> 16) ^ pack1; + auto const pack3 = (((pack2 >> 8) ^ pack2) & 0xff); + + assert((pack3 >> 3) < kSize); + return std::make_pair(pack3 >> 3, pack3 & 0x7); + } + static void set(shared_model::crypto::Hash const &hash, + uint8_t (&bloom)[kSize]) { + auto const &[byte_position, bit_position] = pack8(hash); + auto &target = *(bloom + byte_position); + target |= (1 << bit_position); + } + static bool isSet(shared_model::crypto::Hash const &hash, + uint8_t const (&bloom)[kSize]) { + auto const &[byte_position, bit_position] = pack8(hash); + auto const &target = *(bloom + byte_position); + return ((target & (1 << bit_position)) != 0); + } + }; + + template + class BloomFilter final { + public: + static_assert((kBitsCount & 0x7) == 0, "BitsCount must be multiple of 8"); + static_assert(kBitsCount != 0, "BitsCount can not be 0"); + static constexpr size_t kBytesCount = (kBitsCount >> 3); + + private: + template + struct ArgsListNE { + static constexpr auto value = sizeof...(T) > 0; + }; + + uint8_t filter_[kBytesCount] __attribute__((aligned(16))); + + template + auto checkHash(DataType const &data) const { + return Hasher::isSet(data, filter_); + }; + + template ::value, + bool>::type = true> + auto runHashers(DataType const &data) const { + return checkHash(data) && runHashers(data); + } + template + auto runHashers(DataType const &data) const { + return checkHash(data); + } + + public: + BloomFilter() { + clear(); + } + + bool operator==(BloomFilter const &other) const { + return (0 == std::memcmp(filter_, other.filter_, sizeof(filter_))); + } + + void set(DataType const &data) { + ((void)HashFunctions::set(data, filter_), ...); + } + + bool test(DataType const &data) const { + return runHashers(data); + } + + void clear() { + iroha::memzero(filter_); + } + + void store(std::string_view const &data) { + if (data.size() == kBytesCount) + memcpy(filter_, data.data(), kBytesCount); + } + + std::string_view load() const { + return std::string_view((char *)filter_, kBytesCount); + } + }; + +} // namespace shared_model::crypto + +#endif // IROHA_CRYPTO_BLOOM_HPP diff --git a/schema/ordering.proto b/schema/ordering.proto index eabf483f7f4..3d73a0b4513 100644 --- a/schema/ordering.proto +++ b/schema/ordering.proto @@ -25,16 +25,17 @@ message BatchesRequest { message ProposalRequest { ProposalRound round = 1; - oneof optional_ref_proposal { - bytes ref_proposal_hash = 2; + oneof optional_bloom_filter { + bytes bloom_filter = 2; } } message ProposalResponse { - oneof optional_proposal { - protocol.Proposal proposal = 1; - bytes same_proposal_hash = 2; - } + bytes bloom_filter = 1; + protocol.Proposal proposal = 2; + oneof optional_proposal_hash { + bytes proposal_hash = 3; + } } service OnDemandOrdering { diff --git a/shared_model/backend/protobuf/impl/proposal.cpp b/shared_model/backend/protobuf/impl/proposal.cpp index dd7a1abc74c..723dcadc93a 100644 --- a/shared_model/backend/protobuf/impl/proposal.cpp +++ b/shared_model/backend/protobuf/impl/proposal.cpp @@ -41,6 +41,10 @@ namespace shared_model { impl_ = std::make_unique(std::move(ref)); } + interface::types::TransactionsCollectionType Proposal::mut_transactions() { + return impl_->transactions_; + } + TransactionsCollectionType Proposal::transactions() const { return impl_->transactions_; } diff --git a/shared_model/backend/protobuf/impl/transaction.cpp b/shared_model/backend/protobuf/impl/transaction.cpp index f43310df2d3..8d9438d27fb 100644 --- a/shared_model/backend/protobuf/impl/transaction.cpp +++ b/shared_model/backend/protobuf/impl/transaction.cpp @@ -176,5 +176,15 @@ namespace shared_model { return new Transaction(TransportType(*impl_->proto_)); } + void Transaction::storeBatchHash( + shared_model::interface::types::HashType const &hash) { + batch_hash_ = hash; + } + + std::optional const & + Transaction::getBatchHash() const { + return batch_hash_; + } + } // namespace proto } // namespace shared_model diff --git a/shared_model/backend/protobuf/proposal.hpp b/shared_model/backend/protobuf/proposal.hpp index e825982abac..00990a4d018 100644 --- a/shared_model/backend/protobuf/proposal.hpp +++ b/shared_model/backend/protobuf/proposal.hpp @@ -35,6 +35,8 @@ namespace shared_model { const interface::types::HashType &hash() const override; + interface::types::TransactionsCollectionType mut_transactions() override; + ~Proposal() override; private: diff --git a/shared_model/backend/protobuf/proto_proposal_factory.hpp b/shared_model/backend/protobuf/proto_proposal_factory.hpp index 947dd7cbce0..1e92a2b9aac 100644 --- a/shared_model/backend/protobuf/proto_proposal_factory.hpp +++ b/shared_model/backend/protobuf/proto_proposal_factory.hpp @@ -42,8 +42,16 @@ namespace shared_model { interface::types::HeightType height, interface::types::TimestampType created_time, UnsafeTransactionsCollectionType transactions) override { - return std::make_unique( + auto proposal = std::make_unique( createProtoProposal(height, created_time, transactions)); + + size_t ix = 0; + for (auto &tx : transactions) + if (tx.getBatchHash()) + proposal->transactions()[ix++].storeBatchHash( + tx.getBatchHash().value()); + + return proposal; } /** diff --git a/shared_model/backend/protobuf/transaction.hpp b/shared_model/backend/protobuf/transaction.hpp index 34d9e49826c..13ad8b72b15 100644 --- a/shared_model/backend/protobuf/transaction.hpp +++ b/shared_model/backend/protobuf/transaction.hpp @@ -58,12 +58,18 @@ namespace shared_model { std::optional> batchMeta() const override; + void storeBatchHash( + shared_model::interface::types::HashType const &hash) override; + std::optional const & + getBatchHash() const override; + protected: Transaction::ModelType *clone() const override; private: struct Impl; std::unique_ptr impl_; + std::optional batch_hash_; }; } // namespace proto } // namespace shared_model diff --git a/shared_model/interfaces/iroha_internal/proposal.hpp b/shared_model/interfaces/iroha_internal/proposal.hpp index 9e9b85dd2bc..89fa5f14c91 100644 --- a/shared_model/interfaces/iroha_internal/proposal.hpp +++ b/shared_model/interfaces/iroha_internal/proposal.hpp @@ -20,6 +20,7 @@ namespace shared_model { * @return transactions */ virtual types::TransactionsCollectionType transactions() const = 0; + virtual types::TransactionsCollectionType mut_transactions() = 0; /** * @return the height diff --git a/shared_model/interfaces/iroha_internal/transaction_batch_impl.cpp b/shared_model/interfaces/iroha_internal/transaction_batch_impl.cpp index ae100e257e0..549dd18ea22 100644 --- a/shared_model/interfaces/iroha_internal/transaction_batch_impl.cpp +++ b/shared_model/interfaces/iroha_internal/transaction_batch_impl.cpp @@ -24,6 +24,8 @@ namespace shared_model { transactions_ | boost::adaptors::transformed([](const auto &tx) { return tx->reducedHash(); })); + + for (auto &tx : transactions_) tx->storeBatchHash(reduced_hash_); } const types::SharedTxsCollectionType &TransactionBatchImpl::transactions() diff --git a/shared_model/interfaces/transaction.hpp b/shared_model/interfaces/transaction.hpp index 44ef9c7fa3d..2046fb9138d 100644 --- a/shared_model/interfaces/transaction.hpp +++ b/shared_model/interfaces/transaction.hpp @@ -54,6 +54,9 @@ namespace shared_model { */ virtual const types::HashType &reducedHash() const = 0; + virtual void storeBatchHash(types::HashType const &hash) = 0; + virtual std::optional const &getBatchHash() const = 0; + /** * @return new Transaction obeject with moved data */ diff --git a/shared_model/validators/default_validator.hpp b/shared_model/validators/default_validator.hpp index 70163967994..dc5ab1c462d 100644 --- a/shared_model/validators/default_validator.hpp +++ b/shared_model/validators/default_validator.hpp @@ -109,11 +109,17 @@ namespace shared_model { BatchOrderValidator, false>; + using DefaultSignedTransactionsValidatorWithAllowedEmptyList = + TransactionsCollectionValidator; + /** * Proposal validator which checks stateless validation of proposal */ - using DefaultProposalValidator = - ProposalValidator; + using DefaultProposalValidator = ProposalValidator< + FieldValidator, + DefaultSignedTransactionsValidatorWithAllowedEmptyList>; /** * Block validator which checks blocks WITHOUT signatures. Note that it does diff --git a/test/framework/integration_framework/fake_peer/network/on_demand_os_network_notifier.cpp b/test/framework/integration_framework/fake_peer/network/on_demand_os_network_notifier.cpp index 138ad72303d..8927f0623da 100644 --- a/test/framework/integration_framework/fake_peer/network/on_demand_os_network_notifier.cpp +++ b/test/framework/integration_framework/fake_peer/network/on_demand_os_network_notifier.cpp @@ -11,6 +11,7 @@ #include "framework/integration_framework/fake_peer/behaviour/behaviour.hpp" #include "framework/integration_framework/fake_peer/fake_peer.hpp" #include "framework/integration_framework/fake_peer/proposal_storage.hpp" +#include "ordering/ordering_types.hpp" namespace integration_framework::fake_peer { @@ -24,14 +25,14 @@ namespace integration_framework::fake_peer { std::make_shared(std::move(batches))); } - std::optional> + OnDemandOsNetworkNotifier::PackedProposalData OnDemandOsNetworkNotifier::waitForLocalProposal( iroha::consensus::Round const &round, std::chrono::milliseconds const & /*delay*/) { return onRequestProposal(round); } - std::optional> + OnDemandOsNetworkNotifier::PackedProposalData OnDemandOsNetworkNotifier::onRequestProposal(iroha::consensus::Round round) { { std::lock_guard guard(rounds_subject_mutex_); @@ -43,9 +44,11 @@ namespace integration_framework::fake_peer { if (behaviour) { auto opt_proposal = behaviour->processOrderingProposalRequest(round); if (opt_proposal) { - return std::shared_ptr( - std::static_pointer_cast( - *opt_proposal)); + return std::make_pair( + std::shared_ptr( + std::static_pointer_cast( + *opt_proposal)), + iroha::ordering::BloomFilter256{}); } } return {}; diff --git a/test/framework/integration_framework/fake_peer/network/on_demand_os_network_notifier.hpp b/test/framework/integration_framework/fake_peer/network/on_demand_os_network_notifier.hpp index 331dc954ef3..e42dc0b4559 100644 --- a/test/framework/integration_framework/fake_peer/network/on_demand_os_network_notifier.hpp +++ b/test/framework/integration_framework/fake_peer/network/on_demand_os_network_notifier.hpp @@ -23,7 +23,7 @@ namespace integration_framework::fake_peer { void onBatches(CollectionType batches) override; - std::optional> onRequestProposal( + PackedProposalData onRequestProposal( iroha::consensus::Round round) override; void onCollaborationOutcome(iroha::consensus::Round round) override; @@ -37,7 +37,7 @@ namespace integration_framework::fake_peer { iroha::ordering::OnDemandOrderingService::BatchesSetType &)> const &f) override; - std::optional> waitForLocalProposal( + PackedProposalData waitForLocalProposal( iroha::consensus::Round const &round, std::chrono::milliseconds const &delay) override; diff --git a/test/framework/integration_framework/integration_test_framework.cpp b/test/framework/integration_framework/integration_test_framework.cpp index 104e2a0dcd7..6dd1188a88e 100644 --- a/test/framework/integration_framework/integration_test_framework.cpp +++ b/test/framework/integration_framework/integration_test_framework.cpp @@ -231,7 +231,8 @@ IntegrationTestFramework::IntegrationTestFramework( logger::LoggerManagerTreePtr log_manager, std::string db_wsv_path, std::string db_store_path) - : log_(log_manager->getLogger()), + : subscription{iroha::getSubscription()}, + log_(log_manager->getLogger()), log_manager_(std::move(log_manager)), proposal_queue_( std::make_unique subscription; + logger::LoggerPtr log_; logger::LoggerManagerTreePtr log_manager_; diff --git a/test/integration/acceptance/fake_peer_example_test.cpp b/test/integration/acceptance/fake_peer_example_test.cpp index 0e06dedd6c6..70f77496551 100644 --- a/test/integration/acceptance/fake_peer_example_test.cpp +++ b/test/integration/acceptance/fake_peer_example_test.cpp @@ -32,25 +32,6 @@ static constexpr std::chrono::seconds kSynchronizerWaitingTime(20); struct FakePeerExampleTest : FakePeerFixture {}; INSTANTIATE_TEST_SUITE_P_DifferentStorageTypes(FakePeerExampleTest); -/** - * Check that after sending a not fully signed transaction, an MST state - * propagates to another peer - * @given a not fully signed transaction - * @when such transaction is sent to one of two iroha peers in the network - * @then that peer propagates MST state to another peer - */ -TEST_P(FakePeerExampleTest, - MstStateOfTransactionWithoutAllSignaturesPropagtesToOtherPeer) { - createFakePeers(1); - auto &itf = prepareState(); - - itf.sendTxWithoutValidation(complete( - baseTx(kAdminId) - .transferAsset(kAdminId, kUserId, kAssetId, "income", "500.0") - .quorum(2), - kAdminKeypair)); -} - /** * Check that Irohad loads correct block version when having a malicious fork on * the network. @@ -234,46 +215,3 @@ TEST_P(FakePeerExampleTest, SynchronizeTheRightVersionOfForkedLedger) { ASSERT_TRUE(completed.wait(kSynchronizerWaitingTime)) << "Error waiting for synchronization"; } - -/** - * Check that after receiving a valid command the ITF peer provides a proposal - * containing it. - * - * \attention this code is nothing more but an example of Fake Peer usage - * - * @given a network of one real and one fake peers - * @when fake peer provides a proposal with valid tx - * @then the real peer must commit the transaction from that proposal - */ -TEST_P(FakePeerExampleTest, OnDemandOrderingProposalAfterValidCommandReceived) { - // Create the tx: - const auto tx = complete( - baseTx(kAdminId).transferAsset(kAdminId, kUserId, kAssetId, "tx1", "1.0"), - kAdminKeypair); - - createFakePeers(1); - - prepareState(); - - // provide the proposal - fake_peers_.front()->getProposalStorage().addTransactions({clone(tx)}); - - // watch the proposal requests to fake peer - constexpr std::chrono::seconds kCommitWaitingTime(20); - iroha::utils::WaitForSingleObject completed; - auto subscriber = iroha::SubscriberCreator< - bool, - std::shared_ptr>:: - template create( - static_cast( - iroha::getSubscription()->dispatcher()->kExecuteInPool), - [&completed, my_hash = tx.reducedHash()](auto, auto block) { - for (const auto &tx : block->transactions()) { - if (my_hash == tx.reducedHash()) { - completed.set(); - } - } - }); - ASSERT_TRUE(completed.wait(kCommitWaitingTime)) - << "Error waiting for the commit"; -} diff --git a/test/integration/acceptance/fake_peer_fixture.hpp b/test/integration/acceptance/fake_peer_fixture.hpp index 7d2d85ebf97..c8b3cbd1e90 100644 --- a/test/integration/acceptance/fake_peer_fixture.hpp +++ b/test/integration/acceptance/fake_peer_fixture.hpp @@ -62,6 +62,7 @@ struct FakePeerFixture : AcceptanceFixture, protected: void SetUp() override { + subscription = iroha::getSubscription(); itf_ = std::make_unique( 1, GetParam(), @@ -74,9 +75,11 @@ struct FakePeerFixture : AcceptanceFixture, void TearDown() override { itf_.reset(); + subscription->dispose(); } std::vector> fake_peers_; + std::shared_ptr subscription; }; #endif // IROHA_FAKE_PEER_FIXTURE_HPP diff --git a/test/module/irohad/ordering/CMakeLists.txt b/test/module/irohad/ordering/CMakeLists.txt index 7926eecb229..26142ea86b6 100644 --- a/test/module/irohad/ordering/CMakeLists.txt +++ b/test/module/irohad/ordering/CMakeLists.txt @@ -37,6 +37,7 @@ target_link_libraries(on_demand_ordering_gate_test shared_model_interfaces_factories test_logger shared_model_proto_backend + sync_subscription ) addtest(mst_processing_test mst_processing_test.cpp) diff --git a/test/module/irohad/ordering/mock_on_demand_os_notification.hpp b/test/module/irohad/ordering/mock_on_demand_os_notification.hpp index 7886990437b..dba669a9e8d 100644 --- a/test/module/irohad/ordering/mock_on_demand_os_notification.hpp +++ b/test/module/irohad/ordering/mock_on_demand_os_notification.hpp @@ -7,6 +7,7 @@ #define IROHA_MOCK_ON_DEMAND_OS_NOTIFICATION_HPP #include "ordering/on_demand_os_transport.hpp" +#include "ordering/ordering_types.hpp" #include @@ -17,10 +18,12 @@ namespace iroha { struct MockOdOsNotification : public OdOsNotification { MOCK_METHOD1(onBatches, void(CollectionType)); MOCK_METHOD1(onBatchesToWholeNetwork, void(CollectionType)); - MOCK_METHOD2(onRequestProposal, - void(consensus::Round, - std::optional>)); + MOCK_METHOD2( + onRequestProposal, + void(consensus::Round, + std::optional, + BloomFilter256>>)); MOCK_CONST_METHOD0(getRequestDelay, std::chrono::milliseconds()); }; diff --git a/test/module/irohad/ordering/on_demand_connection_manager_test.cpp b/test/module/irohad/ordering/on_demand_connection_manager_test.cpp index f2dc8441600..1f583dd5f6f 100644 --- a/test/module/irohad/ordering/on_demand_connection_manager_test.cpp +++ b/test/module/irohad/ordering/on_demand_connection_manager_test.cpp @@ -14,6 +14,7 @@ #include "module/irohad/ordering/ordering_mocks.hpp" #include "module/shared_model/interface_mocks.hpp" #include "ordering/impl/on_demand_common.hpp" +#include "ordering/ordering_types.hpp" using namespace iroha; using namespace iroha::ordering; @@ -107,11 +108,15 @@ TEST_F(OnDemandConnectionManagerTest, onBatches) { */ TEST_F(OnDemandConnectionManagerTest, onRequestProposal) { consensus::Round round{}; - std::optional> - prop{}; + auto proposal = std::make_shared(); + auto p = std::make_optional(std::make_pair( + std::shared_ptr{proposal}, + ordering::BloomFilter256{})); + + EXPECT_CALL(*proposal, toString()).Times(1); EXPECT_CALL(*connections[OnDemandConnectionManager::kIssuer], - onRequestProposal(round, prop)) + onRequestProposal(round, p)) .Times(1); - manager->onRequestProposal(round, prop); + manager->onRequestProposal(round, p); } diff --git a/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp b/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp index 69e013f6d7f..fe0ac51d21a 100644 --- a/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp +++ b/test/module/irohad/ordering/on_demand_ordering_gate_test.cpp @@ -59,6 +59,7 @@ class OnDemandOrderingGateTest : public ::testing::Test { 1000, getTestLogger("OrderingGate"), false); + ordering_gate->initialize(); auto peer = makePeer("127.0.0.1", "111"_hex_pubkey); ledger_state = std::make_shared( @@ -140,7 +141,10 @@ TEST_F(OnDemandOrderingGateTest, BlockEvent) { waitForLocalProposal(round, std::chrono::milliseconds(1))) .WillOnce(Return(std::nullopt)); - std::optional> p{}; + std::optional< + std::pair, + ordering::BloomFilter256>> + p{}; EXPECT_CALL(*notification, onRequestProposal(round, p)).Times(1); auto event = RoundSwitch(round, ledger_state); @@ -171,7 +175,11 @@ TEST_F(OnDemandOrderingGateTest, EmptyEvent) { EXPECT_CALL(*notification, getRequestDelay()) .WillOnce(Return(std::chrono::milliseconds(1))); EXPECT_CALL(*ordering_service, onCollaborationOutcome(round)).Times(1); - std::optional> p{}; + + std::optional< + std::pair, + ordering::BloomFilter256>> + p{}; EXPECT_CALL(*notification, onRequestProposal(round, p)).Times(1); EXPECT_CALL(*ordering_service, @@ -205,7 +213,10 @@ TEST_F(OnDemandOrderingGateTest, BlockEventNoProposal) { .WillOnce(Return(std::nullopt)); EXPECT_CALL(*ordering_service, onCollaborationOutcome(round)).Times(1); - std::optional> p{}; + std::optional< + std::pair, + ordering::BloomFilter256>> + p{}; EXPECT_CALL(*notification, onRequestProposal(round, p)).Times(1); ordering_gate->processRoundSwitch(RoundSwitch(round, ledger_state)); @@ -232,7 +243,11 @@ TEST_F(OnDemandOrderingGateTest, EmptyEventNoProposal) { waitForLocalProposal(round, std::chrono::milliseconds(1))) .WillOnce(Return(std::nullopt)); EXPECT_CALL(*ordering_service, onCollaborationOutcome(round)).Times(1); - std::optional> p{}; + + std::optional< + std::pair, + ordering::BloomFilter256>> + p{}; EXPECT_CALL(*notification, onRequestProposal(round, p)).Times(1); ordering_gate->processRoundSwitch(RoundSwitch(round, ledger_state)); @@ -271,7 +286,11 @@ TEST_F(OnDemandOrderingGateTest, ReplayedTransactionInProposal) { waitForLocalProposal(round, std::chrono::milliseconds(1))) .WillOnce(Return(std::nullopt)); EXPECT_CALL(*ordering_service, onCollaborationOutcome(round)).Times(1); - std::optional> p{}; + + std::optional< + std::pair, + ordering::BloomFilter256>> + p{}; EXPECT_CALL(*notification, onRequestProposal(round, p)).Times(1); EXPECT_CALL(*tx_cache, check(testing::Matcher(_))) @@ -331,7 +350,11 @@ TEST_F(OnDemandOrderingGateTest, RepeatedTransactionInProposal) { waitForLocalProposal(round, std::chrono::milliseconds(1))) .WillOnce(Return(std::nullopt)); EXPECT_CALL(*ordering_service, onCollaborationOutcome(round)).Times(1); - std::optional> p{}; + + std::optional< + std::pair, + ordering::BloomFilter256>> + p{}; EXPECT_CALL(*notification, onRequestProposal(round, p)).Times(1); EXPECT_CALL(*tx_cache, check(testing::Matcher(_))) diff --git a/test/module/irohad/ordering/on_demand_os_client_grpc_test.cpp b/test/module/irohad/ordering/on_demand_os_client_grpc_test.cpp index eeca6404daa..e9db758c8cb 100644 --- a/test/module/irohad/ordering/on_demand_os_client_grpc_test.cpp +++ b/test/module/irohad/ordering/on_demand_os_client_grpc_test.cpp @@ -42,7 +42,14 @@ class OnDemandOsClientGrpcTest : public ::testing::Test { using MockProtoProposalValidator = shared_model::validation::MockValidator; + void TearDown() override { + proposals_subscription_->unsubscribe(); + proposals_subscription_.reset(); + subscription->dispose(); + } + void SetUp() override { + subscription = iroha::getSubscription(); auto ustub = std::make_unique(); stub = ustub.get(); auto validator = std::make_unique(); @@ -64,15 +71,20 @@ class OnDemandOsClientGrpcTest : public ::testing::Test { std::shared_ptr pk[] = {std::make_shared("123")}; exec_keeper->syncronize(&pk[0], &pk[1]); - client = std::make_shared( - std::move(ustub), - proposal_factory, - [&] { return timepoint; }, - timeout, - getTestLogger("OdOsClientGrpc"), - [this](ProposalEvent event) { received_event = event; }, - exec_keeper, - "123"); + proposals_subscription_ = + SubscriberCreator::template create< + EventTypes::kOnProposalResponse>( + iroha::SubscriptionEngineHandlers::kYac, + [this](auto, auto event) { received_event = event; }); + + client = + std::make_shared(std::move(ustub), + proposal_factory, + [&] { return timepoint; }, + timeout, + getTestLogger("OdOsClientGrpc"), + exec_keeper, + "123"); } proto::MockOnDemandOrderingStub *stub; @@ -81,6 +93,8 @@ class OnDemandOsClientGrpcTest : public ::testing::Test { std::shared_ptr client; consensus::Round round{1, 2}; ProposalEvent received_event; + std::shared_ptr> proposals_subscription_; + std::shared_ptr subscription; MockProposalValidator *proposal_validator; MockProtoProposalValidator *proto_proposal_validator; @@ -165,6 +179,7 @@ TEST_F(OnDemandOsClientGrpcTest, onRequestProposal) { ->mutable_payload() ->mutable_reduced_payload() ->set_creator_account_id(creator); + response.set_proposal_hash("hash_1"); EXPECT_CALL(*stub, RequestProposal(_, _, _)) .WillOnce(DoAll(SaveClientContextDeadline(&deadline), SaveArg<1>(&request), diff --git a/test/module/irohad/ordering/on_demand_os_server_grpc_test.cpp b/test/module/irohad/ordering/on_demand_os_server_grpc_test.cpp index 4f048bf64b0..4b728f85e2e 100644 --- a/test/module/irohad/ordering/on_demand_os_server_grpc_test.cpp +++ b/test/module/irohad/ordering/on_demand_os_server_grpc_test.cpp @@ -7,15 +7,22 @@ #include #include +#include + #include "backend/protobuf/proposal.hpp" +#include "backend/protobuf/proto_proposal_factory.hpp" #include "backend/protobuf/proto_transport_factory.hpp" #include "backend/protobuf/transaction.hpp" #include "framework/test_logger.hpp" #include "interfaces/iroha_internal/transaction_batch_impl.hpp" #include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp" +#include "module/irohad/common/validators_config.hpp" +#include "module/irohad/ordering/mst_test_helpers.hpp" #include "module/irohad/ordering/ordering_mocks.hpp" #include "module/shared_model/interface/mock_transaction_batch_factory.hpp" #include "module/shared_model/validators/validators.hpp" +#include "validators/default_validator.hpp" +#include "validators/field_validator.hpp" using namespace iroha; using namespace iroha::ordering; @@ -112,40 +119,6 @@ TEST_F(OnDemandOsServerGrpcTest, SendBatches) { creator); } -/** - * @given server - * @when proposal is requested with reference hash same as proposal hash - * @then only hash returns - */ -TEST_F(OnDemandOsServerGrpcTest, RequestSameProposal) { - auto creator = "test"; - protocol::Proposal proposal; - proposal.add_transactions() - ->mutable_payload() - ->mutable_reduced_payload() - ->set_creator_account_id(creator); - - std::shared_ptr iproposal( - std::make_shared(proposal)); - - proto::ProposalRequest request; - request.mutable_round()->set_block_round(round.block_round); - request.mutable_round()->set_reject_round(round.reject_round); - request.set_ref_proposal_hash( - std::string((char *)iproposal->hash().blob().data(), - iproposal->hash().blob().size())); - - std::chrono::milliseconds delay(0); - EXPECT_CALL(*notification, waitForLocalProposal(round, delay)) - .WillOnce(Return(ByMove(std::move(iproposal)))); - - grpc::ServerContext context; - proto::ProposalResponse response; - server->RequestProposal(&context, &request, &response); - - ASSERT_TRUE(response.has_same_proposal_hash()); -} - /** * @given server * @when proposal is requested @@ -164,11 +137,13 @@ TEST_F(OnDemandOsServerGrpcTest, RequestProposal) { ->mutable_reduced_payload() ->set_creator_account_id(creator); - std::shared_ptr iproposal( - std::make_shared(proposal)); + auto p = std::make_pair( + std::shared_ptr( + std::make_shared(proposal)), + ordering::BloomFilter256{}); std::chrono::milliseconds delay(0); EXPECT_CALL(*notification, waitForLocalProposal(round, delay)) - .WillOnce(Return(ByMove(std::move(iproposal)))); + .WillOnce(Return(ByMove(std::move(p)))); grpc::ServerContext context; server->RequestProposal(&context, &request, &response); @@ -203,3 +178,158 @@ TEST_F(OnDemandOsServerGrpcTest, RequestProposalNone) { ASSERT_FALSE(response.has_proposal()); } + +void add2Proposal( + iroha::protocol::Proposal &to, + ordering::BloomFilter256 &bf, + std::shared_ptr const &batch) { + bf.set(batch->reducedHash()); + for (auto const &transaction : batch->transactions()) + *to.add_transactions() = + static_cast(transaction.get()) + ->getTransport(); +} + +std::tuple makeProposal( + size_t batchCount, + std::vector &hashes, + shared_model::interface::types::TimestampType ts) { + auto result = + std::make_tuple(iroha::protocol::Proposal{}, ordering::BloomFilter256{}); + for (size_t ix = 1; ix <= batchCount; ++ix) { + auto batch = makeTestBatch(txBuilder(ix, ts + ix, 1)); + hashes.emplace_back(batch->reducedHash()); + add2Proposal(std::get<0>(result), std::get<1>(result), batch); + } + return result; +} + +TEST_F(OnDemandOsServerGrpcTest, DiffCalculation_wholeIntersection) { + shared_model::proto::ProtoProposalFactory< + shared_model::validation::DefaultProposalValidator> + factory(iroha::test::kTestsValidatorsConfig); + std::vector hashes; + auto proposal = makeProposal(2, hashes, 10); + + proto::ProposalRequest request; + request.mutable_round()->set_block_round(round.block_round); + request.mutable_round()->set_reject_round(round.reject_round); + request.set_bloom_filter(std::get<1>(proposal).load().data(), + std::get<1>(proposal).load().size()); + + proto::ProposalResponse response; + std::chrono::milliseconds delay(0); + + auto result = std::make_optional( + std::make_pair(std::shared_ptr( + std::make_shared( + std::move(std::get<0>(proposal)))), + std::get<1>(proposal))); + + result->first->mut_transactions()[0].storeBatchHash(hashes[0]); + result->first->mut_transactions()[1].storeBatchHash(hashes[1]); + + EXPECT_CALL(*notification, waitForLocalProposal(round, delay)) + .WillOnce(Return(ByMove(std::move(result)))); + + grpc::ServerContext context; + server->RequestProposal(&context, &request, &response); + + ASSERT_TRUE(response.has_proposal()); + ASSERT_TRUE(response.proposal().transactions().empty()); +} + +TEST_F(OnDemandOsServerGrpcTest, DiffCalculation_noIntersection) { + shared_model::proto::ProtoProposalFactory< + shared_model::validation::DefaultProposalValidator> + factory(iroha::test::kTestsValidatorsConfig); + std::vector hashes_1; + auto proposal_pack_1 = makeProposal(2, hashes_1, 10); + + std::vector hashes_2; + auto proposal_pack_2 = makeProposal(2, hashes_2, 100); + + proto::ProposalRequest request; + request.mutable_round()->set_block_round(round.block_round); + request.mutable_round()->set_reject_round(round.reject_round); + request.set_bloom_filter(std::get<1>(proposal_pack_1).load().data(), + std::get<1>(proposal_pack_1).load().size()); + + proto::ProposalResponse response; + std::chrono::milliseconds delay(0); + + auto result = std::make_optional( + std::make_pair(std::shared_ptr( + std::make_shared( + std::get<0>(proposal_pack_2))), + std::get<1>(proposal_pack_2))); + + result->first->mut_transactions()[0].storeBatchHash(hashes_2[0]); + result->first->mut_transactions()[1].storeBatchHash(hashes_2[1]); + + EXPECT_CALL(*notification, waitForLocalProposal(round, delay)) + .WillOnce(Return(ByMove(result))); + + grpc::ServerContext context; + server->RequestProposal(&context, &request, &response); + + ASSERT_TRUE(response.has_proposal()); + assert(response.proposal().transactions().size() == 2); + ASSERT_TRUE(response.proposal().transactions().size() == 2); + + ASSERT_TRUE( + shared_model::proto::Transaction(response.proposal().transactions()[0]) + == shared_model::proto::Transaction( + std::get<0>(proposal_pack_2).transactions()[0])); + ASSERT_TRUE( + shared_model::proto::Transaction(response.proposal().transactions()[1]) + == shared_model::proto::Transaction( + std::get<0>(proposal_pack_2).transactions()[1])); +} + +TEST_F(OnDemandOsServerGrpcTest, DiffCalculation_partIntersection) { + shared_model::proto::ProtoProposalFactory< + shared_model::validation::DefaultProposalValidator> + factory(iroha::test::kTestsValidatorsConfig); + std::vector hashes; + auto proposal_pack = makeProposal(2, hashes, 10); + + proto::ProposalRequest request; + request.mutable_round()->set_block_round(round.block_round); + request.mutable_round()->set_reject_round(round.reject_round); + request.set_bloom_filter(std::get<1>(proposal_pack).load().data(), + std::get<1>(proposal_pack).load().size()); + + auto addition_batch = makeTestBatch(txBuilder(3, 100, 1)); + add2Proposal( + std::get<0>(proposal_pack), std::get<1>(proposal_pack), addition_batch); + + proto::ProposalResponse response; + std::chrono::milliseconds delay(0); + + auto result = std::make_optional( + std::make_pair(std::shared_ptr( + std::make_shared( + std::get<0>(proposal_pack))), + std::get<1>(proposal_pack))); + + result->first->mut_transactions()[0].storeBatchHash(hashes[0]); + result->first->mut_transactions()[1].storeBatchHash(hashes[1]); + result->first->mut_transactions()[2].storeBatchHash( + addition_batch->reducedHash()); + + EXPECT_CALL(*notification, waitForLocalProposal(round, delay)) + .WillOnce(Return(ByMove(result))); + + grpc::ServerContext context; + server->RequestProposal(&context, &request, &response); + + ASSERT_TRUE(response.has_proposal()); + assert(response.proposal().transactions().size() == 1); + ASSERT_TRUE(response.proposal().transactions().size() == 1); + + ASSERT_TRUE( + shared_model::proto::Transaction(response.proposal().transactions()[0]) + == shared_model::proto::Transaction( + std::get<0>(proposal_pack).transactions()[2])); +} diff --git a/test/module/irohad/ordering/on_demand_os_test.cpp b/test/module/irohad/ordering/on_demand_os_test.cpp index d65523ac3be..999eb775152 100644 --- a/test/module/irohad/ordering/on_demand_os_test.cpp +++ b/test/module/irohad/ordering/on_demand_os_test.cpp @@ -155,7 +155,7 @@ TEST_F(OnDemandOsTest, OverflowRound) { ASSERT_TRUE(os->onRequestProposal(target_round)); ASSERT_EQ(transaction_limit, - (*os->onRequestProposal(target_round))->transactions().size()); + os->onRequestProposal(target_round)->first->transactions().size()); } /** @@ -302,7 +302,7 @@ TEST_F(OnDemandOsTest, SeveralTransactionsOneCommited) { os->onCollaborationOutcome(commit_round); auto proposal = os->onRequestProposal(target_round); - const auto &txs = proposal->get()->transactions(); + const auto &txs = proposal->first->transactions(); auto &batch2_tx = *batch2.transactions().at(0); EXPECT_TRUE(proposal); @@ -326,7 +326,7 @@ TEST_F(OnDemandOsTest, DuplicateTxTest) { os->onCollaborationOutcome(commit_round); auto proposal = os->onRequestProposal(target_round); - ASSERT_EQ(1, boost::size((*proposal)->transactions())); + ASSERT_EQ(1, boost::size(proposal->first->transactions())); } /** @@ -348,10 +348,10 @@ TEST_F(OnDemandOsTest, RejectCommit) { auto proposal = os->onRequestProposal( {initial_round.block_round, initial_round.reject_round + 3}); - ASSERT_EQ(2, boost::size((*proposal)->transactions())); + ASSERT_EQ(2, boost::size(proposal->first->transactions())); proposal = os->onRequestProposal(commit_round); - ASSERT_EQ(2, boost::size((*proposal)->transactions())); + ASSERT_EQ(2, boost::size(proposal->first->transactions())); } /** diff --git a/test/module/irohad/ordering/ordering_mocks.hpp b/test/module/irohad/ordering/ordering_mocks.hpp index ad16dde2112..cd5fc9b0b64 100644 --- a/test/module/irohad/ordering/ordering_mocks.hpp +++ b/test/module/irohad/ordering/ordering_mocks.hpp @@ -28,7 +28,7 @@ namespace iroha::ordering { struct MockOnDemandOrderingService : public OnDemandOrderingService { MOCK_METHOD(void, onBatches, (CollectionType), (override)); - MOCK_METHOD((std::optional>), + MOCK_METHOD(PackedProposalData, onRequestProposal, (consensus::Round), (override)); @@ -45,9 +45,8 @@ namespace iroha::ordering { MOCK_METHOD(void, processReceivedProposal, (CollectionType), (override)); MOCK_METHOD2(waitForLocalProposal, - std::optional>( - consensus::Round const &, - std::chrono::milliseconds const &)); + PackedProposalData(consensus::Round const &, + std::chrono::milliseconds const &)); }; } // namespace iroha::ordering diff --git a/test/module/libs/crypto/CMakeLists.txt b/test/module/libs/crypto/CMakeLists.txt index 311934f36c6..26645b22959 100644 --- a/test/module/libs/crypto/CMakeLists.txt +++ b/test/module/libs/crypto/CMakeLists.txt @@ -15,3 +15,10 @@ target_link_libraries(keys_manager_test keys_manager test_logger ) + +#Bloom filter Test +AddTest(bloom_filter_test bloom_filter_test.cpp) +target_link_libraries(bloom_filter_test + shared_model_cryptography_model + consensus_round + ) diff --git a/test/module/libs/crypto/bloom_filter_test.cpp b/test/module/libs/crypto/bloom_filter_test.cpp new file mode 100644 index 00000000000..6964789f5ce --- /dev/null +++ b/test/module/libs/crypto/bloom_filter_test.cpp @@ -0,0 +1,201 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include + +#include "ordering/ordering_types.hpp" + +using ::testing::ByMove; +using ::testing::Ref; +using ::testing::Return; +using ::testing::ReturnRefOfCopy; + +struct BloomFilterTest : public ::testing::Test { + void SetUp() override { + filter_ = std::make_shared(); + } + std::shared_ptr filter_; +}; + +/** + * @given Bloom-filter + * @when set Hash there + * @then test of that Hash will return true + */ +TEST_F(BloomFilterTest, SimplePos) { + filter_->set(shared_model::crypto::Hash::fromHexString( + "0000000000000001000000000000000000000000000000000000000000000000")); + ASSERT_TRUE(filter_->test(shared_model::crypto::Hash::fromHexString( + "0000000000000001000000000000000000000000000000000000000000000000"))); +} + +/** + * @given Bloom-filter + * @when set Hash there + * @then test of the other Hash will return false + */ +TEST_F(BloomFilterTest, SimpleNeg) { + filter_->set(shared_model::crypto::Hash::fromHexString( + "0000000000000001000000000000000000000000000000000000000000000000")); + ASSERT_FALSE(filter_->test(shared_model::crypto::Hash::fromHexString( + "0000000000000002000000000000000000000000000000000000000000000000"))); +} + +/** + * @given Bloom-filter + * @when set multiple Hashes + * @then test of the Hashes which are not present should return false(remember + * false-positive) + */ +TEST_F(BloomFilterTest, RandomNeg) { + filter_->set(shared_model::crypto::Hash::fromHexString( + "1111111111111111111111111111111111111111111111111111111111111111")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "9123594865892659791270573928567890379843798672987395677893427597")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "0000000000000001000000000000000000000000000000000000000000000000")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "3897425687243695369327492877329067903476059372073409674908137884")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "2934756983467951879084309649306870136709760987508225675248658387")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "0912570146507610507436597430971934675798697834672098347567983268")); + ASSERT_FALSE(filter_->test(shared_model::crypto::Hash::fromHexString( + "0000000000000002000000000000000000000000000000000000000000000000"))); + ASSERT_FALSE(filter_->test(shared_model::crypto::Hash::fromHexString( + "0111111111111111111111111111111111111111111111111111111111111111"))); +} + +/** + * @given Bloom-filter + * @when set multiple Hashes there + * @then test the ones that are present will always return true + */ +TEST_F(BloomFilterTest, RandomPos) { + filter_->set(shared_model::crypto::Hash::fromHexString( + "1111111111111111111111111111111111111111111111111111111111111111")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "9123594865892659791270573928567890379843798672987395677893427597")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "0000000000000001000000000000000000000000000000000000000000000000")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "3897425687243695369327492877329067903476059372073409674908137884")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "2934756983467951879084309649306870136709760987508225675248658387")); + filter_->set(shared_model::crypto::Hash::fromHexString( + "0912570146507610507436597430971934675798697834672098347567983268")); + ASSERT_TRUE(filter_->test(shared_model::crypto::Hash::fromHexString( + "0000000000000001000000000000000000000000000000000000000000000000"))); + ASSERT_TRUE(filter_->test(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598"))); +} + +/** + * @given Bloom-filter + * @when set Hash there + * @and make clean after that + * @then test of this Hash will return false + */ +TEST_F(BloomFilterTest, ClearTest) { + filter_->set(shared_model::crypto::Hash::fromHexString( + "1111111111111111111111111111111111111111111111111111111111111111")); + filter_->clear(); + ASSERT_FALSE(filter_->test(shared_model::crypto::Hash::fromHexString( + "1111111111111111111111111111111111111111111111111111111111111111"))); +} + +/** + * @given Bloom-filter + * @when set Hash1 there + * @and make clear after that + * @and add another Hash2 in BF + * @then test of the Hash1 will return false and test Hash 2 will return true + */ +TEST_F(BloomFilterTest, Clear2Test) { + filter_->set(shared_model::crypto::Hash::fromHexString( + "9123594865892659791270573928567890379843798672987395677893427597")); + ASSERT_TRUE(filter_->test(shared_model::crypto::Hash::fromHexString( + "9123594865892659791270573928567890379843798672987395677893427597"))); + filter_->clear(); + ASSERT_FALSE(filter_->test(shared_model::crypto::Hash::fromHexString( + "9123594865892659791270573928567890379843798672987395677893427597"))); + filter_->set(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598")); + ASSERT_TRUE(filter_->test(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598"))); +} + +/** + * @given Bloom-filter + * @when call load + * @then the result data should be the appropriate size + */ +TEST_F(BloomFilterTest, LoadTest) { + filter_->set(shared_model::crypto::Hash::fromHexString( + "9123594865892659791270573928567890379843798672987395677893427597")); + auto value = filter_->load(); + ASSERT_EQ(value.size(), iroha::ordering::BloomFilter256::kBytesCount); +} + +/** + * @given Bloom-filter + * @when set Hash there + * @and after that load data from the filter to string + * @and after thar clear the filter + * @and after that store this data in filter again + * @then test of the Hash should return true + */ +TEST_F(BloomFilterTest, ReloadTest) { + filter_->set(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598")); + std::string const stored(filter_->load().data(), filter_->load().size()); + ASSERT_TRUE(filter_->test(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598"))); + filter_->clear(); + ASSERT_FALSE(filter_->test(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598"))); + + filter_->store(stored); + ASSERT_TRUE(filter_->test(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598"))); +} + +/** + * @given Bloom-filter + * @when set Hash1 there + * @and after that load data from the filter to string + * @and after thar clear the filter + * @and after that set another Hash2 to the filter + * @and after that store the BF from the string to the BF + * @then BF will be updated: Hash1 test will return true and Hash2 will be + * overriten and return false + */ +TEST_F(BloomFilterTest, ReloadTest2) { + filter_->set(shared_model::crypto::Hash::fromHexString( + "9123594865892659791270573928567890379843798672987395677893427597")); + std::string const stored(filter_->load().data(), filter_->load().size()); + ASSERT_TRUE(filter_->test(shared_model::crypto::Hash::fromHexString( + "9123594865892659791270573928567890379843798672987395677893427597"))); + filter_->clear(); + ASSERT_FALSE(filter_->test(shared_model::crypto::Hash::fromHexString( + "9123594865892659791270573928567890379843798672987395677893427597"))); + + filter_->set(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598")); + ASSERT_TRUE(filter_->test(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598"))); + + filter_->store(stored); + ASSERT_TRUE(filter_->test(shared_model::crypto::Hash::fromHexString( + "9123594865892659791270573928567890379843798672987395677893427597"))); + ASSERT_FALSE(filter_->test(shared_model::crypto::Hash::fromHexString( + "1298367587946526947123063707196892848236917480679537296387464598"))); +} diff --git a/test/module/shared_model/interface_mocks.hpp b/test/module/shared_model/interface_mocks.hpp index a718c9cf0f4..f73a8ee23ff 100644 --- a/test/module/shared_model/interface_mocks.hpp +++ b/test/module/shared_model/interface_mocks.hpp @@ -74,6 +74,12 @@ struct MockTransaction : public shared_model::interface::Transaction { batchMeta, std::optional>()); MOCK_METHOD0(moveTo, std::unique_ptr()); + + MOCK_METHOD1(storeBatchHash, + void(shared_model::interface::types::HashType const &)); + MOCK_CONST_METHOD0( + getBatchHash, + std::optional const &()); }; /** @@ -181,6 +187,9 @@ struct MockProposal : public shared_model::interface::Proposal { MOCK_CONST_METHOD0(blob, const shared_model::interface::types::BlobType &()); MOCK_CONST_METHOD0(hash, const shared_model::interface::types::HashType &()); MOCK_CONST_METHOD0(clone, MockProposal *()); + MOCK_CONST_METHOD0(toString, std::string()); + MOCK_METHOD0(mut_transactions, + shared_model::interface::types::TransactionsCollectionType()); }; struct MockPeer : public shared_model::interface::Peer {