Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/proposal request optimization v2 #1971

Merged
merged 32 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
00ea6e5
bloom filter
iceseer Mar 9, 2022
7cbd230
variant 2
iceseer Mar 9, 2022
3befc7c
variant 3
iceseer Mar 9, 2022
6931382
variant 3
iceseer Mar 9, 2022
8ed64eb
variant 3
iceseer Mar 10, 2022
a7b5117
bloom filter in os
iceseer Mar 13, 2022
5ad33dc
transport with BF support
iceseer Mar 13, 2022
7f1374c
transport with BF support
iceseer Mar 13, 2022
6459679
grpc server txs trouble
iceseer Mar 13, 2022
faf933a
txs diff complete
iceseer Mar 13, 2022
3ebd913
txs diff complete
iceseer Mar 13, 2022
d3fcbbb
tests
iceseer Mar 14, 2022
a952e58
tests
iceseer Mar 14, 2022
eebfef3
tests
iceseer Mar 15, 2022
32eacf6
formatting
iceseer Mar 15, 2022
44accbb
Merge branch 'feature/proposal_request_optimization' into feature/pro…
iceseer Mar 20, 2022
b04489e
Merge remote-tracking branch 'origin/develop' into feature/proposal_r…
iceseer Mar 20, 2022
13e0aae
Merge branch 'develop' into feature/proposal_request_optimization_v2
iceseer Mar 24, 2022
5f1b5c5
remove comment
iceseer Mar 24, 2022
33c1540
builded test
iceseer Mar 24, 2022
359a5da
builded test
iceseer Mar 24, 2022
777c37a
on_demand_os_test
iceseer Mar 24, 2022
1f06aca
formatting
iceseer Mar 24, 2022
c62a7f4
bf fix
iceseer Mar 25, 2022
9176378
tests
iceseer Mar 25, 2022
fdac125
Merge branch 'develop' into feature/proposal_request_optimization_v2
iceseer Mar 27, 2022
7796bfb
tests
iceseer Mar 27, 2022
2ec9855
tests
iceseer Mar 28, 2022
4954fa4
tests
iceseer Mar 28, 2022
7b6ae75
tests
iceseer Apr 7, 2022
999efdb
review issues
iceseer Apr 7, 2022
44d5ce9
Merge branch 'develop' into feature/proposal_request_optimization_v2
iceseer Apr 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/TESTS_ALLOWED_TO_FAIL
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
system_irohad_test
integration_add_peer_test
regression_regression_test
integration_remove_peer_test
2 changes: 1 addition & 1 deletion irohad/ametsuchi/impl/postgres_specific_query_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,7 @@ namespace iroha {
for (const auto &row : range) {
iroha::ametsuchi::apply(
row,
[this, &peers](
[&peers](
auto &peer_key, auto &address, auto &tls_certificate) {
if (peer_key and address) {
peers.push_back(
Expand Down
8 changes: 3 additions & 5 deletions irohad/main/impl/on_demand_ordering_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ auto createNotificationFactory(
std::make_unique<iroha::network::ClientFactoryImpl<
iroha::ordering::transport::OnDemandOsClientGrpcFactory::Service>>(
std::move(client_factory)),
[](iroha::ordering::ProposalEvent event) {
iroha::getSubscription()->notify(iroha::EventTypes::kOnProposalResponse,
std::move(event));
},
std::move(os_execution_keepers));
}

Expand Down Expand Up @@ -87,14 +83,16 @@ auto OnDemandOrderingInit::createGate(
size_t max_number_of_transactions,
const logger::LoggerManagerTreePtr &ordering_log_manager,
bool syncing_mode) {
return std::make_shared<OnDemandOrderingGate>(
auto og = std::make_shared<OnDemandOrderingGate>(
std::move(ordering_service),
std::move(network_client),
std::move(proposal_factory),
std::move(tx_cache),
max_number_of_transactions,
ordering_log_manager->getChild("Gate")->getLogger(),
syncing_mode);
og->initialize();
return og;
}

auto OnDemandOrderingInit::createService(
Expand Down
6 changes: 6 additions & 0 deletions irohad/main/subscription_fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
namespace iroha {
enum SubscriptionEngineHandlers {
kYac = 0,
kRequestProposal,
kVoteProcess,
kProposalProcessing,
kMetrics,
kNotifications,
//---------------
Expand Down Expand Up @@ -38,10 +41,13 @@ namespace iroha {
kOnTxsEnoughForProposal,
kOnPackProposal,
kOnProposalResponse,
kOnProposalResponseFailed,
kOnTransactionResponse,
kOnConsensusGateEvent,
kSendBatchComplete,

kRemoteProposalDiff,

// RDB
kOnRdbStats,

Expand Down
7 changes: 7 additions & 0 deletions irohad/ordering/impl/batches_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "common/common.hpp"
#include "consensus/round.hpp"
#include "ordering/ordering_types.hpp"

namespace shared_model::interface {
class TransactionBatch;
Expand Down Expand Up @@ -187,9 +188,11 @@ namespace iroha::ordering {
size_t requested_tx_amount,
std::vector<std::shared_ptr<shared_model::interface::Transaction>>
&collection,
BloomFilter256 &bf,
IsProcessedFunc &&is_processed) {
collection.clear();
collection.reserve(requested_tx_amount);
bf.clear();

std::unique_lock lock(batches_cache_cs_);
uint32_t depth_counter = 0ul;
Expand All @@ -204,10 +207,14 @@ namespace iroha::ordering {
return false;
}

for (auto &tx : batch->transactions())
tx->storeBatchHash(batch->reducedHash());

collection.insert(std::end(collection),
std::begin(batch->transactions()),
std::end(batch->transactions()));

bf.set(batch->reducedHash());
used_batches_cache_.insert(batch);
return true;
});
Expand Down
19 changes: 10 additions & 9 deletions irohad/ordering/impl/on_demand_connection_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,19 @@ void OnDemandConnectionManager::onBatchesToWholeNetwork(

void OnDemandConnectionManager::onRequestProposal(
consensus::Round round,
std::optional<std::shared_ptr<const shared_model::interface::Proposal>>
ref_proposal) {
std::optional<
std::pair<std::shared_ptr<shared_model::interface::Proposal const>,
BloomFilter256>> proposal) {
std::shared_lock<std::shared_timed_mutex> lock(mutex_);
if (stop_requested_.load(std::memory_order_relaxed)) {
if (stop_requested_.load(std::memory_order_relaxed))
return;
}

log_->debug("onRequestProposal, {}", round);

if (auto &connection = connections_.peers[kIssuer]) {
(*connection)->onRequestProposal(round, std::move(ref_proposal));
}
assert(!proposal || proposal.value().first);
log_->debug("onRequestProposal, {} : {}",
round,
proposal ? proposal.value().first->toString() : "NULL_OPT");
if (auto &connection = connections_.peers[kIssuer])
(*connection)->onRequestProposal(round, std::move(proposal));
}

void OnDemandConnectionManager::initializeConnections(
Expand Down
7 changes: 4 additions & 3 deletions irohad/ordering/impl/on_demand_connection_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ namespace iroha {
void onBatches(CollectionType batches) override;
void onBatchesToWholeNetwork(CollectionType batches) override;
std::chrono::milliseconds getRequestDelay() const override;

void onRequestProposal(
consensus::Round round,
std::optional<
std::shared_ptr<const shared_model::interface::Proposal>>
ref_proposal) override;
std::optional<std::pair<
std::shared_ptr<shared_model::interface::Proposal const>,
BloomFilter256>> proposal) override;

/**
* Initialize corresponding peers in connections_ using factory_
Expand Down
26 changes: 26 additions & 0 deletions irohad/ordering/impl/on_demand_ordering_gate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "interfaces/iroha_internal/transaction_batch_impl.hpp"
#include "interfaces/iroha_internal/transaction_batch_parser_impl.hpp"
#include "logger/logger.hpp"
#include "main/subscription.hpp"
#include "ordering/impl/on_demand_common.hpp"
#include "validators/field_validator.hpp"

Expand All @@ -41,7 +42,32 @@ OnDemandOrderingGate::OnDemandOrderingGate(
tx_cache_(std::move(tx_cache)),
syncing_mode_(syncing_mode) {}

void OnDemandOrderingGate::initialize() {
failed_proposal_response_ =
SubscriberCreator<bool, ProposalEvent>::template create<
EventTypes::kOnProposalResponseFailed>(
SubscriptionEngineHandlers::kYac,
[_w_this{weak_from_this()}](auto, auto ev) {
if (auto _this = _w_this.lock()) {
std::shared_lock<std::shared_timed_mutex> stop_lock(
_this->stop_mutex_);
if (_this->stop_requested_) {
_this->log_->warn(
"Not doing anything because stop was requested.");
return;
}

if (!_this->syncing_mode_) {
assert(_this->network_client_);
_this->network_client_->onRequestProposal(ev.round,
std::nullopt);
}
}
});
}

OnDemandOrderingGate::~OnDemandOrderingGate() {
failed_proposal_response_->unsubscribe();
stop();
}

Expand Down
10 changes: 9 additions & 1 deletion irohad/ordering/impl/on_demand_ordering_gate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@

#include "network/ordering_gate.hpp"

#include <memory>
#include <shared_mutex>

#include "interfaces/common_objects/types.hpp"
#include "interfaces/iroha_internal/proposal.hpp"
#include "interfaces/iroha_internal/unsafe_proposal_factory.hpp"
#include "logger/logger_fwd.hpp"
#include "main/subscription.hpp"
#include "ordering/impl/on_demand_common.hpp"
#include "ordering/impl/round_switch.hpp"
#include "ordering/on_demand_ordering_service.hpp"
Expand All @@ -30,7 +32,9 @@ namespace iroha {
* Ordering gate which requests proposals from the ordering service
* votes for proposals, and passes committed proposals to the pipeline
*/
class OnDemandOrderingGate : public network::OrderingGate {
class OnDemandOrderingGate
: public network::OrderingGate,
public std::enable_shared_from_this<OnDemandOrderingGate> {
public:
OnDemandOrderingGate(
std::shared_ptr<OnDemandOrderingService> ordering_service,
Expand All @@ -44,6 +48,8 @@ namespace iroha {

~OnDemandOrderingGate() override;

void initialize();

void propagateBatch(
std::shared_ptr<shared_model::interface::TransactionBatch> batch)
override;
Expand Down Expand Up @@ -96,6 +102,8 @@ namespace iroha {
std::shared_ptr<ametsuchi::TxPresenceCache> tx_cache_;
consensus::Round current_round_;
std::shared_ptr<const LedgerState> current_ledger_state_;
std::shared_ptr<iroha::BaseSubscriber<bool, ProposalEvent>>
failed_proposal_response_;

std::shared_timed_mutex stop_mutex_;
bool stop_requested_{false};
Expand Down
Loading