Skip to content

Commit

Permalink
feat: implement ShufflingCache
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Nov 9, 2023
1 parent 37cf9dd commit e1fafb4
Show file tree
Hide file tree
Showing 20 changed files with 712 additions and 199 deletions.
7 changes: 7 additions & 0 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export async function importBlock(
const blockRootHex = toHexString(blockRoot);
const currentEpoch = computeEpochAtSlot(this.forkChoice.getTime());
const blockEpoch = computeEpochAtSlot(block.message.slot);
const parentEpoch = computeEpochAtSlot(parentBlockSlot);
const prevFinalizedEpoch = this.forkChoice.getFinalizedCheckpoint().epoch;
const blockDelaySec = (fullyVerifiedBlock.seenTimestampSec - postState.genesisTime) % this.config.SECONDS_PER_SLOT;

Expand Down Expand Up @@ -347,6 +348,12 @@ export async function importBlock(
this.logger.verbose("After importBlock caching postState without SSZ cache", {slot: postState.slot});
}

if (parentEpoch < blockEpoch) {
// current epoch and previous epoch are likely cached in previous states
this.shufflingCache.processState(postState, postState.epochCtx.nextShuffling.epoch);
this.logger.verbose("Processed shuffling for next epoch", {parentEpoch, blockEpoch, slot: block.message.slot});
}

if (block.message.slot % SLOTS_PER_EPOCH === 0) {
// Cache state to preserve epoch transition work
const checkpointState = postState;
Expand Down
53 changes: 52 additions & 1 deletion packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
isCachedBeaconState,
Index2PubkeyCache,
PubkeyIndexMap,
EpochShuffling,
} from "@lodestar/state-transition";
import {BeaconConfig} from "@lodestar/config";
import {
Expand Down Expand Up @@ -39,7 +40,6 @@ import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Clock, ClockEvent, IClock} from "../util/clock.js";
import {ensureDir, writeIfNotExist} from "../util/file.js";
import {isOptimisticBlock} from "../util/forkChoice.js";
import {CheckpointStateCache, StateContextCache} from "./stateCache/index.js";
import {BlockProcessor, ImportBlockOpts} from "./blocks/index.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData, BlockHash, StateGetOpts} from "./interface.js";
Expand Down Expand Up @@ -75,6 +75,9 @@ import {BlockAttributes, produceBlockBody} from "./produceBlock/produceBlockBody
import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
import {BlockInput} from "./blocks/types.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {ShufflingCache} from "./shufflingCache.js";
import {StateContextCache} from "./stateCache/stateContextCache.js";
import {CheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js";

/**
* Arbitrary constants, blobs and payloads should be consumed immediately in the same slot
Expand Down Expand Up @@ -129,6 +132,7 @@ export class BeaconChain implements IBeaconChain {

readonly beaconProposerCache: BeaconProposerCache;
readonly checkpointBalancesCache: CheckpointBalancesCache;
readonly shufflingCache: ShufflingCache;
/** Map keyed by executionPayload.blockHash of the block for those blobs */
readonly producedBlobSidecarsCache = new Map<BlockHash, deneb.BlobSidecars>();
readonly producedBlindedBlobSidecarsCache = new Map<BlockHash, deneb.BlindedBlobSidecars>();
Expand Down Expand Up @@ -209,6 +213,7 @@ export class BeaconChain implements IBeaconChain {

this.beaconProposerCache = new BeaconProposerCache(opts);
this.checkpointBalancesCache = new CheckpointBalancesCache();
this.shufflingCache = new ShufflingCache(metrics, this.opts);

// Restore state caches
// anchorState may already by a CachedBeaconState. If so, don't create the cache again, since deserializing all
Expand All @@ -223,6 +228,9 @@ export class BeaconChain implements IBeaconChain {
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
});
this.shufflingCache.processState(cachedState, cachedState.epochCtx.previousShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.currentShuffling.epoch);
this.shufflingCache.processState(cachedState, cachedState.epochCtx.nextShuffling.epoch);

// Persist single global instance of state caches
this.pubkey2index = cachedState.epochCtx.pubkey2index;
Expand Down Expand Up @@ -640,6 +648,49 @@ export class BeaconChain implements IBeaconChain {
}
}

/**
* Regenerate state for attestation verification, this does not happen with default chain option of maxSkipSlots = 32 .
* However, need to handle just in case. Lodestar doesn't support multiple regen state requests for attestation verification
* at the same time, bounded inside "ShufflingCache.insertPromise()" function.
* Leave this function in chain instead of attestatation verification code to make sure we're aware of its performance impact.
*/
async regenStateForAttestationVerification(
attEpoch: Epoch,
shufflingDependentRoot: RootHex,
attHeadBlock: ProtoBlock,
regenCaller: RegenCaller
): Promise<EpochShuffling> {
// this is to prevent multiple calls to get shuffling for the same epoch and dependent root
// any subsequent calls of the same epoch and dependent root will wait for this promise to resolve
this.shufflingCache.insertPromise(attEpoch, shufflingDependentRoot);
const blockEpoch = computeEpochAtSlot(attHeadBlock.slot);

let state: CachedBeaconStateAllForks;
if (blockEpoch < attEpoch - 1) {
// thanks to one epoch look ahead, we don't need to dial up to attEpoch
const targetSlot = computeStartSlotAtEpoch(attEpoch - 1);
this.metrics?.gossipAttestation.useHeadBlockStateDialedToTargetEpoch.inc({caller: regenCaller});
state = await this.regen.getBlockSlotState(
attHeadBlock.blockRoot,
targetSlot,
{dontTransferCache: true},
regenCaller
);
} else if (blockEpoch > attEpoch) {
// should not happen, handled inside attestation verification code
throw Error(`Block epoch ${blockEpoch} is after attestation epoch ${attEpoch}`);
} else {
// should use either current or next shuffling of head state
// it's not likely to hit this since these shufflings are cached already
// so handle just in case
this.metrics?.gossipAttestation.useHeadBlockState.inc({caller: regenCaller});
state = await this.regen.getState(attHeadBlock.stateRoot, regenCaller);
}

// resolve the promise to unblock other calls of the same epoch and dependent root
return this.shufflingCache.processState(state, attEpoch);
}

/**
* `ForkChoice.onBlock` must never throw for a block that is valid with respect to the network
* `justifiedBalancesGetter()` must never throw and it should always return a state.
Expand Down
8 changes: 1 addition & 7 deletions packages/beacon-node/src/chain/errors/attestationError.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {toHexString} from "@chainsafe/ssz";
import {CommitteeIndex, Epoch, Slot, ValidatorIndex, RootHex} from "@lodestar/types";
import {Epoch, Slot, ValidatorIndex, RootHex} from "@lodestar/types";
import {GossipActionError} from "./gossipValidation.js";

export enum AttestationErrorCode {
Expand Down Expand Up @@ -65,11 +65,6 @@ export enum AttestationErrorCode {
* A signature on the attestation is invalid.
*/
INVALID_SIGNATURE = "ATTESTATION_ERROR_INVALID_SIGNATURE",
/**
* There is no committee for the slot and committee index of this attestation
* and the attestation should not have been produced.
*/
NO_COMMITTEE_FOR_SLOT_AND_INDEX = "ATTESTATION_ERROR_NO_COMMITTEE_FOR_SLOT_AND_INDEX",
/**
* The unaggregated attestation doesn't have only one aggregation bit set.
*/
Expand Down Expand Up @@ -150,7 +145,6 @@ export type AttestationErrorType =
| {code: AttestationErrorCode.HEAD_NOT_TARGET_DESCENDANT}
| {code: AttestationErrorCode.UNKNOWN_TARGET_ROOT; root: Uint8Array}
| {code: AttestationErrorCode.INVALID_SIGNATURE}
| {code: AttestationErrorCode.NO_COMMITTEE_FOR_SLOT_AND_INDEX; slot: Slot; index: CommitteeIndex}
| {code: AttestationErrorCode.NOT_EXACTLY_ONE_AGGREGATION_BIT_SET}
| {code: AttestationErrorCode.PRIOR_ATTESTATION_KNOWN; validatorIndex: ValidatorIndex; epoch: Epoch}
| {code: AttestationErrorCode.FUTURE_EPOCH; attestationEpoch: Epoch; currentEpoch: Epoch}
Expand Down
9 changes: 9 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {allForks, UintNum64, Root, phase0, Slot, RootHex, Epoch, ValidatorIndex,
import {
BeaconStateAllForks,
CachedBeaconStateAllForks,
EpochShuffling,
Index2PubkeyCache,
PubkeyIndexMap,
} from "@lodestar/state-transition";
Expand Down Expand Up @@ -36,6 +37,7 @@ import {CheckpointBalancesCache} from "./balancesCache.js";
import {IChainOptions} from "./options.js";
import {AssembledBlockType, BlockAttributes, BlockType} from "./produceBlock/produceBlockBody.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {ShufflingCache} from "./shufflingCache.js";

export {BlockType, type AssembledBlockType};
export {type ProposerPreparationData};
Expand Down Expand Up @@ -96,6 +98,7 @@ export interface IBeaconChain {
readonly producedBlobSidecarsCache: Map<BlockHash, deneb.BlobSidecars>;
readonly producedBlockRoot: Map<RootHex, allForks.ExecutionPayload | null>;
readonly producedBlindedBlobSidecarsCache: Map<BlockHash, deneb.BlindedBlobSidecars>;
readonly shufflingCache: ShufflingCache;
readonly producedBlindedBlockRoot: Set<RootHex>;
readonly opts: IChainOptions;

Expand Down Expand Up @@ -160,6 +163,12 @@ export interface IBeaconChain {
persistInvalidSszBytes(type: string, sszBytes: Uint8Array, suffix?: string): void;
/** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */
persistInvalidSszView(view: TreeView<CompositeTypeAny>, suffix?: string): void;
regenStateForAttestationVerification(
attEpoch: Epoch,
shufflingDependentRoot: RootHex,
attHeadBlock: ProtoBlock,
regenCaller: RegenCaller
): Promise<EpochShuffling>;
updateBuilderStatus(clockSlot: Slot): void;

regenCanAcceptWork(): boolean;
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import {defaultOptions as defaultValidatorOptions} from "@lodestar/validator";
import {ArchiverOpts} from "./archiver/index.js";
import {ForkChoiceOpts} from "./forkChoice/index.js";
import {LightClientServerOpts} from "./lightClient/index.js";
import {ShufflingCacheOpts} from "./shufflingCache.js";

export type IChainOptions = BlockProcessOpts &
PoolOpts &
SeenCacheOpts &
ForkChoiceOpts &
ArchiverOpts &
ShufflingCacheOpts &
LightClientServerOpts & {
blsVerifyAllMainThread?: boolean;
blsVerifyAllMultiThread?: boolean;
Expand Down
180 changes: 180 additions & 0 deletions packages/beacon-node/src/chain/shufflingCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import {CachedBeaconStateAllForks, EpochShuffling, getShufflingDecisionBlock} from "@lodestar/state-transition";
import {Epoch, RootHex} from "@lodestar/types";
import {MapDef, pruneSetToMax} from "@lodestar/utils";
import {Metrics} from "../metrics/metrics.js";

/**
* Same value to CheckpointBalancesCache, with the assumption that we don't have to use it for old epochs. In the worse case:
* - when loading state bytes from disk, we need to compute shuffling for all epochs (~1s as of Sep 2023)
* - don't have shuffling to verify attestations, need to do 1 epoch transition to add shuffling to this cache. This never happens
* with default chain option of maxSkipSlots = 32
**/
const MAX_EPOCHS = 4;

/**
* With default chain option of maxSkipSlots = 32, there should be no shuffling promise. If that happens a lot, it could blow up Lodestar,
* with MAX_EPOCHS = 4, only allow 2 promise at a time. Note that regen already bounds number of concurrent requests at 1 already.
*/
const MAX_PROMISES = 2;

enum CacheItemType {
shuffling,
promise,
}

type ShufflingCacheItem = {
type: CacheItemType.shuffling;
shuffling: EpochShuffling;
};

type PromiseCacheItem = {
type: CacheItemType.promise;
promise: Promise<EpochShuffling>;
resolveFn: (shuffling: EpochShuffling) => void;
};

type CacheItem = ShufflingCacheItem | PromiseCacheItem;

export type ShufflingCacheOpts = {
maxShufflingCacheEpochs?: number;
};

/**
* A shuffling cache to help:
* - get committee quickly for attestation verification
* - if a shuffling is not available (which does not happen with default chain option of maxSkipSlots = 32), track a promise to make sure we don't compute the same shuffling twice
* - skip computing shuffling when loading state bytes from disk
*/
export class ShufflingCache {
/** LRU cache implemented as an array, pruned every time we add an item */
private readonly itemsByDecisionRootByEpoch: MapDef<Epoch, Map<RootHex, CacheItem>> = new MapDef(
() => new Map<RootHex, CacheItem>()
);

private readonly maxEpochs: number;

constructor(
private readonly metrics: Metrics | null = null,
opts: ShufflingCacheOpts = {}
) {
if (metrics) {
metrics.shufflingCache.size.addCollect(() =>
metrics.shufflingCache.size.set(
Array.from(this.itemsByDecisionRootByEpoch.values()).reduce((total, innerMap) => total + innerMap.size, 0)
)
);
}

this.maxEpochs = opts.maxShufflingCacheEpochs ?? MAX_EPOCHS;
}

/**
* Extract shuffling from state and add to cache
*/
processState(state: CachedBeaconStateAllForks, shufflingEpoch: Epoch): EpochShuffling {
const decisionBlockHex = getShufflingDecisionBlock(state, shufflingEpoch);
let shuffling: EpochShuffling;
switch (shufflingEpoch) {
case state.epochCtx.nextShuffling.epoch:
shuffling = state.epochCtx.nextShuffling;
break;
case state.epochCtx.currentShuffling.epoch:
shuffling = state.epochCtx.currentShuffling;
break;
case state.epochCtx.previousShuffling.epoch:
shuffling = state.epochCtx.previousShuffling;
break;
default:
throw new Error(`Shuffling not found from state ${state.slot} for epoch ${shufflingEpoch}`);
}

let cacheItem = this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).get(decisionBlockHex);
if (cacheItem !== undefined) {
// update existing promise
if (isPromiseCacheItem(cacheItem)) {
// unblock consumers of this promise
cacheItem.resolveFn(shuffling);
// then update item type to shuffling
cacheItem = {
type: CacheItemType.shuffling,
shuffling,
};
this.add(shufflingEpoch, decisionBlockHex, cacheItem);
// we updated type to CacheItemType.shuffling so the above fields are not used anyway
this.metrics?.shufflingCache.processStateUpdatePromise.inc();
} else {
// ShufflingCacheItem, do nothing
this.metrics?.shufflingCache.processStateNoOp.inc();
}
} else {
// not found, new shuffling
this.add(shufflingEpoch, decisionBlockHex, {type: CacheItemType.shuffling, shuffling});
this.metrics?.shufflingCache.processStateInsertNew.inc();
}

return shuffling;
}

/**
* Insert a promise to make sure we don't regen state for the same shuffling.
* Bound by MAX_SHUFFLING_PROMISE to make sure our node does not blow up.
*/
insertPromise(shufflingEpoch: Epoch, decisionRootHex: RootHex): void {
const promiseCount = Array.from(this.itemsByDecisionRootByEpoch.values())
.map((innerMap) => Array.from(innerMap.values()))
.flat()
.filter((item) => isPromiseCacheItem(item)).length;
if (promiseCount >= MAX_PROMISES) {
throw new Error(
`Too many shuffling promises: ${promiseCount}, shufflingEpoch: ${shufflingEpoch}, decisionRootHex: ${decisionRootHex}`
);
}
let resolveFn: ((shuffling: EpochShuffling) => void) | null = null;
const promise = new Promise<EpochShuffling>((resolve) => {
resolveFn = resolve;
});
if (resolveFn === null) {
throw new Error("Promise Constructor was not executed immediately");
}

const cacheItem: PromiseCacheItem = {
type: CacheItemType.promise,
promise,
resolveFn,
};
this.add(shufflingEpoch, decisionRootHex, cacheItem);
this.metrics?.shufflingCache.insertPromiseCount.inc();
}

/**
* Most of the time, this should return a shuffling immediately.
* If there's a promise, it means we are computing the same shuffling, so we wait for the promise to resolve.
* Return null if we don't have a shuffling for this epoch and dependentRootHex.
*/
async get(shufflingEpoch: Epoch, decisionRootHex: RootHex): Promise<EpochShuffling | null> {
const cacheItem = this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).get(decisionRootHex);
if (cacheItem === undefined) {
return null;
}

if (isShufflingCacheItem(cacheItem)) {
return cacheItem.shuffling;
} else {
// promise
return cacheItem.promise;
}
}

private add(shufflingEpoch: Epoch, decisionBlock: RootHex, cacheItem: CacheItem): void {
this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).set(decisionBlock, cacheItem);
pruneSetToMax(this.itemsByDecisionRootByEpoch, this.maxEpochs);
}
}

function isShufflingCacheItem(item: CacheItem): item is ShufflingCacheItem {
return item.type === CacheItemType.shuffling;
}

function isPromiseCacheItem(item: CacheItem): item is PromiseCacheItem {
return item.type === CacheItemType.promise;
}
Loading

0 comments on commit e1fafb4

Please sign in to comment.