From 0b63a9cc1a7eaf46d5c5249ee7483d4f76ec3c62 Mon Sep 17 00:00:00 2001 From: agnusmor Date: Wed, 14 Feb 2024 10:13:11 +0100 Subject: [PATCH] fix as workaround to close batch on tx oog --- sequencer/finalizer.go | 48 ++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index e11e912243..0679e56543 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -283,6 +283,7 @@ func (f *finalizer) finalizeBatches(ctx context.Context) { continue } + closeWIPBatch := false metrics.WorkerProcessingTime(time.Since(start)) if tx != nil { showNotFoundTxLog = true @@ -290,7 +291,8 @@ func (f *finalizer) finalizeBatches(ctx context.Context) { firstTxProcess := true for { - _, err := f.processTransaction(ctx, tx, firstTxProcess) + var err error + _, err, closeWIPBatch = f.processTransaction(ctx, tx, firstTxProcess) if err != nil { if err == ErrEffectiveGasPriceReprocess { firstTxProcess = false @@ -325,7 +327,11 @@ func (f *finalizer) finalizeBatches(ctx context.Context) { } // Check if we must finalize the batch due to a closing reason (resources exhausted, max txs, timestamp resolution, forced batches deadline) - if finalize, closeReason := f.checkIfFinalizeBatch(); finalize { + finalize, closeReason := f.checkIfFinalizeBatch() + if closeWIPBatch || finalize { + if closeWIPBatch { + closeReason = "tx OOG" + } f.finalizeWIPBatch(ctx, closeReason) } @@ -337,7 +343,7 @@ func (f *finalizer) finalizeBatches(ctx context.Context) { } // processTransaction processes a single transaction. -func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, firstTxProcess bool) (errWg *sync.WaitGroup, err error) { +func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, firstTxProcess bool) (errWg *sync.WaitGroup, err error, closeWIPBatch bool) { start := time.Now() defer func() { metrics.ProcessingTime(time.Since(start)) @@ -380,7 +386,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first egp, err := f.effectiveGasPrice.CalculateEffectiveGasPrice(tx.RawTx, txGasPrice, tx.BatchResources.ZKCounters.GasUsed, tx.L1GasPrice, txL2GasPrice) if err != nil { if f.effectiveGasPrice.IsEnabled() { - return nil, err + return nil, err, false } else { log.Warnf("effectiveGasPrice is disabled, but failed to calculate effectiveGasPrice for tx %s, error: %v", tx.HashStr, err) tx.EGPLog.Error = fmt.Sprintf("CalculateEffectiveGasPrice#1: %s", err) @@ -408,7 +414,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first egpPercentage, err := f.effectiveGasPrice.CalculateEffectiveGasPricePercentage(txGasPrice, tx.EffectiveGasPrice) if err != nil { if f.effectiveGasPrice.IsEnabled() { - return nil, err + return nil, err, false } else { log.Warnf("effectiveGasPrice is disabled, but failed to to calculate efftive gas price percentage (#1), error: %v", err) tx.EGPLog.Error = fmt.Sprintf("%s; CalculateEffectiveGasPricePercentage#1: %s", tx.EGPLog.Error, err) @@ -428,7 +434,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first effectivePercentageAsDecodedHex, err := hex.DecodeHex(fmt.Sprintf("%x", tx.EGPPercentage)) if err != nil { - return nil, err + return nil, err, false } batchRequest.Transactions = append(batchRequest.Transactions, effectivePercentageAsDecodedHex...) @@ -437,7 +443,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first if err != nil && (errors.Is(err, runtime.ErrExecutorDBError) || errors.Is(err, runtime.ErrInvalidTxChangeL2BlockMinTimestamp)) { log.Errorf("failed to process tx %s, error: %v", tx.HashStr, err) - return nil, err + return nil, err, false } else if err == nil && !batchResponse.IsRomLevelError && len(batchResponse.BlockResponses) == 0 { err = fmt.Errorf("executor returned no errors and no responses for tx %s", tx.HashStr) f.Halt(ctx, err, false) @@ -454,14 +460,15 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first } else { metrics.TxProcessed(metrics.TxProcessedLabelInvalid, 1) } - return nil, err + return nil, err, false } + closeBatch := false oldStateRoot := f.wipBatch.imStateRoot if len(batchResponse.BlockResponses) > 0 { - errWg, err = f.handleProcessTransactionResponse(ctx, tx, batchResponse, oldStateRoot) + errWg, err, closeBatch = f.handleProcessTransactionResponse(ctx, tx, batchResponse, oldStateRoot) if err != nil { - return errWg, err + return errWg, err, false } } @@ -471,17 +478,17 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first log.Infof("processed tx %s, batchNumber: %d, l2Block: [%d], newStateRoot: %s, oldStateRoot: %s, used counters: %s", tx.HashStr, batchRequest.BatchNumber, f.wipL2Block.trackingNum, batchResponse.NewStateRoot.String(), batchRequest.OldStateRoot.String(), f.logZKCounters(batchResponse.UsedZkCounters)) - return nil, nil + return nil, nil, closeBatch } // handleProcessTransactionResponse handles the response of transaction processing. -func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) (errWg *sync.WaitGroup, err error) { +func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) (errWg *sync.WaitGroup, err error, closeWIPBatch bool) { // Handle Transaction Error errorCode := executor.RomErrorCode(result.BlockResponses[0].TransactionResponses[0].RomError) if !state.IsStateRootChanged(errorCode) { // If intrinsic error or OOC error, we skip adding the transaction to the batch errWg = f.handleProcessTransactionError(ctx, result, tx) - return errWg, result.BlockResponses[0].TransactionResponses[0].RomError + return errWg, result.BlockResponses[0].TransactionResponses[0].RomError, false } egpEnabled := f.effectiveGasPrice.IsEnabled() @@ -496,7 +503,7 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx if err != nil { if egpEnabled { log.Errorf("failed to calculate effective gas price with new gasUsed for tx %s, error: %v", tx.HashStr, err.Error()) - return nil, err + return nil, err, false } else { log.Warnf("effectiveGasPrice is disabled, but failed to calculate effective gas price with new gasUsed for tx %s, error: %v", tx.HashStr, err.Error()) tx.EGPLog.Error = fmt.Sprintf("%s; CalculateEffectiveGasPrice#2: %s", tx.EGPLog.Error, err) @@ -521,7 +528,7 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx } if errCompare != nil && egpEnabled { - return nil, errCompare + return nil, errCompare, false } } } @@ -557,12 +564,12 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx log.Errorf("failed to update status to invalid in the pool for tx %s, error: %v", tx.Hash.String(), err) } - return nil, ErrBatchResourceUnderFlow + return nil, ErrBatchResourceUnderFlow, false } else { start := time.Now() f.workerIntf.UpdateTxZKCounters(result.BlockResponses[0].TransactionResponses[0].TxHash, tx.From, result.UsedZkCounters) metrics.WorkerProcessingTime(time.Since(start)) - return nil, ErrBatchResourceUnderFlow + return nil, ErrBatchResourceUnderFlow, false } } @@ -583,7 +590,12 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx f.updateWorkerAfterSuccessfulProcessing(ctx, tx.Hash, tx.From, false, result) - return nil, nil + if errors.Is(result.BlockResponses[0].TransactionResponses[0].RomError, runtime.ErrOutOfGas) { + log.Infof("tx %s is OOG", tx.HashStr) + return nil, nil, true + } + + return nil, nil, false } // compareTxEffectiveGasPrice compares newEffectiveGasPrice with tx.EffectiveGasPrice.