Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

beacon: Cull beacon db memory (passivation) #2069

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 144 additions & 31 deletions src/gridcoin/beacon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ bool BeaconRegistry::TryRenewal(Beacon_ptr& current_beacon_ptr, int& height, con
// Place a smart shared pointer to the renewed beacon in the active beacons map. Note that the
// subscript form of the insert with the same key replaces the current beacon entry with the
//renewal.
m_beacons[payload.m_cpid] = std::make_shared<Beacon>(renewal_iter->second);
m_beacons[payload.m_cpid] = renewal_iter->second;

return true;
}
Expand Down Expand Up @@ -446,7 +446,7 @@ void BeaconRegistry::Add(const ContractContext& ctx)
historical.GetAddress().ToString(),
historical.m_hash.GetHex());
}
m_beacons[payload.m_cpid] = std::make_shared<Beacon>(m_beacon_db[ctx.m_tx.GetHash()]);
m_beacons[payload.m_cpid] = m_beacon_db.find(ctx.m_tx.GetHash())->second;
return;
}

Expand Down Expand Up @@ -480,7 +480,7 @@ void BeaconRegistry::Add(const ContractContext& ctx)
}

// Insert a pointer to the entry in the m_pending map.
m_pending[pending.GetId()] = std::make_shared<Beacon>(m_beacon_db[ctx.m_tx.GetHash()]);
m_pending[pending.GetId()] = m_beacon_db.find(ctx.m_tx.GetHash())->second;
}

void BeaconRegistry::Delete(const ContractContext& ctx)
Expand Down Expand Up @@ -652,11 +652,11 @@ void BeaconRegistry::Revert(const ContractContext& ctx)
if (resurrect_iter != m_beacon_db.end())
{
// This is an element to the desired entry in the historical table.
Beacon& resurrected_beacon = resurrect_iter->second;
Beacon_ptr& resurrected_beacon = resurrect_iter->second;

// Resurrect the prior beacon.
// ------------- cpid -------------- smart shared pointer to element in historical map.
m_beacons[cpid] = std::make_shared<Beacon>(resurrected_beacon);
m_beacons[cpid] = resurrected_beacon;
}

// Erase the renewal record in the db that was reverted. No reason to keep it.
Expand Down Expand Up @@ -688,12 +688,12 @@ void BeaconRegistry::Revert(const ContractContext& ctx)

