Skip to content

Commit

Permalink
Add jfpc cache staleness alert to config and pass it into jfpc cache (#…
Browse files Browse the repository at this point in the history
…12595)

* Add jfpc cache staleness alert to config and pass it into jfpc cache

* Extract JuelsPerFeeCoinCache cfg into a separate struct, improve naming

* Changeset

* Update config validation test

* Reduce changeset to minor
  • Loading branch information
ilija42 committed Apr 22, 2024
1 parent 344ff06 commit 2d2a428
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 45 deletions.
6 changes: 6 additions & 0 deletions .changeset/poor-masks-fold.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"chainlink": minor
---

Move JuelsPerFeeCoinCacheDuration under JuelsPerFeeCoinCache struct in config. Rename JuelsPerFeeCoinCacheDuration to updateInterval. Add stalenessAlertThreshold to JuelsPerFeeCoinCache config.
StalenessAlertThreshold cfg option has a default of 24 hours which means that it doesn't have to be set unless we want to override the duration after which a stale cache should start throwing errors.
6 changes: 4 additions & 2 deletions core/internal/features/ocr2/features_ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ juelsPerFeeCoinSource = """
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "1m"
`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, blockBeforeConfig.Number().Int64(), chainReaderSpec, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil)
require.NoError(t, err)
err = apps[i].AddJobV2(testutils.Context(t), &ocrJob)
Expand Down Expand Up @@ -840,7 +841,8 @@ juelsPerFeeCoinSource = """
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "1m"
`, ocrContractAddress, kbs[i].ID(), transmitters[i], fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i, fmt.Sprintf("bridge%d", i), i, slowServers[i].URL, i), nil)
require.NoError(t, err)
err = apps[i].AddJobV2(testutils.Context(t), &ocrJob)
Expand Down
9 changes: 6 additions & 3 deletions core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "1m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "1m"
`
const BootstrapTestSpecTemplate = `
type = "bootstrap"
Expand Down Expand Up @@ -2189,7 +2190,8 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "30s"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "30s"
`
defn2 = `
name = 'LINK / ETH | version 3 | contract 0x0000000000000000000000000000000000000000'
Expand Down Expand Up @@ -2219,7 +2221,8 @@ ds1_multiply [type=multiply times=1.23];
ds1 -> ds1_parse -> ds1_multiply -> answer1;
answer1 [type=median index=0];
"""
juelsPerFeeCoinCacheDuration = "20m"
[pluginConfig.juelsPerFeeCoinCache]
updateInterval = "20m"
`

