Skip to content

Commit

Permalink
Save #2
Browse files Browse the repository at this point in the history
  • Loading branch information
justinkaseman committed Sep 12, 2024
1 parent 1fd2ab5 commit a54cb90
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 18 deletions.
35 changes: 24 additions & 11 deletions core/capabilities/integration_tests/keystone_contracts_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl
kvid, err := reg.GetHashedCapabilityId(&bind.CallOpts{}, kvstore.LabelledName, kvstore.Version)
require.NoError(t, err)

logTarget := kcr.CapabilitiesRegistryCapability{
LabelledName: "log-target",
Version: "1.0.0",
CapabilityType: CapabilityTypeTarget,
}
ltid, err := reg.GetHashedCapabilityId(&bind.CallOpts{}, logTarget.LabelledName, logTarget.Version)
require.NoError(t, err)

ocr := kcr.CapabilitiesRegistryCapability{
LabelledName: "offchain_reporting",
Version: "1.0.0",
Expand All @@ -154,6 +162,7 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl
ocr,
cronTrigger,
kvstore,
logTarget,
})
require.NoError(t, err)
backend.Commit()
Expand Down Expand Up @@ -186,7 +195,7 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl
n, innerErr := peerToNode(nopID, wfPeer)
require.NoError(t, innerErr)

n.HashedCapabilityIds = [][32]byte{ocrid, cid}
n.HashedCapabilityIds = [][32]byte{ocrid, cid, ltid, kvid}
nodes = append(nodes, n)
}

Expand All @@ -202,7 +211,7 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl
n, innerErr := peerToNode(nopID, targetPeer)
require.NoError(t, innerErr)

n.HashedCapabilityIds = [][32]byte{wid, kvid}
n.HashedCapabilityIds = [][32]byte{wid}
nodes = append(nodes, n)
}

Expand All @@ -226,6 +235,14 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl
CapabilityId: cid,
Config: ccb,
},
{
CapabilityId: ltid,
Config: ccb,
},
{
CapabilityId: kvid,
Config: ccb,
},
}

_, err = reg.AddDON(transactOpts, ps, cfgs, false, true, workflowDon.F)
Expand Down Expand Up @@ -269,11 +286,11 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl

targetCapabilityConfig.DefaultConfig = values.Proto(configWithLimit).GetMapValue()

targetCapabilityConfig.RemoteConfig = &pb.CapabilityConfig_RemoteTargetConfig{
RemoteTargetConfig: &pb.RemoteTargetConfig{
RequestHashExcludedAttributes: []string{"signed_report.Signatures"},
},
}
// targetCapabilityConfig.RemoteConfig = &pb.CapabilityConfig_RemoteTargetConfig{
// RemoteTargetConfig: &pb.RemoteTargetConfig{
// RequestHashExcludedAttributes: []string{"signed_report.Signatures"},
// },
// }

remoteTargetConfigBytes, err := proto.Marshal(targetCapabilityConfig)
require.NoError(t, err)
Expand All @@ -283,10 +300,6 @@ func setupCapabilitiesRegistryContract(ctx context.Context, t *testing.T, workfl
CapabilityId: wid,
Config: remoteTargetConfigBytes,
},
{
CapabilityId: kvid,
Config: remoteTargetConfigBytes,
},
}

_, err = reg.AddDON(transactOpts, ps, cfgs, true, false, targetDon.F)
Expand Down
Binary file modified core/capabilities/integration_tests/kvstore
Binary file not shown.
92 changes: 92 additions & 0 deletions core/capabilities/integration_tests/mock_target.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package integration_tests

import (
"context"
"fmt"
"sync"
"testing"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

var (
_ capabilities.ActionCapability = &logTarget{}
)

const triggerIDm = "log-target@1.0.0"

type sink struct {
services.StateMachine
targets []logTarget

stopCh services.StopChan
wg sync.WaitGroup
}

func newSink() *sink {
return &sink{
stopCh: make(services.StopChan),
}
}

func (r *sink) Start(ctx context.Context) error {
return r.StartOnce("sink", func() error {
return nil
})
}

func (r *sink) Close() error {
return r.StopOnce("sink", func() error {
close(r.stopCh)
r.wg.Wait()
return nil
})
}

// func (r *sink) getLogs(reportList []*datastreams.FeedReport) {
// for _, trigger := range r.triggers {
// resp, err := wrapReports(reportList, "1", 12, datastreams.Metadata{})
// if err != nil {
// panic(err)
// }
// trigger.sendResponse(resp)
// }
// }

func (r *sink) getNewTarget(t *testing.T) *logTarget {
target := logTarget{t: t, toSend: make(chan capabilities.TriggerResponse, 1000),
wg: &r.wg, stopCh: r.stopCh}
r.targets = append(r.targets, target)
return &target
}

type logTarget struct {
t *testing.T
cancel context.CancelFunc
toSend chan capabilities.TriggerResponse

wg *sync.WaitGroup
stopCh services.StopChan
}

func (lt *logTarget) Execute(ctx context.Context, rawRequest capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
fmt.Println("##########################################")
return capabilities.CapabilityResponse{}, nil
}

func (lt *logTarget) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
return capabilities.MustNewCapabilityInfo(
triggerIDm,
capabilities.CapabilityTypeTarget,
"issues a trigger when a report is received.",
), nil
}

