diff --git a/.changeset/poor-masks-fold.md b/.changeset/poor-masks-fold.md new file mode 100644 index 00000000000..1564aa0791f --- /dev/null +++ b/.changeset/poor-masks-fold.md @@ -0,0 +1,6 @@ +--- +"chainlink": minor +--- + +Move JuelsPerFeeCoinCacheDuration under JuelsPerFeeCoinCache struct in config. Rename JuelsPerFeeCoinCacheDuration to updateInterval. Add stalenessAlertThreshold to JuelsPerFeeCoinCache config. +StalenessAlertThreshold cfg option has a default of 24 hours which means that it doesn't have to be set unless we want to override the duration after which a stale cache should start throwing errors. diff --git a/core/internal/features/ocr2/features_ocr2_test.go b/core/internal/features/ocr2/features_ocr2_test.go index 216ca272b1b..1273608a203 100644 --- a/core/internal/features/ocr2/features_ocr2_test.go +++ b/core/internal/features/ocr2/features_ocr2_test.go @@ -487,7 +487,8 @@ juelsPerFeeCoinSource = """ answer1 [type=median index=0]; """ -juelsPerFeeCoinCacheDuration = "1m" +[pluginConfig.juelsPerFeeCoinCache] +updateInterval = "1m" `, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, blockBeforeConfig.Number().Int64(), chainReaderSpec, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil) require.NoError(t, err) err = apps[i].AddJobV2(testutils.Context(t), &ocrJob) @@ -840,7 +841,8 @@ juelsPerFeeCoinSource = """ answer1 [type=median index=0]; """ -juelsPerFeeCoinCacheDuration = "1m" +[pluginConfig.juelsPerFeeCoinCache] +updateInterval = "1m" `, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil) require.NoError(t, err) err = apps[i].AddJobV2(testutils.Context(t), &ocrJob) diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 5283b03affe..dcb57e5fd3c 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -124,7 +124,8 @@ ds1_multiply [type=multiply times=1.23]; ds1 -> ds1_parse -> ds1_multiply -> answer1; answer1 [type=median index=0]; """ -juelsPerFeeCoinCacheDuration = "1m" +[pluginConfig.juelsPerFeeCoinCache] +updateInterval = "1m" ` const BootstrapTestSpecTemplate = ` type = "bootstrap" @@ -2189,7 +2190,8 @@ ds1_multiply [type=multiply times=1.23]; ds1 -> ds1_parse -> ds1_multiply -> answer1; answer1 [type=median index=0]; """ -juelsPerFeeCoinCacheDuration = "30s" +[pluginConfig.juelsPerFeeCoinCache] +updateInterval = "30s" ` defn2 = ` name = 'LINK / ETH | version 3 | contract 0x0000000000000000000000000000000000000000' @@ -2219,7 +2221,8 @@ ds1_multiply [type=multiply times=1.23]; ds1 -> ds1_parse -> ds1_multiply -> answer1; answer1 [type=median index=0]; """ -juelsPerFeeCoinCacheDuration = "20m" +[pluginConfig.juelsPerFeeCoinCache] +updateInterval = "20m" ` jp = &feeds.JobProposal{ diff --git a/core/services/ocr2/plugins/median/config/config.go b/core/services/ocr2/plugins/median/config/config.go index 218f7491094..9cfda641552 100644 --- a/core/services/ocr2/plugins/median/config/config.go +++ b/core/services/ocr2/plugins/median/config/config.go @@ -14,9 +14,14 @@ import ( // The PluginConfig struct contains the custom arguments needed for the Median plugin. type PluginConfig struct { - JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"` - JuelsPerFeeCoinCacheDuration models.Interval `json:"juelsPerFeeCoinCacheDuration"` - JuelsPerFeeCoinCacheDisabled bool `json:"juelsPerFeeCoinCacheDisabled"` + JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"` + // JuelsPerFeeCoinCache is disabled when nil + JuelsPerFeeCoinCache *JuelsPerFeeCoinCache `json:"juelsPerFeeCoinCache"` +} + +type JuelsPerFeeCoinCache struct { + UpdateInterval models.Interval `json:"updateInterval"` + StalenessAlertThreshold models.Interval `json:"stalenessAlertThreshold"` } // ValidatePluginConfig validates the arguments for the Median plugin. @@ -25,12 +30,13 @@ func ValidatePluginConfig(config PluginConfig) error { return errors.Wrap(err, "invalid juelsPerFeeCoinSource pipeline") } - // unset duration defaults later - if config.JuelsPerFeeCoinCacheDuration != 0 { - if config.JuelsPerFeeCoinCacheDuration.Duration() < time.Second*30 { - return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is below 30 second minimum", config.JuelsPerFeeCoinCacheDuration.Duration().String()) - } else if config.JuelsPerFeeCoinCacheDuration.Duration() > time.Minute*20 { - return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is above 20 minute maximum", config.JuelsPerFeeCoinCacheDuration.Duration().String()) + // unset durations have a default set late + if config.JuelsPerFeeCoinCache != nil { + updateInterval := config.JuelsPerFeeCoinCache.UpdateInterval.Duration() + if updateInterval != 0 && updateInterval < time.Second*30 { + return errors.Errorf("juelsPerFeeCoinSourceCache update interval: %s is below 30 second minimum", updateInterval.String()) + } else if updateInterval > time.Minute*20 { + return errors.Errorf("juelsPerFeeCoinSourceCache update interval: %s is above 20 minute maximum", updateInterval.String()) } } diff --git a/core/services/ocr2/plugins/median/config/config_test.go b/core/services/ocr2/plugins/median/config/config_test.go index e0dcd4f9203..e54b59215bd 100644 --- a/core/services/ocr2/plugins/median/config/config_test.go +++ b/core/services/ocr2/plugins/median/config/config_test.go @@ -32,11 +32,11 @@ func TestValidatePluginConfig(t *testing.T) { t.Run("cache duration validation", func(t *testing.T) { for _, tc := range []testCase{ - {"cache duration below minimum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 29), fmt.Errorf("juelsPerFeeCoinSource cache duration: 29s is below 30 second minimum")}, - {"cache duration above maximum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute*20 + time.Second), fmt.Errorf("juelsPerFeeCoinSource cache duration: 20m1s is above 20 minute maximum")}, + {"cache duration below minimum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 29), fmt.Errorf("juelsPerFeeCoinSourceCache update interval: 29s is below 30 second minimum")}, + {"cache duration above maximum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute*20 + time.Second), fmt.Errorf("juelsPerFeeCoinSourceCache update interval: 20m1s is above 20 minute maximum")}, } { t.Run(tc.name, func(t *testing.T) { - assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline, JuelsPerFeeCoinCacheDuration: tc.cacheDuration}), tc.expectedError.Error()) + assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline, JuelsPerFeeCoinCache: &JuelsPerFeeCoinCache{UpdateInterval: tc.cacheDuration}}), tc.expectedError.Error()) }) } }) diff --git a/core/services/ocr2/plugins/median/services.go b/core/services/ocr2/plugins/median/services.go index 4615f934511..779ea4f346c 100644 --- a/core/services/ocr2/plugins/median/services.go +++ b/core/services/ocr2/plugins/median/services.go @@ -127,9 +127,9 @@ func NewMedianServices(ctx context.Context, CreatedAt: time.Now(), }, lggr) - if !pluginConfig.JuelsPerFeeCoinCacheDisabled { + if pluginConfig.JuelsPerFeeCoinCache != nil { lggr.Infof("juelsPerFeeCoin data source caching is enabled") - juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()) + juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, *pluginConfig.JuelsPerFeeCoinCache) if err2 != nil { return nil, err2 } diff --git a/core/services/ocrcommon/data_source.go b/core/services/ocrcommon/data_source.go index e90382a06af..f07cfc0ab7a 100644 --- a/core/services/ocrcommon/data_source.go +++ b/core/services/ocrcommon/data_source.go @@ -19,6 +19,7 @@ import ( serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median/config" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" "github.com/smartcontractkit/chainlink/v2/core/utils" @@ -103,8 +104,8 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l } } -const defaultCacheFreshness = time.Minute * 5 -const defaultCacheFreshnessAlert = time.Hour * 24 +const defaultUpdateInterval = time.Minute * 5 +const defaultStalenessAlertThreshold = time.Hour * 24 const dataSourceCacheKey = "dscache" type DataSourceCacheService interface { @@ -113,22 +114,27 @@ type DataSourceCacheService interface { median.DataSource } -func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (DataSourceCacheService, error) { +func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheCfg config.JuelsPerFeeCoinCache) (DataSourceCacheService, error) { inMemoryDS, ok := ds.(*inMemoryDataSource) if !ok { return nil, errors.Errorf("unsupported data source type: %T, only inMemoryDataSource supported", ds) } - if cacheFreshness == 0 { - cacheFreshness = defaultCacheFreshness + updateInterval, stalenessAlertThreshold := cacheCfg.UpdateInterval.Duration(), cacheCfg.StalenessAlertThreshold.Duration() + if updateInterval == 0 { + updateInterval = defaultUpdateInterval + } + if stalenessAlertThreshold == 0 { + stalenessAlertThreshold = defaultStalenessAlertThreshold } dsCache := &inMemoryDataSourceCache{ - kvStore: kvStore, - cacheFreshness: cacheFreshness, - inMemoryDataSource: inMemoryDS, - chStop: make(chan struct{}), - chDone: make(chan struct{}), + inMemoryDataSource: inMemoryDS, + kvStore: kvStore, + updateInterval: updateInterval, + stalenessAlertThreshold: stalenessAlertThreshold, + chStop: make(chan struct{}), + chDone: make(chan struct{}), } return dsCache, nil } @@ -231,16 +237,18 @@ func (ds *inMemoryDataSource) Observe(ctx context.Context, timestamp ocr2types.R // If cache update is overdue Observe defaults to standard inMemoryDataSource behaviour. type inMemoryDataSourceCache struct { *inMemoryDataSource - // cacheFreshness indicates duration between cache updates. - // Even if updates fail, previous values are returned. - cacheFreshness time.Duration - mu sync.RWMutex - chStop services.StopChan - chDone chan struct{} - latestUpdateErr error - latestTrrs pipeline.TaskRunResults - latestResult pipeline.FinalResult - kvStore job.KVStore + // updateInterval indicates duration between cache updates. + // Even if update fail, previous values are returned. + updateInterval time.Duration + // stalenessAlertThreshold indicates duration before logs raise severity level because of stale cache. + stalenessAlertThreshold time.Duration + mu sync.RWMutex + chStop services.StopChan + chDone chan struct{} + latestUpdateErr error + latestTrrs pipeline.TaskRunResults + latestResult pipeline.FinalResult + kvStore job.KVStore } func (ds *inMemoryDataSourceCache) Start(context.Context) error { @@ -256,7 +264,7 @@ func (ds *inMemoryDataSourceCache) Close() error { // updater periodically updates data source cache. func (ds *inMemoryDataSourceCache) updater() { - ticker := time.NewTicker(ds.cacheFreshness) + ticker := time.NewTicker(ds.updateInterval) updateCache := func() { ctx, cancel := ds.chStop.CtxCancel(context.WithTimeout(context.Background(), time.Second*10)) defer cancel() @@ -356,8 +364,8 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err) } - if time.Since(resTime.Time) >= defaultCacheFreshnessAlert { - ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", defaultCacheFreshnessAlert, ds.latestUpdateErr) + if time.Since(resTime.Time) >= ds.stalenessAlertThreshold { + ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr) } return resTime.Result.ToInt(), nil } diff --git a/core/services/ocrcommon/data_source_test.go b/core/services/ocrcommon/data_source_test.go index b9b19d30bf2..05ba0f4aa42 100644 --- a/core/services/ocrcommon/data_source_test.go +++ b/core/services/ocrcommon/data_source_test.go @@ -21,9 +21,11 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/job/mocks" + "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median/config" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" pipelinemocks "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/mocks" + "github.com/smartcontractkit/chainlink/v2/core/store/models" ) var ( @@ -78,7 +80,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { mockKVStore := mocks.KVStore{} mockKVStore.On("Store", mock.Anything, mock.Anything, mock.Anything).Return(nil) mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, nil) - dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2) + dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Second * 2)}) require.NoError(t, err) servicetest.Run(t, dsCache) @@ -112,7 +114,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { mockKVStore.On("Get", mock.Anything, mock.Anything).Return(result, nil) // set updater to a long time so that it doesn't log errors after the test is done - dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100) + dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Hour * 100)}) require.NoError(t, err) changeResultValue(runner, "-1", true, false) servicetest.Run(t, dsCache) @@ -131,7 +133,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) { mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, assert.AnError) // set updater to a long time so that it doesn't log errors after the test is done - dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100) + dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Hour * 100)}) require.NoError(t, err) changeResultValue(runner, "-1", true, false) servicetest.Run(t, dsCache)