Skip to content

Commit

Permalink
lookup validator indexes for request transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Oct 9, 2024
1 parent 00d00b6 commit e502857
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 57 deletions.
20 changes: 11 additions & 9 deletions db/consolidation_request_txs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequ
dbtypes.DBEnginePgsql: "INSERT INTO consolidation_request_txs ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_request_txs ",
}),
"(block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, target_pubkey, tx_hash, tx_sender, tx_target, dequeue_block)",
"(block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, source_index, target_pubkey, target_index, tx_hash, tx_sender, tx_target, dequeue_block)",
" VALUES ",
)
argIdx := 0
fieldCount := 12
fieldCount := 14

args := make([]any, len(consolidationTxs)*fieldCount)
for i, consolidationTx := range consolidationTxs {
Expand All @@ -43,15 +43,17 @@ func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequ
args[argIdx+4] = consolidationTx.ForkId
args[argIdx+5] = consolidationTx.SourceAddress
args[argIdx+6] = consolidationTx.SourcePubkey
args[argIdx+7] = consolidationTx.TargetPubkey
args[argIdx+8] = consolidationTx.TxHash
args[argIdx+9] = consolidationTx.TxSender
args[argIdx+10] = consolidationTx.TxTarget
args[argIdx+11] = consolidationTx.DequeueBlock
args[argIdx+7] = consolidationTx.SourceIndex
args[argIdx+8] = consolidationTx.TargetPubkey
args[argIdx+9] = consolidationTx.TargetIndex
args[argIdx+10] = consolidationTx.TxHash
args[argIdx+11] = consolidationTx.TxSender
args[argIdx+12] = consolidationTx.TxTarget
args[argIdx+13] = consolidationTx.DequeueBlock
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: " ON CONFLICT (block_number, block_index) DO UPDATE SET fork_id = excluded.fork_id",
dbtypes.DBEnginePgsql: " ON CONFLICT (block_number, block_index) DO UPDATE SET source_index = excluded.source_index, target_index = excluded.target_index, fork_id = excluded.fork_id",
dbtypes.DBEngineSqlite: "",
}))

