From 31efa9cd4fe22b1022b15ab5a22559211c5f2d64 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 27 Aug 2023 09:52:45 +0200 Subject: [PATCH] only use block events to keep track of the chain (head events are inconsistent across clients) --- indexer/client.go | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/indexer/client.go b/indexer/client.go index 364c6b19..a63bdc23 100644 --- a/indexer/client.go +++ b/indexer/client.go @@ -172,7 +172,7 @@ func (client *IndexerClient) runIndexerClient() error { client.retryCounter = 0 // start event stream - blockStream := client.rpcClient.NewBlockStream(rpc.StreamBlockEvent | rpc.StreamHeadEvent | rpc.StreamFinalizedEvent) + blockStream := client.rpcClient.NewBlockStream(rpc.StreamBlockEvent | rpc.StreamFinalizedEvent) defer blockStream.Close() // prefill cache @@ -201,8 +201,6 @@ func (client *IndexerClient) runIndexerClient() error { switch evt.Event { case rpc.StreamBlockEvent: client.processBlockEvent(evt.Data.(*rpctypes.StandardV1StreamedBlockEvent)) - case rpc.StreamHeadEvent: - client.processHeadEvent(evt.Data.(*rpctypes.StandardV1StreamedHeadEvent)) case rpc.StreamFinalizedEvent: client.processFinalizedEvent(evt.Data.(*rpctypes.StandardV1StreamedFinalizedCheckpointEvent)) } @@ -243,6 +241,7 @@ func (client *IndexerClient) prefillCache(finalizedSlot uint64, latestHeader *rp logger.WithField("client", client.clientName).Debugf("received known block %v:%v [0x%x] warmup, head", utils.EpochOfSlot(uint64(client.lastHeadSlot)), client.lastHeadSlot, client.lastHeadRoot) } client.ensureBlock(currentBlock, &latestHeader.Data.Header) + client.setHeadBlock(latestHeader.Data.Root, uint64(latestHeader.Data.Header.Message.Slot)) // walk backwards and load all blocks until we reach a finalized epoch parentRoot := []byte(currentBlock.header.Message.ParentRoot) @@ -448,17 +447,10 @@ func (client *IndexerClient) processBlockEvent(evt *rpctypes.StandardV1StreamedB if err != nil { return err } + client.setHeadBlock(evt.Block, uint64(evt.Slot)) return nil } -func (client *IndexerClient) processHeadEvent(evt *rpctypes.StandardV1StreamedHeadEvent) error { - currentBlock := client.indexerCache.getCachedBlock(evt.Block) - if currentBlock == nil { - return fmt.Errorf("received head event for non existing block: %v", evt.Block.String()) - } - return client.setHeadBlock(evt.Block, uint64(evt.Slot)) -} - func (client *IndexerClient) processFinalizedEvent(evt *rpctypes.StandardV1StreamedFinalizedCheckpointEvent) error { logger.WithField("client", client.clientName).Debugf("received finalization_checkpoint event: epoch %v [%s]", evt.Epoch, evt.Block.String()) client.indexerCache.setFinalizedHead(int64(evt.Epoch)-1, evt.Block)