Skip to content

Commit

Permalink
fix: sync l1msg fix from develop branch (#826)
Browse files Browse the repository at this point in the history
* fix: ensure L1 messages are stored in db consistently (#679)

* fix: ensure L1 messages are stored in db consistently

* check db iterator errors

* fix(sync-service): only add queue index when message index is not zero (#682)

* fix(sync-service): increase queue index only when L1 message index is not zero

---------

Co-authored-by: Péter Garamvölgyi <peter@scroll.io>

fix

---------

Co-authored-by: Péter Garamvölgyi <peter@scroll.io>
Co-authored-by: colin <102356659+colinlyguo@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 13, 2024
1 parent 1ca3b74 commit bed81b0
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 1 deletion.
6 changes: 6 additions & 0 deletions core/block_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error {
// TODO: consider verifying that skipped messages overflow
for index := queueIndex; index < txQueueIndex; index++ {
if exists := it.Next(); !exists {
if err := it.Error(); err != nil {
log.Error("Unexpected DB error in ValidateL1Messages", "err", err, "queueIndex", queueIndex)
}
// the message in this block is not available in our local db.
// we'll reprocess this block at a later time.
return consensus.ErrMissingL1MessageData
Expand All @@ -240,6 +243,9 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error {
queueIndex = txQueueIndex + 1

if exists := it.Next(); !exists {
if err := it.Error(); err != nil {
log.Error("Unexpected DB error in ValidateL1Messages", "err", err, "queueIndex", txQueueIndex)
}
// the message in this block is not available in our local db.
// we'll reprocess this block at a later time.
return consensus.ErrMissingL1MessageData
Expand Down
10 changes: 10 additions & 0 deletions core/rawdb/accessors_l1_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ func (it *L1MessageIterator) Release() {
it.inner.Release()
}

// Error returns any accumulated error.
// Exhausting all the key/value pairs is not considered to be an error.
func (it *L1MessageIterator) Error() error {
return it.inner.Error()
}

// ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`.
func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx {
msgs := make([]types.L1MessageTx, 0, maxCount)
Expand Down Expand Up @@ -236,6 +242,10 @@ func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.
}
}

if err := it.Error(); err != nil {
log.Crit("Failed to read L1 messages", "err", err)
}

return msgs
}

Expand Down
4 changes: 4 additions & 0 deletions rollup/sync_service/bridge_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (c *BridgeClient) fetchMessagesInRange(ctx context.Context, from, to uint64
})
}

if err := it.Error(); err != nil {
return nil, err
}

return msgs, nil
}

Expand Down
16 changes: 15 additions & 1 deletion rollup/sync_service/sync_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func (s *SyncService) fetchMessages() {

log.Trace("Sync service fetchMessages", "latestProcessedBlock", s.latestProcessedBlock, "latestConfirmed", latestConfirmed)

// keep track of next queue index we're expecting to see
queueIndex := rawdb.ReadHighestSyncedQueueIndex(s.db)

batchWriter := s.db.NewBatch()
numBlocksPendingDbWrite := uint64(0)
numMessagesPendingDbWrite := 0
Expand Down Expand Up @@ -216,7 +219,18 @@ func (s *SyncService) fetchMessages() {
numMsgsCollected += len(msgs)
}

numBlocksPendingDbWrite += to - from
for _, msg := range msgs {
if msg.QueueIndex > 0 {
queueIndex++
}
// check if received queue index matches expected queue index
if msg.QueueIndex != queueIndex {
log.Error("Unexpected queue index in SyncService", "expected", queueIndex, "got", msg.QueueIndex, "msg", msg)
return // do not flush inconsistent data to disk
}
}

numBlocksPendingDbWrite += to - from + 1
numMessagesPendingDbWrite += len(msgs)

// flush new messages to database periodically
Expand Down

0 comments on commit bed81b0

Please sign in to comment.