Add executor reserved ZK counters #3348

Merged
merged 9 commits on Feb 23, 2024
Changes from 2 commits
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -107,7 +107,7 @@ services:
zkevm-prover:
container_name: zkevm-prover
restart: unless-stopped
image: hermeznetwork/zkevm-prover:v5.0.0-RC5
image: hermeznetwork/zkevm-prover:v5.0.0-RC6
depends_on:
zkevm-state-db:
condition: service_healthy
6 changes: 3 additions & 3 deletions sequencer/addrqueue.go
@@ -217,17 +217,17 @@ func (a *addrQueue) updateCurrentNonceBalance(nonce *uint64, balance *big.Int) (
}

// UpdateTxZKCounters updates the ZKCounters for the given tx (txHash)
func (a *addrQueue) UpdateTxZKCounters(txHash common.Hash, counters state.ZKCounters) {
func (a *addrQueue) UpdateTxZKCounters(txHash common.Hash, usedZKCounters state.ZKCounters, reservedZKCounters state.ZKCounters) {
txHashStr := txHash.String()

if (a.readyTx != nil) && (a.readyTx.HashStr == txHashStr) {
log.Debugf("updating readyTx %s with new ZKCounters from addrQueue %s", txHashStr, a.fromStr)
a.readyTx.updateZKCounters(counters)
a.readyTx.updateZKCounters(usedZKCounters, reservedZKCounters)
} else {
for _, txTracker := range a.notReadyTxs {
if txTracker.HashStr == txHashStr {
log.Debugf("updating notReadyTx %s with new ZKCounters from addrQueue %s", txHashStr, a.fromStr)
txTracker.updateZKCounters(counters)
txTracker.updateZKCounters(usedZKCounters, reservedZKCounters)
break
}
}
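The txTracker side of this signature change is not included in the hunks shown here. As a rough sketch only (the state import path is the repo's usual one; the ReservedZKCounters field name is an assumption, while UsedZKCounters does appear later in this diff), updateZKCounters would now store both counter sets on the tracker:

package sequencer

import "github.com/0xPolygonHermez/zkevm-node/state"

// txTrackerSketch is a hypothetical stand-in for the real TxTracker; the actual
// txtracker.go changes are not part of this excerpt.
type txTrackerSketch struct {
	HashStr            string
	UsedZKCounters     state.ZKCounters // counters actually consumed by the executor
	ReservedZKCounters state.ZKCounters // worst-case counters reserved by the executor (assumed field name)
}

// updateZKCounters refreshes both counter sets after the executor returns new values.
func (tx *txTrackerSketch) updateZKCounters(used state.ZKCounters, reserved state.ZKCounters) {
	tx.UsedZKCounters = used
	tx.ReservedZKCounters = reserved
}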
40 changes: 14 additions & 26 deletions sequencer/batch.go
@@ -238,7 +238,7 @@ func (f *finalizer) closeAndOpenNewWIPBatch(ctx context.Context, closeReason sta
if f.wipL2Block != nil {
f.wipBatch.imStateRoot = f.wipL2Block.imStateRoot
// Subtract the WIP L2 block used resources to batch
overflow, overflowResource := f.wipBatch.imRemainingResources.Sub(f.wipL2Block.usedResources)
overflow, overflowResource := f.wipBatch.imRemainingResources.Sub(state.BatchResources{ZKCounters: f.wipL2Block.usedZKCounters, Bytes: f.wipL2Block.bytes})
if overflow {
return fmt.Errorf("failed to subtract L2 block [%d] used resources to new wip batch %d, overflow resource: %s",
f.wipL2Block.trackingNum, f.wipBatch.batchNumber, overflowResource)
@@ -424,19 +424,7 @@ func (f *finalizer) batchSanityCheck(ctx context.Context, batchNum uint64, initi
if err != nil {
log.Errorf("error marshaling payload, error: %v", err)
} else {
event := &event.Event{
ReceivedAt: time.Now(),
Source: event.Source_Node,
Component: event.Component_Sequencer,
Level: event.Level_Critical,
EventID: event.EventID_ReprocessFullBatchOOC,
Description: string(payload),
Json: batchRequest,
}
err = f.eventLog.LogEvent(ctx, event)
if err != nil {
log.Errorf("error storing payload, error: %v", err)
}
f.AddEvent(ctx, event.Level_Critical, event.EventID_ReprocessFullBatchOOC, string(payload), batchRequest)
}

return nil, ErrProcessBatchOOC
@@ -469,7 +457,7 @@ func (f *finalizer) maxTxsPerBatchReached(batch *Batch) bool {

// isBatchResourcesMarginExhausted checks if one of the resources of the batch has reached the exhausted margin and returns the name of the exhausted resource
func (f *finalizer) isBatchResourcesMarginExhausted(resources state.BatchResources) (bool, string) {
zkCounters := resources.UsedZKCounters
zkCounters := resources.ZKCounters
result := false
resourceName := ""
if resources.Bytes <= f.getConstraintThresholdUint64(f.batchConstraints.MaxBatchBytesSize) {
@@ -517,16 +505,16 @@ func (f *finalizer) getConstraintThresholdUint32(input uint32) uint32 {
// getUsedBatchResources calculates and returns the used resources of a batch from remaining resources
func getUsedBatchResources(constraints state.BatchConstraintsCfg, remainingResources state.BatchResources) state.BatchResources {
return state.BatchResources{
UsedZKCounters: state.ZKCounters{
GasUsed: constraints.MaxCumulativeGasUsed - remainingResources.UsedZKCounters.GasUsed,
KeccakHashes: constraints.MaxKeccakHashes - remainingResources.UsedZKCounters.KeccakHashes,
PoseidonHashes: constraints.MaxPoseidonHashes - remainingResources.UsedZKCounters.PoseidonHashes,
PoseidonPaddings: constraints.MaxPoseidonPaddings - remainingResources.UsedZKCounters.PoseidonPaddings,
MemAligns: constraints.MaxMemAligns - remainingResources.UsedZKCounters.MemAligns,
Arithmetics: constraints.MaxArithmetics - remainingResources.UsedZKCounters.Arithmetics,
Binaries: constraints.MaxBinaries - remainingResources.UsedZKCounters.Binaries,
Steps: constraints.MaxSteps - remainingResources.UsedZKCounters.Steps,
Sha256Hashes_V2: constraints.MaxSHA256Hashes - remainingResources.UsedZKCounters.Sha256Hashes_V2,
ZKCounters: state.ZKCounters{
GasUsed: constraints.MaxCumulativeGasUsed - remainingResources.ZKCounters.GasUsed,
KeccakHashes: constraints.MaxKeccakHashes - remainingResources.ZKCounters.KeccakHashes,
PoseidonHashes: constraints.MaxPoseidonHashes - remainingResources.ZKCounters.PoseidonHashes,
PoseidonPaddings: constraints.MaxPoseidonPaddings - remainingResources.ZKCounters.PoseidonPaddings,
MemAligns: constraints.MaxMemAligns - remainingResources.ZKCounters.MemAligns,
Arithmetics: constraints.MaxArithmetics - remainingResources.ZKCounters.Arithmetics,
Binaries: constraints.MaxBinaries - remainingResources.ZKCounters.Binaries,
Steps: constraints.MaxSteps - remainingResources.ZKCounters.Steps,
Sha256Hashes_V2: constraints.MaxSHA256Hashes - remainingResources.ZKCounters.Sha256Hashes_V2,
},
Bytes: constraints.MaxBatchBytesSize - remainingResources.Bytes,
}
@@ -535,7 +523,7 @@ func getUsedBatchResources(constraints state.BatchConstraintsCfg, remainingResou
// getMaxRemainingResources returns the max resources that can be used in a batch
func getMaxRemainingResources(constraints state.BatchConstraintsCfg) state.BatchResources {
return state.BatchResources{
UsedZKCounters: state.ZKCounters{
ZKCounters: state.ZKCounters{
GasUsed: constraints.MaxCumulativeGasUsed,
KeccakHashes: constraints.MaxKeccakHashes,
PoseidonHashes: constraints.MaxPoseidonHashes,
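The batch.go hunks above (and the finalizer.go hunks below) rely on a BatchResources type whose counters field is now plain ZKCounters, with Fits and Sub helpers. The state package implementation is not part of this diff; the following is a minimal sketch, with the counter list trimmed and the method bodies assumed from the call sites only:

// Illustrative sketch only; the real ZKCounters/BatchResources live in the
// repo's state package and carry the full counter list.
package statesketch

type ZKCounters struct {
	GasUsed      uint64
	KeccakHashes uint32
	// ... PoseidonHashes, PoseidonPaddings, MemAligns, Arithmetics, Binaries,
	// Steps and Sha256Hashes_V2 follow the same pattern
}

type BatchResources struct {
	ZKCounters ZKCounters // renamed from UsedZKCounters by this PR
	Bytes      uint64
}

// Fits reports whether other would fit in the remaining resources and, if not,
// names the first resource that would overflow.
func (r BatchResources) Fits(other BatchResources) (bool, string) {
	switch {
	case other.Bytes > r.Bytes:
		return false, "Bytes"
	case other.ZKCounters.GasUsed > r.ZKCounters.GasUsed:
		return false, "GasUsed"
	case other.ZKCounters.KeccakHashes > r.ZKCounters.KeccakHashes:
		return false, "KeccakHashes"
	}
	return true, ""
}

// Sub subtracts other from the remaining resources, reporting an overflow
// (and the offending resource) instead of going negative.
func (r *BatchResources) Sub(other BatchResources) (bool, string) {
	if fits, resource := r.Fits(other); !fits {
		return true, resource
	}
	r.Bytes -= other.Bytes
	r.ZKCounters.GasUsed -= other.ZKCounters.GasUsed
	r.ZKCounters.KeccakHashes -= other.ZKCounters.KeccakHashes
	return false, ""
}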
4 changes: 2 additions & 2 deletions sequencer/errors.go
@@ -29,8 +29,8 @@ var (
ErrExecutorError = errors.New("executor error")
// ErrNoFittingTransaction happens when there is not a tx (from the txSortedList) that fits in the remaining batch resources
ErrNoFittingTransaction = errors.New("no fit transaction")
// ErrBatchResourceUnderFlow happens when there is a batch resource underflow after subtracting the resources of a tx
ErrBatchResourceUnderFlow = errors.New("batch resource underflow")
// ErrBatchResourceOverFlow happens when there is a tx that overflows the remaining batch resources
ErrBatchResourceOverFlow = errors.New("batch resource overflow")
// ErrTransactionsListEmpty happens when txSortedList is empty
ErrTransactionsListEmpty = errors.New("transactions list empty")
)
111 changes: 54 additions & 57 deletions sequencer/finalizer.go
@@ -292,8 +292,8 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
firstTxProcess = false
log.Infof("reprocessing tx %s because of effective gas price calculation", tx.HashStr)
continue
} else if err == ErrBatchResourceUnderFlow {
log.Infof("skipping tx %s due to a batch resource underflow", tx.HashStr)
} else if err == ErrBatchResourceOverFlow {
log.Infof("skipping tx %s due to a batch resource overflow", tx.HashStr)
break
} else {
log.Errorf("failed to process tx %s, error: %v", err)
@@ -374,11 +374,11 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
// Save values for later logging
tx.EGPLog.L1GasPrice = tx.L1GasPrice
tx.EGPLog.L2GasPrice = txL2GasPrice
tx.EGPLog.GasUsedFirst = tx.BatchResources.UsedZKCounters.GasUsed
tx.EGPLog.GasUsedFirst = tx.UsedZKCounters.GasUsed
tx.EGPLog.GasPrice.Set(txGasPrice)

// Calculate EffectiveGasPrice
egp, err := f.effectiveGasPrice.CalculateEffectiveGasPrice(tx.RawTx, txGasPrice, tx.BatchResources.UsedZKCounters.GasUsed, tx.L1GasPrice, txL2GasPrice)
egp, err := f.effectiveGasPrice.CalculateEffectiveGasPrice(tx.RawTx, txGasPrice, tx.UsedZKCounters.GasUsed, tx.L1GasPrice, txL2GasPrice)
if err != nil {
if f.effectiveGasPrice.IsEnabled() {
return nil, false, err
@@ -471,8 +471,9 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker, first
// Update imStateRoot
f.wipBatch.imStateRoot = batchResponse.NewStateRoot

log.Infof("processed tx %s, batchNumber: %d, l2Block: [%d], newStateRoot: %s, oldStateRoot: %s, used counters: %s",
tx.HashStr, batchRequest.BatchNumber, f.wipL2Block.trackingNum, batchResponse.NewStateRoot.String(), batchRequest.OldStateRoot.String(), f.logZKCounters(batchResponse.UsedZkCounters))
log.Infof("processed tx %s, batchNumber: %d, l2Block: [%d], newStateRoot: %s, oldStateRoot: %s, used counters: %s, reserved counters: %s",
tx.HashStr, batchRequest.BatchNumber, f.wipL2Block.trackingNum, batchResponse.NewStateRoot.String(), batchRequest.OldStateRoot.String(),
f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters))

return nil, closeBatch, nil
}
@@ -529,27 +530,24 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
}
}

// Check remaining resources

overflow, overflowResource := f.wipBatch.imRemainingResources.Sub(state.BatchResources{UsedZKCounters: result.UsedZkCounters, Bytes: uint64(len(tx.RawTx))})
if overflow {
log.Infof("current tx %s exceeds the remaining batch resources, overflow resource: %s, updating metadata for tx in worker and continuing", tx.HashStr, overflowResource)
if !f.batchConstraints.IsWithinConstraints(result.UsedZkCounters) {
// Check if the reserved resources of the tx fit in the remaining batch resources
subOverflow := false
fits, overflowResource := f.wipBatch.imRemainingResources.Fits(state.BatchResources{ZKCounters: result.ReservedZkCounters, Bytes: uint64(len(tx.RawTx))})
if fits {
// Subtract the used resources from the batch
subOverflow, overflowResource = f.wipBatch.imRemainingResources.Sub(state.BatchResources{ZKCounters: result.ReservedZkCounters, Bytes: uint64(len(tx.RawTx))})
if subOverflow { // Sanity check, this cannot happen as reservedZKCounters should be >= usedZKCounters
log.Infof("current tx %s used resources exceeds the remaining batch resources, overflow resource: %s, updating metadata for tx in worker and continuing. Batch counters: %s, tx used counters: %s",
tx.HashStr, overflowResource, f.logZKCounters(f.wipBatch.imRemainingResources.ZKCounters), f.logZKCounters(result.UsedZkCounters))
}
} else {
log.Infof("current tx %s reserved resources exceeds the remaining batch resources, overflow resource: %s, updating metadata for tx in worker and continuing. Batch counters: %s, tx reserved counters: %s",
tx.HashStr, overflowResource, f.logZKCounters(f.wipBatch.imRemainingResources.ZKCounters), f.logZKCounters(result.ReservedZkCounters))
if !f.batchConstraints.IsWithinConstraints(result.ReservedZkCounters) {
log.Warnf("current tx %s exceeds the max limit for batch resources (node OOC), setting tx as invalid in the pool", tx.HashStr)

event := &event.Event{
ReceivedAt: time.Now(),
Source: event.Source_Node,
Component: event.Component_Sequencer,
Level: event.Level_Error,
EventID: event.EventID_NodeOOC,
Description: fmt.Sprintf("tx: %s exceeds node max limit batch resources (node OOC), from: %s, IP: %s", tx.HashStr, tx.FromStr, tx.IP),
}

eventErr := f.eventLog.LogEvent(ctx, event)
if eventErr != nil {
log.Errorf("error storing finalizer halt event, error: %v", eventErr)
}
f.AddEvent(ctx, event.Level_Error, event.EventID_NodeOOC,
fmt.Sprintf("tx: %s exceeds node max limit batch resources (node OOC), from: %s, IP: %s", tx.HashStr, tx.FromStr, tx.IP), nil)

// Delete the transaction from the txSorted list
f.workerIntf.DeleteTx(tx.Hash, tx.From)
Expand All @@ -559,16 +557,18 @@ func (f *finalizer) handleProcessTransactionResponse(ctx context.Context, tx *Tx
if err != nil {
log.Errorf("failed to update status to invalid in the pool for tx %s, error: %v", tx.Hash.String(), err)
}

return nil, false, ErrBatchResourceUnderFlow
} else {
start := time.Now()
f.workerIntf.UpdateTxZKCounters(result.BlockResponses[0].TransactionResponses[0].TxHash, tx.From, result.UsedZkCounters)
metrics.WorkerProcessingTime(time.Since(start))
return nil, false, ErrBatchResourceUnderFlow
}
}

// If the reserved tx resources don't fit in the remaining batch resources (or we got an overflow when trying to subtract the used resources)
// we update the ZKCounters of the tx and return the ErrBatchResourceOverFlow error
if !fits || subOverflow {
start := time.Now()
f.workerIntf.UpdateTxZKCounters(result.BlockResponses[0].TransactionResponses[0].TxHash, tx.From, result.UsedZkCounters, result.ReservedZkCounters)
metrics.WorkerProcessingTime(time.Since(start))
return nil, false, ErrBatchResourceOverFlow
}

// Save Enabled, GasPriceOC, BalanceOC and final effective gas price for later logging
tx.EGPLog.Enabled = egpEnabled
tx.EGPLog.GasPriceOC = result.BlockResponses[0].TransactionResponses[0].HasGaspriceOpcode
@@ -728,19 +728,8 @@ func (f *finalizer) handleProcessTransactionError(ctx context.Context, result *s
// checkIfProverRestarted checks if the proverID changed
func (f *finalizer) checkIfProverRestarted(proverID string) {
if f.proverID != "" && f.proverID != proverID {
event := &event.Event{
ReceivedAt: time.Now(),
Source: event.Source_Node,
Component: event.Component_Sequencer,
Level: event.Level_Critical,
EventID: event.EventID_FinalizerRestart,
Description: fmt.Sprintf("proverID changed from %s to %s, restarting sequencer to discard current WIP batch and work with new executor", f.proverID, proverID),
}

err := f.eventLog.LogEvent(context.Background(), event)
if err != nil {
log.Errorf("error storing payload, error: %v", err)
}
f.AddEvent(context.Background(), event.Level_Critical, event.EventID_FinalizerRestart,
fmt.Sprintf("proverID changed from %s to %s, restarting sequencer to discard current WIP batch and work with new executor", f.proverID, proverID), nil)

log.Fatal("proverID changed from %s to %s, restarting sequencer to discard current WIP batch and work with new executor")
}
@@ -757,26 +746,34 @@ func (f *finalizer) logZKCounters(counters state.ZKCounters) string {
func (f *finalizer) Halt(ctx context.Context, err error, isFatal bool) {
f.haltFinalizer.Store(true)

f.AddEvent(ctx, event.Level_Critical, event.EventID_FinalizerHalt, fmt.Sprintf("finalizer halted due to error, error: %s", err), nil)

if isFatal {
log.Fatalf("fatal error on finalizer, error: %v", err)
} else {
for {
log.Errorf("halting finalizer, error: %v", err)
time.Sleep(5 * time.Second) //nolint:gomnd
}
}
}

func (f *finalizer) AddEvent(ctx context.Context, level event.Level, eventId event.EventID, description string, json interface{}) {
event := &event.Event{
ReceivedAt: time.Now(),
Source: event.Source_Node,
Component: event.Component_Sequencer,
Level: event.Level_Critical,
EventID: event.EventID_FinalizerHalt,
Description: fmt.Sprintf("finalizer halted due to error, error: %s", err),
Level: level,
EventID: eventId,
Description: description,
}

if json != nil {
event.Json = json
}

eventErr := f.eventLog.LogEvent(ctx, event)
if eventErr != nil {
log.Errorf("error storing finalizer halt event, error: %v", eventErr)
}

if isFatal {
log.Fatalf("fatal error on finalizer, error: %v", err)
} else {
for {
log.Errorf("halting finalizer, error: %v", err)
time.Sleep(5 * time.Second) //nolint:gomnd
}
}
}
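Read together, the handleProcessTransactionResponse hunks above amount to the following flow. This is a condensed paraphrase of the new code in this diff (names as in the hunks above, log calls and the pool status update elided), not additional code from the PR:

// Condensed paraphrase of the new resource handling: fit and subtract using the
// reserved (worst-case) counters, and classify the tx when it does not fit.
txResources := state.BatchResources{ZKCounters: result.ReservedZkCounters, Bytes: uint64(len(tx.RawTx))}

subOverflow := false
fits, _ := f.wipBatch.imRemainingResources.Fits(txResources)
if fits {
	// The reserved counters fit: subtract them from the wip batch resources.
	subOverflow, _ = f.wipBatch.imRemainingResources.Sub(txResources)
} else if !f.batchConstraints.IsWithinConstraints(result.ReservedZkCounters) {
	// The tx can never fit in any batch (node-level OOC): log the event,
	// delete it from the worker and mark it invalid in the pool.
	f.workerIntf.DeleteTx(tx.Hash, tx.From)
}

if !fits || subOverflow {
	// Refresh the tx metadata with both counter sets and let the finalizer
	// skip this tx and close the current batch.
	f.workerIntf.UpdateTxZKCounters(tx.Hash, tx.From, result.UsedZkCounters, result.ReservedZkCounters)
	return nil, false, ErrBatchResourceOverFlow
}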