Skip to content

Commit

Permalink
Merge #2941: More Dash llmq backports
Browse files Browse the repository at this point in the history
a29d294 Fix deadlock in CSigSharesManager::SendMessages (#2757) (Alexander Block)
b4a4e09 Ignore sig share inv messages when we don't have the quorum vvec (#2733) (Alexander Block)
a2fb276 On timeout, print members proTxHashes from members which did not send a share (#2731) (Alexander Block)
d1084e0 Actually start the timers for sig share and recSig verification (#2730) (Alexander Block)
71092e0 Send/Receive multiple messages as part of one P2P message in CSigSharesManager (#2729) (Alexander Block)
e73c238 Merge pull request #2726 from codablock/pr_llmq_sessionids (UdjinM6)
7ccd790 Merge pull request #2725 from codablock/pr_llmq_hashmaps (Alexander Block)
a0084f5 Multiple fixes and optimizations for LLMQs and ChainLocks (#2724) (Alexander Block)
0613978 Cleanup successful sessions before doing timeout check (#2712) (Alexander Block)
c9127e1 Avoid using ordered maps in LLMQ signing code (#2708) (Alexander Block)

Pull request description:

  Follow up of #2921

  each commit backports a PR. you can find the number of the PR in the commit description

ACKs for top commit: a29d294
  Duddino:
    utACK a29d294
  Fuzzbawls:
    utACK a29d294

Tree-SHA512: 75483d543f39d85a2924606b1f7c359a45a52e0ebd84bdc06275080db2d07aa657d692461fbf22d23890d3a0394ebffae0c662a2def420d53ebcdb69c974ba6f
  • Loading branch information
Fuzzbawls committed Nov 5, 2024
2 parents 67d2db1 + a29d294 commit 717728b
Show file tree
Hide file tree
Showing 9 changed files with 879 additions and 401 deletions.
20 changes: 13 additions & 7 deletions src/llmq/quorums_chainlocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,17 @@ void CChainLocksHandler::ProcessMessage(CNode* pfrom, const std::string& strComm

auto hash = ::SerializeHash(clsig);

{
LOCK(cs_main);
connman.RemoveAskFor(hash, MSG_CLSIG);
}

ProcessNewChainLock(pfrom->GetId(), clsig, hash);
}
}

void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLockSig& clsig, const uint256& hash)
{
{
LOCK(cs_main);
g_connman->RemoveAskFor(hash, MSG_CLSIG);
}

{
LOCK(cs);
if (!seenChainLocks.emplace(hash, GetTimeMillis()).second) {
Expand Down Expand Up @@ -190,6 +190,8 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl
return;
}

Cleanup();

// DIP8 defines a process called "Signing attempts" which should run before the CLSIG is finalized
// To simplify the initial implementation, we skip this process and directly try to create a CLSIG
// This will fail when multiple blocks compete, but we accept this for the initial implementation.
Expand All @@ -201,6 +203,12 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl
{
LOCK(cs);

if (bestChainLockBlockIndex == pindexNew) {
// we first got the CLSIG, then the header, and then the block was connected.
// In this case there is no need to continue here.
return;
}

if (InternalHasConflictingChainLock(pindexNew->nHeight, pindexNew->GetBlockHash())) {
if (!inEnforceBestChainLock) {
// we accepted this block when there was no lock yet, but now a conflicting lock appeared. Invalidate it.
Expand All @@ -226,8 +234,6 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl
}

quorumSigningManager->AsyncSignIfMember(Params().GetConsensus().llmqChainLocks, requestId, msgHash);

Cleanup();
}

// WARNING: cs_main and cs should not be held!
Expand Down
2 changes: 2 additions & 0 deletions src/llmq/quorums_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ void StartLLMQSystem()
quorumDKGSessionManager->StartThreads();
}
if (quorumSigSharesManager) {
quorumSigSharesManager->RegisterAsRecoveredSigsListener();
quorumSigSharesManager->StartWorkerThread();
}
if (chainLocksHandler) {
Expand All @@ -68,6 +69,7 @@ void StopLLMQSystem()
}
if (quorumSigSharesManager) {
quorumSigSharesManager->StopWorkerThread();
quorumSigSharesManager->UnregisterAsRecoveredSigsListener();
}
if (quorumDKGSessionManager) {
quorumDKGSessionManager->StopThreads();
Expand Down
79 changes: 58 additions & 21 deletions src/llmq/quorums_signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <algorithm>
#include <limits>
#include <unordered_set>

namespace llmq
{
Expand Down Expand Up @@ -79,8 +80,23 @@ bool CRecoveredSigsDb::HasRecoveredSigForSession(const uint256& signHash)

bool CRecoveredSigsDb::HasRecoveredSigForHash(const uint256& hash)
{
int64_t t = GetTimeMillis();

{
LOCK(cs);
auto it = hasSigForHashCache.find(hash);
if (it != hasSigForHashCache.end()) {
it->second.second = t;
return it->second.first;
}
}

auto k = std::make_tuple('h', hash);
return db.Exists(k);
bool ret = db.Exists(k);

LOCK(cs);
hasSigForHashCache.emplace(hash, std::make_pair(ret, t));
return ret;
}

bool CRecoveredSigsDb::ReadRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, CRecoveredSig& ret)
Expand Down Expand Up @@ -152,13 +168,14 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig)
LOCK(cs);
hasSigForIdCache[std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)] = std::make_pair(true, t);
hasSigForSessionCache[signHash] = std::make_pair(true, t);
hasSigForHashCache[recSig.GetHash()] = std::make_pair(true, t);
}
}

template<typename K>
static void TruncateCacheMap(std::unordered_map<K, std::pair<bool, int64_t>>& m, size_t maxSize, size_t truncateThreshold)
template <typename K, typename H>
static void TruncateCacheMap(std::unordered_map<K, std::pair<bool, int64_t>, H>& m, size_t maxSize, size_t truncateThreshold)
{
typedef typename std::unordered_map<K, std::pair<bool, int64_t>> Map;
typedef typename std::unordered_map<K, std::pair<bool, int64_t>, H> Map;
typedef typename Map::iterator Iterator;

if (m.size() <= truncateThreshold) {
Expand Down Expand Up @@ -237,10 +254,12 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)

hasSigForIdCache.erase(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id));
hasSigForSessionCache.erase(signHash);
hasSigForHashCache.erase(recSig.GetHash());
}

TruncateCacheMap(hasSigForIdCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
TruncateCacheMap(hasSigForSessionCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
TruncateCacheMap(hasSigForHashCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
}

for (auto& e : toDelete2) {
Expand Down Expand Up @@ -355,18 +374,17 @@ bool CSigningManager::PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig&

void CSigningManager::CollectPendingRecoveredSigsToVerify(
size_t maxUniqueSessions,
std::map<NodeId, std::list<CRecoveredSig>>& retSigShares,
std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums)
std::unordered_map<NodeId, std::list<CRecoveredSig>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums)
{
{
LOCK(cs);
if (pendingRecoveredSigs.empty()) {
return;
}

std::set<std::pair<NodeId, uint256>> uniqueSignHashes;
llmq::utils::IterateNodesRandom(
pendingRecoveredSigs, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, std::list<CRecoveredSig>& ns) {
std::unordered_set<std::pair<NodeId, uint256>, StaticSaltedHasher> uniqueSignHashes;
llmq::utils::IterateNodesRandom(pendingRecoveredSigs, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, std::list<CRecoveredSig>& ns) {
if (ns.empty()) {
return false;
}
Expand Down Expand Up @@ -419,8 +437,8 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify(

bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
{
std::map<NodeId, std::list<CRecoveredSig>> recSigsByNode;
std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr> quorums;
std::unordered_map<NodeId, std::list<CRecoveredSig>> recSigsByNode;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher> quorums;

CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums);
if (recSigsByNode.empty()) {
Expand All @@ -443,13 +461,13 @@ bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
}
}

cxxtimer::Timer verifyTimer;
cxxtimer::Timer verifyTimer(true);
batchVerifier.Verify();
verifyTimer.stop();

LogPrintf("llmq", "CSigningManager::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__, verifyCount, verifyTimer.count(), recSigsByNode.size());

std::set<uint256> processed;
std::unordered_set<uint256, StaticSaltedHasher> processed;
for (auto& p : recSigsByNode) {
NodeId nodeId = p.first;
auto& v = p.second;
Expand Down Expand Up @@ -494,11 +512,25 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re
signHash.ToString(), recoveredSig.id.ToString(), recoveredSig.msgHash.ToString(), nodeId);

if (db.HasRecoveredSigForId(llmqType, recoveredSig.id)) {
// this should really not happen, as each masternode is participating in only one vote,
// even if it's a member of multiple quorums. so a majority is only possible on one quorum and one msgHash per id
LogPrintf("CSigningManager::%s -- conflicting recoveredSig for id=%s, msgHash=%s\n", __func__,
recoveredSig.id.ToString(), recoveredSig.msgHash.ToString());
return;
CRecoveredSig otherRecoveredSig;
if (db.GetRecoveredSigById(llmqType, recoveredSig.id, otherRecoveredSig)) {
auto otherSignHash = llmq::utils::BuildSignHash(recoveredSig);
if (signHash != otherSignHash) {
// this should really not happen, as each masternode is participating in only one vote,
// even if it's a member of multiple quorums. so a majority is only possible on one quorum and one msgHash per id
LogPrintf("CSigningManager::%s -- conflicting recoveredSig for signHash=%s, id=%s, msgHash=%s, otherSignHash=%s\n", __func__,
signHash.ToString(), recoveredSig.id.ToString(), recoveredSig.msgHash.ToString(), otherSignHash.ToString());
} else {
// Looks like we're trying to process a recSig that is already known. This might happen if the same
// recSig comes in through regular QRECSIG messages and at the same time through some other message
// which allowed to reconstruct a recSig (e.g. IXLOCK). In this case, just bail out.
}
return;
} else {
// This case is very unlikely. It can only happen when cleanup caused this specific recSig to vanish
// between the HasRecoveredSigForId and GetRecoveredSigById call. If that happens, treat it as if we
// never had that recSig
}
}

db.WriteRecoveredSig(recoveredSig);
Expand Down Expand Up @@ -552,14 +584,19 @@ bool CSigningManager::AsyncSignIfMember(Consensus::LLMQType llmqType, const uint
if (db.HasVotedOnId(llmqType, id)) {
uint256 prevMsgHash;
db.GetVoteForId(llmqType, id, prevMsgHash);
LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting on conflicting msgHash=%s\n", __func__,
id.ToString(), prevMsgHash.ToString(), msgHash.ToString());
if (msgHash != prevMsgHash) {
LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting on conflicting msgHash=%s\n", __func__,
id.ToString(), prevMsgHash.ToString(), msgHash.ToString());
} else {
LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting again.\n", __func__,
id.ToString(), prevMsgHash.ToString());
}
return false;
}

if (db.HasRecoveredSigForId(llmqType, id)) {
// no need to sign it if we already have a recovered sig
return false;
return true;
}
db.WriteVoteForId(llmqType, id, msgHash);
}
Expand Down
24 changes: 8 additions & 16 deletions src/llmq/quorums_signing.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,11 @@

#include "chainparams.h"
#include "net.h"
#include "saltedhasher.h"
#include "sync.h"

#include <unordered_map>

namespace std {
template <>
struct hash<std::pair<Consensus::LLMQType, uint256>>
{
std::size_t operator()(const std::pair<Consensus::LLMQType, uint256>& k) const
{
return (std::size_t)((k.first + 1) * k.second.GetCheapHash());
}
};
}

namespace llmq
{

Expand Down Expand Up @@ -77,7 +67,6 @@ class CRecoveredSig
}
};

// TODO implement caching to speed things up
class CRecoveredSigsDb
{
static const size_t MAX_CACHE_SIZE = 30000;
Expand All @@ -87,8 +76,9 @@ class CRecoveredSigsDb
CDBWrapper db;

RecursiveMutex cs;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::pair<bool, int64_t>> hasSigForIdCache;
std::unordered_map<uint256, std::pair<bool, int64_t>> hasSigForSessionCache;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::pair<bool, int64_t>, StaticSaltedHasher> hasSigForIdCache;
std::unordered_map<uint256, std::pair<bool, int64_t>, StaticSaltedHasher> hasSigForSessionCache;
std::unordered_map<uint256, std::pair<bool, int64_t>, StaticSaltedHasher> hasSigForHashCache;

public:
CRecoveredSigsDb(bool fMemory);
Expand Down Expand Up @@ -136,7 +126,7 @@ class CSigningManager
CRecoveredSigsDb db;

// Incoming and not verified yet
std::map<NodeId, std::list<CRecoveredSig>> pendingRecoveredSigs;
std::unordered_map<NodeId, std::list<CRecoveredSig>> pendingRecoveredSigs;

// must be protected by cs
FastRandomContext rnd;
Expand All @@ -157,7 +147,9 @@ class CSigningManager
void ProcessMessageRecoveredSig(CNode* pfrom, const CRecoveredSig& recoveredSig, CConnman& connman);
bool PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, bool& retBan);

void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::map<NodeId, std::list<CRecoveredSig>>& retSigShares, std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums);
void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions,
std::unordered_map<NodeId, std::list<CRecoveredSig>>& retSigShares,
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr, StaticSaltedHasher>& retQuorums);
bool ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager
void ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, const CQuorumCPtr& quorum, CConnman& connman);
void Cleanup(); // called from the worker thread of CSigSharesManager
Expand Down
Loading

0 comments on commit 717728b

Please sign in to comment.