if (deleted_beacon_record != m_beacon_db.end())
{
auto record_to_restore = m_beacon_db.find(deleted_beacon_record->second.m_prev_beacon_hash);
auto record_to_restore = m_beacon_db.find(deleted_beacon_record->second->m_prev_beacon_hash);

if (record_to_restore != m_beacon_db.end())
{
// Get a smart shared pointer to the beacon to restore
Beacon_ptr beacon_to_restore_ptr = std::make_shared<Beacon>(record_to_restore->second);
Beacon_ptr beacon_to_restore_ptr = record_to_restore->second;

// Check the beacon's status. If it was ACTIVE or RENEWAL, put it back in the m_beacons map
// under the cpid.
Expand Down Expand Up @@ -866,8 +866,7 @@ void BeaconRegistry::ActivatePending(

// This is the subscript form of insert. Important here because an activated beacon should
// overwrite any existing entry in the m_beacons map.
m_beacons[activated_beacon.m_cpid] =
std::make_shared<Beacon>(m_beacon_db[activated_beacon.m_hash]);
m_beacons[activated_beacon.m_cpid] = m_beacon_db.find(activated_beacon.m_hash)->second;

// Remove the pending beacon entry from the pending map. (Note this entry still exists in the historical
// table and the db.
Expand Down Expand Up @@ -940,8 +939,8 @@ void BeaconRegistry::Deactivate(const uint256 superblock_hash)
}

// Resurrect the pending record prior to the activation. This points to the pending record still in the db.
m_pending[static_cast<PendingBeacon>(pending_beacon_entry->second).GetId()] =
std::make_shared<Beacon>(pending_beacon_entry->second);
m_pending[static_cast<PendingBeacon>(*pending_beacon_entry->second).GetId()] =
pending_beacon_entry->second;

// Erase the entry from the active beacons map. This also increments the iterator.
iter = m_beacons.erase(iter);
Expand All @@ -964,22 +963,22 @@ void BeaconRegistry::Deactivate(const uint256 superblock_hash)
while (iter != m_beacon_db.end())
{
// The cpid in the historical beacon record to be matched.
Cpid cpid = iter->second.m_cpid;
Cpid cpid = iter->second->m_cpid;

uint256 match_hash = Hash(superblock_hash.begin(),
superblock_hash.end(),
iter->second.m_prev_beacon_hash.begin(),
iter->second.m_prev_beacon_hash.end());
iter->second->m_prev_beacon_hash.begin(),
iter->second->m_prev_beacon_hash.end());

// If the calculated match_hash matches the key (hash) of the historical beacon record, then
// restore the previous record pointed to by the historical beacon record to the pending map.
if (match_hash == iter->first)
{
uint256 resurrect_pending_hash = iter->second.m_prev_beacon_hash;
uint256 resurrect_pending_hash = iter->second->m_prev_beacon_hash;

if (!resurrect_pending_hash.IsNull())
{
Beacon_ptr resurrected_pending = std::make_shared<Beacon>(m_beacon_db.find(resurrect_pending_hash)->second);
Beacon_ptr resurrected_pending = m_beacon_db.find(resurrect_pending_hash)->second;

// Check that the status of the beacon to resurrect is PENDING. If it is not log an error but continue
// anyway.
Expand Down Expand Up @@ -1137,6 +1136,7 @@ int BeaconRegistry::BeaconDB::Initialize(PendingBeaconMap& m_pending, BeaconMap&
}

uint64_t recnum_high_watermark = 0;
uint64_t number_passivated = 0;

// Replay the storage map. The iterator is ordered by record number, which ensures that the correct
// elements end up in m_beacons and m_pending. Storage entries that are "mark deletions" are also inserted
Expand All @@ -1162,8 +1162,15 @@ int BeaconRegistry::BeaconDB::Initialize(PendingBeaconMap& m_pending, BeaconMap&

// Insert the entry into the historical map. This includes ctx's where the beacon is marked deleted.
// --------------- hash ---------- does NOT include the Cpid.
m_historical[iter.second.m_hash] = beacon;
Beacon& historical_beacon = m_historical[iter.second.m_hash];
m_historical[iter.second.m_hash] = std::make_shared<Beacon>(beacon);
Beacon_ptr& historical_beacon_ptr = m_historical[iter.second.m_hash];

BeaconRegistry::HistoricalBeaconMap::iterator prev_historical_iter = m_historical.end();

if (!historical_beacon_ptr->m_prev_beacon_hash.IsNull())
{
prev_historical_iter = m_historical.find(historical_beacon_ptr->m_prev_beacon_hash);
}

if (beacon.m_status == BeaconStatusForStorage::PENDING)
{
Expand All @@ -1180,7 +1187,7 @@ int BeaconRegistry::BeaconDB::Initialize(PendingBeaconMap& m_pending, BeaconMap&
);

// Insert the pending beacon in the pending map.
m_pending[beacon.GetId()] = std::make_shared<Beacon>(historical_beacon);
m_pending[beacon.GetId()] = historical_beacon_ptr;
}

if (beacon.m_status == BeaconStatusForStorage::ACTIVE || beacon.m_status == BeaconStatusForStorage::RENEWAL)
Expand All @@ -1198,7 +1205,7 @@ int BeaconRegistry::BeaconDB::Initialize(PendingBeaconMap& m_pending, BeaconMap&
);

// Insert or replace the existing map entry for the cpid with the latest active or renewed for that CPID.
m_beacons[beacon.m_cpid] = std::make_shared<Beacon>(historical_beacon);
m_beacons[beacon.m_cpid] = historical_beacon_ptr;

// Delete any entry in the pending map with THE SAME public key.
auto pending_to_delete = m_pending.find(beacon.GetId());
Expand Down Expand Up @@ -1256,11 +1263,28 @@ int BeaconRegistry::BeaconDB::Initialize(PendingBeaconMap& m_pending, BeaconMap&
m_beacons.erase(beacon.m_cpid);
m_pending.erase(beacon.m_public_key.GetID());
}

if (prev_historical_iter != m_historical.end())
{
// Note that passivation is not expected to be successful for every call. See the comments
// in the passivate() function.
std::pair<BeaconRegistry::HistoricalBeaconMap::iterator, bool> passivation_result
= passivate(prev_historical_iter);

number_passivated += passivation_result.second;
}
}

LogPrint(LogFlags::BEACON, "INFO: %s: number of historical records passivated: %" PRId64 ".",
__func__,
number_passivated);

// Set the in-memory record number stored variable to the highest recnum encountered during the replay above.
m_recnum_stored = recnum_high_watermark;

// Set the needs passivation flag to true, because the one-by-one passivation done above may not catch everything.
m_needs_passivation = true;

return height;
}

Expand All @@ -1271,6 +1295,26 @@ void BeaconRegistry::ResetInMemoryOnly()
m_beacon_db.clear_in_memory_only();
}

uint64_t BeaconRegistry::PassivateDB()
{
return m_beacon_db.passivate_db();
}

// This is static and called by the scheduler.
void BeaconRegistry::RunBeaconDBPassivation()
{
TRY_LOCK(cs_main, locked_main);

if (!locked_main)
{
return;
}

BeaconRegistry& beacons = GetBeaconRegistry();

beacons.PassivateDB();
}

// Required to make the linker happy.
constexpr uint32_t BeaconRegistry::BeaconDB::CURRENT_VERSION;

Expand All @@ -1280,6 +1324,7 @@ void BeaconRegistry::BeaconDB::clear_in_memory_only()
m_database_init = false;
m_height_stored = 0;
m_recnum_stored = 0;
m_needs_passivation = false;
}

bool BeaconRegistry::BeaconDB::clear_leveldb()
Expand All @@ -1305,11 +1350,41 @@ bool BeaconRegistry::BeaconDB::clear_leveldb()
m_height_stored = 0;
m_recnum_stored = 0;
m_database_init = false;
m_needs_passivation = false;
m_needs_IsContract_correction = false;

return status;
}

