diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index e50a605a101f..2ac7ad0e685e 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -192,6 +192,8 @@ func (s *Service) hasPeer() bool { return len(s.cfg.p2p.Peers().Connected()) > 0 } +var errNoPeersForPending = errors.New("no suitable peers to process pending block queue, delaying") + // processAndBroadcastBlock validates, processes, and broadcasts a block. // part of the function is to request missing blobs from peers if the block contains kzg commitments. func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) error { @@ -202,10 +204,17 @@ func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.Rea } } - peers := s.getBestPeers() - peerCount := len(peers) - if peerCount > 0 { - if err := s.requestPendingBlobs(ctx, b, blkRoot, peers[rand.NewGenerator().Int()%peerCount]); err != nil { + request, err := s.pendingBlobsRequestForBlock(blkRoot, b) + if err != nil { + return err + } + if len(request) > 0 { + peers := s.getBestPeers() + peerCount := len(peers) + if peerCount == 0 { + return errors.Wrapf(errNoPeersForPending, "block root=%#x", blkRoot) + } + if err := s.sendAndSaveBlobSidecars(ctx, request, peers[rand.NewGenerator().Int()%peerCount], b); err != nil { return err } } diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go index 143817689b1c..f8f8ce94478c 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go @@ -46,7 +46,6 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *t } return nil }) - for _, blk := range blks { // Skip blocks before deneb because they have no blob. if blk.Version() < version.Deneb { @@ -56,11 +55,17 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *t if err != nil { return err } - if err := s.requestPendingBlobs(ctx, blk, blkRoot, id); err != nil { + request, err := s.pendingBlobsRequestForBlock(blkRoot, blk) + if err != nil { + return err + } + if len(request) == 0 { + continue + } + if err := s.sendAndSaveBlobSidecars(ctx, request, id, blk); err != nil { return err } } - return err } @@ -128,41 +133,13 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{ return nil } -// requestPendingBlobs handles the request for pending blobs based on the given beacon block. -func (s *Service) requestPendingBlobs(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, peerID peer.ID) error { - if block.Version() < version.Deneb { - return nil // Block before deneb has no blob. - } - - commitments, err := block.Block().Body().BlobKzgCommitments() - if err != nil { - return err - } - - if len(commitments) == 0 { - return nil // No operation if the block has no blob commitments. - } - - contextByte, err := ContextByteVersionsForValRoot(s.cfg.chain.GenesisValidatorsRoot()) - if err != nil { - return err - } - - request, err := s.constructPendingBlobsRequest(ctx, blockRoot, len(commitments)) - if err != nil { - return err - } - - return s.sendAndSaveBlobSidecars(ctx, request, contextByte, peerID, block) -} - // sendAndSaveBlobSidecars sends the blob request and saves received sidecars. -func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.BlobSidecarsByRootReq, contextByte ContextByteVersions, peerID peer.ID, block interfaces.ReadOnlySignedBeaconBlock) error { +func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.BlobSidecarsByRootReq, peerID peer.ID, block interfaces.ReadOnlySignedBeaconBlock) error { if len(request) == 0 { return nil } - sidecars, err := SendBlobSidecarByRoot(ctx, s.cfg.clock, s.cfg.p2p, peerID, contextByte, &request) + sidecars, err := SendBlobSidecarByRoot(ctx, s.cfg.clock, s.cfg.p2p, peerID, s.ctxMap, &request) if err != nil { return err } @@ -193,8 +170,25 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo return nil } +func (s *Service) pendingBlobsRequestForBlock(root [32]byte, b interfaces.ReadOnlySignedBeaconBlock) (types.BlobSidecarsByRootReq, error) { + if b.Version() < version.Deneb { + return nil, nil // Block before deneb has no blob. + } + cc, err := b.Block().Body().BlobKzgCommitments() + if err != nil { + return nil, err + } + if len(cc) == 0 { + return nil, nil + } + return s.constructPendingBlobsRequest(root, len(cc)) +} + // constructPendingBlobsRequest creates a request for BlobSidecars by root, considering blobs already in DB. -func (s *Service) constructPendingBlobsRequest(ctx context.Context, root [32]byte, commitments int) (types.BlobSidecarsByRootReq, error) { +func (s *Service) constructPendingBlobsRequest(root [32]byte, commitments int) (types.BlobSidecarsByRootReq, error) { + if commitments == 0 { + return nil, nil + } stored, err := s.cfg.blobStorage.Indices(root) if err != nil { return nil, err diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index 59cbbc39bee3..d95306297025 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -31,7 +31,7 @@ import ( leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" - "github.com/prysmaticlabs/prysm/v5/proto/eth/v2" + eth "github.com/prysmaticlabs/prysm/v5/proto/eth/v2" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/testing/assert" "github.com/prysmaticlabs/prysm/v5/testing/require" @@ -370,12 +370,16 @@ func TestRequestPendingBlobs(t *testing.T) { t.Run("old block should not fail", func(t *testing.T) { b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) - require.NoError(t, s.requestPendingBlobs(context.Background(), b, [32]byte{}, "test")) + request, err := s.pendingBlobsRequestForBlock([32]byte{}, b) + require.NoError(t, err) + require.NoError(t, s.sendAndSaveBlobSidecars(context.Background(), request, "test", b)) }) t.Run("empty commitment block should not fail", func(t *testing.T) { b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock()) require.NoError(t, err) - require.NoError(t, s.requestPendingBlobs(context.Background(), b, [32]byte{}, "test")) + request, err := s.pendingBlobsRequestForBlock([32]byte{}, b) + require.NoError(t, err) + require.NoError(t, s.sendAndSaveBlobSidecars(context.Background(), request, "test", b)) }) t.Run("unsupported protocol", func(t *testing.T) { p1 := p2ptest.NewTestP2P(t) @@ -406,7 +410,9 @@ func TestRequestPendingBlobs(t *testing.T) { b.Block.Body.BlobKzgCommitments = make([][]byte, 1) b1, err := blocks.NewSignedBeaconBlock(b) require.NoError(t, err) - require.ErrorContains(t, "protocols not supported", s.requestPendingBlobs(context.Background(), b1, [32]byte{}, p2.PeerID())) + request, err := s.pendingBlobsRequestForBlock([32]byte{}, b1) + require.NoError(t, err) + require.ErrorContains(t, "protocols not supported", s.sendAndSaveBlobSidecars(context.Background(), request, p2.PeerID(), b1)) }) } @@ -414,12 +420,11 @@ func TestConstructPendingBlobsRequest(t *testing.T) { d := db.SetupDB(t) bs := filesystem.NewEphemeralBlobStorage(t) s := &Service{cfg: &config{beaconDB: d, blobStorage: bs}} - ctx := context.Background() // No unknown indices. root := [32]byte{1} count := 3 - actual, err := s.constructPendingBlobsRequest(ctx, root, count) + actual, err := s.constructPendingBlobsRequest(root, count) require.NoError(t, err) require.Equal(t, 3, len(actual)) for i, id := range actual { @@ -449,7 +454,7 @@ func TestConstructPendingBlobsRequest(t *testing.T) { expected := []*eth.BlobIdentifier{ {Index: 1, BlockRoot: root[:]}, } - actual, err = s.constructPendingBlobsRequest(ctx, root, count) + actual, err = s.constructPendingBlobsRequest(root, count) require.NoError(t, err) require.Equal(t, expected[0].Index, actual[0].Index) require.DeepEqual(t, expected[0].BlockRoot, actual[0].BlockRoot) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index a31c8a582d03..c6bcd557aa63 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -157,6 +157,7 @@ type Service struct { verifierWaiter *verification.InitializerWaiter newBlobVerifier verification.NewBlobVerifier availableBlocker coverage.AvailableBlocker + ctxMap ContextByteVersions } // NewService initializes new regular sync service. @@ -295,6 +296,15 @@ func (s *Service) waitForChainStart() { s.cfg.clock = clock startTime := clock.GenesisTime() log.WithField("starttime", startTime).Debug("Received state initialized event") + + ctxMap, err := ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot()) + if err != nil { + log.WithError(err).WithField("genesis_validator_root", clock.GenesisValidatorsRoot()). + Error("sync service failed to initialize context version map") + return + } + s.ctxMap = ctxMap + // Register respective rpc handlers at state initialized event. s.registerRPCHandlers() // Wait for chainstart in separate routine.