feat(ledger): add some columns on transactions table to speed up read queries (#883)
gfyrag authored and flemzord committed Dec 4, 2023
1 parent a312cab commit ccb2ecf
Showing 4 changed files with 104 additions and 7 deletions.
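Read together, the four files below implement a single optimization: transaction list queries previously joined the moves table to filter by account, source, or destination, and now filter on jsonb columns denormalized onto each transactions row and served by GIN indexes. A rough before/after sketch (the address 'users:001' and the exact shape of the old predicate are illustrative, not taken verbatim from this diff):

-- before (simplified): filter by source through the moves join
select distinct on (transactions.id) transactions.*
from transactions
join moves m on transactions.id = m.transaction_id
where m.account_address = 'users:001' and m.is_source;

-- after (simplified): a containment check on the denormalized column
select transactions.*
from transactions
where sources @> '["users:001"]';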
48 changes: 45 additions & 3 deletions internal/storage/ledgerstore/migrations/0-init-schema.sql
@@ -55,7 +55,11 @@ create table transactions (
timestamp timestamp without time zone not null,
reference varchar,
reverted_at timestamp without time zone,
postings varchar not null
postings varchar not null,
sources jsonb,
destinations jsonb,
sources_arrays jsonb,
destinations_arrays jsonb
);

create table transactions_metadata (
@@ -127,6 +131,10 @@ create index moves_range_dates on moves (account_address, asset, effective_date)
create index transactions_date on transactions (timestamp);
create index transactions_metadata_metadata on transactions_metadata using gin (metadata);
--create unique index transactions_revisions on transactions_metadata(id desc, revision desc);
create index transactions_sources on transactions using gin (sources jsonb_path_ops);
create index transactions_destinations on transactions using gin (destinations jsonb_path_ops);
create index transactions_sources_arrays on transactions using gin (sources_arrays jsonb_path_ops);
create index transactions_destinations_arrays on transactions using gin (destinations_arrays jsonb_path_ops);

create index moves_account_address on moves (account_address);
create index moves_account_address_array on moves using gin (account_address_array jsonb_ops);
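The four new GIN indexes above use the jsonb_path_ops operator class, which is specialized for the @> containment operator — the operator the new filter helper in utils.go relies on. Two illustrative queries (hypothetical addresses) that these indexes can serve:

-- exact source address match against the flat jsonb array
select * from transactions where sources @> '["users:001:main"]';

-- segment-wise match for a pattern like "users::main" against the exploded form
select * from transactions where sources_arrays @> '[{"0": "users", "2": "main", "3": null}]';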
@@ -160,6 +168,23 @@ as $$
end;
$$;

-- given the input "a:b:c", the function will produce: '{"0": "a", "1": "b", "2": "c", "3": null}'
create function explode_address(_address varchar)
returns jsonb
language sql
immutable
as $$
select aggregate_objects(jsonb_build_object(data.number - 1, data.value))
from (
select row_number() over () as number, v.value
from (
select unnest(string_to_array(_address, ':')) as value
union all
select null
) v
) data
$$;
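A worked example of explode_address (illustrative value). The trailing null entry records where the address ends, so a containment check can distinguish 'a:b' from 'a:b:c':

select explode_address('users:001:main');
-- => '{"0": "users", "1": "001", "2": "main", "3": null}'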

create function get_account(_account_address varchar, _before timestamp default null)
returns setof accounts_metadata
language sql
@@ -405,11 +430,28 @@ as $$
declare
posting jsonb;
begin
insert into transactions (id, timestamp, reference, postings)
insert into transactions (id, timestamp, reference, postings, sources, destinations, sources_arrays, destinations_arrays)
values ((data->>'id')::numeric,
(data->>'timestamp')::timestamp without time zone,
data->>'reference',
jsonb_pretty(data->'postings'));
jsonb_pretty(data->'postings'),
(
select to_jsonb(array_agg(v->>'source')) as value
from jsonb_array_elements(data->'postings') v
),
(
select to_jsonb(array_agg(v->>'destination')) as value
from jsonb_array_elements(data->'postings') v
),
(
select to_jsonb(array_agg(explode_address(v->>'source'))) as value
from jsonb_array_elements(data->'postings') v
),
(
select to_jsonb(array_agg(explode_address(v->>'destination'))) as value
from jsonb_array_elements(data->'postings') v
)
);

for posting in (select jsonb_array_elements(data->'postings')) loop
-- todo: sometimes the balance is known at commit time (for sources != world), we need to forward the value to populate the pre_commit_aggregated_input and output
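For illustration (hypothetical postings), a transaction committed with postings world -> users:001 and users:001 -> fees would be stored with roughly these column values:

-- sources             => ["world", "users:001"]
-- destinations        => ["users:001", "fees"]
-- sources_arrays      => [{"0": "world", "1": null}, {"0": "users", "1": "001", "2": null}]
-- destinations_arrays => [{"0": "users", "1": "001", "2": null}, {"0": "fees", "1": null}]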
7 changes: 3 additions & 4 deletions internal/storage/ledgerstore/transactions.go
@@ -155,7 +155,6 @@ func (store *Store) buildTransactionQuery(p PITFilterWithVolumes, query *bun.Sel
query = query.
Table("transactions").
ColumnExpr("distinct on(transactions.id) transactions.*, transactions_metadata.metadata").
Join("join moves m on transactions.id = m.transaction_id").
Join(fmt.Sprintf(`left join lateral (%s) as transactions_metadata on true`, selectMetadata.String()))

if p.PIT != nil && !p.PIT.IsZero() {
@@ -186,7 +185,7 @@ func (store *Store) transactionQueryContext(qb query.Builder) (string, []any, er
}
switch address := value.(type) {
case string:
return filterAccountAddress(address, "m.account_address"), nil, nil
return filterAccountAddressOnTransactions(address, true, true), nil, nil
default:
return "", nil, newErrInvalidQuery("unexpected type %T for column 'account'", address)
}
Expand All @@ -197,7 +196,7 @@ func (store *Store) transactionQueryContext(qb query.Builder) (string, []any, er
}
switch address := value.(type) {
case string:
return fmt.Sprintf("(%s) and m.is_source", filterAccountAddress(address, "m.account_address")), nil, nil
return filterAccountAddressOnTransactions(address, true, false), nil, nil
default:
return "", nil, newErrInvalidQuery("unexpected type %T for column 'source'", address)
}
Expand All @@ -208,7 +207,7 @@ func (store *Store) transactionQueryContext(qb query.Builder) (string, []any, er
}
switch address := value.(type) {
case string:
return fmt.Sprintf("(%s) and not m.is_source", filterAccountAddress(address, "m.account_address")), nil, nil
return filterAccountAddressOnTransactions(address, false, true), nil, nil
default:
return "", nil, newErrInvalidQuery("unexpected type %T for column 'destination'", address)
}
2 changes: 2 additions & 0 deletions internal/storage/ledgerstore/transactions_test.go
@@ -1020,13 +1020,15 @@ func TestGetTransactions(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

tc.query.Options.ExpandVolumes = true
tc.query.Options.ExpandEffectiveVolumes = false
cursor, err := store.GetTransactions(ctx, NewGetTransactionsQuery(tc.query))
if tc.expectError != nil {
require.True(t, errors.Is(err, tc.expectError))
} else {
require.NoError(t, err)
require.Len(t, cursor.Data, len(tc.expected.Data))
internaltesting.RequireEqual(t, *tc.expected, *cursor)

count, err := store.CountTransactions(ctx, NewGetTransactionsQuery(tc.query))
54 changes: 54 additions & 0 deletions internal/storage/ledgerstore/utils.go
@@ -2,6 +2,7 @@ package ledgerstore

import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
@@ -94,6 +95,59 @@ func filterAccountAddress(address, key string) string {
return strings.Join(parts, " and ")
}

func filterAccountAddressOnTransactions(address string, source, destination bool) string {
src := strings.Split(address, ":")

needSegmentCheck := false
for _, segment := range src {
needSegmentCheck = segment == ""
if needSegmentCheck {
break
}
}

if needSegmentCheck {
m := map[string]any{
fmt.Sprint(len(src)): nil,
}
parts := make([]string, 0)

for i, segment := range src {
if len(segment) == 0 {
continue
}
m[fmt.Sprint(i)] = segment
}

data, err := json.Marshal([]any{m})
if err != nil {
panic(err)
}

if source {
parts = append(parts, fmt.Sprintf("sources_arrays @> '%s'", string(data)))
}
if destination {
parts = append(parts, fmt.Sprintf("destinations_arrays @> '%s'", string(data)))
}
return strings.Join(parts, " or ")
} else {
data, err := json.Marshal([]string{address})
if err != nil {
panic(err)
}

parts := make([]string, 0)
if source {
parts = append(parts, fmt.Sprintf("sources @> '%s'", string(data)))
}
if destination {
parts = append(parts, fmt.Sprintf("destinations @> '%s'", string(data)))
}
return strings.Join(parts, " or ")
}
}
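A sketch of the predicates this helper produces (addresses are hypothetical). A fully qualified address is matched against the flat sources/destinations columns; an address containing empty segments (a wildcard such as "users:") is matched segment-wise against the *_arrays columns; and passing both source and destination (the 'account' filter) ORs the two:

-- filterAccountAddressOnTransactions("users:001", true, false)
sources @> '["users:001"]'

-- filterAccountAddressOnTransactions("users:", true, true)
sources_arrays @> '[{"0":"users","2":null}]' or destinations_arrays @> '[{"0":"users","2":null}]'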

func filterPIT(pit *ledger.Time, column string) func(query *bun.SelectQuery) *bun.SelectQuery {
return func(query *bun.SelectQuery) *bun.SelectQuery {
if pit == nil || pit.IsZero() {
