Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KS-198: Workflow Spec Approval #13181

Merged
merged 9 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/polite-yaks-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added workflow spec auto-approval via CLO
65 changes: 60 additions & 5 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr"
ocr2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"
)

Expand All @@ -50,6 +51,21 @@ var (
Help: "Metric to track job proposal requests",
})

promWorkflowRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "feeds_workflow_requests",
bolekk marked this conversation as resolved.
Show resolved Hide resolved
Help: "Metric to track workflow requests",
})

promWorkflowApprovals = promauto.NewCounter(prometheus.CounterOpts{
Name: "feeds_workflow_approvals",
Help: "Metric to track workflow successful auto approvals",
})

promWorkflowFailures = promauto.NewCounter(prometheus.CounterOpts{
Name: "feeds_workflow_rejections",
Help: "Metric to track workflow failed auto approvals",
})

promJobProposalCounts = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "feeds_job_proposal_count",
Help: "Number of job proposals for the node partitioned by status.",
Expand Down Expand Up @@ -553,6 +569,7 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64,
}

if exists {
// note: CLO auto-increments the version number on re-proposal, so this should never happen
return 0, errors.New("proposed job spec version already exists")
}
}
Expand Down Expand Up @@ -596,9 +613,21 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64,
if err != nil {
return 0, err
}

// Track the given job proposal request
promJobProposalRequest.Inc()
// auto approve workflow specs
if isWFSpec(logger, args.Spec) {
promWorkflowRequests.Inc()
err = s.ApproveSpec(ctx, id, true)
bolekk marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
promWorkflowFailures.Inc()
logger.Errorw("Failed to auto approve workflow spec", "id", id, "err", err)
return 0, fmt.Errorf("failed to approve workflow spec %d: %w", id, err)
}
logger.Infow("Successful workflow spec auto approval", "id", id)
promWorkflowApprovals.Inc()
} else {
// Track the given job proposal request
promJobProposalRequest.Inc()
}

if err = s.observeJobProposalCounts(ctx); err != nil {
logger.Errorw("Failed to push metrics for propose job", err)
Expand All @@ -607,6 +636,16 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64,
return id, nil
}

func isWFSpec(lggr logger.Logger, spec string) bool {
jobType, err := job.ValidateSpec(spec)
if err != nil {
// this should not happen in practice
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like there's a lot of duplication around validating specs. I guess it's necessary for each function individually, but validation appears to be getting called up to 4 times. Not a big deal, but maybe we can refactor and consolidate a bit later.

lggr.Errorw("Failed to validate spec while checking for workflow", "err", err)
return false
}
return jobType == job.Workflow
}

// GetJobProposal gets a job proposal by id.
func (s *service) GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) {
return s.orm.GetJobProposal(ctx, id)
Expand Down Expand Up @@ -761,6 +800,15 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error {
return errors.Wrap(txerr, "FindOCR2JobIDByAddress failed")
}
}
case job.Workflow:
existingJobID, txerr = findExistingWorkflowJob(ctx, *j.WorkflowSpec, tx.jobORM)
if txerr != nil {
// Return an error if the repository errors. If there is a not found
// error we want to continue with approving the job.
if !errors.Is(txerr, sql.ErrNoRows) {
return fmt.Errorf("failed while checking for existing workflow job: %w", txerr)
}
}
default:
return errors.Errorf("unsupported job type when approving job proposal specs: %s", j.Type)
}
Expand Down Expand Up @@ -1058,6 +1106,11 @@ func (s *service) observeJobProposalCounts(ctx context.Context) error {
return nil
}

// TODO KS-205 implement this. Need to figure out exactly how we want to handle this.
func findExistingWorkflowJob(ctx context.Context, wfSpec job.WorkflowSpec, tx job.ORM) (int32, error) {
return 0, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just want to highlight what it means for this to be left unimplemented. If the job has always been managed through CLO, then this lookup won't be applicable, as the externalJobID should be able to find the existing job. However, if the workflow was added outside CLO, then that job needs to be manually removed first. If workflow specs are not being manually added by NOPs, then technically the latter should also not be a problem. However, if we do hide the workflow job from the UI, it would also mean that they can't remove the job either.

I don't think it's a problem for now, but perhaps we should revisit this so it's foolproof. The query here usually goes through a unique identifier that ties the job to the spec.

}

