diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index e907b5bd4b3b0..d06a083565ba6 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -77,17 +77,17 @@ void CChainLocksHandler::ProcessMessage(CNode* pfrom, const std::string& strComm auto hash = ::SerializeHash(clsig); - { - LOCK(cs_main); - connman.RemoveAskFor(hash, MSG_CLSIG); - } - ProcessNewChainLock(pfrom->GetId(), clsig, hash); } } void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLockSig& clsig, const uint256& hash) { + { + LOCK(cs_main); + g_connman->RemoveAskFor(hash, MSG_CLSIG); + } + { LOCK(cs); if (!seenChainLocks.emplace(hash, GetTimeMillis()).second) { @@ -190,6 +190,8 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl return; } + Cleanup(); + // DIP8 defines a process called "Signing attempts" which should run before the CLSIG is finalized // To simplify the initial implementation, we skip this process and directly try to create a CLSIG // This will fail when multiple blocks compete, but we accept this for the initial implementation. @@ -201,6 +203,12 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl { LOCK(cs); + if (bestChainLockBlockIndex == pindexNew) { + // we first got the CLSIG, then the header, and then the block was connected. + // In this case there is no need to continue here. + return; + } + if (InternalHasConflictingChainLock(pindexNew->nHeight, pindexNew->GetBlockHash())) { if (!inEnforceBestChainLock) { // we accepted this block when there was no lock yet, but now a conflicting lock appeared. Invalidate it. @@ -226,8 +234,6 @@ void CChainLocksHandler::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBl } quorumSigningManager->AsyncSignIfMember(Params().GetConsensus().llmqChainLocks, requestId, msgHash); - - Cleanup(); } // WARNING: cs_main and cs should not be held! diff --git a/src/llmq/quorums_init.cpp b/src/llmq/quorums_init.cpp index eb9964e61888c..87bd3320baa1b 100644 --- a/src/llmq/quorums_init.cpp +++ b/src/llmq/quorums_init.cpp @@ -54,6 +54,7 @@ void StartLLMQSystem() quorumDKGSessionManager->StartThreads(); } if (quorumSigSharesManager) { + quorumSigSharesManager->RegisterAsRecoveredSigsListener(); quorumSigSharesManager->StartWorkerThread(); } if (chainLocksHandler) { @@ -68,6 +69,7 @@ void StopLLMQSystem() } if (quorumSigSharesManager) { quorumSigSharesManager->StopWorkerThread(); + quorumSigSharesManager->UnregisterAsRecoveredSigsListener(); } if (quorumDKGSessionManager) { quorumDKGSessionManager->StopThreads(); diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 896e38a14d743..34a1a9816bfc4 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -17,6 +17,7 @@ #include #include +#include namespace llmq { @@ -79,8 +80,23 @@ bool CRecoveredSigsDb::HasRecoveredSigForSession(const uint256& signHash) bool CRecoveredSigsDb::HasRecoveredSigForHash(const uint256& hash) { + int64_t t = GetTimeMillis(); + + { + LOCK(cs); + auto it = hasSigForHashCache.find(hash); + if (it != hasSigForHashCache.end()) { + it->second.second = t; + return it->second.first; + } + } + auto k = std::make_tuple('h', hash); - return db.Exists(k); + bool ret = db.Exists(k); + + LOCK(cs); + hasSigForHashCache.emplace(hash, std::make_pair(ret, t)); + return ret; } bool CRecoveredSigsDb::ReadRecoveredSig(Consensus::LLMQType llmqType, const uint256& id, CRecoveredSig& ret) @@ -152,13 +168,14 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig) LOCK(cs); hasSigForIdCache[std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)] = std::make_pair(true, t); hasSigForSessionCache[signHash] = std::make_pair(true, t); + hasSigForHashCache[recSig.GetHash()] = std::make_pair(true, t); } } -template -static void TruncateCacheMap(std::unordered_map>& m, size_t maxSize, size_t truncateThreshold) +template +static void TruncateCacheMap(std::unordered_map, H>& m, size_t maxSize, size_t truncateThreshold) { - typedef typename std::unordered_map> Map; + typedef typename std::unordered_map, H> Map; typedef typename Map::iterator Iterator; if (m.size() <= truncateThreshold) { @@ -237,10 +254,12 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge) hasSigForIdCache.erase(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)); hasSigForSessionCache.erase(signHash); + hasSigForHashCache.erase(recSig.GetHash()); } TruncateCacheMap(hasSigForIdCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD); TruncateCacheMap(hasSigForSessionCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD); + TruncateCacheMap(hasSigForHashCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD); } for (auto& e : toDelete2) { @@ -355,8 +374,8 @@ bool CSigningManager::PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& void CSigningManager::CollectPendingRecoveredSigsToVerify( size_t maxUniqueSessions, - std::map>& retSigShares, - std::map, CQuorumCPtr>& retQuorums) + std::unordered_map>& retSigShares, + std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums) { { LOCK(cs); @@ -364,9 +383,8 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify( return; } - std::set> uniqueSignHashes; - llmq::utils::IterateNodesRandom( - pendingRecoveredSigs, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, std::list& ns) { + std::unordered_set, StaticSaltedHasher> uniqueSignHashes; + llmq::utils::IterateNodesRandom(pendingRecoveredSigs, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, std::list& ns) { if (ns.empty()) { return false; } @@ -419,8 +437,8 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify( bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman) { - std::map> recSigsByNode; - std::map, CQuorumCPtr> quorums; + std::unordered_map> recSigsByNode; + std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums); if (recSigsByNode.empty()) { @@ -443,13 +461,13 @@ bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman) } } - cxxtimer::Timer verifyTimer; + cxxtimer::Timer verifyTimer(true); batchVerifier.Verify(); verifyTimer.stop(); LogPrintf("llmq", "CSigningManager::%s -- verified recovered sig(s). count=%d, vt=%d, nodes=%d\n", __func__, verifyCount, verifyTimer.count(), recSigsByNode.size()); - std::set processed; + std::unordered_set processed; for (auto& p : recSigsByNode) { NodeId nodeId = p.first; auto& v = p.second; @@ -494,11 +512,25 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re signHash.ToString(), recoveredSig.id.ToString(), recoveredSig.msgHash.ToString(), nodeId); if (db.HasRecoveredSigForId(llmqType, recoveredSig.id)) { - // this should really not happen, as each masternode is participating in only one vote, - // even if it's a member of multiple quorums. so a majority is only possible on one quorum and one msgHash per id - LogPrintf("CSigningManager::%s -- conflicting recoveredSig for id=%s, msgHash=%s\n", __func__, - recoveredSig.id.ToString(), recoveredSig.msgHash.ToString()); - return; + CRecoveredSig otherRecoveredSig; + if (db.GetRecoveredSigById(llmqType, recoveredSig.id, otherRecoveredSig)) { + auto otherSignHash = llmq::utils::BuildSignHash(recoveredSig); + if (signHash != otherSignHash) { + // this should really not happen, as each masternode is participating in only one vote, + // even if it's a member of multiple quorums. so a majority is only possible on one quorum and one msgHash per id + LogPrintf("CSigningManager::%s -- conflicting recoveredSig for signHash=%s, id=%s, msgHash=%s, otherSignHash=%s\n", __func__, + signHash.ToString(), recoveredSig.id.ToString(), recoveredSig.msgHash.ToString(), otherSignHash.ToString()); + } else { + // Looks like we're trying to process a recSig that is already known. This might happen if the same + // recSig comes in through regular QRECSIG messages and at the same time through some other message + // which allowed to reconstruct a recSig (e.g. IXLOCK). In this case, just bail out. + } + return; + } else { + // This case is very unlikely. It can only happen when cleanup caused this specific recSig to vanish + // between the HasRecoveredSigForId and GetRecoveredSigById call. If that happens, treat it as if we + // never had that recSig + } } db.WriteRecoveredSig(recoveredSig); @@ -552,14 +584,19 @@ bool CSigningManager::AsyncSignIfMember(Consensus::LLMQType llmqType, const uint if (db.HasVotedOnId(llmqType, id)) { uint256 prevMsgHash; db.GetVoteForId(llmqType, id, prevMsgHash); - LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting on conflicting msgHash=%s\n", __func__, - id.ToString(), prevMsgHash.ToString(), msgHash.ToString()); + if (msgHash != prevMsgHash) { + LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting on conflicting msgHash=%s\n", __func__, + id.ToString(), prevMsgHash.ToString(), msgHash.ToString()); + } else { + LogPrintf("CSigningManager::%s -- already voted for id=%s and msgHash=%s. Not voting again.\n", __func__, + id.ToString(), prevMsgHash.ToString()); + } return false; } if (db.HasRecoveredSigForId(llmqType, id)) { // no need to sign it if we already have a recovered sig - return false; + return true; } db.WriteVoteForId(llmqType, id, msgHash); } diff --git a/src/llmq/quorums_signing.h b/src/llmq/quorums_signing.h index 45e3938c2d888..d4616d683f30c 100644 --- a/src/llmq/quorums_signing.h +++ b/src/llmq/quorums_signing.h @@ -10,21 +10,11 @@ #include "chainparams.h" #include "net.h" +#include "saltedhasher.h" #include "sync.h" #include -namespace std { - template <> - struct hash> - { - std::size_t operator()(const std::pair& k) const - { - return (std::size_t)((k.first + 1) * k.second.GetCheapHash()); - } - }; -} - namespace llmq { @@ -77,7 +67,6 @@ class CRecoveredSig } }; -// TODO implement caching to speed things up class CRecoveredSigsDb { static const size_t MAX_CACHE_SIZE = 30000; @@ -87,8 +76,9 @@ class CRecoveredSigsDb CDBWrapper db; RecursiveMutex cs; - std::unordered_map, std::pair> hasSigForIdCache; - std::unordered_map> hasSigForSessionCache; + std::unordered_map, std::pair, StaticSaltedHasher> hasSigForIdCache; + std::unordered_map, StaticSaltedHasher> hasSigForSessionCache; + std::unordered_map, StaticSaltedHasher> hasSigForHashCache; public: CRecoveredSigsDb(bool fMemory); @@ -136,7 +126,7 @@ class CSigningManager CRecoveredSigsDb db; // Incoming and not verified yet - std::map> pendingRecoveredSigs; + std::unordered_map> pendingRecoveredSigs; // must be protected by cs FastRandomContext rnd; @@ -157,7 +147,9 @@ class CSigningManager void ProcessMessageRecoveredSig(CNode* pfrom, const CRecoveredSig& recoveredSig, CConnman& connman); bool PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, bool& retBan); - void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::map>& retSigShares, std::map, CQuorumCPtr>& retQuorums); + void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, + std::unordered_map>& retSigShares, + std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums); bool ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager void ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, const CQuorumCPtr& quorum, CConnman& connman); void Cleanup(); // called from the worker thread of CSigSharesManager diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index 000eefab3b100..ad8fa26f0c437 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -3,7 +3,8 @@ // Distributed under the MIT/X11 software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. -#include "quorums_signing_shares.h" +#include "quorums_signing.h" + #include "activemasternode.h" #include "bls/bls_batchverifier.h" #include "cxxtimer.h" @@ -11,7 +12,7 @@ #include "net.h" #include "net_processing.h" #include "netmessagemaker.h" -#include "quorums_signing.h" +#include "quorums_signing_shares.h" #include "quorums_utils.h" #include "random.h" #include "shutdown.h" @@ -24,42 +25,20 @@ namespace llmq std::unique_ptr quorumSigSharesManager{nullptr}; -template -static std::pair FindBySignHash(const M& m, const uint256& signHash) -{ - return std::make_pair( - m.lower_bound(std::make_pair(signHash, (uint16_t)0)), - m.upper_bound(std::make_pair(signHash, std::numeric_limits::max()))); -} -template -static size_t CountBySignHash(const M& m, const uint256& signHash) -{ - auto itPair = FindBySignHash(m, signHash); - size_t count = 0; - while (itPair.first != itPair.second) { - count++; - ++itPair.first; - } - return count; -} - -template -static void RemoveBySignHash(M& m, const uint256& signHash) -{ - auto itPair = FindBySignHash(m, signHash); - m.erase(itPair.first, itPair.second); -} - void CSigShare::UpdateKey() { key.first = llmq::utils::BuildSignHash(*this); key.second = quorumMember; } +std::string CSigSesAnn::ToString() const +{ + return strprintf("sessionId=%d, llmqType=%d, quorumHash=%s, id=%s, msgHash=%s", + sessionId, llmqType, quorumHash.ToString(), id.ToString(), msgHash.ToString()); +} + void CSigSharesInv::Merge(const CSigSharesInv& inv2) { - assert(llmqType == inv2.llmqType); - assert(signHash == inv2.signHash); for (size_t i = 0; i < inv.size(); i++) { if (inv2.inv[i]) { inv[i] = inv2.inv[i]; @@ -74,7 +53,7 @@ size_t CSigSharesInv::CountSet() const std::string CSigSharesInv::ToString() const { - std::string str = strprintf("signHash=%s, inv=(", signHash.ToString()); + std::string str = "("; bool first = true; for (size_t i = 0; i < inv.size(); i++) { if (!inv[i]) { @@ -91,13 +70,9 @@ std::string CSigSharesInv::ToString() const return str; } -void CSigSharesInv::Init(Consensus::LLMQType _llmqType, const uint256& _signHash) +void CSigSharesInv::Init(size_t size) { - llmqType = _llmqType; - signHash = _signHash; - - size_t llmqSize = (size_t)(Params().GetConsensus().llmqs.at(_llmqType).size); - inv.resize(llmqSize, false); + inv.resize(size, false); } bool CSigSharesInv::IsSet(uint16_t quorumMember) const @@ -112,67 +87,95 @@ void CSigSharesInv::Set(uint16_t quorumMember, bool v) inv[quorumMember] = v; } -CSigSharesNodeState::Session& CSigSharesNodeState::GetOrCreateSession(Consensus::LLMQType llmqType, const uint256& signHash) +std::string CBatchedSigShares::ToInvString() const { - auto& s = sessions[signHash]; - if (s.announced.inv.empty()) { - s.announced.Init(llmqType, signHash); - s.requested.Init(llmqType, signHash); - s.knows.Init(llmqType, signHash); - } else { - assert(s.announced.llmqType == llmqType); - assert(s.requested.llmqType == llmqType); - assert(s.knows.llmqType == llmqType); + CSigSharesInv inv; + // we use 400 here no matter what the real size is. We don't really care about that size as we just want to call ToString() + inv.Init(400); + for (size_t i = 0; i < sigShares.size(); i++) { + inv.inv[sigShares[i].first] = true; } - return s; + return inv.ToString(); } -void CSigSharesNodeState::MarkAnnounced(const uint256& signHash, const CSigSharesInv& inv) +template +static void InitSession(CSigSharesNodeState::Session& s, const uint256& signHash, T& from) { - GetOrCreateSession((Consensus::LLMQType)inv.llmqType, signHash).announced.Merge(inv); + const auto& params = Params().GetConsensus().llmqs.at((Consensus::LLMQType)from.llmqType); + + s.llmqType = (Consensus::LLMQType)from.llmqType; + s.quorumHash = from.quorumHash; + s.id = from.id; + s.msgHash = from.msgHash; + s.signHash = signHash; + s.announced.Init((size_t)params.size); + s.requested.Init((size_t)params.size); + s.knows.Init((size_t)params.size); } -void CSigSharesNodeState::MarkRequested(const uint256& signHash, const CSigSharesInv& inv) +CSigSharesNodeState::Session& CSigSharesNodeState::GetOrCreateSessionFromShare(const llmq::CSigShare& sigShare) { - GetOrCreateSession((Consensus::LLMQType)inv.llmqType, signHash).requested.Merge(inv); + auto& s = sessions[sigShare.GetSignHash()]; + if (s.announced.inv.empty()) { + InitSession(s, sigShare.GetSignHash(), sigShare); + } + return s; } -void CSigSharesNodeState::MarkKnows(const uint256& signHash, const CSigSharesInv& inv) +CSigSharesNodeState::Session& CSigSharesNodeState::GetOrCreateSessionFromAnn(const llmq::CSigSesAnn& ann) { - GetOrCreateSession((Consensus::LLMQType)inv.llmqType, signHash).knows.Merge(inv); + auto signHash = llmq::utils::BuildSignHash((Consensus::LLMQType)ann.llmqType, ann.quorumHash, ann.id, ann.msgHash); + auto& s = sessions[signHash]; + if (s.announced.inv.empty()) { + InitSession(s, signHash, ann); + } + return s; } -void CSigSharesNodeState::MarkAnnounced(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember) +CSigSharesNodeState::Session* CSigSharesNodeState::GetSessionBySignHash(const uint256& signHash) { - GetOrCreateSession(llmqType, signHash).announced.Set(quorumMember, true); + auto it = sessions.find(signHash); + if (it == sessions.end()) { + return nullptr; + } + return &it->second; } -void CSigSharesNodeState::MarkRequested(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember) +CSigSharesNodeState::Session* CSigSharesNodeState::GetSessionByRecvId(uint32_t sessionId) { - GetOrCreateSession(llmqType, signHash).requested.Set(quorumMember, true); + auto it = sessionByRecvId.find(sessionId); + if (it == sessionByRecvId.end()) { + return nullptr; + } + return it->second; } -void CSigSharesNodeState::MarkKnows(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember) +bool CSigSharesNodeState::GetSessionInfoByRecvId(uint32_t sessionId, SessionInfo& retInfo) { - GetOrCreateSession(llmqType, signHash).knows.Set(quorumMember, true); -} + auto s = GetSessionByRecvId(sessionId); + if (!s) { + return false; + } + retInfo.recvSessionId = sessionId; + retInfo.llmqType = s->llmqType; + retInfo.quorumHash = s->quorumHash; + retInfo.id = s->id; + retInfo.msgHash = s->msgHash; + retInfo.signHash = s->signHash; + retInfo.quorum = s->quorum; -void CSigSharesNodeState::RemoveSession(const uint256& signHash) -{ - sessions.erase(signHash); - // pendingIncomingRecSigs.erase(signHash); - RemoveBySignHash(requestedSigShares, signHash); - RemoveBySignHash(pendingIncomingSigShares, signHash); + return true; } -CSigSharesInv CBatchedSigShares::ToInv() const +void CSigSharesNodeState::RemoveSession(const uint256& signHash) { - CSigSharesInv inv; - inv.Init((Consensus::LLMQType)llmqType, llmq::utils::BuildSignHash(*this)); - for (size_t i = 0; i < sigShares.size(); i++) { - inv.inv[sigShares[i].first] = true; + auto it = sessions.find(signHash); + if (it != sessions.end()) { + sessionByRecvId.erase(it->second.recvSessionId); + sessions.erase(it); } - return inv; + requestedSigShares.EraseAllForSignHash(signHash); + pendingIncomingSigShares.EraseAllForSignHash(signHash); } ////////////////////// @@ -198,6 +201,16 @@ void CSigSharesManager::StopWorkerThread() } } +void CSigSharesManager::RegisterAsRecoveredSigsListener() +{ + quorumSigningManager->RegisterRecoveredSigsListener(this); +} + +void CSigSharesManager::UnregisterAsRecoveredSigsListener() +{ + quorumSigningManager->UnregisterRecoveredSigsListener(this); +} + void CSigSharesManager::Interrupt() { interruptSigningShare(); @@ -210,89 +223,190 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma return; } - if (strCommand == NetMsgType::QSIGSHARESINV) { - CSigSharesInv inv; - vRecv >> inv; - ProcessMessageSigSharesInv(pfrom, inv, connman); + if (strCommand == NetMsgType::QSIGSESANN) { + std::vector msgs; + vRecv >> msgs; + if (msgs.size() > MAX_MSGS_CNT_QSIGSESANN) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many announcements in QSIGSESANN message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSESANN, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& ann : msgs) { + if (!ProcessMessageSigSesAnn(pfrom, ann, connman)) { + BanNode(pfrom->GetId()); + return; + } + } + } else if (strCommand == NetMsgType::QSIGSHARESINV) { + std::vector msgs; + vRecv >> msgs; + if (msgs.size() > MAX_MSGS_CNT_QSIGSHARESINV) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many invs in QSIGSHARESINV message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QSIGSHARESINV, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& inv : msgs) { + if (!ProcessMessageSigSharesInv(pfrom, inv, connman)) { + BanNode(pfrom->GetId()); + return; + } + } } else if (strCommand == NetMsgType::QGETSIGSHARES) { - CSigSharesInv inv; - vRecv >> inv; - ProcessMessageGetSigShares(pfrom, inv, connman); + std::vector msgs; + vRecv >> msgs; + if (msgs.size() > MAX_MSGS_CNT_QGETSIGSHARES) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many invs in QGETSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_CNT_QGETSIGSHARES, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& inv : msgs) { + if (!ProcessMessageGetSigShares(pfrom, inv, connman)) { + BanNode(pfrom->GetId()); + return; + } + } } else if (strCommand == NetMsgType::QBSIGSHARES) { - CBatchedSigShares batchedSigShares; - vRecv >> batchedSigShares; - ProcessMessageBatchedSigShares(pfrom, batchedSigShares, connman); + std::vector msgs; + vRecv >> msgs; + size_t totalSigsCount = 0; + for (auto& bs : msgs) { + totalSigsCount += bs.sigShares.size(); + } + if (totalSigsCount > MAX_MSGS_TOTAL_BATCHED_SIGS) { + LogPrintf("llmq", "CSigSharesManager::%s -- too many sigs in QBSIGSHARES message. cnt=%d, max=%d, node=%d\n", __func__, msgs.size(), MAX_MSGS_TOTAL_BATCHED_SIGS, pfrom->GetId()); + BanNode(pfrom->GetId()); + return; + } + for (auto& bs : msgs) { + if (!ProcessMessageBatchedSigShares(pfrom, bs, connman)) { + BanNode(pfrom->GetId()); + return; + } + } } } -bool CSigSharesManager::VerifySigSharesInv(NodeId from, const CSigSharesInv& inv) +bool CSigSharesManager::ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman) { - Consensus::LLMQType llmqType = (Consensus::LLMQType)inv.llmqType; - if (!Params().GetConsensus().llmqs.count(llmqType) || inv.signHash.IsNull()) { - BanNode(from); + auto llmqType = (Consensus::LLMQType)ann.llmqType; + if (!Params().GetConsensus().llmqs.count(llmqType)) { return false; } - - if (!activeMasternodeManager) { + if (ann.sessionId == (uint32_t)-1 || ann.quorumHash.IsNull() || ann.id.IsNull() || ann.msgHash.IsNull()) { return false; } + LogPrintf("llmq", "CSigSharesManager::%s -- ann={%s}, node=%d\n", __func__, ann.ToString(), pfrom->GetId()); + + auto quorum = quorumManager->GetQuorum(llmqType, ann.quorumHash); + if (!quorum) { + // TODO should we ban here? + LogPrintf("CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__, + ann.quorumHash.ToString(), pfrom->GetId()); + return true; // let's still try other announcements from the same message + } + + auto signHash = llmq::utils::BuildSignHash(llmqType, ann.quorumHash, ann.id, ann.msgHash); + + LOCK(cs); + auto& nodeState = nodeStates[pfrom->GetId()]; + auto& session = nodeState.GetOrCreateSessionFromAnn(ann); + nodeState.sessionByRecvId.erase(session.recvSessionId); + nodeState.sessionByRecvId.erase(ann.sessionId); + session.recvSessionId = ann.sessionId; + session.quorum = quorum; + nodeState.sessionByRecvId.emplace(ann.sessionId, &session); + + return true; +} + +bool CSigSharesManager::VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv) +{ size_t quorumSize = (size_t)Params().GetConsensus().llmqs.at(llmqType).size; if (inv.inv.size() != quorumSize) { - BanNode(from); return false; } return true; } -void CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) +bool CSigSharesManager::ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) { - if (!VerifySigSharesInv(pfrom->GetId(), inv)) { - return; + CSigSharesNodeState::SessionInfo sessionInfo; + if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) { + return true; + } + + if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) { + return false; } // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig - if (quorumSigningManager->HasRecoveredSigForSession(inv.signHash)) { - return; + if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) { + return true; } - LogPrintf("llmq", "CSigSharesManager::%s -- inv={%s}, node=%d\n", __func__, inv.ToString(), pfrom->GetId()); + LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, + sessionInfo.signHash.ToString(), inv.ToString(), pfrom->GetId()); + + if (sessionInfo.quorum->quorumVvec == nullptr) { + // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG + LogPrintf("CSigSharesManager::%s -- we don't have the quorum vvec for %s, not requesting sig shares. node=%d\n", __func__, + sessionInfo.quorumHash.ToString(), pfrom->GetId()); + return true; + } LOCK(cs); auto& nodeState = nodeStates[pfrom->GetId()]; - nodeState.MarkAnnounced(inv.signHash, inv); - nodeState.MarkKnows(inv.signHash, inv); + auto session = nodeState.GetSessionByRecvId(inv.sessionId); + if (!session) { + return true; + } + session->announced.Merge(inv); + session->knows.Merge(inv); + return true; } -void CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) +bool CSigSharesManager::ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman) { - if (!VerifySigSharesInv(pfrom->GetId(), inv)) { - return; + CSigSharesNodeState::SessionInfo sessionInfo; + if (!GetSessionInfoByRecvId(pfrom->GetId(), inv.sessionId, sessionInfo)) { + return true; + } + + if (!VerifySigSharesInv(pfrom->GetId(), sessionInfo.llmqType, inv)) { + return false; } // TODO for PoSe, we should consider propagating shares even if we already have a recovered sig - if (quorumSigningManager->HasRecoveredSigForSession(inv.signHash)) { - return; + if (quorumSigningManager->HasRecoveredSigForSession(sessionInfo.signHash)) { + return true; } - LogPrintf("llmq", "CSigSharesManager::%s -- inv={%s}, node=%d\n", __func__, inv.ToString(), pfrom->GetId()); + LogPrintf("llmq", "CSigSharesManager::%s -- signHash=%s, inv={%s}, node=%d\n", __func__, + sessionInfo.signHash.ToString(), inv.ToString(), pfrom->GetId()); LOCK(cs); auto& nodeState = nodeStates[pfrom->GetId()]; - nodeState.MarkRequested(inv.signHash, inv); - nodeState.MarkKnows(inv.signHash, inv); + auto session = nodeState.GetSessionByRecvId(inv.sessionId); + if (!session) { + return true; + } + session->requested.Merge(inv); + session->knows.Merge(inv); + return true; } -void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman) +bool CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman) { + CSigSharesNodeState::SessionInfo sessionInfo; + if (!GetSessionInfoByRecvId(pfrom->GetId(), batchedSigShares.sessionId, sessionInfo)) { + return true; + } + bool ban = false; - if (!PreVerifyBatchedSigShares(pfrom->GetId(), batchedSigShares, ban)) { - if (ban) { - BanNode(pfrom->GetId()); - return; - } - return; + if (!PreVerifyBatchedSigShares(pfrom->GetId(), sessionInfo, batchedSigShares, ban)) { + return ban; } std::vector sigShares; @@ -303,14 +417,14 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc auto& nodeState = nodeStates[pfrom->GetId()]; for (size_t i = 0; i < batchedSigShares.sigShares.size(); i++) { - CSigShare sigShare = batchedSigShares.RebuildSigShare(i); - nodeState.requestedSigShares.erase(sigShare.GetKey()); + CSigShare sigShare = RebuildSigShare(sessionInfo, batchedSigShares, i); + nodeState.requestedSigShares.Erase(sigShare.GetKey()); // TODO track invalid sig shares received for PoSe? // It's important to only skip seen *valid* sig shares here. If a node sends us a // batch of mostly valid sig shares with a single invalid one and thus batched // verification fails, we'd skip the valid ones in the future if received from other nodes - if (this->sigShares.count(sigShare.GetKey())) { + if (this->sigShares.Has(sigShare.GetKey())) { continue; } @@ -323,54 +437,41 @@ void CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc } } - LogPrintf("llmq", "CSigSharesManager::%s -- shares=%d, new=%d, inv={%s}, node=%d\n", __func__, - batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInv().ToString(), pfrom->GetId()); + LogPrint(BCLog::LLMQ, "CSigSharesManager::%s -- signHash=%s, shares=%d, new=%d, inv={%s}, node=%d\n", __func__, + sessionInfo.signHash.ToString(), batchedSigShares.sigShares.size(), sigShares.size(), batchedSigShares.ToInvString(), pfrom->GetId()); if (sigShares.empty()) { - return; + return true; } LOCK(cs); auto& nodeState = nodeStates[pfrom->GetId()]; for (auto& s : sigShares) { - nodeState.pendingIncomingSigShares.emplace(s.GetKey(), s); + nodeState.pendingIncomingSigShares.Add(s.GetKey(), s); } + return true; } -bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedSigShares& batchedSigShares, bool& retBan) +bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan) { retBan = false; - auto llmqType = (Consensus::LLMQType)batchedSigShares.llmqType; - if (!Params().GetConsensus().llmqs.count(llmqType)) { - retBan = true; - return false; - } - - CQuorumCPtr quorum = quorumManager->GetQuorum(llmqType, batchedSigShares.quorumHash); - - if (!quorum) { - // TODO should we ban here? - LogPrintf("CSigSharesManager::%s -- quorum %s not found, node=%d\n", __func__, - batchedSigShares.quorumHash.ToString(), nodeId); - return false; - } - if (!llmq::utils::IsQuorumActive(llmqType, quorum->pindexQuorum->GetBlockHash())) { + if (!llmq::utils::IsQuorumActive(session.llmqType, session.quorum->pindexQuorum->GetBlockHash())) { // quorum is too old return false; } - if (!quorum->IsMember(activeMasternodeManager->GetProTx())) { + if (!session.quorum->IsMember(activeMasternodeManager->GetProTx())) { // we're not a member so we can't verify it (we actually shouldn't have received it) return false; } - if (quorum->quorumVvec == nullptr) { + if (session.quorum->quorumVvec == nullptr) { // TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG LogPrintf("CSigSharesManager::%s -- we don't have the quorum vvec for %s, no verification possible. node=%d\n", __func__, - batchedSigShares.quorumHash.ToString(), nodeId); + session.quorumHash.ToString(), nodeId); return false; } - std::set dupMembers; + std::unordered_set dupMembers; for (size_t i = 0; i < batchedSigShares.sigShares.size(); i++) { auto quorumMember = batchedSigShares.sigShares[i].first; @@ -379,12 +480,12 @@ bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedS return false; } - if (quorumMember >= quorum->members.size()) { + if (quorumMember >= session.quorum->members.size()) { LogPrintf("CSigSharesManager::%s -- quorumMember out of bounds\n", __func__); retBan = true; return false; } - if (!quorum->validMembers[quorumMember]) { + if (!session.quorum->validMembers[quorumMember]) { LogPrintf("CSigSharesManager::%s -- quorumMember not valid\n", __func__); retBan = true; return false; @@ -395,8 +496,8 @@ bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedS void CSigSharesManager::CollectPendingSigSharesToVerify( size_t maxUniqueSessions, - std::map>& retSigShares, - std::map, CQuorumCPtr>& retQuorums) + std::unordered_map>& retSigShares, + std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums) { { LOCK(cs); @@ -410,21 +511,20 @@ void CSigSharesManager::CollectPendingSigSharesToVerify( // invalid, making batch verification fail and revert to per-share verification, which in turn would slow down // the whole verification process - std::set> uniqueSignHashes; - llmq::utils::IterateNodesRandom( - nodeStates, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, CSigSharesNodeState& ns) { - if (ns.pendingIncomingSigShares.empty()) { + std::unordered_set, StaticSaltedHasher> uniqueSignHashes; + llmq::utils::IterateNodesRandom(nodeStates, [&]() { return uniqueSignHashes.size() < maxUniqueSessions; }, [&](NodeId nodeId, CSigSharesNodeState& ns) { + if (ns.pendingIncomingSigShares.Empty()) { return false; } - auto& sigShare = ns.pendingIncomingSigShares.begin()->second; + auto& sigShare = *ns.pendingIncomingSigShares.GetFirst(); - bool alreadyHave = this->sigShares.count(sigShare.GetKey()) != 0; + bool alreadyHave = this->sigShares.Has(sigShare.GetKey()); if (!alreadyHave) { uniqueSignHashes.emplace(nodeId, sigShare.GetSignHash()); retSigShares[nodeId].emplace_back(sigShare); } - ns.pendingIncomingSigShares.erase(ns.pendingIncomingSigShares.begin()); - return !ns.pendingIncomingSigShares.empty(); }, rnd); + ns.pendingIncomingSigShares.Erase(sigShare.GetKey()); + return !ns.pendingIncomingSigShares.Empty(); }, rnd); if (retSigShares.empty()) { return; @@ -455,8 +555,8 @@ void CSigSharesManager::CollectPendingSigSharesToVerify( bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman) { - std::map> sigSharesByNodes; - std::map, CQuorumCPtr> quorums; + std::unordered_map> sigSharesByNodes; + std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; CollectPendingSigSharesToVerify(32, sigSharesByNodes, quorums); if (sigSharesByNodes.empty()) { @@ -500,7 +600,7 @@ bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman) } } - cxxtimer::Timer verifyTimer; + cxxtimer::Timer verifyTimer(true); batchVerifier.Verify(); verifyTimer.stop(); @@ -525,7 +625,10 @@ bool CSigSharesManager::ProcessPendingSigShares(CConnman& connman) } // It's ensured that no duplicates are passed to this method -void CSigSharesManager::ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector& sigShares, const std::map, CQuorumCPtr>& quorums, CConnman& connman) +void CSigSharesManager::ProcessPendingSigSharesFromNode(NodeId nodeId, + const std::vector& sigShares, + const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums, + CConnman& connman) { LOCK(cs); auto& nodeState = nodeStates[nodeId]; @@ -571,12 +674,21 @@ void CSigSharesManager::ProcessSigShare(NodeId nodeId, const CSigShare& sigShare { LOCK(cs); - if (!sigShares.emplace(sigShare.GetKey(), sigShare).second) { + if (!sigShares.Add(sigShare.GetKey(), sigShare)) { return; } - sigSharesToAnnounce.emplace(sigShare.GetKey()); - firstSeenForSessions.emplace(sigShare.GetSignHash(), GetTimeMillis()); + sigSharesToAnnounce.Add(sigShare.GetKey(), true); + + auto it = timeSeenForSessions.find(sigShare.GetSignHash()); + if (it == timeSeenForSessions.end()) { + auto t = GetTimeMillis(); + // insert first-seen and last-seen time + timeSeenForSessions.emplace(sigShare.GetSignHash(), std::make_pair(t, t)); + } else { + // update last-seen time + it->second.second = GetTimeMillis(); + } if (!quorumNodes.empty()) { // don't announce and wait for other nodes to request this share and directly send it to them @@ -586,12 +698,14 @@ void CSigSharesManager::ProcessSigShare(NodeId nodeId, const CSigShare& sigShare if (!quorumNodes.count(p.first) && !p.second.interestedIn.count(std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash))) { continue; } - p.second.MarkRequested((Consensus::LLMQType)sigShare.llmqType, sigShare.GetSignHash(), sigShare.quorumMember); - p.second.MarkKnows((Consensus::LLMQType)sigShare.llmqType, sigShare.GetSignHash(), sigShare.quorumMember); + auto& session = p.second.GetOrCreateSessionFromShare(sigShare); + session.quorum = quorum; + session.requested.Set(sigShare.quorumMember, true); + session.knows.Set(sigShare.quorumMember, true); } } - size_t sigShareCount = CountBySignHash(sigShares, sigShare.GetSignHash()); + size_t sigShareCount = sigShares.CountForSignHash(sigShare.GetSignHash()); if (sigShareCount >= quorum->params.threshold) { canTryRecovery = true; } @@ -616,11 +730,14 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& auto k = std::make_pair(quorum->params.type, id); auto signHash = llmq::utils::BuildSignHash(quorum->params.type, quorum->pindexQuorum->GetBlockHash(), id, msgHash); - auto itPair = FindBySignHash(sigShares, signHash); + auto sigShares = this->sigShares.GetAllForSignHash(signHash); + if (!sigShares) { + return; + } sigSharesForRecovery.reserve((size_t)quorum->params.threshold); idsForRecovery.reserve((size_t)quorum->params.threshold); - for (auto it = itPair.first; it != itPair.second && sigSharesForRecovery.size() < quorum->params.threshold; ++it) { + for (auto it = sigShares->begin(); it != sigShares->end() && sigSharesForRecovery.size() < quorum->params.threshold; ++it) { auto& sigShare = it->second; sigSharesForRecovery.emplace_back(sigShare.sigShare.Get()); idsForRecovery.emplace_back(CBLSId(quorum->members[sigShare.quorumMember]->proTxHash)); @@ -665,11 +782,9 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256& } // cs must be held -void CSigSharesManager::CollectSigSharesToRequest(std::map>& sigSharesToRequest) +void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest) { int64_t now = GetTimeMillis(); - std::map> nodesBySigShares; - const size_t maxRequestsForNode = 32; // avoid requesting from same nodes all the time @@ -690,18 +805,17 @@ void CSigSharesManager::CollectSigSharesToRequest(std::mapsecond >= SIG_SHARE_REQUEST_TIMEOUT) { + nodeState.requestedSigShares.EraseIf([&](const SigShareKey& k, int64_t t) { + if (now - t >= SIG_SHARE_REQUEST_TIMEOUT) { // timeout while waiting for this one, so retry it with another node - LogPrintf("llmq", "CSigSharesManager::%s -- timeout while waiting for %s-%d, node=%d\n", __func__, - it->first.first.ToString(), it->first.second, nodeId); - it = nodeState.requestedSigShares.erase(it); - } else { - ++it; + LogPrintf("llmq", "CSigSharesManager::CollectSigSharesToRequest -- timeout while waiting for %s-%d, node=%d\n", + k.first.ToString(), k.second, nodeId); + return true; } - } + return false; + }); - std::map* invMap = nullptr; + decltype(sigSharesToRequest.begin()->second)* invMap = nullptr; for (auto& p2 : nodeState.sessions) { auto& signHash = p2.first; @@ -716,21 +830,21 @@ void CSigSharesManager::CollectSigSharesToRequest(std::map= maxRequestsForNode) { + if (nodeState.requestedSigShares.Size() >= maxRequestsForNode) { // too many pending requests for this node break; } - auto it = sigSharesRequested.find(k); - if (it != sigSharesRequested.end()) { - if (now - it->second.second >= SIG_SHARE_REQUEST_TIMEOUT && nodeId != it->second.first) { + auto p = sigSharesRequested.Get(k); + if (p) { + if (now - p->second >= SIG_SHARE_REQUEST_TIMEOUT && nodeId != p->first) { // other node timed out, re-request from this node LogPrintf("llmq", "CSigSharesManager::%s -- other node timeout while waiting for %s-%d, re-request from=%d, node=%d\n", __func__, - it->first.first.ToString(), it->first.second, nodeId, it->second.first); + k.first.ToString(), k.second, nodeId, p->first); } else { continue; } @@ -738,10 +852,10 @@ void CSigSharesManager::CollectSigSharesToRequest(std::map>& sigSharesToSend) +void CSigSharesManager::CollectSigSharesToSend(std::unordered_map>& sigSharesToSend) { for (auto& p : nodeStates) { auto nodeId = p.first; @@ -772,7 +887,7 @@ void CSigSharesManager::CollectSigSharesToSend(std::map* sigSharesToSend2 = nullptr; + decltype(sigSharesToSend.begin()->second)* sigSharesToSend2 = nullptr; for (auto& p2 : nodeState.sessions) { auto& signHash = p2.first; @@ -791,21 +906,14 @@ void CSigSharesManager::CollectSigSharesToSend(std::mapsecond; - if (batchedSigShares.sigShares.empty()) { - batchedSigShares.llmqType = sigShare.llmqType; - batchedSigShares.quorumHash = sigShare.quorumHash; - batchedSigShares.id = sigShare.id; - batchedSigShares.msgHash = sigShare.msgHash; - } - batchedSigShares.sigShares.emplace_back((uint16_t)i, sigShare.sigShare); + batchedSigShares.sigShares.emplace_back((uint16_t)i, sigShare->sigShare); } if (!batchedSigShares.sigShares.empty()) { @@ -820,20 +928,19 @@ void CSigSharesManager::CollectSigSharesToSend(std::map>& sigSharesToAnnounce) +void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce) { - std::set> quorumNodesPrepared; + std::unordered_set, StaticSaltedHasher> quorumNodesPrepared; - for (auto& sigShareKey : this->sigSharesToAnnounce) { + this->sigSharesToAnnounce.ForEach([&](const SigShareKey& sigShareKey, bool) { auto& signHash = sigShareKey.first; auto quorumMember = sigShareKey.second; - auto sigShareIt = sigShares.find(sigShareKey); - if (sigShareIt == sigShares.end()) { - continue; + const CSigShare* sigShare = sigShares.Get(sigShareKey); + if (!sigShare) { + return; } - auto& sigShare = sigShareIt->second; - auto quorumKey = std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash); + auto quorumKey = std::make_pair((Consensus::LLMQType)sigShare->llmqType, sigShare->quorumHash); if (quorumNodesPrepared.emplace(quorumKey).second) { // make sure we announce to at least the nodes which we know through the inter-quorum-communication system @@ -861,7 +968,7 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::mapllmqType); + inv.Init((size_t)params.size); } inv.inv[quorumMember] = true; session.knows.inv[quorumMember] = true; } - } + }); // don't announce these anymore // nodes which did not send us a valid sig share before were left out now, but this is ok as it only results in slower // propagation for the first signing session of a fresh quorum. The sig shares should still arrive on all nodes due to // the deterministic inter-quorum-communication system - this->sigSharesToAnnounce.clear(); + this->sigSharesToAnnounce.Clear(); } bool CSigSharesManager::SendMessages() @@ -891,15 +999,51 @@ bool CSigSharesManager::SendMessages() nodesByAddress.emplace(pnode->addr, pnode->GetId()); }); - std::map> sigSharesToRequest; - std::map> sigSharesToSend; - std::map> sigSharesToAnnounce; + std::unordered_map> sigSharesToRequest; + std::unordered_map> sigSharesToSend; + std::unordered_map> sigSharesToAnnounce; + std::unordered_map> sigSessionAnnouncements; + + auto addSigSesAnnIfNeeded = [&](NodeId nodeId, const uint256& signHash) { + auto& nodeState = nodeStates[nodeId]; + auto session = nodeState.GetSessionBySignHash(signHash); + assert(session); + if (session->sendSessionId == (uint32_t)-1) { + session->sendSessionId = nodeState.nextSendSessionId++; + + CSigSesAnn sigSesAnn; + sigSesAnn.sessionId = session->sendSessionId; + sigSesAnn.llmqType = (uint8_t)session->llmqType; + sigSesAnn.quorumHash = session->quorumHash; + sigSesAnn.id = session->id; + sigSesAnn.msgHash = session->msgHash; + + sigSessionAnnouncements[nodeId].emplace_back(sigSesAnn); + } + return session->sendSessionId; + }; { LOCK(cs); CollectSigSharesToRequest(sigSharesToRequest); CollectSigSharesToSend(sigSharesToSend); CollectSigSharesToAnnounce(sigSharesToAnnounce); + + for (auto& p : sigSharesToRequest) { + for (auto& p2 : p.second) { + p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first); + } + } + for (auto& p : sigSharesToSend) { + for (auto& p2 : p.second) { + p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first); + } + } + for (auto& p : sigSharesToAnnounce) { + for (auto& p2 : p.second) { + p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first); + } + } } bool didSend = false; @@ -907,35 +1051,85 @@ bool CSigSharesManager::SendMessages() g_connman->ForEachNode([&](CNode* pnode) { CNetMsgMaker msgMaker(pnode->GetSendVersion()); + auto it1 = sigSessionAnnouncements.find(pnode->GetId()); + if (it1 != sigSessionAnnouncements.end()) { + std::vector msgs; + msgs.reserve(it1->second.size()); + for (auto& sigSesAnn : it1->second) { + LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSESANN signHash=%s, sessionId=%d, node=%d\n", + llmq::utils::BuildSignHash(sigSesAnn).ToString(), sigSesAnn.sessionId, pnode->GetId()); + msgs.emplace_back(sigSesAnn); + if (msgs.size() == MAX_MSGS_CNT_QSIGSESANN) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); + msgs.clear(); + didSend = true; + } + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSESANN, msgs)); + didSend = true; + } + } + auto it = sigSharesToRequest.find(pnode->GetId()); if (it != sigSharesToRequest.end()) { + std::vector msgs; for (auto& p : it->second) { assert(p.second.CountSet() != 0); - LogPrintf("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES inv={%s}, node=%d\n", - p.second.ToString(), pnode->GetId()); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, p.second)); + LogPrintf("llmq", "CSigSharesManager::SendMessages -- QGETSIGSHARES signHash=%s, inv={%s}, node=%d\n", + p.first.ToString(), p.second.ToString(), pnode->GetId()); + msgs.emplace_back(std::move(p.second)); + if (msgs.size() == MAX_MSGS_CNT_QGETSIGSHARES) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); + msgs.clear(); + didSend = true; + } + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QGETSIGSHARES, msgs)); didSend = true; } } auto jt = sigSharesToSend.find(pnode->GetId()); if (jt != sigSharesToSend.end()) { + size_t totalSigsCount = 0; + std::vector msgs; for (auto& p : jt->second) { assert(!p.second.sigShares.empty()); - LogPrintf("llmq", "CSigSharesManager::SendMessages -- QBSIGSHARES inv={%s}, node=%d\n", - p.second.ToInv().ToString(), pnode->GetId()); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, p.second)); + LogPrint(BCLog::LLMQ, "CSigSharesManager::SendMessages -- QBSIGSHARES signHash=%s, inv={%s}, node=%d\n", + p.first.ToString(), p.second.ToInvString(), pnode->GetId()); + if (totalSigsCount + p.second.sigShares.size() > MAX_MSGS_TOTAL_BATCHED_SIGS) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, msgs)); + msgs.clear(); + totalSigsCount = 0; + didSend = true; + } + totalSigsCount += p.second.sigShares.size(); + msgs.emplace_back(std::move(p.second)); + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QBSIGSHARES, std::move(msgs))); didSend = true; } } auto kt = sigSharesToAnnounce.find(pnode->GetId()); if (kt != sigSharesToAnnounce.end()) { + std::vector msgs; for (auto& p : kt->second) { assert(p.second.CountSet() != 0); - LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV inv={%s}, node=%d\n", - p.second.ToString(), pnode->GetId()); - g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, p.second)); + LogPrintf("llmq", "CSigSharesManager::SendMessages -- QSIGSHARESINV signHash=%s, inv={%s}, node=%d\n", + p.first.ToString(), p.second.ToString(), pnode->GetId()); + msgs.emplace_back(std::move(p.second)); + if (msgs.size() == MAX_MSGS_CNT_QSIGSHARESINV) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); + msgs.clear(); + didSend = true; + } + } + if (!msgs.empty()) { + g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARESINV, msgs)); didSend = true; } } @@ -946,6 +1140,27 @@ bool CSigSharesManager::SendMessages() return didSend; } +bool CSigSharesManager::GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo) +{ + LOCK(cs); + return nodeStates[nodeId].GetSessionInfoByRecvId(sessionId, retInfo); +} + +CSigShare CSigSharesManager::RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, size_t idx) +{ + assert(idx < batchedSigShares.sigShares.size()); + auto& s = batchedSigShares.sigShares[idx]; + CSigShare sigShare; + sigShare.llmqType = session.llmqType; + sigShare.quorumHash = session.quorumHash; + sigShare.quorumMember = s.first; + sigShare.id = session.id; + sigShare.msgHash = session.msgHash; + sigShare.sigShare = s.second; + sigShare.UpdateKey(); + return sigShare; +} + void CSigSharesManager::Cleanup() { int64_t now = GetTimeMillis(); @@ -953,80 +1168,106 @@ void CSigSharesManager::Cleanup() return; } - std::set> quorumsToCheck; + // This map is first filled with all quorums found in all sig shares. Then we remove all inactive quorums and + // loop through all sig shares again to find the ones belonging to the inactive quorums. We then delete the + // sessions belonging to the sig shares. At the same time, we use this map as a cache when we later need to resolve + // quorumHash -> quorumPtr (as GetQuorum() requires cs_main, leading to deadlocks with cs held) + std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; { LOCK(cs); + sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { + quorums.emplace(std::make_pair((Consensus::LLMQType) sigShare.llmqType, sigShare.quorumHash), nullptr); + }); + } - // Remove sessions which timed out - std::set timeoutSessions; - for (auto& p : firstSeenForSessions) { - auto& signHash = p.first; - int64_t time = p.second; - - if (now - time >= SIGNING_SESSION_TIMEOUT) { - timeoutSessions.emplace(signHash); - } + // Find quorums which became inactive + for (auto it = quorums.begin(); it != quorums.end(); ) { + if (llmq::utils::IsQuorumActive(it->first.first, it->first.second)) { + it->second = quorumManager->GetQuorum(it->first.first, it->first.second); + ++it; + } else { + it = quorums.erase(it); } - for (auto& signHash : timeoutSessions) { - size_t count = CountBySignHash(sigShares, signHash); + } - if (count > 0) { - auto itPair = FindBySignHash(sigShares, signHash); - auto& firstSigShare = itPair.first->second; - LogPrintf("CSigSharesManager::%s -- signing session timed out. signHash=%s, id=%s, msgHash=%s, sigShareCount=%d\n", __func__, - signHash.ToString(), firstSigShare.id.ToString(), firstSigShare.msgHash.ToString(), count); - } else { - LogPrintf("CSigSharesManager::%s -- signing session timed out. signHash=%s, sigShareCount=%d\n", __func__, - signHash.ToString(), count); + { + // Now delete sessions which are for inactive quorums + LOCK(cs); + std::unordered_set inactiveQuorumSessions; + sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { + if (!quorums.count(std::make_pair((Consensus::LLMQType)sigShare.llmqType, sigShare.quorumHash))) { + inactiveQuorumSessions.emplace(sigShare.GetSignHash()); } + }); + for (auto& signHash : inactiveQuorumSessions) { RemoveSigSharesForSession(signHash); } + } + + { + LOCK(cs); // Remove sessions which were successfully recovered - std::set doneSessions; - for (auto& p : sigShares) { - if (doneSessions.count(p.second.GetSignHash())) { - continue; + std::unordered_set doneSessions; + sigShares.ForEach([&](const SigShareKey& k, const CSigShare& sigShare) { + if (doneSessions.count(sigShare.GetSignHash())) { + return; } - if (quorumSigningManager->HasRecoveredSigForSession(p.second.GetSignHash())) { - doneSessions.emplace(p.second.GetSignHash()); + if (quorumSigningManager->HasRecoveredSigForSession(sigShare.GetSignHash())) { + doneSessions.emplace(sigShare.GetSignHash()); } - } + }); for (auto& signHash : doneSessions) { RemoveSigSharesForSession(signHash); } - for (auto& p : sigShares) { - quorumsToCheck.emplace((Consensus::LLMQType)p.second.llmqType, p.second.quorumHash); - } - } + // Remove sessions which timed out + std::unordered_set timeoutSessions; + for (auto& p : timeSeenForSessions) { + auto& signHash = p.first; + int64_t firstSeenTime = p.second.first; + int64_t lastSeenTime = p.second.second; - // Find quorums which became inactive - for (auto it = quorumsToCheck.begin(); it != quorumsToCheck.end();) { - if (llmq::utils::IsQuorumActive(it->first, it->second)) { - it = quorumsToCheck.erase(it); - } else { - ++it; + if (now - firstSeenTime >= SESSION_TOTAL_TIMEOUT || now - lastSeenTime >= SESSION_NEW_SHARES_TIMEOUT) { + timeoutSessions.emplace(signHash); + } } - } + for (auto& signHash : timeoutSessions) { + size_t count = sigShares.CountForSignHash(signHash); - { - // Now delete sessions which are for inactive quorums - LOCK(cs); - std::set inactiveQuorumSessions; - for (auto& p : sigShares) { - if (quorumsToCheck.count(std::make_pair((Consensus::LLMQType)p.second.llmqType, p.second.quorumHash))) { - inactiveQuorumSessions.emplace(p.second.GetSignHash()); + if (count > 0) { + auto m = sigShares.GetAllForSignHash(signHash); + assert(m); + + auto& oneSigShare = m->begin()->second; + + std::string strMissingMembers; + if (LogAcceptCategory(BCLog::LogFlags::LLMQ)) { + auto quorumIt = quorums.find(std::make_pair((Consensus::LLMQType)oneSigShare.llmqType, oneSigShare.quorumHash)); + if (quorumIt != quorums.end()) { + auto& quorum = quorumIt->second; + for (size_t i = 0; i < quorum->members.size(); i++) { + if (!m->count((uint16_t)i)) { + auto& dmn = quorum->members[i]; + strMissingMembers += strprintf("\n %s", dmn->proTxHash.ToString()); + } + } + } + } + + LogPrintf("CSigSharesManager::%s -- signing session timed out. signHash=%s, id=%s, msgHash=%s, sigShareCount=%d, missingMembers=%s\n", __func__, + signHash.ToString(), oneSigShare.id.ToString(), oneSigShare.msgHash.ToString(), count, strMissingMembers); + } else { + LogPrintf("CSigSharesManager::%s -- signing session timed out. signHash=%s, sigShareCount=%d\n", __func__, + signHash.ToString(), count); } - } - for (auto& signHash : inactiveQuorumSessions) { RemoveSigSharesForSession(signHash); } } // Find node states for peers that disappeared from CConnman - std::set nodeStatesToDelete; + std::unordered_set nodeStatesToDelete; for (auto& p : nodeStates) { nodeStatesToDelete.emplace(p.first); } @@ -1039,9 +1280,9 @@ void CSigSharesManager::Cleanup() for (auto nodeId : nodeStatesToDelete) { auto& nodeState = nodeStates[nodeId]; // remove global requested state to force a re-request from another node - for (auto& p : nodeState.requestedSigShares) { - sigSharesRequested.erase(p.first); - } + nodeState.requestedSigShares.ForEach([&](const SigShareKey& k, bool) { + sigSharesRequested.Erase(k); + }); nodeStates.erase(nodeId); } @@ -1055,10 +1296,10 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash) ns.RemoveSession(signHash); } - RemoveBySignHash(sigSharesRequested, signHash); - RemoveBySignHash(sigSharesToAnnounce, signHash); - RemoveBySignHash(sigShares, signHash); - firstSeenForSessions.erase(signHash); + sigSharesRequested.EraseAllForSignHash(signHash); + sigSharesToAnnounce.EraseAllForSignHash(signHash); + sigShares.EraseAllForSignHash(signHash); + timeSeenForSessions.erase(signHash); } void CSigSharesManager::RemoveBannedNodeStates() @@ -1066,13 +1307,13 @@ void CSigSharesManager::RemoveBannedNodeStates() // Called regularly to cleanup local node states for banned nodes LOCK2(cs, cs_main); - std::set toRemove; + std::unordered_set toRemove; for (auto it = nodeStates.begin(); it != nodeStates.end();) { if (IsBanned(it->first)) { // re-request sigshares from other nodes - for (auto& p : it->second.requestedSigShares) { - sigSharesRequested.erase(p.first); - } + it->second.requestedSigShares.ForEach([&](const SigShareKey& k, int64_t) { + sigSharesRequested.Erase(k); + }); it = nodeStates.erase(it); } else { ++it; @@ -1099,10 +1340,10 @@ void CSigSharesManager::BanNode(NodeId nodeId) auto& nodeState = it->second; // Whatever we requested from him, let's request it from someone else now - for (auto& p : nodeState.requestedSigShares) { - sigSharesRequested.erase(p.first); - } - nodeState.requestedSigShares.clear(); + nodeState.requestedSigShares.ForEach([&](const SigShareKey& k, int64_t) { + sigSharesRequested.Erase(k); + }); + nodeState.requestedSigShares.Clear(); nodeState.banned = true; } @@ -1127,7 +1368,7 @@ void CSigSharesManager::WorkThreadMain() quorumSigningManager->Cleanup(); // TODO Wakeup when pending signing is needed? - if(!didWork) { + if (!didWork) { if (!interruptSigningShare.sleep_for(std::chrono::milliseconds(100))) { return; } @@ -1186,15 +1427,22 @@ void CSigSharesManager::Sign(const CQuorumCPtr& quorum, const uint256& id, const sigShare.sigShare.Set(skShare.Sign(signHash)); if (!sigShare.sigShare.Get().IsValid()) { - LogPrintf("CSigSharesManager::%s -- failed to sign sigShare. id=%s, msgHash=%s, time=%s\n", __func__, - sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); + LogPrintf("CSigSharesManager::%s -- failed to sign sigShare. signHahs=%s, id=%s, msgHash=%s, time=%s\n", __func__, + signHash.ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); return; } sigShare.UpdateKey(); - LogPrintf("CSigSharesManager::%s -- signed sigShare. id=%s, msgHash=%s, time=%s\n", __func__, - sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); + LogPrintf("CSigSharesManager::%s -- signed sigShare. signHash=%s, id=%s, msgHash=%s, time=%s\n", __func__, + signHash.ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), t.count()); ProcessSigShare(-1, sigShare, *g_connman, quorum); } + +void CSigSharesManager::HandleNewRecoveredSig(const llmq::CRecoveredSig& recoveredSig) +{ + LOCK(cs); + RemoveSigSharesForSession(llmq::utils::BuildSignHash(recoveredSig)); +} + } // namespace llmq \ No newline at end of file diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index 1397a2dbdb8ae..28d73fb9a99bb 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -10,6 +10,7 @@ #include "consensus/params.h" #include "net.h" #include "random.h" +#include "saltedhasher.h" #include "serialize.h" #include "sync.h" #include "tinyformat.h" @@ -19,13 +20,14 @@ #include #include +#include +#include class CEvoDB; class CScheduler; namespace llmq { - // typedef std::pair SigShareKey; @@ -55,29 +57,47 @@ class CSigShare } }; -class CSigSharesInv +// Nodes will first announce a signing session with a sessionId to be used in all future P2P messages related to that +// session. We locally keep track of the mapping for each node. We also assign new sessionIds for outgoing sessions +// and send QSIGSESANN messages appropriately. All values except the max value for uint32_t are valid as sessionId +class CSigSesAnn { public: + uint32_t sessionId{(uint32_t)-1}; uint8_t llmqType; - uint256 signHash; + uint256 quorumHash; + uint256 id; + uint256 msgHash; + + SERIALIZE_METHODS(CSigSesAnn, obj) + { + READWRITE(VARINT(obj.sessionId)); + READWRITE(obj.llmqType); + READWRITE(obj.quorumHash); + READWRITE(obj.id); + READWRITE(obj.msgHash); + } + + std::string ToString() const; +}; + +class CSigSharesInv +{ +public: + uint32_t sessionId{(uint32_t)-1}; std::vector inv; public: SERIALIZE_METHODS(CSigSharesInv, obj) { - READWRITE(obj.llmqType); + uint64_t invSize = obj.inv.size(); - auto& consensus = Params().GetConsensus(); - auto it = consensus.llmqs.find((Consensus::LLMQType)obj.llmqType); - if (it == consensus.llmqs.end()) { - throw std::ios_base::failure("invalid llmqType"); - } - const auto& params = it->second; - READWRITE(obj.signHash); - READWRITE(AUTOBITSET(obj.inv, (size_t)params.size)); + READWRITE(VARINT(obj.sessionId)); + READWRITE(COMPACTSIZE(invSize)); + READWRITE(AUTOBITSET(obj.inv, (size_t)invSize)); } - void Init(Consensus::LLMQType _llmqType, const uint256& _signHash); + void Init(size_t size); bool IsSet(uint16_t quorumMember) const; void Set(uint16_t quorumMember, bool v); void Merge(const CSigSharesInv& inv2); @@ -90,90 +110,246 @@ class CSigSharesInv class CBatchedSigShares { public: - uint8_t llmqType; - uint256 quorumHash; - uint256 id; - uint256 msgHash; + uint32_t sessionId{(uint32_t)-1}; std::vector> sigShares; public: SERIALIZE_METHODS(CBatchedSigShares, obj) { - READWRITE(obj.llmqType); - READWRITE(obj.quorumHash); - READWRITE(obj.id); - READWRITE(obj.msgHash); + READWRITE(VARINT(obj.sessionId)); READWRITE(obj.sigShares); } - CSigShare RebuildSigShare(size_t idx) const + std::string ToInvString() const; +}; + +template +class SigShareMap +{ +private: + std::unordered_map, StaticSaltedHasher> internalMap; + +public: + bool Add(const SigShareKey& k, const T& v) { - assert(idx < sigShares.size()); - auto& s = sigShares[idx]; - CSigShare sigShare; - sigShare.llmqType = llmqType; - sigShare.quorumHash = quorumHash; - sigShare.quorumMember = s.first; - sigShare.id = id; - sigShare.msgHash = msgHash; - sigShare.sigShare = s.second; - sigShare.UpdateKey(); - return sigShare; + auto& m = internalMap[k.first]; + return m.emplace(k.second, v).second; } - CSigSharesInv ToInv() const; + void Erase(const SigShareKey& k) + { + auto it = internalMap.find(k.first); + if (it == internalMap.end()) { + return; + } + it->second.erase(k.second); + if (it->second.empty()) { + internalMap.erase(it); + } + } + + void Clear() + { + internalMap.clear(); + } + + bool Has(const SigShareKey& k) const + { + auto it = internalMap.find(k.first); + if (it == internalMap.end()) { + return false; + } + return it->second.count(k.second) != 0; + } + + T* Get(const SigShareKey& k) + { + auto it = internalMap.find(k.first); + if (it == internalMap.end()) { + return nullptr; + } + + auto jt = it->second.find(k.second); + if (jt == it->second.end()) { + return nullptr; + } + + return &jt->second; + } + + T& GetOrAdd(const SigShareKey& k) + { + T* v = Get(k); + if (!v) { + Add(k, T()); + v = Get(k); + } + return *v; + } + + const T* GetFirst() const + { + if (internalMap.empty()) { + return nullptr; + } + return &internalMap.begin()->second.begin()->second; + } + + size_t Size() const + { + size_t s = 0; + for (auto& p : internalMap) { + s += p.second.size(); + } + return s; + } + + size_t CountForSignHash(const uint256& signHash) const + { + auto it = internalMap.find(signHash); + if (it == internalMap.end()) { + return 0; + } + return it->second.size(); + } + + bool Empty() const + { + return internalMap.empty(); + } + + const std::unordered_map* GetAllForSignHash(const uint256& signHash) + { + auto it = internalMap.find(signHash); + if (it == internalMap.end()) { + return nullptr; + } + return &it->second; + } + + void EraseAllForSignHash(const uint256& signHash) + { + internalMap.erase(signHash); + } + + template + void EraseIf(F&& f) + { + for (auto it = internalMap.begin(); it != internalMap.end();) { + SigShareKey k; + k.first = it->first; + for (auto jt = it->second.begin(); jt != it->second.end();) { + k.second = jt->first; + if (f(k, jt->second)) { + jt = it->second.erase(jt); + } else { + ++jt; + } + } + if (it->second.empty()) { + it = internalMap.erase(it); + } else { + ++it; + } + } + } + + template + void ForEach(F&& f) + { + for (auto& p : internalMap) { + SigShareKey k; + k.first = p.first; + for (auto& p2 : p.second) { + k.second = p2.first; + f(k, p2.second); + } + } + } }; class CSigSharesNodeState { public: + // Used to avoid holding locks too long + struct SessionInfo { + uint32_t recvSessionId; + Consensus::LLMQType llmqType; + uint256 quorumHash; + uint256 id; + uint256 msgHash; + uint256 signHash; + + CQuorumCPtr quorum; + }; + struct Session { + uint32_t recvSessionId{(uint32_t)-1}; + uint32_t sendSessionId{(uint32_t)-1}; + + Consensus::LLMQType llmqType; + uint256 quorumHash; + uint256 id; + uint256 msgHash; + uint256 signHash; + + CQuorumCPtr quorum; + CSigSharesInv announced; CSigSharesInv requested; CSigSharesInv knows; }; - // TODO limit number of sessions per node; signHash Session - std::map sessions; + // TODO limit number of sessions per node + std::unordered_map sessions; - std::map pendingIncomingSigShares; - std::map requestedSigShares; + std::unordered_map sessionByRecvId; + uint32_t nextSendSessionId{1}; + + SigShareMap pendingIncomingSigShares; + SigShareMap requestedSigShares; // elements are added whenever we receive a valid sig share from this node // this triggers us to send inventory items to him as he seems to be interested in these - std::set> interestedIn; + std::unordered_set, StaticSaltedHasher> interestedIn; bool banned{false}; - Session& GetOrCreateSession(Consensus::LLMQType llmqType, const uint256& signHash); - - void MarkAnnounced(const uint256& signHash, const CSigSharesInv& inv); - void MarkRequested(const uint256& signHash, const CSigSharesInv& inv); - void MarkKnows(const uint256& signHash, const CSigSharesInv& inv); - - void MarkAnnounced(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember); - void MarkRequested(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember); - void MarkKnows(Consensus::LLMQType llmqType, const uint256& signHash, uint16_t quorumMember); + Session& GetOrCreateSessionFromShare(const CSigShare& sigShare); + Session& GetOrCreateSessionFromAnn(const CSigSesAnn& ann); + Session* GetSessionBySignHash(const uint256& signHash); + Session* GetSessionByRecvId(uint32_t sessionId); + bool GetSessionInfoByRecvId(uint32_t sessionId, SessionInfo& retInfo); void RemoveSession(const uint256& signHash); }; -class CSigSharesManager +class CSigSharesManager : public CRecoveredSigsListener { - static const int64_t SIGNING_SESSION_TIMEOUT = 60 * 1000; + static const int64_t SESSION_NEW_SHARES_TIMEOUT = 60 * 1000; + static const int64_t SESSION_TOTAL_TIMEOUT = 5 * 60 * 1000; static const int64_t SIG_SHARE_REQUEST_TIMEOUT = 5 * 1000; + // we try to keep total message size below 10k + const size_t MAX_MSGS_CNT_QSIGSESANN = 100; + const size_t MAX_MSGS_CNT_QGETSIGSHARES = 200; + const size_t MAX_MSGS_CNT_QSIGSHARESINV = 200; + // 400 is the maximum quorum size, so this is also the maximum number of sigs we need to support + const size_t MAX_MSGS_TOTAL_BATCHED_SIGS = 400; + private: RecursiveMutex cs; std::thread workThread; CThreadInterrupt interruptSigningShare; - std::map sigShares; - std::map firstSeenForSessions; + SigShareMap sigShares; + + // stores time of first and last receivedSigShare. Used to detect timeouts + std::unordered_map, StaticSaltedHasher> timeSeenForSessions; - std::map nodeStates; - std::map> sigSharesRequested; - std::set sigSharesToAnnounce; + std::unordered_map nodeStates; + SigShareMap> sigSharesRequested; + SigShareMap sigSharesToAnnounce; std::vector> pendingSigns; @@ -189,6 +365,8 @@ class CSigSharesManager void StartWorkerThread(); void StopWorkerThread(); void Interrupt(); + void RegisterAsRecoveredSigsListener(); + void UnregisterAsRecoveredSigsListener(); public: void ProcessMessage(CNode* pnode, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); @@ -196,23 +374,35 @@ class CSigSharesManager void AsyncSign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); -private: - void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); - void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); - void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman); - - bool VerifySigSharesInv(NodeId from, const CSigSharesInv& inv); - bool PreVerifyBatchedSigShares(NodeId nodeId, const CBatchedSigShares& batchedSigShares, bool& retBan); + void HandleNewRecoveredSig(const CRecoveredSig& recoveredSig); - void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, std::map>& retSigShares, std::map, CQuorumCPtr>& retQuorums); +private: + // all of these return false when the currently processed message should be aborted (as each message actually contains multiple messages) + bool ProcessMessageSigSesAnn(CNode* pfrom, const CSigSesAnn& ann, CConnman& connman); + bool ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); + bool ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); + bool ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman); + + bool VerifySigSharesInv(NodeId from, Consensus::LLMQType llmqType, const CSigSharesInv& inv); + bool PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan); + + void CollectPendingSigSharesToVerify(size_t maxUniqueSessions, + std::unordered_map>& retSigShares, + std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& retQuorums); bool ProcessPendingSigShares(CConnman& connman); - void ProcessPendingSigSharesFromNode(NodeId nodeId, const std::vector& sigShares, const std::map, CQuorumCPtr>& quorums, CConnman& connman); + void ProcessPendingSigSharesFromNode(NodeId nodeId, + const std::vector& sigShares, + const std::unordered_map, CQuorumCPtr, StaticSaltedHasher>& quorums, + CConnman& connman); void ProcessSigShare(NodeId nodeId, const CSigShare& sigShare, CConnman& connman, const CQuorumCPtr& quorum); void TryRecoverSig(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash, CConnman& connman); private: + bool GetSessionInfoByRecvId(NodeId nodeId, uint32_t sessionId, CSigSharesNodeState::SessionInfo& retInfo); + CSigShare RebuildSigShare(const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, size_t idx); + void Cleanup(); void RemoveSigSharesForSession(const uint256& signHash); void RemoveBannedNodeStates(); @@ -220,9 +410,9 @@ class CSigSharesManager void BanNode(NodeId nodeId); bool SendMessages(); - void CollectSigSharesToRequest(std::map>& sigSharesToRequest); - void CollectSigSharesToSend(std::map>& sigSharesToSend); - void CollectSigSharesToAnnounce(std::map>& sigSharesToAnnounce); + void CollectSigSharesToRequest(std::unordered_map>& sigSharesToRequest); + void CollectSigSharesToSend(std::unordered_map>& sigSharesToSend); + void CollectSigSharesToAnnounce(std::unordered_map>& sigSharesToAnnounce); bool SignPendingSigShares(); void WorkThreadMain(); }; diff --git a/src/protocol.cpp b/src/protocol.cpp index 5a332a920a2cb..b327b3daeb352 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -60,6 +60,7 @@ const char* QCONTRIB = "qcontrib"; const char* QCOMPLAINT = "qcomplaint"; const char* QJUSTIFICATION = "qjustify"; const char* QPCOMMITMENT = "qpcommit"; +const char* QSIGSESANN = "qsigsesann"; const char* QSIGSHARESINV = "qsigsinv"; const char* QGETSIGSHARES = "qgetsigs"; const char* QBSIGSHARES = "qbsigs"; @@ -122,6 +123,7 @@ const static std::string allNetMessageTypes[] = { NetMsgType::QCOMPLAINT, NetMsgType::QJUSTIFICATION, NetMsgType::QPCOMMITMENT, + NetMsgType::QSIGSESANN, NetMsgType::QSIGSHARESINV, NetMsgType::QGETSIGSHARES, NetMsgType::QBSIGSHARES, diff --git a/src/protocol.h b/src/protocol.h index fe93d927034d5..a0d251bf87060 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -293,6 +293,7 @@ extern const char* QCONTRIB; extern const char* QCOMPLAINT; extern const char* QJUSTIFICATION; extern const char* QPCOMMITMENT; +extern const char* QSIGSESANN; extern const char* QSIGSHARESINV; extern const char* QGETSIGSHARES; extern const char* QBSIGSHARES; diff --git a/src/tiertwo_networksync.cpp b/src/tiertwo_networksync.cpp index 296e1824863d2..383e8b67caaa3 100644 --- a/src/tiertwo_networksync.cpp +++ b/src/tiertwo_networksync.cpp @@ -72,7 +72,7 @@ bool CMasternodeSync::MessageDispatcher(CNode* pfrom, std::string& strCommand, C } return true; } - if (strCommand == NetMsgType::QSIGSHARESINV || strCommand == NetMsgType::QGETSIGSHARES || strCommand == NetMsgType::QBSIGSHARES) { + if (strCommand == NetMsgType::QSIGSHARESINV || strCommand == NetMsgType::QGETSIGSHARES || strCommand == NetMsgType::QBSIGSHARES || strCommand == NetMsgType::QSIGSESANN) { llmq::quorumSigSharesManager->ProcessMessage(pfrom, strCommand, vRecv, *g_connman); return true; }