Skip to content

Commit

Permalink
[BCF-3178] - Improve err handling and logs for in memory data source …
Browse files Browse the repository at this point in the history
…cache (#12907)

* Fix inMemoryDataSourceCache get() warn log formatting

* Fix in mem ds cache updateCache() to save runs even if some ds failed

* Improve in memory data source cache Observe() logs and error messages

* Add changeset
  • Loading branch information
ilija42 committed Apr 22, 2024
1 parent 2d2a428 commit be50a83
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 21 deletions.
5 changes: 5 additions & 0 deletions .changeset/brown-penguins-grin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fix in memory data source cache changes/bug that only allowed pipeline results where none of the data sources failed. #bugfix
47 changes: 26 additions & 21 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ocrcommon
import (
"context"
"encoding/json"
errjoin "errors"
"fmt"
"math/big"
"sync"
Expand Down Expand Up @@ -294,31 +293,30 @@ func (ds *inMemoryDataSourceCache) updateCache(ctx context.Context) error {
ds.mu.Lock()
defer ds.mu.Unlock()

// check for any errors
_, latestTrrs, latestUpdateErr := ds.executeRun(ctx)
if latestTrrs.FinalResult(ds.lggr).HasErrors() {
latestUpdateErr = errjoin.Join(append(latestTrrs.FinalResult(ds.lggr).AllErrors, latestUpdateErr)...)
}

if latestUpdateErr != nil {
_, latestTrrs, err := ds.executeRun(ctx)
if err != nil {
previousUpdateErr := ds.latestUpdateErr
ds.latestUpdateErr = latestUpdateErr
// raise log severity
ds.latestUpdateErr = err
// warn log if previous cache update also errored
if previousUpdateErr != nil {
ds.lggr.Warnf("consecutive cache updates errored: previous err: %v new err: %v", previousUpdateErr, ds.latestUpdateErr)
}
return errors.Wrapf(ds.latestUpdateErr, "error executing run for spec ID %v", ds.spec.ID)

return errors.Wrapf(ds.latestUpdateErr, "error updating in memory data source cache for spec ID %v", ds.spec.ID)
}

ds.latestTrrs = latestTrrs
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
value, err := ds.inMemoryDataSource.parse(ds.latestResult)
value, err := ds.inMemoryDataSource.parse(latestTrrs.FinalResult(ds.lggr))
if err != nil {
return errors.Wrapf(err, "invalid result")
ds.latestUpdateErr = errors.Wrapf(err, "invalid result")
return ds.latestUpdateErr
}

// backup in case data source fails continuously and node gets rebooted
// update cache values
ds.latestTrrs = latestTrrs
ds.latestResult = ds.latestTrrs.FinalResult(ds.lggr)
ds.latestUpdateErr = nil

// backup in case data source fails continuously and node gets rebooted
timePairBytes, err := json.Marshal(&ResultTimePair{Result: *serializablebig.New(value), Time: time.Now()})
if err != nil {
return fmt.Errorf("failed to marshal result time pair, err: %w", err)
Expand All @@ -341,7 +339,7 @@ func (ds *inMemoryDataSourceCache) get(ctx context.Context) (pipeline.FinalResul
ds.mu.RUnlock()

if err := ds.updateCache(ctx); err != nil {
ds.lggr.Warnf("failed to update cache err: %v, returning stale result now, err: %v", err)
ds.lggr.Warnf("failed to update cache, returning stale result now, err: %v", err)
}

ds.mu.RLock()
Expand All @@ -357,15 +355,15 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty

timePairBytes, err := ds.kvStore.Get(ctx, dataSourceCacheKey)
if err != nil {
return nil, fmt.Errorf("failed to get result time pair bytes, err: %w", err)
return nil, fmt.Errorf("in memory data source cache is empty and failed to get backup persisted value, err: %w", err)
}

if err := json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("failed to unmarshal result time pair bytes, err: %w", err)
if err = json.Unmarshal(timePairBytes, &resTime); err != nil {
return nil, fmt.Errorf("in memory data source cache is empty and failed to unmarshal backup persisted value, err: %w", err)
}

if time.Since(resTime.Time) >= ds.stalenessAlertThreshold {
ds.lggr.Errorf("cache hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
ds.lggr.Errorf("in memory data source cache is empty and the persisted value hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
}
return resTime.Result.ToInt(), nil
}
Expand All @@ -376,6 +374,13 @@ func (ds *inMemoryDataSourceCache) Observe(ctx context.Context, timestamp ocr2ty
ConfigDigest: timestamp.ConfigDigest.Hex(),
})

// if last update was unsuccessful, check how much time passed since a successful update
if ds.latestUpdateErr != nil {
if time.Since(ds.latestTrrs.GetTaskRunResultsFinishedAt()) >= ds.stalenessAlertThreshold {
ds.lggr.Errorf("in memory cache is old and hasn't been updated for over %v, latestUpdateErr is: %v", ds.stalenessAlertThreshold, ds.latestUpdateErr)
}

}
return ds.parse(latestResult)
}

Expand Down
11 changes: 11 additions & 0 deletions core/services/pipeline/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ func (result *TaskRunResult) IsTerminal() bool {
// TaskRunResults represents a collection of results for all task runs for one pipeline run
type TaskRunResults []TaskRunResult

// GetTaskRunResultsFinishedAt returns latest finishedAt time from TaskRunResults.
func (trrs TaskRunResults) GetTaskRunResultsFinishedAt() time.Time {
var finishedTime time.Time
for _, trr := range trrs {
if trr.FinishedAt.Valid && trr.FinishedAt.Time.After(finishedTime) {
finishedTime = trr.FinishedAt.Time
}
}
return finishedTime
}

// FinalResult pulls the FinalResult for the pipeline_run from the task runs
// It needs to respect the output index of each task
func (trrs TaskRunResults) FinalResult(l logger.Logger) FinalResult {
Expand Down

0 comments on commit be50a83

Please sign in to comment.