// findExistingJobForOCR2 looks for existing job for OCR2
func findExistingJobForOCR2(ctx context.Context, j *job.Job, tx job.ORM) (int32, error) {
var contractID string
Expand All @@ -1073,7 +1126,7 @@ func findExistingJobForOCR2(ctx context.Context, j *job.Job, tx job.ORM) (int32,
feedID = j.BootstrapSpec.FeedID
}
case job.FluxMonitor, job.OffchainReporting:
return 0, errors.Errorf("contradID and feedID not applicable for job type: %s", j.Type)
return 0, errors.Errorf("contractID and feedID not applicable for job type: %s", j.Type)
default:
return 0, errors.Errorf("unsupported job type: %s", j.Type)
}
Expand Down Expand Up @@ -1106,7 +1159,7 @@ func findExistingJobForOCRFlux(ctx context.Context, j *job.Job, tx job.ORM) (int
func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error) {
jobType, err := job.ValidateSpec(spec)
if err != nil {
return nil, errors.Wrap(err, "failed to parse job spec TOML")
return nil, fmt.Errorf("failed to parse job spec TOML'%s': %w", spec, err)
}

var js job.Job
Expand All @@ -1128,6 +1181,8 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error
js, err = ocrbootstrap.ValidatedBootstrapSpecToml(spec)
case job.FluxMonitor:
js, err = fluxmonitorv2.ValidatedFluxMonitorSpec(s.jobCfg, spec)
case job.Workflow:
js, err = workflows.ValidatedWorkflowSpec(spec)
default:
return nil, errors.Errorf("unknown job type: %s", jobType)
}
Expand Down
133 changes: 133 additions & 0 deletions core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
"github.com/smartcontractkit/chainlink/v2/core/services/versioning"
"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"
)

Expand Down Expand Up @@ -638,6 +639,54 @@ func Test_Service_ProposeJob(t *testing.T) {
}

httpTimeout = *commonconfig.MustNewDuration(1 * time.Second)

// variables for workflow spec
wfID = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef"
wfOwner = "00000000000000000000000000000000000000aa"
specYaml = `
triggers:
- id: "a-trigger"

actions:
- id: "an-action"
ref: "an-action"
inputs:
trigger_output: $(trigger.outputs)

consensus:
- id: "a-consensus"
ref: "a-consensus"
inputs:
trigger_output: $(trigger.outputs)
an-action_output: $(an-action.outputs)

targets:
- id: "a-target"
ref: "a-target"
inputs:
consensus_output: $(a-consensus.outputs)
`
wfSpec = testspecs.GenerateWorkflowSpec(wfID, wfOwner, specYaml).Toml()
proposalIDWF = int64(11)
remoteUUIDWF = uuid.New()
argsWF = &feeds.ProposeJobArgs{
FeedsManagerID: 1,
RemoteUUID: remoteUUIDWF,
Spec: wfSpec,
Version: 1,
}
jpWF = feeds.JobProposal{
FeedsManagerID: 1,
Name: null.StringFrom("test-spec"),
RemoteUUID: remoteUUIDWF,
Status: feeds.JobProposalStatusPending,
}
proposalSpecWF = feeds.JobProposalSpec{
Definition: wfSpec,
Status: feeds.SpecStatusPending,
Version: 1,
JobProposalID: proposalIDWF,
}
)

