From a865709ea18bfc792db758b60de6f03e953f141f Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Thu, 22 Aug 2024 10:57:33 +0200 Subject: [PATCH] CCIP-2971 Optimize token/gas prices database interactions (#14074) * Simplify codebase and improve performance by switching to upsert * Update core/services/ccip/orm.go Co-authored-by: Abdelrahman Soliman (Boda) <2677789+asoliman92@users.noreply.github.com> * Post review fixes --------- Co-authored-by: Abdelrahman Soliman (Boda) <2677789+asoliman92@users.noreply.github.com> --- .changeset/thick-mails-applaud.md | 5 + core/services/ccip/mocks/orm.go | 201 ++++++------------ core/services/ccip/observability.go | 115 ++++++++++ core/services/ccip/observability_test.go | 94 ++++++++ core/services/ccip/orm.go | 173 +++++++++------ core/services/ccip/orm_test.go | 168 +++++++++------ .../plugins/ccip/ccipcommit/initializers.go | 2 +- .../ccip/internal/ccipdb/price_service.go | 71 ++----- .../internal/ccipdb/price_service_test.go | 113 +--------- .../migrations/0250_ccip_token_prices_fix.sql | 49 +++++ 10 files changed, 559 insertions(+), 432 deletions(-) create mode 100644 .changeset/thick-mails-applaud.md create mode 100644 core/services/ccip/observability.go create mode 100644 core/services/ccip/observability_test.go create mode 100644 core/store/migrate/migrations/0250_ccip_token_prices_fix.sql diff --git a/.changeset/thick-mails-applaud.md b/.changeset/thick-mails-applaud.md new file mode 100644 index 00000000000..ceec9e64fd4 --- /dev/null +++ b/.changeset/thick-mails-applaud.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Simplify how token and gas prices are stored in the database - user upsert instead of insert/delete flow #db_update diff --git a/core/services/ccip/mocks/orm.go b/core/services/ccip/mocks/orm.go index 8a987c21602..0c9086def7e 100644 --- a/core/services/ccip/mocks/orm.go +++ b/core/services/ccip/mocks/orm.go @@ -8,6 +8,8 @@ import ( ccip "github.com/smartcontractkit/chainlink/v2/core/services/ccip" mock "github.com/stretchr/testify/mock" + + time "time" ) // ORM is an autogenerated mock type for the ORM type @@ -23,102 +25,6 @@ func (_m *ORM) EXPECT() *ORM_Expecter { return &ORM_Expecter{mock: &_m.Mock} } -// ClearGasPricesByDestChain provides a mock function with given fields: ctx, destChainSelector, expireSec -func (_m *ORM) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - ret := _m.Called(ctx, destChainSelector, expireSec) - - if len(ret) == 0 { - panic("no return value specified for ClearGasPricesByDestChain") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int) error); ok { - r0 = rf(ctx, destChainSelector, expireSec) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ORM_ClearGasPricesByDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClearGasPricesByDestChain' -type ORM_ClearGasPricesByDestChain_Call struct { - *mock.Call -} - -// ClearGasPricesByDestChain is a helper method to define mock.On call -// - ctx context.Context -// - destChainSelector uint64 -// - expireSec int -func (_e *ORM_Expecter) ClearGasPricesByDestChain(ctx interface{}, destChainSelector interface{}, expireSec interface{}) *ORM_ClearGasPricesByDestChain_Call { - return &ORM_ClearGasPricesByDestChain_Call{Call: _e.mock.On("ClearGasPricesByDestChain", ctx, destChainSelector, expireSec)} -} - -func (_c *ORM_ClearGasPricesByDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, expireSec int)) *ORM_ClearGasPricesByDestChain_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int)) - }) - return _c -} - -func (_c *ORM_ClearGasPricesByDestChain_Call) Return(_a0 error) *ORM_ClearGasPricesByDestChain_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *ORM_ClearGasPricesByDestChain_Call) RunAndReturn(run func(context.Context, uint64, int) error) *ORM_ClearGasPricesByDestChain_Call { - _c.Call.Return(run) - return _c -} - -// ClearTokenPricesByDestChain provides a mock function with given fields: ctx, destChainSelector, expireSec -func (_m *ORM) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - ret := _m.Called(ctx, destChainSelector, expireSec) - - if len(ret) == 0 { - panic("no return value specified for ClearTokenPricesByDestChain") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int) error); ok { - r0 = rf(ctx, destChainSelector, expireSec) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ORM_ClearTokenPricesByDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ClearTokenPricesByDestChain' -type ORM_ClearTokenPricesByDestChain_Call struct { - *mock.Call -} - -// ClearTokenPricesByDestChain is a helper method to define mock.On call -// - ctx context.Context -// - destChainSelector uint64 -// - expireSec int -func (_e *ORM_Expecter) ClearTokenPricesByDestChain(ctx interface{}, destChainSelector interface{}, expireSec interface{}) *ORM_ClearTokenPricesByDestChain_Call { - return &ORM_ClearTokenPricesByDestChain_Call{Call: _e.mock.On("ClearTokenPricesByDestChain", ctx, destChainSelector, expireSec)} -} - -func (_c *ORM_ClearTokenPricesByDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, expireSec int)) *ORM_ClearTokenPricesByDestChain_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int)) - }) - return _c -} - -func (_c *ORM_ClearTokenPricesByDestChain_Call) Return(_a0 error) *ORM_ClearTokenPricesByDestChain_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *ORM_ClearTokenPricesByDestChain_Call) RunAndReturn(run func(context.Context, uint64, int) error) *ORM_ClearTokenPricesByDestChain_Call { - _c.Call.Return(run) - return _c -} - // GetGasPricesByDestChain provides a mock function with given fields: ctx, destChainSelector func (_m *ORM) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]ccip.GasPrice, error) { ret := _m.Called(ctx, destChainSelector) @@ -237,100 +143,119 @@ func (_c *ORM_GetTokenPricesByDestChain_Call) RunAndReturn(run func(context.Cont return _c } -// InsertGasPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, jobId, gasPrices -func (_m *ORM) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []ccip.GasPriceUpdate) error { - ret := _m.Called(ctx, destChainSelector, jobId, gasPrices) +// UpsertGasPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, gasPrices +func (_m *ORM) UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPrice) (int64, error) { + ret := _m.Called(ctx, destChainSelector, gasPrices) if len(ret) == 0 { - panic("no return value specified for InsertGasPricesForDestChain") + panic("no return value specified for UpsertGasPricesForDestChain") } - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int32, []ccip.GasPriceUpdate) error); ok { - r0 = rf(ctx, destChainSelector, jobId, gasPrices) + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.GasPrice) (int64, error)); ok { + return rf(ctx, destChainSelector, gasPrices) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.GasPrice) int64); ok { + r0 = rf(ctx, destChainSelector, gasPrices) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(int64) } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, uint64, []ccip.GasPrice) error); ok { + r1 = rf(ctx, destChainSelector, gasPrices) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } -// ORM_InsertGasPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertGasPricesForDestChain' -type ORM_InsertGasPricesForDestChain_Call struct { +// ORM_UpsertGasPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertGasPricesForDestChain' +type ORM_UpsertGasPricesForDestChain_Call struct { *mock.Call } -// InsertGasPricesForDestChain is a helper method to define mock.On call +// UpsertGasPricesForDestChain is a helper method to define mock.On call // - ctx context.Context // - destChainSelector uint64 -// - jobId int32 -// - gasPrices []ccip.GasPriceUpdate -func (_e *ORM_Expecter) InsertGasPricesForDestChain(ctx interface{}, destChainSelector interface{}, jobId interface{}, gasPrices interface{}) *ORM_InsertGasPricesForDestChain_Call { - return &ORM_InsertGasPricesForDestChain_Call{Call: _e.mock.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, gasPrices)} +// - gasPrices []ccip.GasPrice +func (_e *ORM_Expecter) UpsertGasPricesForDestChain(ctx interface{}, destChainSelector interface{}, gasPrices interface{}) *ORM_UpsertGasPricesForDestChain_Call { + return &ORM_UpsertGasPricesForDestChain_Call{Call: _e.mock.On("UpsertGasPricesForDestChain", ctx, destChainSelector, gasPrices)} } -func (_c *ORM_InsertGasPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []ccip.GasPriceUpdate)) *ORM_InsertGasPricesForDestChain_Call { +func (_c *ORM_UpsertGasPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, gasPrices []ccip.GasPrice)) *ORM_UpsertGasPricesForDestChain_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int32), args[3].([]ccip.GasPriceUpdate)) + run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.GasPrice)) }) return _c } -func (_c *ORM_InsertGasPricesForDestChain_Call) Return(_a0 error) *ORM_InsertGasPricesForDestChain_Call { - _c.Call.Return(_a0) +func (_c *ORM_UpsertGasPricesForDestChain_Call) Return(_a0 int64, _a1 error) *ORM_UpsertGasPricesForDestChain_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *ORM_InsertGasPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, int32, []ccip.GasPriceUpdate) error) *ORM_InsertGasPricesForDestChain_Call { +func (_c *ORM_UpsertGasPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.GasPrice) (int64, error)) *ORM_UpsertGasPricesForDestChain_Call { _c.Call.Return(run) return _c } -// InsertTokenPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, jobId, tokenPrices -func (_m *ORM) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []ccip.TokenPriceUpdate) error { - ret := _m.Called(ctx, destChainSelector, jobId, tokenPrices) +// UpsertTokenPricesForDestChain provides a mock function with given fields: ctx, destChainSelector, tokenPrices, interval +func (_m *ORM) UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPrice, interval time.Duration) (int64, error) { + ret := _m.Called(ctx, destChainSelector, tokenPrices, interval) if len(ret) == 0 { - panic("no return value specified for InsertTokenPricesForDestChain") + panic("no return value specified for UpsertTokenPricesForDestChain") } - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, uint64, int32, []ccip.TokenPriceUpdate) error); ok { - r0 = rf(ctx, destChainSelector, jobId, tokenPrices) + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.TokenPrice, time.Duration) (int64, error)); ok { + return rf(ctx, destChainSelector, tokenPrices, interval) + } + if rf, ok := ret.Get(0).(func(context.Context, uint64, []ccip.TokenPrice, time.Duration) int64); ok { + r0 = rf(ctx, destChainSelector, tokenPrices, interval) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(int64) } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, uint64, []ccip.TokenPrice, time.Duration) error); ok { + r1 = rf(ctx, destChainSelector, tokenPrices, interval) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } -// ORM_InsertTokenPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertTokenPricesForDestChain' -type ORM_InsertTokenPricesForDestChain_Call struct { +// ORM_UpsertTokenPricesForDestChain_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertTokenPricesForDestChain' +type ORM_UpsertTokenPricesForDestChain_Call struct { *mock.Call } -// InsertTokenPricesForDestChain is a helper method to define mock.On call +// UpsertTokenPricesForDestChain is a helper method to define mock.On call // - ctx context.Context // - destChainSelector uint64 -// - jobId int32 -// - tokenPrices []ccip.TokenPriceUpdate -func (_e *ORM_Expecter) InsertTokenPricesForDestChain(ctx interface{}, destChainSelector interface{}, jobId interface{}, tokenPrices interface{}) *ORM_InsertTokenPricesForDestChain_Call { - return &ORM_InsertTokenPricesForDestChain_Call{Call: _e.mock.On("InsertTokenPricesForDestChain", ctx, destChainSelector, jobId, tokenPrices)} +// - tokenPrices []ccip.TokenPrice +// - interval time.Duration +func (_e *ORM_Expecter) UpsertTokenPricesForDestChain(ctx interface{}, destChainSelector interface{}, tokenPrices interface{}, interval interface{}) *ORM_UpsertTokenPricesForDestChain_Call { + return &ORM_UpsertTokenPricesForDestChain_Call{Call: _e.mock.On("UpsertTokenPricesForDestChain", ctx, destChainSelector, tokenPrices, interval)} } -func (_c *ORM_InsertTokenPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []ccip.TokenPriceUpdate)) *ORM_InsertTokenPricesForDestChain_Call { +func (_c *ORM_UpsertTokenPricesForDestChain_Call) Run(run func(ctx context.Context, destChainSelector uint64, tokenPrices []ccip.TokenPrice, interval time.Duration)) *ORM_UpsertTokenPricesForDestChain_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(uint64), args[2].(int32), args[3].([]ccip.TokenPriceUpdate)) + run(args[0].(context.Context), args[1].(uint64), args[2].([]ccip.TokenPrice), args[3].(time.Duration)) }) return _c } -func (_c *ORM_InsertTokenPricesForDestChain_Call) Return(_a0 error) *ORM_InsertTokenPricesForDestChain_Call { - _c.Call.Return(_a0) +func (_c *ORM_UpsertTokenPricesForDestChain_Call) Return(_a0 int64, _a1 error) *ORM_UpsertTokenPricesForDestChain_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *ORM_InsertTokenPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, int32, []ccip.TokenPriceUpdate) error) *ORM_InsertTokenPricesForDestChain_Call { +func (_c *ORM_UpsertTokenPricesForDestChain_Call) RunAndReturn(run func(context.Context, uint64, []ccip.TokenPrice, time.Duration) (int64, error)) *ORM_UpsertTokenPricesForDestChain_Call { _c.Call.Return(run) return _c } diff --git a/core/services/ccip/observability.go b/core/services/ccip/observability.go new file mode 100644 index 00000000000..8a061893ce6 --- /dev/null +++ b/core/services/ccip/observability.go @@ -0,0 +1,115 @@ +package ccip + +import ( + "context" + "strconv" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +var ( + sqlLatencyBuckets = []float64{ + float64(10 * time.Millisecond), + float64(20 * time.Millisecond), + float64(30 * time.Millisecond), + float64(40 * time.Millisecond), + float64(50 * time.Millisecond), + float64(70 * time.Millisecond), + float64(90 * time.Millisecond), + float64(100 * time.Millisecond), + float64(200 * time.Millisecond), + float64(300 * time.Millisecond), + float64(400 * time.Millisecond), + float64(500 * time.Millisecond), + float64(750 * time.Millisecond), + float64(1 * time.Second), + } + ccipQueryDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "ccip_orm_query_duration", + Buckets: sqlLatencyBuckets, + }, []string{"query", "destChainSelector"}) + ccipQueryDatasets = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ccip_orm_dataset_size", + }, []string{"query", "destChainSelector"}) +) + +type observedORM struct { + ORM + queryDuration *prometheus.HistogramVec + datasetSize *prometheus.GaugeVec +} + +var _ ORM = (*observedORM)(nil) + +func NewObservedORM(ds sqlutil.DataSource, lggr logger.Logger) (*observedORM, error) { + delegate, err := NewORM(ds, lggr) + if err != nil { + return nil, err + } + + return &observedORM{ + ORM: delegate, + queryDuration: ccipQueryDuration, + datasetSize: ccipQueryDatasets, + }, nil +} + +func (o *observedORM) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) { + return withObservedQueryAndResults(o, "GetGasPricesByDestChain", destChainSelector, func() ([]GasPrice, error) { + return o.ORM.GetGasPricesByDestChain(ctx, destChainSelector) + }) +} + +func (o *observedORM) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) { + return withObservedQueryAndResults(o, "GetTokenPricesByDestChain", destChainSelector, func() ([]TokenPrice, error) { + return o.ORM.GetTokenPricesByDestChain(ctx, destChainSelector) + }) +} + +func (o *observedORM) UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPrice) (int64, error) { + return withObservedQueryAndRowsAffected(o, "UpsertGasPricesForDestChain", destChainSelector, func() (int64, error) { + return o.ORM.UpsertGasPricesForDestChain(ctx, destChainSelector, gasPrices) + }) +} + +func (o *observedORM) UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPrice, interval time.Duration) (int64, error) { + return withObservedQueryAndRowsAffected(o, "UpsertTokenPricesForDestChain", destChainSelector, func() (int64, error) { + return o.ORM.UpsertTokenPricesForDestChain(ctx, destChainSelector, tokenPrices, interval) + }) +} + +func withObservedQueryAndRowsAffected(o *observedORM, queryName string, chainSelector uint64, query func() (int64, error)) (int64, error) { + rowsAffected, err := withObservedQuery(o, queryName, chainSelector, query) + if err == nil { + o.datasetSize. + WithLabelValues(queryName, strconv.FormatUint(chainSelector, 10)). + Set(float64(rowsAffected)) + } + return rowsAffected, err +} + +func withObservedQueryAndResults[T any](o *observedORM, queryName string, chainSelector uint64, query func() ([]T, error)) ([]T, error) { + results, err := withObservedQuery(o, queryName, chainSelector, query) + if err == nil { + o.datasetSize. + WithLabelValues(queryName, strconv.FormatUint(chainSelector, 10)). + Set(float64(len(results))) + } + return results, err +} + +func withObservedQuery[T any](o *observedORM, queryName string, chainSelector uint64, query func() (T, error)) (T, error) { + queryStarted := time.Now() + defer func() { + o.queryDuration. + WithLabelValues(queryName, strconv.FormatUint(chainSelector, 10)). + Observe(float64(time.Since(queryStarted))) + }() + return query() +} diff --git a/core/services/ccip/observability_test.go b/core/services/ccip/observability_test.go new file mode 100644 index 00000000000..24bfb4a9ec1 --- /dev/null +++ b/core/services/ccip/observability_test.go @@ -0,0 +1,94 @@ +package ccip + +import ( + "math/big" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +func Test_MetricsAreTrackedForAllMethods(t *testing.T) { + ctx := testutils.Context(t) + db := pgtest.NewSqlxDB(t) + ccipORM, err := NewObservedORM(db, logger.TestLogger(t)) + require.NoError(t, err) + + tokenPrices := []TokenPrice{ + { + TokenAddr: "0xA", + TokenPrice: assets.NewWei(big.NewInt(1e18)), + }, + { + TokenAddr: "0xB", + TokenPrice: assets.NewWei(big.NewInt(1e18)), + }, + } + tokensUpserted, err := ccipORM.UpsertTokenPricesForDestChain(ctx, 100, tokenPrices, time.Second) + require.NoError(t, err) + assert.Equal(t, len(tokenPrices), int(tokensUpserted)) + assert.Equal(t, len(tokenPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertTokenPricesForDestChain", "100")) + assert.Equal(t, 0, counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertTokenPricesForDestChain", "200")) + + tokens, err := ccipORM.GetTokenPricesByDestChain(ctx, 100) + require.NoError(t, err) + assert.Equal(t, len(tokenPrices), len(tokens)) + assert.Equal(t, len(tokenPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "GetTokenPricesByDestChain", "100")) + assert.Equal(t, 1, counterFromHistogramByLabels(t, ccipORM.queryDuration, "GetTokenPricesByDestChain", "100")) + + gasPrices := []GasPrice{ + { + SourceChainSelector: 200, + GasPrice: assets.NewWei(big.NewInt(1e18)), + }, + { + SourceChainSelector: 201, + GasPrice: assets.NewWei(big.NewInt(1e18)), + }, + { + SourceChainSelector: 202, + GasPrice: assets.NewWei(big.NewInt(1e18)), + }, + } + gasUpserted, err := ccipORM.UpsertGasPricesForDestChain(ctx, 100, gasPrices) + require.NoError(t, err) + assert.Equal(t, len(gasPrices), int(gasUpserted)) + assert.Equal(t, len(gasPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertGasPricesForDestChain", "100")) + assert.Equal(t, 0, counterFromGaugeByLabels(ccipORM.datasetSize, "UpsertGasPricesForDestChain", "200")) + + gas, err := ccipORM.GetGasPricesByDestChain(ctx, 100) + require.NoError(t, err) + assert.Equal(t, len(gasPrices), len(gas)) + assert.Equal(t, len(gasPrices), counterFromGaugeByLabels(ccipORM.datasetSize, "GetGasPricesByDestChain", "100")) + assert.Equal(t, 1, counterFromHistogramByLabels(t, ccipORM.queryDuration, "GetGasPricesByDestChain", "100")) +} + +func counterFromHistogramByLabels(t *testing.T, histogramVec *prometheus.HistogramVec, labels ...string) int { + observer, err := histogramVec.GetMetricWithLabelValues(labels...) + require.NoError(t, err) + + metricCh := make(chan prometheus.Metric, 1) + observer.(prometheus.Histogram).Collect(metricCh) + close(metricCh) + + metric := <-metricCh + pb := &io_prometheus_client.Metric{} + err = metric.Write(pb) + require.NoError(t, err) + + return int(pb.GetHistogram().GetSampleCount()) +} + +func counterFromGaugeByLabels(gaugeVec *prometheus.GaugeVec, labels ...string) int { + value := testutil.ToFloat64(gaugeVec.WithLabelValues(labels...)) + return int(value) +} diff --git a/core/services/ccip/orm.go b/core/services/ccip/orm.go index d074ea7473e..1942c68fef9 100644 --- a/core/services/ccip/orm.go +++ b/core/services/ccip/orm.go @@ -8,65 +8,51 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" + "github.com/smartcontractkit/chainlink/v2/core/logger" ) type GasPrice struct { SourceChainSelector uint64 GasPrice *assets.Wei - CreatedAt time.Time -} - -type GasPriceUpdate struct { - SourceChainSelector uint64 - GasPrice *assets.Wei } type TokenPrice struct { TokenAddr string TokenPrice *assets.Wei - CreatedAt time.Time -} - -type TokenPriceUpdate struct { - TokenAddr string - TokenPrice *assets.Wei } type ORM interface { GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) - InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error - InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error - - ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error - ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error + UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPrice) (int64, error) + UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPrice, interval time.Duration) (int64, error) } type orm struct { - ds sqlutil.DataSource + ds sqlutil.DataSource + lggr logger.Logger } var _ ORM = (*orm)(nil) -func NewORM(ds sqlutil.DataSource) (ORM, error) { +func NewORM(ds sqlutil.DataSource, lggr logger.Logger) (ORM, error) { if ds == nil { return nil, fmt.Errorf("datasource to CCIP NewORM cannot be nil") } return &orm{ - ds: ds, + ds: ds, + lggr: lggr, }, nil } func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]GasPrice, error) { var gasPrices []GasPrice stmt := ` - SELECT DISTINCT ON (source_chain_selector) - source_chain_selector, gas_price, created_at + SELECT source_chain_selector, gas_price FROM ccip.observed_gas_prices - WHERE chain_selector = $1 - ORDER BY source_chain_selector, created_at DESC; + WHERE chain_selector = $1; ` err := o.ds.SelectContext(ctx, &gasPrices, stmt, destChainSelector) if err != nil { @@ -79,82 +65,147 @@ func (o *orm) GetGasPricesByDestChain(ctx context.Context, destChainSelector uin func (o *orm) GetTokenPricesByDestChain(ctx context.Context, destChainSelector uint64) ([]TokenPrice, error) { var tokenPrices []TokenPrice stmt := ` - SELECT DISTINCT ON (token_addr) - token_addr, token_price, created_at + SELECT token_addr, token_price FROM ccip.observed_token_prices - WHERE chain_selector = $1 - ORDER BY token_addr, created_at DESC; + WHERE chain_selector = $1; ` err := o.ds.SelectContext(ctx, &tokenPrices, stmt, destChainSelector) if err != nil { return nil, err } - return tokenPrices, nil } -func (o *orm) InsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, gasPrices []GasPriceUpdate) error { +func (o *orm) UpsertGasPricesForDestChain(ctx context.Context, destChainSelector uint64, gasPrices []GasPrice) (int64, error) { if len(gasPrices) == 0 { - return nil + return 0, nil + } + + uniqueGasUpdates := make(map[string]GasPrice) + for _, gasPrice := range gasPrices { + key := fmt.Sprintf("%d-%d", gasPrice.SourceChainSelector, destChainSelector) + uniqueGasUpdates[key] = gasPrice } - insertData := make([]map[string]interface{}, 0, len(gasPrices)) - for _, price := range gasPrices { + insertData := make([]map[string]interface{}, 0, len(uniqueGasUpdates)) + for _, price := range uniqueGasUpdates { insertData = append(insertData, map[string]interface{}{ "chain_selector": destChainSelector, - "job_id": jobId, "source_chain_selector": price.SourceChainSelector, "gas_price": price.GasPrice, }) } - // using statement_timestamp() to make testing easier - stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, job_id, source_chain_selector, gas_price, created_at) - VALUES (:chain_selector, :job_id, :source_chain_selector, :gas_price, statement_timestamp());` - _, err := o.ds.NamedExecContext(ctx, stmt, insertData) + stmt := `INSERT INTO ccip.observed_gas_prices (chain_selector, source_chain_selector, gas_price, updated_at) + VALUES (:chain_selector, :source_chain_selector, :gas_price, statement_timestamp()) + ON CONFLICT (source_chain_selector, chain_selector) + DO UPDATE SET gas_price = EXCLUDED.gas_price, updated_at = EXCLUDED.updated_at;` + + result, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { - err = fmt.Errorf("error inserting gas prices for job %d: %w", jobId, err) + return 0, fmt.Errorf("error inserting gas prices %w", err) } - - return err + return result.RowsAffected() } -func (o *orm) InsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, jobId int32, tokenPrices []TokenPriceUpdate) error { +// UpsertTokenPricesForDestChain inserts or updates only relevant token prices. +// In order to reduce locking an unnecessary writes to the table, we start with fetching current prices. +// If price for a token doesn't change or was updated recently we don't include that token to the upsert query. +// We don't run in TX intentionally, because we don't want to lock the table and conflicts are resolved on the insert level +func (o *orm) UpsertTokenPricesForDestChain(ctx context.Context, destChainSelector uint64, tokenPrices []TokenPrice, interval time.Duration) (int64, error) { if len(tokenPrices) == 0 { - return nil + return 0, nil } - insertData := make([]map[string]interface{}, 0, len(tokenPrices)) - for _, price := range tokenPrices { + tokensToUpdate, err := o.pickOnlyRelevantTokensForUpdate(ctx, destChainSelector, tokenPrices, interval) + if err != nil || len(tokensToUpdate) == 0 { + return 0, err + } + + insertData := make([]map[string]interface{}, 0, len(tokensToUpdate)) + for _, price := range tokensToUpdate { insertData = append(insertData, map[string]interface{}{ "chain_selector": destChainSelector, - "job_id": jobId, "token_addr": price.TokenAddr, "token_price": price.TokenPrice, }) } - // using statement_timestamp() to make testing easier - stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, job_id, token_addr, token_price, created_at) - VALUES (:chain_selector, :job_id, :token_addr, :token_price, statement_timestamp());` - _, err := o.ds.NamedExecContext(ctx, stmt, insertData) + stmt := `INSERT INTO ccip.observed_token_prices (chain_selector, token_addr, token_price, updated_at) + VALUES (:chain_selector, :token_addr, :token_price, statement_timestamp()) + ON CONFLICT (token_addr, chain_selector) + DO UPDATE SET token_price = EXCLUDED.token_price, updated_at = EXCLUDED.updated_at;` + result, err := o.ds.NamedExecContext(ctx, stmt, insertData) if err != nil { - err = fmt.Errorf("error inserting token prices for job %d: %w", jobId, err) + return 0, fmt.Errorf("error inserting token prices %w", err) } - - return err + return result.RowsAffected() } -func (o *orm) ClearGasPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - stmt := `DELETE FROM ccip.observed_gas_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')` +// pickOnlyRelevantTokensForUpdate returns only tokens that need to be updated. Multiple jobs can be updating the same tokens, +// in order to reduce table locking and redundant upserts we start with reading the table and checking which tokens are eligible for update. +// A token is eligible for update when time since last update is greater than the interval. +func (o *orm) pickOnlyRelevantTokensForUpdate( + ctx context.Context, + destChainSelector uint64, + tokenPrices []TokenPrice, + interval time.Duration, +) ([]TokenPrice, error) { + tokenPricesByAddress := toTokensByAddress(tokenPrices) + + // Picks only tokens which were recently updated and can be ignored, + // we will filter out these tokens from the upsert query. + stmt := ` + SELECT + token_addr + FROM ccip.observed_token_prices + WHERE + chain_selector = $1 + and token_addr = any($2) + and updated_at >= statement_timestamp() - $3::interval + ` + + pgInterval := fmt.Sprintf("%d milliseconds", interval.Milliseconds()) + args := []interface{}{destChainSelector, tokenAddrsToBytes(tokenPricesByAddress), pgInterval} + var dbTokensToIgnore []string + if err := o.ds.SelectContext(ctx, &dbTokensToIgnore, stmt, args...); err != nil { + return nil, err + } + + tokensToIgnore := make(map[string]struct{}, len(dbTokensToIgnore)) + for _, tk := range dbTokensToIgnore { + tokensToIgnore[tk] = struct{}{} + } - _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec) - return err + tokenPricesToUpdate := make([]TokenPrice, 0, len(tokenPrices)) + for tokenAddr, tokenPrice := range tokenPricesByAddress { + eligibleForUpdate := false + if _, ok := tokensToIgnore[tokenAddr]; !ok { + eligibleForUpdate = true + tokenPricesToUpdate = append(tokenPricesToUpdate, TokenPrice{TokenAddr: tokenAddr, TokenPrice: tokenPrice}) + } + o.lggr.Debugw( + "Token price eligibility for database update", + "eligibleForUpdate", eligibleForUpdate, + "token", tokenAddr, + "price", tokenPrice, + ) + } + return tokenPricesToUpdate, nil } -func (o *orm) ClearTokenPricesByDestChain(ctx context.Context, destChainSelector uint64, expireSec int) error { - stmt := `DELETE FROM ccip.observed_token_prices WHERE chain_selector = $1 AND created_at < (statement_timestamp() - $2 * interval '1 second')` +func toTokensByAddress(tokens []TokenPrice) map[string]*assets.Wei { + tokensByAddr := make(map[string]*assets.Wei, len(tokens)) + for _, tk := range tokens { + tokensByAddr[tk.TokenAddr] = tk.TokenPrice + } + return tokensByAddr +} - _, err := o.ds.ExecContext(ctx, stmt, destChainSelector, expireSec) - return err +func tokenAddrsToBytes(tokens map[string]*assets.Wei) [][]byte { + addrs := make([][]byte, 0, len(tokens)) + for tkAddr := range tokens { + addrs = append(addrs, []byte(tkAddr)) + } + return addrs } diff --git a/core/services/ccip/orm_test.go b/core/services/ccip/orm_test.go index 7b7b8d82710..e778ddf6ce3 100644 --- a/core/services/ccip/orm_test.go +++ b/core/services/ccip/orm_test.go @@ -15,13 +15,18 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +var ( + r = rand.New(rand.NewSource(time.Now().UnixNano())) ) func setupORM(t *testing.T) (ORM, sqlutil.DataSource) { t.Helper() db := pgtest.NewSqlxDB(t) - orm, err := NewORM(db) + orm, err := NewORM(db, logger.TestLogger(t)) require.NoError(t, err) @@ -37,12 +42,12 @@ func generateChainSelectors(n int) []uint64 { return selectors } -func generateGasPriceUpdates(chainSelector uint64, n int) []GasPriceUpdate { - updates := make([]GasPriceUpdate, n) +func generateGasPrices(chainSelector uint64, n int) []GasPrice { + updates := make([]GasPrice, n) for i := 0; i < n; i++ { // gas prices can take up whole range of uint256 uint256Max := new(big.Int).Sub(new(big.Int).Exp(big.NewInt(2), big.NewInt(256), nil), big.NewInt(1)) - row := GasPriceUpdate{ + row := GasPrice{ SourceChainSelector: chainSelector, GasPrice: assets.NewWei(new(big.Int).Sub(uint256Max, big.NewInt(int64(i)))), } @@ -61,10 +66,21 @@ func generateTokenAddresses(n int) []string { return addrs } -func generateTokenPriceUpdates(tokenAddr string, n int) []TokenPriceUpdate { - updates := make([]TokenPriceUpdate, n) +func generateRandomTokenPrices(tokenAddrs []string) []TokenPrice { + updates := make([]TokenPrice, 0, len(tokenAddrs)) + for _, addr := range tokenAddrs { + updates = append(updates, TokenPrice{ + TokenAddr: addr, + TokenPrice: assets.NewWei(new(big.Int).Rand(r, big.NewInt(1e18))), + }) + } + return updates +} + +func generateTokenPrices(tokenAddr string, n int) []TokenPrice { + updates := make([]TokenPrice, n) for i := 0; i < n; i++ { - row := TokenPriceUpdate{ + row := TokenPrice{ TokenAddr: tokenAddr, TokenPrice: assets.NewWei(new(big.Int).Mul(big.NewInt(1e18), big.NewInt(int64(i)))), } @@ -134,20 +150,20 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { sourceSelectors := generateChainSelectors(numSourceChainSelectors) - updates := make(map[uint64][]GasPriceUpdate) + updates := make(map[uint64][]GasPrice) for _, selector := range sourceSelectors { - updates[selector] = generateGasPriceUpdates(selector, numUpdatesPerSourceSelector) + updates[selector] = generateGasPrices(selector, numUpdatesPerSourceSelector) } // 5 jobs, each inserting prices for 10 chains, with 20 updates per chain. - expectedPrices := make(map[uint64]GasPriceUpdate) + expectedPrices := make(map[uint64]GasPrice) for i := 0; i < numJobs; i++ { for selector, updatesPerSelector := range updates { lastIndex := len(updatesPerSelector) - 1 - err := orm.InsertGasPricesForDestChain(ctx, destSelector, int32(i), updatesPerSelector[:lastIndex]) + _, err := orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[:lastIndex]) assert.NoError(t, err) - err = orm.InsertGasPricesForDestChain(ctx, destSelector, int32(i), updatesPerSelector[lastIndex:]) + _, err = orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector[lastIndex:]) assert.NoError(t, err) expectedPrices[selector] = updatesPerSelector[lastIndex] @@ -156,7 +172,7 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { // verify number of rows inserted numRows := getGasTableRowCount(t, db) - assert.Equal(t, numJobs*numSourceChainSelectors*numUpdatesPerSourceSelector, numRows) + assert.Equal(t, numSourceChainSelectors, numRows) prices, err := orm.GetGasPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -170,15 +186,15 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { } // after the initial inserts, insert new round of prices, 1 price per selector this time - var combinedUpdates []GasPriceUpdate + var combinedUpdates []GasPrice for selector, updatesPerSelector := range updates { combinedUpdates = append(combinedUpdates, updatesPerSelector[0]) expectedPrices[selector] = updatesPerSelector[0] } - err = orm.InsertGasPricesForDestChain(ctx, destSelector, 1, combinedUpdates) + _, err = orm.UpsertGasPricesForDestChain(ctx, destSelector, combinedUpdates) assert.NoError(t, err) - assert.Equal(t, numJobs*numSourceChainSelectors*numUpdatesPerSourceSelector+numSourceChainSelectors, getGasTableRowCount(t, db)) + assert.Equal(t, numSourceChainSelectors, getGasTableRowCount(t, db)) prices, err = orm.GetGasPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -190,7 +206,7 @@ func TestORM_InsertAndGetGasPrices(t *testing.T) { } } -func TestORM_InsertAndDeleteGasPrices(t *testing.T) { +func TestORM_UpsertGasPrices(t *testing.T) { t.Parallel() ctx := testutils.Context(t) @@ -202,13 +218,13 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { sourceSelectors := generateChainSelectors(numSourceChainSelectors) - updates := make(map[uint64][]GasPriceUpdate) + updates := make(map[uint64][]GasPrice) for _, selector := range sourceSelectors { - updates[selector] = generateGasPriceUpdates(selector, numUpdatesPerSourceSelector) + updates[selector] = generateGasPrices(selector, numUpdatesPerSourceSelector) } for _, updatesPerSelector := range updates { - err := orm.InsertGasPricesForDestChain(ctx, destSelector, 1, updatesPerSelector) + _, err := orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) assert.NoError(t, err) } @@ -217,21 +233,11 @@ func TestORM_InsertAndDeleteGasPrices(t *testing.T) { // insert for the 2nd time after interimTimeStamp for _, updatesPerSelector := range updates { - err := orm.InsertGasPricesForDestChain(ctx, destSelector, 1, updatesPerSelector) + _, err := orm.UpsertGasPricesForDestChain(ctx, destSelector, updatesPerSelector) assert.NoError(t, err) } - assert.Equal(t, 2*numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db)) - - // clear by sleepSec should delete rows inserted before it - err := orm.ClearGasPricesByDestChain(ctx, destSelector, sleepSec) - assert.NoError(t, err) - assert.Equal(t, numSourceChainSelectors*numUpdatesPerSourceSelector, getGasTableRowCount(t, db)) - - // clear by 0 expiration seconds should delete all rows - err = orm.ClearGasPricesByDestChain(ctx, destSelector, 0) - assert.NoError(t, err) - assert.Equal(t, 0, getGasTableRowCount(t, db)) + assert.Equal(t, numSourceChainSelectors, getGasTableRowCount(t, db)) } func TestORM_InsertAndGetTokenPrices(t *testing.T) { @@ -247,20 +253,20 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { addrs := generateTokenAddresses(numAddresses) - updates := make(map[string][]TokenPriceUpdate) + updates := make(map[string][]TokenPrice) for _, addr := range addrs { - updates[addr] = generateTokenPriceUpdates(addr, numUpdatesPerAddress) + updates[addr] = generateTokenPrices(addr, numUpdatesPerAddress) } // 5 jobs, each inserting prices for 10 chains, with 20 updates per chain. - expectedPrices := make(map[string]TokenPriceUpdate) + expectedPrices := make(map[string]TokenPrice) for i := 0; i < numJobs; i++ { for addr, updatesPerAddr := range updates { lastIndex := len(updatesPerAddr) - 1 - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, int32(i), updatesPerAddr[:lastIndex]) + _, err := orm.UpsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[:lastIndex], 0) assert.NoError(t, err) - err = orm.InsertTokenPricesForDestChain(ctx, destSelector, int32(i), updatesPerAddr[lastIndex:]) + _, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, updatesPerAddr[lastIndex:], 0) assert.NoError(t, err) expectedPrices[addr] = updatesPerAddr[lastIndex] @@ -269,7 +275,7 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { // verify number of rows inserted numRows := getTokenTableRowCount(t, db) - assert.Equal(t, numJobs*numAddresses*numUpdatesPerAddress, numRows) + assert.Equal(t, numAddresses, numRows) prices, err := orm.GetTokenPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -283,15 +289,15 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { } // after the initial inserts, insert new round of prices, 1 price per selector this time - var combinedUpdates []TokenPriceUpdate + var combinedUpdates []TokenPrice for addr, updatesPerAddr := range updates { combinedUpdates = append(combinedUpdates, updatesPerAddr[0]) expectedPrices[addr] = updatesPerAddr[0] } - err = orm.InsertTokenPricesForDestChain(ctx, destSelector, 1, combinedUpdates) + _, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, combinedUpdates, 0) assert.NoError(t, err) - assert.Equal(t, numJobs*numAddresses*numUpdatesPerAddress+numAddresses, getTokenTableRowCount(t, db)) + assert.Equal(t, numAddresses, getTokenTableRowCount(t, db)) prices, err = orm.GetTokenPricesByDestChain(ctx, destSelector) assert.NoError(t, err) @@ -303,46 +309,68 @@ func TestORM_InsertAndGetTokenPrices(t *testing.T) { } } -func TestORM_InsertAndDeleteTokenPrices(t *testing.T) { +func TestORM_InsertTokenPricesWhenExpired(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - - orm, db := setupORM(t) + orm, _ := setupORM(t) numAddresses := 10 - numUpdatesPerAddress := 20 - destSelector := uint64(1) - + destSelector := rand.Uint64() addrs := generateTokenAddresses(numAddresses) + initTokenUpdates := generateRandomTokenPrices(addrs) - updates := make(map[string][]TokenPriceUpdate) - for _, addr := range addrs { - updates[addr] = generateTokenPriceUpdates(addr, numUpdatesPerAddress) - } + // Insert the first time, table is initialized + rowsUpdated, err := orm.UpsertTokenPricesForDestChain(ctx, destSelector, initTokenUpdates, time.Minute) + require.NoError(t, err) + assert.Equal(t, int64(numAddresses), rowsUpdated) - for _, updatesPerAddr := range updates { - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, 1, updatesPerAddr) - assert.NoError(t, err) - } + //time.Sleep(100 * time.Millisecond) - sleepSec := 2 - time.Sleep(time.Duration(sleepSec) * time.Second) + // Insert the second time, no updates, because prices haven't changed + rowsUpdated, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, initTokenUpdates, time.Minute) + require.NoError(t, err) + assert.Equal(t, int64(0), rowsUpdated) - // insert for the 2nd time after interimTimeStamp - for _, updatesPerAddr := range updates { - err := orm.InsertTokenPricesForDestChain(ctx, destSelector, 1, updatesPerAddr) - assert.NoError(t, err) + // There are new prices, but we still haven't reached interval + newPrices := generateRandomTokenPrices(addrs) + rowsUpdated, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, newPrices, time.Minute) + require.NoError(t, err) + assert.Equal(t, int64(0), rowsUpdated) + + time.Sleep(100 * time.Millisecond) + + // Again with the same new prices, but this time interval is reached + rowsUpdated, err = orm.UpsertTokenPricesForDestChain(ctx, destSelector, newPrices, time.Millisecond) + require.NoError(t, err) + assert.Equal(t, int64(numAddresses), rowsUpdated) + + dbTokenPrices, err := orm.GetTokenPricesByDestChain(ctx, destSelector) + require.NoError(t, err) + assert.Len(t, dbTokenPrices, numAddresses) + + dbTokenPricesByAddr := toTokensByAddress(dbTokenPrices) + for _, tkPrice := range newPrices { + dbToken, ok := dbTokenPricesByAddr[tkPrice.TokenAddr] + assert.True(t, ok) + assert.Equal(t, dbToken, tkPrice.TokenPrice) } +} - assert.Equal(t, 2*numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db)) +func Benchmark_UpsertsTheSameTokenPrices(b *testing.B) { + db := pgtest.NewSqlxDB(b) + orm, err := NewORM(db, logger.NullLogger) + require.NoError(b, err) - // clear by sleepSec should delete rows inserted before it - err := orm.ClearTokenPricesByDestChain(ctx, destSelector, sleepSec) - assert.NoError(t, err) - assert.Equal(t, numAddresses*numUpdatesPerAddress, getTokenTableRowCount(t, db)) + ctx := testutils.Context(b) + numAddresses := 50 + destSelector := rand.Uint64() + addrs := generateTokenAddresses(numAddresses) + tokenUpdates := generateRandomTokenPrices(addrs) - // clear by 0 expiration seconds should delete all rows - err = orm.ClearTokenPricesByDestChain(ctx, destSelector, 0) - assert.NoError(t, err) - assert.Equal(t, 0, getTokenTableRowCount(t, db)) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err1 := orm.UpsertTokenPricesForDestChain(ctx, destSelector, tokenUpdates, time.Second) + require.NoError(b, err1) + } } diff --git a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go index 5fb9733cc64..771fdd322f3 100644 --- a/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go +++ b/core/services/ocr2/plugins/ccip/ccipcommit/initializers.go @@ -156,7 +156,7 @@ func NewCommitServices(ctx context.Context, ds sqlutil.DataSource, srcProvider c onRampAddress, ) - orm, err := cciporm.NewORM(ds) + orm, err := cciporm.NewObservedORM(ds, lggr) if err != nil { return nil, err } diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go index ad44555477f..2806c26e220 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service.go @@ -14,7 +14,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" cciptypes "github.com/smartcontractkit/chainlink-common/pkg/types/ccip" - "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" cciporm "github.com/smartcontractkit/chainlink/v2/core/services/ccip" @@ -24,6 +23,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/ccipdata" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/internal/pricegetter" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices" + "github.com/smartcontractkit/chainlink/v2/core/utils" ) // PriceService manages DB access for gas and token price data. @@ -49,23 +49,11 @@ const ( // Token prices are refreshed every 10 minutes, we only report prices for blue chip tokens, DS&A simulation show // their prices are stable, 10-minute resolution is accurate enough. tokenPriceUpdateInterval = 10 * time.Minute - - // Prices should expire after 25 minutes in DB. Prices should be fresh in the Commit plugin. - // 25 min provides sufficient buffer for the Commit plugin to withstand transient price update outages, while - // surfacing price update outages quickly enough. - priceExpireThreshold = 25 * time.Minute - - // Cleanups are called every 10 minutes. For a given job, on average we may expect 3 token prices and 1 gas price. - // 10 minutes should result in ~13 rows being cleaned up per job, it is not a heavy load on DB, so there is no need - // to run cleanup more frequently. We shouldn't clean up less frequently than `priceExpireThreshold`. - priceCleanupInterval = 10 * time.Minute ) type priceService struct { - priceExpireThreshold time.Duration - cleanupInterval time.Duration - gasUpdateInterval time.Duration - tokenUpdateInterval time.Duration + gasUpdateInterval time.Duration + tokenUpdateInterval time.Duration lggr logger.Logger orm cciporm.ORM @@ -100,10 +88,8 @@ func NewPriceService( ctx, cancel := context.WithCancel(context.Background()) pw := &priceService{ - priceExpireThreshold: priceExpireThreshold, - cleanupInterval: utils.WithJitter(priceCleanupInterval), // use WithJitter to avoid multiple services impacting DB at same time - gasUpdateInterval: utils.WithJitter(gasPriceUpdateInterval), - tokenUpdateInterval: utils.WithJitter(tokenPriceUpdateInterval), + gasUpdateInterval: gasPriceUpdateInterval, + tokenUpdateInterval: tokenPriceUpdateInterval, lggr: lggr, orm: orm, @@ -142,13 +128,11 @@ func (p *priceService) Close() error { } func (p *priceService) run() { - cleanupTicker := time.NewTicker(p.cleanupInterval) - gasUpdateTicker := time.NewTicker(p.gasUpdateInterval) - tokenUpdateTicker := time.NewTicker(p.tokenUpdateInterval) + gasUpdateTicker := time.NewTicker(utils.WithJitter(p.gasUpdateInterval)) + tokenUpdateTicker := time.NewTicker(utils.WithJitter(p.tokenUpdateInterval)) go func() { defer p.wg.Done() - defer cleanupTicker.Stop() defer gasUpdateTicker.Stop() defer tokenUpdateTicker.Stop() @@ -156,11 +140,6 @@ func (p *priceService) run() { select { case <-p.backgroundCtx.Done(): return - case <-cleanupTicker.C: - err := p.runCleanup(p.backgroundCtx) - if err != nil { - p.lggr.Errorw("Error when cleaning up in-db prices in the background", "err", err) - } case <-gasUpdateTicker.C: err := p.runGasPriceUpdate(p.backgroundCtx) if err != nil { @@ -240,28 +219,6 @@ func (p *priceService) GetGasAndTokenPrices(ctx context.Context, destChainSelect return gasPrices, tokenPrices, nil } -func (p *priceService) runCleanup(ctx context.Context) error { - eg := new(errgroup.Group) - - eg.Go(func() error { - err := p.orm.ClearGasPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) - if err != nil { - return fmt.Errorf("error clearing gas prices: %w", err) - } - return nil - }) - - eg.Go(func() error { - err := p.orm.ClearTokenPricesByDestChain(ctx, p.destChainSelector, int(p.priceExpireThreshold.Seconds())) - if err != nil { - return fmt.Errorf("error clearing token prices: %w", err) - } - return nil - }) - - return eg.Wait() -} - func (p *priceService) runGasPriceUpdate(ctx context.Context) error { // Protect against concurrent updates of `gasPriceEstimator` and `destPriceRegistryReader` // Price updates happen infrequently - once every `gasPriceUpdateInterval` seconds. @@ -446,28 +403,29 @@ func (p *priceService) observeTokenPriceUpdates( return tokenPricesUSD, nil } -func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) (err error) { +func (p *priceService) writeGasPricesToDB(ctx context.Context, sourceGasPriceUSD *big.Int) error { if sourceGasPriceUSD == nil { return nil } - return p.orm.InsertGasPricesForDestChain(ctx, p.destChainSelector, p.jobId, []cciporm.GasPriceUpdate{ + _, err := p.orm.UpsertGasPricesForDestChain(ctx, p.destChainSelector, []cciporm.GasPrice{ { SourceChainSelector: p.sourceChainSelector, GasPrice: assets.NewWei(sourceGasPriceUSD), }, }) + return err } -func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) (err error) { +func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD map[cciptypes.Address]*big.Int) error { if tokenPricesUSD == nil { return nil } - var tokenPrices []cciporm.TokenPriceUpdate + var tokenPrices []cciporm.TokenPrice for token, price := range tokenPricesUSD { - tokenPrices = append(tokenPrices, cciporm.TokenPriceUpdate{ + tokenPrices = append(tokenPrices, cciporm.TokenPrice{ TokenAddr: string(token), TokenPrice: assets.NewWei(price), }) @@ -478,7 +436,8 @@ func (p *priceService) writeTokenPricesToDB(ctx context.Context, tokenPricesUSD return tokenPrices[i].TokenAddr < tokenPrices[j].TokenAddr }) - return p.orm.InsertTokenPricesForDestChain(ctx, p.destChainSelector, p.jobId, tokenPrices) + _, err := p.orm.UpsertTokenPricesForDestChain(ctx, p.destChainSelector, tokenPrices, p.tokenUpdateInterval) + return err } // Input price is USD per full token, with 18 decimal precision diff --git a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go index 0468c3addb8..a25c5d3c47e 100644 --- a/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go +++ b/core/services/ocr2/plugins/ccip/internal/ccipdb/price_service_test.go @@ -19,7 +19,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" cciporm "github.com/smartcontractkit/chainlink/v2/core/services/ccip" @@ -30,81 +29,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/ccip/prices" ) -func TestPriceService_priceCleanup(t *testing.T) { - lggr := logger.TestLogger(t) - jobId := int32(1) - destChainSelector := uint64(12345) - sourceChainSelector := uint64(67890) - - testCases := []struct { - name string - gasPriceError bool - tokenPriceError bool - expectedErr bool - }{ - { - name: "ORM called successfully", - gasPriceError: false, - tokenPriceError: false, - expectedErr: false, - }, - { - name: "gasPrice clear failed", - gasPriceError: true, - tokenPriceError: false, - expectedErr: true, - }, - { - name: "tokenPrice clear failed", - gasPriceError: false, - tokenPriceError: true, - expectedErr: true, - }, - { - name: "both ORM calls failed", - gasPriceError: true, - tokenPriceError: true, - expectedErr: true, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ctx := tests.Context(t) - - var gasPricesError error - var tokenPricesError error - if tc.gasPriceError { - gasPricesError = fmt.Errorf("gas prices error") - } - if tc.tokenPriceError { - tokenPricesError = fmt.Errorf("token prices error") - } - - mockOrm := ccipmocks.NewORM(t) - mockOrm.On("ClearGasPricesByDestChain", ctx, destChainSelector, int(priceExpireThreshold.Seconds())).Return(gasPricesError).Once() - mockOrm.On("ClearTokenPricesByDestChain", ctx, destChainSelector, int(priceExpireThreshold.Seconds())).Return(tokenPricesError).Once() - - priceService := NewPriceService( - lggr, - mockOrm, - jobId, - destChainSelector, - sourceChainSelector, - "", - nil, - nil, - ).(*priceService) - err := priceService.runCleanup(ctx) - if tc.expectedErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - } - }) - } -} - func TestPriceService_writeGasPrices(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) @@ -113,7 +37,7 @@ func TestPriceService_writeGasPrices(t *testing.T) { gasPrice := big.NewInt(1e18) - expectedGasPriceUpdate := []cciporm.GasPriceUpdate{ + expectedGasPriceUpdate := []cciporm.GasPrice{ { SourceChainSelector: sourceChainSelector, GasPrice: assets.NewWei(gasPrice), @@ -147,7 +71,7 @@ func TestPriceService_writeGasPrices(t *testing.T) { } mockOrm := ccipmocks.NewORM(t) - mockOrm.On("InsertGasPricesForDestChain", ctx, destChainSelector, jobId, expectedGasPriceUpdate).Return(gasPricesError).Once() + mockOrm.On("UpsertGasPricesForDestChain", ctx, destChainSelector, expectedGasPriceUpdate).Return(int64(0), gasPricesError).Once() priceService := NewPriceService( lggr, @@ -180,7 +104,7 @@ func TestPriceService_writeTokenPrices(t *testing.T) { "0x234": big.NewInt(3e18), } - expectedTokenPriceUpdate := []cciporm.TokenPriceUpdate{ + expectedTokenPriceUpdate := []cciporm.TokenPrice{ { TokenAddr: "0x123", TokenPrice: assets.NewWei(big.NewInt(2e18)), @@ -218,7 +142,8 @@ func TestPriceService_writeTokenPrices(t *testing.T) { } mockOrm := ccipmocks.NewORM(t) - mockOrm.On("InsertTokenPricesForDestChain", ctx, destChainSelector, jobId, expectedTokenPriceUpdate).Return(tokenPricesError).Once() + mockOrm.On("UpsertTokenPricesForDestChain", ctx, destChainSelector, expectedTokenPriceUpdate, tokenPriceUpdateInterval). + Return(int64(len(expectedTokenPriceUpdate)), tokenPricesError).Once() priceService := NewPriceService( lggr, @@ -802,7 +727,7 @@ func setupORM(t *testing.T) cciporm.ORM { t.Helper() db := pgtest.NewSqlxDB(t) - orm, err := cciporm.NewORM(db) + orm, err := cciporm.NewORM(db, logger.TestLogger(t)) require.NoError(t, err) @@ -824,7 +749,7 @@ func checkResultLen(t *testing.T, priceService PriceService, destChainSelector u return nil } -func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { +func TestPriceService_priceWriteInBackground(t *testing.T) { lggr := logger.TestLogger(t) jobId := int32(1) destChainSelector := uint64(12345) @@ -896,16 +821,11 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { gasUpdateInterval := 2000 * time.Millisecond tokenUpdateInterval := 5000 * time.Millisecond - cleanupInterval := 3000 * time.Millisecond // run gas price task every 2 second priceService.gasUpdateInterval = gasUpdateInterval // run token price task every 5 second priceService.tokenUpdateInterval = tokenUpdateInterval - // run cleanup every 3 seconds - priceService.cleanupInterval = cleanupInterval - // expire all prices during every cleanup - priceService.priceExpireThreshold = time.Duration(0) // initially, db is empty assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 0, 0)) @@ -918,24 +838,5 @@ func TestPriceService_priceWriteAndCleanupInBackground(t *testing.T) { assert.NoError(t, err) assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 1, len(laneTokens))) - // eventually prices will be cleaned - assert.Eventually(t, func() bool { - err := checkResultLen(t, priceService, destChainSelector, 0, 0) - return err == nil - }, testutils.WaitTimeout(t), testutils.TestInterval) - - // then prices will be updated again - assert.Eventually(t, func() bool { - err := checkResultLen(t, priceService, destChainSelector, 1, len(laneTokens)) - return err == nil - }, testutils.WaitTimeout(t), testutils.TestInterval) - assert.NoError(t, priceService.Close()) - assert.NoError(t, priceService.runCleanup(ctx)) - - // after stopping PriceService and runCleanup, no more updates are inserted - for i := 0; i < 5; i++ { - time.Sleep(time.Second) - assert.NoError(t, checkResultLen(t, priceService, destChainSelector, 0, 0)) - } } diff --git a/core/store/migrate/migrations/0250_ccip_token_prices_fix.sql b/core/store/migrate/migrations/0250_ccip_token_prices_fix.sql new file mode 100644 index 00000000000..6c6cf02b43f --- /dev/null +++ b/core/store/migrate/migrations/0250_ccip_token_prices_fix.sql @@ -0,0 +1,49 @@ +-- +goose Up + +-- We need to re-create tables from scratch because of the unique constraint on tokens and chains selectors +DROP TABLE ccip.observed_token_prices; +DROP TABLE ccip.observed_gas_prices; + +CREATE TABLE ccip.observed_token_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + token_addr BYTEA NOT NULL, + token_price NUMERIC(78, 0) NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (chain_selector, token_addr) +); + +CREATE TABLE ccip.observed_gas_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + source_chain_selector NUMERIC(20, 0) NOT NULL, + gas_price NUMERIC(78, 0) NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + PRIMARY KEY (chain_selector, source_chain_selector) +); + +-- +goose Down +DROP TABLE ccip.observed_token_prices; +DROP TABLE ccip.observed_gas_prices; + +-- Restore state from migration 0236_ccip_prices_cache.sql +CREATE TABLE ccip.observed_gas_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + job_id INTEGER NOT NULL, + source_chain_selector NUMERIC(20, 0) NOT NULL, + gas_price NUMERIC(78, 0) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE ccip.observed_token_prices +( + chain_selector NUMERIC(20, 0) NOT NULL, + job_id INTEGER NOT NULL, + token_addr BYTEA NOT NULL, + token_price NUMERIC(78, 0) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_ccip_gas_prices_chain_gas_price_timestamp ON ccip.observed_gas_prices (chain_selector, source_chain_selector, created_at DESC); +CREATE INDEX idx_ccip_token_prices_token_price_timestamp ON ccip.observed_token_prices (chain_selector, token_addr, created_at DESC);