diff --git a/db/consolidation_request_txs.go b/db/consolidation_request_txs.go index 051d941..3f06362 100644 --- a/db/consolidation_request_txs.go +++ b/db/consolidation_request_txs.go @@ -15,11 +15,11 @@ func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequ dbtypes.DBEnginePgsql: "INSERT INTO consolidation_request_txs ", dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO consolidation_request_txs ", }), - "(block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, target_pubkey, tx_hash, tx_sender, tx_target, dequeue_block)", + "(block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, source_index, target_pubkey, target_index, tx_hash, tx_sender, tx_target, dequeue_block)", " VALUES ", ) argIdx := 0 - fieldCount := 12 + fieldCount := 14 args := make([]any, len(consolidationTxs)*fieldCount) for i, consolidationTx := range consolidationTxs { @@ -43,15 +43,17 @@ func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequ args[argIdx+4] = consolidationTx.ForkId args[argIdx+5] = consolidationTx.SourceAddress args[argIdx+6] = consolidationTx.SourcePubkey - args[argIdx+7] = consolidationTx.TargetPubkey - args[argIdx+8] = consolidationTx.TxHash - args[argIdx+9] = consolidationTx.TxSender - args[argIdx+10] = consolidationTx.TxTarget - args[argIdx+11] = consolidationTx.DequeueBlock + args[argIdx+7] = consolidationTx.SourceIndex + args[argIdx+8] = consolidationTx.TargetPubkey + args[argIdx+9] = consolidationTx.TargetIndex + args[argIdx+10] = consolidationTx.TxHash + args[argIdx+11] = consolidationTx.TxSender + args[argIdx+12] = consolidationTx.TxTarget + args[argIdx+13] = consolidationTx.DequeueBlock argIdx += fieldCount } fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ - dbtypes.DBEnginePgsql: " ON CONFLICT (block_number, block_index) DO UPDATE SET fork_id = excluded.fork_id", + dbtypes.DBEnginePgsql: " ON CONFLICT (block_number, block_index) DO 
UPDATE SET source_index = excluded.source_index, target_index = excluded.target_index, fork_id = excluded.fork_id", dbtypes.DBEngineSqlite: "", })) @@ -65,7 +67,7 @@ func InsertConsolidationRequestTxs(consolidationTxs []*dbtypes.ConsolidationRequ func GetConsolidationRequestTxsByDequeueRange(dequeueFirst uint64, dequeueLast uint64) []*dbtypes.ConsolidationRequestTx { consolidationTxs := []*dbtypes.ConsolidationRequestTx{} - err := ReaderDb.Select(&consolidationTxs, `SELECT block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, target_pubkey, tx_hash, tx_sender, tx_target, dequeue_block + err := ReaderDb.Select(&consolidationTxs, `SELECT block_number, block_index, block_time, block_root, fork_id, source_address, source_pubkey, source_index, target_pubkey, target_index, tx_hash, tx_sender, tx_target, dequeue_block FROM consolidation_request_txs WHERE dequeue_block >= $1 AND dequeue_block <= $2 ORDER BY dequeue_block ASC, block_number ASC, block_index ASC diff --git a/db/schema/pgsql/20241006182734_pectra-updates3.sql b/db/schema/pgsql/20241006182734_pectra-updates3.sql index 1c4e902..82dd8f8 100644 --- a/db/schema/pgsql/20241006182734_pectra-updates3.sql +++ b/db/schema/pgsql/20241006182734_pectra-updates3.sql @@ -9,7 +9,9 @@ CREATE TABLE IF NOT EXISTS public."consolidation_request_txs" ( fork_id BIGINT NOT NULL DEFAULT 0, source_address bytea NOT NULL, source_pubkey bytea NULL, + source_index BIGINT NULL, target_pubkey bytea NULL, + target_index BIGINT NULL, tx_hash bytea NULL, tx_sender bytea NOT NULL, tx_target bytea NOT NULL, @@ -25,6 +27,14 @@ CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_addr_idx" ON public."consolidation_request_txs" ("source_address" ASC NULLS FIRST); +CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_idx" + ON public."consolidation_request_txs" + ("source_index" ASC NULLS FIRST); + +CREATE INDEX IF NOT EXISTS "consolidation_request_txs_target_idx" + ON 
public."consolidation_request_txs" + ("target_index" ASC NULLS FIRST); + CREATE INDEX IF NOT EXISTS "consolidation_request_txs_fork_idx" ON public."consolidation_request_txs" ("fork_id" ASC NULLS FIRST); @@ -56,6 +66,7 @@ CREATE TABLE IF NOT EXISTS public."withdrawal_request_txs" ( fork_id BIGINT NOT NULL DEFAULT 0, source_address bytea NOT NULL, validator_pubkey bytea NOT NULL, + validator_index BIGINT NULL, amount BIGINT NOT NULL, tx_hash bytea NULL, tx_sender bytea NOT NULL, @@ -72,6 +83,14 @@ CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_source_addr_idx" ON public."withdrawal_request_txs" ("source_address" ASC NULLS FIRST); +CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_validator_index_idx" + ON public."withdrawal_request_txs" + ("validator_index" ASC NULLS FIRST); + +CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_amount_idx" + ON public."withdrawal_request_txs" + ("amount" ASC NULLS FIRST); + CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_fork_idx" ON public."withdrawal_request_txs" ("fork_id" ASC NULLS FIRST); diff --git a/db/schema/sqlite/20241006182734_pectra-updates3.sql b/db/schema/sqlite/20241006182734_pectra-updates3.sql index 07f4912..acd4faf 100644 --- a/db/schema/sqlite/20241006182734_pectra-updates3.sql +++ b/db/schema/sqlite/20241006182734_pectra-updates3.sql @@ -9,7 +9,9 @@ CREATE TABLE IF NOT EXISTS "consolidation_request_txs" ( fork_id BIGINT NOT NULL DEFAULT 0, source_address BLOB NOT NULL, source_pubkey BLOB NULL, + source_index BIGINT NULL, target_pubkey BLOB NULL, + target_index BIGINT NULL, tx_hash BLOB NULL, tx_sender BLOB NOT NULL, tx_target BLOB NOT NULL, @@ -25,6 +27,14 @@ CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_addr_idx" ON "consolidation_request_txs" ("source_address" ASC); +CREATE INDEX IF NOT EXISTS "consolidation_request_txs_source_idx" + ON "consolidation_request_txs" + ("source_index" ASC); + +CREATE INDEX IF NOT EXISTS "consolidation_request_txs_target_idx" + ON "consolidation_request_txs" 
+ ("target_index" ASC); + CREATE INDEX IF NOT EXISTS "consolidation_request_txs_fork_idx" ON "consolidation_request_txs" ("fork_id" ASC); @@ -56,6 +66,7 @@ CREATE TABLE IF NOT EXISTS "withdrawal_request_txs" ( fork_id BIGINT NOT NULL DEFAULT 0, source_address BLOB NOT NULL, validator_pubkey BLOB NOT NULL, + validator_index BIGINT NULL, amount BIGINT NOT NULL, tx_hash BLOB NULL, tx_sender BLOB NOT NULL, @@ -72,6 +83,14 @@ CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_source_addr_idx" ON "withdrawal_request_txs" ("source_address" ASC); +CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_validator_index_idx" + ON "withdrawal_request_txs" + ("validator_index" ASC); + +CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_amount_idx" + ON "withdrawal_request_txs" + ("amount" ASC); + CREATE INDEX IF NOT EXISTS "withdrawal_request_txs_fork_idx" ON "withdrawal_request_txs" ("fork_id" ASC); diff --git a/db/withdrawal_request_txs.go b/db/withdrawal_request_txs.go index 86c182c..8a7a14d 100644 --- a/db/withdrawal_request_txs.go +++ b/db/withdrawal_request_txs.go @@ -15,11 +15,11 @@ func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx dbtypes.DBEnginePgsql: "INSERT INTO withdrawal_request_txs ", dbtypes.DBEngineSqlite: "INSERT OR REPLACE INTO withdrawal_request_txs ", }), - "(block_number, block_index, block_time, block_root, fork_id, source_address, validator_pubkey, amount, tx_hash, tx_sender, tx_target, dequeue_block)", + "(block_number, block_index, block_time, block_root, fork_id, source_address, validator_pubkey, validator_index, amount, tx_hash, tx_sender, tx_target, dequeue_block)", " VALUES ", ) argIdx := 0 - fieldCount := 12 + fieldCount := 13 args := make([]any, len(withdrawalTxs)*fieldCount) for i, withdrawalTx := range withdrawalTxs { @@ -43,11 +43,12 @@ func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx args[argIdx+4] = withdrawalTx.ForkId args[argIdx+5] = withdrawalTx.SourceAddress args[argIdx+6] = 
withdrawalTx.ValidatorPubkey - args[argIdx+7] = withdrawalTx.Amount - args[argIdx+8] = withdrawalTx.TxHash - args[argIdx+9] = withdrawalTx.TxSender - args[argIdx+10] = withdrawalTx.TxTarget - args[argIdx+11] = withdrawalTx.DequeueBlock + args[argIdx+7] = withdrawalTx.ValidatorIndex + args[argIdx+8] = withdrawalTx.Amount + args[argIdx+9] = withdrawalTx.TxHash + args[argIdx+10] = withdrawalTx.TxSender + args[argIdx+11] = withdrawalTx.TxTarget + args[argIdx+12] = withdrawalTx.DequeueBlock argIdx += fieldCount } fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ @@ -65,7 +66,7 @@ func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx func GetWithdrawalRequestTxsByDequeueRange(dequeueFirst uint64, dequeueLast uint64) []*dbtypes.WithdrawalRequestTx { withdrawalTxs := []*dbtypes.WithdrawalRequestTx{} - err := ReaderDb.Select(&withdrawalTxs, `SELECT block_number, block_index, block_time, block_root, fork_id, source_address, validator_pubkey, amount, tx_hash, tx_sender, tx_target, dequeue_block + err := ReaderDb.Select(&withdrawalTxs, `SELECT withdrawal_request_txs.* FROM withdrawal_request_txs WHERE dequeue_block >= $1 AND dequeue_block <= $2 ORDER BY dequeue_block ASC, block_number ASC, block_index ASC @@ -77,3 +78,122 @@ func GetWithdrawalRequestTxsByDequeueRange(dequeueFirst uint64, dequeueLast uint return withdrawalTxs } + +func GetWithdrawalRequestTxsFiltered(offset uint64, limit uint32, canonicalForkIds []uint64, filter *dbtypes.WithdrawalRequestTxFilter) ([]*dbtypes.WithdrawalRequestTx, uint64, error) { + var sql strings.Builder + args := []interface{}{} + fmt.Fprint(&sql, ` + WITH cte AS ( + SELECT + block_number, block_index, block_time, block_root, fork_id, source_address, validator_pubkey, validator_index, amount, tx_hash, tx_sender, tx_target, dequeue_block + FROM withdrawal_request_txs + `) + + if filter.ValidatorName != "" { + fmt.Fprint(&sql, ` + LEFT JOIN validator_names AS source_names ON source_names."index" = 
withdrawal_request_txs.validator_index + `) + } + + filterOp := "WHERE" + if filter.MinDequeue > 0 { + args = append(args, filter.MinDequeue) + fmt.Fprintf(&sql, " %v dequeue_block >= $%v", filterOp, len(args)) + filterOp = "AND" + } + if filter.MaxDequeue > 0 { + args = append(args, filter.MaxDequeue) + fmt.Fprintf(&sql, " %v dequeue_block <= $%v", filterOp, len(args)) + filterOp = "AND" + } + if len(filter.SourceAddress) > 0 { + args = append(args, filter.SourceAddress) + fmt.Fprintf(&sql, " %v source_address = $%v", filterOp, len(args)) + filterOp = "AND" + } + if filter.MinIndex > 0 { + args = append(args, filter.MinIndex) + fmt.Fprintf(&sql, " %v validator_index >= $%v", filterOp, len(args)) + filterOp = "AND" + } + if filter.MaxIndex > 0 { + args = append(args, filter.MaxIndex) + fmt.Fprintf(&sql, " %v validator_index <= $%v", filterOp, len(args)) + filterOp = "AND" + } + if filter.ValidatorName != "" { + args = append(args, "%"+filter.ValidatorName+"%") + fmt.Fprintf(&sql, " %v ", filterOp) + fmt.Fprintf(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ + dbtypes.DBEnginePgsql: ` source_names.name ilike $%v `, + dbtypes.DBEngineSqlite: ` source_names.name LIKE $%v `, + }), len(args)) + filterOp = "AND" + } + if filter.MinAmount != nil { + args = append(args, *filter.MinAmount) + fmt.Fprintf(&sql, " %v amount >= $%v", filterOp, len(args)) + filterOp = "AND" + } + if filter.MaxAmount != nil { + args = append(args, *filter.MaxAmount) + fmt.Fprintf(&sql, " %v amount <= $%v", filterOp, len(args)) + filterOp = "AND" + } + + if filter.WithOrphaned != 1 { + forkIdStr := make([]string, len(canonicalForkIds)) + for i, forkId := range canonicalForkIds { + forkIdStr[i] = fmt.Sprintf("%v", forkId) + } + if len(forkIdStr) == 0 { + forkIdStr = append(forkIdStr, "0") + } + + if filter.WithOrphaned == 0 { + fmt.Fprintf(&sql, " %v fork_id IN (%v)", filterOp, strings.Join(forkIdStr, ",")) + filterOp = "AND" + } else if filter.WithOrphaned == 2 { + fmt.Fprintf(&sql, " %v 
fork_id NOT IN (%v)", filterOp, strings.Join(forkIdStr, ",")) + filterOp = "AND" + } + } + + args = append(args, limit) + fmt.Fprintf(&sql, `) + SELECT + count(*) AS block_number, + 0 AS block_index, + 0 AS block_time, + null AS block_root, + 0 AS fork_id, + null AS source_address, + null AS validator_pubkey, + 0 AS validator_index, + 0 AS amount, + null AS tx_hash, + null AS tx_sender, + null AS tx_target, + 0 AS dequeue_block + FROM cte + UNION ALL SELECT * FROM ( + SELECT * FROM cte + ORDER BY block_time DESC + LIMIT $%v + `, len(args)) + + if offset > 0 { + args = append(args, offset) + fmt.Fprintf(&sql, " OFFSET $%v ", len(args)) + } + fmt.Fprintf(&sql, ") AS t1") + + withdrawalRequestTxs := []*dbtypes.WithdrawalRequestTx{} + err := ReaderDb.Select(&withdrawalRequestTxs, sql.String(), args...) + if err != nil { + logger.Errorf("Error while fetching filtered withdrawal request txs: %v", err) + return nil, 0, err + } + + return withdrawalRequestTxs[1:], withdrawalRequestTxs[0].BlockNumber, nil +} diff --git a/dbtypes/dbtypes.go b/dbtypes/dbtypes.go index c67850c..a55b23e 100644 --- a/dbtypes/dbtypes.go +++ b/dbtypes/dbtypes.go @@ -267,18 +267,20 @@ type ConsolidationRequest struct { } type ConsolidationRequestTx struct { - BlockNumber uint64 `db:"block_number"` - BlockIndex uint64 `db:"block_index"` - BlockTime uint64 `db:"block_time"` - BlockRoot []byte `db:"block_root"` - ForkId uint64 `db:"fork_id"` - SourceAddress []byte `db:"source_address"` - SourcePubkey []byte `db:"source_pubkey"` - TargetPubkey []byte `db:"target_pubkey"` - TxHash []byte `db:"tx_hash"` - TxSender []byte `db:"tx_sender"` - TxTarget []byte `db:"tx_target"` - DequeueBlock uint64 `db:"dequeue_block"` + BlockNumber uint64 `db:"block_number"` + BlockIndex uint64 `db:"block_index"` + BlockTime uint64 `db:"block_time"` + BlockRoot []byte `db:"block_root"` + ForkId uint64 `db:"fork_id"` + SourceAddress []byte `db:"source_address"` + SourcePubkey []byte `db:"source_pubkey"` + SourceIndex *uint64 
`db:"source_index"` + TargetPubkey []byte `db:"target_pubkey"` + TargetIndex *uint64 `db:"target_index"` + TxHash []byte `db:"tx_hash"` + TxSender []byte `db:"tx_sender"` + TxTarget []byte `db:"tx_target"` + DequeueBlock uint64 `db:"dequeue_block"` } type WithdrawalRequest struct { @@ -296,16 +298,17 @@ type WithdrawalRequest struct { } type WithdrawalRequestTx struct { - BlockNumber uint64 `db:"block_number"` - BlockIndex uint64 `db:"block_index"` - BlockTime uint64 `db:"block_time"` - BlockRoot []byte `db:"block_root"` - ForkId uint64 `db:"fork_id"` - SourceAddress []byte `db:"source_address"` - ValidatorPubkey []byte `db:"validator_pubkey"` - Amount uint64 `db:"amount"` - TxHash []byte `db:"tx_hash"` - TxSender []byte `db:"tx_sender"` - TxTarget []byte `db:"tx_target"` - DequeueBlock uint64 `db:"dequeue_block"` + BlockNumber uint64 `db:"block_number"` + BlockIndex uint64 `db:"block_index"` + BlockTime uint64 `db:"block_time"` + BlockRoot []byte `db:"block_root"` + ForkId uint64 `db:"fork_id"` + SourceAddress []byte `db:"source_address"` + ValidatorPubkey []byte `db:"validator_pubkey"` + ValidatorIndex *uint64 `db:"validator_index"` + Amount uint64 `db:"amount"` + TxHash []byte `db:"tx_hash"` + TxSender []byte `db:"tx_sender"` + TxTarget []byte `db:"tx_target"` + DequeueBlock uint64 `db:"dequeue_block"` } diff --git a/dbtypes/other.go b/dbtypes/other.go index 1442936..34087c2 100644 --- a/dbtypes/other.go +++ b/dbtypes/other.go @@ -104,6 +104,18 @@ type WithdrawalRequestFilter struct { WithOrphaned uint8 } +type WithdrawalRequestTxFilter struct { + MinDequeue uint64 + MaxDequeue uint64 + SourceAddress []byte + MinIndex uint64 + MaxIndex uint64 + ValidatorName string + MinAmount *uint64 + MaxAmount *uint64 + WithOrphaned uint8 +} + type ConsolidationRequestFilter struct { MinSlot uint64 MaxSlot uint64 diff --git a/indexer/execution/consolidation_indexer.go b/indexer/execution/consolidation_indexer.go index 147a91a..2d7b39f 100644 --- 
a/indexer/execution/consolidation_indexer.go +++ b/indexer/execution/consolidation_indexer.go @@ -1,6 +1,7 @@ package execution import ( + "bytes" "fmt" "time" @@ -97,7 +98,7 @@ func (ci *ConsolidationIndexer) runConsolidationIndexerLoop() { } func (ci *ConsolidationIndexer) processFinalTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64) (*dbtypes.ConsolidationRequestTx, error) { - requestTx := ci.parseRequestLog(log) + requestTx := ci.parseRequestLog(log, nil) if requestTx == nil { return nil, fmt.Errorf("invalid consolidation log") } @@ -113,7 +114,7 @@ func (ci *ConsolidationIndexer) processFinalTx(log *types.Log, tx *types.Transac } func (ci *ConsolidationIndexer) processRecentTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64, fork *forkWithClients) (*dbtypes.ConsolidationRequestTx, error) { - requestTx := ci.parseRequestLog(log) + requestTx := ci.parseRequestLog(log, &fork.forkId) if requestTx == nil { return nil, fmt.Errorf("invalid consolidation log") } @@ -135,7 +136,7 @@ func (ci *ConsolidationIndexer) processRecentTx(log *types.Log, tx *types.Transa return requestTx, nil } -func (ci *ConsolidationIndexer) parseRequestLog(log *types.Log) *dbtypes.ConsolidationRequestTx { +func (ci *ConsolidationIndexer) parseRequestLog(log *types.Log, forkId *beacon.ForkKey) *dbtypes.ConsolidationRequestTx { // data layout: // 0-20: sender address (20 bytes) // 20-68: source pubkey (48 bytes) @@ -150,13 +151,35 @@ func (ci *ConsolidationIndexer) parseRequestLog(log *types.Log) *dbtypes.Consoli sourcePubkey := log.Data[20:68] targetPubkey := log.Data[68:116] + validatorSet := ci.indexerCtx.beaconIndexer.GetCanonicalValidatorSet(forkId) + + var sourceIndex, targetIndex *uint64 + for _, validator := range validatorSet { + if sourceIndex == nil && bytes.Equal(validator.Validator.PublicKey[:], sourcePubkey) { + index := uint64(validator.Index) + sourceIndex = &index 
+ if targetIndex != nil { + break + } + } + if targetIndex == nil && bytes.Equal(validator.Validator.PublicKey[:], targetPubkey) { + index := uint64(validator.Index) + targetIndex = &index + if sourceIndex != nil { + break + } + } + } + requestTx := &dbtypes.ConsolidationRequestTx{ BlockNumber: log.BlockNumber, BlockIndex: uint64(log.Index), BlockRoot: log.BlockHash[:], SourceAddress: senderAddr, SourcePubkey: sourcePubkey, + SourceIndex: sourceIndex, TargetPubkey: targetPubkey, + TargetIndex: targetIndex, TxHash: log.TxHash[:], } diff --git a/indexer/execution/withdrawal_indexer.go b/indexer/execution/withdrawal_indexer.go index ee9a9f7..ca0a235 100644 --- a/indexer/execution/withdrawal_indexer.go +++ b/indexer/execution/withdrawal_indexer.go @@ -1,6 +1,7 @@ package execution import ( + "bytes" "fmt" "math/big" "time" @@ -78,6 +79,10 @@ func NewWithdrawalIndexer(indexer *IndexerCtx) *WithdrawalIndexer { return wi } +func (wi *WithdrawalIndexer) GetMatcherHeight() uint64 { + return wi.matcher.state.MatchHeight +} + func (wi *WithdrawalIndexer) runWithdrawalIndexerLoop() { defer utils.HandleSubroutinePanic("WithdrawalIndexer.runWithdrawalIndexerLoop") @@ -98,7 +103,7 @@ func (wi *WithdrawalIndexer) runWithdrawalIndexerLoop() { } func (wi *WithdrawalIndexer) processFinalTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64) (*dbtypes.WithdrawalRequestTx, error) { - requestTx := wi.parseRequestLog(log) + requestTx := wi.parseRequestLog(log, nil) if requestTx == nil { return nil, fmt.Errorf("invalid withdrawal log") } @@ -114,7 +119,7 @@ func (wi *WithdrawalIndexer) processFinalTx(log *types.Log, tx *types.Transactio } func (wi *WithdrawalIndexer) processRecentTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64, fork *forkWithClients) (*dbtypes.WithdrawalRequestTx, error) { - requestTx := wi.parseRequestLog(log) + requestTx := wi.parseRequestLog(log, 
&fork.forkId) if requestTx == nil { return nil, fmt.Errorf("invalid withdrawal log") } @@ -136,7 +141,7 @@ func (wi *WithdrawalIndexer) processRecentTx(log *types.Log, tx *types.Transacti return requestTx, nil } -func (wi *WithdrawalIndexer) parseRequestLog(log *types.Log) *dbtypes.WithdrawalRequestTx { +func (wi *WithdrawalIndexer) parseRequestLog(log *types.Log, forkId *beacon.ForkKey) *dbtypes.WithdrawalRequestTx { // data layout: // 0-20: sender address (20 bytes) // 20-68: validator pubkey (48 bytes) @@ -151,12 +156,24 @@ func (wi *WithdrawalIndexer) parseRequestLog(log *types.Log) *dbtypes.Withdrawal validatorPubkey := log.Data[20:68] amount := big.NewInt(0).SetBytes(log.Data[68:76]).Uint64() + validatorSet := wi.indexerCtx.beaconIndexer.GetCanonicalValidatorSet(forkId) + + var validatorIndex *uint64 + for _, validator := range validatorSet { + if bytes.Equal(validator.Validator.PublicKey[:], validatorPubkey) { + index := uint64(validator.Index) + validatorIndex = &index + break + } + } + requestTx := &dbtypes.WithdrawalRequestTx{ BlockNumber: log.BlockNumber, BlockIndex: uint64(log.Index), BlockRoot: log.BlockHash[:], SourceAddress: senderAddr, ValidatorPubkey: validatorPubkey, + ValidatorIndex: validatorIndex, Amount: amount, TxHash: log.TxHash[:], } diff --git a/services/chainservice.go b/services/chainservice.go index 5e4da02..e516ae1 100644 --- a/services/chainservice.go +++ b/services/chainservice.go @@ -24,13 +24,16 @@ import ( ) type ChainService struct { - logger logrus.FieldLogger - consensusPool *consensus.Pool - executionPool *execution.Pool - beaconIndexer *beacon.Indexer - validatorNames *ValidatorNames - mevRelayIndexer *mevrelay.MevIndexer - started bool + logger logrus.FieldLogger + consensusPool *consensus.Pool + executionPool *execution.Pool + beaconIndexer *beacon.Indexer + validatorNames *ValidatorNames + depositIndexer *execindexer.DepositIndexer + consolidationIndexer *execindexer.ConsolidationIndexer + withdrawalIndexer 
*execindexer.WithdrawalIndexer + mevRelayIndexer *mevrelay.MevIndexer + started bool } var GlobalBeaconService *ChainService @@ -179,9 +182,9 @@ func (cs *ChainService) StartService() error { cs.beaconIndexer.StartIndexer() // add execution indexers - execindexer.NewDepositIndexer(executionIndexerCtx) - execindexer.NewConsolidationIndexer(executionIndexerCtx) - execindexer.NewWithdrawalIndexer(executionIndexerCtx) + cs.depositIndexer = execindexer.NewDepositIndexer(executionIndexerCtx) + cs.consolidationIndexer = execindexer.NewConsolidationIndexer(executionIndexerCtx) + cs.withdrawalIndexer = execindexer.NewWithdrawalIndexer(executionIndexerCtx) // start MEV relay indexer cs.mevRelayIndexer.StartUpdater()