Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EVM-831 Implement state sync relayer without blocktracker #1899

Merged
merged 27 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions command/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ type Config struct {
JSONLogFormat bool `json:"json_log_format" yaml:"json_log_format"`
CorsAllowedOrigins []string `json:"cors_allowed_origins" yaml:"cors_allowed_origins"`

Relayer bool `json:"relayer" yaml:"relayer"`
NumBlockConfirmations uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"`
RelayerTrackerPollInterval time.Duration `json:"relayer_tracker_poll_interval" yaml:"relayer_tracker_poll_interval"`
Relayer bool `json:"relayer" yaml:"relayer"`
NumBlockConfirmations uint64 `json:"num_block_confirmations" yaml:"num_block_confirmations"`

ConcurrentRequestsDebug uint64 `json:"concurrent_requests_debug" yaml:"concurrent_requests_debug"`
WebSocketReadLimit uint64 `json:"web_socket_read_limit" yaml:"web_socket_read_limit"`
Expand Down Expand Up @@ -94,10 +93,6 @@ const (
// the connection sends a close message to the peer and returns ErrReadLimit to the application.
DefaultWebSocketReadLimit uint64 = 8192

// DefaultRelayerTrackerPollInterval specifies time interval after which relayer node's event tracker
// polls child chain to get the latest block
DefaultRelayerTrackerPollInterval time.Duration = time.Second

// DefaultMetricsInterval specifies the time interval after which Prometheus metrics will be generated.
// A value of 0 means the metrics are disabled.
DefaultMetricsInterval time.Duration = time.Second * 8
Expand Down Expand Up @@ -132,15 +127,14 @@ func DefaultConfig() *Config {
Headers: &Headers{
AccessControlAllowOrigins: []string{"*"},
},
LogFilePath: "",
JSONRPCBatchRequestLimit: DefaultJSONRPCBatchRequestLimit,
JSONRPCBlockRangeLimit: DefaultJSONRPCBlockRangeLimit,
Relayer: false,
NumBlockConfirmations: DefaultNumBlockConfirmations,
ConcurrentRequestsDebug: DefaultConcurrentRequestsDebug,
WebSocketReadLimit: DefaultWebSocketReadLimit,
RelayerTrackerPollInterval: DefaultRelayerTrackerPollInterval,
MetricsInterval: DefaultMetricsInterval,
LogFilePath: "",
JSONRPCBatchRequestLimit: DefaultJSONRPCBatchRequestLimit,
JSONRPCBlockRangeLimit: DefaultJSONRPCBlockRangeLimit,
Relayer: false,
NumBlockConfirmations: DefaultNumBlockConfirmations,
ConcurrentRequestsDebug: DefaultConcurrentRequestsDebug,
WebSocketReadLimit: DefaultWebSocketReadLimit,
MetricsInterval: DefaultMetricsInterval,
}
}

Expand Down
4 changes: 0 additions & 4 deletions command/server/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ func (p *serverParams) initRawParams() error {

p.relayer = p.rawConfig.Relayer

if p.relayer && p.rawConfig.RelayerTrackerPollInterval == 0 {
return helper.ErrBlockTrackerPollInterval
}

return p.initAddresses()
}

Expand Down
9 changes: 3 additions & 6 deletions command/server/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ const (
concurrentRequestsDebugFlag = "concurrent-requests-debug"
webSocketReadLimitFlag = "websocket-read-limit"

relayerTrackerPollIntervalFlag = "relayer-poll-interval"

metricsIntervalFlag = "metrics-interval"
)

Expand Down Expand Up @@ -187,9 +185,8 @@ func (p *serverParams) generateConfig() *server.Config {
JSONLogFormat: p.rawConfig.JSONLogFormat,
LogFilePath: p.logFileLocation,

Relayer: p.relayer,
NumBlockConfirmations: p.rawConfig.NumBlockConfirmations,
RelayerTrackerPollInterval: p.rawConfig.RelayerTrackerPollInterval,
MetricsInterval: p.rawConfig.MetricsInterval,
Relayer: p.relayer,
NumBlockConfirmations: p.rawConfig.NumBlockConfirmations,
MetricsInterval: p.rawConfig.MetricsInterval,
}
}
7 changes: 0 additions & 7 deletions command/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,6 @@ func setFlags(cmd *cobra.Command) {
"maximum size in bytes for a message read from the peer by websocket",
)

cmd.Flags().DurationVar(
&params.rawConfig.RelayerTrackerPollInterval,
relayerTrackerPollIntervalFlag,
defaultConfig.RelayerTrackerPollInterval,
"interval (number of seconds) at which relayer's tracker polls for latest block at childchain",
)

