From 16a8c3d4789459099db6159e635eed4491d3b83f Mon Sep 17 00:00:00 2001 From: pk910 Date: Wed, 9 Oct 2024 19:08:04 +0200 Subject: [PATCH] refactoring --- indexer/execution/consolidation_indexer.go | 28 ++-- indexer/execution/contract_indexer.go | 166 ++++++++++----------- indexer/execution/withdrawal_indexer.go | 46 +++--- 3 files changed, 120 insertions(+), 120 deletions(-) diff --git a/indexer/execution/consolidation_indexer.go b/indexer/execution/consolidation_indexer.go index d5daa77..b83f099 100644 --- a/indexer/execution/consolidation_indexer.go +++ b/indexer/execution/consolidation_indexer.go @@ -59,27 +59,27 @@ func NewConsolidationIndexer(indexer *IndexerCtx) *ConsolidationIndexer { return ci } -func (ds *ConsolidationIndexer) runConsolidationIndexerLoop() { +func (ci *ConsolidationIndexer) runConsolidationIndexerLoop() { defer utils.HandleSubroutinePanic("ConsolidationIndexer.runConsolidationIndexerLoop") for { time.Sleep(30 * time.Second) - ds.logger.Debugf("run consolidation indexer logic") + ci.logger.Debugf("run consolidation indexer logic") - err := ds.indexer.runContractIndexer() + err := ci.indexer.runContractIndexer() if err != nil { - ds.logger.Errorf("indexer error: %v", err) + ci.logger.Errorf("indexer error: %v", err) } - err = ds.matcher.runConsolidationMatcher() + err = ci.matcher.runConsolidationMatcher() if err != nil { - ds.logger.Errorf("matcher error: %v", err) + ci.logger.Errorf("matcher error: %v", err) } } } -func (ds *ConsolidationIndexer) processFinalTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64) (*dbtypes.ConsolidationRequestTx, error) { - requestTx := ds.parseRequestLog(log) +func (ci *ConsolidationIndexer) processFinalTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64) (*dbtypes.ConsolidationRequestTx, error) { + requestTx := ci.parseRequestLog(log) if requestTx == nil { return nil, fmt.Errorf("invalid consolidation log") } @@ -94,8 +94,8 @@ func (ds *ConsolidationIndexer) processFinalTx(log *types.Log, tx *types.Transac return requestTx, nil } -func (ds *ConsolidationIndexer) processRecentTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64, fork *forkWithClients) (*dbtypes.ConsolidationRequestTx, error) { - requestTx := ds.parseRequestLog(log) +func (ci *ConsolidationIndexer) processRecentTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64, fork *forkWithClients) (*dbtypes.ConsolidationRequestTx, error) { + requestTx := ci.parseRequestLog(log) if requestTx == nil { return nil, fmt.Errorf("invalid consolidation log") } @@ -107,7 +107,7 @@ func (ds *ConsolidationIndexer) processRecentTx(log *types.Log, tx *types.Transa requestTx.TxTarget = txTo[:] requestTx.DequeueBlock = dequeueBlock - clBlock := ds.indexerCtx.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(log.BlockHash)) + clBlock := ci.indexerCtx.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(log.BlockHash)) if len(clBlock) > 0 { requestTx.ForkId = uint64(clBlock[0].GetForkId()) } else { @@ -117,14 +117,14 @@ func (ds *ConsolidationIndexer) processRecentTx(log *types.Log, tx *types.Transa return requestTx, nil } -func (ds *ConsolidationIndexer) parseRequestLog(log *types.Log) *dbtypes.ConsolidationRequestTx { +func (ci *ConsolidationIndexer) parseRequestLog(log *types.Log) *dbtypes.ConsolidationRequestTx { // data layout: // 0-20: sender address (20 bytes) 
// 20-68: source pubkey (48 bytes) // 68-116: target pubkey (48 bytes) if len(log.Data) < 116 { - ds.logger.Warnf("invalid consolidation log data length: %v", len(log.Data)) + ci.logger.Warnf("invalid consolidation log data length: %v", len(log.Data)) return nil } @@ -145,7 +145,7 @@ func (ds *ConsolidationIndexer) parseRequestLog(log *types.Log) *dbtypes.Consoli return requestTx } -func (ds *ConsolidationIndexer) persistConsolidationTxs(tx *sqlx.Tx, requests []*dbtypes.ConsolidationRequestTx) error { +func (ci *ConsolidationIndexer) persistConsolidationTxs(tx *sqlx.Tx, requests []*dbtypes.ConsolidationRequestTx) error { requestCount := len(requests) for requestIdx := 0; requestIdx < requestCount; requestIdx += 500 { endIdx := requestIdx + 500 diff --git a/indexer/execution/contract_indexer.go b/indexer/execution/contract_indexer.go index c468f5c..a95a8c0 100644 --- a/indexer/execution/contract_indexer.go +++ b/indexer/execution/contract_indexer.go @@ -65,29 +65,29 @@ func newContractIndexer[TxType any](indexer *IndexerCtx, logger logrus.FieldLogg return ci } -func (ds *contractIndexer[_]) loadState() { +func (ci *contractIndexer[_]) loadState() { syncState := contractIndexerState{} - db.GetExplorerState(ds.options.indexerKey, &syncState) - ds.state = &syncState + db.GetExplorerState(ci.options.indexerKey, &syncState) + ci.state = &syncState - if ds.state.ForkStates == nil { - ds.state.ForkStates = make(map[beacon.ForkKey]*contractIndexerForkState) + if ci.state.ForkStates == nil { + ci.state.ForkStates = make(map[beacon.ForkKey]*contractIndexerForkState) } - if ds.state.FinalBlock == 0 { - ds.state.FinalBlock = ds.options.deployBlock + if ci.state.FinalBlock == 0 { + ci.state.FinalBlock = ci.options.deployBlock } } -func (ds *contractIndexer[_]) persistState(tx *sqlx.Tx) error { - finalizedBlockNumber := ds.getFinalizedBlockNumber() - for forkId, forkState := range ds.state.ForkStates { +func (ci *contractIndexer[_]) persistState(tx *sqlx.Tx) error { + finalizedBlockNumber := ci.getFinalizedBlockNumber() + for forkId, forkState := range ci.state.ForkStates { if forkState.Block < finalizedBlockNumber { - delete(ds.state.ForkStates, forkId) + delete(ci.state.ForkStates, forkId) } } - err := db.SetExplorerState(ds.options.indexerKey, ds.state, tx) + err := db.SetExplorerState(ci.options.indexerKey, ci.state, tx) if err != nil { return fmt.Errorf("error while updating contract indexer state: %v", err) } @@ -97,41 +97,41 @@ func (ds *contractIndexer[_]) persistState(tx *sqlx.Tx) error { // runConsolidationIndexer runs the consolidation indexer logic. // It fetches consolidation logs from finalized and recent blocks. 
-func (ds *contractIndexer[_]) runContractIndexer() error { - if ds.state == nil { - ds.loadState() +func (ci *contractIndexer[_]) runContractIndexer() error { + if ci.state == nil { + ci.loadState() } - finalizedEpoch, _ := ds.indexer.chainState.GetFinalizedCheckpoint() + finalizedEpoch, _ := ci.indexer.chainState.GetFinalizedCheckpoint() if finalizedEpoch > 0 { - finalizedBlockNumber := ds.getFinalizedBlockNumber() + finalizedBlockNumber := ci.getFinalizedBlockNumber() if finalizedBlockNumber == 0 { return fmt.Errorf("finalized block not found in cache or db") } - if finalizedBlockNumber < ds.state.FinalBlock { - return fmt.Errorf("finalized block number (%v) smaller than index state (%v)", finalizedBlockNumber, ds.state.FinalBlock) + if finalizedBlockNumber < ci.state.FinalBlock { + return fmt.Errorf("finalized block number (%v) smaller than index state (%v)", finalizedBlockNumber, ci.state.FinalBlock) } - if finalizedBlockNumber > ds.state.FinalBlock { - err := ds.processFinalizedBlocks(finalizedBlockNumber) + if finalizedBlockNumber > ci.state.FinalBlock { + err := ci.processFinalizedBlocks(finalizedBlockNumber) if err != nil { return err } } } - ds.processRecentBlocks() + ci.processRecentBlocks() return nil } -func (ds *contractIndexer[_]) getFinalizedBlockNumber() uint64 { +func (ci *contractIndexer[_]) getFinalizedBlockNumber() uint64 { var finalizedBlockNumber uint64 - _, finalizedRoot := ds.indexer.chainState.GetFinalizedCheckpoint() - if finalizedBlock := ds.indexer.beaconIndexer.GetBlockByRoot(finalizedRoot); finalizedBlock != nil { + _, finalizedRoot := ci.indexer.chainState.GetFinalizedCheckpoint() + if finalizedBlock := ci.indexer.beaconIndexer.GetBlockByRoot(finalizedRoot); finalizedBlock != nil { if indexVals := finalizedBlock.GetBlockIndex(); indexVals != nil { finalizedBlockNumber = indexVals.ExecutionNumber } @@ -147,14 +147,14 @@ func (ds *contractIndexer[_]) getFinalizedBlockNumber() uint64 { return finalizedBlockNumber } -func (ds *contractIndexer[_]) loadFilteredLogs(ctx context.Context, client *execution.Client, query ethereum.FilterQuery) ([]types.Log, error) { +func (ci *contractIndexer[_]) loadFilteredLogs(ctx context.Context, client *execution.Client, query ethereum.FilterQuery) ([]types.Log, error) { ctx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() return client.GetRPCClient().GetEthClient().FilterLogs(ctx, query) } -func (ds *contractIndexer[_]) loadTransactionByHash(ctx context.Context, client *execution.Client, hash common.Hash) (*types.Transaction, error) { +func (ci *contractIndexer[_]) loadTransactionByHash(ctx context.Context, client *execution.Client, hash common.Hash) (*types.Transaction, error) { ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -162,15 +162,15 @@ func (ds *contractIndexer[_]) loadTransactionByHash(ctx context.Context, client return tx, err } -func (ds *contractIndexer[_]) loadHeaderByHash(ctx context.Context, client *execution.Client, hash common.Hash) (*types.Header, error) { +func (ci *contractIndexer[_]) loadHeaderByHash(ctx context.Context, client *execution.Client, hash common.Hash) (*types.Header, error) { ctx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() return client.GetRPCClient().GetHeaderByHash(ctx, hash) } -func (ds *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber uint64) error { - clients := ds.indexer.getFinalizedClients(execution.AnyClient) +func (ci *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber uint64) 
error { + clients := ci.indexer.getFinalizedClients(execution.AnyClient) if len(clients) == 0 { return fmt.Errorf("no ready execution client found") } @@ -180,10 +180,10 @@ func (ds *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber u retryCount := 0 - for ds.state.FinalBlock < finalizedBlockNumber { + for ci.state.FinalBlock < finalizedBlockNumber { client := clients[retryCount%len(clients)] - batchSize := uint64(ds.options.batchSize) + batchSize := uint64(ci.options.batchSize) if retryCount > 0 { batchSize /= uint64(math.Pow(2, float64(retryCount))) if batchSize < 10 { @@ -191,20 +191,20 @@ func (ds *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber u } } - toBlock := ds.state.FinalBlock + uint64(ds.options.batchSize) + toBlock := ci.state.FinalBlock + uint64(ci.options.batchSize) if toBlock > finalizedBlockNumber { toBlock = finalizedBlockNumber } query := ethereum.FilterQuery{ - FromBlock: big.NewInt(0).SetUint64(ds.state.FinalBlock + 1), + FromBlock: big.NewInt(0).SetUint64(ci.state.FinalBlock + 1), ToBlock: big.NewInt(0).SetUint64(toBlock), Addresses: []common.Address{ - ds.options.contractAddress, + ci.options.contractAddress, }, } - logs, err := ds.loadFilteredLogs(ctx, client, query) + logs, err := ci.loadFilteredLogs(ctx, client, query) if err != nil { if retryCount < 3 { retryCount++ @@ -221,16 +221,16 @@ func (ds *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber u var txBlockHeader *types.Header requestTxs := []*TxType{} - queueBlock := ds.state.FinalBlock - queueLength := ds.state.FinalQueueLen + queueBlock := ci.state.FinalBlock + queueLength := ci.state.FinalQueueLen - ds.logger.Debugf("received contract logs for block %v - %v: %v events", ds.state.FinalBlock, toBlock, len(logs)) + ci.logger.Debugf("received contract logs for block %v - %v: %v events", ci.state.FinalBlock, toBlock, len(logs)) for idx := range logs { log := &logs[idx] if txHash == nil || !bytes.Equal(txHash, log.TxHash[:]) { - txDetails, err = ds.loadTransactionByHash(ctx, client, log.TxHash) + txDetails, err = ci.loadTransactionByHash(ctx, client, log.TxHash) if err != nil { return fmt.Errorf("could not load tx details (%v): %v", log.TxHash, err) } @@ -239,7 +239,7 @@ func (ds *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber u } if txBlockHeader == nil || !bytes.Equal(txHeaderHash, log.BlockHash[:]) { - txBlockHeader, err = ds.loadHeaderByHash(ctx, client, log.BlockHash) + txBlockHeader, err = ci.loadHeaderByHash(ctx, client, log.BlockHash) if err != nil { return fmt.Errorf("could not load block details (%v): %v", log.BlockHash, err) } @@ -253,10 +253,10 @@ func (ds *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber u } if queueBlock > log.BlockNumber { - ds.logger.Warnf("contract log for block %v received after block %v", log.BlockNumber, queueBlock) + ci.logger.Warnf("contract log for block %v received after block %v", log.BlockNumber, queueBlock) return nil } else if queueBlock < log.BlockNumber { - dequeuedRequests := (log.BlockNumber - queueBlock) * ds.options.dequeueRate + dequeuedRequests := (log.BlockNumber - queueBlock) * ci.options.dequeueRate if dequeuedRequests > queueLength { queueLength = 0 } else { @@ -266,10 +266,10 @@ func (ds *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber u queueBlock = log.BlockNumber } - dequeueBlock := log.BlockNumber + (queueLength / ds.options.dequeueRate) + dequeueBlock := log.BlockNumber + (queueLength / ci.options.dequeueRate) queueLength++ - 
requestTx, err := ds.options.processFinalTx(log, txDetails, txBlockHeader, txFrom, dequeueBlock) + requestTx, err := ci.options.processFinalTx(log, txDetails, txBlockHeader, txFrom, dequeueBlock) if err != nil { continue } @@ -282,7 +282,7 @@ func (ds *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber u } if queueBlock < toBlock { - dequeuedRequests := (toBlock - queueBlock) * ds.options.dequeueRate + dequeuedRequests := (toBlock - queueBlock) * ci.options.dequeueRate if dequeuedRequests > queueLength { queueLength = 0 } else { @@ -293,10 +293,10 @@ func (ds *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber u } if len(requestTxs) > 0 { - ds.logger.Infof("crawled transactions for block %v - %v: %v events", ds.state.FinalBlock, toBlock, len(requestTxs)) + ci.logger.Infof("crawled transactions for block %v - %v: %v events", ci.state.FinalBlock, toBlock, len(requestTxs)) } - err = ds.persistFinalizedRequestTxs(toBlock, queueLength, requestTxs) + err = ci.persistFinalizedRequestTxs(toBlock, queueLength, requestTxs) if err != nil { return fmt.Errorf("could not persist indexed transactions: %v", err) } @@ -307,23 +307,23 @@ func (ds *contractIndexer[TxType]) processFinalizedBlocks(finalizedBlockNumber u return nil } -func (ds *contractIndexer[_]) processRecentBlocks() error { - headForks := ds.indexer.getForksWithClients(execution.AnyClient) +func (ci *contractIndexer[_]) processRecentBlocks() error { + headForks := ci.indexer.getForksWithClients(execution.AnyClient) for _, headFork := range headForks { - err := ds.processRecentBlocksForFork(headFork) + err := ci.processRecentBlocksForFork(headFork) if err != nil { if headFork.canonical { - ds.logger.Errorf("could not process recent events from canonical fork %v: %v", headFork.forkId, err) + ci.logger.Errorf("could not process recent events from canonical fork %v: %v", headFork.forkId, err) } else { - ds.logger.Warnf("could not process recent events from fork %v: %v", headFork.forkId, err) + ci.logger.Warnf("could not process recent events from fork %v: %v", headFork.forkId, err) } } } return nil } -func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWithClients) error { - elHeadBlock := ds.indexer.beaconIndexer.GetCanonicalHead(&headFork.forkId) +func (ci *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWithClients) error { + elHeadBlock := ci.indexer.beaconIndexer.GetCanonicalHead(&headFork.forkId) if elHeadBlock == nil { return fmt.Errorf("head block not found") } @@ -338,11 +338,11 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith elHeadBlockNumber-- } - startBlockNumber := ds.state.FinalBlock + 1 - startQueueLen := ds.state.FinalQueueLen + startBlockNumber := ci.state.FinalBlock + 1 + startQueueLen := ci.state.FinalQueueLen // get last processed block for this fork - if forkState := ds.state.ForkStates[headFork.forkId]; forkState != nil && forkState.Block <= elHeadBlockNumber { + if forkState := ci.state.ForkStates[headFork.forkId]; forkState != nil && forkState.Block <= elHeadBlockNumber { if forkState.Block == elHeadBlockNumber { return nil // already processed } @@ -350,8 +350,8 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith startBlockNumber = forkState.Block + 1 startQueueLen = forkState.QueueLen } else { - for parentForkId := range ds.indexer.beaconIndexer.GetParentForkIds(headFork.forkId) { - if parentForkState := ds.state.ForkStates[beacon.ForkKey(parentForkId)]; parentForkState != nil 
&& parentForkState.Block <= elHeadBlockNumber { + for parentForkId := range ci.indexer.beaconIndexer.GetParentForkIds(headFork.forkId) { + if parentForkState := ci.state.ForkStates[beacon.ForkKey(parentForkId)]; parentForkState != nil && parentForkState.Block <= elHeadBlockNumber { startBlockNumber = parentForkState.Block + 1 startQueueLen = parentForkState.QueueLen } @@ -381,7 +381,7 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith for retryCount := 0; retryCount < 3; retryCount++ { client := headFork.clients[retryCount%len(headFork.clients)] - batchSize := uint64(ds.options.batchSize) + batchSize := uint64(ci.options.batchSize) if retryCount > 0 { batchSize /= uint64(math.Pow(2, float64(retryCount))) if batchSize < 10 { @@ -389,7 +389,7 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith } } - toBlock = startBlockNumber + uint64(ds.options.batchSize) + toBlock = startBlockNumber + uint64(ci.options.batchSize) if toBlock > elHeadBlockNumber { toBlock = elHeadBlockNumber } @@ -404,13 +404,13 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith FromBlock: big.NewInt(0).SetUint64(startBlockNumber), ToBlock: big.NewInt(0).SetUint64(toBlock), Addresses: []common.Address{ - ds.options.contractAddress, + ci.options.contractAddress, }, } - logs, reqError = ds.loadFilteredLogs(ctx, client, query) + logs, reqError = ci.loadFilteredLogs(ctx, client, query) if reqError != nil { - ds.logger.Warnf("error fetching contract logs for fork %v (%v-%v): %v", headFork.forkId, startBlockNumber, toBlock, reqError) + ci.logger.Warnf("error fetching contract logs for fork %v (%v-%v): %v", headFork.forkId, startBlockNumber, toBlock, reqError) continue } @@ -420,7 +420,7 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith log := &logs[idx] if txHash == nil || !bytes.Equal(txHash, log.TxHash[:]) { - txDetails, err = ds.loadTransactionByHash(ctx, client, log.TxHash) + txDetails, err = ci.loadTransactionByHash(ctx, client, log.TxHash) if err != nil { return fmt.Errorf("could not load tx details (%v): %v", log.TxHash, err) } @@ -429,7 +429,7 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith } if txBlockHeader == nil || !bytes.Equal(txHeaderHash, log.BlockHash[:]) { - txBlockHeader, err = ds.loadHeaderByHash(ctx, client, log.BlockHash) + txBlockHeader, err = ci.loadHeaderByHash(ctx, client, log.BlockHash) if err != nil { return fmt.Errorf("could not load block details (%v): %v", log.BlockHash, err) } @@ -443,10 +443,10 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith } if queueBlock > log.BlockNumber { - ds.logger.Warnf("contract log for block %v received after block %v", log.BlockNumber, queueBlock) + ci.logger.Warnf("contract log for block %v received after block %v", log.BlockNumber, queueBlock) return nil } else if queueBlock < log.BlockNumber { - dequeuedRequests := (log.BlockNumber - queueBlock) * ds.options.dequeueRate + dequeuedRequests := (log.BlockNumber - queueBlock) * ci.options.dequeueRate if dequeuedRequests > startQueueLen { startQueueLen = 0 } else { @@ -456,10 +456,10 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith queueBlock = log.BlockNumber } - dequeueBlock := log.BlockNumber + (startQueueLen / ds.options.dequeueRate) + dequeueBlock := log.BlockNumber + (startQueueLen / ci.options.dequeueRate) startQueueLen++ - requestTx, err := ds.options.processRecentTx(log, 
txDetails, txBlockHeader, txFrom, dequeueBlock, headFork) + requestTx, err := ci.options.processRecentTx(log, txDetails, txBlockHeader, txFrom, dequeueBlock, headFork) if err != nil { continue } @@ -472,7 +472,7 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith } if queueBlock < toBlock { - dequeuedRequests := (toBlock - queueBlock) * ds.options.dequeueRate + dequeuedRequests := (toBlock - queueBlock) * ci.options.dequeueRate if dequeuedRequests > startQueueLen { startQueueLen = 0 } else { @@ -483,9 +483,9 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith } if len(requestTxs) > 0 { - ds.logger.Infof("crawled recent contract logs for fork %v (%v-%v): %v events", headFork.forkId, startBlockNumber, toBlock, len(requestTxs)) + ci.logger.Infof("crawled recent contract logs for fork %v (%v-%v): %v events", headFork.forkId, startBlockNumber, toBlock, len(requestTxs)) - err := ds.persistRecentRequestTxs(headFork.forkId, queueBlock, startQueueLen, requestTxs) + err := ci.persistRecentRequestTxs(headFork.forkId, queueBlock, startQueueLen, requestTxs) if err != nil { return fmt.Errorf("could not persist contract logs: %v", err) } @@ -506,32 +506,32 @@ func (ds *contractIndexer[TxType]) processRecentBlocksForFork(headFork *forkWith return resError } -func (ds *contractIndexer[TxType]) persistFinalizedRequestTxs(finalBlockNumber, finalQueueLen uint64, requests []*TxType) error { +func (ci *contractIndexer[TxType]) persistFinalizedRequestTxs(finalBlockNumber, finalQueueLen uint64, requests []*TxType) error { return db.RunDBTransaction(func(tx *sqlx.Tx) error { - err := ds.options.persistTxs(tx, requests) + err := ci.options.persistTxs(tx, requests) if err != nil { return fmt.Errorf("error while persisting contract logs: %v", err) } - ds.state.FinalBlock = finalBlockNumber - ds.state.FinalQueueLen = finalQueueLen + ci.state.FinalBlock = finalBlockNumber + ci.state.FinalQueueLen = finalQueueLen - return ds.persistState(tx) + return ci.persistState(tx) }) } -func (ds *contractIndexer[TxType]) persistRecentRequestTxs(forkId beacon.ForkKey, finalBlockNumber, finalQueueLen uint64, requests []*TxType) error { +func (ci *contractIndexer[TxType]) persistRecentRequestTxs(forkId beacon.ForkKey, finalBlockNumber, finalQueueLen uint64, requests []*TxType) error { return db.RunDBTransaction(func(tx *sqlx.Tx) error { - err := ds.options.persistTxs(tx, requests) + err := ci.options.persistTxs(tx, requests) if err != nil { return fmt.Errorf("error while persisting contract logs: %v", err) } - ds.state.ForkStates[forkId] = &contractIndexerForkState{ + ci.state.ForkStates[forkId] = &contractIndexerForkState{ Block: finalBlockNumber, QueueLen: finalQueueLen, } - return ds.persistState(tx) + return ci.persistState(tx) }) } diff --git a/indexer/execution/withdrawal_indexer.go b/indexer/execution/withdrawal_indexer.go index 8359819..aea811f 100644 --- a/indexer/execution/withdrawal_indexer.go +++ b/indexer/execution/withdrawal_indexer.go @@ -32,14 +32,14 @@ func NewWithdrawalIndexer(indexer *IndexerCtx) *WithdrawalIndexer { batchSize = 1000 } - ci := &WithdrawalIndexer{ + wi := &WithdrawalIndexer{ indexerCtx: indexer, logger: indexer.logger.WithField("indexer", "withdrawal"), } - ci.indexer = newContractIndexer[dbtypes.WithdrawalRequestTx]( + wi.indexer = newContractIndexer[dbtypes.WithdrawalRequestTx]( indexer, - ci.logger.WithField("routine", "crawler"), + wi.logger.WithField("routine", "crawler"), &contractIndexerOptions[dbtypes.WithdrawalRequestTx]{ 
indexerKey: "indexer.withdrawalindexer", batchSize: batchSize, @@ -47,40 +47,40 @@ func NewWithdrawalIndexer(indexer *IndexerCtx) *WithdrawalIndexer { deployBlock: uint64(utils.Config.ExecutionApi.ElectraDeployBlock), dequeueRate: withdrawalDequeueRate, - processFinalTx: ci.processFinalTx, - processRecentTx: ci.processRecentTx, - persistTxs: ci.persistWithdrawalTxs, + processFinalTx: wi.processFinalTx, + processRecentTx: wi.processRecentTx, + persistTxs: wi.persistWithdrawalTxs, }, ) - ci.matcher = NewWithdrawalMatcher(indexer, ci) + wi.matcher = NewWithdrawalMatcher(indexer, wi) - go ci.runWithdrawalIndexerLoop() + go wi.runWithdrawalIndexerLoop() - return ci + return wi } -func (ds *WithdrawalIndexer) runWithdrawalIndexerLoop() { +func (wi *WithdrawalIndexer) runWithdrawalIndexerLoop() { defer utils.HandleSubroutinePanic("WithdrawalIndexer.runWithdrawalIndexerLoop") for { time.Sleep(30 * time.Second) - ds.logger.Debugf("run withdrawal indexer logic") + wi.logger.Debugf("run withdrawal indexer logic") - err := ds.indexer.runContractIndexer() + err := wi.indexer.runContractIndexer() if err != nil { - ds.logger.Errorf("indexer error: %v", err) + wi.logger.Errorf("indexer error: %v", err) } - err = ds.matcher.runWithdrawalMatcher() + err = wi.matcher.runWithdrawalMatcher() if err != nil { - ds.logger.Errorf("matcher error: %v", err) + wi.logger.Errorf("matcher error: %v", err) } } } -func (ds *WithdrawalIndexer) processFinalTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64) (*dbtypes.WithdrawalRequestTx, error) { - requestTx := ds.parseRequestLog(log) +func (wi *WithdrawalIndexer) processFinalTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64) (*dbtypes.WithdrawalRequestTx, error) { + requestTx := wi.parseRequestLog(log) if requestTx == nil { return nil, fmt.Errorf("invalid withdrawal log") } @@ -95,8 +95,8 @@ func (ds *WithdrawalIndexer) processFinalTx(log *types.Log, tx *types.Transactio return requestTx, nil } -func (ds *WithdrawalIndexer) processRecentTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64, fork *forkWithClients) (*dbtypes.WithdrawalRequestTx, error) { - requestTx := ds.parseRequestLog(log) +func (wi *WithdrawalIndexer) processRecentTx(log *types.Log, tx *types.Transaction, header *types.Header, txFrom common.Address, dequeueBlock uint64, fork *forkWithClients) (*dbtypes.WithdrawalRequestTx, error) { + requestTx := wi.parseRequestLog(log) if requestTx == nil { return nil, fmt.Errorf("invalid withdrawal log") } @@ -108,7 +108,7 @@ func (ds *WithdrawalIndexer) processRecentTx(log *types.Log, tx *types.Transacti requestTx.TxTarget = txTo[:] requestTx.DequeueBlock = dequeueBlock - clBlock := ds.indexerCtx.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(log.BlockHash)) + clBlock := wi.indexerCtx.beaconIndexer.GetBlocksByExecutionBlockHash(phase0.Hash32(log.BlockHash)) if len(clBlock) > 0 { requestTx.ForkId = uint64(clBlock[0].GetForkId()) } else { @@ -118,14 +118,14 @@ func (ds *WithdrawalIndexer) processRecentTx(log *types.Log, tx *types.Transacti return requestTx, nil } -func (ds *WithdrawalIndexer) parseRequestLog(log *types.Log) *dbtypes.WithdrawalRequestTx { +func (wi *WithdrawalIndexer) parseRequestLog(log *types.Log) *dbtypes.WithdrawalRequestTx { // data layout: // 0-20: sender address (20 bytes) // 20-68: validator pubkey (48 bytes) // 68-76: amount (8 bytes) if len(log.Data) < 76 { - 
ds.logger.Warnf("invalid withdrawal log data length: %v", len(log.Data)) + wi.logger.Warnf("invalid withdrawal log data length: %v", len(log.Data)) return nil } @@ -146,7 +146,7 @@ func (ds *WithdrawalIndexer) parseRequestLog(log *types.Log) *dbtypes.Withdrawal return requestTx } -func (ds *WithdrawalIndexer) persistWithdrawalTxs(tx *sqlx.Tx, requests []*dbtypes.WithdrawalRequestTx) error { +func (wi *WithdrawalIndexer) persistWithdrawalTxs(tx *sqlx.Tx, requests []*dbtypes.WithdrawalRequestTx) error { requestCount := len(requests) for requestIdx := 0; requestIdx < requestCount; requestIdx += 500 { endIdx := requestIdx + 500
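
Side note on the queue accounting that both indexers delegate to contractIndexer: for every execution block that elapses, up to options.dequeueRate requests are assumed to leave the contract's request queue, and a newly observed request's DequeueBlock is projected from the remaining queue length. A minimal, self-contained sketch of that arithmetic follows; the helper names advanceQueue/enqueueRequest are hypothetical — the patch keeps this state inline in processFinalizedBlocks and processRecentBlocksForFork.

package main

import "fmt"

// advanceQueue is a hypothetical helper mirroring the inline logic in the
// patch: moving from queueBlock to blockNumber drains up to dequeueRate
// requests per elapsed block from the simulated contract queue.
func advanceQueue(queueBlock, queueLen, blockNumber, dequeueRate uint64) (uint64, uint64) {
	if blockNumber <= queueBlock {
		return queueBlock, queueLen
	}
	dequeued := (blockNumber - queueBlock) * dequeueRate
	if dequeued > queueLen {
		queueLen = 0
	} else {
		queueLen -= dequeued
	}
	return blockNumber, queueLen
}

// enqueueRequest projects the block at which a request observed in
// blockNumber will be dequeued, given the current queue length, and returns
// the queue length after appending the request.
func enqueueRequest(blockNumber, queueLen, dequeueRate uint64) (dequeueBlock, newLen uint64) {
	dequeueBlock = blockNumber + (queueLen / dequeueRate)
	return dequeueBlock, queueLen + 1
}

func main() {
	const dequeueRate = 2 // illustrative value; the real indexers pass their own rates

	// state carried over from the last processed block (illustrative numbers)
	queueBlock, queueLen := uint64(100), uint64(7)

	// a request log observed in block 103
	queueBlock, queueLen = advanceQueue(queueBlock, queueLen, 103, dequeueRate)
	dequeueBlock, queueLen := enqueueRequest(103, queueLen, dequeueRate)
	fmt.Println(queueBlock, queueLen, dequeueBlock) // 103 2 103
}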