Skip to content

Commit

Permalink
Merge #2921: Dash llmq backports
Browse files Browse the repository at this point in the history
22efef0 No matter how fast we process sig shares, always force 100ms between sending (Alexander Block)
1bd0c2f Don't sleep in WorkThreadMain when CPU intensive work was done (Alexander Block)
40a247d Implement caching in CRecoveredSigsDb (Alexander Block)
6c63a77 Use CBLSLazySignature in CBatchedSigShares (Alexander Block)
bff6e6e CLLMQUtils::IsQuorumActive() shouldn't require cs_main to be held (UdjinM6)
fdba615 Introduce startBlockHeight to make things more explicit (Alexander Block)
7bca9a2 Correctly use SIGN_HEIGHT_OFFSET when checking for out of bound height (Alexander Block)
04b6a67 Reserve 4k of buffer for CSerializedNetMsg buffer (Alexander Block)
be1c4a7 Fix cleanup of old recovered sigs (Alexander Block)

Pull request description:

  Backport dash PRs (in the following order):
  dashpay#2640
  dashpay#2672
  dashpay#2674
  dashpay#2702
  dashpay#2705
  dashpay#2706
  dashpay#2707

  NB: Some or them are partial backports since most of our llmq files are already updated to a more recent upstream version. So I basically ended up picking the commits that touch only the files `quorums_signing_shares.*` and `quorums_signing.*`

  I suggest to review by checking each individual commit against the corresponding upstream one

ACKs for top commit: 22efef0
  Liquid369:
    tACK 22efef0
  Duddino:
    ACK 22efef0

Tree-SHA512: 94e5dcdef044524ad4fa4d1fadbe670d839a08ccadc39071ba9300cfc44ce3d19ba24dfef1a6c6745271139d43f43ea47ed5a29e06f0d94e13ff09957f8e0c10
  • Loading branch information
Fuzzbawls committed Aug 9, 2024
2 parents a254b29 + 22efef0 commit 4c17603
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 160 deletions.
198 changes: 136 additions & 62 deletions src/llmq/quorums_signing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,46 @@ bool CRecoveredSigsDb::HasRecoveredSig(Consensus::LLMQType llmqType, const uint2

bool CRecoveredSigsDb::HasRecoveredSigForId(Consensus::LLMQType llmqType, const uint256& id)
{
int64_t t = GetTimeMillis();

auto cacheKey = std::make_pair(llmqType, id);
{
LOCK(cs);
auto it = hasSigForIdCache.find(cacheKey);
if (it != hasSigForIdCache.end()) {
it->second.second = t;
return it->second.first;
}
}


auto k = std::make_tuple('r', (uint8_t)llmqType, id);
return db.Exists(k);
bool ret = db.Exists(k);

LOCK(cs);
hasSigForIdCache.emplace(cacheKey, std::make_pair(ret, t));
return ret;
}

bool CRecoveredSigsDb::HasRecoveredSigForSession(const uint256& signHash)
{
int64_t t = GetTimeMillis();

{
LOCK(cs);
auto it = hasSigForSessionCache.find(signHash);
if (it != hasSigForSessionCache.end()) {
it->second.second = t;
return it->second.first;
}
}

auto k = std::make_tuple('s', signHash);
return db.Exists(k);
bool ret = db.Exists(k);

LOCK(cs);
hasSigForSessionCache.emplace(signHash, std::make_pair(ret, t));
return ret;
}

bool CRecoveredSigsDb::HasRecoveredSigForHash(const uint256& hash)
Expand Down Expand Up @@ -100,7 +132,8 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig)
batch.Write(k3, std::make_pair(recSig.llmqType, recSig.id));

// store by signHash
auto k4 = std::make_tuple('s', llmq::utils::BuildSignHash(recSig));
auto signHash = llmq::utils::BuildSignHash(recSig);
auto k4 = std::make_tuple('s', signHash);
batch.Write(k4, (uint8_t)1);

// remove the votedForId entry as we won't need it anymore
Expand All @@ -112,14 +145,48 @@ void CRecoveredSigsDb::WriteRecoveredSig(const llmq::CRecoveredSig& recSig)
batch.Write(k6, (uint8_t)1);

db.WriteBatch(batch);

{
int64_t t = GetTimeMillis();

LOCK(cs);
hasSigForIdCache[std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id)] = std::make_pair(true, t);
hasSigForSessionCache[signHash] = std::make_pair(true, t);
}
}

template<typename K>
static void TruncateCacheMap(std::unordered_map<K, std::pair<bool, int64_t>>& m, size_t maxSize, size_t truncateThreshold)
{
typedef typename std::unordered_map<K, std::pair<bool, int64_t>> Map;
typedef typename Map::iterator Iterator;

if (m.size() <= truncateThreshold) {
return;
}

std::vector<Iterator> vec;
vec.reserve(m.size());
for (auto it = m.begin(); it != m.end(); ++it) {
vec.emplace_back(it);
}
// sort by last access time (descending order)
std::sort(vec.begin(), vec.end(), [](const Iterator& it1, const Iterator& it2) {
return it1->second.second > it2->second.second;
});

for (size_t i = maxSize; i < vec.size(); i++) {
m.erase(vec[i]);
}
}

