From aa34343a958668d40d1889f346066f8aa202093f Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 18 Jul 2024 10:59:39 +0100 Subject: [PATCH 01/10] Optimize query for reaping lookup tables --- services/horizon/internal/db2/history/main.go | 110 ++++++++---------- .../horizon/internal/db2/history/main_test.go | 39 ++++--- 2 files changed, 73 insertions(+), 76 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 47e8952b07..412245a118 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -994,6 +994,11 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( results := map[string]LookupTableReapResult{} for table, historyTables := range map[string][]tableObjectFieldPair{ "history_accounts": { + { + name: "history_transaction_participants", + objectField: "history_account_id", + }, + { name: "history_effects", objectField: "history_account_id", @@ -1010,10 +1015,6 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( name: "history_trades", objectField: "counter_account_id", }, - { - name: "history_transaction_participants", - objectField: "history_account_id", - }, }, "history_assets": { { @@ -1035,34 +1036,31 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( }, "history_claimable_balances": { { - name: "history_operation_claimable_balances", + name: "history_transaction_claimable_balances", objectField: "history_claimable_balance_id", }, { - name: "history_transaction_claimable_balances", + name: "history_operation_claimable_balances", objectField: "history_claimable_balance_id", }, }, "history_liquidity_pools": { { - name: "history_operation_liquidity_pools", + name: "history_transaction_liquidity_pools", objectField: "history_liquidity_pool_id", }, { - name: "history_transaction_liquidity_pools", + name: "history_operation_liquidity_pools", objectField: "history_liquidity_pool_id", }, }, } { startTime := time.Now() - query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table]) - if err != nil { - return nil, errors.Wrap(err, "error constructing a query") - } + query := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table]) // Find new offset before removing the rows var newOffset int64 - err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize)) + err := q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize)) if err != nil { if q.NoRows(err) { newOffset = 0 @@ -1098,17 +1096,24 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( // // delete from history_claimable_balances where id in // -// (select id from -// (select id, -// (select 1 from history_operation_claimable_balances -// where history_claimable_balance_id = hcb.id limit 1) as c1, -// (select 1 from history_transaction_claimable_balances -// where history_claimable_balance_id = hcb.id limit 1) as c2, -// 1 as cx, -// from history_claimable_balances hcb where id > 1000 order by id limit 100) -// as sub where c1 IS NULL and c2 IS NULL and 1=1); +// (SELECT e1.id FROM ( +// SELECT id FROM history_claimable_balances +// WHERE id >= 1000 +// ORDER BY id LIMIT 1000 +// ) e1 LEFT JOIN LATERAL ( +// SELECT 1 AS row +// FROM history_transaction_claimable_balances +// where history_transaction_claimable_balances.history_claimable_balance_id = e1.id +// LIMIT 1 +// ) e2 ON true LEFT JOIN LATERAL ( +// SELECT 1 AS row +// FROM history_operation_claimable_balances +// where history_operation_claimable_balances.history_claimable_balance_id = e1.id +// LIMIT 1 +// ) e3 ON true +// WHERE e2.row IS NULL AND e3.row IS NULL); // -// In short it checks the 100 rows omitting 1000 row of history_claimable_balances +// In short it checks the 1000 rows omitting 1000 row of history_claimable_balances // and counts occurrences of each row in corresponding history tables. // If there are no history rows for a given id, the row in // history_claimable_balances is removed. @@ -1118,45 +1123,32 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( // possible that rows will be skipped from deletion. But offset is reset // when it reaches the table size so eventually all orphaned rows are // deleted. -func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) (string, error) { - var sb strings.Builder - var err error - _, err = fmt.Fprintf(&sb, "delete from %s where id IN (select id from (select id, ", table) - if err != nil { - return "", err - } - - for i, historyTable := range historyTables { - _, err = fmt.Fprintf( - &sb, - `(select 1 from %s where %s = hcb.id limit 1) as c%d, `, - historyTable.name, - historyTable.objectField, - i, +func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) string { + index := 2 + var joins []string + var conditions []string + + for _, historyTable := range historyTables { + joins = append( + joins, + fmt.Sprintf( + ` LEFT JOIN LATERAL ( SELECT 1 as row FROM %s WHERE %s.%s = e1.id LIMIT 1) e%d ON true`, + historyTable.name, + historyTable.name, historyTable.objectField, + index, + ), ) - if err != nil { - return "", err - } - } - - _, err = fmt.Fprintf(&sb, "1 as cx from %s hcb where id >= %d order by id limit %d) as sub where ", table, offset, batchSize) - if err != nil { - return "", err + conditions = append(conditions, fmt.Sprintf("e%d.row IS NULL", index)) + index++ } - for i := range historyTables { - _, err = fmt.Fprintf(&sb, "c%d IS NULL and ", i) - if err != nil { - return "", err - } - } - - _, err = sb.WriteString("1=1);") - if err != nil { - return "", err - } - - return sb.String(), nil + return fmt.Sprintf( + "DELETE FROM %s WHERE id IN (SELECT e1.id FROM (SELECT id FROM %s WHERE id >= %d ORDER BY id LIMIT %d) e1", + table, + table, + offset, + batchSize, + ) + strings.Join(joins, "") + fmt.Sprintf(" WHERE %s);", strings.Join(conditions, " AND ")) } // DeleteRangeAll deletes a range of rows from all history tables between diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index 792f9826aa..5863e0d87d 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -4,9 +4,9 @@ import ( "testing" "time" - "github.com/stellar/go/services/horizon/internal/test" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + + "github.com/stellar/go/services/horizon/internal/test" ) func TestLatestLedger(t *testing.T) { @@ -70,9 +70,13 @@ func TestElderLedger(t *testing.T) { } func TestConstructReapLookupTablesQuery(t *testing.T) { - query, err := constructReapLookupTablesQuery( + query := constructReapLookupTablesQuery( "history_accounts", []tableObjectFieldPair{ + { + name: "history_transaction_participants", + objectField: "history_account_id", + }, { name: "history_effects", objectField: "history_account_id", @@ -89,24 +93,25 @@ func TestConstructReapLookupTablesQuery(t *testing.T) { name: "history_trades", objectField: "counter_account_id", }, - { - name: "history_transaction_participants", - objectField: "history_account_id", - }, }, 10, 0, ) - require.NoError(t, err) assert.Equal(t, - "delete from history_accounts where id IN "+ - "(select id from "+ - "(select id, (select 1 from history_effects where history_account_id = hcb.id limit 1) as c0, "+ - "(select 1 from history_operation_participants where history_account_id = hcb.id limit 1) as c1, "+ - "(select 1 from history_trades where base_account_id = hcb.id limit 1) as c2, "+ - "(select 1 from history_trades where counter_account_id = hcb.id limit 1) as c3, "+ - "(select 1 from history_transaction_participants where history_account_id = hcb.id limit 1) as c4, "+ - "1 as cx from history_accounts hcb where id >= 0 order by id limit 10) as sub "+ - "where c0 IS NULL and c1 IS NULL and c2 IS NULL and c3 IS NULL and c4 IS NULL and 1=1);", query) + "DELETE FROM history_accounts WHERE id IN ("+ + "SELECT e1.id FROM ("+ + "SELECT id FROM history_accounts WHERE id >= 0 ORDER BY id LIMIT 10) e1 "+ + "LEFT JOIN LATERAL ( "+ + "SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = e1.id LIMIT 1"+ + ") e2 ON true LEFT JOIN LATERAL ( "+ + "SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = e1.id LIMIT 1"+ + ") e3 ON true LEFT JOIN LATERAL ( "+ + "SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = e1.id LIMIT 1"+ + ") e4 ON true LEFT JOIN LATERAL ( "+ + "SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = e1.id LIMIT 1"+ + ") e5 ON true LEFT JOIN LATERAL ( "+ + "SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = e1.id LIMIT 1"+ + ") e6 ON true "+ + "WHERE e2.row IS NULL AND e3.row IS NULL AND e4.row IS NULL AND e5.row IS NULL AND e6.row IS NULL);", query) } From f338004c658f7a254402bb04ff7c012764fbcbdd Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 25 Jul 2024 08:26:09 +0100 Subject: [PATCH 02/10] use NOT EXISTS --- services/horizon/internal/db2/history/main.go | 42 +++++++------------ .../horizon/internal/db2/history/main_test.go | 20 +++------ 2 files changed, 22 insertions(+), 40 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 412245a118..cf333ed270 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -1094,24 +1094,17 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( // constructReapLookupTablesQuery creates a query like (using history_claimable_balances // as an example): // -// delete from history_claimable_balances where id in +// delete from history_claimable_balances where id in ( // -// (SELECT e1.id FROM ( -// SELECT id FROM history_claimable_balances +// WITH ha_batch AS ( +// SELECT id +// FROM history_claimable_balances // WHERE id >= 1000 -// ORDER BY id LIMIT 1000 -// ) e1 LEFT JOIN LATERAL ( -// SELECT 1 AS row -// FROM history_transaction_claimable_balances -// where history_transaction_claimable_balances.history_claimable_balance_id = e1.id -// LIMIT 1 -// ) e2 ON true LEFT JOIN LATERAL ( -// SELECT 1 AS row -// FROM history_operation_claimable_balances -// where history_operation_claimable_balances.history_claimable_balance_id = e1.id -// LIMIT 1 -// ) e3 ON true -// WHERE e2.row IS NULL AND e3.row IS NULL); +// ORDER BY id limit 1000 +// ) SELECT e1.id as id FROM ha_batch e1 +// WHERE NOT EXISTS (SELECT 1 FROM history_transaction_claimable_balances WHERE history_transaction_claimable_balances.history_claimable_balance_id = id limit 1) +// AND NOT EXISTS (SELECT 1 FROM history_operation_claimable_balances WHERE history_operation_claimable_balances.history_claimable_balance_id = id limit 1) +// ) // // In short it checks the 1000 rows omitting 1000 row of history_claimable_balances // and counts occurrences of each row in corresponding history tables. @@ -1124,31 +1117,28 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( // when it reaches the table size so eventually all orphaned rows are // deleted. func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) string { - index := 2 - var joins []string var conditions []string for _, historyTable := range historyTables { - joins = append( - joins, + conditions = append( + conditions, fmt.Sprintf( - ` LEFT JOIN LATERAL ( SELECT 1 as row FROM %s WHERE %s.%s = e1.id LIMIT 1) e%d ON true`, + "NOT EXISTS ( SELECT 1 as row FROM %s WHERE %s.%s = id LIMIT 1)", historyTable.name, historyTable.name, historyTable.objectField, - index, ), ) - conditions = append(conditions, fmt.Sprintf("e%d.row IS NULL", index)) - index++ } return fmt.Sprintf( - "DELETE FROM %s WHERE id IN (SELECT e1.id FROM (SELECT id FROM %s WHERE id >= %d ORDER BY id LIMIT %d) e1", + "DELETE FROM %s WHERE id IN ("+ + "WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id limit %d) "+ + "SELECT e1.id as id FROM ha_batch e1 WHERE ", table, table, offset, batchSize, - ) + strings.Join(joins, "") + fmt.Sprintf(" WHERE %s);", strings.Join(conditions, " AND ")) + ) + strings.Join(conditions, " AND ") + ")" } // DeleteRangeAll deletes a range of rows from all history tables between diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index 5863e0d87d..f2ebaed26f 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -100,18 +100,10 @@ func TestConstructReapLookupTablesQuery(t *testing.T) { assert.Equal(t, "DELETE FROM history_accounts WHERE id IN ("+ - "SELECT e1.id FROM ("+ - "SELECT id FROM history_accounts WHERE id >= 0 ORDER BY id LIMIT 10) e1 "+ - "LEFT JOIN LATERAL ( "+ - "SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = e1.id LIMIT 1"+ - ") e2 ON true LEFT JOIN LATERAL ( "+ - "SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = e1.id LIMIT 1"+ - ") e3 ON true LEFT JOIN LATERAL ( "+ - "SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = e1.id LIMIT 1"+ - ") e4 ON true LEFT JOIN LATERAL ( "+ - "SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = e1.id LIMIT 1"+ - ") e5 ON true LEFT JOIN LATERAL ( "+ - "SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = e1.id LIMIT 1"+ - ") e6 ON true "+ - "WHERE e2.row IS NULL AND e3.row IS NULL AND e4.row IS NULL AND e5.row IS NULL AND e6.row IS NULL);", query) + "WITH ha_batch AS (SELECT id FROM history_accounts WHERE id >= 0 ORDER BY id limit 10) SELECT e1.id as id FROM ha_batch e1 "+ + "WHERE NOT EXISTS ( SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1)", query) } From d81c37d38c5e2d46d79074629bc07e9bbaef87a7 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 25 Jul 2024 08:27:55 +0100 Subject: [PATCH 03/10] add JSON tags for logging --- services/horizon/internal/db2/history/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index cf333ed270..02c0edf143 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -972,9 +972,9 @@ type tableObjectFieldPair struct { } type LookupTableReapResult struct { - Offset int64 - RowsDeleted int64 - Duration time.Duration + Offset int64 `json:"offset"` + RowsDeleted int64 `json:"rowsDeleted"` + Duration time.Duration `json:"duration"` } // ReapLookupTables removes rows from lookup tables like history_claimable_balances From e86edf42257eff597858faeb4dc41ba159fd7e22 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Jul 2024 11:42:03 +0100 Subject: [PATCH 04/10] record offsets in db --- .../horizon/internal/db2/history/key_value.go | 40 +++++ services/horizon/internal/db2/history/main.go | 141 ++++++++++-------- .../horizon/internal/db2/history/reap_test.go | 2 +- services/horizon/internal/ingest/main.go | 5 +- 4 files changed, 118 insertions(+), 70 deletions(-) diff --git a/services/horizon/internal/db2/history/key_value.go b/services/horizon/internal/db2/history/key_value.go index a2a170a4b1..f9c49b04a6 100644 --- a/services/horizon/internal/db2/history/key_value.go +++ b/services/horizon/internal/db2/history/key_value.go @@ -3,9 +3,12 @@ package history import ( "context" "database/sql" + "fmt" "strconv" + "strings" sq "github.com/Masterminds/squirrel" + "github.com/stellar/go/support/errors" ) @@ -18,6 +21,7 @@ const ( stateInvalid = "exp_state_invalid" offerCompactionSequence = "offer_compaction_sequence" liquidityPoolCompactionSequence = "liquidity_pool_compaction_sequence" + lookupTableReapOffsetSuffix = "_reap_offset" ) // GetLastLedgerIngestNonBlocking works like GetLastLedgerIngest but @@ -203,6 +207,42 @@ func (q *Q) getValueFromStore(ctx context.Context, key string, forUpdate bool) ( return value, nil } +type KeyValuePair struct { + Key string `db:"key"` + Value string `db:"value"` +} + +func (q *Q) getLookupTableReapOffsets(ctx context.Context) (map[string]int64, error) { + keys := make([]string, 0, len(historyLookupTables)) + for table := range historyLookupTables { + keys = append(keys, table+lookupTableReapOffsetSuffix) + } + offsets := map[string]int64{} + var pairs []KeyValuePair + query := sq.Select("key", "value"). + From("key_value_store"). + Where(map[string]interface{}{ + "key": keys, + }) + err := q.Select(ctx, &pairs, query) + for _, pair := range pairs { + table := strings.TrimSuffix(pair.Key, lookupTableReapOffsetSuffix) + if _, ok := historyLookupTables[table]; !ok { + return nil, fmt.Errorf("invalid key: %s", pair.Key) + } + offset, err := strconv.ParseInt(pair.Value, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid offset: %s", pair.Value) + } + offsets[table] = offset + } + return offsets, err +} + +func (q *Q) updateLookupTableReapOffset(ctx context.Context, table string, offset int64) error { + return q.updateValueInStore(ctx, table+lookupTableReapOffsetSuffix, strconv.FormatInt(offset, 10)) +} + // updateValueInStore updates a value for a given key in KV store func (q *Q) updateValueInStore(ctx context.Context, key, value string) error { query := sq.Insert("key_value_store"). diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 02c0edf143..90face3129 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -282,7 +282,7 @@ type IngestionQ interface { NewTradeBatchInsertBuilder() TradeBatchInsertBuilder RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error - ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]LookupTableReapResult, error) + ReapLookupTables(ctx context.Context) (map[string]LookupTableReapResult, error) CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error) QTransactions QTrustLines @@ -981,7 +981,7 @@ type LookupTableReapResult struct { // which aren't used (orphaned), i.e. history entries for them were reaped. // This method must be executed inside ingestion transaction. Otherwise it may // create invalid state in lookup and history tables. -func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( +func (q *Q) ReapLookupTables(ctx context.Context) ( map[string]LookupTableReapResult, error, ) { @@ -989,72 +989,15 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( return nil, errors.New("cannot be called outside of an ingestion transaction") } + offsets, err := q.getLookupTableReapOffsets(ctx) + if err != nil { + return nil, fmt.Errorf("could not obtain offsets: %w", err) + } + const batchSize = 1000 results := map[string]LookupTableReapResult{} - for table, historyTables := range map[string][]tableObjectFieldPair{ - "history_accounts": { - { - name: "history_transaction_participants", - objectField: "history_account_id", - }, - - { - name: "history_effects", - objectField: "history_account_id", - }, - { - name: "history_operation_participants", - objectField: "history_account_id", - }, - { - name: "history_trades", - objectField: "base_account_id", - }, - { - name: "history_trades", - objectField: "counter_account_id", - }, - }, - "history_assets": { - { - name: "history_trades", - objectField: "base_asset_id", - }, - { - name: "history_trades", - objectField: "counter_asset_id", - }, - { - name: "history_trades_60000", - objectField: "base_asset_id", - }, - { - name: "history_trades_60000", - objectField: "counter_asset_id", - }, - }, - "history_claimable_balances": { - { - name: "history_transaction_claimable_balances", - objectField: "history_claimable_balance_id", - }, - { - name: "history_operation_claimable_balances", - objectField: "history_claimable_balance_id", - }, - }, - "history_liquidity_pools": { - { - name: "history_transaction_liquidity_pools", - objectField: "history_liquidity_pool_id", - }, - { - name: "history_operation_liquidity_pools", - objectField: "history_liquidity_pool_id", - }, - }, - } { + for table, historyTables := range historyLookupTables { startTime := time.Now() query := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table]) @@ -1077,6 +1020,10 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( return nil, errors.Wrapf(err, "error running query: %s", query) } + if err = q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil { + return nil, fmt.Errorf("error updating offset: %w", err) + } + rows, err := res.RowsAffected() if err != nil { return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query) @@ -1091,6 +1038,70 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( return results, nil } +var historyLookupTables = map[string][]tableObjectFieldPair{ + "history_accounts": { + { + name: "history_transaction_participants", + objectField: "history_account_id", + }, + + { + name: "history_effects", + objectField: "history_account_id", + }, + { + name: "history_operation_participants", + objectField: "history_account_id", + }, + { + name: "history_trades", + objectField: "base_account_id", + }, + { + name: "history_trades", + objectField: "counter_account_id", + }, + }, + "history_assets": { + { + name: "history_trades", + objectField: "base_asset_id", + }, + { + name: "history_trades", + objectField: "counter_asset_id", + }, + { + name: "history_trades_60000", + objectField: "base_asset_id", + }, + { + name: "history_trades_60000", + objectField: "counter_asset_id", + }, + }, + "history_claimable_balances": { + { + name: "history_transaction_claimable_balances", + objectField: "history_claimable_balance_id", + }, + { + name: "history_operation_claimable_balances", + objectField: "history_claimable_balance_id", + }, + }, + "history_liquidity_pools": { + { + name: "history_transaction_liquidity_pools", + objectField: "history_liquidity_pool_id", + }, + { + name: "history_operation_liquidity_pools", + objectField: "history_liquidity_pool_id", + }, + }, +} + // constructReapLookupTablesQuery creates a query like (using history_claimable_balances // as an example): // diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index 0f033c3629..0b209cf7d7 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -52,7 +52,7 @@ func TestReapLookupTables(t *testing.T) { err = q.Begin(tt.Ctx) tt.Require.NoError(err) - results, err := q.ReapLookupTables(tt.Ctx, nil) + results, err := q.ReapLookupTables(tt.Ctx) tt.Require.NoError(err) err = q.Commit() diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 1a54e6843c..508da0c7da 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -253,7 +253,6 @@ type system struct { runStateVerificationOnLedger func(uint32) bool - reapOffsetByTable map[string]int64 maxLedgerPerFlush uint32 reaper *Reaper @@ -369,7 +368,6 @@ func NewSystem(config Config) (System, error) { config.ReapConfig, config.HistorySession, ), - reapOffsetByTable: map[string]int64{}, } system.initMetrics() @@ -843,7 +841,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { defer cancel() reapStart := time.Now() - results, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsetByTable) + results, err := s.historyQ.ReapLookupTables(ctx) if err != nil { log.WithError(err).Warn("Error reaping lookup tables") return @@ -860,7 +858,6 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { for table, result := range results { totalDeleted += result.RowsDeleted reapLog = reapLog.WithField(table, result) - s.reapOffsetByTable[table] = result.Offset s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}).Observe(float64(result.RowsDeleted)) s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table}).Observe(result.Duration.Seconds()) } From 93f7ae96819cdca66813e8afa4a89423099b3668 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Jul 2024 12:18:33 +0100 Subject: [PATCH 05/10] fix query --- .../horizon/internal/db2/history/main_test.go | 25 ++----------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index f2ebaed26f..1a28b9e584 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -72,28 +72,7 @@ func TestElderLedger(t *testing.T) { func TestConstructReapLookupTablesQuery(t *testing.T) { query := constructReapLookupTablesQuery( "history_accounts", - []tableObjectFieldPair{ - { - name: "history_transaction_participants", - objectField: "history_account_id", - }, - { - name: "history_effects", - objectField: "history_account_id", - }, - { - name: "history_operation_participants", - objectField: "history_account_id", - }, - { - name: "history_trades", - objectField: "base_account_id", - }, - { - name: "history_trades", - objectField: "counter_account_id", - }, - }, + historyLookupTables["history_accounts"], 10, 0, ) @@ -105,5 +84,5 @@ func TestConstructReapLookupTablesQuery(t *testing.T) { "AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = id LIMIT 1) "+ - "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1)", query) + "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1))", query) } From 57ed33497c4b707dd036f1ec92d412584846f807 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Jul 2024 12:41:19 +0100 Subject: [PATCH 06/10] fix tests --- services/horizon/internal/ingest/main_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index fde8e40a9c..d4c0c34310 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -562,8 +562,8 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder { return args.Get(0).(history.TradeBatchInsertBuilder) } -func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]history.LookupTableReapResult, error) { - args := m.Called(ctx, offsets) +func (m *mockDBQ) ReapLookupTables(ctx context.Context) (map[string]history.LookupTableReapResult, error) { + args := m.Called(ctx) var r1 map[string]history.LookupTableReapResult if args.Get(0) != nil { r1 = args.Get(0).(map[string]history.LookupTableReapResult) From 6a491029c3e72400db7555d084c283a3e043c32f Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Jul 2024 15:09:32 +0100 Subject: [PATCH 07/10] fix govet --- services/horizon/internal/db2/history/key_value.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/services/horizon/internal/db2/history/key_value.go b/services/horizon/internal/db2/history/key_value.go index f9c49b04a6..3d23451937 100644 --- a/services/horizon/internal/db2/history/key_value.go +++ b/services/horizon/internal/db2/history/key_value.go @@ -225,12 +225,17 @@ func (q *Q) getLookupTableReapOffsets(ctx context.Context) (map[string]int64, er "key": keys, }) err := q.Select(ctx, &pairs, query) + if err != nil { + return nil, err + } for _, pair := range pairs { table := strings.TrimSuffix(pair.Key, lookupTableReapOffsetSuffix) if _, ok := historyLookupTables[table]; !ok { return nil, fmt.Errorf("invalid key: %s", pair.Key) } - offset, err := strconv.ParseInt(pair.Value, 10, 64) + + var offset int64 + offset, err = strconv.ParseInt(pair.Value, 10, 64) if err != nil { return nil, fmt.Errorf("invalid offset: %s", pair.Value) } From 7ae9b3ff3d8f28b0fc19c67d96f6700cf60bb502 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Jul 2024 15:16:13 +0100 Subject: [PATCH 08/10] fix logging --- services/horizon/internal/db2/history/main.go | 6 +++--- services/horizon/internal/ingest/main.go | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 90face3129..95e1d285f6 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -972,9 +972,9 @@ type tableObjectFieldPair struct { } type LookupTableReapResult struct { - Offset int64 `json:"offset"` - RowsDeleted int64 `json:"rowsDeleted"` - Duration time.Duration `json:"duration"` + Offset int64 + RowsDeleted int64 + Duration time.Duration } // ReapLookupTables removes rows from lookup tables like history_claimable_balances diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 508da0c7da..572bc2569d 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -857,7 +857,9 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { reapLog := log for table, result := range results { totalDeleted += result.RowsDeleted - reapLog = reapLog.WithField(table, result) + reapLog = reapLog.WithField(table+"_offset", result.Offset) + reapLog = reapLog.WithField(table+"_duration", result.Duration) + reapLog = reapLog.WithField(table+"_rows_deleted", result.RowsDeleted) s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}).Observe(float64(result.RowsDeleted)) s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table}).Observe(result.Duration.Seconds()) } From d1f33c63670af2d38442a5fc2f914fc01b517e65 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Jul 2024 18:17:31 +0100 Subject: [PATCH 09/10] make batch size configurable --- services/horizon/internal/db2/history/main.go | 8 +-- .../horizon/internal/db2/history/reap_test.go | 70 +++++++++++++++++-- services/horizon/internal/ingest/main.go | 5 +- services/horizon/internal/ingest/main_test.go | 4 +- 4 files changed, 73 insertions(+), 14 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 95e1d285f6..e9d8ffb185 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -282,7 +282,7 @@ type IngestionQ interface { NewTradeBatchInsertBuilder() TradeBatchInsertBuilder RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error - ReapLookupTables(ctx context.Context) (map[string]LookupTableReapResult, error) + ReapLookupTables(ctx context.Context, batchSize int) (map[string]LookupTableReapResult, error) CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error) QTransactions QTrustLines @@ -981,7 +981,7 @@ type LookupTableReapResult struct { // which aren't used (orphaned), i.e. history entries for them were reaped. // This method must be executed inside ingestion transaction. Otherwise it may // create invalid state in lookup and history tables. -func (q *Q) ReapLookupTables(ctx context.Context) ( +func (q *Q) ReapLookupTables(ctx context.Context, batchSize int) ( map[string]LookupTableReapResult, error, ) { @@ -994,8 +994,6 @@ func (q *Q) ReapLookupTables(ctx context.Context) ( return nil, fmt.Errorf("could not obtain offsets: %w", err) } - const batchSize = 1000 - results := map[string]LookupTableReapResult{} for table, historyTables := range historyLookupTables { startTime := time.Now() @@ -1127,7 +1125,7 @@ var historyLookupTables = map[string][]tableObjectFieldPair{ // possible that rows will be skipped from deletion. But offset is reset // when it reaches the table size so eventually all orphaned rows are // deleted. -func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) string { +func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize int, offset int64) string { var conditions []string for _, historyTable := range historyTables { diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index 0b209cf7d7..ab7ac2ed7a 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -52,7 +52,7 @@ func TestReapLookupTables(t *testing.T) { err = q.Begin(tt.Ctx) tt.Require.NoError(err) - results, err := q.ReapLookupTables(tt.Ctx) + results, err := q.ReapLookupTables(tt.Ctx, 5) tt.Require.NoError(err) err = q.Commit() @@ -76,12 +76,12 @@ func TestReapLookupTables(t *testing.T) { tt.Assert.Equal(1, curLedgers, "curLedgers") tt.Assert.Equal(25, prevAccounts, "prevAccounts") - tt.Assert.Equal(1, curAccounts, "curAccounts") - tt.Assert.Equal(int64(24), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + tt.Assert.Equal(21, curAccounts, "curAccounts") + tt.Assert.Equal(int64(4), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) tt.Assert.Equal(7, prevAssets, "prevAssets") - tt.Assert.Equal(0, curAssets, "curAssets") - tt.Assert.Equal(int64(7), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) + tt.Assert.Equal(2, curAssets, "curAssets") + tt.Assert.Equal(int64(5), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances") tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances") @@ -91,6 +91,66 @@ func TestReapLookupTables(t *testing.T) { tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools") tt.Assert.Equal(int64(1), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + tt.Assert.Len(results, 4) + tt.Assert.Equal(int64(6), results["history_accounts"].Offset) + tt.Assert.Equal(int64(6), results["history_assets"].Offset) + tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) + + err = q.Begin(tt.Ctx) + tt.Require.NoError(err) + + results, err = q.ReapLookupTables(tt.Ctx, 5) + tt.Require.NoError(err) + + err = q.Commit() + tt.Require.NoError(err) + + err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + + tt.Assert.Equal(16, curAccounts, "curAccounts") + tt.Assert.Equal(int64(5), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + + tt.Assert.Equal(0, curAssets, "curAssets") + tt.Assert.Equal(int64(2), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) + + tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) + + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + + tt.Assert.Len(results, 4) + tt.Assert.Equal(int64(11), results["history_accounts"].Offset) + tt.Assert.Equal(int64(0), results["history_assets"].Offset) + tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) + + err = q.Begin(tt.Ctx) + tt.Require.NoError(err) + + results, err = q.ReapLookupTables(tt.Ctx, 1000) + tt.Require.NoError(err) + + err = q.Commit() + tt.Require.NoError(err) + + err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + + tt.Assert.Equal(1, curAccounts, "curAccounts") + tt.Assert.Equal(int64(15), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + + tt.Assert.Equal(0, curAssets, "curAssets") + tt.Assert.Equal(int64(0), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) + + tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) + + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + tt.Assert.Len(results, 4) tt.Assert.Equal(int64(0), results["history_accounts"].Offset) tt.Assert.Equal(int64(0), results["history_assets"].Offset) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 572bc2569d..64e4558723 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -73,12 +73,13 @@ const ( // * Reaping (requires 2 connections, the extra connection is used for holding the advisory lock) MaxDBConnections = 5 - defaultCoreCursorName = "HORIZON" stateVerificationErrorThreshold = 3 // 100 ledgers per flush has shown in stress tests // to be best point on performance curve, default to that. MaxLedgersPerFlush uint32 = 100 + + reapLookupTablesBatchSize = 1000 ) var log = logpkg.DefaultLogger.WithField("service", "ingest") @@ -841,7 +842,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { defer cancel() reapStart := time.Now() - results, err := s.historyQ.ReapLookupTables(ctx) + results, err := s.historyQ.ReapLookupTables(ctx, reapLookupTablesBatchSize) if err != nil { log.WithError(err).Warn("Error reaping lookup tables") return diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index d4c0c34310..d5733ee5e4 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -562,8 +562,8 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder { return args.Get(0).(history.TradeBatchInsertBuilder) } -func (m *mockDBQ) ReapLookupTables(ctx context.Context) (map[string]history.LookupTableReapResult, error) { - args := m.Called(ctx) +func (m *mockDBQ) ReapLookupTables(ctx context.Context, batchSize int) (map[string]history.LookupTableReapResult, error) { + args := m.Called(ctx, batchSize) var r1 map[string]history.LookupTableReapResult if args.Get(0) != nil { r1 = args.Get(0).(map[string]history.LookupTableReapResult) From 388ba6bd56e9b3dc5c34f733a5e025acf4747a3a Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 26 Jul 2024 18:22:40 +0100 Subject: [PATCH 10/10] fix go vet --- .../horizon/internal/db2/history/reap_test.go | 50 ++++++++----------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index ab7ac2ed7a..5601cd19b6 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -30,21 +30,18 @@ func TestReapLookupTables(t *testing.T) { prevLiquidityPools, curLiquidityPools int ) - // Prev - { - err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevAssets, `SELECT COUNT(*) FROM history_assets`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) - tt.Require.NoError(err) - } - - err := reaper.DeleteUnretainedHistory(tt.Ctx) + err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) + tt.Require.NoError(err) + + err = reaper.DeleteUnretainedHistory(tt.Ctx) tt.Require.NoError(err) q := &history.Q{tt.HorizonSession()} @@ -58,19 +55,16 @@ func TestReapLookupTables(t *testing.T) { err = q.Commit() tt.Require.NoError(err) - // cur - { - err := db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) - tt.Require.NoError(err) - } + err = db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) + tt.Require.NoError(err) tt.Assert.Equal(61, prevLedgers, "prevLedgers") tt.Assert.Equal(1, curLedgers, "curLedgers")