Skip to content

Commit

Permalink
standard capability support (#13215)
Browse files Browse the repository at this point in the history
  • Loading branch information
ettec authored Jun 6, 2024
1 parent a02bb0a commit 66d8d16
Show file tree
Hide file tree
Showing 26 changed files with 594 additions and 57 deletions.
5 changes: 5 additions & 0 deletions .changeset/silver-clocks-prove.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal standard capability support
4 changes: 4 additions & 0 deletions core/cmd/jobs_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func (p JobPresenter) FriendlyCreatedAt() string {
if p.WorkflowSpec != nil {
return p.WorkflowSpec.CreatedAt.Format(time.RFC3339)
}
case presenters.StandardCapabilitiesJobSpec:
if p.StandardCapabilitiesSpec != nil {
return p.StandardCapabilitiesSpec.CreatedAt.Format(time.RFC3339)
}
default:
return "unknown"
}
Expand Down
2 changes: 1 addition & 1 deletion core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/shopspring/decimal v1.3.1
github.com/smartcontractkit/chainlink-automation v1.0.3
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240605181819-316b5eb82110
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240606094423-573049c41fa0
github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772
github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000
github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c
Expand Down
4 changes: 2 additions & 2 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1185,8 +1185,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq
github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE=
github.com/smartcontractkit/chainlink-automation v1.0.3 h1:h/ijT0NiyV06VxYVgcNfsE3+8OEzT3Q0Z9au0z1BPWs=
github.com/smartcontractkit/chainlink-automation v1.0.3/go.mod h1:RjboV0Qd7YP+To+OrzHGXaxUxoSONveCoAK2TQ1INLU=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240605181819-316b5eb82110 h1:skCp4kAmd0H+/sqiCzpwXKxkSWpf1NsdIUuN73nuz/Y=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240605181819-316b5eb82110/go.mod h1:DUZccDEW98n+J1mhdWGO7wr/Njad9p9Fzks839JN7Rs=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240606094423-573049c41fa0 h1:53zVcdDxrHG3oewhP7AWOiLtwTozcQ0/wzFTsaTBS5M=
github.com/smartcontractkit/chainlink-common v0.1.7-0.20240606094423-573049c41fa0/go.mod h1:DUZccDEW98n+J1mhdWGO7wr/Njad9p9Fzks839JN7Rs=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510=
github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k=
github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo=
Expand Down
14 changes: 11 additions & 3 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"

"github.com/smartcontractkit/chainlink/v2/core/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
"github.com/smartcontractkit/chainlink/v2/core/services/standardcapabilities"
"github.com/smartcontractkit/chainlink/v2/core/static"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
Expand Down Expand Up @@ -342,6 +342,8 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

srvcs = append(srvcs, pipelineORM)

loopRegistrarConfig := plugins.NewRegistrarConfig(opts.GRPCOpts, opts.LoopRegistry.Register, opts.LoopRegistry.Unregister)

var (
delegates = map[job.Type]job.Delegate{
job.DirectRequest: directrequest.NewDelegate(
Expand Down Expand Up @@ -394,6 +396,14 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
pipelineRunner,
cfg.JobPipeline(),
),
job.StandardCapabilities: standardcapabilities.NewDelegate(
globalLogger,
opts.DS, jobORM,
opts.CapabilitiesRegistry,
loopRegistrarConfig,
telemetryManager,
pipelineRunner,
opts.RelayerChainInteroperators),
}
webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner()
)
Expand Down Expand Up @@ -458,8 +468,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
globalLogger.Debug("Off-chain reporting disabled")
}

loopRegistrarConfig := plugins.NewRegistrarConfig(opts.GRPCOpts, opts.LoopRegistry.Register, opts.LoopRegistry.Unregister)

if cfg.OCR2().Enabled() {
globalLogger.Debug("Off-chain reporting v2 enabled")

Expand Down
27 changes: 27 additions & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
VRF Type = (Type)(pipeline.VRFJobType)
Webhook Type = (Type)(pipeline.WebhookJobType)
Workflow Type = (Type)(pipeline.WorkflowJobType)
StandardCapabilities Type = (Type)(pipeline.StandardCapabilitiesJobType)
)

