Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update core to be compatible with changes made in chainlink-common as part of KS-120 (moving capabilities to use streams) #12857

Merged
merged 2 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fresh-lizards-love.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal Updates required to work with chainlink-common changes to support grpc streams for capabilities
6 changes: 3 additions & 3 deletions core/capabilities/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type mockCapability struct {
capabilities.CapabilityInfo
}

func (m *mockCapability) Execute(ctx context.Context, callback chan<- capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error {
return nil
func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
return nil, nil
}

func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
{
name: "trigger",
newCapability: func(ctx context.Context, reg *coreCapabilities.Registry) (string, error) {
odt := triggers.NewOnDemand()
odt := triggers.NewOnDemand(logger.TestLogger(t))
info, err := odt.Info(ctx)
require.NoError(t, err)
return info.ID, reg.Add(ctx, odt)
Expand Down
8 changes: 5 additions & 3 deletions core/capabilities/remote/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (c *remoteTargetCaller) UnregisterFromWorkflow(ctx context.Context, request
return errors.New("not implemented")
}

func (c *remoteTargetCaller) Execute(ctx context.Context, callback chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (c *remoteTargetCaller) Execute(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
c.lggr.Debugw("not implemented - executing fake remote target capability", "capabilityId", c.capInfo.ID, "nMembers", len(c.donInfo.Members))
for _, peerID := range c.donInfo.Members {
m := &types.MessageBody{
Expand All @@ -60,10 +60,12 @@ func (c *remoteTargetCaller) Execute(ctx context.Context, callback chan<- common
}
err := c.dispatcher.Send(peerID, m)
if err != nil {
return err
return nil, err
}
}
return nil

// TODO: return a channel that will be closed when all responses are received
return nil, nil
}

func (c *remoteTargetCaller) Receive(msg *types.MessageBody) {
Expand Down
6 changes: 4 additions & 2 deletions core/capabilities/remote/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package remote_test
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
Expand All @@ -24,5 +24,7 @@ func TestTarget_Placeholder(t *testing.T) {
dispatcher := remoteMocks.NewDispatcher(t)
dispatcher.On("Send", mock.Anything, mock.Anything).Return(nil)
target := remote.NewRemoteTargetCaller(commoncap.CapabilityInfo{}, donInfo, dispatcher, lggr)
require.NoError(t, target.Execute(ctx, nil, commoncap.CapabilityRequest{}))

_, err := target.Execute(ctx, commoncap.CapabilityRequest{})
assert.NoError(t, err)
}
8 changes: 3 additions & 5 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type registrationKey struct {
}

type pubRegState struct {
callback chan<- commoncap.CapabilityResponse
callback <-chan commoncap.CapabilityResponse
request commoncap.CapabilityRequest
}

Expand Down Expand Up @@ -112,9 +112,8 @@ func (p *triggerPublisher) Receive(msg *types.MessageBody) {
p.lggr.Errorw("failed to unmarshal request", "capabilityId", p.capInfo.ID, "err", err)
return
}
callbackCh := make(chan commoncap.CapabilityResponse)
ctx, cancel := p.stopCh.NewCtx()
err = p.underlying.RegisterTrigger(ctx, callbackCh, unmarshaled)
callbackCh, err := p.underlying.RegisterTrigger(ctx, unmarshaled)
cancel()
if err == nil {
p.registrations[key] = &pubRegState{
Expand Down Expand Up @@ -153,7 +152,6 @@ func (p *triggerPublisher) registrationCleanupLoop() {
cancel()
p.lggr.Infow("unregistered trigger", "capabilityId", p.capInfo.ID, "callerDonID", key.callerDonId, "workflowId", key.workflowId, "err", err)
// after calling UnregisterTrigger, the underlying trigger will not send any more events to the channel
close(req.callback)
delete(p.registrations, key)
p.messageCache.Delete(key)
}
Expand All @@ -163,7 +161,7 @@ func (p *triggerPublisher) registrationCleanupLoop() {
}
}

func (p *triggerPublisher) triggerEventLoop(callbackCh chan commoncap.CapabilityResponse, key registrationKey) {
func (p *triggerPublisher) triggerEventLoop(callbackCh <-chan commoncap.CapabilityResponse, key registrationKey) {
defer p.wg.Done()
for {
select {
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func (t *testTrigger) Info(_ context.Context) (commoncap.CapabilityInfo, error)
return t.info, nil
}

func (t *testTrigger) RegisterTrigger(_ context.Context, _ chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (t *testTrigger) RegisterTrigger(_ context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
t.registrationsCh <- request
return nil
return nil, nil
}

func (t *testTrigger) UnregisterTrigger(_ context.Context, request commoncap.CapabilityRequest) error {
Expand Down
19 changes: 14 additions & 5 deletions core/capabilities/remote/trigger_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ var _ commoncap.TriggerCapability = &triggerSubscriber{}
var _ types.Receiver = &triggerSubscriber{}
var _ services.Service = &triggerSubscriber{}

func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, localDonInfo types.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
// TODO makes this configurable with a default
const defaultSendChannelBufferSize = 1000

func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, localDonInfo types.DON,
dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber {
if aggregator == nil {
lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID)
aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1))
Expand Down Expand Up @@ -88,22 +92,25 @@ func (s *triggerSubscriber) Info(ctx context.Context) (commoncap.CapabilityInfo,
return s.capInfo, nil
}

func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, callback chan<- commoncap.CapabilityResponse, request commoncap.CapabilityRequest) error {
func (s *triggerSubscriber) RegisterTrigger(ctx context.Context, request commoncap.CapabilityRequest) (<-chan commoncap.CapabilityResponse, error) {
rawRequest, err := pb.MarshalCapabilityRequest(request)
if err != nil {
return err
return nil, err
}
if request.Metadata.WorkflowID == "" {
return errors.New("empty workflowID")
return nil, errors.New("empty workflowID")
}
s.mu.Lock()
defer s.mu.Unlock()

callback := make(chan commoncap.CapabilityResponse, defaultSendChannelBufferSize)
s.registeredWorkflows[request.Metadata.WorkflowID] = &subRegState{
callback: callback,
rawRequest: rawRequest,
}

s.lggr.Infow("RegisterTrigger called", "capabilityId", s.capInfo.ID, "donId", s.capDonInfo.ID, "workflowID", request.Metadata.WorkflowID)
return nil
return callback, nil
}

func (s *triggerSubscriber) registrationLoop() {
Expand Down Expand Up @@ -141,6 +148,8 @@ func (s *triggerSubscriber) registrationLoop() {
func (s *triggerSubscriber) UnregisterTrigger(ctx context.Context, request commoncap.CapabilityRequest) error {
s.mu.Lock()
defer s.mu.Unlock()

close(s.registeredWorkflows[request.Metadata.WorkflowID].callback)
delete(s.registeredWorkflows, request.Metadata.WorkflowID)
// Registrations will quickly expire on all remote nodes.
// Alternatively, we could send UnregisterTrigger messages right away.
Expand Down
7 changes: 4 additions & 3 deletions core/capabilities/remote/trigger_subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) {
}
subscriber := remote.NewTriggerSubscriber(config, capInfo, capDonInfo, workflowDonInfo, dispatcher, nil, lggr)
require.NoError(t, subscriber.Start(ctx))
triggerEventCallbackCh := make(chan commoncap.CapabilityResponse, 2)
require.NoError(t, subscriber.RegisterTrigger(ctx, triggerEventCallbackCh, commoncap.CapabilityRequest{

triggerEventCallbackCh, err := subscriber.RegisterTrigger(ctx, commoncap.CapabilityRequest{
Metadata: commoncap.RequestMetadata{
WorkflowID: workflowID1,
},
}))
})
require.NoError(t, err)
<-awaitRegistrationMessageCh

// receive trigger event
Expand Down
26 changes: 15 additions & 11 deletions core/capabilities/targets/write_target.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import (

var forwardABI = evmtypes.MustGetABI(forwarder.KeystoneForwarderMetaData.ABI)

func InitializeWrite(registry commontypes.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer, lggr logger.Logger) error {
func InitializeWrite(registry commontypes.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer,
lggr logger.Logger) error {
for _, chain := range legacyEVMChains.Slice() {
capability := NewEvmWrite(chain, lggr)
if err := registry.Add(context.TODO(), capability); err != nil {
Expand Down Expand Up @@ -157,7 +158,7 @@ func encodePayload(args []any, rawSelector string) ([]byte, error) {
// return append(method.ID, arguments...), nil
}

func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.CapabilityResponse, request capabilities.CapabilityRequest) error {
func (cap *EvmWrite) Execute(ctx context.Context, request capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
cap.lggr.Debugw("Execute", "request", request)
// TODO: idempotency

Expand All @@ -168,22 +169,23 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C

reqConfig, err := parseConfig(request.Config)
if err != nil {
return err
return nil, err
}

inputsAny, err := request.Inputs.Unwrap()
if err != nil {
return err
return nil, err
}
inputs := inputsAny.(map[string]any)
rep, ok := inputs["report"]
if !ok {
return errors.New("malformed data: inputs doesn't contain a report key")
return nil, errors.New("malformed data: inputs doesn't contain a report key")
}

if rep == nil {
// We received any empty report -- this means we should skip transmission.
cap.lggr.Debugw("Skipping empty report", "request", request)
callback := make(chan capabilities.CapabilityResponse)
go func() {
// TODO: cast tx.Error to Err (or Value to Value?)
callback <- capabilities.CapabilityResponse{
Expand All @@ -192,18 +194,18 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
}
close(callback)
}()
return nil
return callback, nil
}

// evaluate any variables in reqConfig.Params
args, err := evaluateParams(reqConfig.Params, inputs)
if err != nil {
return err
return nil, err
}

data, err := encodePayload(args, reqConfig.ABI)
if err != nil {
return err
return nil, err
}

// TODO: validate encoded report is prefixed with workflowID and executionID that match the request meta
Expand All @@ -214,7 +216,7 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
// construct forwarding payload
calldata, err := forwardABI.Pack("report", common.HexToAddress(reqConfig.Address), data, signatures)
if err != nil {
return err
return nil, err
}

txMeta := &txmgr.TxMeta{
Expand All @@ -238,9 +240,11 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
}
tx, err := txm.CreateTransaction(ctx, req)
if err != nil {
return err
return nil, err
}
cap.lggr.Debugw("Transaction submitted", "request", request, "transaction", tx)

callback := make(chan capabilities.CapabilityResponse)
go func() {
// TODO: cast tx.Error to Err (or Value to Value?)
callback <- capabilities.CapabilityResponse{
Expand All @@ -249,7 +253,7 @@ func (cap *EvmWrite) Execute(ctx context.Context, callback chan<- capabilities.C
}
close(callback)
}()
return nil
return callback, nil
}

func (cap *EvmWrite) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand Down
8 changes: 2 additions & 6 deletions core/capabilities/targets/write_target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ func TestEvmWrite(t *testing.T) {

})

ch := make(chan capabilities.CapabilityResponse)

err = capability.Execute(ctx, ch, req)
ch, err := capability.Execute(ctx, req)
require.NoError(t, err)

response := <-ch
Expand Down Expand Up @@ -134,9 +132,7 @@ func TestEvmWrite_EmptyReport(t *testing.T) {
Inputs: inputs,
}

ch := make(chan capabilities.CapabilityResponse)

err = capability.Execute(ctx, ch, req)
ch, err := capability.Execute(ctx, req)
require.NoError(t, err)

response := <-ch
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.3
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419105123-fc5d616c7d2e
github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240326191951-2bbe9382d052
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1185,8 +1185,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs=
github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0 h1:K7Y+gd0lEumrhgaIQFjC8reXadJOMVgDI4xsCAsiuSo=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419013737-4554767e4db0/go.mod h1:GTDBbovHUSAUk+fuGIySF2A/whhdtHGaWmU61BoERks=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419105123-fc5d616c7d2e h1:nHs5mFOR7FPII20GrCGIPywDW43MhEUlD7DqHnTgu6Q=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240419105123-fc5d616c7d2e/go.mod h1:GTDBbovHUSAUk+fuGIySF2A/whhdtHGaWmU61BoERks=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8 h1:I326nw5GwHQHsLKHwtu5Sb9EBLylC8CfUd7BFAS0jtg=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240213120401-01a23955f9f8/go.mod h1:a65NtrK4xZb01mf0dDNghPkN2wXgcqFQ55ADthVBgMc=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
9 changes: 8 additions & 1 deletion core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,17 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) erro
Config: tc,
Inputs: triggerInputs,
}
err = t.trigger.RegisterTrigger(ctx, e.triggerEvents, triggerRegRequest)
eventsCh, err := t.trigger.RegisterTrigger(ctx, triggerRegRequest)
if err != nil {
return fmt.Errorf("failed to instantiate trigger %s, %s", t.Type, err)
}

go func() {
for event := range eventsCh {
e.triggerEvents <- event
}
}()

return nil
}

Expand Down
18 changes: 11 additions & 7 deletions core/services/workflows/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,18 @@ func newMockCapability(info capabilities.CapabilityInfo, transform func(capabili
}
}

func (m *mockCapability) Execute(ctx context.Context, ch chan<- capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error {
func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
cr, err := m.transform(req)
if err != nil {
return err
return nil, err
}

ch := make(chan capabilities.CapabilityResponse, 10)

m.response <- cr
ch <- cr
close(ch)
m.response <- cr
return nil
return ch, nil
}

func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
Expand All @@ -102,13 +104,14 @@ func (m *mockCapability) UnregisterFromWorkflow(ctx context.Context, request cap
type mockTriggerCapability struct {
capabilities.CapabilityInfo
triggerEvent capabilities.CapabilityResponse
ch chan capabilities.CapabilityResponse
}

var _ capabilities.TriggerCapability = (*mockTriggerCapability)(nil)

func (m *mockTriggerCapability) RegisterTrigger(ctx context.Context, ch chan<- capabilities.CapabilityResponse, req capabilities.CapabilityRequest) error {
ch <- m.triggerEvent
return nil
func (m *mockTriggerCapability) RegisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) {
m.ch <- m.triggerEvent
return m.ch, nil
}

func (m *mockTriggerCapability) UnregisterTrigger(ctx context.Context, req capabilities.CapabilityRequest) error {
Expand Down Expand Up @@ -217,6 +220,7 @@ func mockTrigger(t *testing.T) (capabilities.TriggerCapability, capabilities.Cap
"issues a trigger when a mercury report is received.",
"v1.0.0",
),
ch: make(chan capabilities.CapabilityResponse, 10),
}
resp, err := values.NewMap(map[string]any{
"123": decimal.NewFromFloat(1.00),
Expand Down
Loading
Loading