diff --git a/command/server/config/config.go b/command/server/config/config.go index 072184c0eb..93b372bbf3 100644 --- a/command/server/config/config.go +++ b/command/server/config/config.go @@ -32,9 +32,8 @@ type Config struct { JSONLogFormat bool `json:"json_log_format" yaml:"json_log_format"` CorsAllowedOrigins []string `json:"cors_allowed_origins" yaml:"cors_allowed_origins"` - Relayer bool `json:"relayer" yaml:"relayer"` - NumBlockConfirmations uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"` - RelayerTrackerPollInterval time.Duration `json:"relayer_tracker_poll_interval" yaml:"relayer_tracker_poll_interval"` + Relayer bool `json:"relayer" yaml:"relayer"` + NumBlockConfirmations uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"` ConcurrentRequestsDebug uint64 `json:"concurrent_requests_debug" yaml:"concurrent_requests_debug"` WebSocketReadLimit uint64 `json:"web_socket_read_limit" yaml:"web_socket_read_limit"` @@ -94,10 +93,6 @@ const ( // the connection sends a close message to the peer and returns ErrReadLimit to the application. DefaultWebSocketReadLimit uint64 = 8192 - // DefaultRelayerTrackerPollInterval specifies time interval after which relayer node's event tracker - // polls child chain to get the latest block - DefaultRelayerTrackerPollInterval time.Duration = time.Second - // DefaultMetricsInterval specifies the time interval after which Prometheus metrics will be generated. // A value of 0 means the metrics are disabled. DefaultMetricsInterval time.Duration = time.Second * 8 @@ -132,15 +127,14 @@ func DefaultConfig() *Config { Headers: &Headers{ AccessControlAllowOrigins: []string{"*"}, }, - LogFilePath: "", - JSONRPCBatchRequestLimit: DefaultJSONRPCBatchRequestLimit, - JSONRPCBlockRangeLimit: DefaultJSONRPCBlockRangeLimit, - Relayer: false, - NumBlockConfirmations: DefaultNumBlockConfirmations, - ConcurrentRequestsDebug: DefaultConcurrentRequestsDebug, - WebSocketReadLimit: DefaultWebSocketReadLimit, - RelayerTrackerPollInterval: DefaultRelayerTrackerPollInterval, - MetricsInterval: DefaultMetricsInterval, + LogFilePath: "", + JSONRPCBatchRequestLimit: DefaultJSONRPCBatchRequestLimit, + JSONRPCBlockRangeLimit: DefaultJSONRPCBlockRangeLimit, + Relayer: false, + NumBlockConfirmations: DefaultNumBlockConfirmations, + ConcurrentRequestsDebug: DefaultConcurrentRequestsDebug, + WebSocketReadLimit: DefaultWebSocketReadLimit, + MetricsInterval: DefaultMetricsInterval, } } diff --git a/command/server/init.go b/command/server/init.go index 996a664477..216c4e28c1 100644 --- a/command/server/init.go +++ b/command/server/init.go @@ -58,10 +58,6 @@ func (p *serverParams) initRawParams() error { p.relayer = p.rawConfig.Relayer - if p.relayer && p.rawConfig.RelayerTrackerPollInterval == 0 { - return helper.ErrBlockTrackerPollInterval - } - return p.initAddresses() } diff --git a/command/server/params.go b/command/server/params.go index 43f39f76f5..da48b929e9 100644 --- a/command/server/params.go +++ b/command/server/params.go @@ -43,8 +43,6 @@ const ( concurrentRequestsDebugFlag = "concurrent-requests-debug" webSocketReadLimitFlag = "websocket-read-limit" - relayerTrackerPollIntervalFlag = "relayer-poll-interval" - metricsIntervalFlag = "metrics-interval" ) @@ -187,9 +185,8 @@ func (p *serverParams) generateConfig() *server.Config { JSONLogFormat: p.rawConfig.JSONLogFormat, LogFilePath: p.logFileLocation, - Relayer: p.relayer, - NumBlockConfirmations: p.rawConfig.NumBlockConfirmations, - RelayerTrackerPollInterval: p.rawConfig.RelayerTrackerPollInterval, - MetricsInterval: p.rawConfig.MetricsInterval, + Relayer: p.relayer, + NumBlockConfirmations: p.rawConfig.NumBlockConfirmations, + MetricsInterval: p.rawConfig.MetricsInterval, } } diff --git a/command/server/server.go b/command/server/server.go index aa64d2b248..f9c4ab4c6c 100644 --- a/command/server/server.go +++ b/command/server/server.go @@ -235,13 +235,6 @@ func setFlags(cmd *cobra.Command) { "maximum size in bytes for a message read from the peer by websocket", ) - cmd.Flags().DurationVar( - ¶ms.rawConfig.RelayerTrackerPollInterval, - relayerTrackerPollIntervalFlag, - defaultConfig.RelayerTrackerPollInterval, - "interval (number of seconds) at which relayer's tracker polls for latest block at childchain", - ) - cmd.Flags().DurationVar( ¶ms.rawConfig.MetricsInterval, metricsIntervalFlag, diff --git a/consensus/consensus.go b/consensus/consensus.go index 878ad76655..160faf4523 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -64,6 +64,12 @@ type Config struct { // Path is the directory path for the consensus protocol to store information Path string + + // IsRelayer is true if node is relayer + IsRelayer bool + + // RPCEndpoint + RPCEndpoint string } type Params struct { diff --git a/consensus/polybft/consensus_runtime.go b/consensus/polybft/consensus_runtime.go index 7b9bb31879..288a810daf 100644 --- a/consensus/polybft/consensus_runtime.go +++ b/consensus/polybft/consensus_runtime.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/0xPolygon/polygon-edge/consensus" "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer" "github.com/0xPolygon/polygon-edge/consensus/polybft/validator" @@ -82,6 +83,7 @@ type runtimeConfig struct { txPool txPoolInterface bridgeTopic topic numBlockConfirmations uint64 + consensusConfig *consensus.Config } // consensusRuntime is a struct that provides consensus runtime features like epoch, state and event management @@ -121,6 +123,9 @@ type consensusRuntime struct { eventProvider *EventProvider + // stateSyncRelayer is relayer for commitment events + stateSyncRelayer StateSyncRelayer + // logger instance logger hcf.Logger } @@ -160,6 +165,10 @@ func newConsensusRuntime(log hcf.Logger, config *runtimeConfig) (*consensusRunti return nil, err } + if err := runtime.initStateSyncRelayer(log); err != nil { + return nil, err + } + // we need to call restart epoch on runtime to initialize epoch state runtime.epoch, err = runtime.restartEpoch(runtime.lastBuiltBlock, dbTx) if err != nil { @@ -175,6 +184,7 @@ func newConsensusRuntime(log hcf.Logger, config *runtimeConfig) (*consensusRunti // close is used to tear down allocated resources func (c *consensusRuntime) close() { + c.stateSyncRelayer.Close() c.stateSyncManager.Close() } @@ -238,6 +248,33 @@ func (c *consensusRuntime) initCheckpointManager(logger hcf.Logger) error { return nil } +// initStateSyncRelayer initializes state sync relayer +// if not enabled, then a dummy state sync relayer will be used +func (c *consensusRuntime) initStateSyncRelayer(logger hcf.Logger) error { + if c.config.consensusConfig.IsRelayer { + txRelayer, err := getStateSyncTxRelayer(c.config.consensusConfig.RPCEndpoint, logger) + if err != nil { + return err + } + + c.stateSyncRelayer = NewStateSyncRelayer( + txRelayer, + contracts.StateReceiverContract, + c.state.StateSyncStore, + c, + c.config.blockchain, + wallet.NewEcdsaSigner(c.config.Key), + nil, + logger.Named("state_sync_relayer")) + } else { + c.stateSyncRelayer = &dummyStateSyncRelayer{} + } + + c.eventProvider.Subscribe(c.stateSyncRelayer) + + return c.stateSyncRelayer.Init() +} + // initStakeManager initializes stake manager func (c *consensusRuntime) initStakeManager(logger hcf.Logger, dbTx *bolt.Tx) error { rootRelayer, err := txrelayer.NewTxRelayer(txrelayer.WithIPAddress(c.config.PolyBFTConfig.Bridge.JSONRPCEndpoint)) @@ -369,6 +406,11 @@ func (c *consensusRuntime) OnBlockInserted(fullBlock *types.FullBlock) { return } + // handle state sync relayer events that happened in block + if err := c.stateSyncRelayer.PostBlock(postBlock); err != nil { + c.logger.Error("post block callback failed in state sync relayer", "err", err) + } + if isEndOfEpoch { if epoch, err = c.restartEpoch(fullBlock.Block.Header, dbTx); err != nil { c.logger.Error("failed to restart epoch after block inserted", "error", err) diff --git a/consensus/polybft/consensus_runtime_test.go b/consensus/polybft/consensus_runtime_test.go index 721034387a..6909bf3839 100644 --- a/consensus/polybft/consensus_runtime_test.go +++ b/consensus/polybft/consensus_runtime_test.go @@ -232,6 +232,7 @@ func TestConsensusRuntime_OnBlockInserted_EndOfEpoch(t *testing.T) { checkpointManager: &dummyCheckpointManager{}, stakeManager: &dummyStakeManager{}, eventProvider: NewEventProvider(blockchainMock), + stateSyncRelayer: &dummyStateSyncRelayer{}, } runtime.OnBlockInserted(&types.FullBlock{Block: builtBlock}) @@ -474,13 +475,14 @@ func Test_NewConsensusRuntime(t *testing.T) { tmpDir := t.TempDir() config := &runtimeConfig{ - polybftBackend: polybftBackendMock, - State: newTestState(t), - PolyBFTConfig: polyBftConfig, - DataDir: tmpDir, - Key: createTestKey(t), - blockchain: blockchainMock, - bridgeTopic: &mockTopic{}, + polybftBackend: polybftBackendMock, + State: newTestState(t), + PolyBFTConfig: polyBftConfig, + DataDir: tmpDir, + Key: createTestKey(t), + blockchain: blockchainMock, + bridgeTopic: &mockTopic{}, + consensusConfig: &consensus.Config{}, } require.NoError(t, config.State.StakeStore.insertFullValidatorSet(validatorSetState{ diff --git a/consensus/polybft/contractsapi/bindings-gen/main.go b/consensus/polybft/contractsapi/bindings-gen/main.go index d302d15b6e..aa8785c3b9 100644 --- a/consensus/polybft/contractsapi/bindings-gen/main.go +++ b/consensus/polybft/contractsapi/bindings-gen/main.go @@ -48,6 +48,7 @@ func main() { []string{ "commit", "execute", + "batchExecute", }, []string{ "StateSyncResult", diff --git a/consensus/polybft/contractsapi/contractsapi.go b/consensus/polybft/contractsapi/contractsapi.go index 36ce2faeb5..b84045e6c2 100644 --- a/consensus/polybft/contractsapi/contractsapi.go +++ b/consensus/polybft/contractsapi/contractsapi.go @@ -77,6 +77,23 @@ func (e *ExecuteStateReceiverFn) DecodeAbi(buf []byte) error { return decodeMethod(StateReceiver.Abi.Methods["execute"], buf, e) } +type BatchExecuteStateReceiverFn struct { + Proofs [][]types.Hash `abi:"proofs"` + Objs []*StateSync `abi:"objs"` +} + +func (b *BatchExecuteStateReceiverFn) Sig() []byte { + return StateReceiver.Abi.Methods["batchExecute"].ID() +} + +func (b *BatchExecuteStateReceiverFn) EncodeAbi() ([]byte, error) { + return StateReceiver.Abi.Methods["batchExecute"].Encode(b) +} + +func (b *BatchExecuteStateReceiverFn) DecodeAbi(buf []byte) error { + return decodeMethod(StateReceiver.Abi.Methods["batchExecute"], buf, b) +} + type StateSyncResultEvent struct { Counter *big.Int `abi:"counter"` Status bool `abi:"status"` diff --git a/consensus/polybft/polybft.go b/consensus/polybft/polybft.go index 421625956b..9e77c545c1 100644 --- a/consensus/polybft/polybft.go +++ b/consensus/polybft/polybft.go @@ -552,6 +552,7 @@ func (p *Polybft) initRuntime() error { txPool: p.txPool, bridgeTopic: p.bridgeTopic, numBlockConfirmations: p.config.NumBlockConfirmations, + consensusConfig: p.config.Config, } runtime, err := newConsensusRuntime(p.logger, runtimeConfig) diff --git a/consensus/polybft/polybft_test.go b/consensus/polybft/polybft_test.go index 72714b14fc..8d8f44f10f 100644 --- a/consensus/polybft/polybft_test.go +++ b/consensus/polybft/polybft_test.go @@ -208,7 +208,10 @@ func TestPolybft_Close(t *testing.T) { polybft := Polybft{ closeCh: make(chan struct{}), syncer: syncer, - runtime: &consensusRuntime{stateSyncManager: &dummyStateSyncManager{}}, + runtime: &consensusRuntime{ + stateSyncManager: &dummyStateSyncManager{}, + stateSyncRelayer: &dummyStateSyncRelayer{}, + }, } assert.NoError(t, polybft.Close()) diff --git a/consensus/polybft/state_store_state_sync.go b/consensus/polybft/state_store_state_sync.go index 5c2b50fce4..71e37f80ab 100644 --- a/consensus/polybft/state_store_state_sync.go +++ b/consensus/polybft/state_store_state_sync.go @@ -19,6 +19,8 @@ var ( stateSyncProofsBucket = []byte("stateSyncProofs") // bucket to store message votes (signatures) messageVotesBucket = []byte("votes") + // bucket to store all state sync relayer events + stateSyncRelayerEventsBucket = []byte("relayerEvents") // errNotEnoughStateSyncs error message errNotEnoughStateSyncs = errors.New("there is either a gap or not enough sync events") @@ -39,6 +41,9 @@ commitments/ stateSyncProofs/ |--> stateSyncProof.StateSync.Id -> *StateSyncProof (json marshalled) + +relayerEvents/ +|--> StateSyncRelayerEventData.EventID -> *StateSyncRelayerEventData (json marshalled) */ type StateSyncStore struct { @@ -59,6 +64,10 @@ func (s *StateSyncStore) initialize(tx *bolt.Tx) error { return fmt.Errorf("failed to create bucket=%s: %w", string(stateSyncProofsBucket), err) } + if _, err := tx.CreateBucketIfNotExists(stateSyncRelayerEventsBucket); err != nil { + return fmt.Errorf("failed to create bucket=%s: %w", string(stateSyncRelayerEventsBucket), err) + } + return nil } @@ -365,3 +374,69 @@ func (s *StateSyncStore) getStateSyncProof(stateSyncID uint64) (*StateSyncProof, return ssp, err } + +// updateStateSyncRelayerEvents updates/remove desired events +func (s *StateSyncStore) updateStateSyncRelayerEvents( + events []*StateSyncRelayerEventData, removeIDs []uint64, dbTx *bolt.Tx) error { + updateFn := func(tx *bolt.Tx) error { + relayerEventsBucket := tx.Bucket(stateSyncRelayerEventsBucket) + + for _, evnt := range events { + raw, err := json.Marshal(evnt) + if err != nil { + return err + } + + key := common.EncodeUint64ToBytes(evnt.EventID) + + if err := relayerEventsBucket.Put(key, raw); err != nil { + return err + } + } + + for _, stateSyncEventID := range removeIDs { + stateSyncEventIDKey := common.EncodeUint64ToBytes(stateSyncEventID) + + if err := relayerEventsBucket.Delete(stateSyncEventIDKey); err != nil { + return fmt.Errorf("failed to remove state sync relayer event (ID=%d): %w", stateSyncEventID, err) + } + } + + return nil + } + + if dbTx == nil { + return s.db.Update(func(tx *bolt.Tx) error { + return updateFn(tx) + }) + } + + return updateFn(dbTx) +} + +// getAllAvailableEvents retrieves all StateSyncRelayerEventData that should be sent as a transactions +func (s *StateSyncStore) getAllAvailableEvents(limit int) (result []*StateSyncRelayerEventData, err error) { + if err = s.db.View(func(tx *bolt.Tx) error { + cursor := tx.Bucket(stateSyncRelayerEventsBucket).Cursor() + + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + var event *StateSyncRelayerEventData + + if err := json.Unmarshal(v, &event); err != nil { + return err + } + + result = append(result, event) + + if limit > 0 && len(result) >= limit { + break + } + } + + return nil + }); err != nil { + return nil, err + } + + return result, nil +} diff --git a/consensus/polybft/state_store_state_sync_test.go b/consensus/polybft/state_store_state_sync_test.go index 421686c7b2..698c5c8356 100644 --- a/consensus/polybft/state_store_state_sync_test.go +++ b/consensus/polybft/state_store_state_sync_test.go @@ -297,3 +297,59 @@ func createTestStateSync(index int64) *contractsapi.StateSyncedEvent { Data: []byte{0, 1}, } } + +func TestState_StateSync_StateSyncRelayerDataAndEvents(t *testing.T) { + t.Parallel() + + state := newTestState(t) + + // update + require.NoError(t, state.StateSyncStore.updateStateSyncRelayerEvents([]*StateSyncRelayerEventData{ + {EventID: 2}, + {EventID: 4}, + {EventID: 7, SentStatus: true, BlockNumber: 100}, + }, []uint64{}, nil)) + + // get available events + events, err := state.StateSyncStore.getAllAvailableEvents(0) + + require.NoError(t, err) + require.Len(t, events, 3) + require.Equal(t, uint64(2), events[0].EventID) + require.Equal(t, uint64(4), events[1].EventID) + require.Equal(t, uint64(7), events[2].EventID) + + // update again + require.NoError(t, state.StateSyncStore.updateStateSyncRelayerEvents( + []*StateSyncRelayerEventData{ + {EventID: 10}, + {EventID: 12}, + {EventID: 11}, + }, + []uint64{4, 7}, + nil, + )) + + // get available events + events, err = state.StateSyncStore.getAllAvailableEvents(1000) + + require.NoError(t, err) + require.Len(t, events, 4) + require.Equal(t, uint64(2), events[0].EventID) + require.Equal(t, uint64(10), events[1].EventID) + require.Equal(t, false, events[1].SentStatus) + require.Equal(t, uint64(11), events[2].EventID) + require.Equal(t, uint64(12), events[3].EventID) + + events[1].SentStatus = true + require.NoError(t, state.StateSyncStore.updateStateSyncRelayerEvents(events[1:2], []uint64{2}, nil)) + + // get available events with limit + events, err = state.StateSyncStore.getAllAvailableEvents(2) + + require.NoError(t, err) + require.Len(t, events, 2) + require.Equal(t, uint64(10), events[0].EventID) + require.Equal(t, true, events[0].SentStatus) + require.Equal(t, uint64(11), events[1].EventID) +} diff --git a/consensus/polybft/state_sync_relayer.go b/consensus/polybft/state_sync_relayer.go new file mode 100644 index 0000000000..8c38a5ed94 --- /dev/null +++ b/consensus/polybft/state_sync_relayer.go @@ -0,0 +1,347 @@ +package polybft + +import ( + "encoding/json" + "errors" + "fmt" + "net" + "strings" + + "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" + "github.com/0xPolygon/polygon-edge/contracts" + "github.com/0xPolygon/polygon-edge/txrelayer" + "github.com/0xPolygon/polygon-edge/types" + "github.com/hashicorp/go-hclog" + "github.com/umbracle/ethgo" + bolt "go.etcd.io/bbolt" +) + +const ( + // defaultMaxBlocksToWaitForResend specifies how many blocks should be wait + // in order to try again to send transaction + defaultMaxBlocksToWaitForResend = uint64(30) + // defaultMaxAttemptsToSend specifies how many sending retries for one transaction + defaultMaxAttemptsToSend = uint64(15) + // defaultMaxEventsPerBatch specifies maximum events per one batchExecute tx + defaultMaxEventsPerBatch = uint64(10) +) + +var ( + errFailedToExecuteStateSync = errors.New("failed to execute state sync") + errUnknownStateSyncRelayerEvent = errors.New("unknown event") + + commitmentEventSignature = new(contractsapi.NewCommitmentEvent).Sig() + stateSyncResultEventSignature = new(contractsapi.StateSyncResultEvent).Sig() +) + +// StateSyncRelayer is an interface that defines functions for state sync relayer +type StateSyncRelayer interface { + EventSubscriber + PostBlock(req *PostBlockRequest) error + Init() error + Close() +} + +// stateSyncProofRetriever is an interface that exposes function for retrieving state sync proof +type stateSyncProofRetriever interface { + GetStateSyncProof(stateSyncID uint64) (types.Proof, error) +} + +var _ StateSyncRelayer = (*dummyStateSyncRelayer)(nil) + +// dummyStateSyncRelayer is a dummy implementation of a StateSyncRelayer +type dummyStateSyncRelayer struct{} + +func (d *dummyStateSyncRelayer) PostBlock(req *PostBlockRequest) error { return nil } + +func (d *dummyStateSyncRelayer) Init() error { return nil } +func (d *dummyStateSyncRelayer) Close() {} + +// EventSubscriber implementation +func (d *dummyStateSyncRelayer) GetLogFilters() map[types.Address][]types.Hash { + return make(map[types.Address][]types.Hash) +} +func (d *dummyStateSyncRelayer) ProcessLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error { + return nil +} + +var _ StateSyncRelayer = (*stateSyncRelayerImpl)(nil) + +// StateSyncRelayerEventData keeps information about an event +type StateSyncRelayerEventData struct { + EventID uint64 `json:"eventID"` + CountTries uint64 `json:"countTries"` + BlockNumber uint64 `json:"blockNumber"` // block when state sync is sent + SentStatus bool `json:"sentStatus"` +} + +func (ed StateSyncRelayerEventData) String() string { + return fmt.Sprintf("%d", ed.EventID) +} + +type stateSyncRelayerConfig struct { + maxBlocksToWaitForResend uint64 + maxAttemptsToSend uint64 + maxEventsPerBatch uint64 +} + +type stateSyncRelayerImpl struct { + txRelayer txrelayer.TxRelayer + key ethgo.Key + proofRetriever stateSyncProofRetriever + state *StateSyncStore + logger hclog.Logger + blockchain blockchainBackend + + notifyCh chan struct{} + closeCh chan struct{} + + config *stateSyncRelayerConfig +} + +func NewStateSyncRelayer( + txRelayer txrelayer.TxRelayer, + stateReceiverAddr types.Address, + state *StateSyncStore, + store stateSyncProofRetriever, + blockchain blockchainBackend, + key ethgo.Key, + config *stateSyncRelayerConfig, + logger hclog.Logger, +) *stateSyncRelayerImpl { + if config == nil { + config = &stateSyncRelayerConfig{ + maxBlocksToWaitForResend: defaultMaxBlocksToWaitForResend, + maxAttemptsToSend: defaultMaxAttemptsToSend, + maxEventsPerBatch: defaultMaxEventsPerBatch, + } + } + + return &stateSyncRelayerImpl{ + txRelayer: txRelayer, + key: key, + proofRetriever: store, + state: state, + closeCh: make(chan struct{}), + notifyCh: make(chan struct{}, 1), + blockchain: blockchain, + config: config, + logger: logger, + } +} + +func (ssr *stateSyncRelayerImpl) Init() error { + // start consumer + go func() { + for { + select { + case <-ssr.closeCh: + return + case <-ssr.notifyCh: + ssr.processEvents() + } + } + }() + + return nil +} + +func (ssr *stateSyncRelayerImpl) Close() { + close(ssr.closeCh) +} + +func (ssr *stateSyncRelayerImpl) PostBlock(req *PostBlockRequest) error { + select { + case ssr.notifyCh <- struct{}{}: + default: + } + + return nil +} + +func (ssr *stateSyncRelayerImpl) processEvents() { + // we need twice as batch size because events from first batch are possible already sent maxAttemptsToSend times + events, err := ssr.state.getAllAvailableEvents(int(ssr.config.maxEventsPerBatch) * 2) + if err != nil { + ssr.logger.Error("retrieving events failed", "err", err) + + return + } + + removedEventIDs := []uint64{} + sendingEvents := []*StateSyncRelayerEventData{} + currentBlockNumber := ssr.blockchain.CurrentHeader().Number + + // check already processed events + for _, evnt := range events { + // quit if we are still waiting for some old event confirmation (there is no paralelization right now!) + if evnt.SentStatus && evnt.BlockNumber+ssr.config.maxBlocksToWaitForResend > currentBlockNumber { + return + } + + // remove event if it is processed too many times + if evnt.CountTries+1 > ssr.config.maxAttemptsToSend { + removedEventIDs = append(removedEventIDs, evnt.EventID) + } else { + evnt.CountTries++ + evnt.BlockNumber = currentBlockNumber + evnt.SentStatus = true + + sendingEvents = append(sendingEvents, evnt) + if len(sendingEvents) == int(ssr.config.maxEventsPerBatch) { + break + } + } + } + + // update state only if needed + if len(sendingEvents)+len(removedEventIDs) > 0 { + ssr.logger.Info("sending events", "events", sendingEvents, "removed", removedEventIDs) + + if err := ssr.state.updateStateSyncRelayerEvents(sendingEvents, removedEventIDs, nil); err != nil { + ssr.logger.Error("updating events failed", + "events", sendingEvents, "removed", removedEventIDs, "err", err) + + return + } + } + + // send tx only if needed + if len(sendingEvents) > 0 { + if err := ssr.sendTx(sendingEvents); err != nil { + ssr.logger.Error("failed to send tx", "block", currentBlockNumber, "events", sendingEvents, "err", err) + } else { + ssr.logger.Info("tx has been successfully sent", "block", currentBlockNumber, "events", sendingEvents) + } + } +} + +func (ssr *stateSyncRelayerImpl) sendTx(events []*StateSyncRelayerEventData) error { + proofs := make([][]types.Hash, len(events)) + objs := make([]*contractsapi.StateSync, len(events)) + + for i, event := range events { + proof, err := ssr.proofRetriever.GetStateSyncProof(event.EventID) + if err != nil { + return fmt.Errorf("failed to get proof for %d: %w", event.EventID, err) + } + + // since state sync event is a map in the jsonrpc response, + // to not have custom logic of converting the map to state sync event + // json encoding is used, since it manages to successfully unmarshal the + // event from the marshaled map + raw, err := json.Marshal(proof.Metadata["StateSync"]) + if err != nil { + return fmt.Errorf("failed to marshal event %d: %w", event.EventID, err) + } + + if err = json.Unmarshal(raw, &objs[i]); err != nil { + return fmt.Errorf("failed to unmarshal event %d: %w", event.EventID, err) + } + + proofs[i] = proof.Data + } + + input, err := (&contractsapi.BatchExecuteStateReceiverFn{ + Proofs: proofs, + Objs: objs, + }).EncodeAbi() + if err != nil { + return err + } + + // send batchExecute state sync + _, err = ssr.txRelayer.SendTransaction(ðgo.Transaction{ + From: ssr.key.Address(), + To: (*ethgo.Address)(&contracts.StateReceiverContract), + Gas: types.StateTransactionGasLimit, + Input: input, + }, ssr.key) + + return err +} + +// EventSubscriber implementation + +// GetLogFilters returns a map of log filters for getting desired events, +// where the key is the address of contract that emits desired events, +// and the value is a slice of signatures of events we want to get. +// This function is the implementation of EventSubscriber interface +func (ssr *stateSyncRelayerImpl) GetLogFilters() map[types.Address][]types.Hash { + var ( + stateSyncResultEvent contractsapi.StateSyncResultEvent + newCommitmentEvent contractsapi.NewCommitmentEvent + ) + + return map[types.Address][]types.Hash{ + contracts.StateReceiverContract: { + types.Hash(stateSyncResultEvent.Sig()), + types.Hash(newCommitmentEvent.Sig()), + }, + } +} + +// ProcessLog is the implementation of EventSubscriber interface, +// used to handle a log defined in GetLogFilters, provided by event provider +func (ssr *stateSyncRelayerImpl) ProcessLog(header *types.Header, log *ethgo.Log, dbTx *bolt.Tx) error { + var ( + commitEvent contractsapi.NewCommitmentEvent + stateSyncResultEvent contractsapi.StateSyncResultEvent + ) + + switch log.Topics[0] { + case commitmentEventSignature: + _, err := commitEvent.ParseLog(log) + if err != nil { + return err + } + + firstID, lastID := commitEvent.StartID.Uint64(), commitEvent.EndID.Uint64() + newEvents := make([]*StateSyncRelayerEventData, lastID-firstID+1) + + for eventID := firstID; eventID <= lastID; eventID++ { + newEvents[eventID-firstID] = &StateSyncRelayerEventData{EventID: eventID} + } + + ssr.logger.Info("new events has been arrived", "block", header.Number, "events", newEvents) + + return ssr.state.updateStateSyncRelayerEvents(newEvents, nil, dbTx) + + case stateSyncResultEventSignature: + _, err := stateSyncResultEvent.ParseLog(log) + if err != nil { + return err + } + + eventID := stateSyncResultEvent.Counter.Uint64() + + if stateSyncResultEvent.Status { + ssr.logger.Info("event has been processed", "block", header.Number, "event", eventID) + + return ssr.state.updateStateSyncRelayerEvents(nil, []uint64{eventID}, dbTx) + } + + ssr.logger.Info("event has been failed to process", "block", header.Number, + "event", eventID, "reason", string(stateSyncResultEvent.Message)) + + return nil + + default: + return errUnknownStateSyncRelayerEvent + } +} + +func getStateSyncTxRelayer(rpcEndpoint string, logger hclog.Logger) (txrelayer.TxRelayer, error) { + if rpcEndpoint == "" || strings.Contains(rpcEndpoint, "0.0.0.0") { + _, port, err := net.SplitHostPort(rpcEndpoint) + if err == nil { + rpcEndpoint = fmt.Sprintf("http://%s:%s", "127.0.0.1", port) + } else { + rpcEndpoint = txrelayer.DefaultRPCAddress + } + } + + return txrelayer.NewTxRelayer( + txrelayer.WithIPAddress(rpcEndpoint), txrelayer.WithNumRetries(-1), + txrelayer.WithWriter(logger.StandardWriter(&hclog.StandardLoggerOptions{}))) +} diff --git a/consensus/polybft/state_sync_relayer_test.go b/consensus/polybft/state_sync_relayer_test.go new file mode 100644 index 0000000000..d07d89e8e0 --- /dev/null +++ b/consensus/polybft/state_sync_relayer_test.go @@ -0,0 +1,203 @@ +package polybft + +import ( + "errors" + "math/big" + "testing" + "time" + + "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" + "github.com/0xPolygon/polygon-edge/types" + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/umbracle/ethgo" + "github.com/umbracle/ethgo/abi" +) + +func TestStateSyncRelayer_FullWorkflow(t *testing.T) { + t.Parallel() + + testKey := createTestKey(t) + stateSyncAddr := types.StringToAddress("0x56563") + + resultLogs := []*types.Log{ + createTestLogForStateSyncResultEvent(t, 1), createTestLogForStateSyncResultEvent(t, 2), + createTestLogForStateSyncResultEvent(t, 3), createTestLogForStateSyncResultEvent(t, 4), + createTestLogForStateSyncResultEvent(t, 5), + } + commitmentLogs := []*types.Log{ + createTestLogForNewCommitmentEvent(t, stateSyncAddr, 1, 1, types.StringToHash("0x2")), + createTestLogForNewCommitmentEvent(t, stateSyncAddr, 2, 3, types.StringToHash("0x2")), + createTestLogForNewCommitmentEvent(t, stateSyncAddr, 4, 5, types.StringToHash("0x2")), + } + + headers := []*types.Header{ + {Number: 2}, {Number: 3}, {Number: 4}, {Number: 5}, {Number: 5}, + } + + proofMock := &mockStateSyncProofRetriever{ + fn: func(stateSyncID uint64) (types.Proof, error) { + return types.Proof{ + Data: []types.Hash{types.StringToHash("0x1122334455")}, + Metadata: map[string]interface{}{ + "StateSync": map[string]interface{}{ + "ID": stateSyncID, + "Sender": types.StringToAddress("0xffee"), + "Receiver": types.StringToAddress("0xeeff"), + "Data": nil, + }, + }, + }, nil + }, + } + blockhainMock := &blockchainMock{} + dummyTxRelayer := newDummyStakeTxRelayer(t, nil) + state := newTestState(t) + + stateSyncRelayer := NewStateSyncRelayer( + dummyTxRelayer, + stateSyncAddr, + state.StateSyncStore, + proofMock, + blockhainMock, + testKey, + &stateSyncRelayerConfig{ + maxAttemptsToSend: 6, + maxBlocksToWaitForResend: 1, + maxEventsPerBatch: 1, + }, + hclog.Default(), + ) + + for _, h := range headers { + blockhainMock.On("CurrentHeader").Return(h).Once() + } + + // send first two events without errors + dummyTxRelayer.On("SendTransaction", mock.Anything, testKey).Return((*ethgo.Receipt)(nil), nil).Times(2) + // fail 3rd time + dummyTxRelayer.On("SendTransaction", mock.Anything, testKey).Return( + (*ethgo.Receipt)(nil), errors.New("e")).Once() + // send 3 events all at once at the end + dummyTxRelayer.On("SendTransaction", mock.Anything, testKey).Return((*ethgo.Receipt)(nil), nil).Once() + + require.NoError(t, stateSyncRelayer.Init()) + + // post 1st block + require.NoError(t, stateSyncRelayer.ProcessLog(headers[0], convertLog(commitmentLogs[0]), nil)) + require.NoError(t, stateSyncRelayer.ProcessLog(headers[0], convertLog(commitmentLogs[1]), nil)) + require.NoError(t, stateSyncRelayer.PostBlock(&PostBlockRequest{})) + + time.Sleep(time.Second * 2) // wait for some time + + events, err := state.StateSyncStore.getAllAvailableEvents(0) + + require.NoError(t, err) + require.Len(t, events, 3) + require.Equal(t, uint64(1), events[0].EventID) + require.True(t, events[0].SentStatus) + require.False(t, events[1].SentStatus) + require.False(t, events[2].SentStatus) + + // post 2nd block + require.NoError(t, stateSyncRelayer.ProcessLog(headers[1], convertLog(resultLogs[0]), nil)) + require.NoError(t, stateSyncRelayer.ProcessLog(headers[1], convertLog(commitmentLogs[2]), nil)) + require.NoError(t, stateSyncRelayer.PostBlock(&PostBlockRequest{})) + + time.Sleep(time.Second * 2) // wait for some time + + events, err = state.StateSyncStore.getAllAvailableEvents(0) + + require.NoError(t, err) + require.Len(t, events, 4) + require.True(t, events[0].SentStatus) + require.Equal(t, uint64(2), events[0].EventID) + require.False(t, events[1].SentStatus) + require.False(t, events[2].SentStatus) + + // post 3rd block + require.NoError(t, stateSyncRelayer.ProcessLog(headers[2], convertLog(resultLogs[1]), nil)) + require.NoError(t, stateSyncRelayer.PostBlock(&PostBlockRequest{})) + + time.Sleep(time.Second * 2) // wait for some time + + events, err = state.StateSyncStore.getAllAvailableEvents(0) + + require.NoError(t, err) + require.Len(t, events, 3) + require.True(t, events[0].SentStatus) + require.Equal(t, uint64(3), events[0].EventID) + require.False(t, events[1].SentStatus) + + // post 4th block - will not provide result, so one more SendTransaction will be triggered + stateSyncRelayer.config.maxEventsPerBatch = 3 // send all 3 left events at once + + require.NoError(t, stateSyncRelayer.PostBlock(&PostBlockRequest{})) + + time.Sleep(time.Second * 2) // wait for some time + + events, err = state.StateSyncStore.getAllAvailableEvents(0) + + require.NoError(t, err) + require.Len(t, events, 3) + require.True(t, events[0].SentStatus && events[1].SentStatus && events[2].SentStatus) + + // post 5th block + require.NoError(t, stateSyncRelayer.ProcessLog(headers[4], convertLog(resultLogs[2]), nil)) + require.NoError(t, stateSyncRelayer.ProcessLog(headers[4], convertLog(resultLogs[3]), nil)) + require.NoError(t, stateSyncRelayer.ProcessLog(headers[4], convertLog(resultLogs[4]), nil)) + require.NoError(t, stateSyncRelayer.PostBlock(&PostBlockRequest{})) + + time.Sleep(time.Second * 2) // wait for some time + + events, err = state.StateSyncStore.getAllAvailableEvents(0) + + require.NoError(t, err) + require.Len(t, events, 0) + + stateSyncRelayer.Close() + time.Sleep(time.Second) + + blockhainMock.AssertExpectations(t) + dummyTxRelayer.AssertExpectations(t) +} + +type mockStateSyncProofRetriever struct { + fn func(uint64) (types.Proof, error) +} + +func (m *mockStateSyncProofRetriever) GetStateSyncProof(stateSyncID uint64) (types.Proof, error) { + if m.fn != nil { + return m.fn(stateSyncID) + } + + return types.Proof{}, nil +} + +func createTestLogForNewCommitmentEvent( + t *testing.T, + stateSyncAddr types.Address, + startEventID uint64, + endEventID uint64, + rootHash types.Hash) *types.Log { + t.Helper() + + var evnt contractsapi.NewCommitmentEvent + + encodedData1, err := abi.MustNewType("uint256").Encode(new(big.Int).SetUint64(startEventID)) + require.NoError(t, err) + + encodedData2, err := abi.MustNewType("uint256").Encode(new(big.Int).SetUint64(endEventID)) + require.NoError(t, err) + + return &types.Log{ + Address: stateSyncAddr, + Topics: []types.Hash{ + types.Hash(evnt.Sig()), + types.BytesToHash(encodedData1), + types.BytesToHash(encodedData2), + }, + Data: rootHash[:], + } +} diff --git a/consensus/polybft/statesyncrelayer/state_sync_relayer.go b/consensus/polybft/statesyncrelayer/state_sync_relayer.go deleted file mode 100644 index 62c901b376..0000000000 --- a/consensus/polybft/statesyncrelayer/state_sync_relayer.go +++ /dev/null @@ -1,240 +0,0 @@ -package statesyncrelayer - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "net" - "path" - "strings" - "time" - - "github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi" - "github.com/0xPolygon/polygon-edge/contracts" - "github.com/0xPolygon/polygon-edge/tracker" - "github.com/0xPolygon/polygon-edge/txrelayer" - "github.com/0xPolygon/polygon-edge/types" - - hcf "github.com/hashicorp/go-hclog" - "github.com/umbracle/ethgo" - "github.com/umbracle/ethgo/jsonrpc" -) - -type StateSyncRelayer struct { - dataDir string - rpcEndpoint string - stateReceiverAddr ethgo.Address - eventTrackerStartBlock uint64 - logger hcf.Logger - client *jsonrpc.Client - txRelayer txrelayer.TxRelayer - key ethgo.Key - closeCh chan struct{} - pollInterval time.Duration -} - -func sanitizeRPCEndpoint(rpcEndpoint string) string { - if rpcEndpoint == "" || strings.Contains(rpcEndpoint, "0.0.0.0") { - _, port, err := net.SplitHostPort(rpcEndpoint) - if err == nil { - rpcEndpoint = fmt.Sprintf("http://%s:%s", "127.0.0.1", port) - } else { - rpcEndpoint = txrelayer.DefaultRPCAddress - } - } - - return rpcEndpoint -} - -func NewRelayer( - dataDir string, - rpcEndpoint string, - stateReceiverAddr ethgo.Address, - stateReceiverTrackerStartBlock uint64, - logger hcf.Logger, - key ethgo.Key, - pollInterval time.Duration, -) *StateSyncRelayer { - endpoint := sanitizeRPCEndpoint(rpcEndpoint) - - // create the JSON RPC client - client, err := jsonrpc.NewClient(endpoint) - if err != nil { - logger.Error("Failed to create the JSON RPC client", "err", err) - - return nil - } - - txRelayer, err := txrelayer.NewTxRelayer(txrelayer.WithClient(client)) - if err != nil { - logger.Error("Failed to create the tx relayer", "err", err) - } - - return &StateSyncRelayer{ - dataDir: dataDir, - rpcEndpoint: endpoint, - stateReceiverAddr: stateReceiverAddr, - logger: logger, - client: client, - txRelayer: txRelayer, - key: key, - closeCh: make(chan struct{}), - eventTrackerStartBlock: stateReceiverTrackerStartBlock, - pollInterval: pollInterval, - } -} - -func (r *StateSyncRelayer) Start() error { - et := tracker.NewEventTracker( - path.Join(r.dataDir, "/relayer.db"), - r.rpcEndpoint, - r.stateReceiverAddr, - r, - 0, // sidechain (Polygon POS) is instant finality, so no need to wait - r.eventTrackerStartBlock, - r.logger, - r.pollInterval, - ) - - ctx, cancelFn := context.WithCancel(context.Background()) - - go func() { - <-r.closeCh - cancelFn() - }() - - return et.Start(ctx) -} - -// Stop function is used to tear down all the allocated resources -func (r *StateSyncRelayer) Stop() { - close(r.closeCh) -} - -func (r *StateSyncRelayer) AddLog(log *ethgo.Log) error { - r.logger.Debug("Received a log", "log", log) - - var commitEvent contractsapi.NewCommitmentEvent - - doesMatch, err := commitEvent.ParseLog(log) - if !doesMatch { - return nil - } - - if err != nil { - r.logger.Error("Failed to parse log", "err", err) - - return err - } - - startID := commitEvent.StartID.Uint64() - endID := commitEvent.EndID.Uint64() - - r.logger.Info("Execute commitment", "Block", log.BlockNumber, "StartID", startID, "EndID", endID) - - // we don't return errors if some client logic fails, - // only if event is not parsed - for i := startID; i <= endID; i++ { - // query the state sync proof - stateSyncProof, err := r.queryStateSyncProof(fmt.Sprintf("0x%x", i)) - if err != nil { - r.logger.Error("Failed to query state sync proof", "err", err) - - continue - } - - if err := r.executeStateSync(stateSyncProof); err != nil { - r.logger.Error("State sync execution failed", "err", err) - - continue - } - - r.logger.Info("State sync executed", "ID", i) - } - - return nil -} - -// queryStateSyncProof queries the state sync proof -func (r *StateSyncRelayer) queryStateSyncProof(stateSyncID string) (*types.Proof, error) { - // retrieve state sync proof - var stateSyncProof types.Proof - - err := r.client.Call("bridge_getStateSyncProof", &stateSyncProof, stateSyncID) - if err != nil { - return nil, err - } - - r.logger.Debug(fmt.Sprintf("state sync proof: %v", stateSyncProof)) - - return &stateSyncProof, nil -} - -// executeStateSync executes the state sync -func (r *StateSyncRelayer) executeStateSync(proof *types.Proof) error { - sseMap, ok := proof.Metadata["StateSync"].(map[string]interface{}) - if !ok { - return errors.New("could not get state sync event from proof") - } - - var sse *contractsapi.StateSync - - // since state sync event is a map in the jsonrpc response, - // to not have custom logic of converting the map to state sync event - // json encoding is used, since it manages to successfully unmarshal the - // event from the marshaled map - raw, err := json.Marshal(sseMap) - if err != nil { - return fmt.Errorf("failed to marshal state sync map into JSON. Error: %w", err) - } - - if err = json.Unmarshal(raw, &sse); err != nil { - return fmt.Errorf("failed to unmarshal state sync event from JSON. Error: %w", err) - } - - execute := &contractsapi.ExecuteStateReceiverFn{ - Proof: proof.Data, - Obj: sse, - } - - input, err := execute.EncodeAbi() - if err != nil { - return err - } - - // execute the state sync - txn := ðgo.Transaction{ - From: r.key.Address(), - To: (*ethgo.Address)(&contracts.StateReceiverContract), - Gas: types.StateTransactionGasLimit, - Input: input, - } - - receipt, err := r.txRelayer.SendTransaction(txn, r.key) - if err != nil { - return fmt.Errorf("failed to send execute state sync transaction for id %d: %w", sse.ID, err) - } - - if receipt.Status == uint64(types.ReceiptFailed) { - return fmt.Errorf("transaction execution reverted for state sync id: %d", sse.ID) - } - - var stateSyncResult contractsapi.StateSyncResultEvent - for _, log := range receipt.Logs { - matches, err := stateSyncResult.ParseLog(log) - if err != nil { - return fmt.Errorf("failed to find state sync event result log for state sync id: %d", sse.ID) - } - - if !matches { - continue - } - - if !stateSyncResult.Status { - return fmt.Errorf("failed to execute state sync id: %d", sse.ID) - } - } - - return nil -} diff --git a/consensus/polybft/statesyncrelayer/state_sync_relayer_test.go b/consensus/polybft/statesyncrelayer/state_sync_relayer_test.go deleted file mode 100644 index 65341dcc91..0000000000 --- a/consensus/polybft/statesyncrelayer/state_sync_relayer_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package statesyncrelayer - -import ( - "math/big" - "testing" - "time" - - "github.com/0xPolygon/polygon-edge/contracts" - "github.com/0xPolygon/polygon-edge/txrelayer" - "github.com/0xPolygon/polygon-edge/types" - "github.com/hashicorp/go-hclog" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "github.com/umbracle/ethgo" - "github.com/umbracle/ethgo/jsonrpc" - "github.com/umbracle/ethgo/wallet" -) - -var _ txrelayer.TxRelayer = (*txRelayerMock)(nil) - -type txRelayerMock struct { - mock.Mock -} - -func (t *txRelayerMock) Call(from ethgo.Address, to ethgo.Address, input []byte) (string, error) { - args := t.Called(from, to, input) - - return args.String(0), args.Error(1) -} - -func (t *txRelayerMock) SendTransaction(txn *ethgo.Transaction, key ethgo.Key) (*ethgo.Receipt, error) { - args := t.Called(txn, key) - - return args.Get(0).(*ethgo.Receipt), args.Error(1) //nolint:forcetypeassert -} - -func (t *txRelayerMock) SendTransactionLocal(txn *ethgo.Transaction) (*ethgo.Receipt, error) { - args := t.Called(txn) - - return nil, args.Error(1) -} - -func (t *txRelayerMock) Client() *jsonrpc.Client { - return nil -} - -func Test_executeStateSync(t *testing.T) { - t.Parallel() - - txRelayer := &txRelayerMock{} - key, _ := wallet.GenerateKey() - - r := &StateSyncRelayer{ - txRelayer: txRelayer, - key: key, - } - - txRelayer.On("SendTransaction", mock.Anything, mock.Anything). - Return(ðgo.Receipt{Status: uint64(types.ReceiptSuccess)}, nil).Once() - - proof := &types.Proof{ - Data: []types.Hash{}, - Metadata: map[string]interface{}{ - "StateSync": map[string]interface{}{ - "ID": big.NewInt(1), - "Sender": types.ZeroAddress, - "Receiver": types.ZeroAddress, - "Data": []byte{}, - }, - }, - } - - require.NoError(t, r.executeStateSync(proof)) - - txRelayer.AssertExpectations(t) -} - -// Test sanitizeRPCEndpoint -func Test_sanitizeRPCEndpoint(t *testing.T) { - t.Parallel() - - tests := []struct { - name string - endpoint string - want string - }{ - { - "url with port", - "http://localhost:10001", - "http://localhost:10001", - }, - { - "all interfaces with port without schema", - "0.0.0.0:10001", - "http://127.0.0.1:10001", - }, - { - "url without port", - "http://127.0.0.1", - "http://127.0.0.1", - }, - { - "empty endpoint", - "", - txrelayer.DefaultRPCAddress, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - if got := sanitizeRPCEndpoint(tt.endpoint); got != tt.want { - t.Errorf("sanitizeRPCEndpoint() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestStateSyncRelayer_Stop(t *testing.T) { - t.Parallel() - - key, err := wallet.GenerateKey() - require.NoError(t, err) - - r := NewRelayer("test-chain-1", txrelayer.DefaultRPCAddress, ethgo.Address(contracts.StateReceiverContract), - 0, hclog.NewNullLogger(), key, time.Second) - - require.NotPanics(t, func() { r.Stop() }) -} diff --git a/e2e-polybft/e2e/bridge_test.go b/e2e-polybft/e2e/bridge_test.go index 4a321b21ea..2ea8d0dc8d 100644 --- a/e2e-polybft/e2e/bridge_test.go +++ b/e2e-polybft/e2e/bridge_test.go @@ -1532,13 +1532,12 @@ func TestE2E_Bridge_Transfers_AccessLists(t *testing.T) { }) } -func TestE2E_Bridge_Transfers_WithBlockTrackerPollInterval(t *testing.T) { +func TestE2E_Bridge_Transfers_WithRootTrackerPollInterval(t *testing.T) { var ( numBlockConfirmations = uint64(2) epochSize = 30 sprintSize = uint64(5) rootPollInterval = 5 * time.Second - relayerPollInterval = 5 * time.Second numberOfAttempts = uint64(4) stateSyncedLogsCount = 1 ) @@ -1547,7 +1546,6 @@ func TestE2E_Bridge_Transfers_WithBlockTrackerPollInterval(t *testing.T) { framework.WithEpochSize(epochSize), framework.WithNumBlockConfirmations(numBlockConfirmations), framework.WithRootTrackerPollInterval(rootPollInterval), - framework.WithRelayerTrackerPollInterval(relayerPollInterval), ) defer cluster.Stop() diff --git a/e2e-polybft/framework/test-cluster.go b/e2e-polybft/framework/test-cluster.go index cce2586b18..345a3432ec 100644 --- a/e2e-polybft/framework/test-cluster.go +++ b/e2e-polybft/framework/test-cluster.go @@ -131,8 +131,7 @@ type TestClusterConfig struct { IsPropertyTest bool TestRewardToken string - RootTrackerPollInterval time.Duration - RelayerTrackerPollInterval time.Duration + RootTrackerPollInterval time.Duration ProxyContractsAdmin string @@ -393,12 +392,6 @@ func WithRootTrackerPollInterval(pollInterval time.Duration) ClusterOption { } } -func WithRelayerTrackerPollInterval(pollInterval time.Duration) ClusterOption { - return func(h *TestClusterConfig) { - h.RelayerTrackerPollInterval = pollInterval - } -} - func WithProxyContractsAdmin(address string) ClusterOption { return func(h *TestClusterConfig) { h.ProxyContractsAdmin = address @@ -506,9 +499,9 @@ func NewTestCluster(t *testing.T, validatorsCount int, opts ...ClusterOption) *T cluster.Config.BlockTime.String()) } - if cluster.Config.RelayerTrackerPollInterval != 0 { + if cluster.Config.RootTrackerPollInterval != 0 { args = append(args, "--block-tracker-poll-interval", - cluster.Config.RelayerTrackerPollInterval.String()) + cluster.Config.RootTrackerPollInterval.String()) } if cluster.Config.TestRewardToken != "" { @@ -695,7 +688,6 @@ func (c *TestCluster) InitTestServer(t *testing.T, config.Relayer = nodeType.IsSet(Relayer) config.NumBlockConfirmations = c.Config.NumBlockConfirmations config.BridgeJSONRPC = bridgeJSONRPC - config.RelayerTrackerPollInterval = c.Config.RelayerTrackerPollInterval }) // watch the server for stop signals. It is important to fix the specific diff --git a/e2e-polybft/framework/test-server.go b/e2e-polybft/framework/test-server.go index ff4950938e..471759151e 100644 --- a/e2e-polybft/framework/test-server.go +++ b/e2e-polybft/framework/test-server.go @@ -27,18 +27,17 @@ import ( ) type TestServerConfig struct { - Name string - JSONRPCPort int64 - GRPCPort int64 - P2PPort int64 - Validator bool - DataDir string - Chain string - LogLevel string - Relayer bool - NumBlockConfirmations uint64 - BridgeJSONRPC string - RelayerTrackerPollInterval time.Duration + Name string + JSONRPCPort int64 + GRPCPort int64 + P2PPort int64 + Validator bool + DataDir string + Chain string + LogLevel string + Relayer bool + NumBlockConfirmations uint64 + BridgeJSONRPC string } type TestServerConfigCallback func(*TestServerConfig) @@ -175,11 +174,6 @@ func (t *TestServer) Start() { if config.Relayer { args = append(args, "--relayer") - - if config.RelayerTrackerPollInterval != 0 { - // only relayer node should have this setup if - args = append(args, "--relayer-poll-interval", config.RelayerTrackerPollInterval.String()) - } } // Start the server diff --git a/jsonrpc/throttling_test.go b/jsonrpc/throttling_test.go index 056c0bbc69..13b73ac905 100644 --- a/jsonrpc/throttling_test.go +++ b/jsonrpc/throttling_test.go @@ -59,7 +59,7 @@ func TestThrottling(t *testing.T) { }() go func() { - time.Sleep(time.Millisecond * 600) + time.Sleep(time.Millisecond * 620) defer wg.Done() @@ -70,7 +70,7 @@ func TestThrottling(t *testing.T) { }() go func() { - time.Sleep(time.Millisecond * 610) + time.Sleep(time.Millisecond * 640) defer wg.Done() @@ -81,7 +81,7 @@ func TestThrottling(t *testing.T) { }() go func() { - time.Sleep(time.Millisecond * 950) + time.Sleep(time.Millisecond * 1000) defer wg.Done() diff --git a/server/config.go b/server/config.go index ff725973b0..63240f6ddf 100644 --- a/server/config.go +++ b/server/config.go @@ -44,9 +44,8 @@ type Config struct { Relayer bool - NumBlockConfirmations uint64 - RelayerTrackerPollInterval time.Duration - MetricsInterval time.Duration + NumBlockConfirmations uint64 + MetricsInterval time.Duration } // Telemetry holds the config details for metric services diff --git a/server/server.go b/server/server.go index 394ed93226..e88e3f27ca 100644 --- a/server/server.go +++ b/server/server.go @@ -23,8 +23,6 @@ import ( "github.com/0xPolygon/polygon-edge/blockchain" "github.com/0xPolygon/polygon-edge/chain" "github.com/0xPolygon/polygon-edge/consensus" - "github.com/0xPolygon/polygon-edge/consensus/polybft/statesyncrelayer" - "github.com/0xPolygon/polygon-edge/consensus/polybft/wallet" "github.com/0xPolygon/polygon-edge/contracts" "github.com/0xPolygon/polygon-edge/crypto" "github.com/0xPolygon/polygon-edge/helper/common" @@ -44,7 +42,6 @@ import ( "github.com/hashicorp/go-hclog" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/umbracle/ethgo" "google.golang.org/grpc" ) @@ -89,9 +86,6 @@ type Server struct { // restore restoreProgression *progress.ProgressionWrapper - // stateSyncRelayer is handling state syncs execution (Polybft exclusive) - stateSyncRelayer *statesyncrelayer.StateSyncRelayer - // gasHelper is providing functions regarding gas and fees gasHelper *gasprice.GasHelper } @@ -409,13 +403,6 @@ func NewServer(config *Config) (*Server, error) { return nil, err } - // start relayer - if config.Relayer { - if err := m.setupRelayer(); err != nil { - return nil, err - } - } - m.txpool.SetBaseFee(m.blockchain.Header()) m.txpool.Start() @@ -567,9 +554,11 @@ func (s *Server) setupConsensus() error { } config := &consensus.Config{ - Params: s.config.Chain.Params, - Config: engineConfig, - Path: filepath.Join(s.config.DataDir, "consensus"), + Params: s.config.Chain.Params, + Config: engineConfig, + Path: filepath.Join(s.config.DataDir, "consensus"), + IsRelayer: s.config.Relayer, + RPCEndpoint: s.config.JSONRPC.JSONRPCAddr.String(), } consensus, err := engine( @@ -624,41 +613,6 @@ func extractBlockTime(engineConfig map[string]interface{}) (common.Duration, err return blockTime, nil } -// setupRelayer sets up the relayer -func (s *Server) setupRelayer() error { - account, err := wallet.NewAccountFromSecret(s.secretsManager) - if err != nil { - return fmt.Errorf("failed to create account from secret: %w", err) - } - - polyBFTConfig, err := consensusPolyBFT.GetPolyBFTConfig(s.config.Chain) - if err != nil { - return fmt.Errorf("failed to extract polybft config: %w", err) - } - - trackerStartBlockConfig := map[types.Address]uint64{} - if polyBFTConfig.Bridge != nil { - trackerStartBlockConfig = polyBFTConfig.Bridge.EventTrackerStartBlocks - } - - relayer := statesyncrelayer.NewRelayer( - s.config.DataDir, - s.config.JSONRPC.JSONRPCAddr.String(), - ethgo.Address(contracts.StateReceiverContract), - trackerStartBlockConfig[contracts.StateReceiverContract], - s.logger.Named("relayer"), - wallet.NewEcdsaSigner(wallet.NewKey(account)), - s.config.RelayerTrackerPollInterval, - ) - - // start relayer - if err := relayer.Start(); err != nil { - return fmt.Errorf("failed to start relayer: %w", err) - } - - return nil -} - type jsonRPCHub struct { state state.State restoreProgression *progress.ProgressionWrapper @@ -986,11 +940,6 @@ func (s *Server) Close() { } } - // Stop state sync relayer - if s.stateSyncRelayer != nil { - s.stateSyncRelayer.Stop() - } - // Close the txpool's main loop s.txpool.Close()