uint64_t BeaconRegistry::BeaconDB::passivate_db()
{
uint64_t number_passivated = 0;

// Don't bother to go through the historical beacon map unless the needs passivation flag is set. This makes
// this function extremely light for most calls from the periodic schedule.
if (m_needs_passivation)
{
for (auto iter = m_historical.begin(); iter != m_historical.end(); /*no-op*/)
{
// The passivate function increments the iterator.
std::pair<BeaconRegistry::HistoricalBeaconMap::iterator, bool> result = passivate(iter);

iter = result.first;
number_passivated += result.second;

}
}

LogPrint(BCLog::LogFlags::BEACON, "INFO %s: Passivated %" PRId64 " elements from beacon db.",
__func__,
number_passivated);

// Set needs passivation flag to false after passivating the db.
m_needs_passivation = false;

return number_passivated;
}

bool BeaconRegistry::BeaconDB::clear()
{
clear_in_memory_only();
Expand Down Expand Up @@ -1403,17 +1478,21 @@ bool BeaconRegistry::BeaconDB::insert(const uint256 &hash, const int& height, co
beacon.m_status.Raw() // status
);

m_historical.insert(std::make_pair(hash, beacon));
m_historical.insert(std::make_pair(hash, std::make_shared<Beacon>(beacon)));

status = Store(hash, static_cast<StorageBeacon>(beacon));

if (height) status &= StoreDBHeight(height);

// Set needs passivation flag to true to allow the scheduled passivation to remove unnecessary records from
// memory.
m_needs_passivation = true;

return status;
}
}

bool BeaconRegistry::BeaconDB::erase(uint256 hash)
bool BeaconRegistry::BeaconDB::erase(const uint256& hash)
{
auto iter = m_historical.find(hash);

Expand All @@ -1425,6 +1504,42 @@ bool BeaconRegistry::BeaconDB::erase(uint256 hash)
return Delete(hash);
}