//revive:disable:redefines-builtin-id
Expand Down Expand Up @@ -88,6 +89,7 @@ var (
VRF: true,
Webhook: true,
Workflow: false,
StandardCapabilities: false,
}
supportsAsync = map[Type]bool{
BlockHeaderFeeder: false,
Expand All @@ -106,6 +108,7 @@ var (
VRF: true,
Webhook: true,
Workflow: false,
StandardCapabilities: false,
}
schemaVersions = map[Type]uint32{
BlockHeaderFeeder: 1,
Expand All @@ -124,6 +127,7 @@ var (
VRF: 1,
Webhook: 1,
Workflow: 1,
StandardCapabilities: 1,
}
)

Expand Down Expand Up @@ -167,6 +171,8 @@ type Job struct {
PipelineSpec *pipeline.Spec
WorkflowSpecID *int32
WorkflowSpec *WorkflowSpec
StandardCapabilitiesSpecID *int32
StandardCapabilitiesSpec *StandardCapabilitiesSpec
JobSpecErrors []SpecError
Type Type `toml:"type"`
SchemaVersion uint32 `toml:"schemaVersion"`
Expand Down Expand Up @@ -886,3 +892,24 @@ func (w *WorkflowSpec) Validate() error {

return nil
}

type StandardCapabilitiesSpec struct {
ID int32
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
Command string `toml:"command"`
Config string `toml:"config"`
}

func (w *StandardCapabilitiesSpec) GetID() string {
return fmt.Sprintf("%v", w.ID)
}

func (w *StandardCapabilitiesSpec) SetID(value string) error {
ID, err := strconv.ParseInt(value, 10, 32)
if err != nil {
return err
}
w.ID = int32(ID)
return nil
}
25 changes: 20 additions & 5 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,16 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
return errors.Wrap(err, "failed to create WorkflowSpec for jobSpec")
}
jb.WorkflowSpecID = &specID
case StandardCapabilities:
sql := `INSERT INTO standardcapabilities_specs (command, config, created_at, updated_at)
VALUES (:command, :config, NOW(), NOW())
RETURNING id;`
specID, err := tx.prepareQuerySpecID(ctx, sql, jb.StandardCapabilitiesSpec)
if err != nil {
return errors.Wrap(err, "failed to create StandardCapabilities for jobSpec")
}
jb.StandardCapabilitiesSpecID = &specID

default:
o.lggr.Panicf("Unsupported jb.Type: %v", jb.Type)
}
Expand Down Expand Up @@ -630,18 +640,18 @@ func (o *orm) InsertJob(ctx context.Context, job *Job) error {
if job.ID == 0 {
query = `INSERT INTO jobs (name, stream_id, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id,
keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id,
legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, workflow_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at)
legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, workflow_spec_id, standard_capabilities_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at)
VALUES (:name, :stream_id, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id,
:keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id,
:legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :workflow_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW())
:legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :workflow_spec_id, :standard_capabilities_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW())
RETURNING *;`
} else {
query = `INSERT INTO jobs (id, name, stream_id, schema_version, type, max_task_duration, ocr_oracle_spec_id, ocr2_oracle_spec_id, direct_request_spec_id, flux_monitor_spec_id,
keeper_spec_id, cron_spec_id, vrf_spec_id, webhook_spec_id, blockhash_store_spec_id, bootstrap_spec_id, block_header_feeder_spec_id, gateway_spec_id,
legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, workflow_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at)
legacy_gas_station_server_spec_id, legacy_gas_station_sidecar_spec_id, workflow_spec_id, standard_capabilities_spec_id, external_job_id, gas_limit, forwarding_allowed, created_at)
VALUES (:id, :name, :stream_id, :schema_version, :type, :max_task_duration, :ocr_oracle_spec_id, :ocr2_oracle_spec_id, :direct_request_spec_id, :flux_monitor_spec_id,
:keeper_spec_id, :cron_spec_id, :vrf_spec_id, :webhook_spec_id, :blockhash_store_spec_id, :bootstrap_spec_id, :block_header_feeder_spec_id, :gateway_spec_id,
:legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :workflow_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW())
:legacy_gas_station_server_spec_id, :legacy_gas_station_sidecar_spec_id, :workflow_spec_id, :standard_capabilities_spec_id, :external_job_id, :gas_limit, :forwarding_allowed, NOW())
RETURNING *;`
}
query, args, err := tx.ds.BindNamed(query, job)
Expand Down Expand Up @@ -684,7 +694,8 @@ func (o *orm) DeleteJob(ctx context.Context, id int32) error {
bootstrap_spec_id,
block_header_feeder_spec_id,
gateway_spec_id,
workflow_spec_id
workflow_spec_id,
standard_capabilities_spec_id
),
deleted_oracle_specs AS (
DELETE FROM ocr_oracle_specs WHERE id IN (SELECT ocr_oracle_spec_id FROM deleted_jobs)
Expand Down Expand Up @@ -725,6 +736,9 @@ func (o *orm) DeleteJob(ctx context.Context, id int32) error {
deleted_workflow_specs AS (
DELETE FROM workflow_specs WHERE id in (SELECT workflow_spec_id FROM deleted_jobs)
),
deleted_standardcapabilities_specs AS (
DELETE FROM standardcapabilities_specs WHERE id in (SELECT standard_capabilities_spec_id FROM deleted_jobs)
),
deleted_job_pipeline_specs AS (
DELETE FROM job_pipeline_specs WHERE job_id IN (SELECT id FROM deleted_jobs) RETURNING pipeline_spec_id
)
Expand Down Expand Up @@ -1372,6 +1386,7 @@ func (o *orm) loadAllJobTypes(ctx context.Context, job *Job) error {
o.loadJobType(ctx, job, "BootstrapSpec", "bootstrap_specs", job.BootstrapSpecID),
o.loadJobType(ctx, job, "GatewaySpec", "gateway_specs", job.GatewaySpecID),
o.loadJobType(ctx, job, "WorkflowSpec", "workflow_specs", job.WorkflowSpecID),
o.loadJobType(ctx, job, "StandardCapabilitiesSpec", "standardcapabilities_specs", job.StandardCapabilitiesSpecID),
)
}

Expand Down
1 change: 1 addition & 0 deletions core/services/job/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var (
VRF: {},
Webhook: {},
Workflow: {},
StandardCapabilities: {},
}
)

Expand Down
1 change: 1 addition & 0 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
VRFJobType string = "vrf"
WebhookJobType string = "webhook"
WorkflowJobType string = "workflow"
StandardCapabilitiesJobType string = "standardcapabilities"
)

