Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingestion filtering should use OR logic for rules rather than AND #5303

Merged
merged 13 commits into from
May 10, 2024
Merged
30 changes: 17 additions & 13 deletions services/horizon/internal/ingest/filters/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,41 +26,45 @@ func NewAccountFilter() AccountFilter {
}
}

func (filter *accountFilter) Name() string {
func (f *accountFilter) Name() string {
return "filters.accountFilter"
}

func (filter *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error {
func (f *accountFilter) RefreshAccountFilter(filterConfig *history.AccountFilterConfig) error {
// only need to re-initialize the filter config state(rules) if its cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > filter.lastModified {
if filterConfig.LastModified > f.lastModified {
logger.Infof("New Account Filter config detected, reloading new config %v ", *filterConfig)

filter.enabled = filterConfig.Enabled
filter.whitelistedAccountsSet = listToSet(filterConfig.Whitelist)
filter.lastModified = filterConfig.LastModified
f.enabled = filterConfig.Enabled
f.whitelistedAccountsSet = listToSet(filterConfig.Whitelist)
f.lastModified = filterConfig.LastModified
}

return nil
}

func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {
// filtering is disabled if the whitelist is empty for now, as that is the only filter rule
if len(f.whitelistedAccountsSet) == 0 || !f.enabled {
return true, nil
func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error) {
if !f.isEnabled() {
return false, true, nil
}

participants, err := processors.ParticipantsForTransaction(0, transaction)
if err != nil {
return false, err
return true, false, err
}

// NOTE: this assumes that the participant list has a small memory footprint
// otherwise, we should be doing the filtering on the DB side
for _, p := range participants {
if f.whitelistedAccountsSet.Contains(p.Address()) {
return true, nil
return true, true, nil
}
}
return false, nil
return true, false, nil
}

func (f accountFilter) isEnabled() bool {
// filtering is disabled if the whitelist is empty for now, as that is the only filter rule
return len(f.whitelistedAccountsSet) >= 1 && f.enabled
}
12 changes: 8 additions & 4 deletions services/horizon/internal/ingest/filters/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ func TestAccountFilterAllowsWhenMatch(t *testing.T) {
err := filter.RefreshAccountFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
"GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL",
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"))

tt.NoError(err)
tt.Equal(isEnabled, true)
tt.Equal(result, true)
}

Expand All @@ -47,13 +48,14 @@ func TestAccountFilterAllowsWhenDisabled(t *testing.T) {
err := filter.RefreshAccountFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H",
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"))

tt.NoError(err)

// there is no match on filter rule, but since filter is disabled, it should allow all
tt.Equal(isEnabled, false)
tt.Equal(result, true)
}

Expand All @@ -70,11 +72,12 @@ func TestAccountFilterAllowsWhenEmptyWhitelist(t *testing.T) {
err := filter.RefreshAccountFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
"GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL",
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"))

tt.NoError(err)
tt.Equal(isEnabled, false)
tt.Equal(result, true)
}

Expand All @@ -92,11 +95,12 @@ func TestAccountFilterDoesNotAllowWhenNoMatch(t *testing.T) {
err := filter.RefreshAccountFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
isEnabled, result, err := filter.FilterTransaction(ctx, getAccountTestTx(t,
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H",
"GBRPYHIL2CI3FNQ4BXLFMNDLFJUNPU2HY3ZMFSHONUCEOASW7QC7OX2H"))

tt.NoError(err)
tt.Equal(isEnabled, true)
tt.Equal(result, false)
}

Expand Down
28 changes: 16 additions & 12 deletions services/horizon/internal/ingest/filters/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,26 @@ func NewAssetFilter() AssetFilter {
}
}

func (filter *assetFilter) Name() string {
func (f *assetFilter) Name() string {
return "filters.assetFilter"
}

func (filter *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error {
func (f *assetFilter) RefreshAssetFilter(filterConfig *history.AssetFilterConfig) error {
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > filter.lastModified {
if filterConfig.LastModified > f.lastModified {
logger.Infof("New Asset Filter config detected, reloading new config %v ", *filterConfig)
filter.enabled = filterConfig.Enabled
filter.canonicalAssetsLookup = listToSet(filterConfig.Whitelist)
filter.lastModified = filterConfig.LastModified
f.enabled = filterConfig.Enabled
f.canonicalAssetsLookup = listToSet(filterConfig.Whitelist)
f.lastModified = filterConfig.LastModified
}

return nil
}

func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {
// filtering is disabled if the whitelist is empty for now as that is the only filter rule
if len(f.canonicalAssetsLookup) < 1 || !f.enabled {
return true, nil
func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error) {
if !f.isEnabled() {
return false, true, nil
}

var operations []xdr.Operation
Expand All @@ -68,11 +67,11 @@ func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.
}

if f.filterOperationsMatchedOnRules(operations) {
return true, nil
return true, true, nil
}

logger.Debugf("No match, dropped tx with seq %v ", transaction.Envelope.SeqNum())
return false, nil
return true, false, nil
}

func (f assetFilter) filterOperationsMatchedOnRules(operations []xdr.Operation) bool {
Expand Down Expand Up @@ -144,3 +143,8 @@ func listToSet(list []string) set.Set[string] {
}
return set
}

func (f assetFilter) isEnabled() bool {
// filtering is disabled if the whitelist is empty for now as that is the only filter rule
return len(f.canonicalAssetsLookup) >= 1 && f.enabled
}
14 changes: 7 additions & 7 deletions services/horizon/internal/ingest/filters/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ func TestAssetFilterAllowsOnMatch(t *testing.T) {
err := filter.RefreshAssetFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
_, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
tt.NoError(err)
tt.Equal(result, true)

result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
_, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
tt.NoError(err)
tt.Equal(result, true)
}
Expand All @@ -47,11 +47,11 @@ func TestAssetFilterAllowsWhenEmptyWhitelist(t *testing.T) {
err := filter.RefreshAssetFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
_, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
tt.NoError(err)
tt.Equal(result, true)

result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
_, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
tt.NoError(err)
tt.Equal(result, true)
}
Expand All @@ -69,7 +69,7 @@ func TestAssetFilterAllowsWhenDisabled(t *testing.T) {
err := filter.RefreshAssetFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
_, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
tt.NoError(err)
// there was no match on filter rules, but since filter was disabled also, it should allow all
tt.Equal(result, true)
Expand All @@ -89,11 +89,11 @@ func TestAssetFilterDoesNotAllowV1WhenNoMatch(t *testing.T) {
err := filter.RefreshAssetFilter(filterConfig)
tt.NoError(err)

result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
_, result, err := filter.FilterTransaction(ctx, getAssetTestV1Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
tt.NoError(err)
tt.Equal(result, false)

result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
_, result, err = filter.FilterTransaction(ctx, getAssetTestV0Tx(t, "GD6WNNTW664WH7FXC5RUMUTF7P5QSURC2IT36VOQEEGFZ4UWUEQGECAL"))
tt.NoError(err)
tt.Equal(result, false)
}
Expand Down
26 changes: 18 additions & 8 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,31 @@ func (g *groupTransactionFilterers) Name() string {
return "groupTransactionFilterers"
}

func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, error) {
func (g *groupTransactionFilterers) FilterTransaction(ctx context.Context, tx ingest.LedgerTransaction) (bool, bool, error) {
filtersEnabled := false

for _, f := range g.filterers {
startTime := time.Now()
include, err := f.FilterTransaction(ctx, tx)
filterEnabled, include, err := f.FilterTransaction(ctx, tx)
if !filterEnabled {
continue
}

filtersEnabled = true
if err != nil {
return false, errors.Wrapf(err, "error in %T.FilterTransaction", f)
return true, false, errors.Wrapf(err, "error in %T.FilterTransaction", f)
}
g.AddRunDuration(f.Name(), startTime)
if !include {
// filter out, we can return early
g.droppedTransactions++
return false, nil
if include {
return true, true, nil
}
}
return true, nil

if filtersEnabled {
g.droppedTransactions++
return true, false, nil
}
return false, true, nil
}

func (g *groupTransactionFilterers) ResetStats() {
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/ingest/processors/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type LedgerTransactionProcessor interface {

type LedgerTransactionFilterer interface {
Name() string
FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error)
FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, bool, error)
}

func StreamLedgerTransactions(
Expand All @@ -47,7 +47,7 @@ func StreamLedgerTransactions(
if err != nil {
return errors.Wrap(err, "could not read transaction")
}
include, err := txFilterer.FilterTransaction(ctx, tx)
_, include, err := txFilterer.FilterTransaction(ctx, tx)
if err != nil {
return errors.Wrapf(
err,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/stellar/go/clients/horizonclient"
hProtocol "github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/services/horizon/internal/ingest/filters"
"github.com/stellar/go/services/horizon/internal/test/integration"
"github.com/stellar/go/txnbuild"
"github.com/stretchr/testify/assert"
)

func TestFilteringWithNoFilters(t *testing.T) {
Expand Down Expand Up @@ -168,3 +169,67 @@ func TestFilteringAssetWhiteList(t *testing.T) {
_, err = itest.Client().TransactionDetail(txResp.Hash)
tt.NoError(err)
}

func TestFilteringAssetAndAccountFilters(t *testing.T) {
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
tt := assert.New(t)
const adminPort uint16 = 6000
itest := integration.NewTest(t, integration.Config{
HorizonIngestParameters: map[string]string{
"admin-port": strconv.Itoa(int(adminPort)),
},
})

fullKeys, accounts := itest.CreateAccounts(2, "10000")
whitelistedAccount := accounts[0]
whitelistedAccountKey := fullKeys[0]
nonWhitelistedAccount := accounts[1]
nonWhitelistedAccountKey := fullKeys[1]
enabled := true

whitelistedAsset := txnbuild.CreditAsset{Code: "PTS", Issuer: itest.Master().Address()}
nonWhitelistedAsset := txnbuild.CreditAsset{Code: "SEK", Issuer: nonWhitelistedAccountKey.Address()}
itest.MustEstablishTrustline(whitelistedAccountKey, whitelistedAccount, nonWhitelistedAsset)

// Setup whitelisted account and asset rule, force refresh of filter configs to be quick
filters.SetFilterConfigCheckIntervalSeconds(1)

expectedAccountFilter := hProtocol.AccountFilterConfig{
Whitelist: []string{whitelistedAccount.GetAccountID()},
Enabled: &enabled,
}
err := itest.AdminClient().SetIngestionAccountFilter(expectedAccountFilter)
tt.NoError(err)
accountFilter, err := itest.AdminClient().GetIngestionAccountFilter()
tt.NoError(err)
tt.ElementsMatch(expectedAccountFilter.Whitelist, accountFilter.Whitelist)
tt.Equal(expectedAccountFilter.Enabled, accountFilter.Enabled)

asset, err := whitelistedAsset.ToXDR()
tt.NoError(err)
expectedAssetFilter := hProtocol.AssetFilterConfig{
Whitelist: []string{asset.StringCanonical()},
Enabled: &enabled,
}
err = itest.AdminClient().SetIngestionAssetFilter(expectedAssetFilter)
tt.NoError(err)
assetFilter, err := itest.AdminClient().GetIngestionAssetFilter()
tt.NoError(err)

tt.ElementsMatch(expectedAssetFilter.Whitelist, assetFilter.Whitelist)
tt.Equal(expectedAssetFilter.Enabled, assetFilter.Enabled)

// Ensure the latest filter configs are reloaded by the ingestion state machine processor
time.Sleep(filters.GetFilterConfigCheckIntervalSeconds() * time.Second)

// Use a non-whitelisted account to submit a non-whitelisted asset to a whitelisted account.
// The transaction should be stored.
txResp := itest.MustSubmitOperations(nonWhitelistedAccount, nonWhitelistedAccountKey,
&txnbuild.Payment{
Destination: whitelistedAccount.GetAccountID(),
Amount: "10",
Asset: nonWhitelistedAsset,
},
)
_, err = itest.Client().TransactionDetail(txResp.Hash)
tt.NoError(err)
}
Loading