diff --git a/CMakeLists.txt b/CMakeLists.txt index c6100be0d66..ec4fc254433 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2013,6 +2013,7 @@ else () src/test/app/PayChan_test.cpp src/test/app/PayStrand_test.cpp src/test/app/PseudoTx_test.cpp + src/test/app/RCLCensorshipDetector_test.cpp src/test/app/RCLValidations_test.cpp src/test/app/Regression_test.cpp src/test/app/SHAMapStore_test.cpp diff --git a/src/ripple/app/consensus/RCLCensorshipDetector.h b/src/ripple/app/consensus/RCLCensorshipDetector.h new file mode 100644 index 00000000000..ead1bb91243 --- /dev/null +++ b/src/ripple/app/consensus/RCLCensorshipDetector.h @@ -0,0 +1,143 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2018 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_APP_CONSENSUS_RCLCENSORSHIPDETECTOR_H_INCLUDED +#define RIPPLE_APP_CONSENSUS_RCLCENSORSHIPDETECTOR_H_INCLUDED + +#include +#include +#include + +namespace ripple { + +template +class RCLCensorshipDetector +{ +private: + std::map tracker_; + + /** Removes all elements satisfying specific criteria from the tracker + + @param pred A predicate which returns true for tracking entries that + should be removed. The predicate must be callable as: + bool pred(TxID const&, Sequence) + It must return true for entries that should be removed. + + @note This could be replaced with std::erase_if when it becomes part + of the standard. For example: + + prune ([](TxID const& id, Sequence seq) + { + return id.isZero() || seq == 314159; + }); + + would become: + + std::erase_if(tracker_.begin(), tracker_.end(), + [](auto const& e) + { + return e.first.isZero() || e.second == 314159; + } + */ + template + void prune(Predicate&& pred) + { + auto t = tracker_.begin(); + + while (t != tracker_.end()) + { + if (pred(t->first, t->second)) + t = tracker_.erase(t); + else + t = std::next(t); + } + } + +public: + RCLCensorshipDetector() = default; + + /** Add transactions being proposed for the current consensus round. + + @param seq The sequence number of the ledger being built. + @param proposed The set of transactions that we are initially proposing + for this round. + */ + void propose( + Sequence seq, + std::vector proposed) + { + std::sort (proposed.begin(), proposed.end()); + + // We want to remove any entries that we proposed in a previous round + // that did not make it in yet if we are no longer proposing them. + prune ([&proposed](TxID const& id, Sequence seq) + { + return !std::binary_search(proposed.begin(), proposed.end(), id); + }); + + // Track the entries that we are proposing in this round. + for (auto const& p : proposed) + tracker_.emplace(p, seq); // FIXME C++17: use try_emplace + } + + /** Determine which transactions made it and perform censorship detection. + + This function is called when the server is proposing and a consensus + round it participated in completed. + + @param accepted The set of transactions that the network agreed + should be included in the ledger being built. + @param pred A predicate invoked for every transaction we've proposed + but which hasn't yet made it. The predicate must be + callable as: + bool pred(TxID const&, Sequence) + It must return true for entries that should be removed. + */ + template + void check( + std::vector accepted, + Predicate&& pred) + { + std::sort (accepted.begin(), accepted.end()); + + // We want to remove all tracking entries for transactions that were + // accepted as well as those which match the predicate. + prune ([&pred, &accepted](TxID const& id, Sequence seq) + { + if (std::binary_search(accepted.begin(), accepted.end(), id)) + return true; + + return pred(id, seq); + }); + } + + /** Removes all elements from the tracker + + Typically, this function might be called after we reconnect to the + network following an outage, or after we start tracking the network. + */ + void reset() + { + tracker_.clear(); + } +}; + +} + +#endif diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index ec43eb9b6c3..a5630790fdd 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -40,6 +40,7 @@ #include #include #include +#include namespace ripple { @@ -87,44 +88,42 @@ RCLConsensus::Adaptor::Adaptor( } boost::optional -RCLConsensus::Adaptor::acquireLedger(LedgerHash const& ledger) +RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash) { // we need to switch the ledger we're working from - auto buildLCL = ledgerMaster_.getLedgerByHash(ledger); - if (!buildLCL) + auto built = ledgerMaster_.getLedgerByHash(hash); + if (!built) { - if (acquiringLedger_ != ledger) + if (acquiringLedger_ != hash) { // need to start acquiring the correct consensus LCL - JLOG(j_.warn()) << "Need consensus ledger " << ledger; + JLOG(j_.warn()) << "Need consensus ledger " << hash; // Tell the ledger acquire system that we need the consensus ledger - acquiringLedger_ = ledger; - - auto app = &app_; - auto hash = acquiringLedger_; - app_.getJobQueue().addJob( - jtADVANCE, "getConsensusLedger", [app, hash](Job&) { - app->getInboundLedgers().acquire( - hash, 0, InboundLedger::Reason::CONSENSUS); + acquiringLedger_ = hash; + + app_.getJobQueue().addJob(jtADVANCE, "getConsensusLedger", + [id = hash, &app = app_](Job&) + { + app.getInboundLedgers().acquire(id, 0, + InboundLedger::Reason::CONSENSUS); }); } return boost::none; } - assert(!buildLCL->open() && buildLCL->isImmutable()); - assert(buildLCL->info().hash == ledger); + assert(!built->open() && built->isImmutable()); + assert(built->info().hash == hash); // Notify inbound transactions of the new ledger sequence number - inboundTransactions_.newRound(buildLCL->info().seq); + inboundTransactions_.newRound(built->info().seq); // Use the ledger timing rules of the acquired ledger - parms_.useRoundedCloseTime = buildLCL->rules().enabled(fix1528); + parms_.useRoundedCloseTime = built->rules().enabled(fix1528); - return RCLCxLedger(buildLCL); + return RCLCxLedger(built); } - void RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos) { @@ -328,7 +327,22 @@ RCLConsensus::Adaptor::onClose( // Now we need an immutable snapshot initialSet = initialSet->snapShot(false); - auto setHash = initialSet->getHash().as_uint256(); + + if (!wrongLCL) + { + std::vector proposed; + + initialSet->visitLeaves( + [&proposed](std::shared_ptr const& item) + { + proposed.push_back(item->key()); + }); + + censorshipDetector_.propose(prevLedger->info().seq + 1, std::move(proposed)); + } + + // Needed because of the move below. + auto const setHash = initialSet->getHash().as_uint256(); return Result{ std::move(initialSet), @@ -431,33 +445,95 @@ RCLConsensus::Adaptor::doAccept( << prevLedger.seq(); //-------------------------------------------------------------------------- - // Put transactions into a deterministic, but unpredictable, order - CanonicalTXSet retriableTxs{result.txns.id()}; + std::set failed; - auto sharedLCL = buildLCL( - prevLedger, - result.txns, - consensusCloseTime, - closeTimeCorrect, - closeResolution, - result.roundTime.read(), - retriableTxs); + // We want to put transactions in an unpredictable but deterministic order: + // we use the hash of the set. + // + // FIXME: Use a std::vector and a custom sorter instead of CanonicalTXSet? + CanonicalTXSet retriableTxs{ result.txns.map_->getHash().as_uint256() }; + + JLOG(j_.debug()) << "Building canonical tx set: " << retriableTxs.key(); + + for (auto const& item : *result.txns.map_) + { + try + { + retriableTxs.insert(std::make_shared(SerialIter{item.slice()})); + JLOG(j_.debug()) << " Tx: " << item.key(); + } + catch (std::exception const&) + { + failed.insert(item.key()); + JLOG(j_.warn()) << " Tx: " << item.key() << " throws!"; + } + } + + auto built = buildLCL(prevLedger, retriableTxs, consensusCloseTime, + closeTimeCorrect, closeResolution, result.roundTime.read(), failed); - auto const newLCLHash = sharedLCL.id(); - JLOG(j_.debug()) << "Report: NewL = " << newLCLHash << ":" - << sharedLCL.seq(); + auto const newLCLHash = built.id(); + JLOG(j_.debug()) << "Built ledger #" << built.seq() << ": " << newLCLHash; // Tell directly connected peers that we have a new LCL - notify(protocol::neACCEPTED_LEDGER, sharedLCL, haveCorrectLCL); + notify(protocol::neACCEPTED_LEDGER, built, haveCorrectLCL); + + // As long as we're in sync with the network, attempt to detect attempts + // at censorship of transaction by tracking which ones don't make it in + // after a period of time. + if (haveCorrectLCL && result.state == ConsensusState::Yes) + { + std::vector accepted; + + result.txns.map_->visitLeaves ( + [&accepted](std::shared_ptr const& item) + { + accepted.push_back(item->key()); + }); + + // Track all the transactions which failed or were marked as retriable + for (auto const& r : retriableTxs) + failed.insert (r.first.getTXID()); + + censorshipDetector_.check(std::move(accepted), + [curr = built.seq(), j = app_.journal("CensorshipDetector"), &failed] + (uint256 const& id, LedgerIndex seq) + { + if (failed.count(id)) + return true; + + auto const wait = curr - seq; + + if (wait && (wait % censorshipWarnInternal == 0)) + { + std::ostringstream ss; + ss << "Potential Censorship: Eligible tx " << id + << ", which we are tracking since ledger " << seq + << " has not been included as of ledger " << curr + << "."; + + if (wait / censorshipWarnInternal == censorshipMaxWarnings) + { + JLOG(j.error()) << ss.str() << " Additional warnings suppressed."; + } + else + { + JLOG(j.warn()) << ss.str(); + } + } + + return false; + }); + } if (validating_) validating_ = ledgerMaster_.isCompatible( - *sharedLCL.ledger_, j_.warn(), "Not validating"); + *built.ledger_, j_.warn(), "Not validating"); if (validating_ && !consensusFail && - app_.getValidations().canValidateSeq(sharedLCL.seq())) + app_.getValidations().canValidateSeq(built.seq())) { - validate(sharedLCL, result.txns, proposing); + validate(built, result.txns, proposing); JLOG(j_.info()) << "CNF Val " << newLCLHash; } else @@ -465,7 +541,7 @@ RCLConsensus::Adaptor::doAccept( // See if we can accept a ledger as fully-validated ledgerMaster_.consensusBuilt( - sharedLCL.ledger_, result.txns.id(), std::move(consensusJson)); + built.ledger_, result.txns.id(), std::move(consensusJson)); //------------------------------------------------------------------------- { @@ -526,7 +602,7 @@ RCLConsensus::Adaptor::doAccept( app_.openLedger().accept( app_, *rules, - sharedLCL.ledger_, + built.ledger_, localTxs_.getTxSet(), anyDisputes, retriableTxs, @@ -544,12 +620,12 @@ RCLConsensus::Adaptor::doAccept( //------------------------------------------------------------------------- { - ledgerMaster_.switchLCL(sharedLCL.ledger_); + ledgerMaster_.switchLCL(built.ledger_); // Do these need to exist? - assert(ledgerMaster_.getClosedLedger()->info().hash == sharedLCL.id()); + assert(ledgerMaster_.getClosedLedger()->info().hash == built.id()); assert( - app_.openLedger().current()->info().parentHash == sharedLCL.id()); + app_.openLedger().current()->info().parentHash == built.id()); } //------------------------------------------------------------------------- @@ -637,43 +713,36 @@ RCLConsensus::Adaptor::notify( RCLCxLedger RCLConsensus::Adaptor::buildLCL( RCLCxLedger const& previousLedger, - RCLTxSet const& txns, + CanonicalTXSet& retriableTxs, NetClock::time_point closeTime, bool closeTimeCorrect, NetClock::duration closeResolution, std::chrono::milliseconds roundTime, - CanonicalTXSet& retriableTxs) + std::set& failedTxs) { - std::shared_ptr buildLCL = [&]() { - auto const replayData = ledgerMaster_.releaseReplay(); - if (replayData) + std::shared_ptr built = [&]() + { + if (auto const replayData = ledgerMaster_.releaseReplay()) { assert(replayData->parent()->info().hash == previousLedger.id()); return buildLedger(*replayData, tapNONE, app_, j_); } - return buildLedger( - previousLedger.ledger_, - closeTime, - closeTimeCorrect, - closeResolution, - *txns.map_, - app_, - retriableTxs, - j_); + return buildLedger(previousLedger.ledger_, closeTime, closeTimeCorrect, + closeResolution, app_, retriableTxs, failedTxs, j_); }(); // Update fee computations based on accepted txs using namespace std::chrono_literals; - app_.getTxQ().processClosedLedger(app_, *buildLCL, roundTime > 5s); + app_.getTxQ().processClosedLedger(app_, *built, roundTime > 5s); // And stash the ledger in the ledger master - if (ledgerMaster_.storeLedger(buildLCL)) + if (ledgerMaster_.storeLedger(built)) JLOG(j_.debug()) << "Consensus built ledger we already had"; - else if (app_.getInboundLedgers().find(buildLCL->info().hash)) + else if (app_.getInboundLedgers().find(built->info().hash)) JLOG(j_.debug()) << "Consensus built ledger we were acquiring"; else JLOG(j_.debug()) << "Consensus built new ledger"; - return RCLCxLedger{std::move(buildLCL)}; + return RCLCxLedger{std::move(built)}; } void @@ -735,6 +804,12 @@ RCLConsensus::Adaptor::onModeChange( { JLOG(j_.info()) << "Consensus mode change before=" << to_string(before) << ", after=" << to_string(after); + + // If we were proposing but aren't any longer, we need to reset the + // censorship tracking to avoid bogus warnings. + if ((before == ConsensusMode::proposing || before == ConsensusMode::observing) && before != after) + censorshipDetector_.reset(); + mode_ = after; } diff --git a/src/ripple/app/consensus/RCLConsensus.h b/src/ripple/app/consensus/RCLConsensus.h index 6d7e32290f0..b40f62becc9 100644 --- a/src/ripple/app/consensus/RCLConsensus.h +++ b/src/ripple/app/consensus/RCLConsensus.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -35,7 +36,7 @@ #include #include #include - +#include namespace ripple { class InboundTransactions; @@ -47,6 +48,12 @@ class ValidatorKeys; */ class RCLConsensus { + /** Warn for transactions that haven't been included every so many ledgers. */ + constexpr static unsigned int censorshipWarnInternal = 15; + + /** Stop warning after several warnings. */ + constexpr static unsigned int censorshipMaxWarnings = 5; + // Implements the Adaptor template interface required by Consensus. class Adaptor { @@ -76,6 +83,8 @@ class RCLConsensus std::chrono::milliseconds{0}}; std::atomic mode_{ConsensusMode::observing}; + RCLCensorshipDetector censorshipDetector_; + public: using Ledger_t = RCLCxLedger; using NodeID_t = NodeID; @@ -137,7 +146,7 @@ class RCLConsensus //--------------------------------------------------------------------- // The following members implement the generic Consensus requirements // and are marked private to indicate ONLY Consensus will call - // them (via friendship). Since they are callled only from Consenus + // them (via friendship). Since they are called only from Consenus // methods and since RCLConsensus::consensus_ should only be accessed // under lock, these will only be called under lock. // @@ -151,11 +160,11 @@ class RCLConsensus If not available, asynchronously acquires from the network. - @param ledger The ID/hash of the ledger acquire + @param hash The ID/hash of the ledger acquire @return Optional ledger, will be seated if we locally had the ledger */ boost::optional - acquireLedger(LedgerHash const& ledger); + acquireLedger(LedgerHash const& hash); /** Share the given proposal with all peers @@ -331,27 +340,29 @@ class RCLConsensus can be retried in the next round. @param previousLedger Prior ledger building upon - @param txns The set of transactions to apply to the ledger + @param retriableTxs On entry, the set of transactions to apply to + the ledger; on return, the set of transactions + to retry in the next round. @param closeTime The time the ledger closed @param closeTimeCorrect Whether consensus agreed on close time @param closeResolution Resolution used to determine consensus close time @param roundTime Duration of this consensus rorund - @param retriableTxs Populate with transactions to retry in next - round + @param failedTxs Populate with transactions that we could not + successfully apply. @return The newly built ledger */ RCLCxLedger buildLCL( RCLCxLedger const& previousLedger, - RCLTxSet const& txns, + CanonicalTXSet& retriableTxs, NetClock::time_point closeTime, bool closeTimeCorrect, NetClock::duration closeResolution, std::chrono::milliseconds roundTime, - CanonicalTXSet& retriableTxs); + std::set& failedTxs); - /** Validate the given ledger and share with peers as necessary + /** Validate the given ledger and share with peers as necessary @param ledger The ledger to validate @param txns The consensus transaction set @@ -363,7 +374,6 @@ class RCLConsensus */ void validate(RCLCxLedger const& ledger, RCLTxSet const& txns, bool proposing); - }; public: diff --git a/src/ripple/app/ledger/BuildLedger.h b/src/ripple/app/ledger/BuildLedger.h index c496483c3a1..1eb6b38dfac 100644 --- a/src/ripple/app/ledger/BuildLedger.h +++ b/src/ripple/app/ledger/BuildLedger.h @@ -44,9 +44,10 @@ class SHAMap; @param closeTime The time the ledger closed @param closeTimeCorrect Whether consensus agreed on close time @param closeResolution Resolution used to determine consensus close time - @param txs The consensus transactions to attempt to apply @param app Handle to application instance - @param retriableTxs Populate with transactions to retry in next round + @param txs On entry, transactions to apply; on exit, transactions that must + be retried in next round. + @param failedTxs Populated with transactions that failed in this round @param j Journal to use for logging @return The newly built ledger */ @@ -56,9 +57,9 @@ buildLedger( NetClock::time_point closeTime, const bool closeTimeCorrect, NetClock::duration closeResolution, - SHAMap const& txs, Application& app, - CanonicalTXSet& retriableTxs, + CanonicalTXSet& txns, + std::set& failedTxs, beast::Journal j); /** Build a new ledger by replaying transactions diff --git a/src/ripple/app/ledger/impl/BuildLedger.cpp b/src/ripple/app/ledger/impl/BuildLedger.cpp index e430818b4dd..7db67d493d7 100644 --- a/src/ripple/app/ledger/impl/BuildLedger.cpp +++ b/src/ripple/app/ledger/impl/BuildLedger.cpp @@ -45,109 +45,97 @@ buildLedgerImpl( beast::Journal j, ApplyTxs&& applyTxs) { - auto buildLCL = std::make_shared(*parent, closeTime); + auto built = std::make_shared(*parent, closeTime); - if (buildLCL->rules().enabled(featureSHAMapV2) && - !buildLCL->stateMap().is_v2()) - { - buildLCL->make_v2(); - } + if (built->rules().enabled(featureSHAMapV2) && !built->stateMap().is_v2()) + built->make_v2(); // Set up to write SHAMap changes to our database, // perform updates, extract changes { - OpenView accum(&*buildLCL); + OpenView accum(&*built); assert(!accum.open()); - applyTxs(accum, buildLCL); - accum.apply(*buildLCL); + applyTxs(accum, built); + accum.apply(*built); } - buildLCL->updateSkipList(); - + built->updateSkipList(); { // Write the final version of all modified SHAMap // nodes to the node store to preserve the new LCL - int const asf = buildLCL->stateMap().flushDirty( - hotACCOUNT_NODE, buildLCL->info().seq); - int const tmf = buildLCL->txMap().flushDirty( - hotTRANSACTION_NODE, buildLCL->info().seq); + int const asf = built->stateMap().flushDirty( + hotACCOUNT_NODE, built->info().seq); + int const tmf = built->txMap().flushDirty( + hotTRANSACTION_NODE, built->info().seq); JLOG(j.debug()) << "Flushed " << asf << " accounts and " << tmf << " transaction nodes"; } - buildLCL->unshare(); + built->unshare(); // Accept ledger - buildLCL->setAccepted( + built->setAccepted( closeTime, closeResolution, closeTimeCorrect, app.config()); - return buildLCL; + return built; } /** Apply a set of consensus transactions to a ledger. @param app Handle to application - @param txns Consensus transactions to apply - @param view Ledger to apply to - @param buildLCL Ledger to check if transaction already exists + @param txns the set of transactions to apply, + @param failed set of transactions that failed to apply + @param view ledger to apply to @param j Journal for logging - @return Any retriable transactions + @return number of transactions applied; transactions to retry left in txns */ -CanonicalTXSet +std::size_t applyTransactions( Application& app, - SHAMap const& txns, + std::shared_ptr const& built, + CanonicalTXSet& txns, + std::set& failed, OpenView& view, - std::shared_ptr const& buildLCL, beast::Journal j) { - CanonicalTXSet retriableTxs(txns.getHash().as_uint256()); - - for (auto const& item : txns) - { - if (buildLCL->txExists(item.key())) - continue; - - // The transaction wasn't filtered - // Add it to the set to be tried in canonical order - JLOG(j.debug()) << "Processing candidate transaction: " << item.key(); - try - { - retriableTxs.insert( - std::make_shared(SerialIter{item.slice()})); - } - catch (std::exception const&) - { - JLOG(j.warn()) << "Txn " << item.key() << " throws"; - } - } - bool certainRetry = true; + std::size_t count = 0; + // Attempt to apply all of the retriable transactions for (int pass = 0; pass < LEDGER_TOTAL_PASSES; ++pass) { - JLOG(j.debug()) << "Pass: " << pass << " Txns: " << retriableTxs.size() - << (certainRetry ? " retriable" : " final"); + JLOG(j.debug()) + << (certainRetry ? "Pass: " : "Final pass: ") << pass + << " begins (" << txns.size() << " transactions)"; int changes = 0; - auto it = retriableTxs.begin(); + auto it = txns.begin(); - while (it != retriableTxs.end()) + while (it != txns.end()) { + auto const txid = it->first.getTXID(); + try { + if (pass == 0 && built->txExists(txid)) + { + it = txns.erase(it); + continue; + } + switch (applyTransaction( app, view, *it->second, certainRetry, tapNONE, j)) { case ApplyResult::Success: - it = retriableTxs.erase(it); + it = txns.erase(it); ++changes; break; case ApplyResult::Fail: - it = retriableTxs.erase(it); + failed.insert(txid); + it = txns.erase(it); break; case ApplyResult::Retry: @@ -156,17 +144,19 @@ applyTransactions( } catch (std::exception const&) { - JLOG(j.warn()) << "Transaction throws"; - it = retriableTxs.erase(it); + JLOG(j.warn()) << "Transaction " << txid << " throws"; + failed.insert(txid); + it = txns.erase(it); } } - JLOG(j.debug()) << "Pass: " << pass << " finished " << changes - << " changes"; + JLOG(j.debug()) + << (certainRetry ? "Pass: " : "Final pass: ") << pass + << " completed (" << changes << " changes)"; // A non-retry pass made no changes if (!changes && !certainRetry) - return retriableTxs; + break; // Stop retriable passes if (!changes || (pass >= LEDGER_RETRY_PASSES)) @@ -175,8 +165,8 @@ applyTransactions( // If there are any transactions left, we must have // tried them in at least one final pass - assert(retriableTxs.empty() || !certainRetry); - return retriableTxs; + assert(txns.empty() || !certainRetry); + return count; } // Build a ledger from consensus transactions @@ -186,24 +176,35 @@ buildLedger( NetClock::time_point closeTime, const bool closeTimeCorrect, NetClock::duration closeResolution, - SHAMap const& txs, Application& app, - CanonicalTXSet& retriableTxs, + CanonicalTXSet& txns, + std::set& failedTxns, beast::Journal j) { - JLOG(j.debug()) << "Report: TxSt = " << txs.getHash().as_uint256() + JLOG(j.debug()) << "Report: Transaction Set = " << txns.key() << ", close " << closeTime.time_since_epoch().count() << (closeTimeCorrect ? "" : " (incorrect)"); - return buildLedgerImpl( - parent, - closeTime, - closeTimeCorrect, - closeResolution, - app, - j, - [&](OpenView& accum, std::shared_ptr const& buildLCL) { - retriableTxs = applyTransactions(app, txs, accum, buildLCL, j); + return buildLedgerImpl(parent, closeTime, closeTimeCorrect, + closeResolution, app, j, + [&](OpenView& accum, std::shared_ptr const& built) + { + JLOG(j.debug()) + << "Attempting to apply " << txns.size() + << " transactions"; + + auto const applied = applyTransactions(app, built, txns, + failedTxns, accum, j); + + if (txns.size() || txns.size()) + JLOG(j.debug()) + << "Applied " << applied << " transactions; " + << failedTxns.size() << " failed and " + << txns.size() << " will be retried."; + else + JLOG(j.debug()) + << "Applied " << applied + << " transactions."; }); } @@ -226,7 +227,8 @@ buildLedger( replayLedger->info().closeTimeResolution, app, j, - [&](OpenView& accum, std::shared_ptr const& buildLCL) { + [&](OpenView& accum, std::shared_ptr const& built) + { for (auto& tx : replayData.orderedTxns()) applyTransaction(app, accum, *tx.second, false, applyFlags, j); }); diff --git a/src/ripple/app/misc/CanonicalTXSet.cpp b/src/ripple/app/misc/CanonicalTXSet.cpp index 984fb312357..94b2e7a3fde 100644 --- a/src/ripple/app/misc/CanonicalTXSet.cpp +++ b/src/ripple/app/misc/CanonicalTXSet.cpp @@ -81,13 +81,13 @@ uint256 CanonicalTXSet::accountKey (AccountID const& account) ret.begin (), account.begin (), account.size ()); - ret ^= mSetHash; + ret ^= salt_; return ret; } void CanonicalTXSet::insert (std::shared_ptr const& txn) { - mMap.insert ( + map_.insert ( std::make_pair ( Key ( accountKey (txn->getAccountID(sfAccount)), @@ -106,29 +106,16 @@ CanonicalTXSet::prune(AccountID const& account, Key keyHigh(effectiveAccount, seq+1, beast::zero); auto range = boost::make_iterator_range( - mMap.lower_bound(keyLow), - mMap.lower_bound(keyHigh)); - auto txRange = boost::adaptors::transform( - range, - [](auto const& p) - { - return p.second; - }); + map_.lower_bound(keyLow), + map_.lower_bound(keyHigh)); + auto txRange = boost::adaptors::transform(range, + [](auto const& p) { return p.second; }); std::vector> result( txRange.begin(), txRange.end()); - mMap.erase(range.begin(), range.end()); - + map_.erase(range.begin(), range.end()); return result; } -CanonicalTXSet::iterator CanonicalTXSet::erase (iterator const& it) -{ - iterator tmp = it; - ++tmp; - mMap.erase (it); - return tmp; -} - } // ripple diff --git a/src/ripple/app/misc/CanonicalTXSet.h b/src/ripple/app/misc/CanonicalTXSet.h index 24cff2fe3c1..1df590ef8df 100644 --- a/src/ripple/app/misc/CanonicalTXSet.h +++ b/src/ripple/app/misc/CanonicalTXSet.h @@ -75,12 +75,11 @@ class CanonicalTXSet uint256 accountKey (AccountID const& account); public: - using iterator = std::map >::iterator; using const_iterator = std::map >::const_iterator; public: explicit CanonicalTXSet (LedgerHash const& saltHash) - : mSetHash (saltHash) + : salt_ (saltHash) { } @@ -90,45 +89,46 @@ class CanonicalTXSet prune(AccountID const& account, std::uint32_t const seq); // VFALCO TODO remove this function - void reset (LedgerHash const& saltHash) + void reset (LedgerHash const& salt) { - mSetHash = saltHash; - - mMap.clear (); + salt_ = salt; + map_.clear (); } - iterator erase (iterator const& it); - - iterator begin () - { - return mMap.begin (); - } - iterator end () + const_iterator erase (const_iterator const& it) { - return mMap.end (); + return map_.erase(it); } - const_iterator begin () const + + const_iterator begin () const { - return mMap.begin (); + return map_.begin(); } - const_iterator end () const + + const_iterator end() const { - return mMap.end (); + return map_.end(); } + size_t size () const { - return mMap.size (); + return map_.size (); } bool empty () const { - return mMap.empty (); + return map_.empty (); + } + + uint256 const& key() const + { + return salt_; } private: - // Used to salt the accounts so people can't mine for low account numbers - uint256 mSetHash; + std::map > map_; - std::map > mMap; + // Used to salt the accounts so people can't mine for low account numbers + uint256 salt_; }; } // ripple diff --git a/src/ripple/app/tx/impl/apply.cpp b/src/ripple/app/tx/impl/apply.cpp index 747ad2c0fd1..98daa3f244a 100644 --- a/src/ripple/app/tx/impl/apply.cpp +++ b/src/ripple/app/tx/impl/apply.cpp @@ -123,16 +123,12 @@ applyTransaction (Application& app, OpenView& view, if (retryAssured) flags = flags | tapRETRY; - JLOG (j.debug()) << "TXN " - << txn.getTransactionID () - //<< (engine.view().open() ? " open" : " closed") - // because of the optional in engine + JLOG (j.debug()) << "TXN " << txn.getTransactionID () << (retryAssured ? "/retry" : "/final"); try { - auto const result = apply(app, - view, txn, flags, j); + auto const result = apply(app, view, txn, flags, j); if (result.second) { JLOG (j.debug()) diff --git a/src/test/app/RCLCensorshipDetector_test.cpp b/src/test/app/RCLCensorshipDetector_test.cpp new file mode 100644 index 00000000000..6c4fd6d4262 --- /dev/null +++ b/src/test/app/RCLCensorshipDetector_test.cpp @@ -0,0 +1,96 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2018 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include + +namespace ripple { +namespace test { + +class RCLCensorshipDetector_test : public beast::unit_test::suite +{ + void test( + RCLCensorshipDetector& cdet, int round, + std::vector proposed, std::vector accepted, + std::vector remain, std::vector remove) + { + // Begin tracking what we're proposing this round + cdet.propose(round, std::move(proposed)); + + // Finalize the round, by processing what we accepted; then + // remove anything that needs to be removed and ensure that + // what remains is correct. + cdet.check(std::move(accepted), + [&remove, &remain](auto id, auto seq) + { + // If the item is supposed to be removed from the censorship + // detector internal tracker manually, do it now: + if (std::find(remove.begin(), remove.end(), id) != remove.end()) + return true; + + // If the item is supposed to still remain in the censorship + // detector internal tracker; remove it from the vector. + auto it = std::find(remain.begin(), remain.end(), id); + if (it != remain.end()) + remain.erase(it); + return false; + }); + + // On entry, this set contained all the elements that should be tracked + // by the detector after we process this round. We removed all the items + // that actually were in the tracker, so this should now be empty: + BEAST_EXPECT(remain.empty()); + } + +public: + void + run() override + { + testcase ("Censorship Detector"); + + RCLCensorshipDetector cdet; + int round = 0; + + test(cdet, ++round, { }, { }, { }, { }); + test(cdet, ++round, { 10, 11, 12, 13 }, { 11, 2 }, { 10, 13 }, { }); + test(cdet, ++round, { 10, 13, 14, 15 }, { 14 }, { 10, 13, 15 }, { }); + test(cdet, ++round, { 10, 13, 15, 16 }, { 15, 16 }, { 10, 13 }, { }); + test(cdet, ++round, { 10, 13 }, { 17, 18 }, { 10, 13 }, { }); + test(cdet, ++round, { 10, 19 }, { }, { 10, 19 }, { }); + test(cdet, ++round, { 10, 19, 20 }, { 20 }, { 10 }, { 19 }); + test(cdet, ++round, { 21 }, { 21 }, { }, { }); + test(cdet, ++round, { }, { 22 }, { }, { }); + test(cdet, ++round, { 23, 24, 25, 26 }, { 25, 27 }, { 23, 26 }, { 24 }); + test(cdet, ++round, { 23, 26, 28 }, { 26, 28 }, { 23 }, { }); + + for (int i = 0; i != 10; ++i) + test(cdet, ++round, { 23 }, { }, { 23 }, { }); + + test(cdet, ++round, { 23, 29 }, { 29 }, { 23 }, { }); + test(cdet, ++round, { 30, 31 }, { 31 }, { 30 }, { }); + test(cdet, ++round, { 30 }, { 30 }, { }, { }); + test(cdet, ++round, { }, { }, { }, { }); + } +}; + +BEAST_DEFINE_TESTSUITE(RCLCensorshipDetector, app, ripple); +} // namespace test +} // namespace ripple diff --git a/src/test/unity/app_test_unity2.cpp b/src/test/unity/app_test_unity2.cpp index e9ea03e1d42..1f4b9678fc6 100644 --- a/src/test/unity/app_test_unity2.cpp +++ b/src/test/unity/app_test_unity2.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include