Skip to content

Commit

Permalink
Fixups to mempool synch code to handle rare corner case (mostly for BTC)
Browse files Browse the repository at this point in the history
There was a race condition to the SynchMempoolTask. See issue #214. It
could lead to a situation where the prefetcher thread was joined by 2
threads at once (bad!).

This has now been addressed and also failures to read inputs from
mempool and/or DB are now handled much more robustly (due to block-only
txns + new block arriving as we are synching the mempool -- rare corner
case!).

Fixes issue #214.
  • Loading branch information
cculianu committed Nov 22, 2023
1 parent f9942dc commit 34a825a
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 16 deletions.
38 changes: 29 additions & 9 deletions src/Controller_SynchMempoolTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ struct SynchMempoolTask::Precache {
std::vector<bitcoin::CTransactionRef> workQueue; ///< guarded by mut, signaled by cond
std::atomic_bool stopFlag = false, doneSubmittingWorkFlag = false, threadIsRunning = false;
std::thread thread;
std::atomic_bool didErrorOut = false;

void startThread(size_t reserve, Mempool::TxHashSet tentativeMempoolTxHashes);
void waitUntilDone();
[[nodiscard]] bool waitUntilDone();
void stopThread();
void submitWork(const bitcoin::CTransactionRef &tx);
void threadFunc(size_t reserve, Mempool::TxHashSet tentativeMempoolTxHashes);
Expand Down Expand Up @@ -132,11 +133,11 @@ void SynchMempoolTask::Precache::stopThread()
thread.join();
}
std::unique_lock g(mut); // keep TSAN happy
doneSubmittingWorkFlag = stopFlag = threadIsRunning = false;
doneSubmittingWorkFlag = stopFlag = threadIsRunning = didErrorOut = false;
workQueue.clear();
}

void SynchMempoolTask::Precache::waitUntilDone()
bool SynchMempoolTask::Precache::waitUntilDone()
{
if (thread.joinable()) {
Tic t0;
Expand All @@ -149,6 +150,7 @@ void SynchMempoolTask::Precache::waitUntilDone()
if (const double el = t0.msec<double>(); el >= 500.)
DebugM("Waited ", QString::number(el, 'f', 3), " msec for precache thread to finish");
}
return !didErrorOut;
}

void SynchMempoolTask::Precache::submitWork(const bitcoin::CTransactionRef &tx)
Expand All @@ -163,12 +165,15 @@ void SynchMempoolTask::Precache::submitWork(const bitcoin::CTransactionRef &tx)

