Skip to content

Commit

Permalink
Add stale jfcp cache error log and lower other logs to warn
Browse files Browse the repository at this point in the history
  • Loading branch information
ilija42 committed Mar 26, 2024
1 parent a0335c6 commit 8c2288a
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/chilly-garlics-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Fix error log formatting for in memory data source cache for juels fee per coin
5 changes: 5 additions & 0 deletions .changeset/shaggy-pots-pretend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Add error log if juels fee per coin cache is over 24h old and lower other logs severity in cache to warn
22 changes: 17 additions & 5 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l
}

const defaultCacheFreshness = time.Minute * 5
const defaultCacheFreshnessAlert = time.Hour * 24
const dataSourceCacheKey = "dscache"

func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (median.DataSource, error) {
Expand Down Expand Up @@ -242,6 +243,11 @@ func (ds *inMemoryDataSourceCache) updater() {
}
}

type ResultTimePair struct {
Result serializablebig.Big `json:"result"`
Time time.Time `json:"time"`
}

func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.mu.Lock()
defer ds.mu.Unlock()
Expand All @@ -257,7 +263,7 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.latestUpdateErr = latestUpdateErr
// raise log severity
if previousUpdateErr != nil {
ds.lggr.Errorf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr)
ds.lggr.Warnf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr)
}
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)
}
Expand All @@ -270,7 +276,7 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
}

// backup in case data source fails continuously and node gets rebooted
if err = ds.kvStore.Store(dataSourceCacheKey, serializablebig.New(value)); err != nil {
if err = ds.kvStore.Store(dataSourceCacheKey, &ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()}); err != nil {
ds.lggr.Errorf("failed to persist latest task run value, err: %v", err)
}

Expand All @@ -296,11 +302,17 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
}

func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2types.ReportTimestamp) (*big.Int, error) {
var val serializablebig.Big
var resTime ResultTimePair
latestResult, latestTrrs := ds.get(ctx)
if latestTrrs == nil {
ds.lggr.Errorf("cache is empty, returning persisted value now")
return val.ToInt(), ds.kvStore.Get(dataSourceCacheKey, &val)
ds.lggr.Warnf("cache is empty, returning persisted value now")
if err := ds.kvStore.Get(dataSourceCacheKey, &resTime); err != nil {
return nil, err
}
if time.Since(resTime.Time) >= defaultCacheFreshnessAlert {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", defaultCacheFreshnessAlert, ds.latestUpdateErr)
}
return resTime.Result.ToInt(), nil
}

setEATelemetry(ds.inMemoryDataSource, latestResult, latestTrrs, ObservationTimestamp{
Expand Down
12 changes: 6 additions & 6 deletions core/services/ocrcommon/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))
mockKVStore := mocks.KVStore{}
mockKVStore.On("Store", mock.Anything, mock.Anything).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2)
require.NoError(t, err)

Expand All @@ -98,14 +98,14 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
})

t.Run("test total updater fail with persisted value recovery", func(t *testing.T) {
persistedVal := big.NewInt(1337)
runner := pipelinemocks.NewRunner(t)
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))

mockKVStore := mocks.KVStore{}
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil).Run(func(args mock.Arguments) {
arg := args.Get(1).(*serializablebig.Big)
arg.ToInt().Set(persistedVal)
persistedVal := serializablebig.NewI(1337)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Run(func(args mock.Arguments) {
arg := args.Get(1).(*ocrcommon.ResultTimePair)
arg.Result = *persistedVal
})

// set updater to a long time so that it doesn't log errors after the test is done
Expand All @@ -125,7 +125,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))

mockKVStore := mocks.KVStore{}
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil).Return(assert.AnError)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Return(assert.AnError)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
Expand Down

0 comments on commit 8c2288a

Please sign in to comment.