Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pending block/blob zero peer edge case #13625

Merged
merged 2 commits into from
Feb 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions beacon-chain/sync/pending_blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ func (s *Service) hasPeer() bool {
return len(s.cfg.p2p.Peers().Connected()) > 0
}

var errNoPeersForPending = errors.New("no suitable peers to process pending block queue, delaying")

// processAndBroadcastBlock validates, processes, and broadcasts a block.
// part of the function is to request missing blobs from peers if the block contains kzg commitments.
func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) error {
Expand All @@ -202,10 +204,17 @@ func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.Rea
}
}

peers := s.getBestPeers()
peerCount := len(peers)
if peerCount > 0 {
if err := s.requestPendingBlobs(ctx, b, blkRoot, peers[rand.NewGenerator().Int()%peerCount]); err != nil {
request, err := s.pendingBlobsRequestForBlock(blkRoot, b)
if err != nil {
return err
}
if len(request) > 0 {
peers := s.getBestPeers()
peerCount := len(peers)
if peerCount == 0 {
return errors.Wrapf(errNoPeersForPending, "block root=%#x", blkRoot)
}
if err := s.sendAndSaveBlobSidecars(ctx, request, peers[rand.NewGenerator().Int()%peerCount], b); err != nil {
return err
}
}
Expand Down
62 changes: 28 additions & 34 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *t
}
return nil
})

for _, blk := range blks {
// Skip blocks before deneb because they have no blob.
if blk.Version() < version.Deneb {
Expand All @@ -56,11 +55,17 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *t
if err != nil {
return err
}
if err := s.requestPendingBlobs(ctx, blk, blkRoot, id); err != nil {
request, err := s.pendingBlobsRequestForBlock(blkRoot, blk)
if err != nil {
return err
}
if len(request) == 0 {
continue
}
if err := s.sendAndSaveBlobSidecars(ctx, request, id, blk); err != nil {
return err
}
}

return err
}

Expand Down Expand Up @@ -128,41 +133,13 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return nil
}

// requestPendingBlobs handles the request for pending blobs based on the given beacon block.
func (s *Service) requestPendingBlobs(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, peerID peer.ID) error {
if block.Version() < version.Deneb {
return nil // Block before deneb has no blob.
}

commitments, err := block.Block().Body().BlobKzgCommitments()
if err != nil {
return err
}

if len(commitments) == 0 {
return nil // No operation if the block has no blob commitments.
}

contextByte, err := ContextByteVersionsForValRoot(s.cfg.chain.GenesisValidatorsRoot())
if err != nil {
return err
}

request, err := s.constructPendingBlobsRequest(ctx, blockRoot, len(commitments))
if err != nil {
return err
}

return s.sendAndSaveBlobSidecars(ctx, request, contextByte, peerID, block)
}

// sendAndSaveBlobSidecars sends the blob request and saves received sidecars.
func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.BlobSidecarsByRootReq, contextByte ContextByteVersions, peerID peer.ID, block interfaces.ReadOnlySignedBeaconBlock) error {
func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.BlobSidecarsByRootReq, peerID peer.ID, block interfaces.ReadOnlySignedBeaconBlock) error {
if len(request) == 0 {
return nil
}

sidecars, err := SendBlobSidecarByRoot(ctx, s.cfg.clock, s.cfg.p2p, peerID, contextByte, &request)
sidecars, err := SendBlobSidecarByRoot(ctx, s.cfg.clock, s.cfg.p2p, peerID, s.ctxMap, &request)
if err != nil {
return err
}
Expand Down Expand Up @@ -193,8 +170,25 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo
return nil
}

func (s *Service) pendingBlobsRequestForBlock(root [32]byte, b interfaces.ReadOnlySignedBeaconBlock) (types.BlobSidecarsByRootReq, error) {
if b.Version() < version.Deneb {
return nil, nil // Block before deneb has no blob.
}
cc, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, err
}
if len(cc) == 0 {
return nil, nil
}
return s.constructPendingBlobsRequest(root, len(cc))
}

