diff --git a/command/bridge/exit/exit.go b/command/bridge/exit/exit.go index dc74eef0d9..047d415a40 100644 --- a/command/bridge/exit/exit.go +++ b/command/bridge/exit/exit.go @@ -161,7 +161,7 @@ func run(cmd *cobra.Command, _ []string) { } outputter.SetCommandResult(&exitResult{ - ID: strconv.FormatUint(exitEvent.ID, 10), + ID: strconv.FormatUint(exitEvent.ID.Uint64(), 10), Sender: exitEvent.Sender.String(), Receiver: exitEvent.Receiver.String(), }) @@ -186,7 +186,7 @@ func createExitTxn(sender ethgo.Address, proof types.Proof) (*ethgo.Transaction, var exitEventAPI contractsapi.L2StateSyncedEvent - exitEventEncoded, err := exitEventAPI.Encode(exitEvent) + exitEventEncoded, err := exitEventAPI.Encode(exitEvent.L2StateSyncedEvent) if err != nil { return nil, nil, fmt.Errorf("failed to encode exit event: %w", err) } diff --git a/consensus/polybft/checkpoint_manager.go b/consensus/polybft/checkpoint_manager.go index 3544e6d4ee..bf69f93d05 100644 --- a/consensus/polybft/checkpoint_manager.go +++ b/consensus/polybft/checkpoint_manager.go @@ -68,6 +68,8 @@ type checkpointManager struct { logger hclog.Logger // state boltDb instance state *State + // eventGetter gets exit events (missed or current) from blocks + eventGetter *eventsGetter[*ExitEvent] } // newCheckpointManager creates a new instance of checkpointManager @@ -75,6 +77,14 @@ func newCheckpointManager(key ethgo.Key, checkpointOffset uint64, checkpointManagerSC types.Address, txRelayer txrelayer.TxRelayer, blockchain blockchainBackend, backend polybftBackend, logger hclog.Logger, state *State) *checkpointManager { + retry := &eventsGetter[*ExitEvent]{ + blockchain: blockchain, + isValidLogFn: func(l *types.Log) bool { + return l.Address == contracts.L2StateSenderContract + }, + parseEventFn: parseExitEvent, + } + return &checkpointManager{ key: key, blockchain: blockchain, @@ -84,6 +94,7 @@ func newCheckpointManager(key ethgo.Key, checkpointOffset uint64, checkpointManagerAddr: checkpointManagerSC, logger: logger, state: state, + eventGetter: retry, } } @@ -273,30 +284,28 @@ func (c *checkpointManager) isCheckpointBlock(blockNumber uint64, isEpochEndingB // PostBlock is called on every insert of finalized block (either from consensus or syncer) // It will read any exit event that happened in block and insert it to state boltDb func (c *checkpointManager) PostBlock(req *PostBlockRequest) error { - var ( - epoch = req.Epoch - block = req.FullBlock.Block.Number() - ) + block := req.FullBlock.Block.Number() - if req.IsEpochEndingBlock { - // exit events that happened in epoch ending blocks, - // should be added to the tree of the next epoch - epoch++ - block++ + lastBlock, err := c.state.CheckpointStore.getLastSaved() + if err != nil { + return fmt.Errorf("could not get last processed block for exit events. Error: %w", err) } - // commit exit events only when we finalize a block - events, err := getExitEventsFromReceipts(epoch, block, req.FullBlock.Receipts) + exitEvents, err := c.eventGetter.getFromBlocks(lastBlock, req.FullBlock) if err != nil { return err } - if len(events) > 0 { - c.logger.Debug("Gotten exit events from logs on block", - "eventsNum", len(events), "block", req.FullBlock.Block.Number()) + sort.Slice(exitEvents, func(i, j int) bool { + // keep events in sequential order + return exitEvents[i].ID.Cmp(exitEvents[j].ID) < 0 + }) + + if err := c.state.CheckpointStore.insertExitEvents(exitEvents); err != nil { + return err } - if err := c.state.CheckpointStore.insertExitEvents(events); err != nil { + if err := c.state.CheckpointStore.updateLastSaved(block); err != nil { return err } @@ -399,7 +408,7 @@ func (c *checkpointManager) GenerateExitProof(exitID uint64) (types.Proof, error var exitEventAPI contractsapi.L2StateSyncedEvent - e, err := exitEventAPI.Encode(exitEvent) + e, err := exitEventAPI.Encode(exitEvent.L2StateSyncedEvent) if err != nil { return types.Proof{}, err } @@ -436,42 +445,6 @@ func (c *checkpointManager) GenerateExitProof(exitID uint64) (types.Proof, error }, nil } -// getExitEventsFromReceipts parses logs from receipts to find exit events -func getExitEventsFromReceipts(epoch, block uint64, receipts []*types.Receipt) ([]*ExitEvent, error) { - events := make([]*ExitEvent, 0) - - for i := 0; i < len(receipts); i++ { - if receipts[i].Status == nil || *receipts[i].Status != types.ReceiptSuccess { - continue - } - - for _, log := range receipts[i].Logs { - if log.Address != contracts.L2StateSenderContract { - continue - } - - event, err := decodeExitEvent(convertLog(log), epoch, block) - if err != nil { - return nil, err - } - - if event == nil { - // valid case, not an exit event - continue - } - - events = append(events, event) - } - } - - // enforce sequential order - sort.Slice(events, func(i, j int) bool { - return events[i].ID < events[j].ID - }) - - return events, nil -} - // createExitTree creates an exit event merkle tree from provided exit events func createExitTree(exitEvents []*ExitEvent) (*merkle.MerkleTree, error) { numOfEvents := len(exitEvents) @@ -479,7 +452,7 @@ func createExitTree(exitEvents []*ExitEvent) (*merkle.MerkleTree, error) { var exitEventAPI contractsapi.L2StateSyncedEvent for i := 0; i < numOfEvents; i++ { - b, err := exitEventAPI.Encode(exitEvents[i]) + b, err := exitEventAPI.Encode(exitEvents[i].L2StateSyncedEvent) if err != nil { return nil, err } @@ -489,3 +462,29 @@ func createExitTree(exitEvents []*ExitEvent) (*merkle.MerkleTree, error) { return merkle.NewMerkleTree(data) } + +// parseExitEvent parses exit event from provided log +func parseExitEvent(h *types.Header, l *ethgo.Log) (*ExitEvent, bool, error) { + extra, err := GetIbftExtra(h.ExtraData) + if err != nil { + return nil, false, + fmt.Errorf("could not get header extra on exit event parsing. Error: %w", err) + } + + epoch := extra.Checkpoint.EpochNumber + block := h.Number + + if extra.Validators != nil { + // exit events that happened in epoch ending blocks, + // should be added to the tree of the next epoch + epoch++ + block++ + } + + event, err := decodeExitEvent(l, epoch, block) + if err != nil { + return nil, false, err + } + + return event, true, nil +} diff --git a/consensus/polybft/checkpoint_manager_test.go b/consensus/polybft/checkpoint_manager_test.go index 7648eba1b0..7b323b7211 100644 --- a/consensus/polybft/checkpoint_manager_test.go +++ b/consensus/polybft/checkpoint_manager_test.go @@ -296,22 +296,37 @@ func TestCheckpointManager_PostBlock(t *testing.T) { state := newTestState(t) - receipts := make([]*types.Receipt, numOfReceipts) - for i := 0; i < numOfReceipts; i++ { - receipts[i] = &types.Receipt{Logs: []*types.Log{ - createTestLogForExitEvent(t, uint64(i)), - }} - receipts[i].SetStatus(types.ReceiptSuccess) + createReceipts := func(startID, endID uint64) []*types.Receipt { + receipts := make([]*types.Receipt, endID-startID) + for i := startID; i < endID; i++ { + receipts[i-startID] = &types.Receipt{Logs: []*types.Log{ + createTestLogForExitEvent(t, i), + }} + receipts[i-startID].SetStatus(types.ReceiptSuccess) + } + + return receipts } - req := &PostBlockRequest{FullBlock: &types.FullBlock{Block: &types.Block{Header: &types.Header{Number: block}}, Receipts: receipts}, + extra := &Extra{ + Checkpoint: &CheckpointData{ + EpochNumber: epoch, + }, + } + + req := &PostBlockRequest{FullBlock: &types.FullBlock{Block: &types.Block{Header: &types.Header{Number: block}}}, Epoch: epoch} + req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil) + + blockchain := new(blockchainMock) checkpointManager := newCheckpointManager(wallet.NewEcdsaSigner(createTestKey(t)), 5, types.ZeroAddress, - nil, nil, nil, hclog.NewNullLogger(), state) + nil, blockchain, nil, hclog.NewNullLogger(), state) t.Run("PostBlock - not epoch ending block", func(t *testing.T) { + require.NoError(t, state.CheckpointStore.updateLastSaved(block-1)) // we got everything till the current block req.IsEpochEndingBlock = false + req.FullBlock.Receipts = createReceipts(0, 5) require.NoError(t, checkpointManager.PostBlock(req)) exitEvents, err := state.CheckpointStore.getExitEvents(epoch, func(exitEvent *ExitEvent) bool { @@ -319,23 +334,79 @@ func TestCheckpointManager_PostBlock(t *testing.T) { }) require.NoError(t, err) - require.Len(t, exitEvents, numOfReceipts) + require.Len(t, exitEvents, 5) require.Equal(t, uint64(epoch), exitEvents[0].EpochNumber) }) t.Run("PostBlock - epoch ending block (exit events are saved to the next epoch)", func(t *testing.T) { + require.NoError(t, state.CheckpointStore.updateLastSaved(block)) // we got everything till the current block req.IsEpochEndingBlock = true + req.FullBlock.Receipts = createReceipts(5, 10) + extra.Validators = &validator.ValidatorSetDelta{} + req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil) + req.FullBlock.Block.Header.Number = block + 1 + require.NoError(t, checkpointManager.PostBlock(req)) exitEvents, err := state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool { - return exitEvent.BlockNumber == block+1 + return exitEvent.BlockNumber == block+2 // they should be saved in the next epoch and its first block }) require.NoError(t, err) - require.Len(t, exitEvents, numOfReceipts) - require.Equal(t, uint64(block+1), exitEvents[0].BlockNumber) + require.Len(t, exitEvents, 5) + require.Equal(t, uint64(block+2), exitEvents[0].BlockNumber) require.Equal(t, uint64(epoch+1), exitEvents[0].EpochNumber) }) + + t.Run("PostBlock - there are missing events", func(t *testing.T) { + require.NoError(t, state.CheckpointStore.updateLastSaved(block)) // we are missing one block + + missedReceipts := createReceipts(10, 13) + newReceipts := createReceipts(13, 15) + + extra := &Extra{ + Checkpoint: &CheckpointData{ + EpochNumber: epoch + 1, + }, + } + + blockchain.On("GetHeaderByNumber", uint64(block+1)).Return(&types.Header{ + Number: block + 1, + ExtraData: extra.MarshalRLPTo(nil), + Hash: types.BytesToHash([]byte{0, 1, 2, 3}), + }, true) + blockchain.On("GetReceiptsByHash", types.BytesToHash([]byte{0, 1, 2, 3})).Return([]*types.Receipt{}, nil) + blockchain.On("GetHeaderByNumber", uint64(block+2)).Return(&types.Header{ + Number: block + 2, + ExtraData: extra.MarshalRLPTo(nil), + Hash: types.BytesToHash([]byte{4, 5, 6, 7}), + }, true) + blockchain.On("GetReceiptsByHash", types.BytesToHash([]byte{4, 5, 6, 7})).Return(missedReceipts, nil) + + req.IsEpochEndingBlock = false + req.FullBlock.Block.Header.Number = block + 3 // new block + req.FullBlock.Block.Header.ExtraData = extra.MarshalRLPTo(nil) // same epoch + req.FullBlock.Receipts = newReceipts + require.NoError(t, checkpointManager.PostBlock(req)) + + exitEvents, err := state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool { + return exitEvent.BlockNumber == block+2 + }) + + require.NoError(t, err) + // receipts from missed block + events from previous test case that were saved in the next epoch + // since they were in epoch ending block + require.Len(t, exitEvents, len(missedReceipts)+5) + require.Equal(t, extra.Checkpoint.EpochNumber, exitEvents[0].EpochNumber) + + exitEvents, err = state.CheckpointStore.getExitEvents(epoch+1, func(exitEvent *ExitEvent) bool { + return exitEvent.BlockNumber == block+3 + }) + + require.NoError(t, err) + require.Len(t, exitEvents, len(newReceipts)) + require.Equal(t, extra.Checkpoint.EpochNumber, exitEvents[0].EpochNumber) + }) } func TestCheckpointManager_BuildEventRoot(t *testing.T) { diff --git a/consensus/polybft/consensus_runtime_test.go b/consensus/polybft/consensus_runtime_test.go index 34a7cfa3e7..8b15f7f0f1 100644 --- a/consensus/polybft/consensus_runtime_test.go +++ b/consensus/polybft/consensus_runtime_test.go @@ -1074,7 +1074,7 @@ func encodeExitEvents(t *testing.T, exitEvents []*ExitEvent) [][]byte { var exitEventAPI contractsapi.L2StateSyncedEvent for i, e := range exitEvents { - encodedEvent, err := exitEventAPI.Encode(e) + encodedEvent, err := exitEventAPI.Encode(e.L2StateSyncedEvent) require.NoError(t, err) encodedEvents[i] = encodedEvent diff --git a/consensus/polybft/contractsapi/helper.go b/consensus/polybft/contractsapi/helper.go index 9eb38707aa..6332104a6f 100644 --- a/consensus/polybft/contractsapi/helper.go +++ b/consensus/polybft/contractsapi/helper.go @@ -2,6 +2,7 @@ package contractsapi import ( "github.com/0xPolygon/polygon-edge/types" + "github.com/umbracle/ethgo" "github.com/umbracle/ethgo/abi" ) @@ -13,6 +14,16 @@ type StateTransactionInput interface { DecodeAbi(b []byte) error } +// EventAbi is an interface representing an event generated in contractsapi +type EventAbi interface { + // Sig returns the event ABI signature or ID (which is unique for all event types) + Sig() ethgo.Hash + // Encode does abi encoding of given event + Encode(inputs interface{}) ([]byte, error) + // ParseLog parses the provided receipt log to given event type + ParseLog(log *ethgo.Log) (bool, error) +} + var ( // stateSyncABIType is a specific case where we need to encode state sync event as a tuple of tuple stateSyncABIType = abi.MustNewType( diff --git a/consensus/polybft/extra.go b/consensus/polybft/extra.go index 42ba463738..3db1917712 100644 --- a/consensus/polybft/extra.go +++ b/consensus/polybft/extra.go @@ -499,9 +499,5 @@ func GetIbftExtra(extraRaw []byte) (*Extra, error) { return nil, err } - if extra.Validators == nil { - extra.Validators = &validator.ValidatorSetDelta{} - } - return extra, nil } diff --git a/consensus/polybft/fsm.go b/consensus/polybft/fsm.go index 90fe2eb5b8..8e858bd306 100644 --- a/consensus/polybft/fsm.go +++ b/consensus/polybft/fsm.go @@ -43,8 +43,9 @@ var ( "allowed in an epoch ending block") errProposalDontMatch = errors.New("failed to insert proposal, because the validated proposal " + "is either nil or it does not match the received one") - errValidatorSetDeltaMismatch = errors.New("validator set delta mismatch") - errValidatorsUpdateInNonEpochEnding = errors.New("trying to update validator set in a non epoch ending block") + errValidatorSetDeltaMismatch = errors.New("validator set delta mismatch") + errValidatorsUpdateInNonEpochEnding = errors.New("trying to update validator set in a non epoch ending block") + errValidatorDeltaNilInEpochEndingBlock = errors.New("validator set delta is nil in epoch ending block") ) type fsm struct { @@ -337,11 +338,15 @@ func (f *fsm) Validate(proposal []byte) error { // validate validators delta if f.isEndOfEpoch { + if extra.Validators == nil { + return errValidatorDeltaNilInEpochEndingBlock + } + if !extra.Validators.Equals(f.newValidatorsDelta) { return errValidatorSetDeltaMismatch } - } else if !extra.Validators.IsEmpty() { - // delta should be empty in non epoch ending blocks + } else if extra.Validators != nil { + // delta should be nil in non epoch ending blocks return errValidatorsUpdateInNonEpochEnding } diff --git a/consensus/polybft/fsm_test.go b/consensus/polybft/fsm_test.go index 0d71d15413..f439c841e3 100644 --- a/consensus/polybft/fsm_test.go +++ b/consensus/polybft/fsm_test.go @@ -365,7 +365,7 @@ func TestFSM_BuildProposal_EpochEndingBlock_ValidatorsDeltaExists(t *testing.T) blockChainMock.AssertExpectations(t) } -func TestFSM_BuildProposal_NonEpochEndingBlock_ValidatorsDeltaEmpty(t *testing.T) { +func TestFSM_BuildProposal_NonEpochEndingBlock_ValidatorsDeltaNil(t *testing.T) { t.Parallel() const ( @@ -396,7 +396,7 @@ func TestFSM_BuildProposal_NonEpochEndingBlock_ValidatorsDeltaEmpty(t *testing.T blockExtra, err := GetIbftExtra(stateBlock.Block.Header.ExtraData) assert.NoError(t, err) - assert.True(t, blockExtra.Validators.IsEmpty()) + assert.Nil(t, blockExtra.Validators) blockBuilderMock.AssertExpectations(t) } @@ -799,7 +799,7 @@ func TestFSM_Validate_EpochEndingBlock_MismatchInDeltas(t *testing.T) { parentCheckpointHash, err := extra.Checkpoint.Hash(0, parentBlockNumber, parent.Hash) require.NoError(t, err) - extra.Validators = nil // this will cause test to fail + extra.Validators = &validator.ValidatorSetDelta{} // this will cause test to fail extra.Parent = createSignature(t, validators.GetPrivateIdentities(), parentCheckpointHash, bls.DomainCheckpointManager) stateBlock := createDummyStateBlock(parent.Number+1, types.Hash{100, 15}, extra.MarshalRLPTo(nil)) diff --git a/consensus/polybft/runtime_helpers.go b/consensus/polybft/runtime_helpers.go index f2ffa2a49a..1fc6671f01 100644 --- a/consensus/polybft/runtime_helpers.go +++ b/consensus/polybft/runtime_helpers.go @@ -28,6 +28,11 @@ func getBlockData(blockNumber uint64, blockchainBackend blockchainBackend) (*typ // isEpochEndingBlock checks if given block is an epoch ending block func isEpochEndingBlock(blockNumber uint64, extra *Extra, blockchain blockchainBackend) (bool, error) { + if extra.Validators == nil { + // non epoch ending blocks have validator set delta as nil + return false, nil + } + if !extra.Validators.IsEmpty() { // if validator set delta is not empty, the validator set was changed in this block // meaning the epoch changed as well diff --git a/consensus/polybft/sc_integration_test.go b/consensus/polybft/sc_integration_test.go index c850705553..6762ed8095 100644 --- a/consensus/polybft/sc_integration_test.go +++ b/consensus/polybft/sc_integration_test.go @@ -161,16 +161,20 @@ func TestIntegratoin_PerformExit(t *testing.T) { exits := []*ExitEvent{ { - ID: 1, - Sender: ethgo.Address(contracts.ChildERC20PredicateContract), - Receiver: ethgo.Address(rootERC20PredicateAddr), - Data: exitData1, + L2StateSyncedEvent: &contractsapi.L2StateSyncedEvent{ + ID: big.NewInt(1), + Sender: contracts.ChildERC20PredicateContract, + Receiver: rootERC20PredicateAddr, + Data: exitData1, + }, }, { - ID: 2, - Sender: ethgo.Address(contracts.ChildERC20PredicateContract), - Receiver: ethgo.Address(rootERC20PredicateAddr), - Data: exitData2, + L2StateSyncedEvent: &contractsapi.L2StateSyncedEvent{ + ID: big.NewInt(2), + Sender: contracts.ChildERC20PredicateContract, + Receiver: rootERC20PredicateAddr, + Data: exitData2, + }, }, } exitTree, err := createExitTree(exits) @@ -226,7 +230,7 @@ func TestIntegratoin_PerformExit(t *testing.T) { require.Equal(t, 0, int(res[31])) var exitEventAPI contractsapi.L2StateSyncedEvent - proofExitEvent, err := exitEventAPI.Encode(exits[0]) + proofExitEvent, err := exitEventAPI.Encode(exits[0].L2StateSyncedEvent) require.NoError(t, err) proof, err := exitTree.GenerateProof(proofExitEvent) diff --git a/consensus/polybft/stake_manager.go b/consensus/polybft/stake_manager.go index e140ca6eee..0f332cf7f2 100644 --- a/consensus/polybft/stake_manager.go +++ b/consensus/polybft/stake_manager.go @@ -8,7 +8,6 @@ import ( "sort" "strings" - "github.com/0xPolygon/polygon-edge/blockchain" "github.com/0xPolygon/polygon-edge/consensus/polybft/bitmap" "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer" @@ -54,10 +53,9 @@ type stakeManager struct { state *State rootChainRelayer txrelayer.TxRelayer key ethgo.Key - validatorSetContract types.Address supernetManagerContract types.Address maxValidatorSetSize int - blockchain blockchainBackend + eventsGetter *eventsGetter[*contractsapi.TransferEvent] } // newStakeManager returns a new instance of stake manager @@ -70,15 +68,27 @@ func newStakeManager( blockchain blockchainBackend, maxValidatorSetSize int, ) *stakeManager { + eventsGetter := &eventsGetter[*contractsapi.TransferEvent]{ + blockchain: blockchain, + isValidLogFn: func(l *types.Log) bool { + return l.Address == validatorSetAddr + }, + parseEventFn: func(h *types.Header, l *ethgo.Log) (*contractsapi.TransferEvent, bool, error) { + var transferEvent contractsapi.TransferEvent + doesMatch, err := transferEvent.ParseLog(l) + + return &transferEvent, doesMatch, err + }, + } + return &stakeManager{ logger: logger, state: state, rootChainRelayer: rootchainRelayer, key: key, - validatorSetContract: validatorSetAddr, supernetManagerContract: supernetManagerAddr, maxValidatorSetSize: maxValidatorSetSize, - blockchain: blockchain, + eventsGetter: eventsGetter, } } @@ -108,25 +118,7 @@ func (s *stakeManager) PostBlock(req *PostBlockRequest) error { s.logger.Debug("Stake manager on post block", "block", req.FullBlock.Block.Number(), "last saved", fullValidatorSet.BlockNumber, "last updated", fullValidatorSet.UpdatedAtBlockNumber) - // update with missing blocks - for i := fullValidatorSet.BlockNumber + 1; i < req.FullBlock.Block.Number(); i++ { - blockHeader, found := s.blockchain.GetHeaderByNumber(i) - if !found { - return blockchain.ErrNoBlock - } - - receipts, err := s.blockchain.GetReceiptsByHash(blockHeader.Hash) - if err != nil { - return err - } - - if err := s.updateWithReceipts(&fullValidatorSet, receipts, i); err != nil { - return err - } - } - - // finally update with received block - err = s.updateWithReceipts(&fullValidatorSet, req.FullBlock.Receipts, req.FullBlock.Block.Number()) + err = s.updateWithReceipts(&fullValidatorSet, req.FullBlock) if err != nil { return err } @@ -138,20 +130,20 @@ func (s *stakeManager) PostBlock(req *PostBlockRequest) error { } func (s *stakeManager) updateWithReceipts( - fullValidatorSet *validatorSetState, receipts []*types.Receipt, block uint64) error { - events, err := s.getTransferEventsFromReceipts(receipts) + fullValidatorSet *validatorSetState, fullBlock *types.FullBlock) error { + var transferEvents []*contractsapi.TransferEvent + + // get transfer currentBlockEvents from current block + transferEvents, err := s.eventsGetter.getFromBlocks(fullValidatorSet.BlockNumber, fullBlock) if err != nil { - return err + return fmt.Errorf("could not get transfer events from current block. Error: %w", err) } - s.logger.Debug("Full validator set before", - "block", block-1, "evnts", len(events), "data", fullValidatorSet.Validators) - - if len(events) == 0 { + if len(transferEvents) == 0 { return nil } - for _, event := range events { + for _, event := range transferEvents { if event.IsStake() { s.logger.Debug("Stake transfer event", "to", event.To, "value", event.Value) @@ -170,18 +162,22 @@ func (s *stakeManager) updateWithReceipts( for addr, data := range fullValidatorSet.Validators { if data.BlsKey == nil { - data.BlsKey, err = s.getBlsKey(data.Address) + blsKey, err := s.getBlsKey(data.Address) if err != nil { - s.logger.Warn("Could not get info for new validator", "block", block, "address", addr) + s.logger.Warn("Could not get info for new validator", + "block", fullBlock.Block.Number(), "address", addr) } + + data.BlsKey = blsKey } data.IsActive = data.VotingPower.Cmp(bigZero) > 0 } - fullValidatorSet.UpdatedAtBlockNumber = block // mark on which block validator set has been updated + // mark on which block validator set has been updated + fullValidatorSet.UpdatedAtBlockNumber = fullValidatorSet.BlockNumber - s.logger.Debug("Full validator set after", "block", block, "data", fullValidatorSet.Validators) + s.logger.Debug("Full validator set after", "block", fullBlock.Block, "data", fullValidatorSet.Validators) return nil } @@ -261,38 +257,6 @@ func (s *stakeManager) UpdateValidatorSet( return delta, nil } -// getTransferEventsFromReceipts parses logs from receipts to find transfer events -func (s *stakeManager) getTransferEventsFromReceipts(receipts []*types.Receipt) ([]*contractsapi.TransferEvent, error) { - events := make([]*contractsapi.TransferEvent, 0) - - for i := 0; i < len(receipts); i++ { - if receipts[i].Status == nil || *receipts[i].Status != types.ReceiptSuccess { - continue - } - - for _, log := range receipts[i].Logs { - if log.Address != s.validatorSetContract { - continue - } - - var transferEvent contractsapi.TransferEvent - - doesMatch, err := transferEvent.ParseLog(convertLog(log)) - if err != nil { - return nil, err - } - - if !doesMatch { - continue - } - - events = append(events, &transferEvent) - } - } - - return events, nil -} - // getBlsKey returns bls key for validator from the supernet contract func (s *stakeManager) getBlsKey(address types.Address) (*bls.PublicKey, error) { getValidatorFn := &contractsapi.GetValidatorCustomSupernetManagerFn{ diff --git a/consensus/polybft/stake_manager_fuzz_test.go b/consensus/polybft/stake_manager_fuzz_test.go index 9f49f0f2e3..532c2f6bab 100644 --- a/consensus/polybft/stake_manager_fuzz_test.go +++ b/consensus/polybft/stake_manager_fuzz_test.go @@ -147,12 +147,14 @@ func FuzzTestStakeManagerPostBlock(f *testing.F) { t.Skip() } + validatorSetAddr := types.StringToAddress("0x0001") + stakeManager := newStakeManager( hclog.NewNullLogger(), state, nil, wallet.NewEcdsaSigner(validators.GetValidator("A").Key()), - types.StringToAddress("0x0001"), + validatorSetAddr, types.StringToAddress("0x0002"), nil, 5, @@ -167,7 +169,7 @@ func FuzzTestStakeManagerPostBlock(f *testing.F) { Logs: []*types.Log{ createTestLogForTransferEvent( t, - stakeManager.validatorSetContract, + validatorSetAddr, validators.GetValidator(initialSetAliases[data.ValidatorID]).Address(), types.ZeroAddress, data.StakeValue, diff --git a/consensus/polybft/stake_manager_test.go b/consensus/polybft/stake_manager_test.go index 5546216450..30940bd67e 100644 --- a/consensus/polybft/stake_manager_test.go +++ b/consensus/polybft/stake_manager_test.go @@ -63,6 +63,7 @@ func TestStakeManager_PostBlock(t *testing.T) { newStake = uint64(100) firstValidator = uint64(0) secondValidator = uint64(1) + validatorSetAddr = types.StringToAddress("0x0001") ) state := newTestState(t) @@ -75,7 +76,7 @@ func TestStakeManager_PostBlock(t *testing.T) { state, nil, wallet.NewEcdsaSigner(validators.GetValidator("A").Key()), - types.StringToAddress("0x0001"), types.StringToAddress("0x0002"), + validatorSetAddr, types.StringToAddress("0x0002"), nil, 5, ) @@ -90,7 +91,7 @@ func TestStakeManager_PostBlock(t *testing.T) { Logs: []*types.Log{ createTestLogForTransferEvent( t, - stakeManager.validatorSetContract, + validatorSetAddr, validators.GetValidator(initialSetAliases[firstValidator]).Address(), types.ZeroAddress, 1, // initial validator stake was 1 @@ -146,7 +147,7 @@ func TestStakeManager_PostBlock(t *testing.T) { Logs: []*types.Log{ createTestLogForTransferEvent( t, - stakeManager.validatorSetContract, + validatorSetAddr, types.ZeroAddress, validators.GetValidator(initialSetAliases[secondValidator]).Address(), 250, @@ -213,7 +214,7 @@ func TestStakeManager_PostBlock(t *testing.T) { receipts[i] = &types.Receipt{Logs: []*types.Log{ createTestLogForTransferEvent( t, - stakeManager.validatorSetContract, + validatorSetAddr, types.ZeroAddress, validators.GetValidator(allAliases[i]).Address(), newStake, @@ -272,7 +273,7 @@ func TestStakeManager_PostBlock(t *testing.T) { receipt.Logs = []*types.Log{ createTestLogForTransferEvent( t, - stakeManager.validatorSetContract, + validatorSetAddr, types.ZeroAddress, validators.GetValidator(initialSetAliases[secondValidator]).Address(), 250, @@ -291,16 +292,16 @@ func TestStakeManager_PostBlock(t *testing.T) { fullValidatorSet, err := state.StakeStore.getFullValidatorSet() require.NoError(t, err) - var firstValidaotor *validator.ValidatorMetadata - firstValidaotor = nil + var updatedValidator *validator.ValidatorMetadata + updatedValidator = nil for _, validator := range fullValidatorSet.Validators { if validator.Address.String() == validators.GetValidator(initialSetAliases[secondValidator]).Address().String() { - firstValidaotor = validator + updatedValidator = validator } } - require.NotNil(t, firstValidaotor) - require.Equal(t, big.NewInt(501), firstValidaotor.VotingPower) // 250 + 250 + initial 1 - require.True(t, firstValidaotor.IsActive) + require.NotNil(t, updatedValidator) + require.Equal(t, big.NewInt(501), updatedValidator.VotingPower) // 250 + 250 + initial 1 + require.True(t, updatedValidator.IsActive) bcMock.AssertExpectations(t) }) diff --git a/consensus/polybft/state.go b/consensus/polybft/state.go index 3e36e1ee01..2627b91ba7 100644 --- a/consensus/polybft/state.go +++ b/consensus/polybft/state.go @@ -5,26 +5,8 @@ import ( "github.com/hashicorp/go-hclog" bolt "go.etcd.io/bbolt" - - "github.com/umbracle/ethgo" ) -// ExitEvent is an event emitted by Exit contract -type ExitEvent struct { - // ID is the decoded 'index' field from the event - ID uint64 `abi:"id"` - // Sender is the decoded 'sender' field from the event - Sender ethgo.Address `abi:"sender"` - // Receiver is the decoded 'receiver' field from the event - Receiver ethgo.Address `abi:"receiver"` - // Data is the decoded 'data' field from the event - Data []byte `abi:"data"` - // EpochNumber is the epoch number in which exit event was added - EpochNumber uint64 `abi:"-"` - // BlockNumber is the block in which exit event was added - BlockNumber uint64 `abi:"-"` -} - // MessageSignature encapsulates sender identifier and its signature type MessageSignature struct { // Signer of the vote diff --git a/consensus/polybft/state_event_getter.go b/consensus/polybft/state_event_getter.go new file mode 100644 index 0000000000..46912d3d28 --- /dev/null +++ b/consensus/polybft/state_event_getter.go @@ -0,0 +1,87 @@ +package polybft + +import ( + "github.com/0xPolygon/polygon-edge/blockchain" + "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" + "github.com/0xPolygon/polygon-edge/types" + "github.com/umbracle/ethgo" +) + +// eventsGetter is a struct for getting missed and current events +// of specified type from specified blocks +type eventsGetter[T contractsapi.EventAbi] struct { + // blockchain is an abstraction of blockchain that provides necessary functions + // for querying blockchain data (blocks, receipts, etc.) + blockchain blockchainBackend + // parseEventFn is a plugin function used to parse the event from transaction log + parseEventFn func(*types.Header, *ethgo.Log) (T, bool, error) + // isValidLogFn is a plugin function that validates the log + // for example: if it was sent from the desired address + isValidLogFn func(*types.Log) bool +} + +// getFromBlocks gets events of specified type from specified blocks +// and saves them using the provided saveEventsFn +func (e *eventsGetter[T]) getFromBlocks(lastProcessedBlock uint64, + currentBlock *types.FullBlock) ([]T, error) { + var allEvents []T + + for i := lastProcessedBlock + 1; i < currentBlock.Block.Number(); i++ { + blockHeader, found := e.blockchain.GetHeaderByNumber(i) + if !found { + return nil, blockchain.ErrNoBlock + } + + receipts, err := e.blockchain.GetReceiptsByHash(blockHeader.Hash) + if err != nil { + return nil, err + } + + eventsFromBlock, err := e.getEventsFromReceipts(blockHeader, receipts) + if err != nil { + return nil, err + } + + allEvents = append(allEvents, eventsFromBlock...) + } + + currentEvents, err := e.getEventsFromReceipts(currentBlock.Block.Header, currentBlock.Receipts) + if err != nil { + return nil, err + } + + allEvents = append(allEvents, currentEvents...) + + return allEvents, nil +} + +// getEventsFromReceipts returns events of specified type from block transaction receipts +func (e *eventsGetter[T]) getEventsFromReceipts(blockHeader *types.Header, + receipts []*types.Receipt) ([]T, error) { + var events []T + + for _, receipt := range receipts { + if receipt.Status == nil || *receipt.Status != types.ReceiptSuccess { + continue + } + + for _, log := range receipt.Logs { + if e.isValidLogFn != nil && !e.isValidLogFn(log) { + continue + } + + event, doesMatch, err := e.parseEventFn(blockHeader, convertLog(log)) + if err != nil { + return nil, err + } + + if !doesMatch { + continue + } + + events = append(events, event) + } + } + + return events, nil +} diff --git a/consensus/polybft/state_event_getter_test.go b/consensus/polybft/state_event_getter_test.go new file mode 100644 index 0000000000..92eaf77b25 --- /dev/null +++ b/consensus/polybft/state_event_getter_test.go @@ -0,0 +1,48 @@ +package polybft + +import ( + "testing" + + "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" + "github.com/0xPolygon/polygon-edge/contracts" + "github.com/0xPolygon/polygon-edge/types" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/umbracle/ethgo" +) + +func TestEventDBInsertRetry_GetEvents(t *testing.T) { + receipt := &types.Receipt{ + Logs: []*types.Log{ + createTestLogForTransferEvent(t, contracts.ValidatorSetContract, types.ZeroAddress, types.ZeroAddress, 10), + }, + } + receipt.SetStatus(types.ReceiptSuccess) + + backend := new(blockchainMock) + backend.On("GetHeaderByNumber", mock.Anything).Return(&types.Header{ + Hash: types.BytesToHash([]byte{0, 1, 2, 3}), + }, true) + backend.On("GetReceiptsByHash", mock.Anything).Return([]*types.Receipt{receipt}, nil) + + retryManager := &eventsGetter[*contractsapi.TransferEvent]{ + blockchain: backend, + isValidLogFn: func(l *types.Log) bool { + return l.Address == contracts.ValidatorSetContract + }, + parseEventFn: func(h *types.Header, l *ethgo.Log) (*contractsapi.TransferEvent, bool, error) { + var e contractsapi.TransferEvent + doesMatch, err := e.ParseLog(l) + + return &e, doesMatch, err + }, + } + + events, err := retryManager.getFromBlocks(0, &types.FullBlock{ + Block: &types.Block{Header: &types.Header{Number: 2}}, + Receipts: []*types.Receipt{}, + }) + + require.NoError(t, err) + require.Len(t, events, 1) +} diff --git a/consensus/polybft/state_store_checkpoint.go b/consensus/polybft/state_store_checkpoint.go index 4f56a36af8..c5f0df8986 100644 --- a/consensus/polybft/state_store_checkpoint.go +++ b/consensus/polybft/state_store_checkpoint.go @@ -3,6 +3,7 @@ package polybft import ( "bytes" "encoding/json" + "errors" "fmt" "sort" @@ -15,8 +16,12 @@ import ( var ( // bucket to store exit contract events - exitEventsBucket = []byte("exitEvent") - exitEventToEpochLookupBucket = []byte("exitIdToEpochLookup") + exitEventsBucket = []byte("exitEvent") + exitEventToEpochLookupBucket = []byte("exitIdToEpochLookup") + exitEventLastProcessedBlockBucket = []byte("lastProcessedBlock") + + lastProcessedBlockKey = []byte("lastProcessedBlock") + errNoLastSavedEntry = errors.New("there is no last saved block in last saved bucket") ) type exitEventNotFoundError struct { @@ -28,12 +33,22 @@ func (e *exitEventNotFoundError) Error() string { return fmt.Sprintf("could not find any exit event that has an id: %v and epoch: %v", e.exitID, e.epoch) } +// ExitEvent is an event emitted by Exit contract +type ExitEvent struct { + *contractsapi.L2StateSyncedEvent + // EpochNumber is the epoch number in which exit event was added + EpochNumber uint64 `abi:"-"` + // BlockNumber is the block in which exit event was added + BlockNumber uint64 `abi:"-"` +} + /* Bolt DB schema: exit events/ |--> (id+epoch+blockNumber) -> *ExitEvent (json marshalled) |--> (exitEventID) -> epochNumber +|--> (lastProcessedBlockKey) -> block number */ type CheckpointStore struct { db *bolt.DB @@ -49,7 +64,11 @@ func (s *CheckpointStore) initialize(tx *bolt.Tx) error { return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventToEpochLookupBucket), err) } - return nil + if _, err := tx.CreateBucketIfNotExists(exitEventLastProcessedBlockBucket); err != nil { + return fmt.Errorf("failed to create bucket=%s: %w", string(exitEventLastProcessedBlockBucket), err) + } + + return tx.Bucket(exitEventLastProcessedBlockBucket).Put(lastProcessedBlockKey, common.EncodeUint64ToBytes(0)) } // insertExitEvents inserts a slice of exit events to exit event bucket in bolt db @@ -80,7 +99,7 @@ func insertExitEventToBucket(exitEventBucket, lookupBucket *bolt.Bucket, exitEve } epochBytes := common.EncodeUint64ToBytes(exitEvent.EpochNumber) - exitIDBytes := common.EncodeUint64ToBytes(exitEvent.ID) + exitIDBytes := common.EncodeUint64ToBytes(exitEvent.ID.Uint64()) err = exitEventBucket.Put(bytes.Join([][]byte{epochBytes, exitIDBytes, common.EncodeUint64ToBytes(exitEvent.BlockNumber)}, nil), raw) @@ -162,12 +181,38 @@ func (s *CheckpointStore) getExitEvents(epoch uint64, filter func(exitEvent *Exi // enforce sequential order sort.Slice(events, func(i, j int) bool { - return events[i].ID < events[j].ID + return events[i].ID.Cmp(events[j].ID) < 0 }) return events, err } +// updateLastSaved saves the last block processed for exit events +func (s *CheckpointStore) updateLastSaved(blockNumber uint64) error { + return s.db.Update(func(tx *bolt.Tx) error { + return tx.Bucket(exitEventLastProcessedBlockBucket).Put(lastProcessedBlockKey, + common.EncodeUint64ToBytes(blockNumber)) + }) +} + +// updateLastSaved saves the last block processed for exit events +func (s *CheckpointStore) getLastSaved() (uint64, error) { + var lastSavedBlock uint64 + + err := s.db.View(func(tx *bolt.Tx) error { + v := tx.Bucket(exitEventLastProcessedBlockBucket).Get(lastProcessedBlockKey) + if v == nil { + return errNoLastSavedEntry + } + + lastSavedBlock = common.EncodeBytesToUint64(v) + + return nil + }) + + return lastSavedBlock, err +} + // decodeExitEvent tries to decode exit event from the provided log func decodeExitEvent(log *ethgo.Log, epoch, block uint64) (*ExitEvent, error) { var l2StateSyncedEvent contractsapi.L2StateSyncedEvent @@ -182,12 +227,9 @@ func decodeExitEvent(log *ethgo.Log, epoch, block uint64) (*ExitEvent, error) { } return &ExitEvent{ - ID: l2StateSyncedEvent.ID.Uint64(), - Sender: ethgo.Address(l2StateSyncedEvent.Sender), - Receiver: ethgo.Address(l2StateSyncedEvent.Receiver), - Data: l2StateSyncedEvent.Data, - EpochNumber: epoch, - BlockNumber: block, + L2StateSyncedEvent: &l2StateSyncedEvent, + EpochNumber: epoch, + BlockNumber: block, }, nil } diff --git a/consensus/polybft/state_store_checkpoint_test.go b/consensus/polybft/state_store_checkpoint_test.go index 7ee66d88e0..6123986a6a 100644 --- a/consensus/polybft/state_store_checkpoint_test.go +++ b/consensus/polybft/state_store_checkpoint_test.go @@ -1,10 +1,12 @@ package polybft import ( + "math/big" "testing" "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" "github.com/0xPolygon/polygon-edge/helper/common" + "github.com/0xPolygon/polygon-edge/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/umbracle/ethgo" @@ -96,13 +98,13 @@ func TestState_NoEpochForExitEventInLookup(t *testing.T) { exitEventFromDB, err := state.CheckpointStore.getExitEvent(exitToTest) require.NoError(t, err) - require.Equal(t, exitToTest, exitEventFromDB.ID) + require.Equal(t, exitToTest, exitEventFromDB.ID.Uint64()) require.Equal(t, epochToMatch, exitEventFromDB.EpochNumber) require.Equal(t, blockNumberToMatch, exitEventFromDB.BlockNumber) // simulate invalid case (for some reason lookup table doesn't have epoch for given exit) err = state.db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(exitEventToEpochLookupBucket).Delete(common.EncodeUint64ToBytes(exitEventFromDB.ID)) + return tx.Bucket(exitEventToEpochLookupBucket).Delete(common.EncodeUint64ToBytes(exitEventFromDB.ID.Uint64())) }) require.NoError(t, err) @@ -141,7 +143,7 @@ func TestState_decodeExitEvent(t *testing.T) { event, err := decodeExitEvent(log, epoch, blockNumber) require.NoError(t, err) - require.Equal(t, uint64(exitID), event.ID) + require.Equal(t, uint64(exitID), event.ID.Uint64()) require.Equal(t, uint64(epoch), event.EpochNumber) require.Equal(t, uint64(blockNumber), event.BlockNumber) @@ -181,10 +183,12 @@ func insertTestExitEvents(t *testing.T, state *State, for k := 1; k <= numOfEventsPerBlock; k++ { exitEvents[index] = &ExitEvent{ - ID: index, - Sender: ethgo.ZeroAddress, - Receiver: ethgo.ZeroAddress, - Data: generateRandomBytes(t), + L2StateSyncedEvent: &contractsapi.L2StateSyncedEvent{ + ID: new(big.Int).SetUint64(index), + Sender: types.ZeroAddress, + Receiver: types.ZeroAddress, + Data: generateRandomBytes(t), + }, EpochNumber: i, BlockNumber: block, } diff --git a/consensus/polybft/state_sync_manager.go b/consensus/polybft/state_sync_manager.go index 75a036ec44..ad6cbc1cd4 100644 --- a/consensus/polybft/state_sync_manager.go +++ b/consensus/polybft/state_sync_manager.go @@ -231,12 +231,12 @@ func (s *stateSyncManager) verifyVoteSignature(valSet validator.ValidatorSet, si } // AddLog saves the received log from event tracker if it matches a state sync event ABI -func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) { +func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) error { event := &contractsapi.StateSyncedEvent{} doesMatch, err := event.ParseLog(eventLog) if !doesMatch { - return + return nil } s.logger.Info( @@ -249,18 +249,22 @@ func (s *stateSyncManager) AddLog(eventLog *ethgo.Log) { if err != nil { s.logger.Error("could not decode state sync event", "err", err) - return + return err } if err := s.state.StateSyncStore.insertStateSyncEvent(event); err != nil { s.logger.Error("could not save state sync event to boltDb", "err", err) - return + return err } if err := s.buildCommitment(); err != nil { + // we don't return an error here. If state sync event is inserted in db, + // we will just try to build a commitment on next block or next event arrival s.logger.Error("could not build a commitment on arrival of new state sync", "err", err, "stateSyncID", event.ID) } + + return nil } // Commitment returns a commitment to be submitted if there is a pending commitment with quorum diff --git a/consensus/polybft/state_sync_manager_test.go b/consensus/polybft/state_sync_manager_test.go index c9defef33e..b04588bd76 100644 --- a/consensus/polybft/state_sync_manager_test.go +++ b/consensus/polybft/state_sync_manager_test.go @@ -335,7 +335,7 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { s := newTestStateSyncManager(t, vals.GetValidator("0"), &mockRuntime{isActiveValidator: true}) // empty log which is not an state sync - s.AddLog(ðgo.Log{}) + require.NoError(t, s.AddLog(ðgo.Log{})) stateSyncs, err := s.state.StateSyncStore.list() require.NoError(t, err) @@ -346,7 +346,7 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { stateSyncEventID := stateSyncedEvent.Sig() // log with the state sync topic but incorrect content - s.AddLog(ðgo.Log{Topics: []ethgo.Hash{stateSyncEventID}}) + require.Error(t, s.AddLog(ðgo.Log{Topics: []ethgo.Hash{stateSyncEventID}})) stateSyncs, err = s.state.StateSyncStore.list() require.NoError(t, err) @@ -366,7 +366,7 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { Data: data, } - s.AddLog(goodLog) + require.NoError(t, s.AddLog(goodLog)) stateSyncs, err = s.state.StateSyncStore.getStateSyncEventsForCommitment(0, 0) require.NoError(t, err) @@ -378,7 +378,7 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { // add one more log to have a minimum commitment goodLog2 := goodLog.Copy() goodLog2.Topics[1] = ethgo.BytesToHash([]byte{0x1}) // state sync index 1 - s.AddLog(goodLog2) + require.NoError(t, s.AddLog(goodLog2)) require.Len(t, s.pendingCommitments, 2) require.Equal(t, uint64(0), s.pendingCommitments[1].StartID.Uint64()) @@ -387,11 +387,11 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { // add two more logs to have larger commitments goodLog3 := goodLog.Copy() goodLog3.Topics[1] = ethgo.BytesToHash([]byte{0x2}) // state sync index 2 - s.AddLog(goodLog3) + require.NoError(t, s.AddLog(goodLog3)) goodLog4 := goodLog.Copy() goodLog4.Topics[1] = ethgo.BytesToHash([]byte{0x3}) // state sync index 3 - s.AddLog(goodLog4) + require.NoError(t, s.AddLog(goodLog4)) require.Len(t, s.pendingCommitments, 4) require.Equal(t, uint64(0), s.pendingCommitments[3].StartID.Uint64()) @@ -419,7 +419,7 @@ func TestStateSyncerManager_AddLog_BuildCommitments(t *testing.T) { Data: data, } - s.AddLog(goodLog) + require.NoError(t, s.AddLog(goodLog)) // node should have inserted given state sync event, but it shouldn't build any commitment stateSyncs, err := s.state.StateSyncStore.getStateSyncEventsForCommitment(0, 0) diff --git a/consensus/polybft/statesyncrelayer/state_sync_relayer.go b/consensus/polybft/statesyncrelayer/state_sync_relayer.go index 17a108581a..33232387ea 100644 --- a/consensus/polybft/statesyncrelayer/state_sync_relayer.go +++ b/consensus/polybft/statesyncrelayer/state_sync_relayer.go @@ -107,20 +107,20 @@ func (r *StateSyncRelayer) Stop() { close(r.closeCh) } -func (r *StateSyncRelayer) AddLog(log *ethgo.Log) { +func (r *StateSyncRelayer) AddLog(log *ethgo.Log) error { r.logger.Debug("Received a log", "log", log) var commitEvent contractsapi.NewCommitmentEvent doesMatch, err := commitEvent.ParseLog(log) if !doesMatch { - return + return nil } if err != nil { r.logger.Error("Failed to parse log", "err", err) - return + return err } startID := commitEvent.StartID.Uint64() @@ -128,6 +128,8 @@ func (r *StateSyncRelayer) AddLog(log *ethgo.Log) { r.logger.Info("Execute commitment", "Block", log.BlockNumber, "StartID", startID, "EndID", endID) + // we don't return errors if some client logic fails, + // only if event is not parsed for i := startID; i <= endID; i++ { // query the state sync proof stateSyncProof, err := r.queryStateSyncProof(fmt.Sprintf("0x%x", i)) @@ -145,6 +147,8 @@ func (r *StateSyncRelayer) AddLog(log *ethgo.Log) { r.logger.Info("State sync executed", "ID", i) } + + return nil } // queryStateSyncProof queries the state sync proof diff --git a/tracker/event_tracker.go b/tracker/event_tracker.go index 6ecca5c926..753c42aaae 100644 --- a/tracker/event_tracker.go +++ b/tracker/event_tracker.go @@ -15,7 +15,7 @@ import ( const minBlockMaxBacklog = 96 type eventSubscription interface { - AddLog(log *ethgo.Log) + AddLog(log *ethgo.Log) error } type EventTracker struct { diff --git a/tracker/event_tracker_store.go b/tracker/event_tracker_store.go index 151e59bc8c..12e4a3cfb0 100644 --- a/tracker/event_tracker_store.go +++ b/tracker/event_tracker_store.go @@ -124,6 +124,8 @@ func (b *EventTrackerStore) Set(k, v string) error { if strings.HasPrefix(k, dbLastBlockPrefix) { if err := b.onNewBlock(k[len(dbLastBlockPrefix):], v); err != nil { b.logger.Warn("new block error", "err", err) + + return err } } @@ -160,17 +162,19 @@ func (b *EventTrackerStore) onNewBlock(filterHash, blockData string) error { return nil // nothing to process } - nextToProcessIdx := common.EncodeBytesToUint64(lastProcessedKey) + 1 + // notify subscriber with logs + for _, log := range logs { + if err := b.subscriber.AddLog(log); err != nil { + return err + } + } + // save next to process only if every AddLog finished successfully + nextToProcessIdx := common.EncodeBytesToUint64(lastProcessedKey) + 1 if err := entry.saveNextToProcessIndx(nextToProcessIdx); err != nil { return err } - // notify subscriber with logs - for _, log := range logs { - b.subscriber.AddLog(log) - } - b.logger.Debug("Event logs have been notified to a subscriber", "len", len(logs), "next", nextToProcessIdx) return nil diff --git a/tracker/event_tracker_test.go b/tracker/event_tracker_test.go index 669569e5a1..bfc1c00e0a 100644 --- a/tracker/event_tracker_test.go +++ b/tracker/event_tracker_test.go @@ -20,7 +20,7 @@ type mockEventSubscriber struct { logs []*ethgo.Log } -func (m *mockEventSubscriber) AddLog(log *ethgo.Log) { +func (m *mockEventSubscriber) AddLog(log *ethgo.Log) error { m.lock.Lock() defer m.lock.Unlock() @@ -29,6 +29,8 @@ func (m *mockEventSubscriber) AddLog(log *ethgo.Log) { } m.logs = append(m.logs, log) + + return nil } func (m *mockEventSubscriber) len() int {