Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close batch on OOG #3271

Merged
merged 1 commit into from
Feb 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,16 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
continue
}

closeWIPBatch := false
metrics.WorkerProcessingTime(time.Since(start))
if tx != nil {
showNotFoundTxLog = true

firstTxProcess := true

for {
_, err := f.processTransaction(ctx, tx, firstTxProcess)
var err error
_, err, closeWIPBatch = f.processTransaction(ctx, tx, firstTxProcess)
if err != nil {
if err == ErrEffectiveGasPriceReprocess {
firstTxProcess = false
Expand Down Expand Up @@ -325,7 +327,11 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
}

// Check if we must finalize the batch due to a closing reason (resources exhausted, max txs, timestamp resolution, forced batches deadline)
if finalize, closeReason := f.checkIfFinalizeBatch(); finalize {
finalize, closeReason := f.checkIfFinalizeBatch()
if closeWIPBatch || finalize {
if closeWIPBatch {
closeReason = "tx OOG"
}
f.finalizeWIPBatch(ctx, closeReason)
}

Expand All @@ -337,7 +343,7 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
}

// processTransaction processes a single transaction.
func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, firstTxProcess bool) (errWg *sync.WaitGroup, err error) {
func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, firstTxProcess bool) (errWg *sync.WaitGroup, err error, closeWIPBatch bool) {
start := time.Now()
defer func() {
metrics.ProcessingTime(time.Since(start))
Expand Down Expand Up @@ -380,7 +386,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
egp, err := f.effectiveGasPrice.CalculateEffectiveGasPrice(tx.RawTx, txGasPrice, tx.BatchResources.ZKCounters.GasUsed, tx.L1GasPrice, txL2GasPrice)
if err != nil {
if f.effectiveGasPrice.IsEnabled() {
return nil, err
return nil, err, false
} else {
log.Warnf("effectiveGasPrice is disabled, but failed to calculate effectiveGasPrice for tx %s, error: %v", tx.HashStr, err)
tx.EGPLog.Error = fmt.Sprintf("CalculateEffectiveGasPrice#1: %s", err)
Expand Down Expand Up @@ -408,7 +414,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
egpPercentage, err := f.effectiveGasPrice.CalculateEffectiveGasPricePercentage(txGasPrice, tx.EffectiveGasPrice)
if err != nil {
if f.effectiveGasPrice.IsEnabled() {
return nil, err
return nil, err, false
} else {
log.Warnf("effectiveGasPrice is disabled, but failed to to calculate efftive gas price percentage (#1), error: %v", err)
tx.EGPLog.Error = fmt.Sprintf("%s; CalculateEffectiveGasPricePercentage#1: %s", tx.EGPLog.Error, err)
Expand All @@ -428,7 +434,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first

effectivePercentageAsDecodedHex, err := hex.DecodeHex(fmt.Sprintf("%x", tx.EGPPercentage))
if err != nil {
return nil, err
return nil, err, false
}

batchRequest.Transactions = append(batchRequest.Transactions, effectivePercentageAsDecodedHex...)
Expand All @@ -437,7 +443,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first

if err != nil && (errors.Is(err, runtime.ErrExecutorDBError) || errors.Is(err, runtime.ErrInvalidTxChangeL2BlockMinTimestamp)) {
log.Errorf("failed to process tx %s, error: %v", tx.HashStr, err)
return nil, err
return nil, err, false
} else if err == nil && !batchResponse.IsRomLevelError && len(batchResponse.BlockResponses) == 0 {
err = fmt.Errorf("executor returned no errors and no responses for tx %s", tx.HashStr)
f.Halt(ctx, err, false)
Expand All @@ -454,14 +460,15 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
} else {
metrics.TxProcessed(metrics.TxProcessedLabelInvalid, 1)
}
return nil, err
return nil, err, false
}

closeBatch := false
oldStateRoot := f.wipBatch.imStateRoot
if len(batchResponse.BlockResponses) > 0 {
errWg, err = f.handleProcessTransactionResponse(ctx, tx, batchResponse, oldStateRoot)
errWg, err, closeBatch = f.handleProcessTransactionResponse(ctx, tx, batchResponse, oldStateRoot)
if err != nil {
return errWg, err
return errWg, err, false
}
}

Expand All @@ -471,17 +478,17 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
log.Infof("processed tx %s, batchNumber: %d, l2Block: [%d], newStateRoot: %s, oldStateRoot: %s, used counters: %s",
tx.HashStr, batchRequest.BatchNumber, f.wipL2Block.trackingNum, batchResponse.NewStateRoot.String(), batchRequest.OldStateRoot.String(), f.logZKCounters(batchResponse.UsedZkCounters))

return nil, nil
return nil, nil, closeBatch
}

// handleProcessTransactionResponse handles the response of transaction processing.
func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) (errWg *sync.WaitGroup, err error) {
func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) (errWg *sync.WaitGroup, err error, closeWIPBatch bool) {
// Handle Transaction Error
errorCode := executor.RomErrorCode(result.BlockResponses[0].TransactionResponses[0].RomError)
if !state.IsStateRootChanged(errorCode) {
// If intrinsic error or OOC error, we skip adding the transaction to the batch
errWg = f.handleProcessTransactionError(ctx, result, tx)
return errWg, result.BlockResponses[0].TransactionResponses[0].RomError
return errWg, result.BlockResponses[0].TransactionResponses[0].RomError, false
}

egpEnabled := f.effectiveGasPrice.IsEnabled()
Expand All @@ -496,7 +503,7 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
if err != nil {
if egpEnabled {
log.Errorf("failed to calculate effective gas price with new gasUsed for tx %s, error: %v", tx.HashStr, err.Error())
return nil, err
return nil, err, false
} else {
log.Warnf("effectiveGasPrice is disabled, but failed to calculate effective gas price with new gasUsed for tx %s, error: %v", tx.HashStr, err.Error())
tx.EGPLog.Error = fmt.Sprintf("%s; CalculateEffectiveGasPrice#2: %s", tx.EGPLog.Error, err)
Expand All @@ -521,7 +528,7 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
}

if errCompare != nil && egpEnabled {
return nil, errCompare
return nil, errCompare, false
}
}
}
Expand Down Expand Up @@ -557,12 +564,12 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
log.Errorf("failed to update status to invalid in the pool for tx %s, error: %v", tx.Hash.String(), err)
}

return nil, ErrBatchResourceUnderFlow
return nil, ErrBatchResourceUnderFlow, false
} else {
start := time.Now()
f.workerIntf.UpdateTxZKCounters(result.BlockResponses[0].TransactionResponses[0].TxHash, tx.From, result.UsedZkCounters)
metrics.WorkerProcessingTime(time.Since(start))
return nil, ErrBatchResourceUnderFlow
return nil, ErrBatchResourceUnderFlow, false
}
}

Expand All @@ -583,7 +590,12 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx

f.updateWorkerAfterSuccessfulProcessing(ctx, tx.Hash, tx.From, false, result)

return nil, nil
if errors.Is(result.BlockResponses[0].TransactionResponses[0].RomError, runtime.ErrOutOfGas) {
log.Infof("tx %s is OOG", tx.HashStr)
return nil, nil, true
}

return nil, nil, false
}

// compareTxEffectiveGasPrice compares newEffectiveGasPrice with tx.EffectiveGasPrice.
Expand Down
Loading