func (lt *logTarget) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
return nil
}

func (lt *logTarget) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
return nil
}
16 changes: 9 additions & 7 deletions core/capabilities/integration_tests/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func setupDonsWithTransmissionScheduleStreams(ctx context.Context, t *testing.T,
workflowDonNodes, _, _ := createDons(ctx, t, lggr, sink,
workflowDonInfo, triggerDonInfo, targetDonInfo,
ethBlockchain, capabilitiesRegistryAddr, forwarderAddr,
workflowDonInfo.keyBundles, transactor, libocr)
workflowDonInfo.keyBundles, transactor, libocr, nil)
for _, node := range workflowDonNodes {
addWorkflowJobStreams(t, node, workflowName, workflowOwnerID, feedIDs, consumerAddr, deltaStage, schedule)
}
Expand All @@ -105,21 +105,19 @@ func setupDonsWithTransmissionSchedulePoR(ctx context.Context, t *testing.T, wor
consumerAddr, _ := setupConsumerContract(t, transactor, ethBlockchain, forwarderAddr, workflowOwnerID, workflowName)

sink := newReportsSink()
sink2 := newSink()

libocr := newMockLibOCR(t, workflowDonInfo.F, 1*time.Second)
workflowDonNodes, _, targetDonNodes := createDons(ctx, t, lggr, sink,
workflowDonNodes, _, _ := createDons(ctx, t, lggr, sink,
workflowDonInfo, triggerDonInfo, targetDonInfo,
ethBlockchain, capabilitiesRegistryAddr, forwarderAddr,
workflowDonInfo.keyBundles, transactor, libocr)
workflowDonInfo.keyBundles, transactor, libocr, sink2)
for _, node := range workflowDonNodes {
addStandardCapabilityKV(t, node)
addStandardCapabilityCron(t, node)
addWorkflowJobPoR(t, node, workflowName, workflowOwnerID, cronSchedule, consumerAddr, deltaStage, schedule)
}

for _, node := range targetDonNodes {
addStandardCapabilityKV(t, node)
}

servicetest.Run(t, ethBlockchain)
servicetest.Run(t, libocr)
servicetest.Run(t, sink)
Expand All @@ -137,6 +135,7 @@ func createDons(ctx context.Context, t *testing.T, lggr logger.Logger, reportsSi
workflowNodeKeyBundles []ocr2key.KeyBundle,
transactor *bind.TransactOpts,
libocr *mockLibOCR,
sink *sink,
) ([]*cltest.TestApplication, []*cltest.TestApplication, []*cltest.TestApplication) {
broker := newTestAsyncMessageBroker(t, 1000)

Expand Down Expand Up @@ -182,6 +181,9 @@ func createDons(ctx context.Context, t *testing.T, lggr logger.Logger, reportsSi
workflowPeerDispatcher := broker.NewDispatcherForNode(workflowPeer)
capabilityRegistry := capabilities.NewRegistry(lggr)

target := sink.getNewTarget(t)
capabilityRegistry.Add(ctx, target)

requestTimeout := 10 * time.Minute
cfg := ocr3.Config{
Logger: lggr,
Expand Down
9 changes: 9 additions & 0 deletions core/capabilities/integration_tests/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ targets:
schedule: %s
`

// targets:
// - id: "log-target@1.0.0"
// ref: "target"
// inputs:
// signed_report: $(trigger.outputs)
// config:
// deltaStage: %s
// schedule: %s

func addWorkflowJobPoR(
t *testing.T,
app *cltest.TestApplication,
Expand Down
1 change: 1 addition & 0 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
}

err = cc.RegisterToWorkflow(ctx, registrationRequest)
fmt.Println("##########################", registrationRequest, cc)
if err != nil {
return newCPErr(fmt.Sprintf("failed to register capability to workflow (%+v)", registrationRequest), err)
}
Expand Down

0 comments on commit a54cb90

Please sign in to comment.