diff --git a/.changeset/orange-squids-kick.md b/.changeset/orange-squids-kick.md new file mode 100644 index 00000000000..a934e70063d --- /dev/null +++ b/.changeset/orange-squids-kick.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#internal Remote Trigger setup diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index 94ca58e6156..fac01fdead6 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -5,6 +5,7 @@ import ( sync "sync" "time" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -87,17 +88,21 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) { key := registrationKey{msg.CallerDonId, req.Metadata.WorkflowID} nowMs := time.Now().UnixMilli() p.mu.Lock() + defer p.mu.Unlock() p.messageCache.Insert(key, sender, nowMs, msg.Payload) + _, exists := p.registrations[key] + if exists { + p.lggr.Debugw("trigger registration already exists", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID) + return + } // NOTE: require 2F+1 by default, introduce different strategies later (KS-76) minRequired := uint32(2*callerDon.F + 1) ready, payloads := p.messageCache.Ready(key, minRequired, nowMs-int64(p.config.RegistrationExpiryMs), false) - p.mu.Unlock() if !ready { p.lggr.Debugw("not ready to aggregate yet", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "minRequired", minRequired) return } - agg := NewDefaultModeAggregator(uint32(callerDon.F + 1)) - aggregated, err := agg.Aggregate("", payloads) + aggregated, err := AggregateModeRaw(payloads, uint32(callerDon.F+1)) if err != nil { p.lggr.Errorw("failed to aggregate trigger registrations", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err) return @@ -107,7 +112,6 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) { p.lggr.Errorw("failed to unmarshal request", "capabilityId", p.capInfo.ID, "err", err) return } - p.mu.Lock() callbackCh := make(chan commoncap.CapabilityResponse) ctx, cancel := p.stopCh.NewCtx() err = p.underlying.RegisterTrigger(ctx, callbackCh, unmarshaled) @@ -123,7 +127,6 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) { } else { p.lggr.Errorw("failed to register trigger", "capabilityId", p.capInfo.ID, "workflowId", req.Metadata.WorkflowID, "err", err) } - p.mu.Unlock() } else { p.lggr.Errorw("received trigger request with unknown method", "method", msg.Method, "sender", sender) } @@ -171,7 +174,13 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh chan commoncap.Capability p.lggr.Infow("triggerEventLoop channel closed", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId) return } - p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId) + triggerEvent := capabilities.TriggerEvent{} + err := response.Value.UnwrapTo(&triggerEvent) + if err != nil { + p.lggr.Errorw("can't unwrap trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId, "err", err) + break + } + p.lggr.Debugw("received trigger event", "capabilityId", p.capInfo.ID, "workflowId", key.workflowId, "triggerEventID", triggerEvent.ID) marshaled, err := pb.MarshalCapabilityResponse(response) if err != nil { p.lggr.Debugw("can't marshal trigger event", "err", err) @@ -186,7 +195,8 @@ func (p *triggerPublisher) triggerEventLoop(callbackCh chan commoncap.Capability Metadata: &types.MessageBody_TriggerEventMetadata{ TriggerEventMetadata: &types.TriggerEventMetadata{ // NOTE: optionally introduce batching across workflows as an optimization - WorkflowIds: []string{key.workflowId}, + WorkflowIds: []string{key.workflowId}, + TriggerEventId: triggerEvent.ID, }, }, } diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index 2c893d2b86e..4b46935eab6 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -102,6 +102,7 @@ func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, callback chan<- callback: callback, rawRequest: rawRequest, } + s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID) return nil } @@ -114,8 +115,8 @@ func (s *triggerSubscriber) registrationLoop() { case <-s.stopCh: return case <-ticker.C: - s.lggr.Infow("register trigger for remote capability", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "nMembers", len(s.capDonInfo.Members)) s.mu.RLock() + s.lggr.Infow("register trigger for remote capability", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "nMembers", len(s.capDonInfo.Members), "nWorkflows", len(s.registeredWorkflows)) for _, registration := range s.registeredWorkflows { // NOTE: send to all by default, introduce different strategies later (KS-76) for _, peerID := range s.capDonInfo.Members { @@ -180,18 +181,14 @@ func (s *triggerSubscriber) Receive(msg *types.MessageBody) { continue } if ready { + s.lggr.Debugw("trigger event ready to aggregate", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId) aggregatedResponse, err := s.aggregator.Aggregate(meta.TriggerEventId, payloads) if err != nil { - s.lggr.Errorw("failed to aggregate responses", "capabilityId", s.capInfo.ID, "workflowId", workflowId, "err", err) + s.lggr.Errorw("failed to aggregate responses", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId, "err", err) continue } - unmarshaled, err := pb.UnmarshalCapabilityResponse(aggregatedResponse) - if err != nil { - s.lggr.Errorw("failed to unmarshal responses", "capabilityId", s.capInfo.ID, "workflowId", workflowId, "err", err) - continue - } - s.lggr.Info("remote trigger event aggregated", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId) - registration.callback <- unmarshaled + s.lggr.Infow("remote trigger event aggregated", "triggerEventID", meta.TriggerEventId, "capabilityId", s.capInfo.ID, "workflowId", workflowId) + registration.callback <- aggregatedResponse } } } else { diff --git a/core/capabilities/remote/types/types.go b/core/capabilities/remote/types/types.go index 327c2b8d4c5..d8307d09f80 100644 --- a/core/capabilities/remote/types/types.go +++ b/core/capabilities/remote/types/types.go @@ -1,6 +1,7 @@ package types import ( + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) @@ -23,7 +24,7 @@ type Receiver interface { } type Aggregator interface { - Aggregate(eventID string, responses [][]byte) ([]byte, error) + Aggregate(eventID string, responses [][]byte) (commoncap.CapabilityResponse, error) } // NOTE: this type will become part of the Registry (KS-108) diff --git a/core/capabilities/remote/utils.go b/core/capabilities/remote/utils.go index 92c5e5447a5..dba24b843cc 100644 --- a/core/capabilities/remote/utils.go +++ b/core/capabilities/remote/utils.go @@ -10,6 +10,8 @@ import ( "google.golang.org/protobuf/proto" + commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" remotetypes "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) @@ -60,16 +62,29 @@ func NewDefaultModeAggregator(minIdenticalResponses uint32) *defaultModeAggregat } } -func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) ([]byte, error) { +func (a *defaultModeAggregator) Aggregate(_ string, responses [][]byte) (commoncap.CapabilityResponse, error) { + found, err := AggregateModeRaw(responses, a.minIdenticalResponses) + if err != nil { + return commoncap.CapabilityResponse{}, fmt.Errorf("failed to aggregate responses, err: %w", err) + } + + unmarshaled, err := pb.UnmarshalCapabilityResponse(found) + if err != nil { + return commoncap.CapabilityResponse{}, fmt.Errorf("failed to unmarshal aggregated responses, err: %w", err) + } + return unmarshaled, nil +} + +func AggregateModeRaw(elemList [][]byte, minIdenticalResponses uint32) ([]byte, error) { hashToCount := make(map[string]uint32) var found []byte - for _, resp := range responses { + for _, elem := range elemList { hasher := sha256.New() - hasher.Write(resp) + hasher.Write(elem) sha := hex.EncodeToString(hasher.Sum(nil)) hashToCount[sha]++ - if hashToCount[sha] >= a.minIdenticalResponses { - found = resp + if hashToCount[sha] >= minIdenticalResponses { + found = elem break } } diff --git a/core/capabilities/remote/utils_test.go b/core/capabilities/remote/utils_test.go index 120cf5604ca..b5f97af99ed 100644 --- a/core/capabilities/remote/utils_test.go +++ b/core/capabilities/remote/utils_test.go @@ -1,7 +1,6 @@ package remote_test import ( - "bytes" "crypto/ed25519" "crypto/rand" "testing" @@ -90,29 +89,32 @@ func TestToPeerID(t *testing.T) { } func TestDefaultModeAggregator_Aggregate(t *testing.T) { - capResponse1 := marshalCapabilityResponse(t, triggerEvent1, nil) - capResponse2 := marshalCapabilityResponse(t, triggerEvent2, nil) + val, err := values.Wrap(triggerEvent1) + require.NoError(t, err) + capResponse1 := commoncap.CapabilityResponse{ + Value: val, + Err: nil, + } + marshaled1, err := pb.MarshalCapabilityResponse(capResponse1) + require.NoError(t, err) + + val2, err := values.Wrap(triggerEvent2) + require.NoError(t, err) + capResponse2 := commoncap.CapabilityResponse{ + Value: val2, + Err: nil, + } + marshaled2, err := pb.MarshalCapabilityResponse(capResponse2) + require.NoError(t, err) agg := remote.NewDefaultModeAggregator(2) - _, err := agg.Aggregate("", [][]byte{capResponse1}) + _, err = agg.Aggregate("", [][]byte{marshaled1}) require.Error(t, err) - _, err = agg.Aggregate("", [][]byte{capResponse1, capResponse2}) + _, err = agg.Aggregate("", [][]byte{marshaled1, marshaled2}) require.Error(t, err) - res, err := agg.Aggregate("", [][]byte{capResponse1, capResponse2, capResponse1}) - require.NoError(t, err) - require.True(t, bytes.Equal(res, capResponse1)) -} - -func marshalCapabilityResponse(t *testing.T, capValue any, capError error) []byte { - val, err := values.Wrap(capValue) - require.NoError(t, err) - capResponse := commoncap.CapabilityResponse{ - Value: val, - Err: capError, - } - marshaled, err := pb.MarshalCapabilityResponse(capResponse) + res, err := agg.Aggregate("", [][]byte{marshaled1, marshaled2, marshaled1}) require.NoError(t, err) - return marshaled + require.Equal(t, res, capResponse1) } diff --git a/core/capabilities/syncer.go b/core/capabilities/syncer.go index 748910c462b..2de917b5f9f 100644 --- a/core/capabilities/syncer.go +++ b/core/capabilities/syncer.go @@ -3,8 +3,12 @@ package capabilities import ( "context" "slices" + "sync" + "time" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/mercury" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -32,12 +36,12 @@ var defaultStreamConfig = p2ptypes.StreamConfig{ OutgoingMessageBufferSize: 1000000, MaxMessageLenBytes: 100000, MessageRateLimiter: ragep2p.TokenBucketParams{ - Rate: 10.0, + Rate: 100.0, Capacity: 1000, }, BytesRateLimiter: ragep2p.TokenBucketParams{ - Rate: 10.0, - Capacity: 1000, + Rate: 100000.0, + Capacity: 1000000, }, } @@ -54,14 +58,16 @@ func NewRegistrySyncer(peerWrapper p2ptypes.PeerWrapper, registry types.Capabili func (s *registrySyncer) Start(ctx context.Context) error { // NOTE: temporary hard-coded DONs workflowDONPeers := []string{ - "12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC", - "12D3KooWQsmok6aD8PZqt3RnJhQRrNzKHLficq7zYFRp7kZ1hHP8", - "12D3KooWJbZLiMuGeKw78s3LM5TNgBTJHcF39DraxLu14bucG9RN", - "12D3KooWGqfSPhHKmQycfhRjgUDE2vg9YWZN27Eue8idb2ZUk6EH", + "12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N", + "12D3KooWG1AyvwmCpZ93J8pBQUE1SuzrjDXnT4BeouncHR3jWLCG", + "12D3KooWGeUKZBRMbx27FUTgBwZa9Ap9Ym92mywwpuqkEtz8XWyv", + "12D3KooW9zYWQv3STmDeNDidyzxsJSTxoCTLicafgfeEz9nhwhC4", } - capabilityDONPeers := []string{ - "12D3KooWHCcyTPmYFB1ydNvNcXw5WyAomRzGSFu1B7hpB4yi8Smf", - "12D3KooWPv6eqJvYz7TcQWk4Y4XjZ1uQ7mUKahdDXj65ht95zH6a", + triggerDONPeers := []string{ + "12D3KooWJrthXtnPHw7xyHFAxo6NxifYTvc8igKYaA6wRRRqtsMb", + "12D3KooWFQekP9sGex4XhqEJav5EScjTpDVtDqJFg1JvrePBCEGJ", + "12D3KooWFLEq4hYtdyKWwe47dXGEbSiHMZhmr5xLSJNhpfiEz8NF", + "12D3KooWN2hztiXNNS1jMQTTvvPRYcarK1C7T3Mdqk4x4gwyo5WS", } allPeers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig) addPeersToDONInfo := func(peers []string, donInfo *remotetypes.DON) error { @@ -76,12 +82,12 @@ func (s *registrySyncer) Start(ctx context.Context) error { } return nil } - workflowDonInfo := remotetypes.DON{ID: "workflowDon1"} + workflowDonInfo := remotetypes.DON{ID: "workflowDon1", F: 1} if err := addPeersToDONInfo(workflowDONPeers, &workflowDonInfo); err != nil { return err } - capabilityDonInfo := remotetypes.DON{ID: "capabilityDon1"} - if err := addPeersToDONInfo(capabilityDONPeers, &capabilityDonInfo); err != nil { + triggerCapabilityDonInfo := remotetypes.DON{ID: "capabilityDon1", F: 1} + if err := addPeersToDONInfo(triggerDONPeers, &triggerCapabilityDonInfo); err != nil { return err } err := s.peerWrapper.GetPeer().UpdateConnections(allPeers) @@ -89,7 +95,7 @@ func (s *registrySyncer) Start(ctx context.Context) error { return err } // NOTE: temporary hard-coded capabilities - capId := "sample_remote_trigger" + capId := "mercury-trigger" triggerInfo := commoncap.CapabilityInfo{ ID: capId, CapabilityType: commoncap.CapabilityTypeTrigger, @@ -98,36 +104,42 @@ func (s *registrySyncer) Start(ctx context.Context) error { } myId := s.peerWrapper.GetPeer().ID().String() config := remotetypes.RemoteTriggerConfig{ - RegistrationRefreshMs: 20000, + RegistrationRefreshMs: 20000, + MinResponsesToAggregate: uint32(triggerCapabilityDonInfo.F) + 1, } if slices.Contains(workflowDONPeers, myId) { s.lggr.Info("member of a workflow DON - starting remote subscribers") - triggerCap := remote.NewTriggerSubscriber(config, triggerInfo, capabilityDonInfo, workflowDonInfo, s.dispatcher, nil, s.lggr) + aggregator := triggers.NewMercuryRemoteAggregator(s.lggr) + triggerCap := remote.NewTriggerSubscriber(config, triggerInfo, triggerCapabilityDonInfo, workflowDonInfo, s.dispatcher, aggregator, s.lggr) err = s.registry.Add(ctx, triggerCap) if err != nil { s.lggr.Errorw("failed to add remote target capability to registry", "error", err) return err } - err = s.dispatcher.SetReceiver(capId, capabilityDonInfo.ID, triggerCap) + err = s.dispatcher.SetReceiver(capId, triggerCapabilityDonInfo.ID, triggerCap) if err != nil { - s.lggr.Errorw("failed to set receiver", "capabilityId", capId, "donId", capabilityDonInfo.ID, "error", err) + s.lggr.Errorw("workflow DON failed to set receiver", "capabilityId", capId, "donId", triggerCapabilityDonInfo.ID, "error", err) return err } s.subServices = append(s.subServices, triggerCap) } - if slices.Contains(capabilityDONPeers, myId) { + if slices.Contains(triggerDONPeers, myId) { s.lggr.Info("member of a capability DON - starting remote publishers") workflowDONs := map[string]remotetypes.DON{ workflowDonInfo.ID: workflowDonInfo, } - underlying := &noOpTrigger{info: triggerInfo, lggr: s.lggr} - triggerCap := remote.NewTriggerPublisher(config, underlying, triggerInfo, capabilityDonInfo, workflowDONs, s.dispatcher, s.lggr) - err = s.dispatcher.SetReceiver(capId, capabilityDonInfo.ID, triggerCap) + underlying := triggers.NewMercuryTriggerService(1000, s.lggr) + triggerCap := remote.NewTriggerPublisher(config, underlying, triggerInfo, triggerCapabilityDonInfo, workflowDONs, s.dispatcher, s.lggr) + err = s.dispatcher.SetReceiver(capId, triggerCapabilityDonInfo.ID, triggerCap) if err != nil { - s.lggr.Errorw("failed to set receiver", "capabilityId", capId, "donId", capabilityDonInfo.ID, "error", err) + s.lggr.Errorw("capability DON failed to set receiver", "capabilityId", capId, "donId", triggerCapabilityDonInfo.ID, "error", err) return err } + s.subServices = append(s.subServices, underlying) s.subServices = append(s.subServices, triggerCap) + // NOTE: temporary mock Mercury data producer + mockMercuryDataProducer := NewMockMercuryDataProducer(underlying, s.lggr) + s.subServices = append(s.subServices, mockMercuryDataProducer) } // NOTE: temporary service start - should be managed by capability creation for _, srv := range s.subServices { @@ -163,21 +175,86 @@ func (s *registrySyncer) Name() string { return "RegistrySyncer" } -type noOpTrigger struct { - info commoncap.CapabilityInfo - lggr logger.Logger +type mockMercuryDataProducer struct { + trigger *triggers.MercuryTriggerService + wg sync.WaitGroup + closeCh chan struct{} + lggr logger.Logger } -func (t *noOpTrigger) Info(_ context.Context) (commoncap.CapabilityInfo, error) { - return t.info, nil +var _ services.Service = &mockMercuryDataProducer{} + +func NewMockMercuryDataProducer(trigger *triggers.MercuryTriggerService, lggr logger.Logger) *mockMercuryDataProducer { + return &mockMercuryDataProducer{ + trigger: trigger, + closeCh: make(chan struct{}), + lggr: lggr, + } } -func (t *noOpTrigger) RegisterTrigger(_ context.Context, _ chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error { - t.lggr.Infow("no-op trigger RegisterTrigger", "workflowID", request.Metadata.WorkflowID) +func (m *mockMercuryDataProducer) Start(ctx context.Context) error { + m.wg.Add(1) + go m.loop() return nil } -func (t *noOpTrigger) UnregisterTrigger(_ context.Context, request commoncap.CapabilityRequest) error { - t.lggr.Infow("no-op trigger RegisterTrigger", "workflowID", request.Metadata.WorkflowID) +func (m *mockMercuryDataProducer) loop() { + defer m.wg.Done() + + sleepSec := 60 + ticker := time.NewTicker(time.Duration(sleepSec) * time.Second) + defer ticker.Stop() + + prices := []int64{300000, 40000, 5000000} + + for range ticker.C { + for i := range prices { + prices[i] = prices[i] + 1 + } + + reports := []mercury.FeedReport{ + { + FeedID: "0x1111111111111111111100000000000000000000000000000000000000000000", + FullReport: []byte{0x11, 0xaa, 0xbb, 0xcc}, + BenchmarkPrice: prices[0], + ObservationTimestamp: time.Now().Unix(), + }, + { + FeedID: "0x2222222222222222222200000000000000000000000000000000000000000000", + FullReport: []byte{0x22, 0xaa, 0xbb, 0xcc}, + BenchmarkPrice: prices[1], + ObservationTimestamp: time.Now().Unix(), + }, + { + FeedID: "0x3333333333333333333300000000000000000000000000000000000000000000", + FullReport: []byte{0x33, 0xaa, 0xbb, 0xcc}, + BenchmarkPrice: prices[2], + ObservationTimestamp: time.Now().Unix(), + }, + } + + m.lggr.Infow("New set of Mercury reports", "timestamp", time.Now().Unix(), "payload", reports) + err := m.trigger.ProcessReport(reports) + if err != nil { + m.lggr.Errorw("failed to process Mercury reports", "err", err, "timestamp", time.Now().Unix(), "payload", reports) + } + } +} + +func (m *mockMercuryDataProducer) Close() error { + close(m.closeCh) + m.wg.Wait() + return nil +} + +func (m *mockMercuryDataProducer) HealthReport() map[string]error { return nil } + +func (m *mockMercuryDataProducer) Ready() error { + return nil +} + +func (m *mockMercuryDataProducer) Name() string { + return "mockMercuryDataProducer" +} diff --git a/core/capabilities/syncer_test.go b/core/capabilities/syncer_test.go index 335b9774689..757135635d8 100644 --- a/core/capabilities/syncer_test.go +++ b/core/capabilities/syncer_test.go @@ -20,7 +20,7 @@ func TestSyncer_CleanStartClose(t *testing.T) { lggr := logger.TestLogger(t) ctx := testutils.Context(t) var pid ragetypes.PeerID - err := pid.UnmarshalText([]byte("12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC")) + err := pid.UnmarshalText([]byte("12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N")) require.NoError(t, err) peer := mocks.NewPeer(t) peer.On("UpdateConnections", mock.Anything).Return(nil) diff --git a/core/scripts/go.mod b/core/scripts/go.mod index afb3248f629..8c68d33c604 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -21,7 +21,7 @@ require ( github.com/prometheus/client_golang v1.17.0 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0 github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052 @@ -249,6 +249,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect github.com/sasha-s/go-deadlock v0.3.1 // indirect github.com/scylladb/go-reflectx v1.0.1 // indirect github.com/sethvargo/go-retry v0.2.4 // indirect diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 449d8124317..2883ee68528 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1185,8 +1185,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb h1:yLDt5cQWRwcFM5VEdSTbc3vDrYrxYqBjSvyTMU/o8s4= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0 h1:K7Y+gd0lEumrhgaIQFjC8reXadJOMVgDI4xsCAsiuSo= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0/go.mod h1:GTDBbovHUSAUk+fuGIySF2A/whhdtHGaWmU61BoERks= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/core/services/p2p/wrapper/wrapper.go b/core/services/p2p/wrapper/wrapper.go index fd47c6c2dd2..acb6694b5a3 100644 --- a/core/services/p2p/wrapper/wrapper.go +++ b/core/services/p2p/wrapper/wrapper.go @@ -66,7 +66,9 @@ func convertPeerConfig(keystoreP2P keystore.P2P, p2pConfig config.P2P) (p2p.Peer DeltaDial: p2pConfig.V2().DeltaDial().Duration(), DiscovererDatabase: discovererDB, - MetricsRegisterer: prometheus.DefaultRegisterer, + // NOTE: this is equivalent to prometheus.DefaultRegisterer, but we need to use a separate + // object to avoid conflicts with the OCR registerer + MetricsRegisterer: prometheus.NewRegistry(), } return peerConfig, nil diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index dedf53e369b..ed7d266131a 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -3,13 +3,10 @@ package workflows import ( "context" "fmt" - "time" "github.com/google/uuid" "github.com/pelletier/go-toml" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities/mercury" - "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink/v2/core/capabilities/targets" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" @@ -45,14 +42,6 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser d.logger.Errorw("could not initialize writes", err) } - trigger := triggers.NewMercuryTriggerService(0, d.logger) - err = d.registry.Add(context.Background(), trigger) - if err != nil { - d.logger.Errorw("could not add mercury trigger to registry", err) - } else { - go mercuryEventLoop(trigger, d.logger) - } - cfg := Config{ Lggr: d.logger, Spec: spec.WorkflowSpec.Workflow, @@ -70,52 +59,6 @@ func NewDelegate(logger logger.Logger, registry types.CapabilitiesRegistry, lega return &Delegate{logger: logger, registry: registry, legacyEVMChains: legacyEVMChains} } -func mercuryEventLoop(trigger *triggers.MercuryTriggerService, logger logger.Logger) { - sleepSec := 60 * time.Second - ticker := time.NewTicker(sleepSec) - defer ticker.Stop() - - prices := []int64{300000, 2000, 5000000} - - for range ticker.C { - for i := range prices { - prices[i] = prices[i] + 1 - } - - t := time.Now().Round(sleepSec).Unix() - reports, err := emitReports(logger, trigger, t, prices) - if err != nil { - logger.Errorw("failed to process Mercury reports", "err", err, "timestamp", time.Now().Unix(), "payload", reports) - } - } -} - -func emitReports(logger logger.Logger, trigger *triggers.MercuryTriggerService, t int64, prices []int64) ([]mercury.FeedReport, error) { - reports := []mercury.FeedReport{ - { - FeedID: "0x1111111111111111111100000000000000000000000000000000000000000000", - FullReport: []byte(fmt.Sprintf(`{ "feed": "ETH", "price": %d }`, prices[0])), - BenchmarkPrice: prices[0], - ObservationTimestamp: t, - }, - { - FeedID: "0x2222222222222222222200000000000000000000000000000000000000000000", - FullReport: []byte(fmt.Sprintf(`{ "feed": "LINK", "price": %d }`, prices[1])), - BenchmarkPrice: prices[1], - ObservationTimestamp: t, - }, - { - FeedID: "0x3333333333333333333300000000000000000000000000000000000000000000", - FullReport: []byte(fmt.Sprintf(`{ "feed": "BTC", "price": %d }`, prices[2])), - BenchmarkPrice: prices[2], - ObservationTimestamp: t, - }, - } - - logger.Infow("New set of Mercury reports", "timestamp", time.Now().Unix(), "payload", reports) - return reports, trigger.ProcessReport(reports) -} - func ValidatedWorkflowSpec(tomlString string) (job.Job, error) { var jb = job.Job{ExternalJobID: uuid.New()} diff --git a/go.mod b/go.mod index 9ad9b8c6dbe..3970b20a4bf 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chain-selectors v1.0.10 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0 github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 github.com/smartcontractkit/chainlink-feeds v0.0.0-20240119021347-3c541a78cdb8 diff --git a/go.sum b/go.sum index 6e041744bf2..753ffcda235 100644 --- a/go.sum +++ b/go.sum @@ -1180,8 +1180,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb h1:yLDt5cQWRwcFM5VEdSTbc3vDrYrxYqBjSvyTMU/o8s4= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0 h1:K7Y+gd0lEumrhgaIQFjC8reXadJOMVgDI4xsCAsiuSo= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0/go.mod h1:GTDBbovHUSAUk+fuGIySF2A/whhdtHGaWmU61BoERks= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 7a09fa9875b..7746cca72d6 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -24,7 +24,7 @@ require ( github.com/segmentio/ksuid v1.0.4 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0 github.com/smartcontractkit/chainlink-testing-framework v1.28.3 github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 @@ -368,6 +368,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/russross/blackfriday v1.6.0 // indirect + github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect github.com/sasha-s/go-deadlock v0.3.1 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/sercand/kuberesolver/v5 v5.1.1 // indirect diff --git a/integration-tests/go.sum b/integration-tests/go.sum index 5b689d0ae3d..c18825a1f24 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1519,8 +1519,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb h1:yLDt5cQWRwcFM5VEdSTbc3vDrYrxYqBjSvyTMU/o8s4= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0 h1:K7Y+gd0lEumrhgaIQFjC8reXadJOMVgDI4xsCAsiuSo= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0/go.mod h1:GTDBbovHUSAUk+fuGIySF2A/whhdtHGaWmU61BoERks= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index b77ad247c01..f0f62ce41e7 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -16,7 +16,7 @@ require ( github.com/rs/zerolog v1.30.0 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.3 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0 github.com/smartcontractkit/chainlink-testing-framework v1.28.3 github.com/smartcontractkit/chainlink/integration-tests v0.0.0-20240214231432-4ad5eb95178c github.com/smartcontractkit/chainlink/v2 v2.9.0-beta0.0.20240216210048-da02459ddad8 @@ -355,6 +355,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect github.com/russross/blackfriday v1.6.0 // indirect + github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect github.com/sasha-s/go-deadlock v0.3.1 // indirect github.com/scylladb/go-reflectx v1.0.1 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index 8926c81f74d..13d80e3a8e1 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1502,8 +1502,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs= github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb h1:yLDt5cQWRwcFM5VEdSTbc3vDrYrxYqBjSvyTMU/o8s4= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240415164156-8872a8f311cb/go.mod h1:kstYjAGqBswdZpl7YkSPeXBDVwaY1VaR6tUMPWl8ykA= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0 h1:K7Y+gd0lEumrhgaIQFjC8reXadJOMVgDI4xsCAsiuSo= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0/go.mod h1:GTDBbovHUSAUk+fuGIySF2A/whhdtHGaWmU61BoERks= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=