Skip to content

Commit

Permalink
fix: prune checkpoint states at syncing time (#7241)
Browse files Browse the repository at this point in the history
* fix: prune checkpoint states at syncing time

* fix: lint

* fix: check-types in test
  • Loading branch information
twoeths authored and nflaig committed Nov 28, 2024
1 parent 36be6b3 commit ffa4bb8
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 37 deletions.
1 change: 0 additions & 1 deletion packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ export class BeaconChain implements IBeaconChain {
metrics,
logger,
clock,
shufflingCache: this.shufflingCache,
blockStateCache,
bufferPool: this.bufferPool,
datastore: fileDataStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {loadCachedBeaconState} from "@lodestar/state-transition";
import {INTERVALS_PER_SLOT} from "@lodestar/params";
import {Metrics} from "../../metrics/index.js";
import {IClock} from "../../util/clock.js";
import {ShufflingCache} from "../shufflingCache.js";
import {AllocSource, BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {StateCloneOpts} from "../regen/interface.js";
import {serializeState} from "../serializeState.js";
Expand All @@ -17,16 +16,13 @@ import {CheckpointHex, CacheItemType, CheckpointStateCache, BlockStateCache} fro
export type PersistentCheckpointStateCacheOpts = {
/** Keep max n states in memory, persist the rest to disk */
maxCPStateEpochsInMemory?: number;
/** for testing only */
processLateBlock?: boolean;
};

type PersistentCheckpointStateCacheModules = {
metrics?: Metrics | null;
logger: Logger;
clock?: IClock | null;
signal?: AbortSignal;
shufflingCache: ShufflingCache;
datastore: CPStateDatastore;
blockStateCache: BlockStateCache;
bufferPool?: BufferPool | null;
Expand Down Expand Up @@ -102,24 +98,12 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
private preComputedCheckpoint: string | null = null;
private preComputedCheckpointHits: number | null = null;
private readonly maxEpochsInMemory: number;
// only for testing, default false for production
private readonly processLateBlock: boolean;
private readonly datastore: CPStateDatastore;
private readonly shufflingCache: ShufflingCache;
private readonly blockStateCache: BlockStateCache;
private readonly bufferPool?: BufferPool | null;

constructor(
{
metrics,
logger,
clock,
signal,
shufflingCache,
datastore,
blockStateCache,
bufferPool,
}: PersistentCheckpointStateCacheModules,
{metrics, logger, clock, signal, datastore, blockStateCache, bufferPool}: PersistentCheckpointStateCacheModules,
opts: PersistentCheckpointStateCacheOpts
) {
this.cache = new MapTracker(metrics?.cpStateCache);
Expand Down Expand Up @@ -153,10 +137,8 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
throw new Error("maxEpochsInMemory must be >= 0");
}
this.maxEpochsInMemory = opts.maxCPStateEpochsInMemory ?? DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY;
this.processLateBlock = opts.processLateBlock ?? false;
// Specify different datastore for testing
this.datastore = datastore;
this.shufflingCache = shufflingCache;
this.blockStateCache = blockStateCache;
this.bufferPool = bufferPool;
}
Expand Down Expand Up @@ -487,12 +469,9 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
// 2/3 of slot is the most free time of every slot, take that chance to persist checkpoint states
// normally it should only persist checkpoint states at 2/3 of slot 0 of epoch
await sleep(secToTwoThirdsSlot * 1000, this.signal);
} else if (!this.processLateBlock) {
// normally the block persist happens at 2/3 of slot 0 of epoch, if it's already late then just skip to allow other tasks to run
// there are plenty of chances in the same epoch to persist checkpoint states, also if block is late it could be reorged
this.logger.verbose("Skip persist checkpoint states", {blockSlot, root: blockRootHex});
return 0;
}
// at syncing time, it's critical to persist checkpoint states as soon as possible to avoid OOM during unfinality time
// if node is synced this is not a hot time because block comes late, we'll likely miss attestation already, or the block is orphaned

const persistEpochs = sortedEpochs.slice(0, sortedEpochs.length - this.maxEpochsInMemory);
for (const lowestEpoch of persistEpochs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 2, processLateBlock: true}
{maxCPStateEpochsInMemory: 2}
);
cache.add(cp0a, states["cp0a"]);
cache.add(cp0b, states["cp0b"]);
Expand Down Expand Up @@ -165,10 +164,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 2, processLateBlock: true}
{maxCPStateEpochsInMemory: 2}
);
cache.add(cp0a, states["cp0a"]);
cache.add(cp0b, states["cp0b"]);
Expand Down Expand Up @@ -242,10 +240,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 2, processLateBlock: true}
{maxCPStateEpochsInMemory: 2}
);
cache.add(cp0a, states["cp0a"]);
cache.add(cp0b, states["cp0b"]);
Expand Down Expand Up @@ -548,10 +545,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 1, processLateBlock: true}
{maxCPStateEpochsInMemory: 1}
);
cache.add(cp0a, states["cp0a"]);
cache.add(cp0b, states["cp0b"]);
Expand Down Expand Up @@ -820,10 +816,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 0, processLateBlock: true}
{maxCPStateEpochsInMemory: 0}
);
cache.add(cp0a, states["cp0a"]);
cache.add(cp0b, states["cp0b"]);
Expand Down Expand Up @@ -911,10 +906,9 @@ describe("PersistentCheckpointStateCache", () => {
{
datastore,
logger: testLogger(),
shufflingCache: new ShufflingCache(),
blockStateCache: new FIFOBlockStateCache({}, {}),
},
{maxCPStateEpochsInMemory: 0, processLateBlock: true}
{maxCPStateEpochsInMemory: 0}
);

const root1a = Buffer.alloc(32, 100);
Expand Down

0 comments on commit ffa4bb8

Please sign in to comment.