Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/services/functions: switch to sqlutil.DataStore #12811

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/real-numbers-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

core/services/functions: switch to sqlutil.DataStore #internal
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

delegates[job.OffchainReporting2] = ocr2.NewDelegate(
sqlxDB,
opts.DB,
jobORM,
bridgeORM,
mercuryORM,
Expand Down
15 changes: 7 additions & 8 deletions core/services/functions/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/threshold"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
evmrelayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/s4"
"github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem"
Expand Down Expand Up @@ -270,7 +269,7 @@ func (l *functionsListener) setError(ctx context.Context, requestId RequestID, e
promRequestComputationError.WithLabelValues(l.contractAddressHex).Inc()
}
readyForProcessing := errType != INTERNAL_ERROR
if err := l.pluginORM.SetError(requestId, errType, errBytes, time.Now(), readyForProcessing, pg.WithParentCtx(ctx)); err != nil {
if err := l.pluginORM.SetError(ctx, requestId, errType, errBytes, time.Now(), readyForProcessing); err != nil {
l.logger.Errorw("call to SetError failed", "requestID", formatRequestId(requestId), "err", err)
}
}
Expand Down Expand Up @@ -321,7 +320,7 @@ func (l *functionsListener) HandleOffchainRequest(ctx context.Context, request *
CoordinatorContractAddress: &senderAddr,
OnchainMetadata: []byte(OffchainRequestMarker),
}
if err := l.pluginORM.CreateRequest(newReq, pg.WithParentCtx(ctx)); err != nil {
if err := l.pluginORM.CreateRequest(ctx, newReq); err != nil {
if errors.Is(err, ErrDuplicateRequestID) {
l.logger.Warnw("HandleOffchainRequest: received duplicate request ID", "requestID", formatRequestId(requestId), "err", err)
} else {
Expand All @@ -348,7 +347,7 @@ func (l *functionsListener) handleOracleRequestV1(request *evmrelayTypes.OracleR
CoordinatorContractAddress: &request.CoordinatorContract,
OnchainMetadata: request.OnchainMetadata,
}
if err := l.pluginORM.CreateRequest(newReq, pg.WithParentCtx(ctx)); err != nil {
if err := l.pluginORM.CreateRequest(ctx, newReq); err != nil {
if errors.Is(err, ErrDuplicateRequestID) {
l.logger.Warnw("handleOracleRequestV1: received a log with duplicate request ID", "requestID", formatRequestId(request.RequestId), "err", err)
} else {
Expand Down Expand Up @@ -450,7 +449,7 @@ func (l *functionsListener) handleRequest(ctx context.Context, requestID Request
promRequestComputationSuccess.WithLabelValues(l.contractAddressHex).Inc()
promComputationResultSize.WithLabelValues(l.contractAddressHex).Set(float64(len(computationResult)))
l.logger.Debugw("saving computation result", "requestID", requestIDStr)
if err2 := l.pluginORM.SetResult(requestID, computationResult, time.Now(), pg.WithParentCtx(ctx)); err2 != nil {
if err2 := l.pluginORM.SetResult(ctx, requestID, computationResult, time.Now()); err2 != nil {
l.logger.Errorw("call to SetResult failed", "requestID", requestIDStr, "err", err2)
return err2
}
Expand All @@ -464,7 +463,7 @@ func (l *functionsListener) handleOracleResponseV1(response *evmrelayTypes.Oracl

ctx, cancel := l.getNewHandlerContext()
defer cancel()
if err := l.pluginORM.SetConfirmed(response.RequestId, pg.WithParentCtx(ctx)); err != nil {
if err := l.pluginORM.SetConfirmed(ctx, response.RequestId); err != nil {
l.logger.Errorw("setting CONFIRMED state failed", "requestID", formatRequestId(response.RequestId), "err", err)
}
promRequestConfirmed.WithLabelValues(l.contractAddressHex).Inc()
Expand All @@ -486,7 +485,7 @@ func (l *functionsListener) timeoutRequests() {
case <-ticker.C:
cutoff := time.Now().Add(-(time.Duration(timeoutSec) * time.Second))
ctx, cancel := l.getNewHandlerContext()
ids, err := l.pluginORM.TimeoutExpiredResults(cutoff, batchSize, pg.WithParentCtx(ctx))
ids, err := l.pluginORM.TimeoutExpiredResults(ctx, cutoff, batchSize)
cancel()
if err != nil {
l.logger.Errorw("error when calling FindExpiredResults", "err", err)
Expand Down Expand Up @@ -531,7 +530,7 @@ func (l *functionsListener) pruneRequests() {
case <-ticker.C:
ctx, cancel := l.getNewHandlerContext()
startTime := time.Now()
nTotal, nPruned, err := l.pluginORM.PruneOldestRequests(maxStoredRequests, batchSize, pg.WithParentCtx(ctx))
nTotal, nPruned, err := l.pluginORM.PruneOldestRequests(ctx, maxStoredRequests, batchSize)
cancel()
elapsedMillis := time.Since(startTime).Milliseconds()
if err != nil {
Expand Down
24 changes: 11 additions & 13 deletions core/services/functions/listener_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package functions_test

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -35,7 +36,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions/config"
threshold_mocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/threshold/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestFunctionsListener_HandleOracleRequestV1_Success(t *testing.T) {
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient", mock.Anything).Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, nil, nil)
uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.pluginORM.On("SetResult", mock.Anything, RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(doneCh)
}).Return(nil)

Expand All @@ -189,7 +189,7 @@ func TestFunctionsListener_HandleOffchainRequest_Success(t *testing.T) {
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient", mock.Anything).Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, nil, nil)
uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Return(nil)
uni.pluginORM.On("SetResult", mock.Anything, RequestID, ResultBytes, mock.Anything, mock.Anything).Return(nil)

request := &functions_service.OffchainRequest{
RequestId: RequestID[:],
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestFunctionsListener_HandleOffchainRequest_InternalError(t *testing.T) {
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient", mock.Anything).Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, errors.New("error"))
uni.pluginORM.On("SetError", RequestID, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
uni.pluginORM.On("SetError", mock.Anything, RequestID, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

request := &functions_service.OffchainRequest{
RequestId: RequestID[:],
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestFunctionsListener_HandleOracleRequestV1_ComputationError(t *testing.T)
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient", mock.Anything).Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(nil, ErrorBytes, nil, nil)
uni.pluginORM.On("SetError", RequestID, mock.Anything, ErrorBytes, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.pluginORM.On("SetError", mock.Anything, RequestID, mock.Anything, ErrorBytes, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(doneCh)
}).Return(nil)

Expand Down Expand Up @@ -307,7 +307,7 @@ func TestFunctionsListener_HandleOracleRequestV1_ThresholdDecryptedSecrets(t *te
uni.eaClient.On("FetchEncryptedSecrets", mock.Anything, mock.Anything, RequestIDStr, mock.Anything, mock.Anything).Return(EncryptedSecrets, nil, nil)
uni.decryptor.On("Decrypt", mock.Anything, decryptionPlugin.CiphertextId(RequestID[:]), EncryptedSecrets).Return(DecryptedSecrets, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, nil, nil)
uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.pluginORM.On("SetResult", mock.Anything, RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(doneCh)
}).Return(nil)

Expand All @@ -333,7 +333,7 @@ func TestFunctionsListener_HandleOracleRequestV1_CBORTooBig(t *testing.T) {
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return([]types.OracleRequest{request}, nil, nil).Once()
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return(nil, nil, nil)
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.pluginORM.On("SetError", RequestID, functions_service.USER_ERROR, []byte("request too big (max 10 bytes)"), mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.pluginORM.On("SetError", mock.Anything, RequestID, functions_service.USER_ERROR, []byte("request too big (max 10 bytes)"), mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(doneCh)
}).Return(nil)

Expand Down Expand Up @@ -361,7 +361,7 @@ func TestFunctionsListener_ReportSourceCodeDomains(t *testing.T) {
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient", mock.Anything).Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, Domains, nil)
uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.pluginORM.On("SetResult", mock.Anything, RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(doneCh)
}).Return(nil)
var sentMessage []byte
Expand All @@ -388,7 +388,7 @@ func TestFunctionsListener_PruneRequests(t *testing.T) {
uni := NewFunctionsListenerUniverse(t, 0, 1)
doneCh := make(chan bool)
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return(nil, nil, nil)
uni.pluginORM.On("PruneOldestRequests", functions_service.DefaultPruneMaxStoredRequests, functions_service.DefaultPruneBatchSize, mock.Anything).Return(uint32(0), uint32(0), nil).Run(func(args mock.Arguments) {
uni.pluginORM.On("PruneOldestRequests", mock.Anything, functions_service.DefaultPruneMaxStoredRequests, functions_service.DefaultPruneBatchSize, mock.Anything).Return(uint32(0), uint32(0), nil).Run(func(args mock.Arguments) {
doneCh <- true
})

Expand All @@ -403,7 +403,7 @@ func TestFunctionsListener_TimeoutRequests(t *testing.T) {
uni := NewFunctionsListenerUniverse(t, 1, 0)
doneCh := make(chan bool)
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return(nil, nil, nil)
uni.pluginORM.On("TimeoutExpiredResults", mock.Anything, uint32(1), mock.Anything).Return([]functions_service.RequestID{}, nil).Run(func(args mock.Arguments) {
uni.pluginORM.On("TimeoutExpiredResults", mock.Anything, mock.Anything, uint32(1), mock.Anything).Return([]functions_service.RequestID{}, nil).Run(func(args mock.Arguments) {
doneCh <- true
})

Expand All @@ -423,9 +423,7 @@ func TestFunctionsListener_ORMDoesNotFreezeHandlersForever(t *testing.T) {
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return([]types.OracleRequest{request}, nil, nil).Once()
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return(nil, nil, nil)
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
var queryerWrapper pg.Q
args.Get(1).(pg.QOpt)(&queryerWrapper)
<-queryerWrapper.ParentCtx.Done()
<-args.Get(0).(context.Context).Done()
ormCallExited.Done()
}).Return(errors.New("timeout"))

Expand Down
Loading
Loading