diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index 1d44eb5ec0..7a89844062 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -4,7 +4,12 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). ## Unreleased - + +### Breaking Changes + +- Change ingestion filtering logic to store transactions if any filter matches on it. ([5303](https://github.com/stellar/go/pull/5303)) + - The previous behaviour was to store a tx only if both asset and account filters match together. So even if a tx matched an account filter but failed to match an asset filter, it would not be stored by Horizon. + ## 2.30.0 **This release adds support for Protocol 21** diff --git a/services/horizon/internal/ingest/filters/account.go b/services/horizon/internal/ingest/filters/account.go index 08def6a2b7..1601314c75 100644 --- a/services/horizon/internal/ingest/filters/account.go +++ b/services/horizon/internal/ingest/filters/account.go @@ -26,41 +26,45 @@ func NewAccountFilter() AccountFilter { } } -func (filter *accountFilter) Name() string { +func (f *accountFilter) Name() string { return "filters.accountFilter" } -func (filter *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error { +func (f *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error { // only need to re-initialize the filter config state(rules) if its cached version(in memory) // is older than the incoming config version based on lastModified epoch timestamp - if filterConfig.LastModified > filter.lastModified { + if filterConfig.LastModified > f.lastModified { logger.Infof("New Account Filter config detected, reloading new config %v ", *filterConfig) - filter.enabled = filterConfig.Enabled - filter.whitelistedAccountsSet = listToSet(filterConfig.Whitelist) - filter.lastModified = filterConfig.LastModified + f.enabled = filterConfig.Enabled + f.whitelistedAccountsSet = listToSet(filterConfig.Whitelist) + f.lastModified = filterConfig.LastModified } return nil } -func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) { - // filtering is disabled if the whitelist is empty for now, as that is the only filter rule - if len(f.whitelistedAccountsSet) == 0 || !f.enabled { - return true, nil +func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error) { + if !f.isEnabled() { + return false, true, nil } participants, err := processors.ParticipantsForTransaction(0, transaction) if err != nil { - return false, err + return true, false, err } // NOTE: this assumes that the participant list has a small memory footprint // otherwise, we should be doing the filtering on the DB side for _, p := range participants { if f.whitelistedAccountsSet.Contains(p.Address()) { - return true, nil + return true, true, nil } } - return false, nil + return true, false, nil +} + +func (f accountFilter) isEnabled() bool { + // filtering is disabled if the whitelist is empty for now, as that is the only filter rule + return len(f.whitelistedAccountsSet) >= 1 && f.enabled } diff --git a/services/horizon/internal/ingest/filters/account_test.go b/services/horizon/internal/ingest/filters/account_test.go index 1831a6a6e5..17f290a460 100644 --- a/services/horizon/internal/ingest/filters/account_test.go +++ b/services/horizon/internal/ingest/filters/account_test.go @@ -26,11 +26,12 @@ func TestAccountFilterAllowsWhenMatch(t *testing.T) { err := filter.RefreshAccountFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, + isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL", "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, true) } @@ -47,13 +48,14 @@ func TestAccountFilterAllowsWhenDisabled(t *testing.T) { err := filter.RefreshAccountFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, + isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H")) tt.NoError(err) // there is no match on filter rule, but since filter is disabled, it should allow all + tt.Equal(isEnabled, false) tt.Equal(result, true) } @@ -70,11 +72,12 @@ func TestAccountFilterAllowsWhenEmptyWhitelist(t *testing.T) { err := filter.RefreshAccountFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, + isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL", "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H")) tt.NoError(err) + tt.Equal(isEnabled, false) tt.Equal(result, true) } @@ -92,11 +95,12 @@ func TestAccountFilterDoesNotAllowWhenNoMatch(t *testing.T) { err := filter.RefreshAccountFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, + isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t, "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H", "GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, false) } diff --git a/services/horizon/internal/ingest/filters/asset.go b/services/horizon/internal/ingest/filters/asset.go index 0a3a4ca6c7..4aa6ebc9b3 100644 --- a/services/horizon/internal/ingest/filters/asset.go +++ b/services/horizon/internal/ingest/filters/asset.go @@ -34,27 +34,26 @@ func NewAssetFilter() AssetFilter { } } -func (filter *assetFilter) Name() string { +func (f *assetFilter) Name() string { return "filters.assetFilter" } -func (filter *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error { +func (f *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error { // only need to re-initialize the filter config state(rules) if it's cached version(in memory) // is older than the incoming config version based on lastModified epoch timestamp - if filterConfig.LastModified > filter.lastModified { + if filterConfig.LastModified > f.lastModified { logger.Infof("New Asset Filter config detected, reloading new config %v ", *filterConfig) - filter.enabled = filterConfig.Enabled - filter.canonicalAssetsLookup = listToSet(filterConfig.Whitelist) - filter.lastModified = filterConfig.LastModified + f.enabled = filterConfig.Enabled + f.canonicalAssetsLookup = listToSet(filterConfig.Whitelist) + f.lastModified = filterConfig.LastModified } return nil } -func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) { - // filtering is disabled if the whitelist is empty for now as that is the only filter rule - if len(f.canonicalAssetsLookup) < 1 || !f.enabled { - return true, nil +func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error) { + if !f.isEnabled() { + return false, true, nil } var operations []xdr.Operation @@ -68,11 +67,11 @@ func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest. } if f.filterOperationsMatchedOnRules(operations) { - return true, nil + return true, true, nil } logger.Debugf("No match, dropped tx with seq %v ", transaction.Envelope.SeqNum()) - return false, nil + return true, false, nil } func (f assetFilter) filterOperationsMatchedOnRules(operations []xdr.Operation) bool { @@ -144,3 +143,8 @@ func listToSet(list []string) set.Set[string] { } return set } + +func (f assetFilter) isEnabled() bool { + // filtering is disabled if the whitelist is empty for now as that is the only filter rule + return len(f.canonicalAssetsLookup) >= 1 && f.enabled +} diff --git a/services/horizon/internal/ingest/filters/asset_test.go b/services/horizon/internal/ingest/filters/asset_test.go index 3da23bc440..e0deed7771 100644 --- a/services/horizon/internal/ingest/filters/asset_test.go +++ b/services/horizon/internal/ingest/filters/asset_test.go @@ -25,12 +25,14 @@ func TestAssetFilterAllowsOnMatch(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, true) - result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, true) } @@ -47,12 +49,14 @@ func TestAssetFilterAllowsWhenEmptyWhitelist(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, false) tt.Equal(result, true) - result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, false) tt.Equal(result, true) } @@ -69,9 +73,10 @@ func TestAssetFilterAllowsWhenDisabled(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) // there was no match on filter rules, but since filter was disabled also, it should allow all + tt.Equal(isEnabled, false) tt.Equal(result, true) } @@ -89,12 +94,14 @@ func TestAssetFilterDoesNotAllowV1WhenNoMatch(t *testing.T) { err := filter.RefreshAssetFilter(filterConfig) tt.NoError(err) - result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, false) - result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) + isEnabled, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL")) tt.NoError(err) + tt.Equal(isEnabled, true) tt.Equal(result, false) } diff --git a/services/horizon/internal/ingest/group_processors.go b/services/horizon/internal/ingest/group_processors.go index 00f8adde8a..4a6c2e69af 100644 --- a/services/horizon/internal/ingest/group_processors.go +++ b/services/horizon/internal/ingest/group_processors.go @@ -178,21 +178,31 @@ func (g *groupTransactionFilterers) Name() string { return "groupTransactionFilterers" } -func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, error) { +func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, bool, error) { + filtersEnabled := false + for _, f := range g.filterers { startTime := time.Now() - include, err := f.FilterTransaction(ctx, tx) + filterEnabled, include, err := f.FilterTransaction(ctx, tx) + if !filterEnabled { + continue + } + + filtersEnabled = true if err != nil { - return false, errors.Wrapf(err, "error in %T.FilterTransaction", f) + return true, false, errors.Wrapf(err, "error in %T.FilterTransaction", f) } g.AddRunDuration(f.Name(), startTime) - if !include { - // filter out, we can return early - g.droppedTransactions++ - return false, nil + if include { + return true, true, nil } } - return true, nil + + if filtersEnabled { + g.droppedTransactions++ + return true, false, nil + } + return false, true, nil } func (g *groupTransactionFilterers) ResetStats() { diff --git a/services/horizon/internal/ingest/processors/main.go b/services/horizon/internal/ingest/processors/main.go index 2b6c1cc8fb..5db09d9ef0 100644 --- a/services/horizon/internal/ingest/processors/main.go +++ b/services/horizon/internal/ingest/processors/main.go @@ -28,7 +28,7 @@ type LedgerTransactionProcessor interface { type LedgerTransactionFilterer interface { Name() string - FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) + FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error) } func StreamLedgerTransactions( @@ -47,7 +47,7 @@ func StreamLedgerTransactions( if err != nil { return errors.Wrap(err, "could not read transaction") } - include, err := txFilterer.FilterTransaction(ctx, tx) + _, include, err := txFilterer.FilterTransaction(ctx, tx) if err != nil { return errors.Wrapf( err, diff --git a/services/horizon/internal/integration/ingestion_filtering_test.go b/services/horizon/internal/integration/ingestion_filtering_test.go index 6d0bcee69e..f35e67cada 100644 --- a/services/horizon/internal/integration/ingestion_filtering_test.go +++ b/services/horizon/internal/integration/ingestion_filtering_test.go @@ -5,12 +5,13 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stellar/go/clients/horizonclient" hProtocol "github.com/stellar/go/protocols/horizon" "github.com/stellar/go/services/horizon/internal/ingest/filters" "github.com/stellar/go/services/horizon/internal/test/integration" "github.com/stellar/go/txnbuild" - "github.com/stretchr/testify/assert" ) func TestFilteringWithNoFilters(t *testing.T) { @@ -168,3 +169,67 @@ func TestFilteringAssetWhiteList(t *testing.T) { _, err = itest.Client().TransactionDetail(txResp.Hash) tt.NoError(err) } + +func TestFilteringAssetAndAccountFilters(t *testing.T) { + tt := assert.New(t) + const adminPort uint16 = 6000 + itest := integration.NewTest(t, integration.Config{ + HorizonIngestParameters: map[string]string{ + "admin-port": strconv.Itoa(int(adminPort)), + }, + }) + + fullKeys, accounts := itest.CreateAccounts(2, "10000") + whitelistedAccount := accounts[0] + whitelistedAccountKey := fullKeys[0] + nonWhitelistedAccount := accounts[1] + nonWhitelistedAccountKey := fullKeys[1] + enabled := true + + whitelistedAsset := txnbuild.CreditAsset{Code: "PTS", Issuer: itest.Master().Address()} + nonWhitelistedAsset := txnbuild.CreditAsset{Code: "SEK", Issuer: nonWhitelistedAccountKey.Address()} + itest.MustEstablishTrustline(whitelistedAccountKey, whitelistedAccount, nonWhitelistedAsset) + + // Setup whitelisted account and asset rule, force refresh of filter configs to be quick + filters.SetFilterConfigCheckIntervalSeconds(1) + + expectedAccountFilter := hProtocol.AccountFilterConfig{ + Whitelist: []string{whitelistedAccount.GetAccountID()}, + Enabled: &enabled, + } + err := itest.AdminClient().SetIngestionAccountFilter(expectedAccountFilter) + tt.NoError(err) + accountFilter, err := itest.AdminClient().GetIngestionAccountFilter() + tt.NoError(err) + tt.ElementsMatch(expectedAccountFilter.Whitelist, accountFilter.Whitelist) + tt.Equal(expectedAccountFilter.Enabled, accountFilter.Enabled) + + asset, err := whitelistedAsset.ToXDR() + tt.NoError(err) + expectedAssetFilter := hProtocol.AssetFilterConfig{ + Whitelist: []string{asset.StringCanonical()}, + Enabled: &enabled, + } + err = itest.AdminClient().SetIngestionAssetFilter(expectedAssetFilter) + tt.NoError(err) + assetFilter, err := itest.AdminClient().GetIngestionAssetFilter() + tt.NoError(err) + + tt.ElementsMatch(expectedAssetFilter.Whitelist, assetFilter.Whitelist) + tt.Equal(expectedAssetFilter.Enabled, assetFilter.Enabled) + + // Ensure the latest filter configs are reloaded by the ingestion state machine processor + time.Sleep(filters.GetFilterConfigCheckIntervalSeconds() * time.Second) + + // Use a non-whitelisted account to submit a non-whitelisted asset to a whitelisted account. + // The transaction should be stored. + txResp := itest.MustSubmitOperations(nonWhitelistedAccount, nonWhitelistedAccountKey, + &txnbuild.Payment{ + Destination: whitelistedAccount.GetAccountID(), + Amount: "10", + Asset: nonWhitelistedAsset, + }, + ) + _, err = itest.Client().TransactionDetail(txResp.Hash) + tt.NoError(err) +}