// Note that this function uses the shared pointer use_count() to determine whether an element in
// m_historical is referenced by either the m_beacons or m_pending map and if not, erases it, leaving the backing
// state in leveldb untouched. Note that the use of use_count() in multithreaded environments must be carefully
// considered because it is only approximate. In this case it is exact. Access to the entire BeaconRegistry class
// and everything in it is protected by the cs_main lock and is therefore single threaded. This method of passivating
// is MUCH faster than searching through m_beacons and m_pending for each element, because they are not keyed by hash.
//
// Note that this function acts very similarly to the map erase function with an iterator argument, but with a standard
// pair returned. The first part of the pair a boolean as to whether the element was passivated, and the
// second is is an iterator to the next element. This is designed to be traversed in a for loop just like map erase.
std::pair<BeaconRegistry::HistoricalBeaconMap::iterator, bool>
BeaconRegistry::BeaconDB::passivate(BeaconRegistry::HistoricalBeaconMap::iterator& iter)
{
// m-historical itself holds one reference, additional references can be held by m_beacons and m_pending.
// If there is only one reference then remove the shared_pointer from m_historical, which will implicitly destroy
// the shared_pointer object.
if (iter->second.use_count() == 1)
{
iter = m_historical.erase(iter);
return std::make_pair(iter, true);
}
else
{
LogPrint(BCLog::LogFlags::BEACON, "INFO: %s: Passivate called for historical beacon record with hash %s that "
"has existing reference count %li. This is expected under certain situations, "
"such as a forced (re)advertisement, where the new pending beacon is allowed "
"to expire, while the original is still active.",
__func__,
iter->second->m_hash.GetHex(),
iter->second.use_count());
jamescowens marked this conversation as resolved.
Show resolved Hide resolved

++iter;
return std::make_pair(iter, false);
}
}

BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::begin()
{
return m_historical.begin();
Expand All @@ -1435,7 +1550,7 @@ BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::end()
return m_historical.end();
}

BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::find(uint256& hash)
BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::find(const uint256& hash)
{
// See if beacon from that ctx_hash is already in the historical map. If so, get iterator.
auto iter = m_historical.find(hash);
Expand All @@ -1448,7 +1563,10 @@ BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::find(uin
// If the load from leveldb is successful, insert into the historical map and return the iterator.
if (Load(hash, beacon))
{
iter = m_historical.insert(std::make_pair(hash, static_cast<Beacon>(beacon))).first;
iter = m_historical.insert(std::make_pair(hash, std::make_shared<Beacon>(static_cast<Beacon>(beacon)))).first;

// Set the needs passivation flag to true
m_needs_passivation = true;
}
}

Expand All @@ -1463,11 +1581,6 @@ BeaconRegistry::HistoricalBeaconMap::iterator BeaconRegistry::BeaconDB::advance(
return ++iter;
}

Beacon& BeaconRegistry::BeaconDB::operator[](const uint256& hash)
{
return m_historical[hash];
}

bool BeaconRegistry::BeaconDB::Store(const uint256& hash, const StorageBeacon& beacon)
{
CTxDB txdb("rw");
Expand All @@ -1479,7 +1592,7 @@ bool BeaconRegistry::BeaconDB::Store(const uint256& hash, const StorageBeacon& b
return txdb.WriteGenericSerializable(key, std::make_pair(m_recnum_stored, beacon));
}

bool BeaconRegistry::BeaconDB::Load(uint256& hash, StorageBeacon& beacon)
bool BeaconRegistry::BeaconDB::Load(const uint256& hash, StorageBeacon& beacon)
{
CTxDB txdb("r");

Expand All @@ -1494,7 +1607,7 @@ bool BeaconRegistry::BeaconDB::Load(uint256& hash, StorageBeacon& beacon)
return status;
}

bool BeaconRegistry::BeaconDB::Delete(uint256& hash)
bool BeaconRegistry::BeaconDB::Delete(const uint256& hash)
{
CTxDB txdb("rw");

Expand Down
Loading