Skip to content

Commit

Permalink
compute request first for len check
Browse files Browse the repository at this point in the history
  • Loading branch information
kasey committed Feb 16, 2024
1 parent 060e9dd commit d2d1763
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 44 deletions.
6 changes: 1 addition & 5 deletions beacon-chain/sync/pending_blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,7 @@ func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.Rea
}
}

commitmentCount, err := commitmentsForBlock(b)
if err != nil {
return err
}
request, err := s.constructPendingBlobsRequest(ctx, blkRoot, commitmentCount)
request, err := s.pendingBlobsRequestForBlock(blkRoot, b)
if err != nil {
return err
}
Expand Down
51 changes: 21 additions & 30 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *t
if blk.Version() < version.Deneb {
continue
}
cmts, err := commitmentsForBlock(blk)
blkRoot, err := blk.Block().HashTreeRoot()
if err != nil {
return err
}
if cmts == 0 {
continue
}
blkRoot, err := blk.Block().HashTreeRoot()
request, err := s.pendingBlobsRequestForBlock(blkRoot, blk)
if err != nil {
return err
}
if err := s.requestPendingBlobs(ctx, blk, blkRoot, cmts, id); err != nil {
if len(request) == 0 {
continue
}
if err := s.sendAndSaveBlobSidecars(ctx, request, id, blk); err != nil {
return err
}
}
Expand Down Expand Up @@ -133,29 +133,6 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
return nil
}

func commitmentsForBlock(block interfaces.ReadOnlySignedBeaconBlock) (int, error) {
if block.Version() < version.Deneb {
return 0, nil // Block before deneb has no blob.
}
commitments, err := block.Block().Body().BlobKzgCommitments()
if err != nil {
return 0, err
}
return len(commitments), nil
}

// requestPendingBlobs handles the request for pending blobs based on the given beacon block.
func (s *Service) requestPendingBlobs(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, cmts int, peerID peer.ID) error {
if cmts < 1 {
return nil
}
request, err := s.constructPendingBlobsRequest(ctx, blockRoot, cmts)
if err != nil {
return err
}
return s.sendAndSaveBlobSidecars(ctx, request, peerID, block)
}

// sendAndSaveBlobSidecars sends the blob request and saves received sidecars.
func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.BlobSidecarsByRootReq, peerID peer.ID, block interfaces.ReadOnlySignedBeaconBlock) error {
if len(request) == 0 {
Expand Down Expand Up @@ -193,8 +170,22 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo
return nil
}

func (s *Service) pendingBlobsRequestForBlock(root [32]byte, b interfaces.ReadOnlySignedBeaconBlock) (types.BlobSidecarsByRootReq, error) {
if b.Version() < version.Deneb {
return nil, nil // Block before deneb has no blob.
}
cc, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, err
}
if len(cc) == 0 {
return nil, nil
}
return s.constructPendingBlobsRequest(root, len(cc))
}

// constructPendingBlobsRequest creates a request for BlobSidecars by root, considering blobs already in DB.
func (s *Service) constructPendingBlobsRequest(ctx context.Context, root [32]byte, commitments int) (types.BlobSidecarsByRootReq, error) {
func (s *Service) constructPendingBlobsRequest(root [32]byte, commitments int) (types.BlobSidecarsByRootReq, error) {
if commitments == 0 {
return nil, nil
}
Expand Down
17 changes: 8 additions & 9 deletions beacon-chain/sync/rpc_beacon_blocks_by_root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,16 @@ func TestRequestPendingBlobs(t *testing.T) {
t.Run("old block should not fail", func(t *testing.T) {
b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
cc, err := commitmentsForBlock(b)
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b)
require.NoError(t, err)
require.NoError(t, s.requestPendingBlobs(context.Background(), b, [32]byte{}, cc, "test"))
require.NoError(t, s.sendAndSaveBlobSidecars(context.Background(), request, "test", b))
})
t.Run("empty commitment block should not fail", func(t *testing.T) {
b, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
cc, err := commitmentsForBlock(b)
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b)
require.NoError(t, err)
require.NoError(t, s.requestPendingBlobs(context.Background(), b, [32]byte{}, cc, "test"))
require.NoError(t, s.sendAndSaveBlobSidecars(context.Background(), request, "test", b))
})
t.Run("unsupported protocol", func(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
Expand Down Expand Up @@ -410,22 +410,21 @@ func TestRequestPendingBlobs(t *testing.T) {
b.Block.Body.BlobKzgCommitments = make([][]byte, 1)
b1, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
cc, err := commitmentsForBlock(b1)
request, err := s.pendingBlobsRequestForBlock([32]byte{}, b1)
require.NoError(t, err)
require.ErrorContains(t, "protocols not supported", s.requestPendingBlobs(context.Background(), b1, [32]byte{}, cc, p2.PeerID()))
require.ErrorContains(t, "protocols not supported", s.sendAndSaveBlobSidecars(context.Background(), request, p2.PeerID(), b1))
})
}

func TestConstructPendingBlobsRequest(t *testing.T) {
d := db.SetupDB(t)
bs := filesystem.NewEphemeralBlobStorage(t)
s := &Service{cfg: &config{beaconDB: d, blobStorage: bs}}
ctx := context.Background()

// No unknown indices.
root := [32]byte{1}
count := 3
actual, err := s.constructPendingBlobsRequest(ctx, root, count)
actual, err := s.constructPendingBlobsRequest(root, count)
require.NoError(t, err)
require.Equal(t, 3, len(actual))
for i, id := range actual {
Expand Down Expand Up @@ -455,7 +454,7 @@ func TestConstructPendingBlobsRequest(t *testing.T) {
expected := []*eth.BlobIdentifier{
{Index: 1, BlockRoot: root[:]},
}
actual, err = s.constructPendingBlobsRequest(ctx, root, count)
actual, err = s.constructPendingBlobsRequest(root, count)
require.NoError(t, err)
require.Equal(t, expected[0].Index, actual[0].Index)
require.DeepEqual(t, expected[0].BlockRoot, actual[0].BlockRoot)
Expand Down

0 comments on commit d2d1763

Please sign in to comment.