Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

babe deadlock #1872

Merged
merged 5 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 46 additions & 32 deletions core/consensus/babe/impl/babe_config_repository_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ namespace kagome::consensus::babe {
BOOST_ASSERT(header_repo_ != nullptr);
BOOST_ASSERT(babe_api_ != nullptr);

if (auto r = indexer_.init(); not r) {
logger_->error("Indexer::init error: {}", r.error());
}
SAFE_UNIQUE(indexer_) {
if (auto r = indexer_.init(); not r) {
logger_->error("Indexer::init error: {}", r.error());
}
};

app_state_manager.takeControl(*this);
}
Expand All @@ -104,27 +106,29 @@ namespace kagome::consensus::babe {
return false;
}

std::unique_lock lock{indexer_mutex_};
auto finalized = block_tree_->getLastFinalized();
auto finalized_header = block_tree_->getBlockHeader(finalized.hash).value();

if (finalized.number - indexer_.last_finalized_indexed_.number
> kMaxUnindexedBlocksNum
and trie_storage_->getEphemeralBatchAt(finalized_header.state_root)) {
warp(lock, finalized);
}
SAFE_UNIQUE(indexer_) {
if (finalized.number - indexer_.last_finalized_indexed_.number
> kMaxUnindexedBlocksNum
and trie_storage_->getEphemeralBatchAt(finalized_header.state_root)) {
warp(indexer_, finalized);
}

if (!timings_) {
auto genesis_res = config({block_tree_->getGenesisBlockHash(), 0}, false);
if (genesis_res.has_value()) {
auto &genesis = genesis_res.value();
timings_.init(genesis->slot_duration, genesis->epoch_length);
SL_DEBUG(logger_,
"Timing was initialized: slot is {}ms, epoch is {} slots",
timings_.slot_duration.count(),
timings_.epoch_length);
if (!timings_) {
auto genesis_res =
config(indexer_, {block_tree_->getGenesisBlockHash(), 0}, false);
if (genesis_res.has_value()) {
auto &genesis = genesis_res.value();
timings_.init(genesis->slot_duration, genesis->epoch_length);
SL_DEBUG(logger_,
"Timing was initialized: slot is {}ms, epoch is {} slots",
timings_.slot_duration.count(),
timings_.epoch_length);
}
}
}
};

[[maybe_unused]] bool active_ = false;

Expand All @@ -133,7 +137,10 @@ namespace kagome::consensus::babe {
auto consensus = consensus_selector_.get()->getProductionConsensus(best);
if (std::dynamic_pointer_cast<Babe>(consensus)) {
active_ = true;
if (auto res = config(best, true); not res and not config_warp_sync_) {
auto res = SAFE_UNIQUE(indexer_) {
return config(indexer_, best, true);
};
if (not res and not config_warp_sync_) {
SL_ERROR(logger_, "get config at best {} error: {}", best, res.error());
auto best_header = block_tree_->getBlockHeader(best.hash).value();
if (not trie_storage_->getEphemeralBatchAt(best_header.state_root)) {
Expand All @@ -146,8 +153,10 @@ namespace kagome::consensus::babe {

chain_sub_.onFinalize([weak{weak_from_this()}]() {
if (auto self = weak.lock()) {
std::unique_lock lock{self->indexer_mutex_};
self->indexer_.finalize();
auto &indexer_ = self->indexer_;
SAFE_UNIQUE(indexer_) {
indexer_.finalize();
};
}
});

Expand All @@ -165,8 +174,9 @@ namespace kagome::consensus::babe {
slots_util_.get()->slotToEpoch(parent_info, parent_slot));
epoch_changed = epoch_number != parent_epoch;
}
std::unique_lock lock{indexer_mutex_};
return config(parent_info, epoch_changed);
return SAFE_UNIQUE(indexer_) {
return config(indexer_, parent_info, epoch_changed);
};
}

outcome::result<SlotNumber> BabeConfigRepositoryImpl::getFirstBlockSlotNumber(
Expand Down Expand Up @@ -209,18 +219,20 @@ namespace kagome::consensus::babe {
return slot1.value();
}

void BabeConfigRepositoryImpl::warp(std::unique_lock<std::mutex> &lock,
void BabeConfigRepositoryImpl::warp(Indexer &indexer_,
const primitives::BlockInfo &block) {
indexer_.put(block, {}, true);
}

void BabeConfigRepositoryImpl::warp(const primitives::BlockInfo &block) {
std::unique_lock lock{indexer_mutex_};
warp(lock, block);
SAFE_UNIQUE(indexer_) {
warp(indexer_, block);
};
}

outcome::result<std::shared_ptr<const BabeConfiguration>>
BabeConfigRepositoryImpl::config(const primitives::BlockInfo &block,
BabeConfigRepositoryImpl::config(Indexer &indexer_,
const primitives::BlockInfo &block,
bool next_epoch) const {
auto descent = indexer_.descend(block);
outcome::result<void> cb_res = outcome::success();
Expand Down Expand Up @@ -262,7 +274,7 @@ namespace kagome::consensus::babe {
OUTCOME_TRY(header, block_tree_->getBlockHeader(info.hash));
if (HasBabeConsensusDigest digests{header}) {
if (not prev_state) {
BOOST_OUTCOME_TRY(prev_state, loadPrev(prev));
BOOST_OUTCOME_TRY(prev_state, loadPrev(indexer_, prev));
}
auto state = applyDigests(getConfig(*prev_state), digests);
BabeIndexedValue value{
Expand Down Expand Up @@ -294,10 +306,10 @@ namespace kagome::consensus::babe {
return *r->second.value->state;
}
if (next_epoch) {
OUTCOME_TRY(load(r->first, r->second));
OUTCOME_TRY(load(indexer_, r->first, r->second));
return *r->second.value->next_state;
}
return loadPrev(r->second.prev);
return loadPrev(indexer_, r->second.prev);
}

std::shared_ptr<BabeConfiguration> BabeConfigRepositoryImpl::applyDigests(
Expand All @@ -320,6 +332,7 @@ namespace kagome::consensus::babe {
}

outcome::result<void> BabeConfigRepositoryImpl::load(
Indexer &indexer_,
const primitives::BlockInfo &block,
blockchain::Indexed<BabeIndexedValue> &item) const {
if (not item.value->next_state) {
Expand All @@ -339,6 +352,7 @@ namespace kagome::consensus::babe {

outcome::result<std::shared_ptr<const BabeConfiguration>>
BabeConfigRepositoryImpl::loadPrev(
Indexer &indexer_,
const std::optional<primitives::BlockInfo> &prev) const {
if (not prev) {
return Error::PREVIOUS_NOT_FOUND;
Expand All @@ -350,7 +364,7 @@ namespace kagome::consensus::babe {
if (not r->value) {
return Error::PREVIOUS_NOT_FOUND;
}
OUTCOME_TRY(load(*prev, *r));
OUTCOME_TRY(load(indexer_, *prev, *r));
return *r->value->next_state;
}
} // namespace kagome::consensus::babe
17 changes: 10 additions & 7 deletions core/consensus/babe/impl/babe_config_repository_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

#include "consensus/babe/babe_config_repository.hpp"

#include <mutex>

#include "blockchain/indexer.hpp"
#include "consensus/babe/has_babe_consensus_digest.hpp"
#include "consensus/babe/types/scheduled_change.hpp"
Expand All @@ -18,6 +16,7 @@
#include "primitives/block_data.hpp"
#include "primitives/event_types.hpp"
#include "storage/spaced_storage.hpp"
#include "utils/safe_object.hpp"

namespace kagome::application {
class AppStateManager;
Expand Down Expand Up @@ -98,32 +97,36 @@ namespace kagome::consensus::babe {
void warp(const primitives::BlockInfo &block) override;

private:
using Indexer = blockchain::Indexer<BabeIndexedValue>;

outcome::result<SlotNumber> getFirstBlockSlotNumber(
const primitives::BlockInfo &parent_info) const;

outcome::result<std::shared_ptr<const BabeConfiguration>> config(
const primitives::BlockInfo &block, bool next_epoch) const;
Indexer &indexer_,
const primitives::BlockInfo &block,
bool next_epoch) const;

std::shared_ptr<BabeConfiguration> applyDigests(
const NextConfigDataV1 &config,
const HasBabeConsensusDigest &digests) const;

outcome::result<void> load(
Indexer &indexer_,
const primitives::BlockInfo &block,
blockchain::Indexed<BabeIndexedValue> &item) const;

outcome::result<std::shared_ptr<const BabeConfiguration>> loadPrev(
Indexer &indexer_,
const std::optional<primitives::BlockInfo> &prev) const;

void warp(std::unique_lock<std::mutex> &lock,
const primitives::BlockInfo &block);
void warp(Indexer &indexer_, const primitives::BlockInfo &block);

std::shared_ptr<storage::BufferStorage> persistent_storage_;
bool config_warp_sync_;
EpochTimings &timings_;
std::shared_ptr<blockchain::BlockTree> block_tree_;
mutable std::mutex indexer_mutex_;
mutable blockchain::Indexer<BabeIndexedValue> indexer_;
mutable SafeObject<Indexer> indexer_;
std::shared_ptr<blockchain::BlockHeaderRepository> header_repo_;
LazySPtr<ConsensusSelector> consensus_selector_;
std::shared_ptr<runtime::BabeApi> babe_api_;
Expand Down
4 changes: 3 additions & 1 deletion core/consensus/timeline/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ namespace kagome::consensus {
*this);
}

auto operator<=>(const SlotDuration &) const = default;
auto operator<=>(const SlotDuration &r) const {
return count() <=> r.count();
}

friend ::scale::ScaleEncoderStream &operator<<(
::scale::ScaleEncoderStream &s, const SlotDuration &duration) {
Expand Down
15 changes: 15 additions & 0 deletions core/utils/safe_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
#include <condition_variable>
#include <mutex>
#include <shared_mutex>
#include <type_traits>

#define SAFE_UNIQUE_CAPTURE(x, ...) \
x ^= __VA_ARGS__(typename std::remove_cvref_t<decltype(x)>::Type & x)
#define SAFE_SHARED_CAPTURE(x, ...) \
x |= __VA_ARGS__(const typename std::remove_cvref_t<decltype(x)>::Type &x)
#define SAFE_UNIQUE(x) SAFE_UNIQUE_CAPTURE(x, [&])
#define SAFE_SHARED(x) SAFE_SHARED_CAPTURE(x, [&])

// clang-format off
/**
Expand Down Expand Up @@ -55,6 +63,13 @@ struct SafeObject {
return std::forward<F>(f)(t_);
}

auto operator^=(auto &&f) {
return exclusiveAccess(std::forward<decltype(f)>(f));
}
auto operator|=(auto &&f) const {
return sharedAccess(std::forward<decltype(f)>(f));
}

T &unsafeGet() {
return t_;
}
Expand Down