From 8d3cd360873ffbf629205914e447207f462c50e5 Mon Sep 17 00:00:00 2001 From: Andrei Smirnov Date: Thu, 29 Jun 2023 19:47:49 +0300 Subject: [PATCH] Initializing S4 plugin (#9725) * Initializing S4 plugin * Addressed PR feedback --- core/services/ocr2/delegate.go | 73 +++++++++++++------ .../ocr2/plugins/functions/config/config.go | 29 ++++++++ .../plugins/functions/config/config_test.go | 41 +++++++++++ .../services/ocr2/plugins/functions/plugin.go | 18 ++++- core/services/synchronization/common.go | 1 + 5 files changed, 140 insertions(+), 22 deletions(-) create mode 100644 core/services/ocr2/plugins/functions/config/config_test.go diff --git a/core/services/ocr2/delegate.go b/core/services/ocr2/delegate.go index 424b2e3bff9..860aaf51c29 100644 --- a/core/services/ocr2/delegate.go +++ b/core/services/ocr2/delegate.go @@ -413,9 +413,14 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) { return d.newServicesOCR2Keepers(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger) case job.OCR2Functions: - thresholdPluginId := int32(1) + const ( + _ int32 = iota + thresholdPluginId + s4PluginId + ) thresholdPluginDB := NewDB(d.db, spec.ID, thresholdPluginId, lggr, d.cfg.Database()) - return d.newServicesOCR2Functions(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, thresholdPluginDB, lc, ocrLogger) + s4PluginDB := NewDB(d.db, spec.ID, s4PluginId, lggr, d.cfg.Database()) + return d.newServicesOCR2Functions(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, thresholdPluginDB, s4PluginDB, lc, ocrLogger) default: return nil, errors.Errorf("plugin type %s not supported", spec.PluginType) @@ -933,6 +938,7 @@ func (d *Delegate) newServicesOCR2Functions( kb ocr2key.KeyBundle, functionsOcrDB *db, thresholdOcrDB *db, + s4OcrDB *db, lc ocrtypes.LocalConfig, ocrLogger commontypes.Logger, ) ([]job.ServiceCtx, error) { @@ -940,23 +946,33 @@ func (d *Delegate) newServicesOCR2Functions( if spec.Relay != relay.EVM { return nil, fmt.Errorf("unsupported relay: %s", spec.Relay) } - functionsProvider, err := evmrelay.NewFunctionsProvider( - d.chainSet, - types.RelayArgs{ - ExternalJobID: jb.ExternalJobID, - JobID: spec.ID, - ContractID: spec.ContractID, - RelayConfig: spec.RelayConfig.Bytes(), - New: d.isNewlyCreatedJob, - }, - types.PluginArgs{ - TransmitterID: spec.TransmitterID.String, - PluginConfig: spec.PluginConfig.Bytes(), - }, - lggr.Named("FunctionsRelayer"), - d.ethKs, - functionsRelay.FunctionsPlugin, - ) + + createPluginProvider := func(pluginType functionsRelay.FunctionsPluginType, relayerName string) (types.PluginProvider, error) { + return evmrelay.NewFunctionsProvider( + d.chainSet, + types.RelayArgs{ + ExternalJobID: jb.ExternalJobID, + JobID: spec.ID, + ContractID: spec.ContractID, + RelayConfig: spec.RelayConfig.Bytes(), + New: d.isNewlyCreatedJob, + }, + types.PluginArgs{ + TransmitterID: spec.TransmitterID.String, + PluginConfig: spec.PluginConfig.Bytes(), + }, + lggr.Named(relayerName), + d.ethKs, + pluginType, + ) + } + + functionsProvider, err := createPluginProvider(functionsRelay.FunctionsPlugin, "FunctionsRelayer") + if err != nil { + return nil, err + } + + s4Provider, err := createPluginProvider(functionsRelay.S4Plugin, "FunctionsS4Relayer") if err != nil { return nil, err } @@ -986,6 +1002,21 @@ func (d *Delegate) newServicesOCR2Functions( ReportingPluginFactory: nil, // To be set by NewFunctionsServices } + s4OracleArgs := libocr2.OCR2OracleArgs{ + BinaryNetworkEndpointFactory: d.peerWrapper.Peer2, + V2Bootstrappers: bootstrapPeers, + ContractTransmitter: s4Provider.ContractTransmitter(), + ContractConfigTracker: s4Provider.ContractConfigTracker(), + Database: s4OcrDB, + LocalConfig: lc, + Logger: ocrLogger, + MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2S4), + OffchainConfigDigester: s4Provider.OffchainConfigDigester(), + OffchainKeyring: kb, + OnchainKeyring: kb, + ReportingPluginFactory: nil, // To be set by NewFunctionsServices + } + encryptedThresholdKeyShare := d.cfg.Threshold().ThresholdKeyShare() var thresholdKeyShare []byte if len(encryptedThresholdKeyShare) > 0 { @@ -1014,7 +1045,7 @@ func (d *Delegate) newServicesOCR2Functions( ThresholdKeyShare: thresholdKeyShare, } - functionsServices, err := functions.NewFunctionsServices(&functionsOracleArgs, nil, &functionsServicesConfig) + functionsServices, err := functions.NewFunctionsServices(&functionsOracleArgs, nil, &s4OracleArgs, &functionsServicesConfig) if err != nil { return nil, errors.Wrap(err, "error calling NewFunctionsServices") } @@ -1030,7 +1061,7 @@ func (d *Delegate) newServicesOCR2Functions( d.cfg.JobPipeline().MaxSuccessfulRuns(), ) - return append([]job.ServiceCtx{runResultSaver, functionsProvider}, functionsServices...), nil + return append([]job.ServiceCtx{runResultSaver, functionsProvider, s4Provider}, functionsServices...), nil } // errorLog implements [loop.ErrorLog] diff --git a/core/services/ocr2/plugins/functions/config/config.go b/core/services/ocr2/plugins/functions/config/config.go index 308b21a74be..492f869565a 100644 --- a/core/services/ocr2/plugins/functions/config/config.go +++ b/core/services/ocr2/plugins/functions/config/config.go @@ -8,8 +8,11 @@ import ( decryptionPluginConfig "github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin/config" + "github.com/smartcontractkit/libocr/offchainreporting2/types" + "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector" "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/functions" + s4PluginConfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/s4" "github.com/smartcontractkit/chainlink/v2/core/services/s4" ) @@ -101,3 +104,29 @@ func (ThresholdConfigParser) ParseConfig(config []byte) (*decryptionPluginConfig }, }, nil } + +func S4ConfigDecoder(config []byte) (*s4PluginConfig.PluginConfig, *types.ReportingPluginLimits, error) { + reportingPluginConfigWrapper, err := DecodeReportingPluginConfig(config) + if err != nil { + return nil, nil, errors.New("failed to decode S4 plugin config") + } + + pluginConfig := reportingPluginConfigWrapper.Config.S4PluginConfig + if pluginConfig == nil { + return nil, nil, fmt.Errorf("PluginConfig bytes %x did not contain s4 plugin config", config) + } + + return &s4PluginConfig.PluginConfig{ + ProductName: "functions", + NSnapshotShards: uint(pluginConfig.NSnapshotShards), + MaxObservationEntries: uint(pluginConfig.MaxObservationEntries), + MaxReportEntries: uint(pluginConfig.MaxReportEntries), + MaxDeleteExpiredEntries: uint(pluginConfig.MaxDeleteExpiredEntries), + }, + &types.ReportingPluginLimits{ + MaxQueryLength: int(pluginConfig.MaxQueryLengthBytes), + MaxObservationLength: int(pluginConfig.MaxObservationLengthBytes), + MaxReportLength: int(pluginConfig.MaxReportLengthBytes), + }, + nil +} diff --git a/core/services/ocr2/plugins/functions/config/config_test.go b/core/services/ocr2/plugins/functions/config/config_test.go new file mode 100644 index 00000000000..ee23a2fc1e9 --- /dev/null +++ b/core/services/ocr2/plugins/functions/config/config_test.go @@ -0,0 +1,41 @@ +package config_test + +import ( + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions/config" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" +) + +func TestS4ConfigDecoder(t *testing.T) { + t.Parallel() + + configProto := &config.ReportingPluginConfig{ + S4PluginConfig: &config.S4ReportingPluginConfig{ + MaxQueryLengthBytes: 100, + MaxObservationLengthBytes: 200, + MaxReportLengthBytes: 300, + NSnapshotShards: 1, + MaxObservationEntries: 111, + MaxReportEntries: 222, + MaxDeleteExpiredEntries: 333, + }, + } + + configBytes, err := proto.Marshal(configProto) + require.NoError(t, err) + + config, limits, err := config.S4ConfigDecoder(configBytes) + require.NoError(t, err) + assert.Equal(t, "functions", config.ProductName) + assert.Equal(t, uint(1), config.NSnapshotShards) + assert.Equal(t, uint(111), config.MaxObservationEntries) + assert.Equal(t, uint(222), config.MaxReportEntries) + assert.Equal(t, uint(333), config.MaxDeleteExpiredEntries) + assert.Equal(t, 100, limits.MaxQueryLength) + assert.Equal(t, 200, limits.MaxObservationLength) + assert.Equal(t, 300, limits.MaxReportLength) +} diff --git a/core/services/ocr2/plugins/functions/plugin.go b/core/services/ocr2/plugins/functions/plugin.go index 897156aa424..a645a9c5c66 100644 --- a/core/services/ocr2/plugins/functions/plugin.go +++ b/core/services/ocr2/plugins/functions/plugin.go @@ -24,6 +24,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/ethkey" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions/config" + s4_plugin "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/s4" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/threshold" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/s4" @@ -52,7 +53,7 @@ const ( ) // Create all OCR2 plugin Oracles and all extra services needed to run a Functions job. -func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs *libocr2.OCR2OracleArgs, conf *FunctionsServicesConfig) ([]job.ServiceCtx, error) { +func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs, s4OracleArgs *libocr2.OCR2OracleArgs, conf *FunctionsServicesConfig) ([]job.ServiceCtx, error) { pluginORM := functions.NewORM(conf.DB, conf.Logger, conf.QConfig, common.HexToAddress(conf.ContractID)) s4ORM := s4.NewPostgresORM(conf.DB, conf.Logger, conf.QConfig, s4.SharedTableName, FunctionsS4Namespace) @@ -139,6 +140,21 @@ func NewFunctionsServices(functionsOracleArgs, thresholdOracleArgs *libocr2.OCR2 listenerLogger.Warn("No GatewayConnectorConfig, S4Constraints or OnchainAllowlist is found in the plugin config, GatewayConnector will not be enabled") } + if s4OracleArgs != nil { + s4OracleArgs.ReportingPluginFactory = s4_plugin.S4ReportingPluginFactory{ + Logger: s4OracleArgs.Logger, + ORM: s4ORM, + ConfigDecoder: config.S4ConfigDecoder, + } + s4ReportingPluginOracle, err := libocr2.NewOracle(*s4OracleArgs) + if err != nil { + return nil, errors.Wrap(err, "failed to call NewOracle to create a S4 Reporting Plugin") + } + allServices = append(allServices, job.NewServiceAdapter(s4ReportingPluginOracle)) + } else { + listenerLogger.Warn("s4OracleArgs is nil. S4 plugin is disabled.") + } + return allServices, nil } diff --git a/core/services/synchronization/common.go b/core/services/synchronization/common.go index 6684d453490..f6d3729b45e 100644 --- a/core/services/synchronization/common.go +++ b/core/services/synchronization/common.go @@ -10,6 +10,7 @@ const ( OCR TelemetryType = "ocr" OCR2Automation TelemetryType = "ocr2-automation" OCR2Functions TelemetryType = "ocr2-functions" + OCR2S4 TelemetryType = "ocr2-s4" OCR2Median TelemetryType = "ocr2-median" OCR3Mercury TelemetryType = "ocr3-mercury" OCR2VRF TelemetryType = "ocr2-vrf"