cmd.Flags().DurationVar(
&params.rawConfig.MetricsInterval,
metricsIntervalFlag,
Expand Down
6 changes: 6 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ type Config struct {

// Path is the directory path for the consensus protocol to store information
Path string

// IsRelayer is true if node is relayer
IsRelayer bool

// RPCEndpoint
RPCEndpoint string
Stefan-Ethernal marked this conversation as resolved.
Show resolved Hide resolved
}

type Params struct {
Expand Down
42 changes: 42 additions & 0 deletions consensus/polybft/consensus_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"

"github.com/0xPolygon/polygon-edge/consensus"
"github.com/0xPolygon/polygon-edge/consensus/polybft/contractsapi"
bls "github.com/0xPolygon/polygon-edge/consensus/polybft/signer"
"github.com/0xPolygon/polygon-edge/consensus/polybft/validator"
Expand Down Expand Up @@ -82,6 +83,7 @@ type runtimeConfig struct {
txPool txPoolInterface
bridgeTopic topic
numBlockConfirmations uint64
consensusConfig *consensus.Config
}

// consensusRuntime is a struct that provides consensus runtime features like epoch, state and event management
Expand Down Expand Up @@ -121,6 +123,9 @@ type consensusRuntime struct {

eventProvider *EventProvider

// stateSyncRelayer is relayer for commitment events
stateSyncRelayer StateSyncRelayer

// logger instance
logger hcf.Logger
}
Expand Down Expand Up @@ -160,6 +165,10 @@ func newConsensusRuntime(log hcf.Logger, config *runtimeConfig) (*consensusRunti
return nil, err
}

if err := runtime.initStateSyncRelayer(log); err != nil {
return nil, err
}

// we need to call restart epoch on runtime to initialize epoch state
runtime.epoch, err = runtime.restartEpoch(runtime.lastBuiltBlock, dbTx)
if err != nil {
Expand All @@ -175,6 +184,7 @@ func newConsensusRuntime(log hcf.Logger, config *runtimeConfig) (*consensusRunti

// close is used to tear down allocated resources
func (c *consensusRuntime) close() {
c.stateSyncRelayer.Close()
c.stateSyncManager.Close()
}

Expand Down Expand Up @@ -238,6 +248,33 @@ func (c *consensusRuntime) initCheckpointManager(logger hcf.Logger) error {
return nil
}

// initStateSyncRelayer initializes state sync relayer
// if not enabled, then a dummy state sync relayer will be used
func (c *consensusRuntime) initStateSyncRelayer(logger hcf.Logger) error {
if c.config.consensusConfig.IsRelayer {
txRelayer, err := getStateSyncTxRelayer(c.config.consensusConfig.RPCEndpoint, logger)
if err != nil {
return err
}

c.stateSyncRelayer = NewStateSyncRelayer(
txRelayer,
contracts.StateReceiverContract,
c.state.StateSyncStore,
c,
c.config.blockchain,
wallet.NewEcdsaSigner(c.config.Key),
nil,
logger.Named("state_sync_relayer"))
} else {
c.stateSyncRelayer = &dummyStateSyncRelayer{}
}

c.eventProvider.Subscribe(c.stateSyncRelayer)

return c.stateSyncRelayer.Init()
}

// initStakeManager initializes stake manager
func (c *consensusRuntime) initStakeManager(logger hcf.Logger, dbTx *bolt.Tx) error {
rootRelayer, err := txrelayer.NewTxRelayer(txrelayer.WithIPAddress(c.config.PolyBFTConfig.Bridge.JSONRPCEndpoint))
Expand Down Expand Up @@ -369,6 +406,11 @@ func (c *consensusRuntime) OnBlockInserted(fullBlock *types.FullBlock) {
return
}

// handle state sync relayer events that happened in block
if err := c.stateSyncRelayer.PostBlock(postBlock); err != nil {
c.logger.Error("post block callback failed in state sync relayer", "err", err)
}

if isEndOfEpoch {
if epoch, err = c.restartEpoch(fullBlock.Block.Header, dbTx); err != nil {
c.logger.Error("failed to restart epoch after block inserted", "error", err)
Expand Down
16 changes: 9 additions & 7 deletions consensus/polybft/consensus_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func TestConsensusRuntime_OnBlockInserted_EndOfEpoch(t *testing.T) {
checkpointManager: &dummyCheckpointManager{},
stakeManager: &dummyStakeManager{},
eventProvider: NewEventProvider(blockchainMock),
stateSyncRelayer: &dummyStateSyncRelayer{},
}
runtime.OnBlockInserted(&types.FullBlock{Block: builtBlock})

Expand Down Expand Up @@ -474,13 +475,14 @@ func Test_NewConsensusRuntime(t *testing.T) {

tmpDir := t.TempDir()
config := &runtimeConfig{
polybftBackend: polybftBackendMock,
State: newTestState(t),
PolyBFTConfig: polyBftConfig,
DataDir: tmpDir,
Key: createTestKey(t),
blockchain: blockchainMock,
bridgeTopic: &mockTopic{},
polybftBackend: polybftBackendMock,
State: newTestState(t),
PolyBFTConfig: polyBftConfig,
DataDir: tmpDir,
Key: createTestKey(t),
blockchain: blockchainMock,
bridgeTopic: &mockTopic{},
consensusConfig: &consensus.Config{},
}

require.NoError(t, config.State.StakeStore.insertFullValidatorSet(validatorSetState{
Expand Down
1 change: 1 addition & 0 deletions consensus/polybft/contractsapi/bindings-gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func main() {
[]string{
"commit",
"execute",
"batchExecute",
},
[]string{
"StateSyncResult",
Expand Down
17 changes: 17 additions & 0 deletions consensus/polybft/contractsapi/contractsapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions consensus/polybft/polybft.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ func (p *Polybft) initRuntime() error {
txPool: p.txPool,
bridgeTopic: p.bridgeTopic,
numBlockConfirmations: p.config.NumBlockConfirmations,
consensusConfig: p.config.Config,
}

runtime, err := newConsensusRuntime(p.logger, runtimeConfig)
Expand Down
5 changes: 4 additions & 1 deletion consensus/polybft/polybft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,10 @@ func TestPolybft_Close(t *testing.T) {
polybft := Polybft{
closeCh: make(chan struct{}),
syncer: syncer,
runtime: &consensusRuntime{stateSyncManager: &dummyStateSyncManager{}},
runtime: &consensusRuntime{
stateSyncManager: &dummyStateSyncManager{},
stateSyncRelayer: &dummyStateSyncRelayer{},
},
}

assert.NoError(t, polybft.Close())
Expand Down
78 changes: 78 additions & 0 deletions consensus/polybft/state_store_state_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ var (
stateSyncProofsBucket = []byte("stateSyncProofs")
// bucket to store message votes (signatures)
messageVotesBucket = []byte("votes")
// bucket to store all state sync relayer events
stateSyncRelayerEventsBucket = []byte("relayerEvents")

// errNotEnoughStateSyncs error message
errNotEnoughStateSyncs = errors.New("there is either a gap or not enough sync events")
Expand All @@ -39,6 +41,12 @@ commitments/

stateSyncProofs/
|--> stateSyncProof.StateSync.Id -> *StateSyncProof (json marshalled)

relayerData/
|--> data -> *StateSyncRelayerStateData (json marshalled)
igorcrevar marked this conversation as resolved.
Show resolved Hide resolved

relayerEvents/
|--> StateSyncRelayerEventData.EventID -> *StateSyncRelayerEventData (json marshalled)
*/

type StateSyncStore struct {
Expand All @@ -59,6 +67,10 @@ func (s *StateSyncStore) initialize(tx *bolt.Tx) error {
return fmt.Errorf("failed to create bucket=%s: %w", string(stateSyncProofsBucket), err)
}

if _, err := tx.CreateBucketIfNotExists(stateSyncRelayerEventsBucket); err != nil {
return fmt.Errorf("failed to create bucket=%s: %w", string(stateSyncRelayerEventsBucket), err)
}

return nil
}

Expand Down Expand Up @@ -365,3 +377,69 @@ func (s *StateSyncStore) getStateSyncProof(stateSyncID uint64) (*StateSyncProof,

return ssp, err
}

// updateStateSyncRelayerEvents updates/remove desired events
func (s *StateSyncStore) updateStateSyncRelayerEvents(
events []*StateSyncRelayerEventData, removeIDs []uint64, dbTx *bolt.Tx) error {
updateFn := func(tx *bolt.Tx) error {
relayerEventsBucket := tx.Bucket(stateSyncRelayerEventsBucket)

for _, evnt := range events {
raw, err := json.Marshal(evnt)
if err != nil {
return err
}

key := common.EncodeUint64ToBytes(evnt.EventID)

if err := relayerEventsBucket.Put(key, raw); err != nil {
return err
}
}

for _, stateSyncEventID := range removeIDs {
stateSyncEventIDKey := common.EncodeUint64ToBytes(stateSyncEventID)

if err := relayerEventsBucket.Delete(stateSyncEventIDKey); err != nil {
return fmt.Errorf("failed to remove state sync relayer event (ID=%d): %w", stateSyncEventID, err)
}
}

return nil
}

if dbTx == nil {
return s.db.Update(func(tx *bolt.Tx) error {
return updateFn(tx)
})
}

return updateFn(dbTx)
}

// getAllAvailableEvents retrieves all StateSyncRelayerEventData that should be sent as a transactions
func (s *StateSyncStore) getAllAvailableEvents(limit int) (result []*StateSyncRelayerEventData, err error) {
if err = s.db.View(func(tx *bolt.Tx) error {
cursor := tx.Bucket(stateSyncRelayerEventsBucket).Cursor()

for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
var event *StateSyncRelayerEventData

if err := json.Unmarshal(v, &event); err != nil {
return err
}

result = append(result, event)

if limit > 0 && len(result) >= limit {
break
}
}

return nil
}); err != nil {
return nil, err
}

return result, nil
}
Loading
Loading