Expand All @@ -65,7 +67,7 @@ func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequ
func GetConsolidationRequestTxsByDequeueRange(dequeueFirst uint64, dequeueLast uint64) []*dbtypes.ConsolidationRequestTx {
consolidationTxs := []*dbtypes.ConsolidationRequestTx{}

err := ReaderDb.Select(&consolidationTxs, `SELECT block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, target_pubkey, tx_hash, tx_sender, tx_target, dequeue_block
err := ReaderDb.Select(&consolidationTxs, `SELECT block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, source_index, target_pubkey, target_index, tx_hash, tx_sender, tx_target, dequeue_block
FROM consolidation_request_txs
WHERE dequeue_block >= $1 AND dequeue_block <= $2
ORDER BY dequeue_block ASC, block_number ASC, block_index ASC
Expand Down
19 changes: 19 additions & 0 deletions db/schema/pgsql/20241006182734_pectra-updates3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ CREATE TABLE IF NOT EXISTS public."consolidation_request_txs" (
fork_id BIGINT NOT NULL DEFAULT 0,
source_address bytea NOT NULL,
source_pubkey bytea NULL,
source_index BIGINT NULL,
target_pubkey bytea NULL,
target_index BIGINT NULL,
tx_hash bytea NULL,
tx_sender bytea NOT NULL,
tx_target bytea NOT NULL,
Expand All @@ -25,6 +27,14 @@ CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_addr_idx"
ON public."consolidation_request_txs"
("source_address" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_idx"
ON public."consolidation_request_txs"
("source_index" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "consolidation_request_txs_target_idx"
ON public."consolidation_request_txs"
("target_index" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "consolidation_request_txs_fork_idx"
ON public."consolidation_request_txs"
("fork_id" ASC NULLS FIRST);
Expand Down Expand Up @@ -56,6 +66,7 @@ CREATE TABLE IF NOT EXISTS public."withdrawal_request_txs" (
fork_id BIGINT NOT NULL DEFAULT 0,
source_address bytea NOT NULL,
validator_pubkey bytea NOT NULL,
validator_index BIGINT NULL,
amount BIGINT NOT NULL,
tx_hash bytea NULL,
tx_sender bytea NOT NULL,
Expand All @@ -72,6 +83,14 @@ CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_source_addr_idx"
ON public."withdrawal_request_txs"
("source_address" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_validator_index_idx"
ON public."withdrawal_request_txs"
("validator_index" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_amount_idx"
ON public."withdrawal_request_txs"
("amount" ASC NULLS FIRST);

CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_fork_idx"
ON public."withdrawal_request_txs"
("fork_id" ASC NULLS FIRST);
Expand Down
19 changes: 19 additions & 0 deletions db/schema/sqlite/20241006182734_pectra-updates3.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ CREATE TABLE IF NOT EXISTS "consolidation_request_txs" (
fork_id BIGINT NOT NULL DEFAULT 0,
source_address BLOB NOT NULL,
source_pubkey BLOB NULL,
source_index BIGINT NULL,
target_pubkey BLOB NULL,
target_index BIGINT NULL,
tx_hash BLOB NULL,
tx_sender BLOB NOT NULL,
tx_target BLOB NOT NULL,
Expand All @@ -25,6 +27,14 @@ CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_addr_idx"
ON "consolidation_request_txs"
("source_address" ASC);

CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_idx"
ON "consolidation_request_txs"
("source_index" ASC);

CREATE INDEX IF NOT EXISTS "consolidation_request_txs_target_idx"
ON "consolidation_request_txs"
("target_index" ASC);

CREATE INDEX IF NOT EXISTS "consolidation_request_txs_fork_idx"
ON "consolidation_request_txs"
("fork_id" ASC);
Expand Down Expand Up @@ -56,6 +66,7 @@ CREATE TABLE IF NOT EXISTS "withdrawal_request_txs" (
fork_id BIGINT NOT NULL DEFAULT 0,
source_address BLOB NOT NULL,
validator_pubkey BLOB NOT NULL,
validator_index BIGINT NULL,
amount BIGINT NOT NULL,
tx_hash BLOB NULL,
tx_sender BLOB NOT NULL,
Expand All @@ -72,6 +83,14 @@ CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_source_addr_idx"
ON "withdrawal_request_txs"
("source_address" ASC);

CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_validator_index_idx"
ON "withdrawal_request_txs"
("validator_index" ASC);

CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_amount_idx"
ON "withdrawal_request_txs"
("amount" ASC);

CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_fork_idx"
ON "withdrawal_request_txs"
("fork_id" ASC);
Expand Down
136 changes: 128 additions & 8 deletions db/withdrawal_request_txs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx
dbtypes.DBEnginePgsql: "INSERT INTO withdrawal_request_txs ",
dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO withdrawal_request_txs ",
}),
"(block_number, block_index, block_time, block_root, fork_id, source_address, validator_pubkey, amount, tx_hash, tx_sender, tx_target, dequeue_block)",
"(block_number, block_index, block_time, block_root, fork_id, source_address, validator_pubkey, validator_index, amount, tx_hash, tx_sender, tx_target, dequeue_block)",
" VALUES ",
)
argIdx := 0
fieldCount := 12
fieldCount := 13

args := make([]any, len(withdrawalTxs)*fieldCount)
for i, withdrawalTx := range withdrawalTxs {
Expand All @@ -43,11 +43,12 @@ func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx
args[argIdx+4] = withdrawalTx.ForkId
args[argIdx+5] = withdrawalTx.SourceAddress
args[argIdx+6] = withdrawalTx.ValidatorPubkey
args[argIdx+7] = withdrawalTx.Amount
args[argIdx+8] = withdrawalTx.TxHash
args[argIdx+9] = withdrawalTx.TxSender
args[argIdx+10] = withdrawalTx.TxTarget
args[argIdx+11] = withdrawalTx.DequeueBlock
args[argIdx+7] = withdrawalTx.ValidatorIndex
args[argIdx+8] = withdrawalTx.Amount
args[argIdx+9] = withdrawalTx.TxHash
args[argIdx+10] = withdrawalTx.TxSender
args[argIdx+11] = withdrawalTx.TxTarget
args[argIdx+12] = withdrawalTx.DequeueBlock
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
Expand All @@ -65,7 +66,7 @@ func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx
func GetWithdrawalRequestTxsByDequeueRange(dequeueFirst uint64, dequeueLast uint64) []*dbtypes.WithdrawalRequestTx {
withdrawalTxs := []*dbtypes.WithdrawalRequestTx{}

err := ReaderDb.Select(&withdrawalTxs, `SELECT block_number, block_index, block_time, block_root, fork_id, source_address, validator_pubkey, amount, tx_hash, tx_sender, tx_target, dequeue_block
err := ReaderDb.Select(&withdrawalTxs, `SELECT withdrawal_request_txs.*
FROM withdrawal_request_txs
WHERE dequeue_block >= $1 AND dequeue_block <= $2
ORDER BY dequeue_block ASC, block_number ASC, block_index ASC
Expand All @@ -77,3 +78,122 @@ func GetWithdrawalRequestTxsByDequeueRange(dequeueFirst uint64, dequeueLast uint

return withdrawalTxs
}

// GetWithdrawalRequestTxsFiltered returns withdrawal request transactions matching
// the given filter, paginated via offset/limit, plus the total number of matching
// rows.
//
// The query builds a CTE with all filter conditions applied, then UNIONs a single
// aggregate row (count(*) carried in block_number, all other columns zero/null)
// with the requested page of rows ordered by block_time DESC. The first result row
// therefore holds the total count and is stripped off before returning.
//
// canonicalForkIds combined with filter.WithOrphaned controls fork visibility:
// 0 = canonical forks only, 1 = all rows, 2 = orphaned (non-canonical) only.
func GetWithdrawalRequestTxsFiltered(offset uint64, limit uint32, canonicalForkIds []uint64, filter *dbtypes.WithdrawalRequestTxFilter) ([]*dbtypes.WithdrawalRequestTx, uint64, error) {
	var sql strings.Builder
	args := []interface{}{}
	fmt.Fprint(&sql, `
	WITH cte AS (
		SELECT
			block_number, block_index, block_time, block_root, fork_id, source_address, validator_pubkey, validator_index, amount, tx_hash, tx_sender, tx_target, dequeue_block
		FROM withdrawal_request_txs
	`)

	if filter.ValidatorName != "" {
		// join validator_names so the name filter below can match on the resolved validator name
		fmt.Fprint(&sql, `
		LEFT JOIN validator_names AS source_names ON source_names."index" = withdrawal_request_txs.validator_index
		`)
	}

	filterOp := "WHERE"
	if filter.MinDequeue > 0 {
		args = append(args, filter.MinDequeue)
		fmt.Fprintf(&sql, " %v dequeue_block >= $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if filter.MaxDequeue > 0 {
		args = append(args, filter.MaxDequeue)
		fmt.Fprintf(&sql, " %v dequeue_block <= $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if len(filter.SourceAddress) > 0 {
		args = append(args, filter.SourceAddress)
		fmt.Fprintf(&sql, " %v source_address = $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if filter.MinIndex > 0 {
		args = append(args, filter.MinIndex)
		fmt.Fprintf(&sql, " %v validator_index >= $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if filter.MaxIndex > 0 {
		args = append(args, filter.MaxIndex)
		fmt.Fprintf(&sql, " %v validator_index <= $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if filter.ValidatorName != "" {
		// substring match; pgsql gets case-insensitive ilike, sqlite LIKE is
		// case-insensitive for ASCII by default
		args = append(args, "%"+filter.ValidatorName+"%")
		fmt.Fprintf(&sql, " %v ", filterOp)
		fmt.Fprintf(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
			dbtypes.DBEnginePgsql:  ` source_names.name ilike $%v `,
			dbtypes.DBEngineSqlite: ` source_names.name LIKE $%v `,
		}), len(args))
		filterOp = "AND"
	}
	// amount bounds are pointers so that an explicit 0 bound is distinguishable from "unset"
	if filter.MinAmount != nil {
		args = append(args, *filter.MinAmount)
		fmt.Fprintf(&sql, " %v amount >= $%v", filterOp, len(args))
		filterOp = "AND"
	}
	if filter.MaxAmount != nil {
		args = append(args, *filter.MaxAmount)
		fmt.Fprintf(&sql, " %v amount <= $%v", filterOp, len(args))
		filterOp = "AND"
	}

	if filter.WithOrphaned != 1 {
		forkIdStr := make([]string, len(canonicalForkIds))
		for i, forkId := range canonicalForkIds {
			forkIdStr[i] = fmt.Sprintf("%v", forkId)
		}
		if len(forkIdStr) == 0 {
			// no known canonical forks: fall back to fork id 0 so the IN clause stays valid
			forkIdStr = append(forkIdStr, "0")
		}

		if filter.WithOrphaned == 0 {
			fmt.Fprintf(&sql, " %v fork_id IN (%v)", filterOp, strings.Join(forkIdStr, ","))
			filterOp = "AND"
		} else if filter.WithOrphaned == 2 {
			fmt.Fprintf(&sql, " %v fork_id NOT IN (%v)", filterOp, strings.Join(forkIdStr, ","))
			filterOp = "AND"
		}
	}

	args = append(args, limit)
	// Aggregate row first: count(*) rides in block_number, everything else is a
	// typed placeholder. Column order MUST match the cte's column order exactly
	// (validator_pubkey before validator_index) — UNION matches columns by
	// position, and a mismatch makes pgsql reject the query (integer vs bytea)
	// and would scan pubkey bytes into validator_index via sqlx otherwise.
	fmt.Fprintf(&sql, `)
	SELECT
		count(*) AS block_number,
		0 AS block_index,
		0 AS block_time,
		null AS block_root,
		0 AS fork_id,
		null AS source_address,
		null AS validator_pubkey,
		0 AS validator_index,
		0 AS amount,
		null AS tx_hash,
		null AS tx_sender,
		null AS tx_target,
		0 AS dequeue_block
	FROM cte
	UNION ALL SELECT * FROM (
		SELECT * FROM cte
		ORDER BY block_time DESC
		LIMIT $%v
	`, len(args))

	if offset > 0 {
		args = append(args, offset)
		fmt.Fprintf(&sql, " OFFSET $%v ", len(args))
	}
	fmt.Fprint(&sql, ") AS t1")

	withdrawalRequestTxs := []*dbtypes.WithdrawalRequestTx{}
	err := ReaderDb.Select(&withdrawalRequestTxs, sql.String(), args...)
	if err != nil {
		logger.Errorf("Error while fetching filtered withdrawal request txs: %v", err)
		return nil, 0, err
	}
	if len(withdrawalRequestTxs) == 0 {
		// defensive: the aggregate row should always be present
		return []*dbtypes.WithdrawalRequestTx{}, 0, nil
	}

	// first row is the count-only aggregate row; the rest is the requested page
	return withdrawalRequestTxs[1:], withdrawalRequestTxs[0].BlockNumber, nil
}
51 changes: 27 additions & 24 deletions dbtypes/dbtypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,18 +267,20 @@ type ConsolidationRequest struct {
}

type ConsolidationRequestTx struct {
BlockNumber uint64 `db:"block_number"`
BlockIndex uint64 `db:"block_index"`
BlockTime uint64 `db:"block_time"`
BlockRoot []byte `db:"block_root"`
ForkId uint64 `db:"fork_id"`
SourceAddress []byte `db:"source_address"`
SourcePubkey []byte `db:"source_pubkey"`
TargetPubkey []byte `db:"target_pubkey"`
TxHash []byte `db:"tx_hash"`
TxSender []byte `db:"tx_sender"`
TxTarget []byte `db:"tx_target"`
DequeueBlock uint64 `db:"dequeue_block"`
BlockNumber uint64 `db:"block_number"`
BlockIndex uint64 `db:"block_index"`
BlockTime uint64 `db:"block_time"`
BlockRoot []byte `db:"block_root"`
ForkId uint64 `db:"fork_id"`
SourceAddress []byte `db:"source_address"`
SourcePubkey []byte `db:"source_pubkey"`
SourceIndex *uint64 `db:"source_index"`
TargetPubkey []byte `db:"target_pubkey"`
TargetIndex *uint64 `db:"target_index"`
TxHash []byte `db:"tx_hash"`
TxSender []byte `db:"tx_sender"`
TxTarget []byte `db:"tx_target"`
DequeueBlock uint64 `db:"dequeue_block"`
}

type WithdrawalRequest struct {
Expand All @@ -296,16 +298,17 @@ type WithdrawalRequest struct {
}

type WithdrawalRequestTx struct {
BlockNumber uint64 `db:"block_number"`
BlockIndex uint64 `db:"block_index"`
BlockTime uint64 `db:"block_time"`
BlockRoot []byte `db:"block_root"`
ForkId uint64 `db:"fork_id"`
SourceAddress []byte `db:"source_address"`
ValidatorPubkey []byte `db:"validator_pubkey"`
Amount uint64 `db:"amount"`
TxHash []byte `db:"tx_hash"`
TxSender []byte `db:"tx_sender"`
TxTarget []byte `db:"tx_target"`
DequeueBlock uint64 `db:"dequeue_block"`
BlockNumber uint64 `db:"block_number"`
BlockIndex uint64 `db:"block_index"`
BlockTime uint64 `db:"block_time"`
BlockRoot []byte `db:"block_root"`
ForkId uint64 `db:"fork_id"`
SourceAddress []byte `db:"source_address"`
ValidatorPubkey []byte `db:"validator_pubkey"`
ValidatorIndex *uint64 `db:"validator_index"`
Amount uint64 `db:"amount"`
TxHash []byte `db:"tx_hash"`
TxSender []byte `db:"tx_sender"`
TxTarget []byte `db:"tx_target"`
DequeueBlock uint64 `db:"dequeue_block"`
}
12 changes: 12 additions & 0 deletions dbtypes/other.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ type WithdrawalRequestFilter struct {
WithOrphaned uint8
}

// WithdrawalRequestTxFilter holds the filter criteria used by
// GetWithdrawalRequestTxsFiltered. For non-pointer numeric fields the zero
// value means "no constraint".
type WithdrawalRequestTxFilter struct {
	MinDequeue    uint64  // minimum dequeue_block (inclusive); 0 = unbounded
	MaxDequeue    uint64  // maximum dequeue_block (inclusive); 0 = unbounded
	SourceAddress []byte  // exact match on source_address; empty = no filter
	MinIndex      uint64  // minimum validator_index (inclusive); 0 = unbounded
	MaxIndex      uint64  // maximum validator_index (inclusive); 0 = unbounded
	ValidatorName string  // substring match against validator_names.name; "" = no filter
	MinAmount     *uint64 // minimum amount (inclusive); pointer so an explicit 0 bound differs from unset (nil)
	MaxAmount     *uint64 // maximum amount (inclusive); nil = unbounded
	WithOrphaned  uint8   // 0 = canonical forks only, 1 = include orphaned, 2 = orphaned only
}

type ConsolidationRequestFilter struct {
MinSlot uint64
MaxSlot uint64
Expand Down
Loading

0 comments on commit e502857

Please sign in to comment.