Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync recent miner/ccc changes #1062

Merged
merged 5 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 33 additions & 18 deletions miner/scroll_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type work struct {
cccLogger *ccc.Logger
vmConfig vm.Config

reorging bool
reorgReason error

// accumulated state
Expand Down Expand Up @@ -287,7 +288,7 @@ func (w *worker) mainLoop() {
var retryableCommitError *retryableCommitError
if errors.As(err, &retryableCommitError) {
log.Warn("failed to commit to a block, retrying", "err", err)
if _, err = w.tryCommitNewWork(time.Now(), w.current.header.ParentHash, w.current.reorgReason); err != nil {
if _, err = w.tryCommitNewWork(time.Now(), w.current.header.ParentHash, w.current.reorging, w.current.reorgReason); err != nil {
continue
}
} else if err != nil {
Expand All @@ -305,21 +306,20 @@ func (w *worker) mainLoop() {
return
}
}

_, err = w.tryCommitNewWork(time.Now(), w.chain.CurrentHeader().Hash(), nil)
_, err = w.tryCommitNewWork(time.Now(), w.chain.CurrentHeader().Hash(), false, nil)
case trigger := <-w.reorgCh:
idleTimer.UpdateSince(idleStart)
err = w.handleReorg(&trigger)
case chainHead := <-w.chainHeadCh:
idleTimer.UpdateSince(idleStart)
if w.isCanonical(chainHead.Block.Header()) {
_, err = w.tryCommitNewWork(time.Now(), chainHead.Block.Hash(), nil)
_, err = w.tryCommitNewWork(time.Now(), chainHead.Block.Hash(), false, nil)
}
case <-w.current.deadlineCh():
idleTimer.UpdateSince(idleStart)
w.current.deadlineReached = true
if len(w.current.txs) > 0 {
_, err = w.commit(false)
_, err = w.commit()
}
case ev := <-w.txsCh:
idleTimer.UpdateSince(idleStart)
Expand All @@ -330,8 +330,8 @@ func (w *worker) mainLoop() {
// be automatically eliminated.
if w.current != nil {
shouldCommit, _ := w.processTxnSlice(ev.Txs)
if shouldCommit || w.current.deadlineReached {
_, err = w.commit(false)
if shouldCommit || (w.current.deadlineReached && len(w.current.txs) > 0) {
_, err = w.commit()
}
}
w.newTxs.Add(int32(len(ev.Txs)))
Expand Down Expand Up @@ -370,7 +370,7 @@ func (w *worker) collectPendingL1Messages(startIndex uint64) []types.L1MessageTx
}

// newWork
func (w *worker) newWork(now time.Time, parentHash common.Hash, reorgReason error) error {
func (w *worker) newWork(now time.Time, parentHash common.Hash, reorging bool, reorgReason error) error {
parent := w.chain.GetBlockByHash(parentHash)
header := &types.Header{
ParentHash: parent.Hash(),
Expand All @@ -380,6 +380,13 @@ func (w *worker) newWork(now time.Time, parentHash common.Hash, reorgReason erro
Time: uint64(now.Unix()),
}

if reorgReason != nil {
// if we are replacing a failing block, reuse the timestamp to make sure
// the information we get from AsyncChecker is reliable. Changing timestamp
// might alter execution flow of reorged transactions.
header.Time = w.chain.GetHeaderByNumber(header.Number.Uint64()).Time
}

parentState, err := w.chain.StateAt(parent.Root())
if err != nil {
return fmt.Errorf("failed to fetch parent state: %w", err)
Expand Down Expand Up @@ -431,14 +438,19 @@ func (w *worker) newWork(now time.Time, parentHash common.Hash, reorgReason erro
coalescedLogs: []*types.Log{},
gasPool: new(core.GasPool).AddGas(header.GasLimit),
nextL1MsgIndex: nextL1MsgIndex,
reorging: reorging,
reorgReason: reorgReason,
}

// initiliaze pending block with an empty block to make sure we always have
// a pending block to serve RPC requests
w.updateSnapshot()
return nil
}

// tryCommitNewWork
func (w *worker) tryCommitNewWork(now time.Time, parent common.Hash, reorgReason error) (common.Hash, error) {
err := w.newWork(now, parent, reorgReason)
func (w *worker) tryCommitNewWork(now time.Time, parent common.Hash, reorging bool, reorgReason error) (common.Hash, error) {
err := w.newWork(now, parent, reorging, reorgReason)
if err != nil {
return common.Hash{}, fmt.Errorf("failed creating new work: %w", err)
}
Expand All @@ -449,8 +461,7 @@ func (w *worker) tryCommitNewWork(now time.Time, parent common.Hash, reorgReason
}

// check if we are reorging
reorging := w.chain.GetBlockByNumber(w.current.header.Number.Uint64()) != nil
if !shouldCommit && reorging {
if !shouldCommit && w.current.reorging {
shouldCommit, err = w.processReorgedTxns(w.current.reorgReason)
}
if err != nil {
Expand All @@ -468,7 +479,7 @@ func (w *worker) tryCommitNewWork(now time.Time, parent common.Hash, reorgReason
// if reorging, force committing even if we are not "running"
// this can happen when sequencer is instructed to shutdown while handling a reorg
// we should make sure reorg is not interrupted
if blockHash, err := w.commit(reorging); err != nil {
if blockHash, err := w.commit(); err != nil {
return common.Hash{}, fmt.Errorf("failed committing new work: %w", err)
} else {
return blockHash, nil
Expand Down Expand Up @@ -595,6 +606,10 @@ func (w *worker) processTxnSlice(txns types.Transactions) (bool, error) {
// processReorgedTxns
func (w *worker) processReorgedTxns(reason error) (bool, error) {
reorgedBlock := w.chain.GetBlockByNumber(w.current.header.Number.Uint64())
if reorgedBlock == nil {
return false, nil
}

commitGasCounter.Dec(int64(reorgedBlock.GasUsed()))
reorgedTxns := reorgedBlock.Transactions()
var errorWithTxnIdx *ccc.ErrorWithTxnIdx
Expand Down Expand Up @@ -729,14 +744,14 @@ func (e retryableCommitError) Unwrap() error {

// commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running.
func (w *worker) commit(reorging bool) (common.Hash, error) {
func (w *worker) commit() (common.Hash, error) {
sealDelay := time.Duration(0)
defer func(t0 time.Time) {
l2CommitTimer.Update(time.Since(t0) - sealDelay)
}(time.Now())

w.updateSnapshot()
if !w.isRunning() && !reorging {
if !w.isRunning() && !w.current.reorging {
return common.Hash{}, nil
}

Expand Down Expand Up @@ -813,7 +828,7 @@ func (w *worker) commit(reorging bool) (common.Hash, error) {

currentHeight := w.current.header.Number.Uint64()
maxReorgDepth := uint64(w.config.CCCMaxWorkers + 1)
if !reorging && currentHeight > maxReorgDepth {
if !w.current.reorging && currentHeight > maxReorgDepth {
ancestorHeight := currentHeight - maxReorgDepth
ancestorHash := w.chain.GetHeaderByNumber(ancestorHeight).Hash()
if rawdb.ReadBlockRowConsumption(w.chain.Database(), ancestorHash) == nil {
Expand Down Expand Up @@ -980,7 +995,7 @@ func (w *worker) handleReorg(trigger *reorgTrigger) error {
return nil
}

newBlockHash, err := w.tryCommitNewWork(time.Now(), parentHash, reorgReason)
newBlockHash, err := w.tryCommitNewWork(time.Now(), parentHash, true, reorgReason)
if err != nil {
return err
}
Expand All @@ -989,7 +1004,7 @@ func (w *worker) handleReorg(trigger *reorgTrigger) error {
if newBlockHash == (common.Hash{}) {
// force committing the new canonical head to trigger a reorg in blockchain
// otherwise we might ignore CCC errors from the new side chain since it is not canonical yet
newBlockHash, err = w.commit(true)
newBlockHash, err = w.commit()
if err != nil {
return err
}
Expand Down
35 changes: 4 additions & 31 deletions rollup/ccc/async_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,6 @@ func (c *AsyncChecker) checkerTask(block *types.Block, ccc *Checker, forkCtx con
}

header := block.Header()
header.GasUsed = 0
gasPool := new(core.GasPool).AddGas(header.GasLimit)
ccc.Reset()

accRc := new(types.RowConsumption)
Expand All @@ -184,7 +182,7 @@ func (c *AsyncChecker) checkerTask(block *types.Block, ccc *Checker, forkCtx con
}

var curRc *types.RowConsumption
curRc, err = c.checkTxAndApply(parent, header, statedb, gasPool, tx, ccc)
curRc, err = c.checkTx(parent, header, statedb, tx, ccc)
if err != nil {
err = &ErrorWithTxnIdx{
TxIdx: uint(txIdx),
Expand All @@ -208,39 +206,14 @@ func (c *AsyncChecker) checkerTask(block *types.Block, ccc *Checker, forkCtx con
}
}

func (c *AsyncChecker) checkTxAndApply(parent *types.Block, header *types.Header, state *state.StateDB, gasPool *core.GasPool, tx *types.Transaction, ccc *Checker) (*types.RowConsumption, error) {
// don't commit the state during tracing for circuit capacity checker, otherwise we cannot revert.
// and even if we don't commit the state, the `refund` value will still be correct, as explained in `CommitTransaction`
commitStateAfterApply := false
snap := state.Snapshot()

// 1. we have to check circuit capacity before `core.ApplyTransaction`,
// because if the tx can be successfully executed but circuit capacity overflows, it will be inconvenient to revert.
// 2. even if we don't commit to the state during the tracing (which means `clearJournalAndRefund` is not called during the tracing),
// the `refund` value will still be correct, because:
// 2.1 when starting handling the first tx, `state.refund` is 0 by default,
// 2.2 after tracing, the state is either committed in `core.ApplyTransaction`, or reverted, so the `state.refund` can be cleared,
// 2.3 when starting handling the following txs, `state.refund` comes as 0
func (c *AsyncChecker) checkTx(parent *types.Block, header *types.Header, state *state.StateDB, tx *types.Transaction, ccc *Checker) (*types.RowConsumption, error) {
trace, err := tracing.NewTracerWrapper().CreateTraceEnvAndGetBlockTrace(c.bc.Config(), c.bc, c.bc.Engine(), c.bc.Database(),
state, parent.Header(), types.NewBlockWithHeader(header).WithBody([]*types.Transaction{tx}, nil), commitStateAfterApply)
// `w.current.traceEnv.State` & `w.current.state` share a same pointer to the state, so only need to revert `w.current.state`
// revert to snapshot for calling `core.ApplyMessage` again, (both `traceEnv.GetBlockTrace` & `core.ApplyTransaction` will call `core.ApplyMessage`)
state.RevertToSnapshot(snap)
state, parent.Header(), types.NewBlockWithHeader(header).WithBody([]*types.Transaction{tx}, nil), true)
if err != nil {
return nil, err
}

rc, err := ccc.ApplyTransaction(trace)
if err != nil {
return rc, err
}

_, err = core.ApplyTransaction(c.bc.Config(), c.bc, nil /* coinbase will default to chainConfig.Scroll.FeeVaultAddress */, gasPool,
state, header, tx, &header.GasUsed, *c.bc.GetVMConfig())
if err != nil {
return nil, err
}
return rc, nil
return ccc.ApplyTransaction(trace)
}

// ScheduleError forces a block to error on a given transaction index
Expand Down
Loading