Skip to content

Commit

Permalink
fix as workaround to close batch on tx oog (#3271)
Browse files Browse the repository at this point in the history
Co-authored-by: agnusmor <agnusmor@gmail.com>
  • Loading branch information
ToniRamirezM and agnusmor authored Feb 14, 2024
1 parent 43eb91f commit ad26b03
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,16 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
continue
}

closeWIPBatch := false
metrics.WorkerProcessingTime(time.Since(start))
if tx != nil {
showNotFoundTxLog = true

firstTxProcess := true

for {
_, err := f.processTransaction(ctx, tx, firstTxProcess)
var err error
_, err, closeWIPBatch = f.processTransaction(ctx, tx, firstTxProcess)
if err != nil {
if err == ErrEffectiveGasPriceReprocess {
firstTxProcess = false
Expand Down Expand Up @@ -326,7 +328,11 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
}

// Check if we must finalize the batch due to a closing reason (resources exhausted, max txs, timestamp resolution, forced batches deadline)
if finalize, closeReason := f.checkIfFinalizeBatch(); finalize {
finalize, closeReason := f.checkIfFinalizeBatch()
if closeWIPBatch || finalize {
if closeWIPBatch {
closeReason = "tx OOG"
}
f.finalizeWIPBatch(ctx, closeReason)
}

Expand All @@ -338,7 +344,7 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
}

// processTransaction processes a single transaction.
func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, firstTxProcess bool) (errWg *sync.WaitGroup, err error) {
func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, firstTxProcess bool) (errWg *sync.WaitGroup, err error, closeWIPBatch bool) {
start := time.Now()
defer func() {
metrics.ProcessingTime(time.Since(start))
Expand Down Expand Up @@ -381,7 +387,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
egp, err := f.effectiveGasPrice.CalculateEffectiveGasPrice(tx.RawTx, txGasPrice, tx.BatchResources.ZKCounters.GasUsed, tx.L1GasPrice, txL2GasPrice)
if err != nil {
if f.effectiveGasPrice.IsEnabled() {
return nil, err
return nil, err, false
} else {
log.Warnf("effectiveGasPrice is disabled, but failed to calculate effectiveGasPrice for tx %s, error: %v", tx.HashStr, err)
tx.EGPLog.Error = fmt.Sprintf("CalculateEffectiveGasPrice#1: %s", err)
Expand Down Expand Up @@ -409,7 +415,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
egpPercentage, err := f.effectiveGasPrice.CalculateEffectiveGasPricePercentage(txGasPrice, tx.EffectiveGasPrice)
if err != nil {
if f.effectiveGasPrice.IsEnabled() {
return nil, err
return nil, err, false
} else {
log.Warnf("effectiveGasPrice is disabled, but failed to to calculate efftive gas price percentage (#1), error: %v", err)
tx.EGPLog.Error = fmt.Sprintf("%s; CalculateEffectiveGasPricePercentage#1: %s", tx.EGPLog.Error, err)
Expand All @@ -429,7 +435,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first

effectivePercentageAsDecodedHex, err := hex.DecodeHex(fmt.Sprintf("%x", tx.EGPPercentage))
if err != nil {
return nil, err
return nil, err, false
}

batchRequest.Transactions = append(batchRequest.Transactions, effectivePercentageAsDecodedHex...)
Expand All @@ -438,7 +444,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first

if err != nil && (errors.Is(err, runtime.ErrExecutorDBError) || errors.Is(err, runtime.ErrInvalidTxChangeL2BlockMinTimestamp)) {
log.Errorf("failed to process tx %s, error: %v", tx.HashStr, err)
return nil, err
return nil, err, false
} else if err == nil && !batchResponse.IsRomLevelError && len(batchResponse.BlockResponses) == 0 {
err = fmt.Errorf("executor returned no errors and no responses for tx %s", tx.HashStr)
f.Halt(ctx, err, false)
Expand Down Expand Up @@ -477,14 +483,15 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
} else {
metrics.TxProcessed(metrics.TxProcessedLabelInvalid, 1)
}
return nil, err
return nil, err, false
}

closeBatch := false
oldStateRoot := f.wipBatch.imStateRoot
if len(batchResponse.BlockResponses) > 0 {
errWg, err = f.handleProcessTransactionResponse(ctx, tx, batchResponse, oldStateRoot)
errWg, err, closeBatch = f.handleProcessTransactionResponse(ctx, tx, batchResponse, oldStateRoot)
if err != nil {
return errWg, err
return errWg, err, false
}
}

Expand All @@ -494,17 +501,17 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
log.Infof("processed tx %s, batchNumber: %d, l2Block: [%d], newStateRoot: %s, oldStateRoot: %s, used counters: %s",
tx.HashStr, batchRequest.BatchNumber, f.wipL2Block.trackingNum, batchResponse.NewStateRoot.String(), batchRequest.OldStateRoot.String(), f.logZKCounters(batchResponse.UsedZkCounters))

return nil, nil
return nil, nil, closeBatch
}

// handleProcessTransactionResponse handles the response of transaction processing.
func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) (errWg *sync.WaitGroup, err error) {
func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *TxTracker, result *state.ProcessBatchResponse, oldStateRoot common.Hash) (errWg *sync.WaitGroup, err error, closeWIPBatch bool) {
// Handle Transaction Error
errorCode := executor.RomErrorCode(result.BlockResponses[0].TransactionResponses[0].RomError)
if !state.IsStateRootChanged(errorCode) {
// If intrinsic error or OOC error, we skip adding the transaction to the batch
errWg = f.handleProcessTransactionError(ctx, result, tx)
return errWg, result.BlockResponses[0].TransactionResponses[0].RomError
return errWg, result.BlockResponses[0].TransactionResponses[0].RomError, false
}

egpEnabled := f.effectiveGasPrice.IsEnabled()
Expand All @@ -519,7 +526,7 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
if err != nil {
if egpEnabled {
log.Errorf("failed to calculate effective gas price with new gasUsed for tx %s, error: %v", tx.HashStr, err.Error())
return nil, err
return nil, err, false
} else {
log.Warnf("effectiveGasPrice is disabled, but failed to calculate effective gas price with new gasUsed for tx %s, error: %v", tx.HashStr, err.Error())
tx.EGPLog.Error = fmt.Sprintf("%s; CalculateEffectiveGasPrice#2: %s", tx.EGPLog.Error, err)
Expand All @@ -544,7 +551,7 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
}

if errCompare != nil && egpEnabled {
return nil, errCompare
return nil, errCompare, false
}
}
}
Expand Down Expand Up @@ -580,12 +587,12 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
log.Errorf("failed to update status to invalid in the pool for tx %s, error: %v", tx.Hash.String(), err)
}

return nil, ErrBatchResourceUnderFlow
return nil, ErrBatchResourceUnderFlow, false
} else {
start := time.Now()
f.workerIntf.UpdateTxZKCounters(result.BlockResponses[0].TransactionResponses[0].TxHash, tx.From, result.UsedZkCounters)
metrics.WorkerProcessingTime(time.Since(start))
return nil, ErrBatchResourceUnderFlow
return nil, ErrBatchResourceUnderFlow, false
}
}

Expand All @@ -606,7 +613,12 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx

f.updateWorkerAfterSuccessfulProcessing(ctx, tx.Hash, tx.From, false, result)

return nil, nil
if errors.Is(result.BlockResponses[0].TransactionResponses[0].RomError, runtime.ErrOutOfGas) {
log.Infof("tx %s is OOG", tx.HashStr)
return nil, nil, true
}

return nil, nil, false
}

// compareTxEffectiveGasPrice compares newEffectiveGasPrice with tx.EffectiveGasPrice.
Expand Down

0 comments on commit ad26b03

Please sign in to comment.