Skip to content

Commit

Permalink
Fix InMemoryDataSourceCache cleanup (#12647)
Browse files Browse the repository at this point in the history
* Fix InMemoryDataSourceCache cleanup

* Add changeset

* Make linter happy

* Handle data source cache unit test cleanup

* Use services.StopChan in inMemoryDataSourceCache

* Move ctxWithTimeout into updateCache

Co-authored-by: Jordan Krage <jmank88@gmail.com>

---------

Co-authored-by: Jordan Krage <jmank88@gmail.com>
  • Loading branch information
ilija42 and jmank88 authored Apr 1, 2024
1 parent 633d3e0 commit bc4fbbd
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .changeset/fresh-oranges-brake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

fix jfpc cache cleanup
7 changes: 5 additions & 2 deletions core/services/ocr2/plugins/median/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,12 @@ func NewMedianServices(ctx context.Context,

if !pluginConfig.JuelsPerFeeCoinCacheDisabled {
lggr.Infof("juelsPerFeeCoin data source caching is enabled")
if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil {
return nil, err
juelsPerFeeCoinSourceCache, err2 := ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration())
if err2 != nil {
return nil, err2
}
juelsPerFeeCoinSource = juelsPerFeeCoinSourceCache
srvs = append(srvs, juelsPerFeeCoinSourceCache)
}

if cmdName := env.MedianPlugin.Cmd.Get(); cmdName != "" {
Expand Down
42 changes: 37 additions & 5 deletions core/services/ocrcommon/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median"
ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/bridges"
serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -104,7 +105,13 @@ const defaultCacheFreshness = time.Minute * 5
const defaultCacheFreshnessAlert = time.Hour * 24
const dataSourceCacheKey = "dscache"

func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (median.DataSource, error) {
type DataSourceCacheService interface {
Start(context.Context) error
Close() error
median.DataSource
}

func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cacheFreshness time.Duration) (DataSourceCacheService, error) {
inMemoryDS, ok := ds.(*inMemoryDataSource)
if !ok {
return nil, errors.Errorf("unsupported data source type: %T, only inMemoryDataSource supported", ds)
Expand All @@ -118,8 +125,9 @@ func NewInMemoryDataSourceCache(ds median.DataSource, kvStore job.KVStore, cache
kvStore: kvStore,
cacheFreshness: cacheFreshness,
inMemoryDataSource: inMemoryDS,
chStop: make(chan struct{}),
chDone: make(chan struct{}),
}
go func() { dsCache.updater() }()
return dsCache, nil
}

Expand Down Expand Up @@ -225,21 +233,45 @@ type inMemoryDataSourceCache struct {
// Even if updates fail, previous values are returned.
cacheFreshness time.Duration
mu sync.RWMutex
chStop services.StopChan
chDone chan struct{}
latestUpdateErr error
latestTrrs pipeline.TaskRunResults
latestResult pipeline.FinalResult
kvStore job.KVStore
}

func (ds *inMemoryDataSourceCache) Start(context.Context) error {
go func() { ds.updater() }()
return nil
}

func (ds *inMemoryDataSourceCache) Close() error {
close(ds.chStop)
<-ds.chDone
return nil
}

// updater periodically updates data source cache.
func (ds *inMemoryDataSourceCache) updater() {
ticker := time.NewTicker(ds.cacheFreshness)
for ; true; <-ticker.C {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
updateCache := func() {
ctx, cancel := ds.chStop.CtxCancel(context.WithTimeout(context.Background(), time.Second*10))
defer cancel()
if err := ds.updateCache(ctx); err != nil {
ds.lggr.Warnf("failed to update cache, err: %v", err)
}
cancel()
}

updateCache()
for {
select {
case <-ticker.C:
updateCache()
case <-ds.chStop:
close(ds.chDone)
return
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion core/services/ocrcommon/data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
serializablebig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -78,6 +79,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
mockKVStore.On("Get", mock.Anything, mock.IsType(&ocrcommon.ResultTimePair{})).Return(nil)
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Second*2)
require.NoError(t, err)
servicetest.Run(t, dsCache)

mockVal := int64(1)
// Test if Observe notices that cache updater failed and can refresh the cache on its own
Expand Down Expand Up @@ -112,12 +114,12 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)
servicetest.Run(t, dsCache)

time.Sleep(time.Millisecond * 100)
val, err := dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
require.NoError(t, err)
assert.Equal(t, persistedVal.String(), val.String())

})

t.Run("test total updater fail with no persisted value ", func(t *testing.T) {
Expand All @@ -131,6 +133,7 @@ func Test_CachedInMemoryDataSourceErrHandling(t *testing.T) {
dsCache, err := ocrcommon.NewInMemoryDataSourceCache(ds, &mockKVStore, time.Hour*100)
require.NoError(t, err)
changeResultValue(runner, "-1", true, false)
servicetest.Run(t, dsCache)

time.Sleep(time.Millisecond * 100)
_, err = dsCache.Observe(testutils.Context(t), types.ReportTimestamp{})
Expand Down

0 comments on commit bc4fbbd

Please sign in to comment.