op-node/rollup/derive: Implement Holocene Batch Stage
sebastianst committed Oct 15, 2024
1 parent 6ae28f5 commit 9853bf0
Showing 6 changed files with 536 additions and 134 deletions.
174 changes: 122 additions & 52 deletions op-node/rollup/derive/batch_queue.go
@@ -27,7 +27,12 @@ import (
// It is internally responsible for making sure that batches with L1 inclusion blocks outside its
// working range are not considered or pruned.

type ChannelFlusher interface {
FlushChannel()
}

type NextBatchProvider interface {
ChannelFlusher
Origin() eth.L1BlockRef
NextBatch(ctx context.Context) (Batch, error)
}
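
The new ChannelFlusher method lets the batch stage tell the stage above it (the channel stage) to drop the remainder of the current channel, which Holocene's strict batch ordering calls for after an invalid batch. A minimal sketch of a provider satisfying the extended interface; the stub's internals are illustrative, not the real channel stage, and it assumes it compiles inside the derive package where Batch, eth.L1BlockRef, context, and io are in scope:

// stubProvider is a hypothetical NextBatchProvider for illustration only.
type stubProvider struct {
	origin  eth.L1BlockRef
	pending []Batch // batches decoded from the current channel
}

func (p *stubProvider) Origin() eth.L1BlockRef { return p.origin }

// FlushChannel discards whatever is left of the current channel.
func (p *stubProvider) FlushChannel() { p.pending = p.pending[:0] }

func (p *stubProvider) NextBatch(ctx context.Context) (Batch, error) {
	if len(p.pending) == 0 {
		return nil, io.EOF // no more batches buffered
	}
	b := p.pending[0]
	p.pending = p.pending[1:]
	return b, nil
}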
@@ -37,9 +42,10 @@ type SafeBlockFetcher interface {
PayloadByNumber(context.Context, uint64) (*eth.ExecutionPayloadEnvelope, error)
}

// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
// The baseBatchStage is a shared implementation of basic channel stage functionality. It is
// currently shared between the legacy BatchQueue, which buffers future batches, and the
// post-Holocene BatchStage, which requires strictly ordered batches.
type baseBatchStage struct {
log log.Logger
config *rollup.Config
prev NextBatchProvider
@@ -53,99 +59,152 @@ type BatchQueue struct {
// length of l1Blocks never exceeds SequencerWindowSize
l1Blocks []eth.L1BlockRef

// batches in order of when we've first seen them
batches []*BatchWithL1InclusionBlock

// nextSpan is cached SingularBatches derived from SpanBatch
nextSpan []*SingularBatch

l2 SafeBlockFetcher
}

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue {
return &BatchQueue{
func newBaseBatchStage(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) baseBatchStage {
return baseBatchStage{
log: log,
config: cfg,
prev: prev,
l2: l2,
}
}

func (bq *BatchQueue) Origin() eth.L1BlockRef {
return bq.prev.Origin()
func (bs *baseBatchStage) base() *baseBatchStage {
return bs
}

func (bs *baseBatchStage) Log() log.Logger {
if len(bs.l1Blocks) == 0 {
return bs.log.New("origin", bs.origin.ID())
} else {
return bs.log.New("origin", bs.origin.ID(), "epoch", bs.l1Blocks[0])
}
}

type SingularBatchProvider interface {
ResettableStage
NextBatch(context.Context, eth.L2BlockRef) (*SingularBatch, bool, error)
}

// BatchQueue contains a set of batches for every L1 block.
// L1 blocks are contiguous and this does not support reorgs.
type BatchQueue struct {
baseBatchStage

// batches in order of when we've first seen them
batches []*BatchWithL1InclusionBlock
}

var _ SingularBatchProvider = (*BatchQueue)(nil)

// NewBatchQueue creates a BatchQueue, which should be Reset(origin) before use.
func NewBatchQueue(log log.Logger, cfg *rollup.Config, prev NextBatchProvider, l2 SafeBlockFetcher) *BatchQueue {
return &BatchQueue{baseBatchStage: newBaseBatchStage(log, cfg, prev, l2)}
}

func (bs *baseBatchStage) Origin() eth.L1BlockRef {
return bs.prev.Origin()
}
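
Origin and the other baseBatchStage methods are promoted onto BatchQueue through Go struct embedding, so no forwarding methods are needed. A self-contained toy example of the same mechanism, with made-up names for illustration:

package main

import "fmt"

type base struct{ origin int }

// Origin is defined only on the embedded type.
func (b *base) Origin() int { return b.origin }

type queue struct{ base } // queue embeds base, promoting its methods

func main() {
	q := &queue{base{origin: 42}}
	fmt.Println(q.Origin()) // prints 42 via the promoted method
}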

// popNextBatch pops the next batch from the current queued up span-batch nextSpan.
// The queue must be non-empty, or the function will panic.
func (bq *BatchQueue) popNextBatch(parent eth.L2BlockRef) *SingularBatch {
if len(bq.nextSpan) == 0 {
func (bs *baseBatchStage) popNextBatch(parent eth.L2BlockRef) *SingularBatch {
if len(bs.nextSpan) == 0 {
panic("popping non-existent span-batch, invalid state")
}
nextBatch := bq.nextSpan[0]
bq.nextSpan = bq.nextSpan[1:]
nextBatch := bs.nextSpan[0]
bs.nextSpan = bs.nextSpan[1:]
// Must set ParentHash before return. We can use parent because the parentCheck is verified in CheckBatch().
nextBatch.ParentHash = parent.Hash
bq.log.Debug("pop next batch from the cached span batch")
bs.log.Debug("pop next batch from the cached span batch")
return nextBatch
}

// NextBatch returns the next valid batch upon the given safe head.
// It also returns a boolean indicating whether the returned batch is the last one of the span.
func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) {
if len(bq.nextSpan) > 0 {
func (bs *baseBatchStage) nextFromSpanBatch(parent eth.L2BlockRef) (*SingularBatch, bool) {
if len(bs.nextSpan) > 0 {
// There are cached singular batches derived from the span batch.
// Check if the next cached batch matches the given parent block.
if bq.nextSpan[0].Timestamp == parent.Time+bq.config.BlockTime {
if bs.nextSpan[0].Timestamp == parent.Time+bs.config.BlockTime {
// Pop first one and return.
nextBatch := bq.popNextBatch(parent)
nextBatch := bs.popNextBatch(parent)
// len(bs.nextSpan) == 0 means it's the last batch of the span.
return nextBatch, len(bq.nextSpan) == 0, nil
return nextBatch, len(bs.nextSpan) == 0
} else {
// The given parent block does not match the next batch. It means the previously returned batch is invalid.
// Drop cached batches and find another batch.
bq.log.Warn("parent block does not match the next batch. dropped cached batches", "parent", parent.ID(), "nextBatchTime", bq.nextSpan[0].GetTimestamp())
bq.nextSpan = bq.nextSpan[:0]
bs.log.Warn("parent block does not match the next batch. dropped cached batches", "parent", parent.ID(), "nextBatchTime", bs.nextSpan[0].GetTimestamp())
bs.nextSpan = bs.nextSpan[:0]
}
}
return nil, false
}

func (bs *baseBatchStage) updateOrigins(parent eth.L2BlockRef) {
// Note: We use the origin that we will advance to when determining whether it's behind. This is important
// because it's the future origin that gets saved into the l1Blocks array.
// We always update the origin of this stage if it is not the same, so that after the update code
// runs, this is consistent.
originBehind := bq.prev.Origin().Number < parent.L1Origin.Number
originBehind := bs.originBehind(parent)

// Advance origin if needed
// Note: The entire pipeline has the same origin
// We just don't accept batches prior to the L1 origin of the L2 safe head
if bq.origin != bq.prev.Origin() {
bq.origin = bq.prev.Origin()
if bs.origin != bs.prev.Origin() {
bs.origin = bs.prev.Origin()
if !originBehind {
bq.l1Blocks = append(bq.l1Blocks, bq.origin)
bs.l1Blocks = append(bs.l1Blocks, bs.origin)
} else {
// This is to handle the special case of startup. At startup we call Reset & include
// the L1 origin. That is the only time where immediately after `Reset` is called
// originBehind is false.
bq.l1Blocks = bq.l1Blocks[:0]
bs.l1Blocks = bs.l1Blocks[:0]
}
bq.log.Info("Advancing bq origin", "origin", bq.origin, "originBehind", originBehind)
bs.log.Info("Advancing bq origin", "origin", bs.origin, "originBehind", originBehind)
}

// If the epoch is advanced, update bq.l1Blocks
// Advancing epoch must be done after the pipeline successfully apply the entire span batch to the chain.
// Because the span batch can be reverted during processing the batch, then we must preserve existing l1Blocks
// to verify the epochs of the next candidate batch.
if len(bq.l1Blocks) > 0 && parent.L1Origin.Number > bq.l1Blocks[0].Number {
for i, l1Block := range bq.l1Blocks {
// Before Holocene, advancing the epoch must be done after the pipeline successfully applied the entire span batch to the chain.
// This is because the entire span batch can be reverted after finding an invalid batch.
// So we must preserve the existing l1Blocks to verify the epochs of the next candidate batch.
if len(bs.l1Blocks) > 0 && parent.L1Origin.Number > bs.l1Blocks[0].Number {
for i, l1Block := range bs.l1Blocks {
if parent.L1Origin.Number == l1Block.Number {
bq.l1Blocks = bq.l1Blocks[i:]
bq.log.Debug("Advancing internal L1 blocks", "next_epoch", bq.l1Blocks[0].ID(), "next_epoch_time", bq.l1Blocks[0].Time)
if bs.config.IsHolocene(bs.origin.Time) && i > 1 {
// TODO(12444): We'll see if this invariant really holds. It should if the
// origin only ever increases in single increments, which I currently assume is
// the case for Holocene. This being true is not a strict requirement from
// Holocene though. This check should be removed after successful testing.
panic("updateOrigins: unexpected origin jump")
}
bs.l1Blocks = bs.l1Blocks[i:]
bs.log.Debug("Advancing internal L1 blocks", "next_epoch", bs.l1Blocks[0].ID(), "next_epoch_time", bs.l1Blocks[0].Time)
break
}
}
// If we can't find the origin of the parent block, we have to advance bs.origin.
}
}
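
A simplified, runnable mimic of the window-advance loop above may make the behavior concrete; the numbers are hypothetical. Pre-Holocene, the parent's L1 origin can legitimately skip ahead several entries at once (an entire span batch is applied in one go), which is exactly the jump the TODO(12444) panic asserts no longer happens post-Holocene:

package main

import "fmt"

// advance mirrors the l1Blocks pruning in updateOrigins, reduced to
// bare block numbers: drop entries older than the parent's L1 origin.
func advance(l1Blocks []uint64, parentOrigin uint64) []uint64 {
	for i, n := range l1Blocks {
		if n == parentOrigin {
			return l1Blocks[i:] // keep the parent's epoch and everything after
		}
	}
	return l1Blocks // parent origin not found; leave the window untouched
}

func main() {
	fmt.Println(advance([]uint64{100, 101, 102, 103}, 102)) // [102 103]
}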

func (bs *baseBatchStage) originBehind(parent eth.L2BlockRef) bool {
return bs.prev.Origin().Number < parent.L1Origin.Number
}

func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*SingularBatch, bool, error) {
// Early return if there are singular batches from a span batch queued up
if batch, last := bq.nextFromSpanBatch(parent); batch != nil {
return batch, last, nil
}

bq.updateOrigins(parent)

originBehind := bq.originBehind(parent)
// Load more data into the batch queue
outOfData := false
if batch, err := bq.prev.NextBatch(ctx); err == io.EOF {
@@ -206,17 +265,21 @@ func (bq *BatchQueue) NextBatch(ctx context.Context, parent eth.L2BlockRef) (*Si
return nextBatch, len(bq.nextSpan) == 0, nil
}

func (bq *BatchQueue) Reset(ctx context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
func (bs *baseBatchStage) reset(base eth.L1BlockRef) {
// Copy over the Origin from the next stage
// It is set in the engine queue (two stages away) such that the L2 Safe Head origin is the progress
bq.origin = base
bq.batches = []*BatchWithL1InclusionBlock{}
bs.origin = base
// Include the new origin as an origin to build on
// Note: This is only for the initialization case. During normal resets we will later
// throw out this block.
bq.l1Blocks = bq.l1Blocks[:0]
bq.l1Blocks = append(bq.l1Blocks, base)
bq.nextSpan = bq.nextSpan[:0]
bs.l1Blocks = bs.l1Blocks[:0]
bs.l1Blocks = append(bs.l1Blocks, base)
bs.nextSpan = bs.nextSpan[:0]
}

func (bq *BatchQueue) Reset(_ context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
bq.baseBatchStage.reset(base)
bq.batches = bq.batches[:0]
return io.EOF
}
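
Reset returning io.EOF follows the derivation pipeline's reset convention: a stage signals with io.EOF that it has finished re-initializing against the new base origin. A hedged sketch of a trivial stage obeying the same contract, assuming the derive package's types are in scope; noopStage is hypothetical:

// noopStage is a minimal, hypothetical ResettableStage.
type noopStage struct{ origin eth.L1BlockRef }

func (s *noopStage) Reset(_ context.Context, base eth.L1BlockRef, _ eth.SystemConfig) error {
	s.origin = base
	return io.EOF // done resetting in a single step
}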

@@ -257,7 +320,6 @@ func (bq *BatchQueue) deriveNextBatch(ctx context.Context, outOfData bool, paren
// Find the first-seen batch that matches all validity conditions.
// We may not have sufficient information to proceed filtering, and then we stop.
// There may be none: in that case we force-create an empty batch
nextTimestamp := parent.Time + bq.config.BlockTime
var nextBatch *BatchWithL1InclusionBlock

// Go over all batches, in order of inclusion, and find the first batch we can accept.
@@ -296,33 +358,39 @@ batchLoop:
nextBatch.Batch.LogContext(bq.log).Info("Found next batch")
return nextBatch.Batch, nil
}
return bq.deriveNextEmptyBatch(ctx, outOfData, parent)
}

// deriveNextEmptyBatch may derive an empty batch if the sequencing window has expired
func (bs *baseBatchStage) deriveNextEmptyBatch(ctx context.Context, outOfData bool, parent eth.L2BlockRef) (*SingularBatch, error) {
epoch := bs.l1Blocks[0]
// If the current epoch is too old compared to the L1 block we are at,
// i.e. if the sequence window expired, we create empty batches for the current epoch
expiryEpoch := epoch.Number + bq.config.SeqWindowSize
forceEmptyBatches := (expiryEpoch == bq.origin.Number && outOfData) || expiryEpoch < bq.origin.Number
expiryEpoch := epoch.Number + bs.config.SeqWindowSize
forceEmptyBatches := (expiryEpoch == bs.origin.Number && outOfData) || expiryEpoch < bs.origin.Number
firstOfEpoch := epoch.Number == parent.L1Origin.Number+1
nextTimestamp := parent.Time + bs.config.BlockTime

bq.log.Trace("Potentially generating an empty batch",
bs.log.Trace("Potentially generating an empty batch",
"expiryEpoch", expiryEpoch, "forceEmptyBatches", forceEmptyBatches, "nextTimestamp", nextTimestamp,
"epoch_time", epoch.Time, "len_l1_blocks", len(bq.l1Blocks), "firstOfEpoch", firstOfEpoch)
"epoch_time", epoch.Time, "len_l1_blocks", len(bs.l1Blocks), "firstOfEpoch", firstOfEpoch)

if !forceEmptyBatches {
// sequence window did not expire yet, still room to receive batches for the current epoch,
// no need to force-create empty batch(es) towards the next epoch yet.
return nil, io.EOF
}
if len(bq.l1Blocks) < 2 {
if len(bs.l1Blocks) < 2 {
// need next L1 block to proceed towards
return nil, io.EOF
}

nextEpoch := bq.l1Blocks[1]
nextEpoch := bs.l1Blocks[1]
// Fill with empty L2 blocks of the same epoch until we meet the time of the next L1 origin,
// to preserve that L2 time >= L1 time. If this is the first block of the epoch, always generate a
// batch to ensure that we at least have one batch per epoch.
if nextTimestamp < nextEpoch.Time || firstOfEpoch {
bq.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp)
bs.log.Info("Generating next batch", "epoch", epoch, "timestamp", nextTimestamp)
return &SingularBatch{
ParentHash: parent.Hash,
EpochNum: rollup.Epoch(epoch.Number),
@@ -334,7 +402,9 @@ batchLoop:

// At this point we have auto generated every batch for the current epoch
// that we can, so we can advance to the next epoch.
bq.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, "next_epoch_time", nextEpoch.Time)
bq.l1Blocks = bq.l1Blocks[1:]
// TODO(12444): Instead of manually advancing the epoch here, it may be better to generate a
// batch for the next epoch, so that updateOrigins then properly advances the origin.
bs.log.Trace("Advancing internal L1 blocks", "next_timestamp", nextTimestamp, "next_epoch_time", nextEpoch.Time)
bs.l1Blocks = bs.l1Blocks[1:]
return nil, io.EOF
}
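
As a concrete check of the expiry arithmetic above (with hypothetical numbers): given SeqWindowSize = 3600 and an epoch starting at L1 block 1000, empty batches are forced once the stage's origin reaches block 4600 with no more data, or passes it:

package main

import "fmt"

func main() {
	const seqWindowSize = 3600
	epochNumber := uint64(1000)  // current epoch's L1 block number
	originNumber := uint64(4600) // the stage's current L1 origin
	outOfData := true            // the previous stage returned io.EOF

	expiryEpoch := epochNumber + seqWindowSize
	forceEmptyBatches := (expiryEpoch == originNumber && outOfData) || expiryEpoch < originNumber
	fmt.Println(forceEmptyBatches) // true: the sequencing window has expired
}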