Skip to content

Commit

Permalink
Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain committed Oct 18, 2024
1 parent 7cbbb68 commit 3004aeb
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 19 deletions.
15 changes: 6 additions & 9 deletions beacon-chain/execution/engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/big"
"slices"
"strings"
"time"

Expand Down Expand Up @@ -489,12 +488,9 @@ func (s *Service) GetBlobs(ctx context.Context, versionedHashes []common.Hash) (
ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobs")
defer span.End()
// If the execution engine does not support `GetBlobsV1`, return early to prevent encountering an error later.
s.capabilitiesLock.RLock()
if !slices.Contains(s.capabilities, GetBlobsV1) {
s.capabilitiesLock.RUnlock()
if !s.capabilityCache.Has(GetBlobsV1) {
return nil, nil
}
s.capabilitiesLock.RUnlock()

result := make([]*pb.BlobAndProof, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV1, versionedHashes)
Expand Down Expand Up @@ -566,7 +562,7 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
if err != nil {
return nil, errors.Wrap(err, "could not get blobs")
}
if blobs == nil {
if len(blobs) == 0 {
return nil, nil
}

Expand All @@ -582,11 +578,12 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.
continue
}

blob := blobs[blobIndex]
blobIndex++
if blob == nil {
if blobIndex >= len(blobs) || blobs[blobIndex] == nil {
blobIndex++
continue
}
blob := blobs[blobIndex]
blobIndex++

proof, err := blocks.MerkleProofKZGCommitment(blockBody, i)
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions beacon-chain/execution/engine_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2058,7 +2058,7 @@ func Test_ExchangeCapabilities(t *testing.T) {
}

func TestReconstructBlobSidecars(t *testing.T) {
client := &Service{}
client := &Service{capabilityCache: &capabilityCache{}}
b := util.NewBeaconBlockDeneb()
kzgCommitments := createRandomKzgCommitments(t, 6)

Expand All @@ -2083,6 +2083,10 @@ func TestReconstructBlobSidecars(t *testing.T) {
require.Equal(t, 0, len(verifiedBlobs))
})

client.capabilityCache = &capabilityCache{
capabilities: []string{GetBlobsV1},
}

t.Run("recovered 6 missing blobs", func(t *testing.T) {
srv := createBlobServer(t, 6)
defer srv.Close()
Expand Down Expand Up @@ -2146,7 +2150,9 @@ func setupRpcClient(t *testing.T, url string, client *Service) (*rpc.Client, *Se
require.NoError(t, err)

client.rpcClient = rpcClient
client.capabilities = []string{GetBlobsV1}
client.capabilityCache = &capabilityCache{
capabilities: []string{GetBlobsV1},
}
client.blobVerifier = testNewBlobVerifier()

return rpcClient, client
Expand Down
4 changes: 1 addition & 3 deletions beacon-chain/execution/rpc_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ func (s *Service) pollConnectionStatus(ctx context.Context) {
if err != nil {
errorLogger(err, "Could not exchange capabilities with execution client")
}
s.capabilitiesLock.Lock()
s.capabilities = c
s.capabilitiesLock.Unlock()
s.capabilityCache.Save(c)

return
case <-s.ctx.Done():
Expand Down
28 changes: 26 additions & 2 deletions beacon-chain/execution/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,9 @@ type Service struct {
lastReceivedMerkleIndex int64 // Keeps track of the last received index to prevent log spam.
runError error
preGenesisState state.BeaconState
capabilities []string
capabilitiesLock sync.RWMutex
verifierWaiter *verification.InitializerWaiter
blobVerifier verification.NewBlobVerifier
capabilityCache *capabilityCache
}

// NewService sets up a new instance with an ethclient when given a web3 endpoint as a string in the config.
Expand Down Expand Up @@ -198,6 +197,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
lastReceivedMerkleIndex: -1,
preGenesisState: genState,
eth1HeadTicker: time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerETH1Block) * time.Second),
capabilityCache: &capabilityCache{},
}

for _, opt := range opts {
Expand Down Expand Up @@ -905,3 +905,27 @@ func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.
return ini.NewBlobVerifier(b, reqs)
}
}

type capabilityCache struct {
capabilities []string
capabilitiesLock sync.RWMutex
}

func (c *capabilityCache) Save(cs []string) {
c.capabilitiesLock.Lock()
defer c.capabilitiesLock.Unlock()

c.capabilities = cs
}

func (c *capabilityCache) Has(capability string) bool {
c.capabilitiesLock.RLock()
defer c.capabilitiesLock.RUnlock()

for _, existingCapability := range c.capabilities {
if existingCapability == capability {
return true
}
}
return false
}
7 changes: 4 additions & 3 deletions beacon-chain/sync/subscriber_beacon_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *Service) reconstructAndBroadcastBlobs(ctx context.Context, block interf

// Broadcast blob sidecars first than save them to the db
for _, sidecar := range blobSidecars {
if indices[sidecar.Index] {
if sidecar.Index >= uint64(len(indices)) || indices[sidecar.Index] {
continue
}
if err := s.cfg.p2p.BroadcastBlob(ctx, sidecar.Index, sidecar.BlobSidecar); err != nil {
Expand All @@ -115,12 +115,13 @@ func (s *Service) reconstructAndBroadcastBlobs(ctx context.Context, block interf
}

for _, sidecar := range blobSidecars {
if indices[sidecar.Index] {
if sidecar.Index >= uint64(len(indices)) || indices[sidecar.Index] {
blobExistedInDBCount.Inc()
continue
}
if err := s.cfg.chain.ReceiveBlob(ctx, sidecar); err != nil {
if err := s.subscribeBlob(ctx, sidecar); err != nil {
log.WithFields(blobFields(sidecar.ROBlob)).WithError(err).Error("Failed to receive blob")
continue
}

blobRecoveredFromELCount.Inc()
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/sync/subscriber_beacon_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ func TestReconstructAndBroadcastBlobs(t *testing.T) {
executionReconstructor: &mockExecution.EngineClient{
BlobSidecars: tt.blobSidecars,
},
operationNotifier: &chainMock.MockOperationNotifier{},
},
seenBlobCache: lruwrpr.New(1),
}
s.reconstructAndBroadcastBlobs(context.Background(), sb)
require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs))
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/sync/subscriber_blob_sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func (s *Service) blobSubscriber(ctx context.Context, msg proto.Message) error {
return fmt.Errorf("message was not type blocks.ROBlob, type=%T", msg)
}

return s.subscribeBlob(ctx, b)
}

func (s *Service) subscribeBlob(ctx context.Context, b blocks.VerifiedROBlob) error {
s.setSeenBlobIndex(b.Slot(), b.ProposerIndex(), b.Index)

if err := s.cfg.chain.ReceiveBlob(ctx, b); err != nil {
Expand Down

0 comments on commit 3004aeb

Please sign in to comment.