jp = &feeds.JobProposal{
Expand Down
24 changes: 15 additions & 9 deletions core/services/ocr2/plugins/median/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ import (

// The PluginConfig struct contains the custom arguments needed for the Median plugin.
type PluginConfig struct {
JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"`
JuelsPerFeeCoinCacheDuration models.Interval `json:"juelsPerFeeCoinCacheDuration"`
JuelsPerFeeCoinCacheDisabled bool `json:"juelsPerFeeCoinCacheDisabled"`
JuelsPerFeeCoinPipeline string `json:"juelsPerFeeCoinSource"`
// JuelsPerFeeCoinCache is disabled when nil
JuelsPerFeeCoinCache *JuelsPerFeeCoinCache `json:"juelsPerFeeCoinCache"`
}

type JuelsPerFeeCoinCache struct {
UpdateInterval models.Interval `json:"updateInterval"`
StalenessAlertThreshold models.Interval `json:"stalenessAlertThreshold"`
}

// ValidatePluginConfig validates the arguments for the Median plugin.
Expand All @@ -25,12 +30,13 @@ func ValidatePluginConfig(config PluginConfig) error {
return errors.Wrap(err, "invalid juelsPerFeeCoinSource pipeline")
}

// unset duration defaults later
if config.JuelsPerFeeCoinCacheDuration != 0 {
if config.JuelsPerFeeCoinCacheDuration.Duration() < time.Second*30 {
return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is below 30 second minimum", config.JuelsPerFeeCoinCacheDuration.Duration().String())
} else if config.JuelsPerFeeCoinCacheDuration.Duration() > time.Minute*20 {
return errors.Errorf("juelsPerFeeCoinSource cache duration: %s is above 20 minute maximum", config.JuelsPerFeeCoinCacheDuration.Duration().String())
// unset durations have a default set late
if config.JuelsPerFeeCoinCache != nil {
updateInterval := config.JuelsPerFeeCoinCache.UpdateInterval.Duration()
if updateInterval != 0 && updateInterval < time.Second*30 {
return errors.Errorf("juelsPerFeeCoinSourceCache update interval: %s is below 30 second minimum", updateInterval.String())
} else if updateInterval > time.Minute*20 {
return errors.Errorf("juelsPerFeeCoinSourceCache update interval: %s is above 20 minute maximum", updateInterval.String())
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/services/ocr2/plugins/median/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ func TestValidatePluginConfig(t *testing.T) {

t.Run("cache duration validation", func(t *testing.T) {
for _, tc := range []testCase{
{"cache duration below minimum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 29), fmt.Errorf("juelsPerFeeCoinSource cache duration: 29s is below 30 second minimum")},
{"cache duration above maximum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute*20 + time.Second), fmt.Errorf("juelsPerFeeCoinSource cache duration: 20m1s is above 20 minute maximum")},
{"cache duration below minimum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Second * 29), fmt.Errorf("juelsPerFeeCoinSourceCache update interval: 29s is below 30 second minimum")},
{"cache duration above maximum", `ds1 [type=bridge name=voter_turnout];`, models.Interval(time.Minute*20 + time.Second), fmt.Errorf("juelsPerFeeCoinSourceCache update interval: 20m1s is above 20 minute maximum")},
} {
t.Run(tc.name, func(t *testing.T) {
assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline, JuelsPerFeeCoinCacheDuration: tc.cacheDuration}), tc.expectedError.Error())
assert.EqualError(t, ValidatePluginConfig(PluginConfig{JuelsPerFeeCoinPipeline: tc.pipeline, JuelsPerFeeCoinCache: &JuelsPerFeeCoinCache{UpdateInterval: tc.cacheDuration}}), tc.expectedError.Error())
})
}
})
Expand Down
4 changes: 2 additions & 2 deletions core/services/ocr2/plugins/median/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ func NewMedianServices(ctx context.Context,
CreatedAt: time.Now(),
}, lggr)

if !pluginConfig.JuelsPerFeeCoinCacheDisabled {
if pluginConfig.JuelsPerFeeCoinCache != nil {
lggr.Infof("juelsPerFeeCoin data source caching is enabled")
juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration())
juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, *pluginConfig.JuelsPerFeeCoinCache)
if err2 != nil {
return nil, err2
}
Expand Down
54 changes: 31 additions & 23 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median/config"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"

"github.com/smartcontractkit/chainlink/v2/core/utils"
Expand Down Expand Up @@ -103,8 +104,8 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l
}
}

const defaultCacheFreshness = time.Minute * 5
const defaultCacheFreshnessAlert = time.Hour * 24
const defaultUpdateInterval = time.Minute * 5
const defaultStalenessAlertThreshold = time.Hour * 24
const dataSourceCacheKey = "dscache"

type DataSourceCacheService interface {
Expand All @@ -113,22 +114,27 @@ type DataSourceCacheService interface {
median.DataSource
}

func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (DataSourceCacheService, error) {
func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheCfg config.JuelsPerFeeCoinCache) (DataSourceCacheService, error) {
inMemoryDS, ok := ds.(*inMemoryDataSource)
if !ok {
return nil, errors.Errorf("unsupported data source type: %T, only inMemoryDataSource supported", ds)
}

if cacheFreshness == 0 {
cacheFreshness = defaultCacheFreshness
updateInterval, stalenessAlertThreshold := cacheCfg.UpdateInterval.Duration(), cacheCfg.StalenessAlertThreshold.Duration()
if updateInterval == 0 {
updateInterval = defaultUpdateInterval
}
if stalenessAlertThreshold == 0 {
stalenessAlertThreshold = defaultStalenessAlertThreshold
}

dsCache := &inMemoryDataSourceCache{
kvStore: kvStore,
cacheFreshness: cacheFreshness,
inMemoryDataSource: inMemoryDS,
chStop: make(chan struct{}),
chDone: make(chan struct{}),
inMemoryDataSource: inMemoryDS,
kvStore: kvStore,
updateInterval: updateInterval,
stalenessAlertThreshold: stalenessAlertThreshold,
chStop: make(chan struct{}),
chDone: make(chan struct{}),
}
return dsCache, nil
}
Expand Down Expand Up @@ -231,16 +237,18 @@ func (ds *inMemoryDataSource) Observe(ctx context.Context, timestamp ocr2types.R
// If cache update is overdue Observe defaults to standard inMemoryDataSource behaviour.
type inMemoryDataSourceCache struct {
*inMemoryDataSource
// cacheFreshness indicates duration between cache updates.
// Even if updates fail, previous values are returned.
cacheFreshness time.Duration
mu sync.RWMutex
chStop services.StopChan
chDone chan struct{}
latestUpdateErr error
latestTrrs pipeline.TaskRunResults
latestResult pipeline.FinalResult
kvStore job.KVStore
// updateInterval indicates duration between cache updates.
// Even if update fail, previous values are returned.
updateInterval time.Duration
// stalenessAlertThreshold indicates duration before logs raise severity level because of stale cache.
stalenessAlertThreshold time.Duration
mu sync.RWMutex
chStop services.StopChan
chDone chan struct{}
latestUpdateErr error
latestTrrs pipeline.TaskRunResults
latestResult pipeline.FinalResult
kvStore job.KVStore
}

func (ds *inMemoryDataSourceCache) Start(context.Context) error {
Expand All @@ -256,7 +264,7 @@ func (ds *inMemoryDataSourceCache) Close() error {

// updater periodically updates data source cache.
func (ds *inMemoryDataSourceCache) updater() {
ticker := time.NewTicker(ds.cacheFreshness)
ticker := time.NewTicker(ds.updateInterval)
updateCache := func() {
ctx, cancel := ds.chStop.CtxCancel(context.WithTimeout(context.Background(), time.Second*10))
defer cancel()
Expand Down Expand Up @@ -356,8 +364,8 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty
return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err)
}

if time.Since(resTime.Time) >= defaultCacheFreshnessAlert {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", defaultCacheFreshnessAlert, ds.latestUpdateErr)
if time.Since(resTime.Time) >= ds.stalenessAlertThreshold {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
}
return resTime.Result.ToInt(), nil
}
Expand Down
8 changes: 5 additions & 3 deletions core/services/ocrcommon/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/job/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/median/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
pipelinemocks "github.com/smartcontractkit/chainlink/v2/core/services/pipeline/mocks"
"github.com/smartcontractkit/chainlink/v2/core/store/models"
)

var (
Expand Down Expand Up @@ -78,7 +80,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
mockKVStore := mocks.KVStore{}
mockKVStore.On("Store", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, nil)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Second * 2)})
require.NoError(t, err)
servicetest.Run(t, dsCache)

Expand Down Expand Up @@ -112,7 +114,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(result, nil)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Hour * 100)})
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)
servicetest.Run(t, dsCache)
Expand All @@ -131,7 +133,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
mockKVStore.On("Get", mock.Anything, mock.Anything).Return(nil, assert.AnError)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, config.JuelsPerFeeCoinCache{UpdateInterval: models.Interval(time.Hour * 100)})
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)
servicetest.Run(t, dsCache)
Expand Down

0 comments on commit 2d2a428

Please sign in to comment.