Skip to content

Commit

Permalink
Preparation for Threshold plugin integration with Functions (#9670)
Browse files Browse the repository at this point in the history
* squashing all commits

* integrated changes from #9677

* fixed lint error

* fixed lint error

* fixed proto name conflict

* ran protoc

* addressed feedback

* Addressed feedback

* Added decryptionQueueConfig to integration test jobspec

* Fix bugs from merge conflict resolution
  • Loading branch information
KuphJr authored Jun 29, 2023
1 parent 369ce8c commit 367196e
Show file tree
Hide file tree
Showing 21 changed files with 580 additions and 247 deletions.
4 changes: 2 additions & 2 deletions core/config/v2/docs/secrets.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ Password = "A-Mercury-Password" # Example
URL = "https://example.com" # Example

[Threshold]
# ThresholdDecryptionKeyShare used by the threshold decryption OCR plugin
ThresholdDecryptionKeyShare = "A-Threshold-Decryption-Key-Share" # Example
# ThresholdKeyShare used by the threshold decryption OCR plugin
ThresholdKeyShare = "A-Threshold-Decryption-Key-Share" # Example
2 changes: 2 additions & 0 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ require (
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230620171700-bbcb3a99b7d3 // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230612131011-369bfb503592 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20230622060316-7ce48476dd7d // indirect
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230616141813-ca0ecf03ca5c // indirect
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20230616062456-5c32c95fc166 // indirect
github.com/smartcontractkit/wsrpc v0.7.2 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,10 @@ github.com/smartcontractkit/ocr2vrf v0.0.0-20230616201444-d8b4222aff3c h1:BX1ibM
github.com/smartcontractkit/ocr2vrf v0.0.0-20230616201444-d8b4222aff3c/go.mod h1:AT1OrDDOCd8vzmMsCnA70N+tZdq9AbdZAiAw+gQq260=
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb h1:OMaBUb4X9IFPLbGbCHsMU+kw/BPCrewaVwWGIBc0I4A=
github.com/smartcontractkit/sqlx v1.3.5-0.20210805004948-4be295aacbeb/go.mod h1:HNUu4cJekUdsJbwRBCiOybtkPJEfGRELQPe2tkoDEyk=
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230616141813-ca0ecf03ca5c h1:B7jWegIHCHXY32qWGwlNalrJYSz4uZh5zAgd2rQ3Iyc=
github.com/smartcontractkit/tdh2/go/ocr2/decryptionplugin v0.0.0-20230616141813-ca0ecf03ca5c/go.mod h1:q6f4fe39oZPdsh1i57WznEZgxd8siidMaSFq3wdPmVg=
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20230616062456-5c32c95fc166 h1:cNH0nQjRfmWj173L8exDkQratcFVQ8AAj8RZ8a+suaI=
github.com/smartcontractkit/tdh2/go/tdh2 v0.0.0-20230616062456-5c32c95fc166/go.mod h1:G5Sd/yzHWf26rQ+X0nG9E0buKPqRGPMJAfk2gwCzOOw=
github.com/smartcontractkit/wsrpc v0.7.2 h1:iBXzMeg7vc5YoezIQBq896y25BARw7OKbhrb6vPbtRQ=
github.com/smartcontractkit/wsrpc v0.7.2/go.mod h1:sj7QX2NQibhkhxTfs3KOhAj/5xwgqMipTvJVSssT9i0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
Expand Down
7 changes: 2 additions & 5 deletions core/services/functions/external_adapter_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package functions
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -80,7 +79,7 @@ type secretsPayload struct {

type secretsData struct {
RequestType string `json:"requestType"`
EncryptedSecretsUrls string `json:"encryptedSecretsUrls"`
EncryptedSecretsUrls []byte `json:"encryptedSecretsUrls"`
}

type response struct {
Expand Down Expand Up @@ -147,11 +146,9 @@ func (ea *externalAdapterClient) RunComputation(
}

func (ea *externalAdapterClient) FetchEncryptedSecrets(ctx context.Context, encryptedSecretsUrls []byte, requestId string, jobName string) (encryptedSecrets, userError []byte, err error) {
encodedSecretsUrls := base64.StdEncoding.EncodeToString(encryptedSecretsUrls)

data := secretsData{
RequestType: "fetchThresholdEncryptedSecrets",
EncryptedSecretsUrls: encodedSecretsUrls,
EncryptedSecretsUrls: encryptedSecretsUrls,
}

payload := secretsPayload{
Expand Down
37 changes: 35 additions & 2 deletions core/services/functions/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/threshold"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem"
"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand Down Expand Up @@ -128,13 +129,25 @@ type FunctionsListener struct {
logger logger.Logger
mailMon *utils.MailboxMonitor
urlsMonEndpoint commontypes.MonitoringEndpoint
decryptor threshold.Decryptor
}

func formatRequestId(requestId [32]byte) string {
return fmt.Sprintf("0x%x", requestId)
}

func NewFunctionsListener(oracle *ocr2dr_oracle.OCR2DROracle, job job.Job, bridgeAccessor BridgeAccessor, pluginORM ORM, pluginConfig config.PluginConfig, logBroadcaster log.Broadcaster, lggr logger.Logger, mailMon *utils.MailboxMonitor, urlsMonEndpoint commontypes.MonitoringEndpoint) *FunctionsListener {
func NewFunctionsListener(
oracle *ocr2dr_oracle.OCR2DROracle,
job job.Job,
bridgeAccessor BridgeAccessor,
pluginORM ORM,
pluginConfig config.PluginConfig,
logBroadcaster log.Broadcaster,
lggr logger.Logger,
mailMon *utils.MailboxMonitor,
urlsMonEndpoint commontypes.MonitoringEndpoint,
decryptor threshold.Decryptor,
) *FunctionsListener {
return &FunctionsListener{
oracle: oracle,
oracleHexAddr: oracle.Address().Hex(),
Expand All @@ -148,6 +161,7 @@ func NewFunctionsListener(oracle *ocr2dr_oracle.OCR2DROracle, job job.Job, bridg
logger: lggr,
mailMon: mailMon,
urlsMonEndpoint: urlsMonEndpoint,
decryptor: decryptor,
}
}

Expand Down Expand Up @@ -352,7 +366,26 @@ func (l *FunctionsListener) handleRequest(ctx context.Context, requestID [32]byt
return
}

computationResult, computationError, domains, err := eaClient.RunComputation(ctx, requestIDStr, l.job.Name.ValueOrZero(), subscriptionOwner.Hex(), subscriptionId, "", requestData)
nodeProvidedSecrets := ""
if l.decryptor != nil && requestData.SecretsLocation == LocationRemote && len(requestData.Secrets) > 0 {
thresholdEncSecrets, userError, err2 := eaClient.FetchEncryptedSecrets(ctx, requestData.Secrets, requestIDStr, l.job.Name.ValueOrZero())
if err2 != nil {
l.logger.Errorw("failed to fetch encrypted secrets", "requestID", requestIDStr, "err", err2)
}
if len(userError) != 0 {
l.logger.Debugw("no valid threshold encrypted secrets detected - falling back to legacy secrets", "requestID", requestIDStr, "err", string(userError))
}
if len(thresholdEncSecrets) != 0 {
decryptedSecrets, err2 := l.decryptor.Decrypt(ctx, []byte(requestIDStr), thresholdEncSecrets)
if err2 != nil {
l.logger.Debugw("threshold decryption of user secrets failed", "requestID", requestIDStr, "err", err2)
} else {
nodeProvidedSecrets = string(decryptedSecrets)
}
}
}

computationResult, computationError, domains, err := eaClient.RunComputation(ctx, requestIDStr, l.job.Name.ValueOrZero(), subscriptionOwner.Hex(), subscriptionId, nodeProvidedSecrets, requestData)

if err != nil {
l.logger.Errorw("internal adapter error", "requestID", requestIDStr, "err", err)
Expand Down
42 changes: 41 additions & 1 deletion core/services/functions/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
functions_mocks "github.com/smartcontractkit/chainlink/v2/core/services/functions/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions/config"
threshold_mocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/threshold/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/services/srvctest"
Expand All @@ -46,6 +47,7 @@ type FunctionsListenerUniverse struct {
pluginORM *functions_mocks.ORM
logBroadcaster *log_mocks.Broadcaster
ingressClient *sync_mocks.TelemetryIngressClient
decryptor *threshold_mocks.Decryptor
}

func ptr[T any](t T) *T { return &t }
Expand All @@ -58,6 +60,9 @@ var (
ResultBytes = []byte{0xab, 0xcd}
ErrorBytes = []byte{0xff, 0x11}
Domains = []string{"github.com", "google.com"}
EncryptedSecretsUrls []byte = []byte{0x11, 0x22}
EncryptedSecrets []byte = []byte(`{"TDH2Ctxt":"eyJHcm","SymCtxt":"+yHR","Nonce":"kgjHyT3Jar0M155E"}`)
DecryptedSecrets []byte = []byte(`{"0x0":"lhcK"}`)
)

func NewFunctionsListenerUniverse(t *testing.T, timeoutSec int, pruneFrequencySec int) *FunctionsListenerUniverse {
Expand Down Expand Up @@ -94,6 +99,7 @@ func NewFunctionsListenerUniverse(t *testing.T, timeoutSec int, pruneFrequencySe
}
eaClient := functions_mocks.NewExternalAdapterClient(t)
bridgeAccessor := functions_mocks.NewBridgeAccessor(t)
decryptor := threshold_mocks.NewDecryptor(t)

var pluginConfig config.PluginConfig
err := json.Unmarshal(jsonConfig.Bytes(), &pluginConfig)
Expand All @@ -106,7 +112,7 @@ func NewFunctionsListenerUniverse(t *testing.T, timeoutSec int, pruneFrequencySe
ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient)
monEndpoint := ingressAgent.GenMonitoringEndpoint("0xa", synchronization.FunctionsRequests)

functionsListener := functions_service.NewFunctionsListener(oracleContract, jb, bridgeAccessor, pluginORM, pluginConfig, broadcaster, lggr, mailMon, monEndpoint)
functionsListener := functions_service.NewFunctionsListener(oracleContract, jb, bridgeAccessor, pluginORM, pluginConfig, broadcaster, lggr, mailMon, monEndpoint, decryptor)

return &FunctionsListenerUniverse{
service: functionsListener,
Expand All @@ -115,6 +121,7 @@ func NewFunctionsListenerUniverse(t *testing.T, timeoutSec int, pruneFrequencySe
pluginORM: pluginORM,
logBroadcaster: broadcaster,
ingressClient: ingressClient,
decryptor: decryptor,
}
}

Expand Down Expand Up @@ -159,6 +166,39 @@ func TestFunctionsListener_HandleOracleRequestSuccess(t *testing.T) {
uni.service.Close()
}

func TestFunctionsListener_ThresholdDecryptedSecrets(t *testing.T) {
testutils.SkipShortDB(t)
t.Parallel()

reqData := &struct {
SecretsLocation int `cbor:"secretsLocation"`
Secrets []byte `cbor:"secrets"`
}{
SecretsLocation: 1,
Secrets: EncryptedSecretsUrls,
}
cborBytes, err := cbor.Marshal(reqData)
require.NoError(t, err)
// Remove first byte (map header) to make it "diet" CBOR
cborBytes = cborBytes[1:]

uni, log, doneCh := PrepareAndStartFunctionsListener(t, cborBytes)

uni.pluginORM.On("CreateRequest", RequestID, mock.Anything, mock.Anything, mock.Anything).Return(nil)
uni.logBroadcaster.On("MarkConsumed", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient").Return(uni.eaClient, nil)
uni.eaClient.On("FetchEncryptedSecrets", mock.Anything, mock.Anything, RequestIDStr, mock.Anything, mock.Anything).Return(EncryptedSecrets, nil, nil)
uni.decryptor.On("Decrypt", mock.Anything, []byte(RequestIDStr), EncryptedSecrets).Return(DecryptedSecrets, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, string(DecryptedSecrets), mock.Anything).Return(ResultBytes, nil, nil, nil)
uni.pluginORM.On("SetResult", RequestID, mock.Anything, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(doneCh)
}).Return(nil)

uni.service.HandleLog(log)
<-doneCh
uni.service.Close()
}

func TestFunctionsListener_HandleOracleRequestDuplicateMarkLogConsumed(t *testing.T) {
testutils.SkipShortDB(t)
t.Parallel()
Expand Down
75 changes: 44 additions & 31 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,9 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
return d.newServicesOCR2Keepers(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger)

case job.OCR2Functions:
return d.newServicesOCR2Functions(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
thresholdPluginId := int32(1)
thresholdPluginDB := NewDB(d.db, spec.ID, thresholdPluginId, lggr, d.cfg.Database())
return d.newServicesOCR2Functions(lggr, jb, runResults, bootstrapPeers, kb, ocrDB, thresholdPluginDB, lc, ocrLogger)

default:
return nil, errors.Errorf("plugin type %s not supported", spec.PluginType)
Expand Down Expand Up @@ -929,19 +931,16 @@ func (d *Delegate) newServicesOCR2Functions(
runResults chan pipeline.Run,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
ocrDB *db,
functionsOcrDB *db,
thresholdOcrDB *db,
lc ocrtypes.LocalConfig,
ocrLogger commontypes.Logger,
) ([]job.ServiceCtx, error) {
encryptedThresholdKeyShare := d.cfg.Threshold().ThresholdKeyShare()
if len(encryptedThresholdKeyShare) == 0 {
d.lggr.Warn("ThresholdKeyShare is empty")
}
spec := jb.OCR2OracleSpec
if spec.Relay != relay.EVM {
return nil, fmt.Errorf("unsupported relay: %s", spec.Relay)
}
functionsProvider, err2 := evmrelay.NewFunctionsProvider(
functionsProvider, err := evmrelay.NewFunctionsProvider(
d.chainSet,
types.RelayArgs{
ExternalJobID: jb.ExternalJobID,
Expand All @@ -958,26 +957,26 @@ func (d *Delegate) newServicesOCR2Functions(
d.ethKs,
functionsRelay.FunctionsPlugin,
)
if err2 != nil {
return nil, err2
if err != nil {
return nil, err
}

var relayConfig evmrelaytypes.RelayConfig
err2 = json.Unmarshal(spec.RelayConfig.Bytes(), &relayConfig)
if err2 != nil {
return nil, err2
err = json.Unmarshal(spec.RelayConfig.Bytes(), &relayConfig)
if err != nil {
return nil, err
}
chain, err2 := d.chainSet.Get(relayConfig.ChainID.ToInt())
if err2 != nil {
return nil, err2
chain, err := d.chainSet.Get(relayConfig.ChainID.ToInt())
if err != nil {
return nil, err
}

sharedOracleArgs := libocr2.OCR2OracleArgs{
functionsOracleArgs := libocr2.OCR2OracleArgs{
BinaryNetworkEndpointFactory: d.peerWrapper.Peer2,
V2Bootstrappers: bootstrapPeers,
ContractTransmitter: functionsProvider.ContractTransmitter(),
ContractConfigTracker: functionsProvider.ContractConfigTracker(),
Database: ocrDB,
Database: functionsOcrDB,
LocalConfig: lc,
Logger: ocrLogger,
MonitoringEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.OCR2Functions),
Expand All @@ -987,21 +986,35 @@ func (d *Delegate) newServicesOCR2Functions(
ReportingPluginFactory: nil, // To be set by NewFunctionsServices
}

encryptedThresholdKeyShare := d.cfg.Threshold().ThresholdKeyShare()
var thresholdKeyShare []byte
if len(encryptedThresholdKeyShare) > 0 {
encryptedThresholdKeyShareBytes, err2 := hex.DecodeString(encryptedThresholdKeyShare)
if err2 != nil {
return nil, errors.Wrap(err2, "failed to decode ThresholdKeyShare hex string")
}
thresholdKeyShare, err2 = kb.NaclBoxOpenAnonymous(encryptedThresholdKeyShareBytes)
if err2 != nil {
return nil, errors.Wrap(err2, "failed to decrypt ThresholdKeyShare")
}
}

functionsServicesConfig := functions.FunctionsServicesConfig{
Job: jb,
JobORM: d.jobORM,
BridgeORM: d.bridgeORM,
QConfig: d.cfg.Database(),
DB: d.db,
Chain: chain,
ContractID: spec.ContractID,
Logger: lggr,
MailMon: d.mailMon,
URLsMonEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.FunctionsRequests),
EthKeystore: d.ethKs,
}

functionsServices, err := functions.NewFunctionsServices(&sharedOracleArgs, &functionsServicesConfig)
Job: jb,
JobORM: d.jobORM,
BridgeORM: d.bridgeORM,
QConfig: d.cfg.Database(),
DB: d.db,
Chain: chain,
ContractID: spec.ContractID,
Logger: lggr,
MailMon: d.mailMon,
URLsMonEndpoint: d.monitoringEndpointGen.GenMonitoringEndpoint(spec.ContractID, synchronization.FunctionsRequests),
EthKeystore: d.ethKs,
ThresholdKeyShare: thresholdKeyShare,
}

functionsServices, err := functions.NewFunctionsServices(&functionsOracleArgs, nil, &functionsServicesConfig)
if err != nil {
return nil, errors.Wrap(err, "error calling NewFunctionsServices")
}
Expand Down
Loading

0 comments on commit 367196e

Please sign in to comment.