// constructPendingBlobsRequest creates a request for BlobSidecars by root, considering blobs already in DB.
func (s *Service) constructPendingBlobsRequest(ctx context.Context, root [32]byte, commitments int) (types.BlobSidecarsByRootReq, error) {
func (s *Service) constructPendingBlobsRequest(root [32]byte, commitments int) (types.BlobSidecarsByRootReq, error) {
if commitments == 0 {
return nil, nil
}
stored, err := s.cfg.blobStorage.Indices(root)
if err != nil {
return nil, err
Expand Down
19 changes: 12 additions & 7 deletions beacon-chain/sync/rpc_beacon_blocks_by_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
leakybucket "github.com/prysmaticlabs/prysm/v5/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
"github.com/prysmaticlabs/prysm/v5/proto/eth/v2"
eth "github.com/prysmaticlabs/prysm/v5/proto/eth/v2"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
Expand Down Expand Up @@ -370,12 +370,16 @@ func TestRequestPendingBlobs(t *testing.T) {
t.Run("old block should not fail", func(t *testing.T) {
b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
require.NoError(t, s.requestPendingBlobs(context.Background(), b, [32]byte{}, "test"))
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b)
require.NoError(t, err)
require.NoError(t, s.sendAndSaveBlobSidecars(context.Background(), request, "test", b))
})
t.Run("empty commitment block should not fail", func(t *testing.T) {
b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
require.NoError(t, s.requestPendingBlobs(context.Background(), b, [32]byte{}, "test"))
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b)
require.NoError(t, err)
require.NoError(t, s.sendAndSaveBlobSidecars(context.Background(), request, "test", b))
})
t.Run("unsupported protocol", func(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
Expand Down Expand Up @@ -406,20 +410,21 @@ func TestRequestPendingBlobs(t *testing.T) {
b.Block.Body.BlobKzgCommitments = make([][]byte, 1)
b1, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.ErrorContains(t, "protocols not supported", s.requestPendingBlobs(context.Background(), b1, [32]byte{}, p2.PeerID()))
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b1)
require.NoError(t, err)
require.ErrorContains(t, "protocols not supported", s.sendAndSaveBlobSidecars(context.Background(), request, p2.PeerID(), b1))
})
}

func TestConstructPendingBlobsRequest(t *testing.T) {
d := db.SetupDB(t)
bs := filesystem.NewEphemeralBlobStorage(t)
s := &Service{cfg: &config{beaconDB: d, blobStorage: bs}}
ctx := context.Background()

// No unknown indices.
root := [32]byte{1}
count := 3
actual, err := s.constructPendingBlobsRequest(ctx, root, count)
actual, err := s.constructPendingBlobsRequest(root, count)
require.NoError(t, err)
require.Equal(t, 3, len(actual))
for i, id := range actual {
Expand Down Expand Up @@ -449,7 +454,7 @@ func TestConstructPendingBlobsRequest(t *testing.T) {
expected := []*eth.BlobIdentifier{
{Index: 1, BlockRoot: root[:]},
}
actual, err = s.constructPendingBlobsRequest(ctx, root, count)
actual, err = s.constructPendingBlobsRequest(root, count)
require.NoError(t, err)
require.Equal(t, expected[0].Index, actual[0].Index)
require.DeepEqual(t, expected[0].BlockRoot, actual[0].BlockRoot)
Expand Down
10 changes: 10 additions & 0 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ type Service struct {
verifierWaiter *verification.InitializerWaiter
newBlobVerifier verification.NewBlobVerifier
availableBlocker coverage.AvailableBlocker
ctxMap ContextByteVersions
}

// NewService initializes new regular sync service.
Expand Down Expand Up @@ -295,6 +296,15 @@ func (s *Service) waitForChainStart() {
s.cfg.clock = clock
startTime := clock.GenesisTime()
log.WithField("starttime", startTime).Debug("Received state initialized event")

ctxMap, err := ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
if err != nil {
log.WithError(err).WithField("genesis_validator_root", clock.GenesisValidatorsRoot()).
Error("sync service failed to initialize context version map")
return
}
s.ctxMap = ctxMap

// Register respective rpc handlers at state initialized event.
s.registerRPCHandlers()
// Wait for chainstart in separate routine.
Expand Down
Loading