diff --git a/core/block_validator.go b/core/block_validator.go index ad5221f43496..ad367f9185d0 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -178,6 +178,9 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error { // TODO: consider verifying that skipped messages overflow for index := queueIndex; index < txQueueIndex; index++ { if exists := it.Next(); !exists { + if err := it.Error(); err != nil { + log.Error("Unexpected DB error in ValidateL1Messages", "err", err, "queueIndex", queueIndex) + } // the message in this block is not available in our local db. // we'll reprocess this block at a later time. return consensus.ErrMissingL1MessageData @@ -192,6 +195,9 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error { queueIndex = txQueueIndex + 1 if exists := it.Next(); !exists { + if err := it.Error(); err != nil { + log.Error("Unexpected DB error in ValidateL1Messages", "err", err, "queueIndex", txQueueIndex) + } // the message in this block is not available in our local db. // we'll reprocess this block at a later time. return consensus.ErrMissingL1MessageData diff --git a/core/rawdb/accessors_l1_message.go b/core/rawdb/accessors_l1_message.go index 34d0f4acb4d2..4ae27c7b8ff0 100644 --- a/core/rawdb/accessors_l1_message.go +++ b/core/rawdb/accessors_l1_message.go @@ -202,6 +202,12 @@ func (it *L1MessageIterator) Release() { it.inner.Release() } +// Error returns any accumulated error. +// Exhausting all the key/value pairs is not considered to be an error. +func (it *L1MessageIterator) Error() error { + return it.inner.Error() +} + // ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`. func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx { msgs := make([]types.L1MessageTx, 0, maxCount) @@ -236,6 +242,10 @@ func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types. } } + if err := it.Error(); err != nil { + log.Crit("Failed to read L1 messages", "err", err) + } + return msgs } diff --git a/params/version.go b/params/version.go index 6d868731e0ae..e730602d1a54 100644 --- a/params/version.go +++ b/params/version.go @@ -24,7 +24,7 @@ import ( const ( VersionMajor = 5 // Major version component of the current release VersionMinor = 1 // Minor version component of the current release - VersionPatch = 24 // Patch version component of the current release + VersionPatch = 25 // Patch version component of the current release VersionMeta = "mainnet" // Version metadata to append to the version string ) diff --git a/rollup/sync_service/bridge_client.go b/rollup/sync_service/bridge_client.go index ea46fdcb8b4a..51ae3b02ce3e 100644 --- a/rollup/sync_service/bridge_client.go +++ b/rollup/sync_service/bridge_client.go @@ -86,6 +86,10 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64 }) } + if err := it.Error(); err != nil { + return nil, err + } + return msgs, nil } diff --git a/rollup/sync_service/sync_service.go b/rollup/sync_service/sync_service.go index 2720aec76b1b..e8efd9a54e53 100644 --- a/rollup/sync_service/sync_service.go +++ b/rollup/sync_service/sync_service.go @@ -149,6 +149,9 @@ func (s *SyncService) fetchMessages() { log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock, "latestConfirmed", latestConfirmed) + // keep track of next queue index we're expecting to see + queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db) + batchWriter := s.db.NewBatch() numBlocksPendingDbWrite := uint64(0) numMessagesPendingDbWrite := 0 @@ -216,7 +219,16 @@ func (s *SyncService) fetchMessages() { numMsgsCollected += len(msgs) } - numBlocksPendingDbWrite += to - from + for _, msg := range msgs { + queueIndex++ + // check if received queue index matches expected queue index + if msg.QueueIndex != queueIndex { + log.Error("Unexpected queue index in SyncService", "expected", queueIndex, "got", msg.QueueIndex, "msg", msg) + return // do not flush inconsistent data to disk + } + } + + numBlocksPendingDbWrite += to - from + 1 numMessagesPendingDbWrite += len(msgs) // flush new messages to database periodically