Add executor reserved ZK counters (#3348)
* add executor reserved ZK counters

* update prover image to v5.0.0-RC6

* fix typos

* add check reserved counters when executing L2 block

* remove closeBatch

* update prover image v5.0.0-RC7

* fixes and logs improvements

* update prover image v5.0.0-RC8
agnusmor committed Feb 23, 2024
1 parent f8690e6 commit f91406f
Showing 18 changed files with 320 additions and 257 deletions.
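For context: the core of this commit is that the executor now reports two sets of ZK counters per processed transaction. The used counters are what the transaction actually consumed; the reserved counters are the worst case the executor had to provision for. A transaction is admitted into the work-in-progress batch only if its reserved counters fit in the remaining batch resources, but the batch is then charged only the used counters. Below is a minimal, self-contained Go sketch of that pattern; the types are simplified stand-ins (the real state.ZKCounters carries nine counters, and Fits/Sub live in the state package, not shown in this diff).

package main

import "fmt"

// ZKCounters is a simplified stand-in for state.ZKCounters.
type ZKCounters struct {
	GasUsed      uint64
	KeccakHashes uint32
	Steps        uint32
}

// BatchResources mirrors the shape used in this commit: ZK counters plus bytes.
type BatchResources struct {
	ZKCounters ZKCounters
	Bytes      uint64
}

// Fits reports whether needed fits into r without mutating r, returning the
// first exhausted resource name on failure.
func (r BatchResources) Fits(needed BatchResources) (bool, string) {
	switch {
	case needed.ZKCounters.GasUsed > r.ZKCounters.GasUsed:
		return false, "gas"
	case needed.ZKCounters.KeccakHashes > r.ZKCounters.KeccakHashes:
		return false, "keccak hashes"
	case needed.ZKCounters.Steps > r.ZKCounters.Steps:
		return false, "steps"
	case needed.Bytes > r.Bytes:
		return false, "bytes"
	}
	return true, ""
}

// Sub subtracts used from r, reporting overflow instead of wrapping around.
func (r *BatchResources) Sub(used BatchResources) (bool, string) {
	if ok, name := r.Fits(used); !ok {
		return true, name // overflow
	}
	r.ZKCounters.GasUsed -= used.ZKCounters.GasUsed
	r.ZKCounters.KeccakHashes -= used.ZKCounters.KeccakHashes
	r.ZKCounters.Steps -= used.ZKCounters.Steps
	r.Bytes -= used.Bytes
	return false, ""
}

func main() {
	remaining := BatchResources{ZKCounters: ZKCounters{GasUsed: 1000, KeccakHashes: 10, Steps: 100}, Bytes: 300}
	used := BatchResources{ZKCounters: ZKCounters{GasUsed: 200, KeccakHashes: 2, Steps: 30}, Bytes: 120}
	reserved := BatchResources{ZKCounters: ZKCounters{GasUsed: 900, KeccakHashes: 4, Steps: 60}, Bytes: 120}

	// Admit the tx only if its worst-case (reserved) counters fit...
	if fits, _ := remaining.Fits(reserved); fits {
		// ...but charge the batch only for what it actually used.
		remaining.Sub(used)
	}
	fmt.Printf("%+v\n", remaining) // {ZKCounters:{GasUsed:800 KeccakHashes:8 Steps:70} Bytes:180}
}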
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -107,7 +107,7 @@ services:
zkevm-prover:
container_name: zkevm-prover
restart: unless-stopped
- image: hermeznetwork/zkevm-prover:v5.0.0-RC5
+ image: hermeznetwork/zkevm-prover:v5.0.0-RC8
depends_on:
zkevm-state-db:
condition: service_healthy
3 changes: 2 additions & 1 deletion event/event.go
@@ -42,9 +42,10 @@ const (
EventID_SynchronizerHalt EventID = "SYNCHRONIZER HALT"
// EventID_SequenceSenderHalt is triggered when the SequenceSender halts
EventID_SequenceSenderHalt EventID = "SEQUENCESENDER HALT"

// EventID_NodeOOC is triggered when an OOC at node level is detected
EventID_NodeOOC EventID = "NODE OOC"
+ // EventID_ReservedZKCountersOverflow is triggered when the reserved ZK counters exceed the remaining batch ZK counters
+ EventID_ReservedZKCountersOverflow EventID = "RESERVED ZKCOUNTERS OVERFLOW"
// Source_Node is the source of the event
Source_Node Source = "node"

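For context, a plausible emission site for the new event ID, following the same shape as the event literals this commit replaces elsewhere in the diff. This fragment is illustrative, not part of the commit; the finalizer receiver, tx, ctx, and imports are assumed from the surrounding code.

// Hypothetical: report that a tx's reserved counters no longer fit in the batch.
ev := &event.Event{
	ReceivedAt:  time.Now(),
	Source:      event.Source_Node,
	Component:   event.Component_Sequencer,
	Level:       event.Level_Error,
	EventID:     event.EventID_ReservedZKCountersOverflow,
	Description: fmt.Sprintf("reserved ZK counters for tx %s exceed the remaining batch ZK counters", tx.HashStr),
}
if err := f.eventLog.LogEvent(ctx, ev); err != nil {
	log.Errorf("error storing event, error: %v", err)
}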
6 changes: 3 additions & 3 deletions sequencer/addrqueue.go
@@ -217,17 +217,17 @@ func (a *addrQueue) updateCurrentNonceBalance(nonce *uint64, balance *big.Int) (
}

// UpdateTxZKCounters updates the ZKCounters for the given tx (txHash)
- func (a *addrQueue) UpdateTxZKCounters(txHash common.Hash, counters state.ZKCounters) {
+ func (a *addrQueue) UpdateTxZKCounters(txHash common.Hash, usedZKCounters state.ZKCounters, reservedZKCounters state.ZKCounters) {
txHashStr := txHash.String()

if (a.readyTx != nil) && (a.readyTx.HashStr == txHashStr) {
log.Debugf("updating readyTx %s with new ZKCounters from addrQueue %s", txHashStr, a.fromStr)
- a.readyTx.updateZKCounters(counters)
+ a.readyTx.updateZKCounters(usedZKCounters, reservedZKCounters)
} else {
for _, txTracker := range a.notReadyTxs {
if txTracker.HashStr == txHashStr {
log.Debugf("updating notReadyTx %s with new ZKCounters from addrQueue %s", txHashStr, a.fromStr)
- txTracker.updateZKCounters(counters)
+ txTracker.updateZKCounters(usedZKCounters, reservedZKCounters)
break
}
}
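The new third parameter propagates down from the worker, which appears in this diff only through its interface. A sketch of the calling side, assuming the worker keeps a mutex-guarded map of addrQueues keyed by sender address (those internals are assumptions, not shown here):

// Hypothetical worker method: route the updated counters to the sender's queue.
func (w *Worker) UpdateTxZKCounters(txHash common.Hash, addr common.Address, usedZKCounters state.ZKCounters, reservedZKCounters state.ZKCounters) {
	w.workerMutex.Lock()
	defer w.workerMutex.Unlock()

	if addrQueue, found := w.pool[addr.String()]; found {
		addrQueue.UpdateTxZKCounters(txHash, usedZKCounters, reservedZKCounters)
	}
}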
40 changes: 14 additions & 26 deletions sequencer/batch.go
@@ -238,7 +238,7 @@ func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context, closeReason sta
if f.wipL2Block != nil {
f.wipBatch.imStateRoot = f.wipL2Block.imStateRoot
// Subtract the WIP L2 block used resources to batch
- overflow, overflowResource := f.wipBatch.imRemainingResources.Sub(f.wipL2Block.usedResources)
+ overflow, overflowResource := f.wipBatch.imRemainingResources.Sub(state.BatchResources{ZKCounters: f.wipL2Block.usedZKCounters, Bytes: f.wipL2Block.bytes})
if overflow {
return fmt.Errorf("failed to subtract L2 block [%d] used resources to new wip batch %d, overflow resource: %s",
f.wipL2Block.trackingNum, f.wipBatch.batchNumber, overflowResource)
@@ -419,19 +419,7 @@ func (f *finalizer) batchSanityCheck(ctx context.Context, batchNum uint64, initi
if err != nil {
log.Errorf("error marshaling payload, error: %v", err)
} else {
- event := &event.Event{
- ReceivedAt: time.Now(),
- Source: event.Source_Node,
- Component: event.Component_Sequencer,
- Level: event.Level_Critical,
- EventID: event.EventID_ReprocessFullBatchOOC,
- Description: string(payload),
- Json: batchRequest,
- }
- err = f.eventLog.LogEvent(ctx, event)
- if err != nil {
- log.Errorf("error storing payload, error: %v", err)
- }
+ f.LogEvent(ctx, event.Level_Critical, event.EventID_ReprocessFullBatchOOC, string(payload), batchRequest)
}

return nil, ErrProcessBatchOOC
@@ -464,7 +452,7 @@ func (f *finalizer) maxTxsPerBatchReached(batch *Batch) bool {

// isBatchResourcesMarginExhausted checks if one of resources of the batch has reached the exhausted margin and returns the name of the exhausted resource
func (f *finalizer) isBatchResourcesMarginExhausted(resources state.BatchResources) (bool, string) {
- zkCounters := resources.UsedZKCounters
+ zkCounters := resources.ZKCounters
result := false
resourceName := ""
if resources.Bytes <= f.getConstraintThresholdUint64(f.batchConstraints.MaxBatchBytesSize) {
@@ -512,16 +500,16 @@ func (f *finalizer) getConstraintThresholdUint32(input uint32) uint32 {
// getUsedBatchResources calculates and returns the used resources of a batch from remaining resources
func getUsedBatchResources(constraints state.BatchConstraintsCfg, remainingResources state.BatchResources) state.BatchResources {
return state.BatchResources{
- UsedZKCounters: state.ZKCounters{
- GasUsed: constraints.MaxCumulativeGasUsed - remainingResources.UsedZKCounters.GasUsed,
- KeccakHashes: constraints.MaxKeccakHashes - remainingResources.UsedZKCounters.KeccakHashes,
- PoseidonHashes: constraints.MaxPoseidonHashes - remainingResources.UsedZKCounters.PoseidonHashes,
- PoseidonPaddings: constraints.MaxPoseidonPaddings - remainingResources.UsedZKCounters.PoseidonPaddings,
- MemAligns: constraints.MaxMemAligns - remainingResources.UsedZKCounters.MemAligns,
- Arithmetics: constraints.MaxArithmetics - remainingResources.UsedZKCounters.Arithmetics,
- Binaries: constraints.MaxBinaries - remainingResources.UsedZKCounters.Binaries,
- Steps: constraints.MaxSteps - remainingResources.UsedZKCounters.Steps,
- Sha256Hashes_V2: constraints.MaxSHA256Hashes - remainingResources.UsedZKCounters.Sha256Hashes_V2,
+ ZKCounters: state.ZKCounters{
+ GasUsed: constraints.MaxCumulativeGasUsed - remainingResources.ZKCounters.GasUsed,
+ KeccakHashes: constraints.MaxKeccakHashes - remainingResources.ZKCounters.KeccakHashes,
+ PoseidonHashes: constraints.MaxPoseidonHashes - remainingResources.ZKCounters.PoseidonHashes,
+ PoseidonPaddings: constraints.MaxPoseidonPaddings - remainingResources.ZKCounters.PoseidonPaddings,
+ MemAligns: constraints.MaxMemAligns - remainingResources.ZKCounters.MemAligns,
+ Arithmetics: constraints.MaxArithmetics - remainingResources.ZKCounters.Arithmetics,
+ Binaries: constraints.MaxBinaries - remainingResources.ZKCounters.Binaries,
+ Steps: constraints.MaxSteps - remainingResources.ZKCounters.Steps,
+ Sha256Hashes_V2: constraints.MaxSHA256Hashes - remainingResources.ZKCounters.Sha256Hashes_V2,
},
Bytes: constraints.MaxBatchBytesSize - remainingResources.Bytes,
}
@@ -530,7 +518,7 @@ func getUsedBatchResources(constraints state.BatchConstraintsCfg, remainingResou
// getMaxRemainingResources returns the max resources that can be used in a batch
func getMaxRemainingResources(constraints state.BatchConstraintsCfg) state.BatchResources {
return state.BatchResources{
- UsedZKCounters: state.ZKCounters{
+ ZKCounters: state.ZKCounters{
GasUsed: constraints.MaxCumulativeGasUsed,
KeccakHashes: constraints.MaxKeccakHashes,
PoseidonHashes: constraints.MaxPoseidonHashes,
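The two helpers above are mirror images: getMaxRemainingResources seeds a batch with the constraint maxima, and getUsedBatchResources recovers usage as max minus remaining, counter by counter. A tiny self-contained example of that identity, with invented numbers:

package main

import "fmt"

func main() {
	// Invented values: a constraint maximum and what the wip batch has left.
	const maxSteps uint32 = 8_388_608
	var remainingSteps uint32 = 8_000_000

	// The identity getUsedBatchResources applies to every counter:
	usedSteps := maxSteps - remainingSteps
	fmt.Printf("used steps: %d\n", usedSteps) // used steps: 388608
}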
4 changes: 2 additions & 2 deletions sequencer/errors.go
@@ -29,8 +29,8 @@ var (
ErrExecutorError = errors.New("executor error")
// ErrNoFittingTransaction happens when there is not a tx (from the txSortedList) that fits in the remaining batch resources
ErrNoFittingTransaction = errors.New("no fit transaction")
- // ErrBatchResourceUnderFlow happens when there is batch resoure underflow after sustract the resources from a tx
- ErrBatchResourceUnderFlow = errors.New("batch resource underflow")
+ // ErrBatchResourceOverFlow happens when there is a tx that overflows the remaining batch resources
+ ErrBatchResourceOverFlow = errors.New("batch resource overflow")
// ErrTransactionsListEmpty happens when txSortedList is empty
ErrTransactionsListEmpty = errors.New("transactions list empty")
)
118 changes: 59 additions & 59 deletions sequencer/finalizer.go
@@ -296,8 +296,8 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
firstTxProcess = false
log.Infof("reprocessing tx %s because of effective gas price calculation", tx.HashStr)
continue
- } else if err == ErrBatchResourceUnderFlow {
- log.Infof("skipping tx %s due to a batch resource underflow", tx.HashStr)
+ } else if err == ErrBatchResourceOverFlow {
+ log.Infof("skipping tx %s due to a batch resource overflow", tx.HashStr)
break
} else {
log.Errorf("failed to process tx %s, error: %v", err)
@@ -373,11 +373,11 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
// Save values for later logging
tx.EGPLog.L1GasPrice = tx.L1GasPrice
tx.EGPLog.L2GasPrice = txL2GasPrice
- tx.EGPLog.GasUsedFirst = tx.BatchResources.UsedZKCounters.GasUsed
+ tx.EGPLog.GasUsedFirst = tx.UsedZKCounters.GasUsed
tx.EGPLog.GasPrice.Set(txGasPrice)

// Calculate EffectiveGasPrice
- egp, err := f.effectiveGasPrice.CalculateEffectiveGasPrice(tx.RawTx, txGasPrice, tx.BatchResources.UsedZKCounters.GasUsed, tx.L1GasPrice, txL2GasPrice)
+ egp, err := f.effectiveGasPrice.CalculateEffectiveGasPrice(tx.RawTx, txGasPrice, tx.UsedZKCounters.GasUsed, tx.L1GasPrice, txL2GasPrice)
if err != nil {
if f.effectiveGasPrice.IsEnabled() {
return nil, err
@@ -469,8 +469,9 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
// Update imStateRoot
f.wipBatch.imStateRoot = batchResponse.NewStateRoot

log.Infof("processed tx %s, batchNumber: %d, l2Block: [%d], newStateRoot: %s, oldStateRoot: %s, used counters: %s",
tx.HashStr, batchRequest.BatchNumber, f.wipL2Block.trackingNum, batchResponse.NewStateRoot.String(), batchRequest.OldStateRoot.String(), f.logZKCounters(batchResponse.UsedZkCounters))
log.Infof("processed tx %s, batchNumber: %d, l2Block: [%d], newStateRoot: %s, oldStateRoot: %s, used counters: %s, reserved counters: %s",
tx.HashStr, batchRequest.BatchNumber, f.wipL2Block.trackingNum, batchResponse.NewStateRoot.String(), batchRequest.OldStateRoot.String(),
f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters))

return nil, nil
}
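The log lines above format both counter sets through logZKCounters, whose body is collapsed in this view. A plausible implementation, assuming the counter fields visible in getUsedBatchResources above; the real formatting may differ:

// Hypothetical body; the actual helper is not expanded in this diff.
func (f *finalizer) logZKCounters(counters state.ZKCounters) string {
	return fmt.Sprintf("{gasUsed: %d, keccakHashes: %d, poseidonHashes: %d, poseidonPaddings: %d, memAligns: %d, arithmetics: %d, binaries: %d, steps: %d, sha256Hashes: %d}",
		counters.GasUsed, counters.KeccakHashes, counters.PoseidonHashes, counters.PoseidonPaddings,
		counters.MemAligns, counters.Arithmetics, counters.Binaries, counters.Steps, counters.Sha256Hashes_V2)
}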
@@ -527,27 +528,24 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
}
}

- // Check remaining resources
-
- overflow, overflowResource := f.wipBatch.imRemainingResources.Sub(state.BatchResources{UsedZKCounters: result.UsedZkCounters, Bytes: uint64(len(tx.RawTx))})
- if overflow {
- log.Infof("current tx %s exceeds the remaining batch resources, overflow resource: %s, updating metadata for tx in worker and continuing", tx.HashStr, overflowResource)
- if !f.batchConstraints.IsWithinConstraints(result.UsedZkCounters) {
- log.Warnf("current tx %s exceeds the max limit for batch resources (node OOC), setting tx as invalid in the pool", tx.HashStr)
-
- event := &event.Event{
- ReceivedAt: time.Now(),
- Source: event.Source_Node,
- Component: event.Component_Sequencer,
- Level: event.Level_Error,
- EventID: event.EventID_NodeOOC,
- Description: fmt.Sprintf("tx: %s exceeds node max limit batch resources (node OOC), from: %s, IP: %s", tx.HashStr, tx.FromStr, tx.IP),
- }
+ // Check if the reserved resources of the tx fit in the remaining batch resources
+ subOverflow := false
+ fits, overflowResource := f.wipBatch.imRemainingResources.Fits(state.BatchResources{ZKCounters: result.ReservedZkCounters, Bytes: uint64(len(tx.RawTx))})
+ if fits {
+ // Subtract the used resources from the batch
+ subOverflow, overflowResource = f.wipBatch.imRemainingResources.Sub(state.BatchResources{ZKCounters: result.UsedZkCounters, Bytes: uint64(len(tx.RawTx))})
+ if subOverflow { // Sanity check, this cannot happen as reservedZKCounters should be >= usedZKCounters
+ log.Infof("current tx %s used resources exceed the remaining batch resources, overflow resource: %s, updating metadata for tx in worker and continuing. Batch counters: %s, tx used counters: %s",
+ tx.HashStr, overflowResource, f.logZKCounters(f.wipBatch.imRemainingResources.ZKCounters), f.logZKCounters(result.UsedZkCounters))
+ }
+ } else {
+ log.Infof("current tx %s reserved resources exceed the remaining batch resources, overflow resource: %s, updating metadata for tx in worker and continuing. Batch counters: %s, tx reserved counters: %s",
+ tx.HashStr, overflowResource, f.logZKCounters(f.wipBatch.imRemainingResources.ZKCounters), f.logZKCounters(result.ReservedZkCounters))
+ if !f.batchConstraints.IsWithinConstraints(result.ReservedZkCounters) {
+ log.Warnf("current tx %s reserved resources exceed the max limit for batch resources (node OOC), setting tx as invalid in the pool", tx.HashStr)

- eventErr := f.eventLog.LogEvent(ctx, event)
- if eventErr != nil {
- log.Errorf("error storing finalizer halt event, error: %v", eventErr)
- }
+ f.LogEvent(ctx, event.Level_Error, event.EventID_NodeOOC,
+ fmt.Sprintf("tx: %s exceeds node max limit batch resources (node OOC), from: %s, IP: %s", tx.HashStr, tx.FromStr, tx.IP), nil)

// Delete the transaction from the txSorted list
f.workerIntf.DeleteTx(tx.Hash, tx.From)
@@ -557,16 +555,18 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
if err != nil {
log.Errorf("failed to update status to invalid in the pool for tx %s, error: %v", tx.Hash.String(), err)
}

- return nil, ErrBatchResourceUnderFlow
- } else {
- start := time.Now()
- f.workerIntf.UpdateTxZKCounters(result.BlockResponses[0].TransactionResponses[0].TxHash, tx.From, result.UsedZkCounters)
- metrics.WorkerProcessingTime(time.Since(start))
- return nil, ErrBatchResourceUnderFlow
}
}

+ // If the reserved tx resources don't fit in the remaining batch resources (or we got an overflow when trying to subtract the used resources)
+ // we update the ZKCounters of the tx and return the ErrBatchResourceOverFlow error
+ if !fits || subOverflow {
+ start := time.Now()
+ f.workerIntf.UpdateTxZKCounters(result.BlockResponses[0].TransactionResponses[0].TxHash, tx.From, result.UsedZkCounters, result.ReservedZkCounters)
+ metrics.WorkerProcessingTime(time.Since(start))
+ return nil, ErrBatchResourceOverFlow
+ }

// Save Enabled, GasPriceOC, BalanceOC and final effective gas price for later logging
tx.EGPLog.Enabled = egpEnabled
tx.EGPLog.GasPriceOC = result.BlockResponses[0].TransactionResponses[0].HasGaspriceOpcode
@@ -579,7 +579,9 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
tx.EGPLog.GasPrice, tx.EGPLog.L1GasPrice, tx.EGPLog.L2GasPrice, tx.EGPLog.Reprocess, tx.EGPLog.GasPriceOC, tx.EGPLog.BalanceOC, egpEnabled, len(tx.RawTx), tx.HashStr, tx.EGPLog.Error)

f.wipL2Block.addTx(tx)

f.wipBatch.countOfTxs++

f.updateWorkerAfterSuccessfulProcessing(ctx, tx.Hash, tx.From, false, result)

return nil, nil
@@ -726,19 +728,8 @@ func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *s
// checkIfProverRestarted checks if the proverID changed
func (f *finalizer) checkIfProverRestarted(proverID string) {
if f.proverID != "" && f.proverID != proverID {
- event := &event.Event{
- ReceivedAt: time.Now(),
- Source: event.Source_Node,
- Component: event.Component_Sequencer,
- Level: event.Level_Critical,
- EventID: event.EventID_FinalizerRestart,
- Description: fmt.Sprintf("proverID changed from %s to %s, restarting sequencer to discard current WIP batch and work with new executor", f.proverID, proverID),
- }
-
- err := f.eventLog.LogEvent(context.Background(), event)
- if err != nil {
- log.Errorf("error storing payload, error: %v", err)
- }
+ f.LogEvent(context.Background(), event.Level_Critical, event.EventID_FinalizerRestart,
+ fmt.Sprintf("proverID changed from %s to %s, restarting sequencer to discard current WIP batch and work with new executor", f.proverID, proverID), nil)

log.Fatal("proverID changed from %s to %s, restarting sequencer to discard current WIP batch and work with new executor")
}
@@ -755,19 +746,7 @@ func (f *finalizer) logZKCounters(counters state.ZKCounters) string {
func (f *finalizer) Halt(ctx context.Context, err error, isFatal bool) {
f.haltFinalizer.Store(true)

- event := &event.Event{
- ReceivedAt: time.Now(),
- Source: event.Source_Node,
- Component: event.Component_Sequencer,
- Level: event.Level_Critical,
- EventID: event.EventID_FinalizerHalt,
- Description: fmt.Sprintf("finalizer halted due to error, error: %s", err),
- }
-
- eventErr := f.eventLog.LogEvent(ctx, event)
- if eventErr != nil {
- log.Errorf("error storing finalizer halt event, error: %v", eventErr)
- }
+ f.LogEvent(ctx, event.Level_Critical, event.EventID_FinalizerHalt, fmt.Sprintf("finalizer halted due to error, error: %s", err), nil)

if isFatal {
log.Fatalf("fatal error on finalizer, error: %v", err)
@@ -778,3 +757,24 @@ func (f *finalizer) Halt(ctx context.Context, err error, isFatal bool) {
}
}
}

+ // LogEvent adds an event for runtime debugging
+ func (f *finalizer) LogEvent(ctx context.Context, level event.Level, eventId event.EventID, description string, json interface{}) {
+ event := &event.Event{
+ ReceivedAt: time.Now(),
+ Source: event.Source_Node,
+ Component: event.Component_Sequencer,
+ Level: level,
+ EventID: eventId,
+ Description: description,
+ }
+
+ if json != nil {
+ event.Json = json
+ }
+
+ eventErr := f.eventLog.LogEvent(ctx, event)
+ if eventErr != nil {
+ log.Errorf("error storing log event, error: %v", eventErr)
+ }
+ }
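With this helper in place, the event literals scattered through the finalizer collapse to one-liners. These two calls mirror the rewritten sites above; the arguments (ctx, err, payload, batchRequest) come from their surrounding functions:

// Critical event with no JSON payload (from Halt):
f.LogEvent(ctx, event.Level_Critical, event.EventID_FinalizerHalt,
	fmt.Sprintf("finalizer halted due to error, error: %s", err), nil)

// Critical event carrying the batch request as a JSON payload (from batchSanityCheck):
f.LogEvent(ctx, event.Level_Critical, event.EventID_ReprocessFullBatchOOC, string(payload), batchRequest)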