void SynchMempoolTask::Precache::threadFunc(const size_t reserve, const Mempool::TxHashSet tentativeMempoolTxHashes)
{
static auto constexpr funcName = "SynchMempoolTask::Precache::threadFunc";
if (QThread *t = QThread::currentThread(); t && t != parent.thread() && t != qApp->thread()) {
t->setObjectName("SyncMempoolPreCache");
} else {
Fatal() << __func__ << ": Expected this function to run in its own thread!";
Fatal() << funcName << ": Expected this function to run in its own thread!";
didErrorOut = true;
return;
}
didErrorOut = false;
DebugM("Thread started");
size_t tot = 0u, ctr = 0u;
Tic t0;
Expand Down Expand Up @@ -207,15 +212,17 @@ void SynchMempoolTask::Precache::threadFunc(const size_t reserve, const Mempool:
// Potential race-condition with bitcoind confirming blocks before we realized it,
// and then a mempool txn appearing refering to a txn that was block-only.
// Signal error and on retry things should settle ok.
Warning() << __func__ << ": Unable to find prevout " << txo.toString()
Warning() << funcName << ": Unable to find prevout " << txo.toString()
<< " in DB for tx " << tx->GetId().ToString()
<< " (possibly a block arrived while synching mempool, will retry)";
didErrorOut = true;
emit parent.errored();
return;
}
++ctr;
} catch (const std::exception & e) {
Error() << __func__ << ": Got low-level DB error retrieving " << txo.toString() << ": " << e.what();
Error() << funcName << ": Got low-level DB error retrieving " << txo.toString() << ": " << e.what();
didErrorOut = true;
emit parent.errored();
return;
}
Expand All @@ -227,8 +234,9 @@ void SynchMempoolTask::Precache::threadFunc(const size_t reserve, const Mempool:

void SynchMempoolTask::stop()
{
precache->stopThread();
CtlTask::stop(); // call superclass
// Note: below should only be called here *after* our thread has stopped to avoid race conditions (see issue #214)
precache->stopThread();
}

void SynchMempoolTask::updateLastProgress(std::optional<double> val)
Expand Down Expand Up @@ -270,6 +278,14 @@ void SynchMempoolTask::process()
state = State::ProcessingResults;
try {
processResults();
} catch (const Mempool::ConsistencyError & e) {
Error() << "Mempool consistency error: " << e.what();
Error() << "Clearing mempool and restarting SynchMempoolTask";
{
auto [mempool, lock] = storage->mutableMempool(); // grab mempool struct exclusively
mempool.clear();
}
redoFromStart();
} catch (const std::exception & e) {
Error() << "Caught exception when processing mempool tx's: " << e.what();
emit errored();
Expand Down Expand Up @@ -543,11 +559,15 @@ void SynchMempoolTask::processResults()
// precache of the confirmed spends done in another thread, wait for it to complete now
if (!txsDownloaded.empty()) {
if (!precache->thread.joinable()) {
Error() << __PRETTY_FUNCTION__ << " precache->thread should be running -- FIXME!";
Error() << __PRETTY_FUNCTION__ << ": precache->thread should be running -- FIXME!";
emit errored();
return;
}
if (!precache->waitUntilDone()) {
Error() << __func__ << ": precache->thread errored out, aborting SynchMempoolTask";
emit errored();
return;
}
precache->waitUntilDone();
}
updateLastProgress(0.75);
auto & cache = precache->cache;
Expand Down
14 changes: 8 additions & 6 deletions src/Mempool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <optional>
#include <utility>

Mempool::ConsistencyError::~ConsistencyError() {} // for vtable

void Mempool::clear() {
txs.clear();
hashXTxs.clear();
Expand Down Expand Up @@ -151,14 +153,14 @@ auto Mempool::addNewTxs(ScriptHashesAffectedSet & scriptHashesAffected,
if (prevN >= prevTxRef->txos.size()
|| !(pprevInfo = &prevTxRef->txos[prevN])->isValid())
// defensive programming paranoia
throw InternalError(QString("FAILED TO FIND A VALID PREVIOUS TXOUTN %1:%2 IN MEMPOOL for TxHash: %3 (input %4)")
.arg(QString(prevTxId.toHex())).arg(prevN).arg(QString(hash.toHex())).arg(inNum));
throw ConsistencyError(QString("FAILED TO FIND A VALID PREVIOUS TXOUTN %1:%2 IN MEMPOOL for TxHash: %3 (input %4)")
.arg(QString(prevTxId.toHex())).arg(prevN).arg(QString(hash.toHex())).arg(inNum));
sh = pprevInfo->hashX;
const auto & refPrevInfo = tx->hashXs[sh].unconfirmedSpends[prevTXO] = *pprevInfo;
auto prevHashXIt = prevTxRef->hashXs.find(sh);
if (prevHashXIt == prevTxRef->hashXs.end())
throw InternalError(QString("PREV OUT %1 IS MISSING ITS HASHX ENTRY FOR HASHX %2 (txid: %3)")
.arg(prevTXO.toString(), QString(sh.toHex()), QString(tx->hash.toHex())));
throw ConsistencyError(QString("PREV OUT %1 IS MISSING ITS HASHX ENTRY FOR HASHX %2 (txid: %3)")
.arg(prevTXO.toString(), QString(sh.toHex()), QString(tx->hash.toHex())));
prevHashXIt->second.utxo.erase(prevN); // remove this spend from utxo set for prevTx in mempool
if (TRACE) {
Debug() << hash.toHex() << " unconfirmed spend: " << prevTXO.toString() << " " << refPrevInfo.amount.ToString().c_str()
Expand Down Expand Up @@ -207,8 +209,8 @@ auto Mempool::addNewTxs(ScriptHashesAffectedSet & scriptHashesAffected,
// (or there maybe was a race condition and a new block came in while we were doing this).
// We will throw if missing, and the synch process aborts and hopefully we recover with a reorg
// or a new block or somesuch.
throw InternalError(QString("FAILED TO FIND PREVIOUS TX %1 IN EITHER MEMPOOL OR DB for TxHash: %2 (input %3)")
.arg(prevTXO.toString()).arg(QString(hash.toHex())).arg(inNum));
throw ConsistencyError(QString("FAILED TO FIND PREVIOUS TX %1 IN EITHER MEMPOOL OR DB for TxHash: %2 (input %3)")
.arg(prevTXO.toString()).arg(QString(hash.toHex())).arg(inNum));
}
pprevInfo = &*optTXOInfo;
sh = pprevInfo->hashX;
Expand Down
9 changes: 8 additions & 1 deletion src/Mempool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#pragma once

#include "BlockProcTypes.h"
#include "Common.h"
#include "DSProof.h"
#include "TXO.h"

Expand All @@ -37,7 +38,6 @@
#include <utility>
#include <vector>


/// Models the mempool
struct Mempool
{
Expand Down Expand Up @@ -150,6 +150,13 @@ struct Mempool
double elapsedMsec = 0.;
};

/// Thrown by addNewTxs if we cannot retrieve a coin from the mempool or DB. Appropriate recovery is to clear
/// the entire mempool and start over since the mempool now is likely in an inconsistent state.
struct ConsistencyError : InternalError {
using InternalError::InternalError;
~ConsistencyError() override;
};

/// Add a batch of tx's that are new (downloaded from bitcoind) and were not previously in this mempool structure.
///
/// Note that all the txs in txsNew *must* be new (must not already exist in this mempool instance).
Expand Down

0 comments on commit 34a825a

Please sign in to comment.