//go:generate mockery --quiet --name Config --output ./mocks/ --case=underscore
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

120 changes: 120 additions & 0 deletions core/services/standardcapabilities/delegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package standardcapabilities

import (
"context"
"fmt"

"github.com/google/uuid"
"github.com/pelletier/go-toml"
"github.com/pkg/errors"

"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/generic"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/telemetry"
"github.com/smartcontractkit/chainlink/v2/plugins"
)

type RelayGetter interface {
Get(id types.RelayID) (loop.Relayer, error)
GetIDToRelayerMap() (map[types.RelayID]loop.Relayer, error)
}

type Delegate struct {
logger logger.Logger
ds sqlutil.DataSource
jobORM job.ORM
registry core.CapabilitiesRegistry
cfg plugins.RegistrarConfig
monitoringEndpointGen telemetry.MonitoringEndpointGenerator
pipelineRunner pipeline.Runner
relayers RelayGetter

isNewlyCreatedJob bool
}

func NewDelegate(logger logger.Logger, ds sqlutil.DataSource, jobORM job.ORM, registry core.CapabilitiesRegistry,
cfg plugins.RegistrarConfig, monitoringEndpointGen telemetry.MonitoringEndpointGenerator, pipelineRunner pipeline.Runner,
relayers RelayGetter) *Delegate {
return &Delegate{logger: logger, ds: ds, jobORM: jobORM, registry: registry, cfg: cfg, monitoringEndpointGen: monitoringEndpointGen, pipelineRunner: pipelineRunner,
relayers: relayers, isNewlyCreatedJob: false}
}

func (d *Delegate) JobType() job.Type {
return job.StandardCapabilities
}

func (d *Delegate) BeforeJobCreated(job job.Job) {
// This is only called first time the job is created
d.isNewlyCreatedJob = true
}

func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.ServiceCtx, error) {
log := d.logger.Named("StandardCapabilities").Named(spec.StandardCapabilitiesSpec.GetID())

kvStore := job.NewKVStore(spec.ID, d.ds, log)
telemetryService := generic.NewTelemetryAdapter(d.monitoringEndpointGen)
errorLog := &ErrorLog{jobID: spec.ID, recordError: d.jobORM.RecordError}
pr := generic.NewPipelineRunnerAdapter(log, spec, d.pipelineRunner)

relayerSet, err := generic.NewRelayerSet(d.relayers, spec.ExternalJobID, spec.ID, d.isNewlyCreatedJob)
if err != nil {
return nil, fmt.Errorf("failed to create relayer set: %w", err)
}

standardCapability := newStandardCapabilities(log, spec.StandardCapabilitiesSpec, d.cfg, telemetryService, kvStore, d.registry, errorLog,
pr, relayerSet)

return []job.ServiceCtx{standardCapability}, nil
}

func (d *Delegate) AfterJobCreated(job job.Job) {}

func (d *Delegate) BeforeJobDeleted(job job.Job) {}

func (d *Delegate) OnDeleteJob(ctx context.Context, jb job.Job) error { return nil }

func ValidatedStandardCapabilitiesSpec(tomlString string) (job.Job, error) {
var jb = job.Job{ExternalJobID: uuid.New()}

tree, err := toml.Load(tomlString)
if err != nil {
return jb, errors.Wrap(err, "toml error on load standard capabilities")
}

err = tree.Unmarshal(&jb)
if err != nil {
return jb, errors.Wrap(err, "toml unmarshal error on standard capabilities spec")
}

var spec job.StandardCapabilitiesSpec
err = tree.Unmarshal(&spec)
if err != nil {
return jb, errors.Wrap(err, "toml unmarshal error on standard capabilities job")
}

jb.StandardCapabilitiesSpec = &spec
if jb.Type != job.StandardCapabilities {
return jb, errors.Errorf("standard capabilities unsupported job type %s", jb.Type)
}

if len(jb.StandardCapabilitiesSpec.Command) == 0 {
return jb, errors.Errorf("standard capabilities command must be set")
}

return jb, nil
}

type ErrorLog struct {
jobID int32
recordError func(ctx context.Context, jobID int32, description string) error
}

func (l *ErrorLog) SaveError(ctx context.Context, msg string) error {
return l.recordError(ctx, l.jobID, msg)
}
Loading

0 comments on commit 66d8d16

Please sign in to comment.