Skip to content

Commit

Permalink
fix obtaining deposits after connection loss (#3943)
Browse files Browse the repository at this point in the history
* fix obtaining deposits after connection loss

When an error occurs during Eth1 deposits import, the already imported
blocks are kept while the connection to the EL is re-established.
However, the corresponding merkleizer is not persisted, leading to any
future deposits no longer being properly imported. This is quite common
when syncing a fresh Nimbus instance against an already-synced Geth EL.
Fixed by persisting the head merkleizer together with the blocks.
  • Loading branch information
etan-status authored Aug 9, 2022
1 parent 06a5c67 commit 4ef621f
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 20 deletions.
3 changes: 1 addition & 2 deletions beacon_chain/beacon_chain_db_light_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import
eth/db/kvstore_sqlite3,
# Beacon chain internals
spec/datatypes/altair,
spec/[eth2_ssz_serialization, helpers],
./filepath
spec/[eth2_ssz_serialization, helpers]

logScope: topics = "lcdata"

Expand Down
40 changes: 22 additions & 18 deletions beacon_chain/eth1/eth1_monitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ type

blocksByHash: Table[BlockHash, Eth1Block]

headMerkleizer: DepositsMerkleizer
## Merkleizer state after applying all `blocks`

hasConsensusViolation: bool
## The local chain contradicts the observed consensus on the network

Expand Down Expand Up @@ -257,6 +260,9 @@ template depositChainBlocks*(m: Eth1Monitor): Deque[Eth1Block] =
template finalizedDepositsMerkleizer(m: Eth1Monitor): auto =
m.depositsChain.finalizedDepositsMerkleizer

template headMerkleizer(m: Eth1Monitor): auto =
m.depositsChain.headMerkleizer

proc fixupWeb3Urls*(web3Url: var string) =
var normalizedUrl = toLowerAscii(web3Url)
if not (normalizedUrl.startsWith("https://") or
Expand Down Expand Up @@ -645,7 +651,7 @@ when hasDepositRootChecks:
const
contractCallTimeout = 60.seconds

func fetchDepositContractData(p: Web3DataProviderRef, blk: Eth1Block):
proc fetchDepositContractData(p: Web3DataProviderRef, blk: Eth1Block):
Future[DepositContractDataStatus] {.async.} =
let
depositRoot = p.ns.get_deposit_root.call(blockNumber = blk.number)
Expand All @@ -668,8 +674,8 @@ when hasDepositRootChecks:
result = DepositRootUnavailable

try:
let fetchedCount = bytes_to_uint64(array[8, byte](
awaitOrRaiseOnTimeout(rawCount, contractCallTimeout)))
let fetchedCount = bytes_to_uint64(
awaitOrRaiseOnTimeout(rawCount, contractCallTimeout).toArray)
if blk.voteData.deposit_count == 0:
blk.voteData.deposit_count = fetchedCount
elif blk.voteData.deposit_count != fetchedCount:
Expand Down Expand Up @@ -1031,6 +1037,7 @@ proc safeCancel(fut: var Future[void]) =
func clear(chain: var Eth1Chain) =
chain.blocks.clear()
chain.blocksByHash.clear()
chain.headMerkleizer.reset()
chain.hasConsensusViolation = false

proc detectPrimaryProviderComingOnline(m: Eth1Monitor) {.async.} =
Expand Down Expand Up @@ -1114,10 +1121,9 @@ func earliestBlockOfInterest(m: Eth1Monitor): Eth1BlockNumber =
m.latestEth1BlockNumber - (2 * m.cfg.ETH1_FOLLOW_DISTANCE) - votedBlocksSafetyMargin

proc syncBlockRange(m: Eth1Monitor,
merkleizer: ref DepositsMerkleizer,
fromBlock, toBlock,
fullSyncFromBlock: Eth1BlockNumber) {.gcsafe, async.} =
doAssert m.depositsChain.blocks.len > 0 and m.dataProvider != nil
doAssert m.depositsChain.blocks.len > 0

var currentBlock = fromBlock
while currentBlock <= toBlock:
Expand Down Expand Up @@ -1169,12 +1175,6 @@ proc syncBlockRange(m: Eth1Monitor,
for i in 0 ..< blocksWithDeposits.len:
let blk = blocksWithDeposits[i]

for deposit in blk.deposits:
merkleizer[].addChunk hash_tree_root(deposit).data

blk.voteData.deposit_count = merkleizer[].getChunkCount
blk.voteData.deposit_root = merkleizer[].getDepositsRoot

if blk.number > fullSyncFromBlock:
let lastBlock = m.depositsChain.blocks.peekLast
for n in max(lastBlock.number + 1, fullSyncFromBlock) ..< blk.number:
Expand All @@ -1186,6 +1186,11 @@ proc syncBlockRange(m: Eth1Monitor,
lastBlock.makeSuccessorWithoutDeposits(blockWithoutDeposits))
eth1_synced_head.set blockWithoutDeposits.number.toGaugeValue

for deposit in blk.deposits:
m.headMerkleizer.addChunk hash_tree_root(deposit).data
blk.voteData.deposit_count = m.headMerkleizer.getChunkCount
blk.voteData.deposit_root = m.headMerkleizer.getDepositsRoot

m.depositsChain.addBlock blk
eth1_synced_head.set blk.number.toGaugeValue

Expand Down Expand Up @@ -1224,7 +1229,7 @@ proc syncBlockRange(m: Eth1Monitor,

let depositContractState = DepositContractSnapshot(
eth1Block: blocksWithDeposits[^1].voteData.block_hash,
depositContractState: merkleizer[].toDepositContractState)
depositContractState: m.headMerkleizer.toDepositContractState)

m.depositsChain.db.putEth2FinalizedTo depositContractState

Expand Down Expand Up @@ -1388,11 +1393,11 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
subscriptionErrorHandler)

let shouldProcessDeposits = not m.depositContractAddress.isZeroMemory
var scratchMerkleizer: ref DepositsMerkleizer
var eth1SyncedTo: Eth1BlockNumber
if shouldProcessDeposits and m.depositsChain.blocks.len == 0:
let startBlock = awaitWithRetries(
m.dataProvider.getBlockByHash(m.depositsChain.finalizedBlockHash.asBlockHash))
m.dataProvider.getBlockByHash(
m.depositsChain.finalizedBlockHash.asBlockHash))

m.depositsChain.addBlock Eth1Block(
number: Eth1BlockNumber startBlock.number,
Expand All @@ -1408,7 +1413,7 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
eth1_finalized_deposits.set(
m.depositsChain.finalizedDepositsMerkleizer.getChunkCount.toGaugeValue)

scratchMerkleizer = newClone(copy m.finalizedDepositsMerkleizer)
m.depositsChain.headMerkleizer = copy m.finalizedDepositsMerkleizer

debug "Starting Eth1 syncing", `from` = shortLog(m.depositsChain.blocks[0])

Expand Down Expand Up @@ -1476,7 +1481,7 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
m.terminalBlockHash = some terminalBlockCandidate.hash
m.terminalBlockNumber = some terminalBlockCandidate.number

if shouldProcessDeposits and scratchMerkleizer != nil:
if shouldProcessDeposits:
if m.latestEth1BlockNumber <= m.cfg.ETH1_FOLLOW_DISTANCE:
continue

Expand All @@ -1485,8 +1490,7 @@ proc startEth1Syncing(m: Eth1Monitor, delayBeforeStart: Duration) {.async.} =
continue

let earliestBlockOfInterest = m.earliestBlockOfInterest()
await m.syncBlockRange(scratchMerkleizer,
eth1SyncedTo + 1,
await m.syncBlockRange(eth1SyncedTo + 1,
targetBlock,
earliestBlockOfInterest)
eth1SyncedTo = targetBlock
Expand Down

0 comments on commit 4ef621f

Please sign in to comment.