testCases := []struct {
Expand All @@ -647,6 +696,90 @@ func Test_Service_ProposeJob(t *testing.T) {
wantID int64
wantErr string
}{
{
name: "Auto approve WF spec",
before: func(svc *TestService) {
svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, argsWF.RemoteUUID).Return(new(feeds.JobProposal), sql.ErrNoRows)
svc.orm.On("UpsertJobProposal", mock.Anything, &jpWF).Return(proposalIDWF, nil)
svc.orm.On("CreateSpec", mock.Anything, proposalSpecWF).Return(int64(100), nil)
svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
transactCall := svc.orm.On("Transact", mock.Anything, mock.Anything)
transactCall.Run(func(args mock.Arguments) {
fn := args[1].(func(orm feeds.ORM) error)
transactCall.ReturnArguments = mock.Arguments{fn(svc.orm)}
})
// Auto approve is really a call to ApproveJobProposal and so we have to mock that as well
svc.connMgr.On("GetClient", argsWF.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.EXPECT().GetSpec(mock.Anything, proposalIDWF).Return(&proposalSpecWF, nil)
svc.orm.EXPECT().GetJobProposal(mock.Anything, proposalSpecWF.JobProposalID).Return(&jpWF, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows) // TODO fix the external job id in wf spec generation
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
svc.spawner.
On("CreateJob",
mock.Anything,
mock.Anything,
mock.MatchedBy(func(j *job.Job) bool {
return j.WorkflowSpec.WorkflowOwner == wfOwner
}),
).
Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }).
Return(nil)
svc.orm.On("ApproveSpec",
mock.Anything,
proposalSpecWF.JobProposalID,
mock.IsType(uuid.UUID{}),
).Return(nil)
svc.fmsClient.On("ApprovedJob",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
&proto.ApprovedJobRequest{
Uuid: jpWF.RemoteUUID.String(),
Version: int64(proposalSpecWF.Version),
},
).Return(&proto.ApprovedJobResponse{}, nil)
},
args: argsWF,
wantID: proposalIDWF,
},
{
name: "Auto approve WF spec: error creating job",
before: func(svc *TestService) {
svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, argsWF.RemoteUUID).Return(new(feeds.JobProposal), sql.ErrNoRows)
svc.orm.On("UpsertJobProposal", mock.Anything, &jpWF).Return(proposalIDWF, nil)
svc.orm.On("CreateSpec", mock.Anything, proposalSpecWF).Return(int64(100), nil)
// svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
transactCall := svc.orm.On("Transact", mock.Anything, mock.Anything)
transactCall.Run(func(args mock.Arguments) {
fn := args[1].(func(orm feeds.ORM) error)
transactCall.ReturnArguments = mock.Arguments{fn(svc.orm)}
})
// Auto approve is really a call to ApproveJobProposal and so we have to mock that as well
svc.connMgr.On("GetClient", argsWF.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.EXPECT().GetSpec(mock.Anything, proposalIDWF).Return(&proposalSpecWF, nil)
svc.orm.EXPECT().GetJobProposal(mock.Anything, proposalSpecWF.JobProposalID).Return(&jpWF, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows) // TODO fix the external job id in wf spec generation
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
svc.spawner.
On("CreateJob",
mock.Anything,
mock.Anything,
mock.MatchedBy(func(j *job.Job) bool {
return j.WorkflowSpec.WorkflowOwner == wfOwner
}),
).
Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }).
Return(fmt.Errorf("error creating job"))
},
args: argsWF,
wantID: 0,
wantErr: "error creating job",
},

{
name: "Create success (Flux Monitor)",
before: func(svc *TestService) {
Expand Down
14 changes: 10 additions & 4 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,27 @@
if err != nil {
return jb, fmt.Errorf("toml unmarshal error on spec: %w", err)
}
if jb.Type != job.Workflow {
return jb, fmt.Errorf("unsupported type %s, expected %s", jb.Type, job.Workflow)
}

var spec job.WorkflowSpec
err = tree.Unmarshal(&spec)
if err != nil {
return jb, fmt.Errorf("toml unmarshal error on job: %w", err)
return jb, fmt.Errorf("toml unmarshal error on workflow spec: %w", err)
}

if err := spec.Validate(); err != nil {

Check failure on line 135 in core/services/workflows/delegate.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 116 (govet)
return jb, err
}

jb.WorkflowSpec = &spec
if jb.Type != job.Workflow {
return jb, fmt.Errorf("unsupported type %s", jb.Type)
// ensure the embedded workflow graph is valid
_, err = Parse(spec.Workflow)
if err != nil {
return jb, fmt.Errorf("failed to parse workflow graph: %w", err)
}
jb.WorkflowSpec = &spec
jb.WorkflowSpecID = &spec.ID

return jb, nil
}
Loading