Skip to content

Commit

Permalink
feature: escalation dispute by disabled validators
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitriy Khaustov aka xDimon <khaustov.dm@gmail.com>
  • Loading branch information
xDimon committed Apr 12, 2024
1 parent 390cbe2 commit d426223
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 62 deletions.
33 changes: 29 additions & 4 deletions core/dispute_coordinator/impl/candidate_vote_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
#include <set>

namespace kagome::dispute {
CandidateVoteState CandidateVoteState::create(CandidateVotes votes,
CandidateEnvironment &env,
Timestamp now) {
CandidateVoteState CandidateVoteState::create(
CandidateVotes votes,
CandidateEnvironment &env,
std::vector<ValidatorIndex> &disabled_validators,
Timestamp now) {
CandidateVoteState res{.votes = std::move(votes),
.own_vote = CannotVote{},
.dispute_status = std::nullopt};

// Collect own votes
auto controlled_indices = env.controlled_indices;
if (not controlled_indices.empty()) {
Voted voted;
Expand All @@ -41,11 +44,33 @@ namespace kagome::dispute {
}

// Check if isn't disputed
// (We have a dispute, if we have votes on both sides)

// (We have a dispute if we have votes on both sides.)
if (res.votes.invalid.empty() or res.votes.valid.empty()) {
return res;
}

// Check if escalated by active validators

auto has_vote_of_active = [&](auto &votes) {
auto is_not_disabled = [&](auto &vote) {
return not std::binary_search(
disabled_validators.begin(), disabled_validators.end(), vote.first);
};
return std::find_if(votes.begin(), votes.end(), is_not_disabled)
!= votes.end();
};

auto escalated_by_active = has_vote_of_active(res.votes.invalid)
and has_vote_of_active(res.votes.valid);

if (not escalated_by_active) {
res.dispute_status = Postponed{};
return res;
}

// Check thresholds

auto n_validators = env.session.validators.size();

auto byzantine_threshold = (std::max<size_t>(n_validators, 1) - 1) / 3;
Expand Down
1 change: 1 addition & 0 deletions core/dispute_coordinator/impl/candidate_vote_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace kagome::dispute {
public:
static CandidateVoteState create(CandidateVotes votes,
CandidateEnvironment &env,
std::vector<ValidatorIndex> &disabled,
Timestamp now);

/// Votes already existing for the candidate + receipt.
Expand Down
176 changes: 123 additions & 53 deletions core/dispute_coordinator/impl/dispute_coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ namespace kagome::dispute {
},
[](const ConcludedAgainst &at) -> std::optional<Timestamp> {
return (Timestamp)at;
},
[](const Postponed &) -> std::optional<Timestamp> {
return std::nullopt;
});

auto dispute_is_inactive =
Expand Down Expand Up @@ -337,8 +340,20 @@ namespace kagome::dispute {
}
auto &candidate_votes = votes_res.value().value();

auto &relay_parent =
candidate_votes.candidate_receipt.descriptor.relay_parent;

auto disabled_validators_res = api_->disabled_validators(relay_parent);
if (disabled_validators_res.has_error()) {
SL_WARN(log_,
"Cannot import votes, without getting disabled validators: {}",
disabled_validators_res.error());
continue;
}
auto &disabled_validators = disabled_validators_res.value();

auto vote_state = CandidateVoteState::create(
candidate_votes, env, system_clock_.nowUint64());
candidate_votes, env, disabled_validators, system_clock_.nowUint64());

auto is_included = scraper_->is_candidate_included(candidate_hash);
auto is_backed = scraper_->is_candidate_backed(candidate_hash);
Expand Down Expand Up @@ -1015,23 +1030,51 @@ namespace kagome::dispute {
// blocks, and hence we do not have a `CandidateReceipt` available.

CandidateVoteState old_state;
std::vector<ValidatorIndex> disabled_validators;

OUTCOME_TRY(old_state_opt,
storage_->load_candidate_votes(session, candidate_hash));
if (old_state_opt.has_value()) {
old_state = CandidateVoteState::create(old_state_opt.value(), env, now);
} else {

primitives::BlockHash relay_parent{};

if (not old_state_opt.has_value()) {
auto provided_opt = if_type<CandidateReceipt>(candidate_receipt);
if (not provided_opt.has_value()) {
SL_ERROR(log_,
"Cannot import votes, without `CandidateReceipt` available!");
return outcome::success(false);
}

relay_parent = provided_opt.value().get().descriptor.relay_parent;

old_state = {
.votes = {.candidate_receipt = provided_opt.value().get()},
.own_vote = CannotVote{},
.dispute_status = std::nullopt,
};

} else {
relay_parent = old_state_opt->candidate_receipt.descriptor.relay_parent;
}

auto disabled_validators_res = api_->disabled_validators(relay_parent);
if (disabled_validators_res.has_error()) {
SL_WARN(log_,
"Cannot import votes, without getting disabled validators: {}",
disabled_validators_res.error());
return outcome::success(false);
}
disabled_validators = std::move(disabled_validators_res.value());

auto is_disabled = [&disabled_validators =
disabled_validators_res.value()](auto index) {
return std::binary_search(
disabled_validators.begin(), disabled_validators.end(), index);
};

if (old_state_opt.has_value()) {
old_state = CandidateVoteState::create(
old_state_opt.value(), env, disabled_validators, now);
}

SL_TRACE(log_, "Loaded votes");
Expand Down Expand Up @@ -1060,16 +1103,6 @@ namespace kagome::dispute {

auto expected_candidate_hash = votes.candidate_receipt.hash(*hasher_);

// const auto &relay_parent = candidate_receipt.descriptor.relay_parent;
// auto disabled_validators_res = api_->disabled_validators(relay_parent);
// if (disabled_validators_res.has_error()) {
// SL_WARN(log_,
// "Can't get disabled validators: {}",
// disabled_validators_res.error());
// return;
// }
// const auto &disabled_validators = disabled_validators_res.value();

std::vector<Indexed<SignedDisputeStatement>> postponed_statements;

for (auto &vote : statements) {
Expand All @@ -1092,16 +1125,15 @@ namespace kagome::dispute {
continue;
}

// // Don't exist any votes for candidate - check disabled
// if (votes.valid.empty() and votes.invalid.empty()) {
// auto is_disabled_validator = std::binary_search(
// disabled_validators.begin(), disabled_validators.end(),
// val_index);
// if (is_disabled_validator) {
// postponed_statements.emplace_back(std::move(vote));
// continue;
// }
// }
auto is_disabled_validator = is_disabled(val_index);

// Postpone votes of disabled validators while any votes for candidate are
// not exist
if (is_disabled_validator and votes.valid.empty()
and votes.invalid.empty()) {
postponed_statements.emplace_back(std::move(vote));
continue;
}

visit_in_place(
statement.dispute_statement,
Expand All @@ -1110,11 +1142,13 @@ namespace kagome::dispute {
val_index,
std::make_tuple(valid, statement.validator_signature));
if (fresh) {
if (imported_valid_votes == 0) {
// Return postponed statements to process
std::move_backward(postponed_statements.begin(),
postponed_statements.end(),
statements.end());
}
++imported_valid_votes;
// Return postponed statements to process
std::move_backward(postponed_statements.begin(),
postponed_statements.end(),
statements.end());
return true;
}
auto &existing = std::get<0>(it->second);
Expand All @@ -1134,26 +1168,29 @@ namespace kagome::dispute {
val_index,
std::make_tuple(invalid, statement.validator_signature));
if (fresh) {
++imported_invalid_votes;
new_invalid_voters.push_back(val_index);
// Return postponed statements to process
std::move_backward(postponed_statements.begin(),
postponed_statements.end(),
statements.end());
if (imported_invalid_votes == 0) {
// Return postponed statements to process
std::move_backward(postponed_statements.begin(),
postponed_statements.end(),
statements.end());
}
++imported_invalid_votes;
return true;
}
return false;
});
}

CandidateVoteState _new_state = CandidateVoteState::create(votes, env, now);

ImportResult intermediate_result{old_state,
_new_state,
imported_invalid_votes,
imported_valid_votes,
0,
new_invalid_voters};
ImportResult intermediate_result{
std::move(old_state),
CandidateVoteState::create(
votes, env, disabled_validators, now), // new_state
imported_invalid_votes,
imported_valid_votes,
0,
new_invalid_voters,
};

// Handle approval vote import:
//
Expand Down Expand Up @@ -1305,8 +1342,8 @@ namespace kagome::dispute {
}
}

import_result.new_state =
CandidateVoteState::create(std::move(_votes), env, now);
import_result.new_state = CandidateVoteState::create(
std::move(_votes), env, disabled_validators, now);
}
} else {
SL_TRACE(log_, "Not requested approval signatures");
Expand Down Expand Up @@ -1948,6 +1985,9 @@ namespace kagome::dispute {
},
[](const ConcludedAgainst &at) -> std::optional<Timestamp> {
return (Timestamp)at;
},
[](const Postponed &) -> std::optional<Timestamp> {
return std::nullopt;
});

auto dispute_is_inactive =
Expand Down Expand Up @@ -2006,8 +2046,6 @@ namespace kagome::dispute {
std::move(candidate_receipt),
valid);

// TODO ничего не возвращаем, вызывается из апрувала и партисипейшена

SL_TRACE(log_, "DisputeCoordinatorMessage::IssueLocalStatement");
auto res = issue_local_statement(
candidate_hash, candidate_receipt, session, valid);
Expand Down Expand Up @@ -2207,8 +2245,8 @@ namespace kagome::dispute {
BOOST_ASSERT_MSG(not queue.empty(),
"Invariant that queues are never empty is broken.");

auto &[msg, cb] = queue.front();
heads.emplace_back(peer_id, std::move(msg), std::move(cb));
auto &[request, cb] = queue.front();
heads.emplace_back(peer_id, std::move(request), std::move(cb));
queue.pop_front();

if (not queue.empty()) {
Expand Down Expand Up @@ -2326,6 +2364,31 @@ namespace kagome::dispute {
auto &valid_vote = checked_valid_vote;
auto &invalid_vote = checked_invalid_vote;

// // Check if request from disabled validator
// const auto &relay_parent = candidate_receipt.descriptor.relay_parent;
// OUTCOME_TRY(disabled_validators,
// api_->disabled_validators(relay_parent));
//
// auto is_disabled_validator =
// std::binary_search(disabled_validators.begin(),
// disabled_validators.end(),
// valid_vote.ix)
// or
// std::binary_search(disabled_validators.begin(),
// disabled_validators.end(),
// invalid_vote.ix);

// // Don't exist any votes for candidate - check disabled
// if (votes.valid.empty() and votes.invalid.empty()) {
// auto is_disabled_validator = std::binary_search(
// disabled_validators.begin(), disabled_validators.end(),
// val_index);
// if (is_disabled_validator) {
// postponed_statements.emplace_back(std::move(vote));
// continue;
// }
// }

OUTCOME_TRY(found_batch,
batches_->find_batch(candidate_hash, candidate_receipt));
auto &[batch, just_created] = found_batch;
Expand Down Expand Up @@ -2354,6 +2417,7 @@ namespace kagome::dispute {
auto cb_opt =
batch->add_votes(valid_vote, invalid_vote, peer, std::move(cb));

// Returned value means duplicate
if (cb_opt.has_value()) {
// We don't expect honest peers to send redundant votes within a
// single batch, as the timeout for retry is much higher. Still we
Expand All @@ -2375,12 +2439,18 @@ namespace kagome::dispute {
// limit.
(*cb_opt)(BatchError::RedundantMessage);

} else { // FIXME testing code. must be removed
auto prepared_import =
batch->tick(steady_clock_.now() + Batch::kBatchCollectingInterval);
if (prepared_import.has_value()) {
start_import(std::move(prepared_import.value()));
}
// } else { // FIXME testing code. must be removed
// auto prepared_import =
// batch->tick(steady_clock_.now() +
// Batch::kBatchCollectingInterval);
// if (prepared_import.has_value()) {
// start_import(std::move(prepared_import.value()));
// }
}

auto ready_prepared_imports = batches_->check_batches();
for (auto &prepared_import : ready_prepared_imports) {
start_import(std::move(prepared_import));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ namespace kagome::dispute {
return (Timestamp)concluded
+ DisputeCoordinatorImpl::kActiveDurationSecs
< now;
});
},
[](const Postponed &) { return true; });

// Split recent disputes in ACTIVE and INACTIVE
auto [unknown, concluded, unconcluded] =
Expand Down
8 changes: 6 additions & 2 deletions core/dispute_coordinator/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,14 @@ namespace kagome::dispute {
/// successfully ourselves).
using Confirmed = Tagged<Empty, struct ConfirmedTag>;

/// Dispute has been postponed 'cause all votes is received from disabled
/// validators.
using Postponed = Tagged<Empty, struct PostponedTag>;

/// The status of dispute.
/// NOTE: This status is persisted to the database
using DisputeStatus =
boost::variant<Active, ConcludedFor, ConcludedAgainst, Confirmed>;
using DisputeStatus = boost::
variant<Active, ConcludedFor, ConcludedAgainst, Confirmed, Postponed>;

/// The mapping for recent disputes; any which have not
/// yet been pruned for being ancient.
Expand Down
3 changes: 1 addition & 2 deletions core/network/types/dispute_messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ namespace kagome::network {
};

/// A dispute initiating/participating message that have been built from
/// signed
/// statements.
/// signed statements.
///
/// And most likely has been constructed correctly. This is used with
/// `DisputeDistributionMessage::SendDispute` for sending out votes.
Expand Down

0 comments on commit d426223

Please sign in to comment.