Skip to content

Commit

Permalink
Flush chain repo and wait for store to catch up before flushing conse… (
Browse files Browse the repository at this point in the history
#223)

* Flush chain repo and wait for store to catch up before flushing consensus

* Fix test
  • Loading branch information
dangershony authored Sep 30, 2020
1 parent b2a9869 commit 734324a
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 106 deletions.
101 changes: 55 additions & 46 deletions src/Blockcore/Base/ChainRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public class ChainRepository : IChainRepository

private BlockLocator locator;

private object lockObj;

public Network Network { get; }

public ChainRepository(ILoggerFactory loggerFactory, IChainStore chainStore, Network network)
Expand All @@ -38,6 +40,7 @@ public ChainRepository(ILoggerFactory loggerFactory, IChainStore chainStore, Net

this.chainStore = chainStore;
this.Network = network;
this.lockObj = new object();

this.logger = loggerFactory.CreateLogger(this.GetType().FullName);
}
Expand All @@ -47,39 +50,42 @@ public Task<ChainedHeader> LoadAsync(ChainedHeader genesisHeader)
{
Task<ChainedHeader> task = Task.Run(() =>
{
ChainedHeader tip = null;

ChainData data = this.chainStore.GetChainData(0);

if (data == null)
lock (this.lockObj)
{
genesisHeader.SetChainStore(this.chainStore);
return genesisHeader;
}
ChainedHeader tip = null;

Guard.Assert(data.Hash == genesisHeader.HashBlock); // can't swap networks

int index = 0;
while (true)
{
data = this.chainStore.GetChainData((index));
ChainData data = this.chainStore.GetChainData(0);

if (data == null)
break;

tip = new ChainedHeader(data.Hash, data.Work, tip);
if (tip.Height == 0) tip.SetChainStore(this.chainStore);
index++;
}

if (tip == null)
{
genesisHeader.SetChainStore(this.chainStore);
tip = genesisHeader;
{
genesisHeader.SetChainStore(this.chainStore);
return genesisHeader;
}

Guard.Assert(data.Hash == genesisHeader.HashBlock); // can't swap networks

int index = 0;
while (true)
{
data = this.chainStore.GetChainData((index));

if (data == null)
break;

tip = new ChainedHeader(data.Hash, data.Work, tip);
if (tip.Height == 0) tip.SetChainStore(this.chainStore);
index++;
}

if (tip == null)
{
genesisHeader.SetChainStore(this.chainStore);
tip = genesisHeader;
}

this.locator = tip.GetLocator();
return tip;
}

this.locator = tip.GetLocator();
return tip;
});

return task;
Expand All @@ -92,26 +98,29 @@ public Task SaveAsync(ChainIndexer chainIndexer)

Task task = Task.Run(() =>
{
ChainedHeader fork = this.locator == null ? null : chainIndexer.FindFork(this.locator);
ChainedHeader tip = chainIndexer.Tip;
ChainedHeader toSave = tip;

var headers = new List<ChainedHeader>();
while (toSave != fork)
lock (this.lockObj)
{
headers.Add(toSave);
toSave = toSave.Previous;
ChainedHeader fork = this.locator == null ? null : chainIndexer.FindFork(this.locator);
ChainedHeader tip = chainIndexer.Tip;
ChainedHeader toSave = tip;

var headers = new List<ChainedHeader>();
while (toSave != fork)
{
headers.Add(toSave);
toSave = toSave.Previous;
}

var items = headers.OrderBy(b => b.Height).Select(h => new ChainDataItem
{
Height = h.Height,
Data = new ChainData { Hash = h.HashBlock, Work = h.ChainWorkBytes }
});

this.chainStore.PutChainData(items);

this.locator = tip.GetLocator();
}

var items = headers.OrderBy(b => b.Height).Select(h => new ChainDataItem
{
Height = h.Height,
Data = new ChainData { Hash = h.HashBlock, Work = h.ChainWorkBytes }
});

this.chainStore.PutChainData(items);

this.locator = tip.GetLocator();
});

return task;
Expand Down
122 changes: 75 additions & 47 deletions src/Features/Blockcore.Features.Consensus/CoinViews/CachedCoinView.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Blockcore.Base;
using Blockcore.Configuration.Settings;
using Blockcore.Consensus;
using Blockcore.Features.Consensus.CoinViews.Coindb;
Expand Down Expand Up @@ -130,7 +132,16 @@ public long GetScriptSize

private readonly Random random;

public CachedCoinView(Network network, ICheckpoints checkpoints, ICoindb coindb, IDateTimeProvider dateTimeProvider, ILoggerFactory loggerFactory, INodeStats nodeStats, ConsensusSettings consensusSettings, StakeChainStore stakeChainStore = null, IRewindDataIndexCache rewindDataIndexCache = null)
public CachedCoinView(
Network network,
ICheckpoints checkpoints,
ICoindb coindb,
IDateTimeProvider dateTimeProvider,
ILoggerFactory loggerFactory,
INodeStats nodeStats,
ConsensusSettings consensusSettings,
StakeChainStore stakeChainStore = null,
IRewindDataIndexCache rewindDataIndexCache = null)
{
Guard.NotNull(coindb, nameof(CachedCoinView.coindb));

Expand Down Expand Up @@ -303,37 +314,43 @@ private void TryEvictCacheLocked()
}
}

/// <summary>
/// Check if periodic flush is required.
/// The conditions to flash the cache are if <see cref="CacheFlushTimeIntervalSeconds"/> is elapsed
/// or if <see cref="MaxCacheSizeBytes"/> is reached.
/// </summary>
/// <returns>True if the coinview needs to flush</returns>
public bool ShouldFlush()
{
DateTime now = this.dateTimeProvider.GetUtcNow();
bool flushTimeLimit = (now - this.lastCacheFlushTime).TotalSeconds >= this.CacheFlushTimeIntervalSeconds;

// The size of the cache was reached and most likely TryEvictCacheLocked didn't work
// so the cache is polluted with flushable items, then we flush anyway.

long totalBytes = this.cacheSizeBytes + this.rewindDataSizeBytes;
bool flushSizeLimit = totalBytes > this.MaxCacheSizeBytes;

if (!flushTimeLimit && !flushSizeLimit)
{
return false;
}

this.logger.LogDebug("Flushing, reasons flushTimeLimit={0} flushSizeLimit={1}.", flushTimeLimit, flushSizeLimit);

return true;
}

/// <summary>
/// Finds all changed records in the cache and persists them to the underlying coinview.
/// </summary>
/// <param name="force"><c>true</c> to enforce flush, <c>false</c> to flush only if <see cref="lastCacheFlushTime"/> is older than <see cref="CacheFlushTimeIntervalSeconds"/>.</param>
/// <remarks>
/// WARNING: This method can only be run from <see cref="ConsensusLoop.Execute(System.Threading.CancellationToken)"/> thread context
/// or when consensus loop is stopped. Otherwise, there is a risk of race condition when the consensus loop accepts new block.
/// </remarks>
public void Flush(bool force = true)
{
if (!force)
{
// Check if periodic flush is reuired.
// Ideally this will flush less frequent and always be behind
// blockstore which is currently set to 17 sec.

DateTime now = this.dateTimeProvider.GetUtcNow();
bool flushTimeLimit = (now - this.lastCacheFlushTime).TotalSeconds >= this.CacheFlushTimeIntervalSeconds;

// The size of the cache was reached and most likely TryEvictCacheLocked didn't work
// so the cahces is pulledted with flushable items, then we flush anyway.

long totalBytes = this.cacheSizeBytes + this.rewindDataSizeBytes;
bool flushSizeLimit = totalBytes > this.MaxCacheSizeBytes;

if (!flushTimeLimit && !flushSizeLimit)
{
if (!this.ShouldFlush())
return;
}

this.logger.LogDebug("Flushing, reasons flushTimeLimit={0} flushSizeLimit={1}.", flushTimeLimit, flushSizeLimit);
}

// Before flushing the coinview persist the stake store
Expand Down Expand Up @@ -408,7 +425,7 @@ public void SaveChanges(IList<UnspentOutput> outputs, HashHeightPair oldBlockHas
if (!this.cachedUtxoItems.TryGetValue(output.OutPoint, out CacheItem cacheItem))
{
// Add outputs to cache, this will happen for two cases
// 1. if a chaced item was evicted
// 1. if a cached item was evicted
// 2. for new outputs that are added

if (output.CreatedFromBlock)
Expand All @@ -428,8 +445,8 @@ public void SaveChanges(IList<UnspentOutput> outputs, HashHeightPair oldBlockHas
}
else
{
// This can happen if the cashe item was evicted while
// the block was being processed, fetch the outut again from disk.
// This can happen if the cached item was evicted while
// the block was being processed, fetch the output again from disk.

this.logger.LogDebug("Outpoint '{0}' is not found in cache, creating it.", output.OutPoint);

Expand Down Expand Up @@ -521,12 +538,12 @@ public void SaveChanges(IList<UnspentOutput> outputs, HashHeightPair oldBlockHas

this.logger.LogDebug("Coin override alllowed for utxo '{0}'.", cacheItem.OutPoint);

// Deduct the crurrent script size form the
// Deduct the current script size form the
// total cache size, it will be added again later.
this.cacheSizeBytes -= cacheItem.GetScriptSize;

// Clear this in order to calculate the cache sie
// this will get set later when overriden
// Clear this in order to calculate the cache size
// this will get set later when overridden
cacheItem.Coins = null;
}

Expand Down Expand Up @@ -573,27 +590,11 @@ public void SaveChanges(IList<UnspentOutput> outputs, HashHeightPair oldBlockHas
// When cache is flushed the rewind data will allow to rewind the node up to the
// number of rewind blocks.
// TODO: move rewind data to use block store.
// Rewind data can go away all togetehr if the node uses the blocks in block store
// Rewind data can go away all together if the node uses the blocks in block store
// to get the rewind information, blockstore persists much more frequent then coin cache
// So using block store for rewinds is not entirely impossible.

uint rewindDataWindow = 10;

if (this.blockHash.Height >= this.lastCheckpointHeight)
{
if (this.network.Consensus.MaxReorgLength != 0)
{
rewindDataWindow = this.network.Consensus.MaxReorgLength + 1;
}
else
{
// TODO: make the rewind data window a configuration
// parameter of evern a network parameter.

// For POW assume BTC where a rewind data of 100 is more then enough.
rewindDataWindow = 100;
}
}
int rewindDataWindow = this.CalculateRewindWindow();

int rewindToRemove = this.blockHash.Height - (int)rewindDataWindow;

Expand All @@ -606,6 +607,33 @@ public void SaveChanges(IList<UnspentOutput> outputs, HashHeightPair oldBlockHas
}
}

/// <summary>
/// Calculate the window of how many rewind items to keep in memory.
/// </summary>
/// <returns></returns>
public int CalculateRewindWindow()
{
uint rewindDataWindow = 10;

if (this.blockHash.Height >= this.lastCheckpointHeight)
{
if (this.network.Consensus.MaxReorgLength != 0)
{
rewindDataWindow = this.network.Consensus.MaxReorgLength + 1;
}
else
{
// TODO: make the rewind data window a configuration
// parameter of every a network parameter.

// For POW assume BTC where a rewind data of 100 is more then enough.
rewindDataWindow = 100;
}
}

return (int)rewindDataWindow;
}

public HashHeightPair Rewind()
{
if (this.innerBlockHash == null)
Expand Down
Loading

0 comments on commit 734324a

Please sign in to comment.