Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix in-memory data source cache error logging and add an error log when the cache is over 24h old #12586

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chilly-garlics-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Fix error log formatting in the in-memory data source cache for juels fee per coin
5 changes: 5 additions & 0 deletions .changeset/shaggy-pots-pretend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Add an error log if the juels fee per coin cache is over 24h old, and lower the severity of the other cache logs to warn
30 changes: 21 additions & 9 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func NewInMemoryDataSource(pr pipeline.Runner, jb job.Job, spec pipeline.Spec, l
}

// defaultCacheFreshness is a five-minute duration.
// NOTE(review): presumably the refresh interval used when the caller supplies no cacheFreshness; its use is not visible in this hunk — confirm.
const defaultCacheFreshness = time.Minute * 5
// defaultCacheFreshnessAlert is the age (24 hours) at or beyond which a persisted result read back in Observe is reported at error level.
const defaultCacheFreshnessAlert = time.Hour * 24
// dataSourceCacheKey is the KV store key under which the latest result is persisted and from which it is read back.
const dataSourceCacheKey = "dscache"

func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (median.DataSource, error) {
Expand Down Expand Up @@ -158,7 +159,7 @@ func (ds *inMemoryDataSource) currentAnswer() (*big.Int, *big.Int) {
func (ds *inMemoryDataSource) executeRun(ctx context.Context) (*pipeline.Run, pipeline.TaskRunResults, error) {
md, err := bridges.MarshalBridgeMetaData(ds.currentAnswer())
if err != nil {
ds.lggr.Warnw("unable to attach metadata for run", "err", err)
ds.lggr.Warnf("unable to attach metadata for run, err: %v", err)
}

vars := pipeline.NewVarsFrom(map[string]interface{}{
Expand Down Expand Up @@ -236,12 +237,17 @@ func (ds *inMemoryDataSourceCache) updater() {
for ; true; <-ticker.C {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
if err := ds.updateCache(ctx); err != nil {
ds.lggr.Warnf("failed to update cache", "err", err)
ds.lggr.Warnf("failed to update cache, err: %v", err)
}
cancel()
}
}

// ResultTimePair is the value persisted in the KV store under dataSourceCacheKey:
// the latest task run result together with the time at which it was stored.
type ResultTimePair struct {
// Result is the persisted value; Observe returns it when the in-memory cache is empty.
Result serializablebig.Big `json:"result"`
// Time is set to time.Now() when the pair is stored; Observe compares its age against defaultCacheFreshnessAlert.
Time time.Time `json:"time"`
}

func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.mu.Lock()
defer ds.mu.Unlock()
Expand All @@ -257,7 +263,7 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.latestUpdateErr = latestUpdateErr
// raise log severity
if previousUpdateErr != nil {
ds.lggr.Errorf("consecutive cache updates errored: previous err: %w new err: %w", previousUpdateErr, ds.latestUpdateErr)
ds.lggr.Warnf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr)
}
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)
}
Expand All @@ -270,8 +276,8 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
}

// backup in case data source fails continuously and node gets rebooted
if err = ds.kvStore.Store(dataSourceCacheKey, serializablebig.New(value)); err != nil {
ds.lggr.Errorf("failed to persist latest task run value", err)
if err = ds.kvStore.Store(dataSourceCacheKey, &ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()}); err != nil {
ds.lggr.Errorf("failed to persist latest task run value, err: %v", err)
}

return nil
Expand All @@ -287,7 +293,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
ds.mu.RUnlock()

if err := ds.updateCache(ctx); err != nil {
ds.lggr.Warnf("failed to update cache, returning stale result now", "err", err)
// Exactly one %v for the single err argument; a second verb with no matching argument would be rendered as %!v(MISSING).
ds.lggr.Warnf("failed to update cache, returning stale result now, err: %v", err)
}

ds.mu.RLock()
Expand All @@ -296,11 +302,17 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
}

func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2types.ReportTimestamp) (*big.Int, error) {
var val serializablebig.Big
var resTime ResultTimePair
latestResult, latestTrrs := ds.get(ctx)
if latestTrrs == nil {
ds.lggr.Errorf("cache is empty, returning persisted value now")
return val.ToInt(), ds.kvStore.Get(dataSourceCacheKey, &val)
ds.lggr.Warnf("cache is empty, returning persisted value now")
if err := ds.kvStore.Get(dataSourceCacheKey, &resTime); err != nil {
return nil, err
}
if time.Since(resTime.Time) >= defaultCacheFreshnessAlert {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", defaultCacheFreshnessAlert, ds.latestUpdateErr)
}
return resTime.Result.ToInt(), nil
}

setEATelemetry(ds.inMemoryDataSource, latestResult, latestTrrs, ObservationTimestamp{
Expand Down
12 changes: 6 additions & 6 deletions core/services/ocrcommon/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))
mockKVStore := mocks.KVStore{}
mockKVStore.On("Store", mock.Anything, mock.Anything).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2)
require.NoError(t, err)

Expand All @@ -98,14 +98,14 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
})

t.Run("test total updater fail with persisted value recovery", func(t *testing.T) {
persistedVal := big.NewInt(1337)
runner := pipelinemocks.NewRunner(t)
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))

mockKVStore := mocks.KVStore{}
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil).Run(func(args mock.Arguments) {
arg := args.Get(1).(*serializablebig.Big)
arg.ToInt().Set(persistedVal)
persistedVal := serializablebig.NewI(1337)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Run(func(args mock.Arguments) {
arg := args.Get(1).(*ocrcommon.ResultTimePair)
arg.Result = *persistedVal
})

// set updater to a long time so that it doesn't log errors after the test is done
Expand All @@ -125,7 +125,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
ds := ocrcommon.NewInMemoryDataSource(runner, job.Job{}, pipeline.Spec{}, logger.TestLogger(t))

mockKVStore := mocks.KVStore{}
mockKVStore.On("Get", mock.Anything, mock.AnythingOfType("*big.Big")).Return(nil).Return(assert.AnError)
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil).Return(assert.AnError)

// set updater to a long time so that it doesn't log errors after the test is done
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
Expand Down
Loading