void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)
{
std::unique_ptr<CDBIterator> pcursor(db.NewIterator());

static const uint256 maxUint256 = uint256S("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff");
auto start = std::make_tuple('t', (uint32_t)0, (uint8_t)0, uint256());
auto end = std::make_tuple('t', (uint32_t)(GetAdjustedTime() - maxAge), (uint8_t)0, uint256());
auto end = std::make_tuple('t', (uint32_t)(GetAdjustedTime() - maxAge), (uint8_t)255, maxUint256);
pcursor->Seek(start);

std::vector<std::pair<Consensus::LLMQType, uint256>> toDelete;
Expand Down Expand Up @@ -147,22 +214,33 @@ void CRecoveredSigsDb::CleanupOldRecoveredSigs(int64_t maxAge)
}

CDBBatch batch(CLIENT_VERSION | ADDRV2_FORMAT);
for (auto& e : toDelete) {
CRecoveredSig recSig;
if (!ReadRecoveredSig(e.first, e.second, recSig)) {
continue;
{
LOCK(cs);
for (auto& e : toDelete) {
CRecoveredSig recSig;
if (!ReadRecoveredSig(e.first, e.second, recSig)) {
continue;
}

auto signHash = llmq::utils::BuildSignHash(recSig);

auto k1 = std::make_tuple('r', recSig.llmqType, recSig.id);
auto k2 = std::make_tuple('r', recSig.llmqType, recSig.id, recSig.msgHash);
auto k3 = std::make_tuple('h', recSig.GetHash());
auto k4 = std::make_tuple('s', signHash);
auto k5 = std::make_tuple('v', recSig.llmqType, recSig.id);
batch.Erase(k1);
batch.Erase(k2);
batch.Erase(k3);
batch.Erase(k4);
batch.Erase(k5);

hasSigForIdCache.erase(std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.id));
hasSigForSessionCache.erase(signHash);
}

auto k1 = std::make_tuple('r', recSig.llmqType, recSig.id);
auto k2 = std::make_tuple('r', recSig.llmqType, recSig.id, recSig.msgHash);
auto k3 = std::make_tuple('h', recSig.GetHash());
auto k4 = std::make_tuple('s', llmq::utils::BuildSignHash(recSig));
auto k5 = std::make_tuple('v', recSig.llmqType, recSig.id);
batch.Erase(k1);
batch.Erase(k2);
batch.Erase(k3);
batch.Erase(k4);
batch.Erase(k5);
TruncateCacheMap(hasSigForIdCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
TruncateCacheMap(hasSigForSessionCache, MAX_CACHE_SIZE, MAX_CACHE_TRUNCATE_THRESHOLD);
}

for (auto& e : toDelete2) {
Expand Down Expand Up @@ -256,19 +334,15 @@ bool CSigningManager::PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig&
return false;
}

CQuorumCPtr quorum;
{
LOCK(cs_main);
CQuorumCPtr quorum = quorumManager->GetQuorum(llmqType, recoveredSig.quorumHash);

quorum = quorumManager->GetQuorum(llmqType, recoveredSig.quorumHash);
if (!quorum) {
LogPrintf("CSigningManager::%s -- quorum %s not found, node=%d\n", __func__,
recoveredSig.quorumHash.ToString(), nodeId);
return false;
}
if (!llmq::utils::IsQuorumActive(llmqType, quorum->pindexQuorum->GetBlockHash())) {
return false;
}
if (!quorum) {
LogPrintf("CSigningManager::%s -- quorum %s not found, node=%d\n", __func__,
recoveredSig.quorumHash.ToString(), nodeId);
return false;
}
if (!llmq::utils::IsQuorumActive(llmqType, quorum->pindexQuorum->GetBlockHash())) {
return false;
}

if (!recoveredSig.sig.IsValid()) {
Expand Down Expand Up @@ -311,49 +385,46 @@ void CSigningManager::CollectPendingRecoveredSigsToVerify(
}
}

{
LOCK(cs_main);
for (auto& p : retSigShares) {
NodeId nodeId = p.first;
auto& v = p.second;

for (auto it = v.begin(); it != v.end();) {
auto& recSig = *it;

Consensus::LLMQType llmqType = (Consensus::LLMQType)recSig.llmqType;
auto quorumKey = std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.quorumHash);
if (!retQuorums.count(quorumKey)) {
CQuorumCPtr quorum = quorumManager->GetQuorum(llmqType, recSig.quorumHash);
if (!quorum) {
LogPrintf("CSigningManager::%s -- quorum %s not found, node=%d\n", __func__,
recSig.quorumHash.ToString(), nodeId);
it = v.erase(it);
continue;
}
if (!llmq::utils::IsQuorumActive(llmqType, quorum->pindexQuorum->GetBlockHash())) {
LogPrintf("CSigningManager::%s -- quorum %s not active anymore, node=%d\n", __func__,
recSig.quorumHash.ToString(), nodeId);
it = v.erase(it);
continue;
}

retQuorums.emplace(quorumKey, quorum);
for (auto& p : retSigShares) {
NodeId nodeId = p.first;
auto& v = p.second;

for (auto it = v.begin(); it != v.end();) {
auto& recSig = *it;

Consensus::LLMQType llmqType = (Consensus::LLMQType)recSig.llmqType;
auto quorumKey = std::make_pair((Consensus::LLMQType)recSig.llmqType, recSig.quorumHash);
if (!retQuorums.count(quorumKey)) {
CQuorumCPtr quorum = quorumManager->GetQuorum(llmqType, recSig.quorumHash);
if (!quorum) {
LogPrintf("CSigningManager::%s -- quorum %s not found, node=%d\n", __func__,
recSig.quorumHash.ToString(), nodeId);
it = v.erase(it);
continue;
}
if (!llmq::utils::IsQuorumActive(llmqType, quorum->pindexQuorum->GetBlockHash())) {
LogPrintf("CSigningManager::%s -- quorum %s not active anymore, node=%d\n", __func__,
recSig.quorumHash.ToString(), nodeId);
it = v.erase(it);
continue;
}

++it;
retQuorums.emplace(quorumKey, quorum);
}

++it;
}
}
}

void CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
bool CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
{
std::map<NodeId, std::list<CRecoveredSig>> recSigsByNode;
std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr> quorums;

CollectPendingRecoveredSigsToVerify(32, recSigsByNode, quorums);
if (recSigsByNode.empty()) {
return;
return false;
}

// It's ok to perform insecure batched verification here as we verify against the quorum public keys, which are not
Expand Down Expand Up @@ -399,6 +470,8 @@ void CSigningManager::ProcessPendingRecoveredSigs(CConnman& connman)
ProcessRecoveredSig(nodeId, recSig, quorum, connman);
}
}

return true;
}

// signature must be verified already
Expand Down Expand Up @@ -557,10 +630,11 @@ CQuorumCPtr CSigningManager::SelectQuorumForSigning(Consensus::LLMQType llmqType
CBlockIndex* pindexStart;
{
LOCK(cs_main);
if (signHeight > chainActive.Height()) {
int startBlockHeight = signHeight - SIGN_HEIGHT_OFFSET;
if (startBlockHeight > chainActive.Height()) {
return nullptr;
}
pindexStart = chainActive[signHeight - SIGN_HEIGHT_OFFSET];
pindexStart = chainActive[startBlockHeight];
}

auto quorums = quorumManager->ScanQuorums(llmqType, pindexStart, poolSize);
Expand Down
22 changes: 21 additions & 1 deletion src/llmq/quorums_signing.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@
#include "net.h"
#include "sync.h"

#include <unordered_map>

namespace std {
template <>
struct hash<std::pair<Consensus::LLMQType, uint256>>
{
std::size_t operator()(const std::pair<Consensus::LLMQType, uint256>& k) const
{
return (std::size_t)((k.first + 1) * k.second.GetCheapHash());
}
};
}

namespace llmq
{

Expand Down Expand Up @@ -67,9 +80,16 @@ class CRecoveredSig
// TODO implement caching to speed things up
class CRecoveredSigsDb
{
static const size_t MAX_CACHE_SIZE = 30000;
static const size_t MAX_CACHE_TRUNCATE_THRESHOLD = 50000;

private:
CDBWrapper db;

RecursiveMutex cs;
std::unordered_map<std::pair<Consensus::LLMQType, uint256>, std::pair<bool, int64_t>> hasSigForIdCache;
std::unordered_map<uint256, std::pair<bool, int64_t>> hasSigForSessionCache;

public:
CRecoveredSigsDb(bool fMemory);

Expand Down Expand Up @@ -138,7 +158,7 @@ class CSigningManager
bool PreVerifyRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, bool& retBan);

void CollectPendingRecoveredSigsToVerify(size_t maxUniqueSessions, std::map<NodeId, std::list<CRecoveredSig>>& retSigShares, std::map<std::pair<Consensus::LLMQType, uint256>, CQuorumCPtr>& retQuorums);
void ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager
bool ProcessPendingRecoveredSigs(CConnman& connman); // called from the worker thread of CSigSharesManager
void ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& recoveredSig, const CQuorumCPtr& quorum, CConnman& connman);
void Cleanup(); // called from the worker thread of CSigSharesManager

Expand Down
Loading

0 comments on commit 4c17603

Please sign in to comment.