From c92a7212ee77b08c40d62925216e5081278a4e3f Mon Sep 17 00:00:00 2001 From: Vyzaldy Sanchez Date: Thu, 15 Aug 2024 10:22:27 -0400 Subject: [PATCH] Add Db syncing for registry syncer (#13756) * Adds migration for syncer state data * Implements syncer ORM * Implements syncing with local registry and properly updating it * Fixes db sync channel * Connects DB to syncer constructor * Adds changeset * Fixes changeset * Uses custom marshal JSON call on DB store call * Fixes errors from merge conflicts * Fixes existing tests * Prevents tests from hanging * Fixes lint issue * Fixes setup on `New()` call * Implements marshal/unnmarshal mechanics on syncer state * Fixes migration name * Fixes lint * Keeps the latest 10 records on `registry_syncer_states` * Adds ORM tests * Prevents possible flake * Fixes errors from conflict * Fixes syncer tests * Fixes syncer ORM tests * Fixes linter * Fixes tests * Fixes tests * Review changes * Fixes tests * Fixes lint * Improves implementation * Removes unused `to32Byte` func * fixes errors on `Close()` * Adds more custom types to avoid data races * Update tests * fixes lint * Removes unnecessary comments * Sends deep copy of local registry to launchers * Fixes linter * Uses interface for syncer ORM * Mocks the ORM for tests * Improves `syncer.Close()` mechanism * Fixes merge conflicts * Fixes test * Fixes linter * Fixes race condition with `syncer.reader` * Fixes race condition in test * Fixes race condition in test * Fixes test --------- Co-authored-by: Bolek <1416262+bolekk@users.noreply.github.com> --- .changeset/many-knives-play.md | 5 + .mockery.yaml | 5 +- core/capabilities/ccip/delegate.go | 2 + .../ccip/launcher/integration_test.go | 3 + core/services/chainlink/application.go | 1 + .../services/registrysyncer/local_registry.go | 16 ++ core/services/registrysyncer/mocks/orm.go | 142 ++++++++++++ core/services/registrysyncer/orm.go | 167 ++++++++++++++ core/services/registrysyncer/orm_test.go | 145 ++++++++++++ core/services/registrysyncer/syncer.go | 170 +++++++++++++-- core/services/registrysyncer/syncer_test.go | 206 ++++++++++++++++-- .../migrations/0249_registry_syncer_state.sql | 11 + 12 files changed, 837 insertions(+), 36 deletions(-) create mode 100644 .changeset/many-knives-play.md create mode 100644 core/services/registrysyncer/mocks/orm.go create mode 100644 core/services/registrysyncer/orm.go create mode 100644 core/services/registrysyncer/orm_test.go create mode 100644 core/store/migrate/migrations/0249_registry_syncer_state.sql diff --git a/.changeset/many-knives-play.md b/.changeset/many-knives-play.md new file mode 100644 index 00000000000..8c1f5da1a48 --- /dev/null +++ b/.changeset/many-knives-play.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#updated Adds DB syncing for registry syncer diff --git a/.mockery.yaml b/.mockery.yaml index 37cbff58b1a..87c27cd46a9 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -570,4 +570,7 @@ packages: interfaces: Reader: config: - mockname: "Mock{{ .InterfaceName }}" \ No newline at end of file + mockname: "Mock{{ .InterfaceName }}" + github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer: + interfaces: + ORM: \ No newline at end of file diff --git a/core/capabilities/ccip/delegate.go b/core/capabilities/ccip/delegate.go index c9974d62e99..187ae0c5811 100644 --- a/core/capabilities/ccip/delegate.go +++ b/core/capabilities/ccip/delegate.go @@ -6,6 +6,7 @@ import ( "time" "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/common" configsevm "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/configs/evm" "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/launcher" @@ -119,6 +120,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) (services }, relayer, cfg.ExternalRegistry().Address(), + registrysyncer.NewORM(d.ds, d.lggr), ) if err != nil { return nil, fmt.Errorf("could not configure syncer: %w", err) diff --git a/core/capabilities/ccip/launcher/integration_test.go b/core/capabilities/ccip/launcher/integration_test.go index db3daf4d9b9..7973316b31d 100644 --- a/core/capabilities/ccip/launcher/integration_test.go +++ b/core/capabilities/ccip/launcher/integration_test.go @@ -6,6 +6,7 @@ import ( it "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/ccip_integration_tests/integrationhelpers" cctypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/ccip/types" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/onsi/gomega" @@ -30,12 +31,14 @@ func TestIntegration_Launcher(t *testing.T) { p2pIDs := it.P2pIDsFromInts(arr) uni.AddCapability(p2pIDs) + db := pgtest.NewSqlxDB(t) regSyncer, err := registrysyncer.New(lggr, func() (p2ptypes.PeerID, error) { return p2pIDs[0], nil }, uni, uni.CapReg.Address().String(), + registrysyncer.NewORM(db, lggr), ) require.NoError(t, err) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 98067821f99..a3f46fc8a9f 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -241,6 +241,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { }, relayer, registryAddress, + registrysyncer.NewORM(opts.DS, globalLogger), ) if err != nil { return nil, fmt.Errorf("could not configure syncer: %w", err) diff --git a/core/services/registrysyncer/local_registry.go b/core/services/registrysyncer/local_registry.go index 8a0e471ccda..d4bf4a49f53 100644 --- a/core/services/registrysyncer/local_registry.go +++ b/core/services/registrysyncer/local_registry.go @@ -36,6 +36,22 @@ type LocalRegistry struct { IDsToCapabilities map[string]Capability } +func NewLocalRegistry( + lggr logger.Logger, + getPeerID func() (p2ptypes.PeerID, error), + IDsToDONs map[DonID]DON, + IDsToNodes map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo, + IDsToCapabilities map[string]Capability, +) LocalRegistry { + return LocalRegistry{ + lggr: lggr.Named("LocalRegistry"), + getPeerID: getPeerID, + IDsToDONs: IDsToDONs, + IDsToNodes: IDsToNodes, + IDsToCapabilities: IDsToCapabilities, + } +} + func (l *LocalRegistry) LocalNode(ctx context.Context) (capabilities.Node, error) { // Load the current nodes PeerWrapper, this gets us the current node's // PeerID, allowing us to contextualize registry information in terms of DON ownership diff --git a/core/services/registrysyncer/mocks/orm.go b/core/services/registrysyncer/mocks/orm.go new file mode 100644 index 00000000000..d7777ecb6e4 --- /dev/null +++ b/core/services/registrysyncer/mocks/orm.go @@ -0,0 +1,142 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mocks + +import ( + context "context" + + registrysyncer "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" + mock "github.com/stretchr/testify/mock" +) + +// ORM is an autogenerated mock type for the ORM type +type ORM struct { + mock.Mock +} + +type ORM_Expecter struct { + mock *mock.Mock +} + +func (_m *ORM) EXPECT() *ORM_Expecter { + return &ORM_Expecter{mock: &_m.Mock} +} + +// AddLocalRegistry provides a mock function with given fields: ctx, localRegistry +func (_m *ORM) AddLocalRegistry(ctx context.Context, localRegistry registrysyncer.LocalRegistry) error { + ret := _m.Called(ctx, localRegistry) + + if len(ret) == 0 { + panic("no return value specified for AddLocalRegistry") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, registrysyncer.LocalRegistry) error); ok { + r0 = rf(ctx, localRegistry) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// ORM_AddLocalRegistry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddLocalRegistry' +type ORM_AddLocalRegistry_Call struct { + *mock.Call +} + +// AddLocalRegistry is a helper method to define mock.On call +// - ctx context.Context +// - localRegistry registrysyncer.LocalRegistry +func (_e *ORM_Expecter) AddLocalRegistry(ctx interface{}, localRegistry interface{}) *ORM_AddLocalRegistry_Call { + return &ORM_AddLocalRegistry_Call{Call: _e.mock.On("AddLocalRegistry", ctx, localRegistry)} +} + +func (_c *ORM_AddLocalRegistry_Call) Run(run func(ctx context.Context, localRegistry registrysyncer.LocalRegistry)) *ORM_AddLocalRegistry_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(registrysyncer.LocalRegistry)) + }) + return _c +} + +func (_c *ORM_AddLocalRegistry_Call) Return(_a0 error) *ORM_AddLocalRegistry_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *ORM_AddLocalRegistry_Call) RunAndReturn(run func(context.Context, registrysyncer.LocalRegistry) error) *ORM_AddLocalRegistry_Call { + _c.Call.Return(run) + return _c +} + +// LatestLocalRegistry provides a mock function with given fields: ctx +func (_m *ORM) LatestLocalRegistry(ctx context.Context) (*registrysyncer.LocalRegistry, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for LatestLocalRegistry") + } + + var r0 *registrysyncer.LocalRegistry + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*registrysyncer.LocalRegistry, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *registrysyncer.LocalRegistry); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*registrysyncer.LocalRegistry) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_LatestLocalRegistry_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'LatestLocalRegistry' +type ORM_LatestLocalRegistry_Call struct { + *mock.Call +} + +// LatestLocalRegistry is a helper method to define mock.On call +// - ctx context.Context +func (_e *ORM_Expecter) LatestLocalRegistry(ctx interface{}) *ORM_LatestLocalRegistry_Call { + return &ORM_LatestLocalRegistry_Call{Call: _e.mock.On("LatestLocalRegistry", ctx)} +} + +func (_c *ORM_LatestLocalRegistry_Call) Run(run func(ctx context.Context)) *ORM_LatestLocalRegistry_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *ORM_LatestLocalRegistry_Call) Return(_a0 *registrysyncer.LocalRegistry, _a1 error) *ORM_LatestLocalRegistry_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_LatestLocalRegistry_Call) RunAndReturn(run func(context.Context) (*registrysyncer.LocalRegistry, error)) *ORM_LatestLocalRegistry_Call { + _c.Call.Return(run) + return _c +} + +// NewORM creates a new instance of ORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewORM(t interface { + mock.TestingT + Cleanup(func()) +}) *ORM { + mock := &ORM{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/registrysyncer/orm.go b/core/services/registrysyncer/orm.go new file mode 100644 index 00000000000..cb08eaafeaf --- /dev/null +++ b/core/services/registrysyncer/orm.go @@ -0,0 +1,167 @@ +package registrysyncer + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "math/big" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + + kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +type capabilitiesRegistryNodeInfo struct { + NodeOperatorId uint32 `json:"nodeOperatorId"` + ConfigCount uint32 `json:"configCount"` + WorkflowDONId uint32 `json:"workflowDONId"` + Signer p2ptypes.PeerID `json:"signer"` + P2pId p2ptypes.PeerID `json:"p2pId"` + HashedCapabilityIds []p2ptypes.PeerID `json:"hashedCapabilityIds"` + CapabilitiesDONIds []string `json:"capabilitiesDONIds"` +} + +func (l *LocalRegistry) MarshalJSON() ([]byte, error) { + idsToNodes := make(map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo) + for k, v := range l.IDsToNodes { + hashedCapabilityIds := make([]p2ptypes.PeerID, len(v.HashedCapabilityIds)) + for i, id := range v.HashedCapabilityIds { + hashedCapabilityIds[i] = p2ptypes.PeerID(id[:]) + } + capabilitiesDONIds := make([]string, len(v.CapabilitiesDONIds)) + for i, id := range v.CapabilitiesDONIds { + capabilitiesDONIds[i] = id.String() + } + idsToNodes[k] = capabilitiesRegistryNodeInfo{ + NodeOperatorId: v.NodeOperatorId, + ConfigCount: v.ConfigCount, + WorkflowDONId: v.WorkflowDONId, + Signer: p2ptypes.PeerID(v.Signer[:]), + P2pId: p2ptypes.PeerID(v.P2pId[:]), + HashedCapabilityIds: hashedCapabilityIds, + CapabilitiesDONIds: capabilitiesDONIds, + } + } + + b, err := json.Marshal(&struct { + IDsToDONs map[DonID]DON + IDsToNodes map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo + IDsToCapabilities map[string]Capability + }{ + IDsToDONs: l.IDsToDONs, + IDsToNodes: idsToNodes, + IDsToCapabilities: l.IDsToCapabilities, + }) + if err != nil { + return []byte{}, err + } + return b, nil +} + +func (l *LocalRegistry) UnmarshalJSON(data []byte) error { + temp := struct { + IDsToDONs map[DonID]DON + IDsToNodes map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo + IDsToCapabilities map[string]Capability + }{ + IDsToDONs: make(map[DonID]DON), + IDsToNodes: make(map[p2ptypes.PeerID]capabilitiesRegistryNodeInfo), + IDsToCapabilities: make(map[string]Capability), + } + + if err := json.Unmarshal(data, &temp); err != nil { + return fmt.Errorf("failed to unmarshal state: %w", err) + } + + l.IDsToDONs = temp.IDsToDONs + + l.IDsToNodes = make(map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo) + for peerID, v := range temp.IDsToNodes { + hashedCapabilityIds := make([][32]byte, len(v.HashedCapabilityIds)) + for i, id := range v.HashedCapabilityIds { + copy(hashedCapabilityIds[i][:], id[:]) + } + + capabilitiesDONIds := make([]*big.Int, len(v.CapabilitiesDONIds)) + for i, id := range v.CapabilitiesDONIds { + bigInt := new(big.Int) + bigInt.SetString(id, 10) + capabilitiesDONIds[i] = bigInt + } + l.IDsToNodes[peerID] = kcr.CapabilitiesRegistryNodeInfo{ + NodeOperatorId: v.NodeOperatorId, + ConfigCount: v.ConfigCount, + WorkflowDONId: v.WorkflowDONId, + Signer: v.Signer, + P2pId: v.P2pId, + HashedCapabilityIds: hashedCapabilityIds, + CapabilitiesDONIds: capabilitiesDONIds, + } + } + + l.IDsToCapabilities = temp.IDsToCapabilities + + return nil +} + +type ORM interface { + AddLocalRegistry(ctx context.Context, localRegistry LocalRegistry) error + LatestLocalRegistry(ctx context.Context) (*LocalRegistry, error) +} + +type orm struct { + ds sqlutil.DataSource + lggr logger.Logger +} + +var _ ORM = (*orm)(nil) + +func NewORM(ds sqlutil.DataSource, lggr logger.Logger) orm { + namedLogger := lggr.Named("RegistrySyncerORM") + return orm{ + ds: ds, + lggr: namedLogger, + } +} + +func (orm orm) AddLocalRegistry(ctx context.Context, localRegistry LocalRegistry) error { + return sqlutil.TransactDataSource(ctx, orm.ds, nil, func(tx sqlutil.DataSource) error { + localRegistryJSON, err := localRegistry.MarshalJSON() + if err != nil { + return err + } + hash := sha256.Sum256(localRegistryJSON) + _, err = tx.ExecContext( + ctx, + `INSERT INTO registry_syncer_states (data, data_hash) VALUES ($1, $2) ON CONFLICT (data_hash) DO NOTHING`, + localRegistryJSON, fmt.Sprintf("%x", hash[:]), + ) + if err != nil { + return err + } + _, err = tx.ExecContext(ctx, `DELETE FROM registry_syncer_states +WHERE data_hash NOT IN ( + SELECT data_hash FROM registry_syncer_states + ORDER BY id DESC + LIMIT 10 +);`) + return err + }) +} + +func (orm orm) LatestLocalRegistry(ctx context.Context) (*LocalRegistry, error) { + var localRegistry LocalRegistry + var localRegistryJSON string + err := orm.ds.GetContext(ctx, &localRegistryJSON, `SELECT data FROM registry_syncer_states ORDER BY id DESC LIMIT 1`) + if err != nil { + return nil, err + } + err = localRegistry.UnmarshalJSON([]byte(localRegistryJSON)) + if err != nil { + return nil, err + } + return &localRegistry, nil +} diff --git a/core/services/registrysyncer/orm_test.go b/core/services/registrysyncer/orm_test.go new file mode 100644 index 00000000000..03772ea22bf --- /dev/null +++ b/core/services/registrysyncer/orm_test.go @@ -0,0 +1,145 @@ +package registrysyncer_test + +import ( + "encoding/hex" + "math/big" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/durationpb" + + ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + "github.com/smartcontractkit/chainlink-common/pkg/values" + + kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/capabilities_registry" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" +) + +func TestRegistrySyncerORM_InsertAndRetrieval(t *testing.T) { + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) + orm := registrysyncer.NewORM(db, lggr) + + var states []registrysyncer.LocalRegistry + for i := 0; i < 11; i++ { + state := generateState(t) + err := orm.AddLocalRegistry(ctx, state) + require.NoError(t, err) + states = append(states, state) + } + + var count int + err := db.Get(&count, `SELECT count(*) FROM registry_syncer_states`) + require.NoError(t, err) + assert.Equal(t, 10, count) + + state, err := orm.LatestLocalRegistry(ctx) + require.NoError(t, err) + assert.Equal(t, states[10], *state) +} + +func generateState(t *testing.T) registrysyncer.LocalRegistry { + dID := uint32(1) + var pid ragetypes.PeerID + err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) + require.NoError(t, err) + nodes := [][32]byte{ + pid, + randomWord(), + randomWord(), + randomWord(), + } + capabilityID := randomWord() + capabilityID2 := randomWord() + capabilityIDStr := hex.EncodeToString(capabilityID[:]) + capabilityID2Str := hex.EncodeToString(capabilityID2[:]) + + config := &capabilitiespb.CapabilityConfig{ + DefaultConfig: values.Proto(values.EmptyMap()).GetMapValue(), + RemoteConfig: &capabilitiespb.CapabilityConfig_RemoteTriggerConfig{ + RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{ + RegistrationRefresh: durationpb.New(20 * time.Second), + RegistrationExpiry: durationpb.New(60 * time.Second), + // F + 1 + MinResponsesToAggregate: uint32(1) + 1, + MessageExpiry: durationpb.New(120 * time.Second), + }, + }, + } + configb, err := proto.Marshal(config) + require.NoError(t, err) + + return registrysyncer.LocalRegistry{ + IDsToDONs: map[registrysyncer.DonID]registrysyncer.DON{ + registrysyncer.DonID(dID): { + DON: capabilities.DON{ + ID: dID, + ConfigVersion: uint32(0), + F: uint8(1), + IsPublic: true, + AcceptsWorkflows: true, + Members: toPeerIDs(nodes), + }, + CapabilityConfigurations: map[string]registrysyncer.CapabilityConfiguration{ + capabilityIDStr: { + Config: configb, + }, + capabilityID2Str: { + Config: configb, + }, + }, + }, + }, + IDsToCapabilities: map[string]registrysyncer.Capability{ + capabilityIDStr: { + ID: capabilityIDStr, + CapabilityType: capabilities.CapabilityTypeAction, + }, + capabilityID2Str: { + ID: capabilityID2Str, + CapabilityType: capabilities.CapabilityTypeConsensus, + }, + }, + IDsToNodes: map[types.PeerID]kcr.CapabilitiesRegistryNodeInfo{ + nodes[0]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[0], + HashedCapabilityIds: [][32]byte{capabilityID, capabilityID2}, + CapabilitiesDONIds: []*big.Int{}, + }, + nodes[1]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[1], + HashedCapabilityIds: [][32]byte{capabilityID, capabilityID2}, + CapabilitiesDONIds: []*big.Int{}, + }, + nodes[2]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[2], + HashedCapabilityIds: [][32]byte{capabilityID, capabilityID2}, + CapabilitiesDONIds: []*big.Int{}, + }, + nodes[3]: { + NodeOperatorId: 1, + Signer: randomWord(), + P2pId: nodes[3], + HashedCapabilityIds: [][32]byte{capabilityID, capabilityID2}, + CapabilitiesDONIds: []*big.Int{}, + }, + }, + } +} diff --git a/core/services/registrysyncer/syncer.go b/core/services/registrysyncer/syncer.go index a83e0102afa..ab50c448b5c 100644 --- a/core/services/registrysyncer/syncer.go +++ b/core/services/registrysyncer/syncer.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "math/big" "sync" "time" @@ -27,16 +28,34 @@ type Syncer interface { AddLauncher(h ...Launcher) } +type ContractReaderFactory interface { + NewContractReader(context.Context, []byte) (types.ContractReader, error) +} + +type RegistrySyncer interface { + Sync(ctx context.Context, isInitialSync bool) error + AddLauncher(launchers ...Launcher) + Start(ctx context.Context) error + Close() error + Ready() error + HealthReport() map[string]error + Name() string +} + type registrySyncer struct { services.StateMachine stopCh services.StopChan launchers []Launcher reader types.ContractReader - initReader func(ctx context.Context, lggr logger.Logger, relayer contractReaderFactory, registryAddress string) (types.ContractReader, error) - relayer contractReaderFactory + initReader func(ctx context.Context, lggr logger.Logger, relayer ContractReaderFactory, registryAddress string) (types.ContractReader, error) + relayer ContractReaderFactory registryAddress string getPeerID func() (p2ptypes.PeerID, error) + orm ORM + + updateChan chan *LocalRegistry + wg sync.WaitGroup lggr logger.Logger mu sync.RWMutex @@ -52,28 +71,26 @@ var ( func New( lggr logger.Logger, getPeerID func() (p2ptypes.PeerID, error), - relayer contractReaderFactory, + relayer ContractReaderFactory, registryAddress string, -) (*registrySyncer, error) { - stopCh := make(services.StopChan) + orm ORM, +) (RegistrySyncer, error) { return ®istrySyncer{ - stopCh: stopCh, + stopCh: make(services.StopChan), + updateChan: make(chan *LocalRegistry), lggr: lggr.Named("RegistrySyncer"), relayer: relayer, registryAddress: registryAddress, initReader: newReader, + orm: orm, getPeerID: getPeerID, }, nil } -type contractReaderFactory interface { - NewContractReader(context.Context, []byte) (types.ContractReader, error) -} - // NOTE: this can't be called while initializing the syncer and needs to be called in the sync loop. // This is because Bind() makes an onchain call to verify that the contract address exists, and if // called during initialization, this results in a "no live nodes" error. -func newReader(ctx context.Context, lggr logger.Logger, relayer contractReaderFactory, remoteRegistryAddress string) (types.ContractReader, error) { +func newReader(ctx context.Context, lggr logger.Logger, relayer ContractReaderFactory, remoteRegistryAddress string) (types.ContractReader, error) { contractReaderConfig := evmrelaytypes.ChainReaderConfig{ Contracts: map[string]evmrelaytypes.ChainContractReader{ "CapabilitiesRegistry": { @@ -120,6 +137,11 @@ func (s *registrySyncer) Start(ctx context.Context) error { defer s.wg.Done() s.syncLoop() }() + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.updateStateLoop() + }() return nil }) } @@ -135,7 +157,7 @@ func (s *registrySyncer) syncLoop() { // sync immediately once spinning up syncLoop, as by default a ticker will // fire for the first time at T+N, where N is the interval. s.lggr.Debug("starting initial sync with remote registry") - err := s.sync(ctx) + err := s.Sync(ctx, true) if err != nil { s.lggr.Errorw("failed to sync with remote registry", "error", err) } @@ -146,7 +168,7 @@ func (s *registrySyncer) syncLoop() { return case <-ticker.C: s.lggr.Debug("starting regular sync with the remote registry") - err := s.sync(ctx) + err := s.Sync(ctx, false) if err != nil { s.lggr.Errorw("failed to sync with remote registry", "error", err) } @@ -154,8 +176,28 @@ func (s *registrySyncer) syncLoop() { } } +func (s *registrySyncer) updateStateLoop() { + ctx, cancel := s.stopCh.NewCtx() + defer cancel() + + for { + select { + case <-s.stopCh: + return + case localRegistry, ok := <-s.updateChan: + if !ok { + // channel has been closed, terminating. + return + } + if err := s.orm.AddLocalRegistry(ctx, *localRegistry); err != nil { + s.lggr.Errorw("failed to save state to local registry", "error", err) + } + } + } +} + func (s *registrySyncer) localRegistry(ctx context.Context) (*LocalRegistry, error) { - caps := []kcr.CapabilitiesRegistryCapabilityInfo{} + var caps []kcr.CapabilitiesRegistryCapabilityInfo err := s.reader.GetLatestValue(ctx, "CapabilitiesRegistry", "getCapabilities", primitives.Unconfirmed, nil, &caps) if err != nil { return nil, err @@ -173,7 +215,7 @@ func (s *registrySyncer) localRegistry(ctx context.Context) (*LocalRegistry, err hashedIDsToCapabilityIDs[c.HashedId] = cid } - dons := []kcr.CapabilitiesRegistryDONInfo{} + var dons []kcr.CapabilitiesRegistryDONInfo err = s.reader.GetLatestValue(ctx, "CapabilitiesRegistry", "getDONs", primitives.Unconfirmed, nil, &dons) if err != nil { return nil, err @@ -199,7 +241,7 @@ func (s *registrySyncer) localRegistry(ctx context.Context) (*LocalRegistry, err } } - nodes := []kcr.CapabilitiesRegistryNodeInfo{} + var nodes []kcr.CapabilitiesRegistryNodeInfo err = s.reader.GetLatestValue(ctx, "CapabilitiesRegistry", "getNodes", primitives.Unconfirmed, nil, &nodes) if err != nil { return nil, err @@ -219,7 +261,7 @@ func (s *registrySyncer) localRegistry(ctx context.Context) (*LocalRegistry, err }, nil } -func (s *registrySyncer) sync(ctx context.Context) error { +func (s *registrySyncer) Sync(ctx context.Context, isInitialSync bool) error { s.mu.RLock() defer s.mu.RUnlock() @@ -237,13 +279,44 @@ func (s *registrySyncer) sync(ctx context.Context) error { s.reader = reader } - lr, err := s.localRegistry(ctx) - if err != nil { - return fmt.Errorf("failed to sync with remote registry: %w", err) + var lr *LocalRegistry + var err error + + if isInitialSync { + s.lggr.Debug("syncing with local registry") + lr, err = s.orm.LatestLocalRegistry(ctx) + if err != nil { + s.lggr.Warnw("failed to sync with local registry, using remote registry instead", "error", err) + } else { + lr.lggr = s.lggr + lr.getPeerID = s.getPeerID + } + } + + if lr == nil { + s.lggr.Debug("syncing with remote registry") + localRegistry, err := s.localRegistry(ctx) + if err != nil { + return fmt.Errorf("failed to sync with remote registry: %w", err) + } + lr = localRegistry + // Attempt to send local registry to the update channel without blocking + // This is to prevent the tests from hanging if they are not calling `Start()` on the syncer + select { + case <-s.stopCh: + s.lggr.Debug("sync cancelled, stopping") + case s.updateChan <- lr: + // Successfully sent state + s.lggr.Debug("remote registry update triggered successfully") + default: + // No one is ready to receive the state, handle accordingly + s.lggr.Debug("no listeners on update channel, remote registry update skipped") + } } for _, h := range s.launchers { - if err := h.Launch(ctx, lr); err != nil { + lrCopy := deepCopyLocalRegistry(lr) + if err := h.Launch(ctx, &lrCopy); err != nil { s.lggr.Errorf("error calling launcher: %s", err) } } @@ -251,6 +324,58 @@ func (s *registrySyncer) sync(ctx context.Context) error { return nil } +func deepCopyLocalRegistry(lr *LocalRegistry) LocalRegistry { + var lrCopy LocalRegistry + lrCopy.lggr = lr.lggr + lrCopy.getPeerID = lr.getPeerID + lrCopy.IDsToDONs = make(map[DonID]DON, len(lr.IDsToDONs)) + for id, don := range lr.IDsToDONs { + d := capabilities.DON{ + ID: don.ID, + ConfigVersion: don.ConfigVersion, + Members: make([]p2ptypes.PeerID, len(don.Members)), + F: don.F, + IsPublic: don.IsPublic, + AcceptsWorkflows: don.AcceptsWorkflows, + } + copy(d.Members, don.Members) + capCfgs := make(map[string]CapabilityConfiguration, len(don.CapabilityConfigurations)) + for capID, capCfg := range don.CapabilityConfigurations { + capCfgs[capID] = CapabilityConfiguration{ + Config: capCfg.Config[:], + } + } + lrCopy.IDsToDONs[id] = DON{ + DON: d, + CapabilityConfigurations: capCfgs, + } + } + + lrCopy.IDsToCapabilities = make(map[string]Capability, len(lr.IDsToCapabilities)) + for id, capability := range lr.IDsToCapabilities { + cp := capability + lrCopy.IDsToCapabilities[id] = cp + } + + lrCopy.IDsToNodes = make(map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo, len(lr.IDsToNodes)) + for id, node := range lr.IDsToNodes { + nodeInfo := kcr.CapabilitiesRegistryNodeInfo{ + NodeOperatorId: node.NodeOperatorId, + ConfigCount: node.ConfigCount, + WorkflowDONId: node.WorkflowDONId, + Signer: node.Signer, + P2pId: node.P2pId, + HashedCapabilityIds: make([][32]byte, len(node.HashedCapabilityIds)), + CapabilitiesDONIds: make([]*big.Int, len(node.CapabilitiesDONIds)), + } + copy(nodeInfo.HashedCapabilityIds, node.HashedCapabilityIds) + copy(nodeInfo.CapabilitiesDONIds, node.CapabilitiesDONIds) + lrCopy.IDsToNodes[id] = nodeInfo + } + + return lrCopy +} + func toCapabilityType(capabilityType uint8) capabilities.CapabilityType { switch capabilityType { case 0: @@ -291,6 +416,9 @@ func (s *registrySyncer) AddLauncher(launchers ...Launcher) { func (s *registrySyncer) Close() error { return s.StopOnce("RegistrySyncer", func() error { close(s.stopCh) + s.mu.Lock() + defer s.mu.Unlock() + close(s.updateChan) s.wg.Wait() return nil }) diff --git a/core/services/registrysyncer/syncer_test.go b/core/services/registrysyncer/syncer_test.go index cd8776d882c..2c08a1cdde6 100644 --- a/core/services/registrysyncer/syncer_test.go +++ b/core/services/registrysyncer/syncer_test.go @@ -1,4 +1,4 @@ -package registrysyncer +package registrysyncer_test import ( "context" @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "math/big" + "sync" "testing" "time" @@ -15,6 +16,7 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" @@ -25,6 +27,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/values" capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" + evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -33,6 +36,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer" + syncerMocks "github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer/mocks" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" evmrelaytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" ) @@ -126,16 +131,51 @@ func randomWord() [32]byte { } type launcher struct { - localRegistry *LocalRegistry + localRegistry *registrysyncer.LocalRegistry + mu sync.RWMutex } -func (l *launcher) Launch(ctx context.Context, localRegistry *LocalRegistry) error { +func (l *launcher) Launch(ctx context.Context, localRegistry *registrysyncer.LocalRegistry) error { + l.mu.Lock() + defer l.mu.Unlock() l.localRegistry = localRegistry return nil } +type orm struct { + ormMock *syncerMocks.ORM + latestLocalRegistryCh chan struct{} + addLocalRegistryCh chan struct{} +} + +func newORM(t *testing.T) *orm { + t.Helper() + + return &orm{ + ormMock: syncerMocks.NewORM(t), + latestLocalRegistryCh: make(chan struct{}, 1), + addLocalRegistryCh: make(chan struct{}, 1), + } +} + +func (o *orm) Cleanup() { + close(o.latestLocalRegistryCh) + close(o.addLocalRegistryCh) +} + +func (o *orm) AddLocalRegistry(ctx context.Context, localRegistry registrysyncer.LocalRegistry) error { + o.addLocalRegistryCh <- struct{}{} + err := o.ormMock.AddLocalRegistry(ctx, localRegistry) + return err +} + +func (o *orm) LatestLocalRegistry(ctx context.Context) (*registrysyncer.LocalRegistry, error) { + o.latestLocalRegistryCh <- struct{}{} + return o.ormMock.LatestLocalRegistry(ctx) +} + func toPeerIDs(ids [][32]byte) []p2ptypes.PeerID { - pids := []p2ptypes.PeerID{} + var pids []p2ptypes.PeerID for _, id := range ids { pids = append(pids, id) } @@ -236,20 +276,22 @@ func TestReader_Integration(t *testing.T) { require.NoError(t, err) + db := pgtest.NewSqlxDB(t) factory := newContractReaderFactory(t, sim) - syncer, err := New(logger.TestLogger(t), func() (p2ptypes.PeerID, error) { return p2ptypes.PeerID{}, nil }, factory, regAddress.Hex()) + syncerORM := registrysyncer.NewORM(db, logger.TestLogger(t)) + syncer, err := registrysyncer.New(logger.TestLogger(t), func() (p2ptypes.PeerID, error) { return p2ptypes.PeerID{}, nil }, factory, regAddress.Hex(), syncerORM) require.NoError(t, err) l := &launcher{} syncer.AddLauncher(l) - err = syncer.sync(ctx) + err = syncer.Sync(ctx, false) // not looking to load from the DB in this specific test. s := l.localRegistry require.NoError(t, err) assert.Len(t, s.IDsToCapabilities, 1) gotCap := s.IDsToCapabilities[cid] - assert.Equal(t, Capability{ + assert.Equal(t, registrysyncer.Capability{ CapabilityType: capabilities.CapabilityTypeTarget, ID: "write-chain@1.0.1", }, gotCap) @@ -308,6 +350,127 @@ func TestReader_Integration(t *testing.T) { }, s.IDsToNodes) } +func TestSyncer_DBIntegration(t *testing.T) { + ctx := testutils.Context(t) + reg, regAddress, owner, sim := startNewChainWithRegistry(t) + + _, err := reg.AddCapabilities(owner, []kcr.CapabilitiesRegistryCapability{writeChainCapability}) + require.NoError(t, err, "AddCapability failed for %s", writeChainCapability.LabelledName) + sim.Commit() + + cid, err := reg.GetHashedCapabilityId(&bind.CallOpts{}, writeChainCapability.LabelledName, writeChainCapability.Version) + require.NoError(t, err) + + _, err = reg.AddNodeOperators(owner, []kcr.CapabilitiesRegistryNodeOperator{ + { + Admin: owner.From, + Name: "TEST_NOP", + }, + }) + require.NoError(t, err) + + nodeSet := [][32]byte{ + randomWord(), + randomWord(), + randomWord(), + } + + signersSet := [][32]byte{ + randomWord(), + randomWord(), + randomWord(), + } + + nodes := []kcr.CapabilitiesRegistryNodeParams{ + { + // The first NodeOperatorId has id 1 since the id is auto-incrementing. + NodeOperatorId: uint32(1), + Signer: signersSet[0], + P2pId: nodeSet[0], + HashedCapabilityIds: [][32]byte{cid}, + }, + { + // The first NodeOperatorId has id 1 since the id is auto-incrementing. + NodeOperatorId: uint32(1), + Signer: signersSet[1], + P2pId: nodeSet[1], + HashedCapabilityIds: [][32]byte{cid}, + }, + { + // The first NodeOperatorId has id 1 since the id is auto-incrementing. + NodeOperatorId: uint32(1), + Signer: signersSet[2], + P2pId: nodeSet[2], + HashedCapabilityIds: [][32]byte{cid}, + }, + } + _, err = reg.AddNodes(owner, nodes) + require.NoError(t, err) + + config := &capabilitiespb.CapabilityConfig{ + DefaultConfig: values.Proto(values.EmptyMap()).GetMapValue(), + RemoteConfig: &capabilitiespb.CapabilityConfig_RemoteTriggerConfig{ + RemoteTriggerConfig: &capabilitiespb.RemoteTriggerConfig{ + RegistrationRefresh: durationpb.New(20 * time.Second), + RegistrationExpiry: durationpb.New(60 * time.Second), + // F + 1 + MinResponsesToAggregate: uint32(1) + 1, + }, + }, + } + configb, err := proto.Marshal(config) + require.NoError(t, err) + + cfgs := []kcr.CapabilitiesRegistryCapabilityConfiguration{ + { + CapabilityId: cid, + Config: configb, + }, + } + _, err = reg.AddDON( + owner, + nodeSet, + cfgs, + true, + true, + 1, + ) + sim.Commit() + + require.NoError(t, err) + + factory := newContractReaderFactory(t, sim) + syncerORM := newORM(t) + syncerORM.ormMock.On("LatestLocalRegistry", mock.Anything).Return(nil, fmt.Errorf("no state found")) + syncerORM.ormMock.On("AddLocalRegistry", mock.Anything, mock.Anything).Return(nil) + syncer, err := newTestSyncer(logger.TestLogger(t), func() (p2ptypes.PeerID, error) { return p2ptypes.PeerID{}, nil }, factory, regAddress.Hex(), syncerORM) + require.NoError(t, err) + require.NoError(t, syncer.Start(ctx)) + t.Cleanup(func() { + syncerORM.Cleanup() + require.NoError(t, syncer.Close()) + }) + + l := &launcher{} + syncer.AddLauncher(l) + + var latestLocalRegistryCalled, addLocalRegistryCalled bool + timeout := time.After(500 * time.Millisecond) + + for !latestLocalRegistryCalled || !addLocalRegistryCalled { + select { + case val := <-syncerORM.latestLocalRegistryCh: + assert.Equal(t, struct{}{}, val) + latestLocalRegistryCalled = true + case val := <-syncerORM.addLocalRegistryCh: + assert.Equal(t, struct{}{}, val) + addLocalRegistryCalled = true + case <-timeout: + t.Fatal("test timed out; channels did not received data") + } + } +} + func TestSyncer_LocalNode(t *testing.T) { ctx := tests.Context(t) lggr := logger.TestLogger(t) @@ -327,11 +490,11 @@ func TestSyncer_LocalNode(t *testing.T) { // The below state describes a Workflow DON (AcceptsWorkflows = true), // which exposes the streams-trigger and write_chain capabilities. // We expect receivers to be wired up and both capabilities to be added to the registry. - localRegistry := LocalRegistry{ - lggr: lggr, - getPeerID: func() (p2ptypes.PeerID, error) { return pid, nil }, - IDsToDONs: map[DonID]DON{ - DonID(dID): { + localRegistry := registrysyncer.NewLocalRegistry( + lggr, + func() (p2ptypes.PeerID, error) { return pid, nil }, + map[registrysyncer.DonID]registrysyncer.DON{ + registrysyncer.DonID(dID): { DON: capabilities.DON{ ID: dID, ConfigVersion: uint32(2), @@ -342,7 +505,7 @@ func TestSyncer_LocalNode(t *testing.T) { }, }, }, - IDsToNodes: map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ + map[p2ptypes.PeerID]kcr.CapabilitiesRegistryNodeInfo{ workflowDonNodes[0]: { NodeOperatorId: 1, Signer: randomWord(), @@ -364,7 +527,8 @@ func TestSyncer_LocalNode(t *testing.T) { P2pId: workflowDonNodes[3], }, }, - } + map[string]registrysyncer.Capability{}, + ) node, err := localRegistry.LocalNode(ctx) require.NoError(t, err) @@ -384,3 +548,17 @@ func TestSyncer_LocalNode(t *testing.T) { } assert.Equal(t, expectedNode, node) } + +func newTestSyncer( + lggr logger.Logger, + getPeerID func() (p2ptypes.PeerID, error), + relayer registrysyncer.ContractReaderFactory, + registryAddress string, + orm *orm, +) (registrysyncer.RegistrySyncer, error) { + rs, err := registrysyncer.New(lggr, getPeerID, relayer, registryAddress, orm) + if err != nil { + return nil, err + } + return rs, nil +} diff --git a/core/store/migrate/migrations/0249_registry_syncer_state.sql b/core/store/migrate/migrations/0249_registry_syncer_state.sql new file mode 100644 index 00000000000..e34a3790a38 --- /dev/null +++ b/core/store/migrate/migrations/0249_registry_syncer_state.sql @@ -0,0 +1,11 @@ +-- +goose Up +CREATE TABLE registry_syncer_states ( + id SERIAL PRIMARY KEY, + data JSONB NOT NULL, + data_hash TEXT NOT NULL UNIQUE, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +-- +goose Down +-- +goose StatementBegin +DROP TABLE registry_syncer_states; +-- +goose StatementEnd