Skip to content

Commit

Permalink
Remove sync with virtual state (synchronizer). Add L1 block confirmat… (
Browse files Browse the repository at this point in the history
#3659)

* Remove sync with virtual state (synchronizer). Add L1 block confirmations to consider sequence final

* fix get monitored tx receipt

* update doc
  • Loading branch information
agnusmor authored May 30, 2024
1 parent 5c17169 commit 3b0005d
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 38 deletions.
4 changes: 4 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@ func Test_Defaults(t *testing.T) {
path: "SequenceSender.GasOffset",
expectedValue: uint64(80000),
},
{
path: "SequenceSender.SequenceL1BlockConfirmations",
expectedValue: uint64(4),
},
{
path: "Etherman.URL",
expectedValue: "http://localhost:8545",
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ WaitPeriodSendSequence = "5s"
LastBatchVirtualizationTimeMaxWaitPeriod = "5s"
L1BlockTimestampMargin = "30s"
MaxTxSizeForL1 = 131072
SequenceL1BlockConfirmations = 4
L2Coinbase = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
PrivateKey = {Path = "/pk/sequencer.keystore", Password = "testonly"}
GasOffset = 80000
Expand Down
1 change: 1 addition & 0 deletions config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ WaitPeriodSendSequence = "5s"
LastBatchVirtualizationTimeMaxWaitPeriod = "5s"
L1BlockTimestampMargin = "30s"
MaxTxSizeForL1 = 131072
SequenceL1BlockConfirmations = 4
L2Coinbase = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
PrivateKey = {Path = "/pk/sequencer.keystore", Password = "testonly"}

Expand Down
2 changes: 1 addition & 1 deletion docs/config-file/node-config-doc.html

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions docs/config-file/node-config-doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -2603,6 +2603,7 @@ UpgradeEtrogBatchNumber=0
| - [PrivateKey](#SequenceSender_PrivateKey ) | No | object | No | - | PrivateKey defines all the key store files that are going<br />to be read in order to provide the private keys to sign the L1 txs |
| - [ForkUpgradeBatchNumber](#SequenceSender_ForkUpgradeBatchNumber ) | No | integer | No | - | Batch number where there is a forkid change (fork upgrade) |
| - [GasOffset](#SequenceSender_GasOffset ) | No | integer | No | - | GasOffset is the amount of gas to be added to the gas estimation in order<br />to provide an amount that is higher than the estimated one. This is used<br />to avoid the TX getting reverted in case something has changed in the network<br />state after the estimation which can cause the TX to require more gas to be<br />executed.<br /><br />ex:<br />gas estimation: 1000<br />gas offset: 100<br />final gas: 1100 |
| - [SequenceL1BlockConfirmations](#SequenceSender_SequenceL1BlockConfirmations ) | No | integer | No | - | SequenceL1BlockConfirmations is number of blocks to consider a sequence sent to L1 as final |

### <a name="SequenceSender_WaitPeriodSendSequence"></a>11.1. `SequenceSender.WaitPeriodSendSequence`

Expand Down Expand Up @@ -2797,6 +2798,20 @@ final gas: 1100
GasOffset=80000
```

### <a name="SequenceSender_SequenceL1BlockConfirmations"></a>11.10. `SequenceSender.SequenceL1BlockConfirmations`

**Type:** : `integer`

**Default:** `4`

**Description:** SequenceL1BlockConfirmations is number of blocks to consider a sequence sent to L1 as final

**Example setting the default value** (4):
```
[SequenceSender]
SequenceL1BlockConfirmations=4
```

## <a name="Aggregator"></a>12. `[Aggregator]`

**Type:** : `object`
Expand Down
5 changes: 5 additions & 0 deletions docs/config-file/node-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,11 @@
"type": "integer",
"description": "GasOffset is the amount of gas to be added to the gas estimation in order\nto provide an amount that is higher than the estimated one. This is used\nto avoid the TX getting reverted in case something has changed in the network\nstate after the estimation which can cause the TX to require more gas to be\nexecuted.\n\nex:\ngas estimation: 1000\ngas offset: 100\nfinal gas: 1100",
"default": 80000
},
"SequenceL1BlockConfirmations": {
"type": "integer",
"description": "SequenceL1BlockConfirmations is number of blocks to consider a sequence sent to L1 as final",
"default": 4
}
},
"additionalProperties": false,
Expand Down
2 changes: 2 additions & 0 deletions sequencesender/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,6 @@ type Config struct {
// gas offset: 100
// final gas: 1100
GasOffset uint64 `mapstructure:"GasOffset"`
// SequenceL1BlockConfirmations is number of blocks to consider a sequence sent to L1 as final
SequenceL1BlockConfirmations uint64 `mapstructure:"SequenceL1BlockConfirmations"`
}
109 changes: 78 additions & 31 deletions sequencesender/sequencesender.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/event"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/jackc/pgx/v4"
)
Expand All @@ -21,6 +22,7 @@ const (
monitoredIDFormat = "sequence-from-%v-to-%v"
retriesSanityCheck = 8
waitRetrySanityCheck = 15 * time.Second
waitRetryGetL1Block = 2 * time.Second
)

var (
Expand All @@ -36,11 +38,13 @@ var (

// SequenceSender represents a sequence sender
type SequenceSender struct {
cfg Config
state stateInterface
ethTxManager ethTxManager
etherman etherman
eventLog *event.EventLog
cfg Config
state stateInterface
ethTxManager ethTxManager
etherman etherman
eventLog *event.EventLog
lastSequenceInitialBatch uint64
lastSequenceEndBatch uint64
}

// New inits sequence sender
Expand All @@ -63,7 +67,7 @@ func (s *SequenceSender) Start(ctx context.Context) {

// marginTimeElapsed checks if the time between currentTime and l2BlockTimestamp is greater than timeMargin.
// If it's greater returns true, otherwise it returns false and the waitTime needed to achieve this timeMargin
func (s *SequenceSender) marginTimeElapsed(ctx context.Context, l2BlockTimestamp uint64, currentTime uint64, timeMargin int64) (bool, int64) {
func (s *SequenceSender) marginTimeElapsed(l2BlockTimestamp uint64, currentTime uint64, timeMargin int64) (bool, int64) {
// Check the time difference between L2 block and currentTime
var timeDiff int64
if l2BlockTimestamp >= currentTime {
Expand Down Expand Up @@ -91,7 +95,55 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
retry := false
// process monitored sequences before starting a next cycle
s.ethTxManager.ProcessPendingMonitoredTxs(ctx, ethTxManagerOwner, func(result ethtxmanager.MonitoredTxResult, dbTx pgx.Tx) {
if result.Status == ethtxmanager.MonitoredTxStatusFailed {
if result.Status == ethtxmanager.MonitoredTxStatusConfirmed {
if len(result.Txs) > 0 {
var txL1BlockNumber uint64
var txHash common.Hash
receiptFound := false
for _, tx := range result.Txs {
if tx.Receipt != nil {
txL1BlockNumber = tx.Receipt.BlockNumber.Uint64()
txHash = tx.Tx.Hash()
receiptFound = true
break
}
}

if !receiptFound {
s.halt(ctx, fmt.Errorf("monitored tx %s for sequence [%d-%d] is confirmed but doesn't have a receipt", result.ID, s.lastSequenceInitialBatch, s.lastSequenceEndBatch))
}

// wait L1 confirmation blocks
log.Infof("waiting %d L1 block confirmations for sequence [%d-%d], L1 block: %d, tx: %s",
s.cfg.SequenceL1BlockConfirmations, s.lastSequenceInitialBatch, s.lastSequenceEndBatch, txL1BlockNumber, txHash)
for {
lastL1BlockHeader, err := s.etherman.GetLatestBlockHeader(ctx)
if err != nil {
log.Errorf("failed to get last L1 block number, err: %v", err)
} else {
lastL1BlockNumber := lastL1BlockHeader.Number.Uint64()

if lastL1BlockNumber >= txL1BlockNumber+s.cfg.SequenceL1BlockConfirmations {
log.Infof("continuing, last L1 block: %d", lastL1BlockNumber)
break
}
}
time.Sleep(waitRetryGetL1Block)
}

lastSCBatchNum, err := s.etherman.GetLatestBatchNumber()
if err != nil {
log.Warnf("failed to get from the SC last sequenced batch number, err: %v", err)
return
}

if lastSCBatchNum != s.lastSequenceEndBatch {
s.halt(ctx, fmt.Errorf("last sequenced batch from SC %d doesn't match last sequenced batch sent %d", lastSCBatchNum, s.lastSequenceEndBatch))
}
} else {
s.halt(ctx, fmt.Errorf("monitored tx %s for sequence [%d-%d] doesn't have transactions to be checked", result.ID, s.lastSequenceInitialBatch, s.lastSequenceEndBatch))
}
} else { // Monitored tx is failed
retry = true
mTxResultLogger := ethtxmanager.CreateMonitoredTxResultLogger(ethTxManagerOwner, result)
mTxResultLogger.Error("failed to send sequence, TODO: review this fatal and define what to do in this case")
Expand All @@ -102,13 +154,12 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
return
}

// Check if synchronizer is up to date
synced, err := s.isSynced(ctx, retriesSanityCheck, waitRetrySanityCheck)
sanityCheckOk, err := s.sanityCheck(ctx, retriesSanityCheck, waitRetrySanityCheck)
if err != nil {
s.halt(ctx, err)
}
if !synced {
log.Info("wait virtual state to be synced...")
if !sanityCheckOk {
log.Info("sanity check failed, retrying...")
time.Sleep(5 * time.Second) // nolint:gomnd
return
}
Expand All @@ -126,7 +177,7 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
return
}

lastVirtualBatchNum, err := s.state.GetLastVirtualBatchNum(ctx, nil)
lastVirtualBatchNum, err := s.etherman.GetLatestBatchNumber()
if err != nil {
log.Errorf("failed to get last virtual batch num, err: %v", err)
return
Expand All @@ -153,7 +204,7 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
return
}

elapsed, waitTime := s.marginTimeElapsed(ctx, lastL2BlockTimestamp, lastL1BlockHeader.Time, timeMargin)
elapsed, waitTime := s.marginTimeElapsed(lastL2BlockTimestamp, lastL1BlockHeader.Time, timeMargin)

if !elapsed {
log.Infof("waiting at least %d seconds to send sequences, time difference between last L1 block %d (ts: %d) and last L2 block %d (ts: %d) in the sequence is lower than %d seconds",
Expand All @@ -170,7 +221,7 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
for {
currentTime := uint64(time.Now().Unix())

elapsed, waitTime := s.marginTimeElapsed(ctx, lastL2BlockTimestamp, currentTime, timeMargin)
elapsed, waitTime := s.marginTimeElapsed(lastL2BlockTimestamp, currentTime, timeMargin)

// Wait if the time difference is less than timeMargin (L1BlockTimestampMargin)
if !elapsed {
Expand Down Expand Up @@ -200,13 +251,16 @@ func (s *SequenceSender) tryToSendSequence(ctx context.Context) {
mTxLogger.Errorf("error to add sequences tx to eth tx manager: ", err)
return
}

s.lastSequenceInitialBatch = sequences[0].BatchNumber
s.lastSequenceEndBatch = lastSequence.BatchNumber
}

// getSequencesToSend generates an array of sequences to be send to L1.
// If the array is empty, it doesn't necessarily mean that there are no sequences to be sent,
// it could be that it's not worth it to do so yet.
func (s *SequenceSender) getSequencesToSend(ctx context.Context) ([]types.Sequence, error) {
lastVirtualBatchNum, err := s.state.GetLastVirtualBatchNum(ctx, nil)
lastVirtualBatchNum, err := s.etherman.GetLatestBatchNumber()
if err != nil {
return nil, fmt.Errorf("failed to get last virtual batch num, err: %v", err)
}
Expand Down Expand Up @@ -297,7 +351,7 @@ func (s *SequenceSender) getSequencesToSend(ctx context.Context) ([]types.Sequen
}
if err != nil {
log.Infof("Handling estimage gas send sequence error: %v", err)
sequences, err = s.handleEstimateGasSendSequenceErr(ctx, sequences, currentBatchNumToSequence, err)
sequences, err = s.handleEstimateGasSendSequenceErr(sequences, currentBatchNumToSequence, err)
if sequences != nil {
if len(sequences) > 0 {
// Handling the error gracefully, re-processing the sequence as a sanity check
Expand Down Expand Up @@ -347,12 +401,7 @@ func (s *SequenceSender) getSequencesToSend(ctx context.Context) ([]types.Sequen
// nil, error: impossible to handle gracefully
// sequence, nil: handled gracefully. Potentially manipulating the sequences
// nil, nil: a situation that requires waiting
func (s *SequenceSender) handleEstimateGasSendSequenceErr(
ctx context.Context,
sequences []types.Sequence,
currentBatchNumToSequence uint64,
err error,
) ([]types.Sequence, error) {
func (s *SequenceSender) handleEstimateGasSendSequenceErr(sequences []types.Sequence, currentBatchNumToSequence uint64, err error) ([]types.Sequence, error) {
// Insufficient allowance
if errors.Is(err, ethman.ErrInsufficientAllowance) {
return nil, err
Expand Down Expand Up @@ -415,7 +464,7 @@ func isDataForEthTxTooBig(err error) bool {
errors.Is(err, ethman.ErrContentLengthTooLarge)
}

func (s *SequenceSender) isSynced(ctx context.Context, retries int, waitRetry time.Duration) (bool, error) {
func (s *SequenceSender) sanityCheck(ctx context.Context, retries int, waitRetry time.Duration) (bool, error) {
lastVirtualBatchNum, err := s.state.GetLastVirtualBatchNum(ctx, nil)
if err != nil && err != state.ErrNotFound {
log.Warnf("failed to get last virtual batch number, err: %v", err)
Expand All @@ -434,10 +483,8 @@ func (s *SequenceSender) isSynced(ctx context.Context, retries int, waitRetry ti
return false, nil
}

if lastVirtualBatchNum < lastSCBatchNum {
log.Infof("waiting for the state to be synced, last virtual batch: %d, last SC sequenced batch: %d", lastVirtualBatchNum, lastSCBatchNum)
return false, nil
} else if lastVirtualBatchNum > lastSCBatchNum { // Sanity check: virtual batch number cannot be greater than last batch sequenced in the SC
// Sanity check: virtual batch number cannot be greater than last batch sequenced in the SC
if lastVirtualBatchNum > lastSCBatchNum {
// we will retry some times to check that really the last sequenced batch in the SC is lower that the las virtual batch
log.Warnf("last virtual batch %d is greater than last SC sequenced batch %d, retrying...", lastVirtualBatchNum, lastSCBatchNum)
for i := 0; i < retries; i++ {
Expand All @@ -457,13 +504,13 @@ func (s *SequenceSender) isSynced(ctx context.Context, retries int, waitRetry ti
log.Infof("last virtual batch %d is equal to last SC sequenced batch %d, continuing...", lastVirtualBatchNum, lastSCBatchNum)
}

// At this point lastVirtualBatchNum = lastEthBatchNum. Check trusted batches
if lastTrustedBatchClosed.BatchNumber >= lastVirtualBatchNum {
return true, nil
} else { // Sanity check: virtual batch number cannot be greater than last trusted batch closed
// Sanity check: virtual batch number cannot be greater than last trusted batch closed
if lastTrustedBatchClosed.BatchNumber < lastVirtualBatchNum {
log.Errorf("last virtual batch %d is greater than last trusted batch closed %d", lastVirtualBatchNum, lastTrustedBatchClosed.BatchNumber)
return false, ErrSyncVirtualGreaterTrusted
}

return true, nil
}

// halt halts the SequenceSender
Expand Down
12 changes: 6 additions & 6 deletions sequencesender/sequencesender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ func TestIsSynced(t *testing.T) {

testCases := []IsSyncedTestCase{
{
name: "is synced",
name: "sanity check ok",
lastVirtualBatchNum: 10,
lastTrustedBatchClosed: 12,
lastSCBatchNum: []uint64{10},
expectedResult: true,
err: nil,
},
{
name: "not synced",
name: "sanity check ok",
lastVirtualBatchNum: 9,
lastTrustedBatchClosed: 12,
lastSCBatchNum: []uint64{10},
expectedResult: false,
expectedResult: true,
err: nil,
},
{
Expand All @@ -67,15 +67,15 @@ func TestIsSynced(t *testing.T) {
err: ErrSyncVirtualGreaterSequenced,
},
{
name: "is synced, sc sequenced retries",
name: "sanity check ok: sc sequenced retries",
lastVirtualBatchNum: 11,
lastTrustedBatchClosed: 12,
lastSCBatchNum: []uint64{10, 10, 11},
expectedResult: true,
err: nil,
},
{
name: "is synced, sc sequenced retries (last)",
name: "sanity check ok: sc sequenced retries (last)",
lastVirtualBatchNum: 11,
lastTrustedBatchClosed: 12,
lastSCBatchNum: []uint64{10, 10, 10, 11},
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestIsSynced(t *testing.T) {
}
}

synced, err := ssender.isSynced(context.Background(), retries, waitRetry)
synced, err := ssender.sanityCheck(context.Background(), retries, waitRetry)

assert.EqualValues(t, tc.expectedResult, synced)
assert.EqualValues(t, tc.err, err)
Expand Down
1 change: 1 addition & 0 deletions test/config/debug.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ WaitPeriodSendSequence = "15s"
LastBatchVirtualizationTimeMaxWaitPeriod = "10s"
L1BlockTimestampMargin = "5s"
MaxTxSizeForL1 = 131072
SequenceL1BlockConfirmations = 2
L2Coinbase = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
PrivateKey = {Path = "./test/sequencer.keystore", Password = "testonly"}

Expand Down
1 change: 1 addition & 0 deletions test/config/test.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ WaitPeriodSendSequence = "15s"
LastBatchVirtualizationTimeMaxWaitPeriod = "10s"
L1BlockTimestampMargin = "5s"
MaxTxSizeForL1 = 131072
SequenceL1BlockConfirmations = 4
L2Coinbase = "0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"
PrivateKey = {Path = "/pk/sequencer.keystore", Password = "testonly"}
[SequenceSender.StreamClient]
Expand Down

0 comments on commit 3b0005d

Please sign in to comment.