diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f36fa51b638..3663f43c710a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,7 @@ Updating to this release is recommended at your convenience. - fastssz version bump (better error messages). - SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413) - Added GetPoolAttesterSlashingsV2 endpoint. +- Use engine api get-blobs for block subscriber ### Changed diff --git a/beacon-chain/execution/BUILD.bazel b/beacon-chain/execution/BUILD.bazel index 828ca2ea7fd5..de5584fe3a80 100644 --- a/beacon-chain/execution/BUILD.bazel +++ b/beacon-chain/execution/BUILD.bazel @@ -37,6 +37,7 @@ go_library( "//beacon-chain/state:go_default_library", "//beacon-chain/state/state-native:go_default_library", "//beacon-chain/state/stategen:go_default_library", + "//beacon-chain/verification:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", diff --git a/beacon-chain/execution/engine_client.go b/beacon-chain/execution/engine_client.go index 039707d8fca6..4eaed9ed2936 100644 --- a/beacon-chain/execution/engine_client.go +++ b/beacon-chain/execution/engine_client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/big" + "slices" "strings" "time" @@ -14,6 +15,7 @@ import ( "github.com/holiman/uint256" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/types" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" @@ -23,6 +25,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" pb "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" + ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/time/slots" "github.com/sirupsen/logrus" @@ -79,6 +82,8 @@ const ( GetPayloadBodiesByRangeV1 = "engine_getPayloadBodiesByRangeV1" // ExchangeCapabilities request string for JSON-RPC. ExchangeCapabilities = "engine_exchangeCapabilities" + // GetBlobsV1 request string for JSON-RPC. + GetBlobsV1 = "engine_getBlobsV1" // Defines the seconds before timing out engine endpoints with non-block execution semantics. defaultEngineTimeout = time.Second ) @@ -93,16 +98,15 @@ type ForkchoiceUpdatedResponse struct { ValidationError string `json:"validationError"` } -// PayloadReconstructor defines a service that can reconstruct a full beacon -// block with an execution payload from a signed beacon block and a connection -// to an execution client's engine API. -type PayloadReconstructor interface { +// Reconstructor defines a service responsible for reconstructing full beacon chain objects by utilizing the execution API and making requests through the execution client. +type Reconstructor interface { ReconstructFullBlock( ctx context.Context, blindedBlock interfaces.ReadOnlySignedBeaconBlock, ) (interfaces.SignedBeaconBlock, error) ReconstructFullBellatrixBlockBatch( ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock, ) ([]interfaces.SignedBeaconBlock, error) + ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, indices [6]bool) ([]blocks.VerifiedROBlob, error) } // EngineCaller defines a client that can interact with an Ethereum @@ -480,6 +484,23 @@ func (s *Service) HeaderByNumber(ctx context.Context, number *big.Int) (*types.H return hdr, err } +// GetBlobs returns the blob and proof from the execution engine for the given versioned hashes. +func (s *Service) GetBlobs(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProof, error) { + ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobs") + defer span.End() + // If the execution engine does not support `GetBlobsV1`, return early to prevent encountering an error later. + s.capabilitiesLock.RLock() + if !slices.Contains(s.capabilities, GetBlobsV1) { + s.capabilitiesLock.RUnlock() + return nil, nil + } + s.capabilitiesLock.RUnlock() + + result := make([]*pb.BlobAndProof, len(versionedHashes)) + err := s.rpcClient.CallContext(ctx, &result, GetBlobsV1, versionedHashes) + return result, handleRPCError(err) +} + // ReconstructFullBlock takes in a blinded beacon block and reconstructs // a beacon block with a full execution payload via the engine API. func (s *Service) ReconstructFullBlock( @@ -508,6 +529,104 @@ func (s *Service) ReconstructFullBellatrixBlockBatch( return unb, nil } +// ReconstructBlobSidecars reconstructs the verified blob sidecars for a given beacon block. +// It retrieves the KZG commitments from the block body, fetches the associated blobs and proofs, +// and constructs the corresponding verified read-only blob sidecars. +// +// The 'exists' argument is a boolean array of length 6, where each element corresponds to whether a +// particular blob sidecar already exists. If exists[i] is true, the blob for the i-th KZG commitment +// has already been retrieved and does not need to be fetched again from the execution layer (EL). +// +// For example: +// - If exists = [true, false, true, false, true, false], the function will fetch the blobs +// associated with indices 1, 3, and 5 (since those are marked as non-existent). +// - If exists = [false ... x 6], the function will attempt to fetch all blobs. +// +// Only the blobs that do not already exist (where exists[i] is false) are fetched using the KZG commitments from block body. +func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, exists [6]bool) ([]blocks.VerifiedROBlob, error) { + blockBody := block.Block().Body() + kzgCommitments, err := blockBody.BlobKzgCommitments() + if err != nil { + return nil, errors.Wrap(err, "could not get blob KZG commitments") + } + + // Collect KZG hashes for non-existing blobs + var kzgHashes []common.Hash + for i, commitment := range kzgCommitments { + if !exists[i] { + kzgHashes = append(kzgHashes, primitives.ConvertKzgCommitmentToVersionedHash(commitment)) + } + } + if len(kzgHashes) == 0 { + return nil, nil + } + + // Fetch blobs from EL + blobs, err := s.GetBlobs(ctx, kzgHashes) + if err != nil { + return nil, errors.Wrap(err, "could not get blobs") + } + if blobs == nil { + return nil, nil + } + + header, err := block.Header() + if err != nil { + return nil, errors.Wrap(err, "could not get header") + } + + // Reconstruct verify blob sidecars + var verifiedBlobs []blocks.VerifiedROBlob + for i, blobIndex := 0, 0; i < len(kzgCommitments); i++ { + if exists[i] { + continue + } + + blob := blobs[blobIndex] + blobIndex++ + if blob == nil { + continue + } + + proof, err := blocks.MerkleProofKZGCommitment(blockBody, i) + if err != nil { + log.WithError(err).WithField("index", i).Error("failed to get Merkle proof for KZG commitment") + continue + } + sidecar := ðpb.BlobSidecar{ + Index: uint64(i), + Blob: blob.Blob, + KzgCommitment: kzgCommitments[i], + KzgProof: blob.KzgProof, + SignedBlockHeader: header, + CommitmentInclusionProof: proof, + } + + roBlob, err := blocks.NewROBlobWithRoot(sidecar, blockRoot) + if err != nil { + log.WithError(err).WithField("index", i).Error("failed to create RO blob with root") + continue + } + + // Verify the sidecar KZG proof + v := s.blobVerifier(roBlob, verification.ELMemPoolRequirements) + if err := v.SidecarKzgProofVerified(); err != nil { + log.WithError(err).WithField("index", i).Error("failed to verify KZG proof for sidecar") + continue + } + + verifiedBlob, err := v.VerifiedROBlob() + if err != nil { + log.WithError(err).WithField("index", i).Error("failed to verify RO blob") + continue + } + + verifiedBlobs = append(verifiedBlobs, verifiedBlob) + } + + return verifiedBlobs, nil +} + func fullPayloadFromPayloadBody( header interfaces.ExecutionData, body *pb.ExecutionPayloadBody, bVersion int, ) (interfaces.ExecutionData, error) { diff --git a/beacon-chain/execution/engine_client_test.go b/beacon-chain/execution/engine_client_test.go index 7bee80ebdb9d..00e285227bf8 100644 --- a/beacon-chain/execution/engine_client_test.go +++ b/beacon-chain/execution/engine_client_test.go @@ -2,6 +2,7 @@ package execution import ( "context" + "crypto/rand" "encoding/json" "fmt" "io" @@ -20,6 +21,7 @@ import ( "github.com/holiman/uint256" "github.com/pkg/errors" mocks "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/testing" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" @@ -37,9 +39,9 @@ import ( ) var ( - _ = PayloadReconstructor(&Service{}) + _ = Reconstructor(&Service{}) _ = EngineCaller(&Service{}) - _ = PayloadReconstructor(&Service{}) + _ = Reconstructor(&Service{}) _ = EngineCaller(&mocks.EngineClient{}) ) @@ -2054,3 +2056,108 @@ func Test_ExchangeCapabilities(t *testing.T) { } }) } + +func TestReconstructBlobSidecars(t *testing.T) { + client := &Service{} + b := util.NewBeaconBlockDeneb() + kzgCommitments := createRandomKzgCommitments(t, 6) + + b.Block.Body.BlobKzgCommitments = kzgCommitments + r, err := b.Block.HashTreeRoot() + require.NoError(t, err) + sb, err := blocks.NewSignedBeaconBlock(b) + require.NoError(t, err) + + ctx := context.Background() + t.Run("all seen", func(t *testing.T) { + exists := [6]bool{true, true, true, true, true, true} + verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists) + require.NoError(t, err) + require.Equal(t, 0, len(verifiedBlobs)) + }) + + t.Run("get-blobs end point is not supported", func(t *testing.T) { + exists := [6]bool{true, true, true, true, true, false} + verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists) + require.NoError(t, err) + require.Equal(t, 0, len(verifiedBlobs)) + }) + + t.Run("recovered 6 missing blobs", func(t *testing.T) { + srv := createBlobServer(t, 6) + defer srv.Close() + + rpcClient, client := setupRpcClient(t, srv.URL, client) + defer rpcClient.Close() + + exists := [6]bool{} + verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists) + require.NoError(t, err) + require.Equal(t, 6, len(verifiedBlobs)) + }) + + t.Run("recovered 3 missing blobs", func(t *testing.T) { + srv := createBlobServer(t, 3) + defer srv.Close() + + rpcClient, client := setupRpcClient(t, srv.URL, client) + defer rpcClient.Close() + + exists := [6]bool{true, false, true, false, true, false} + verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists) + require.NoError(t, err) + require.Equal(t, 3, len(verifiedBlobs)) + }) +} + +func createRandomKzgCommitments(t *testing.T, num int) [][]byte { + kzgCommitments := make([][]byte, num) + for i := range kzgCommitments { + kzgCommitments[i] = make([]byte, 48) + _, err := rand.Read(kzgCommitments[i]) + require.NoError(t, err) + } + return kzgCommitments +} + +func createBlobServer(t *testing.T, numBlobs int) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + defer func() { + require.NoError(t, r.Body.Close()) + }() + + blobs := make([]pb.BlobAndProofJson, numBlobs) + for i := range blobs { + blobs[i] = pb.BlobAndProofJson{Blob: []byte(fmt.Sprintf("blob%d", i+1)), KzgProof: []byte(fmt.Sprintf("proof%d", i+1))} + } + + respJSON := map[string]interface{}{ + "jsonrpc": "2.0", + "id": 1, + "result": blobs, + } + require.NoError(t, json.NewEncoder(w).Encode(respJSON)) + })) +} + +func setupRpcClient(t *testing.T, url string, client *Service) (*rpc.Client, *Service) { + rpcClient, err := rpc.DialHTTP(url) + require.NoError(t, err) + + client.rpcClient = rpcClient + client.capabilities = []string{GetBlobsV1} + client.blobVerifier = testNewBlobVerifier() + + return rpcClient, client +} + +func testNewBlobVerifier() verification.NewBlobVerifier { + return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ + CbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) { + return blocks.VerifiedROBlob{}, nil + }, + } + } +} diff --git a/beacon-chain/execution/options.go b/beacon-chain/execution/options.go index edc616bcc533..028b7f0c1c38 100644 --- a/beacon-chain/execution/options.go +++ b/beacon-chain/execution/options.go @@ -7,6 +7,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" "github.com/prysmaticlabs/prysm/v5/beacon-chain/state" "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v5/network" "github.com/prysmaticlabs/prysm/v5/network/authorization" ) @@ -115,3 +116,11 @@ func WithJwtId(jwtId string) Option { return nil } } + +// WithVerifierWaiter gives the sync package direct access to the verifier waiter. +func WithVerifierWaiter(v *verification.InitializerWaiter) Option { + return func(s *Service) error { + s.verifierWaiter = v + return nil + } +} diff --git a/beacon-chain/execution/rpc_connection.go b/beacon-chain/execution/rpc_connection.go index 04e63ab2a0ba..fe19791eb317 100644 --- a/beacon-chain/execution/rpc_connection.go +++ b/beacon-chain/execution/rpc_connection.go @@ -78,6 +78,15 @@ func (s *Service) pollConnectionStatus(ctx context.Context) { currClient.Close() } log.WithField("endpoint", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url)).Info("Connected to new endpoint") + + c, err := s.ExchangeCapabilities(ctx) + if err != nil { + errorLogger(err, "Could not exchange capabilities with execution client") + } + s.capabilitiesLock.Lock() + s.capabilities = c + s.capabilitiesLock.Unlock() + return case <-s.ctx.Done(): log.Debug("Received cancelled context,closing existing powchain service") diff --git a/beacon-chain/execution/service.go b/beacon-chain/execution/service.go index d71b0b949407..597d70e48efd 100644 --- a/beacon-chain/execution/service.go +++ b/beacon-chain/execution/service.go @@ -29,7 +29,9 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/state" native "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/state-native" "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v5/config/params" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/container/trie" contracts "github.com/prysmaticlabs/prysm/v5/contracts/deposit" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" @@ -155,6 +157,10 @@ type Service struct { lastReceivedMerkleIndex int64 // Keeps track of the last received index to prevent log spam. runError error preGenesisState state.BeaconState + capabilities []string + capabilitiesLock sync.RWMutex + verifierWaiter *verification.InitializerWaiter + blobVerifier verification.NewBlobVerifier } // NewService sets up a new instance with an ethclient when given a web3 endpoint as a string in the config. @@ -229,6 +235,13 @@ func (s *Service) Start() { } } + v, err := s.verifierWaiter.WaitForInitializer(s.ctx) + if err != nil { + log.WithError(err).Error("Could not get verification initializer") + return + } + s.blobVerifier = newBlobVerifierFromInitializer(v) + s.isRunning = true // Poll the execution client connection and fallback if errors occur. @@ -886,3 +899,9 @@ func (s *Service) migrateOldDepositTree(eth1DataInDB *ethpb.ETH1ChainData) error func (s *Service) removeStartupState() { s.cfg.finalizedStateAtStartup = nil } + +func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier { + return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return ini.NewBlobVerifier(b, reqs) + } +} diff --git a/beacon-chain/execution/testing/mock_engine_client.go b/beacon-chain/execution/testing/mock_engine_client.go index 5fd5c41bd25c..e81741da05fc 100644 --- a/beacon-chain/execution/testing/mock_engine_client.go +++ b/beacon-chain/execution/testing/mock_engine_client.go @@ -36,6 +36,8 @@ type EngineClient struct { OverrideValidHash [32]byte GetPayloadResponse *blocks.GetPayloadResponse ErrGetPayload error + BlobSidecars []blocks.VerifiedROBlob + ErrorBlobSidecars error } // NewPayload -- @@ -106,6 +108,11 @@ func (e *EngineClient) ReconstructFullBellatrixBlockBatch( return fullBlocks, nil } +// ReconstructBlobSidecars is a mock implementation of the ReconstructBlobSidecars method. +func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadOnlySignedBeaconBlock, [32]byte, [6]bool) ([]blocks.VerifiedROBlob, error) { + return e.BlobSidecars, e.ErrorBlobSidecars +} + // GetTerminalBlockHash -- func (e *EngineClient) GetTerminalBlockHash(ctx context.Context, transitionTime uint64) ([]byte, bool, error) { ttd := new(big.Int) diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index b5c735ba8957..fce4a4e56afd 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -796,6 +796,7 @@ func (b *BeaconNode) registerPOWChainService() error { execution.WithBeaconNodeStatsUpdater(bs), execution.WithFinalizedStateAtStartup(b.finalizedStateAtStartUp), execution.WithJwtId(b.cliCtx.String(flags.JwtId.Name)), + execution.WithVerifierWaiter(b.verifyInitWaiter), ) web3Service, err := execution.NewService(b.ctx, opts...) if err != nil { @@ -838,7 +839,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil regularsync.WithStateGen(b.stateGen), regularsync.WithSlasherAttestationsFeed(b.slasherAttestationsFeed), regularsync.WithSlasherBlockHeadersFeed(b.slasherBlockHeadersFeed), - regularsync.WithPayloadReconstructor(web3Service), + regularsync.WithReconstructor(web3Service), regularsync.WithClockWaiter(b.clockWaiter), regularsync.WithInitialSyncComplete(initialSyncComplete), regularsync.WithStateNotifier(b), @@ -953,55 +954,55 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error { p2pService := b.fetchP2P() rpcService := rpc.NewService(b.ctx, &rpc.Config{ - ExecutionEngineCaller: web3Service, - ExecutionPayloadReconstructor: web3Service, - Host: host, - Port: port, - BeaconMonitoringHost: beaconMonitoringHost, - BeaconMonitoringPort: beaconMonitoringPort, - CertFlag: cert, - KeyFlag: key, - BeaconDB: b.db, - Broadcaster: p2pService, - PeersFetcher: p2pService, - PeerManager: p2pService, - MetadataProvider: p2pService, - ChainInfoFetcher: chainService, - HeadFetcher: chainService, - CanonicalFetcher: chainService, - ForkFetcher: chainService, - ForkchoiceFetcher: chainService, - FinalizationFetcher: chainService, - BlockReceiver: chainService, - BlobReceiver: chainService, - AttestationReceiver: chainService, - GenesisTimeFetcher: chainService, - GenesisFetcher: chainService, - OptimisticModeFetcher: chainService, - AttestationsPool: b.attestationPool, - ExitPool: b.exitPool, - SlashingsPool: b.slashingsPool, - BLSChangesPool: b.blsToExecPool, - SyncCommitteeObjectPool: b.syncCommitteePool, - ExecutionChainService: web3Service, - ExecutionChainInfoFetcher: web3Service, - ChainStartFetcher: chainStartFetcher, - MockEth1Votes: mockEth1DataVotes, - SyncService: syncService, - DepositFetcher: depositFetcher, - PendingDepositFetcher: b.depositCache, - BlockNotifier: b, - StateNotifier: b, - OperationNotifier: b, - StateGen: b.stateGen, - EnableDebugRPCEndpoints: enableDebugRPCEndpoints, - MaxMsgSize: maxMsgSize, - BlockBuilder: b.fetchBuilderService(), - Router: router, - ClockWaiter: b.clockWaiter, - BlobStorage: b.BlobStorage, - TrackedValidatorsCache: b.trackedValidatorsCache, - PayloadIDCache: b.payloadIDCache, + ExecutionEngineCaller: web3Service, + ExecutionReconstructor: web3Service, + Host: host, + Port: port, + BeaconMonitoringHost: beaconMonitoringHost, + BeaconMonitoringPort: beaconMonitoringPort, + CertFlag: cert, + KeyFlag: key, + BeaconDB: b.db, + Broadcaster: p2pService, + PeersFetcher: p2pService, + PeerManager: p2pService, + MetadataProvider: p2pService, + ChainInfoFetcher: chainService, + HeadFetcher: chainService, + CanonicalFetcher: chainService, + ForkFetcher: chainService, + ForkchoiceFetcher: chainService, + FinalizationFetcher: chainService, + BlockReceiver: chainService, + BlobReceiver: chainService, + AttestationReceiver: chainService, + GenesisTimeFetcher: chainService, + GenesisFetcher: chainService, + OptimisticModeFetcher: chainService, + AttestationsPool: b.attestationPool, + ExitPool: b.exitPool, + SlashingsPool: b.slashingsPool, + BLSChangesPool: b.blsToExecPool, + SyncCommitteeObjectPool: b.syncCommitteePool, + ExecutionChainService: web3Service, + ExecutionChainInfoFetcher: web3Service, + ChainStartFetcher: chainStartFetcher, + MockEth1Votes: mockEth1DataVotes, + SyncService: syncService, + DepositFetcher: depositFetcher, + PendingDepositFetcher: b.depositCache, + BlockNotifier: b, + StateNotifier: b, + OperationNotifier: b, + StateGen: b.stateGen, + EnableDebugRPCEndpoints: enableDebugRPCEndpoints, + MaxMsgSize: maxMsgSize, + BlockBuilder: b.fetchBuilderService(), + Router: router, + ClockWaiter: b.clockWaiter, + BlobStorage: b.BlobStorage, + TrackedValidatorsCache: b.trackedValidatorsCache, + PayloadIDCache: b.payloadIDCache, }) return b.services.RegisterService(rpcService) diff --git a/beacon-chain/rpc/endpoints.go b/beacon-chain/rpc/endpoints.go index f6b47009582b..52ee1cd5e0a9 100644 --- a/beacon-chain/rpc/endpoints.go +++ b/beacon-chain/rpc/endpoints.go @@ -454,30 +454,30 @@ func (s *Service) beaconEndpoints( coreService *core.Service, ) []endpoint { server := &beacon.Server{ - CanonicalHistory: ch, - BeaconDB: s.cfg.BeaconDB, - AttestationsPool: s.cfg.AttestationsPool, - SlashingsPool: s.cfg.SlashingsPool, - ChainInfoFetcher: s.cfg.ChainInfoFetcher, - GenesisTimeFetcher: s.cfg.GenesisTimeFetcher, - BlockNotifier: s.cfg.BlockNotifier, - OperationNotifier: s.cfg.OperationNotifier, - Broadcaster: s.cfg.Broadcaster, - BlockReceiver: s.cfg.BlockReceiver, - StateGenService: s.cfg.StateGen, - Stater: stater, - Blocker: blocker, - OptimisticModeFetcher: s.cfg.OptimisticModeFetcher, - HeadFetcher: s.cfg.HeadFetcher, - TimeFetcher: s.cfg.GenesisTimeFetcher, - VoluntaryExitsPool: s.cfg.ExitPool, - V1Alpha1ValidatorServer: validatorServer, - SyncChecker: s.cfg.SyncService, - ExecutionPayloadReconstructor: s.cfg.ExecutionPayloadReconstructor, - BLSChangesPool: s.cfg.BLSChangesPool, - FinalizationFetcher: s.cfg.FinalizationFetcher, - ForkchoiceFetcher: s.cfg.ForkchoiceFetcher, - CoreService: coreService, + CanonicalHistory: ch, + BeaconDB: s.cfg.BeaconDB, + AttestationsPool: s.cfg.AttestationsPool, + SlashingsPool: s.cfg.SlashingsPool, + ChainInfoFetcher: s.cfg.ChainInfoFetcher, + GenesisTimeFetcher: s.cfg.GenesisTimeFetcher, + BlockNotifier: s.cfg.BlockNotifier, + OperationNotifier: s.cfg.OperationNotifier, + Broadcaster: s.cfg.Broadcaster, + BlockReceiver: s.cfg.BlockReceiver, + StateGenService: s.cfg.StateGen, + Stater: stater, + Blocker: blocker, + OptimisticModeFetcher: s.cfg.OptimisticModeFetcher, + HeadFetcher: s.cfg.HeadFetcher, + TimeFetcher: s.cfg.GenesisTimeFetcher, + VoluntaryExitsPool: s.cfg.ExitPool, + V1Alpha1ValidatorServer: validatorServer, + SyncChecker: s.cfg.SyncService, + ExecutionReconstructor: s.cfg.ExecutionReconstructor, + BLSChangesPool: s.cfg.BLSChangesPool, + FinalizationFetcher: s.cfg.FinalizationFetcher, + ForkchoiceFetcher: s.cfg.ForkchoiceFetcher, + CoreService: coreService, } const namespace = "beacon" diff --git a/beacon-chain/rpc/eth/beacon/handlers.go b/beacon-chain/rpc/eth/beacon/handlers.go index 33eeb9dbae24..ec7d713011ea 100644 --- a/beacon-chain/rpc/eth/beacon/handlers.go +++ b/beacon-chain/rpc/eth/beacon/handlers.go @@ -65,7 +65,7 @@ func (s *Server) GetBlockV2(w http.ResponseWriter, r *http.Request) { // Deal with block unblinding. if blk.Version() >= version.Bellatrix && blk.IsBlinded() { - blk, err = s.ExecutionPayloadReconstructor.ReconstructFullBlock(ctx, blk) + blk, err = s.ExecutionReconstructor.ReconstructFullBlock(ctx, blk) if err != nil { httputil.HandleError(w, errors.Wrapf(err, "could not reconstruct full execution payload to create signed beacon block").Error(), http.StatusBadRequest) return diff --git a/beacon-chain/rpc/eth/beacon/server.go b/beacon-chain/rpc/eth/beacon/server.go index e7eef22493f5..878f533f908d 100644 --- a/beacon-chain/rpc/eth/beacon/server.go +++ b/beacon-chain/rpc/eth/beacon/server.go @@ -24,28 +24,28 @@ import ( // Server defines a server implementation of the gRPC Beacon Chain service, // providing RPC endpoints to access data relevant to the Ethereum Beacon Chain. type Server struct { - BeaconDB db.ReadOnlyDatabase - ChainInfoFetcher blockchain.ChainInfoFetcher - GenesisTimeFetcher blockchain.TimeFetcher - BlockReceiver blockchain.BlockReceiver - BlockNotifier blockfeed.Notifier - OperationNotifier operation.Notifier - Broadcaster p2p.Broadcaster - AttestationsPool attestations.Pool - SlashingsPool slashings.PoolManager - VoluntaryExitsPool voluntaryexits.PoolManager - StateGenService stategen.StateManager - Stater lookup.Stater - Blocker lookup.Blocker - HeadFetcher blockchain.HeadFetcher - TimeFetcher blockchain.TimeFetcher - OptimisticModeFetcher blockchain.OptimisticModeFetcher - V1Alpha1ValidatorServer eth.BeaconNodeValidatorServer - SyncChecker sync.Checker - CanonicalHistory *stategen.CanonicalHistory - ExecutionPayloadReconstructor execution.PayloadReconstructor - FinalizationFetcher blockchain.FinalizationFetcher - BLSChangesPool blstoexec.PoolManager - ForkchoiceFetcher blockchain.ForkchoiceFetcher - CoreService *core.Service + BeaconDB db.ReadOnlyDatabase + ChainInfoFetcher blockchain.ChainInfoFetcher + GenesisTimeFetcher blockchain.TimeFetcher + BlockReceiver blockchain.BlockReceiver + BlockNotifier blockfeed.Notifier + OperationNotifier operation.Notifier + Broadcaster p2p.Broadcaster + AttestationsPool attestations.Pool + SlashingsPool slashings.PoolManager + VoluntaryExitsPool voluntaryexits.PoolManager + StateGenService stategen.StateManager + Stater lookup.Stater + Blocker lookup.Blocker + HeadFetcher blockchain.HeadFetcher + TimeFetcher blockchain.TimeFetcher + OptimisticModeFetcher blockchain.OptimisticModeFetcher + V1Alpha1ValidatorServer eth.BeaconNodeValidatorServer + SyncChecker sync.Checker + CanonicalHistory *stategen.CanonicalHistory + ExecutionReconstructor execution.Reconstructor + FinalizationFetcher blockchain.FinalizationFetcher + BLSChangesPool blstoexec.PoolManager + ForkchoiceFetcher blockchain.ForkchoiceFetcher + CoreService *core.Service } diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index c7d29594600c..4a56c0d4162e 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -91,55 +91,55 @@ type Service struct { // Config options for the beacon node RPC server. type Config struct { - ExecutionPayloadReconstructor execution.PayloadReconstructor - Host string - Port string - CertFlag string - KeyFlag string - BeaconMonitoringHost string - BeaconMonitoringPort int - BeaconDB db.HeadAccessDatabase - ChainInfoFetcher blockchain.ChainInfoFetcher - HeadFetcher blockchain.HeadFetcher - CanonicalFetcher blockchain.CanonicalFetcher - ForkFetcher blockchain.ForkFetcher - ForkchoiceFetcher blockchain.ForkchoiceFetcher - FinalizationFetcher blockchain.FinalizationFetcher - AttestationReceiver blockchain.AttestationReceiver - BlockReceiver blockchain.BlockReceiver - BlobReceiver blockchain.BlobReceiver - ExecutionChainService execution.Chain - ChainStartFetcher execution.ChainStartFetcher - ExecutionChainInfoFetcher execution.ChainInfoFetcher - GenesisTimeFetcher blockchain.TimeFetcher - GenesisFetcher blockchain.GenesisFetcher - MockEth1Votes bool - EnableDebugRPCEndpoints bool - AttestationsPool attestations.Pool - ExitPool voluntaryexits.PoolManager - SlashingsPool slashings.PoolManager - SyncCommitteeObjectPool synccommittee.Pool - BLSChangesPool blstoexec.PoolManager - SyncService chainSync.Checker - Broadcaster p2p.Broadcaster - PeersFetcher p2p.PeersProvider - PeerManager p2p.PeerManager - MetadataProvider p2p.MetadataProvider - DepositFetcher cache.DepositFetcher - PendingDepositFetcher depositsnapshot.PendingDepositsFetcher - StateNotifier statefeed.Notifier - BlockNotifier blockfeed.Notifier - OperationNotifier opfeed.Notifier - StateGen *stategen.State - MaxMsgSize int - ExecutionEngineCaller execution.EngineCaller - OptimisticModeFetcher blockchain.OptimisticModeFetcher - BlockBuilder builder.BlockBuilder - Router *http.ServeMux - ClockWaiter startup.ClockWaiter - BlobStorage *filesystem.BlobStorage - TrackedValidatorsCache *cache.TrackedValidatorsCache - PayloadIDCache *cache.PayloadIDCache + ExecutionReconstructor execution.Reconstructor + Host string + Port string + CertFlag string + KeyFlag string + BeaconMonitoringHost string + BeaconMonitoringPort int + BeaconDB db.HeadAccessDatabase + ChainInfoFetcher blockchain.ChainInfoFetcher + HeadFetcher blockchain.HeadFetcher + CanonicalFetcher blockchain.CanonicalFetcher + ForkFetcher blockchain.ForkFetcher + ForkchoiceFetcher blockchain.ForkchoiceFetcher + FinalizationFetcher blockchain.FinalizationFetcher + AttestationReceiver blockchain.AttestationReceiver + BlockReceiver blockchain.BlockReceiver + BlobReceiver blockchain.BlobReceiver + ExecutionChainService execution.Chain + ChainStartFetcher execution.ChainStartFetcher + ExecutionChainInfoFetcher execution.ChainInfoFetcher + GenesisTimeFetcher blockchain.TimeFetcher + GenesisFetcher blockchain.GenesisFetcher + MockEth1Votes bool + EnableDebugRPCEndpoints bool + AttestationsPool attestations.Pool + ExitPool voluntaryexits.PoolManager + SlashingsPool slashings.PoolManager + SyncCommitteeObjectPool synccommittee.Pool + BLSChangesPool blstoexec.PoolManager + SyncService chainSync.Checker + Broadcaster p2p.Broadcaster + PeersFetcher p2p.PeersProvider + PeerManager p2p.PeerManager + MetadataProvider p2p.MetadataProvider + DepositFetcher cache.DepositFetcher + PendingDepositFetcher depositsnapshot.PendingDepositsFetcher + StateNotifier statefeed.Notifier + BlockNotifier blockfeed.Notifier + OperationNotifier opfeed.Notifier + StateGen *stategen.State + MaxMsgSize int + ExecutionEngineCaller execution.EngineCaller + OptimisticModeFetcher blockchain.OptimisticModeFetcher + BlockBuilder builder.BlockBuilder + Router *http.ServeMux + ClockWaiter startup.ClockWaiter + BlobStorage *filesystem.BlobStorage + TrackedValidatorsCache *cache.TrackedValidatorsCache + PayloadIDCache *cache.PayloadIDCache } // NewService instantiates a new RPC service instance that will diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 07502117a458..5f7c748f25d9 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -170,6 +170,20 @@ var ( Help: "The number of blob sidecars that were dropped due to missing parent block", }, ) + + blobRecoveredFromELCount = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "blob_recovered_from_el_count", + Help: "Count the number of times blobs have been recovered from the execution layer.", + }, + ) + + blobExistedInDBCount = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "blob_existed_in_db_count", + Help: "Count the number of times blobs have been found in the database.", + }, + ) ) func (s *Service) updateMetrics() { diff --git a/beacon-chain/sync/options.go b/beacon-chain/sync/options.go index 9b0281ea667f..ff20b8b81212 100644 --- a/beacon-chain/sync/options.go +++ b/beacon-chain/sync/options.go @@ -127,9 +127,9 @@ func WithSlasherBlockHeadersFeed(slasherBlockHeadersFeed *event.Feed) Option { } } -func WithPayloadReconstructor(r execution.PayloadReconstructor) Option { +func WithReconstructor(r execution.Reconstructor) Option { return func(s *Service) error { - s.cfg.executionPayloadReconstructor = r + s.cfg.executionReconstructor = r return nil } } diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index 116187899fa2..865195f9806c 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -160,7 +160,7 @@ func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch, return nil } - reconstructed, err := s.cfg.executionPayloadReconstructor.ReconstructFullBellatrixBlockBatch(ctx, blinded) + reconstructed, err := s.cfg.executionReconstructor.ReconstructFullBellatrixBlockBatch(ctx, blinded) if err != nil { log.WithError(err).Error("Could not reconstruct full bellatrix block batch from blinded bodies") return err diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index 832940cedaa5..0178425a2c28 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -239,11 +239,11 @@ func TestRPCBeaconBlocksByRange_ReconstructsPayloads(t *testing.T) { // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). r := &Service{ cfg: &config{ - p2p: p1, - beaconDB: d, - chain: &chainMock.ChainService{}, - clock: clock, - executionPayloadReconstructor: mockEngine, + p2p: p1, + beaconDB: d, + chain: &chainMock.ChainService{}, + clock: clock, + executionReconstructor: mockEngine, }, rateLimiter: newRateLimiter(p1), availableBlocker: mockBlocker{avail: true}, diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go index ad1ffe83d292..4379bbf60057 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go @@ -112,7 +112,7 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{ } if blk.Block().IsBlinded() { - blk, err = s.cfg.executionPayloadReconstructor.ReconstructFullBlock(ctx, blk) + blk, err = s.cfg.executionReconstructor.ReconstructFullBlock(ctx, blk) if err != nil { if errors.Is(err, execution.ErrEmptyBlockHash) { log.WithError(err).Warn("Could not reconstruct block from header with syncing execution client. Waiting to complete syncing") diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index d95306297025..4d6be0c9691d 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -151,11 +151,11 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks_ReconstructsPayload(t *testi }, } r := &Service{cfg: &config{ - p2p: p1, - beaconDB: d, - executionPayloadReconstructor: mockEngine, - chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, - clock: startup.NewClock(time.Unix(0, 0), [32]byte{}), + p2p: p1, + beaconDB: d, + executionReconstructor: mockEngine, + chain: &mock.ChainService{ValidatorsRoot: [32]byte{}}, + clock: startup.NewClock(time.Unix(0, 0), [32]byte{}), }, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1) topic := string(pcl) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 15196bf6ca74..473d3d9709ff 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -77,25 +77,25 @@ type validationFn func(ctx context.Context) (pubsub.ValidationResult, error) // config to hold dependencies for the sync service. type config struct { - attestationNotifier operation.Notifier - p2p p2p.P2P - beaconDB db.NoHeadAccessDatabase - attPool attestations.Pool - exitPool voluntaryexits.PoolManager - slashingPool slashings.PoolManager - syncCommsPool synccommittee.Pool - blsToExecPool blstoexec.PoolManager - chain blockchainService - initialSync Checker - blockNotifier blockfeed.Notifier - operationNotifier operation.Notifier - executionPayloadReconstructor execution.PayloadReconstructor - stateGen *stategen.State - slasherAttestationsFeed *event.Feed - slasherBlockHeadersFeed *event.Feed - clock *startup.Clock - stateNotifier statefeed.Notifier - blobStorage *filesystem.BlobStorage + attestationNotifier operation.Notifier + p2p p2p.P2P + beaconDB db.NoHeadAccessDatabase + attPool attestations.Pool + exitPool voluntaryexits.PoolManager + slashingPool slashings.PoolManager + syncCommsPool synccommittee.Pool + blsToExecPool blstoexec.PoolManager + chain blockchainService + initialSync Checker + blockNotifier blockfeed.Notifier + operationNotifier operation.Notifier + executionReconstructor execution.Reconstructor + stateGen *stategen.State + slasherAttestationsFeed *event.Feed + slasherBlockHeadersFeed *event.Feed + clock *startup.Clock + stateNotifier statefeed.Notifier + blobStorage *filesystem.BlobStorage } // This defines the interface for interacting with block chain service diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index ff5f4c16457a..fe3d4a24b129 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v5/io/file" + "github.com/prysmaticlabs/prysm/v5/time/slots" "google.golang.org/protobuf/proto" ) @@ -33,6 +34,8 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) return err } + go s.reconstructAndBroadcastBlobs(ctx, signed) + if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil { if blockchain.IsInvalidBlock(err) { r := blockchain.InvalidBlockRoot(err) @@ -55,6 +58,75 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) return err } +// reconstructAndBroadcastBlobs processes and broadcasts blob sidecars for a given beacon block. +// This function reconstructs the blob sidecars from the EL using the block's KZG commitments, +// broadcasts the reconstructed blobs over P2P, and saves them into the blob storage. +func (s *Service) reconstructAndBroadcastBlobs(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) { + startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), block.Block().Slot()) + if err != nil { + log.WithError(err).Error("Failed to convert slot to time") + } + + blockRoot, err := block.Block().HashTreeRoot() + if err != nil { + log.WithError(err).Error("Failed to calculate block root") + return + } + + indices, err := s.cfg.blobStorage.Indices(blockRoot) + if err != nil { + log.WithError(err).Error("Failed to retrieve indices for block") + return + } + for _, index := range indices { + if index { + blobExistedInDBCount.Inc() + } + } + + // Reconstruct blob sidecars from the EL + blobSidecars, err := s.cfg.executionReconstructor.ReconstructBlobSidecars(ctx, block, blockRoot, indices) + if err != nil { + log.WithError(err).Error("Failed to reconstruct blob sidecars") + return + } + if len(blobSidecars) == 0 { + return + } + + // Refresh indices as new blobs may have been added to the db + indices, err = s.cfg.blobStorage.Indices(blockRoot) + if err != nil { + log.WithError(err).Error("Failed to retrieve indices for block") + return + } + + // Broadcast blob sidecars first than save them to the db + for _, sidecar := range blobSidecars { + if indices[sidecar.Index] { + continue + } + if err := s.cfg.p2p.BroadcastBlob(ctx, sidecar.Index, sidecar.BlobSidecar); err != nil { + log.WithFields(blobFields(sidecar.ROBlob)).WithError(err).Error("Failed to broadcast blob sidecar") + } + } + + for _, sidecar := range blobSidecars { + if indices[sidecar.Index] { + blobExistedInDBCount.Inc() + continue + } + if err := s.cfg.chain.ReceiveBlob(ctx, sidecar); err != nil { + log.WithFields(blobFields(sidecar.ROBlob)).WithError(err).Error("Failed to receive blob") + } + + blobRecoveredFromELCount.Inc() + fields := blobFields(sidecar.ROBlob) + fields["sinceSlotStartTime"] = s.cfg.clock.Now().Sub(startTime) + log.WithFields(fields).Debug("Processed blob sidecar from EL") + } +} + // WriteInvalidBlockToDisk as a block ssz. Writes to temp directory. func saveInvalidBlockToTemp(block interfaces.ReadOnlySignedBeaconBlock) { if !features.Get().SaveInvalidBlock { diff --git a/beacon-chain/sync/subscriber_beacon_blocks_test.go b/beacon-chain/sync/subscriber_beacon_blocks_test.go index 626215144af6..a305ace15d47 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks_test.go +++ b/beacon-chain/sync/subscriber_beacon_blocks_test.go @@ -9,14 +9,20 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" chainMock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" dbtest "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution" + mockExecution "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/testing" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" + mockp2p "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" lruwrpr "github.com/prysmaticlabs/prysm/v5/cache/lru" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/testing/assert" "github.com/prysmaticlabs/prysm/v5/testing/require" "github.com/prysmaticlabs/prysm/v5/testing/util" + "github.com/prysmaticlabs/prysm/v5/time" "google.golang.org/protobuf/proto" ) @@ -124,3 +130,63 @@ func TestService_BeaconBlockSubscribe_UndefinedEeError(t *testing.T) { require.Equal(t, 0, len(s.badBlockCache.Keys())) require.Equal(t, 1, len(s.seenBlockCache.Keys())) } + +func TestReconstructAndBroadcastBlobs(t *testing.T) { + rob, err := blocks.NewROBlob( + ðpb.BlobSidecar{ + SignedBlockHeader: ðpb.SignedBeaconBlockHeader{ + Header: ðpb.BeaconBlockHeader{ + ParentRoot: make([]byte, 32), + BodyRoot: make([]byte, 32), + StateRoot: make([]byte, 32), + }, + Signature: []byte("signature"), + }, + }) + require.NoError(t, err) + + chainService := &chainMock.ChainService{ + Genesis: time.Now(), + } + + b := util.NewBeaconBlockDeneb() + sb, err := blocks.NewSignedBeaconBlock(b) + require.NoError(t, err) + + tests := []struct { + name string + blobSidecars []blocks.VerifiedROBlob + expectedBlobCount int + }{ + { + name: "Constructed 0 blobs", + blobSidecars: nil, + expectedBlobCount: 0, + }, + { + name: "Constructed 6 blobs", + blobSidecars: []blocks.VerifiedROBlob{ + {ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob}, {ROBlob: rob}, + }, + expectedBlobCount: 6, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := Service{ + cfg: &config{ + p2p: mockp2p.NewTestP2P(t), + chain: chainService, + clock: startup.NewClock(time.Now(), [32]byte{}), + blobStorage: filesystem.NewEphemeralBlobStorage(t), + executionReconstructor: &mockExecution.EngineClient{ + BlobSidecars: tt.blobSidecars, + }, + }, + } + s.reconstructAndBroadcastBlobs(context.Background(), sb) + require.Equal(t, tt.expectedBlobCount, len(chainService.Blobs)) + }) + } +} diff --git a/beacon-chain/verification/batch_test.go b/beacon-chain/verification/batch_test.go index f0e987d79739..6bc33bea3d40 100644 --- a/beacon-chain/verification/batch_test.go +++ b/beacon-chain/verification/batch_test.go @@ -41,7 +41,7 @@ func TestBatchVerifier(t *testing.T) { }, nv: func() NewBlobVerifier { return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier { - return &MockBlobVerifier{cbVerifiedROBlob: vbcb(bl, nil)} + return &MockBlobVerifier{CbVerifiedROBlob: vbcb(bl, nil)} } }, nblobs: 0, @@ -50,7 +50,7 @@ func TestBatchVerifier(t *testing.T) { name: "happy path", nv: func() NewBlobVerifier { return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier { - return &MockBlobVerifier{cbVerifiedROBlob: vbcb(bl, nil)} + return &MockBlobVerifier{CbVerifiedROBlob: vbcb(bl, nil)} } }, bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) { @@ -62,7 +62,7 @@ func TestBatchVerifier(t *testing.T) { name: "partial batch", nv: func() NewBlobVerifier { return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier { - return &MockBlobVerifier{cbVerifiedROBlob: vbcb(bl, nil)} + return &MockBlobVerifier{CbVerifiedROBlob: vbcb(bl, nil)} } }, bandb: func(t *testing.T, nb int) (blocks.ROBlock, []blocks.ROBlob) { @@ -76,7 +76,7 @@ func TestBatchVerifier(t *testing.T) { name: "invalid commitment", nv: func() NewBlobVerifier { return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier { - return &MockBlobVerifier{cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) { + return &MockBlobVerifier{CbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) { t.Fatal("Batch verifier should stop before this point") return blocks.VerifiedROBlob{}, nil }} @@ -93,7 +93,7 @@ func TestBatchVerifier(t *testing.T) { name: "signature mismatch", nv: func() NewBlobVerifier { return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier { - return &MockBlobVerifier{cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) { + return &MockBlobVerifier{CbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) { t.Fatal("Batch verifier should stop before this point") return blocks.VerifiedROBlob{}, nil }} @@ -111,7 +111,7 @@ func TestBatchVerifier(t *testing.T) { name: "root mismatch", nv: func() NewBlobVerifier { return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier { - return &MockBlobVerifier{cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) { + return &MockBlobVerifier{CbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) { t.Fatal("Batch verifier should stop before this point") return blocks.VerifiedROBlob{}, nil }} @@ -133,7 +133,7 @@ func TestBatchVerifier(t *testing.T) { return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier { return &MockBlobVerifier{ ErrBlobIndexInBounds: ErrBlobIndexInvalid, - cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) { + CbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) { t.Fatal("Batch verifier should stop before this point") return blocks.VerifiedROBlob{}, nil }} @@ -151,7 +151,7 @@ func TestBatchVerifier(t *testing.T) { return func(bl blocks.ROBlob, reqs []Requirement) BlobVerifier { return &MockBlobVerifier{ ErrSidecarInclusionProven: ErrSidecarInclusionProofInvalid, - cbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) { + CbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) { t.Fatal("Batch verifier should stop before this point") return blocks.VerifiedROBlob{}, nil }} diff --git a/beacon-chain/verification/blob.go b/beacon-chain/verification/blob.go index 916ddff3bc31..2a323c8542a9 100644 --- a/beacon-chain/verification/blob.go +++ b/beacon-chain/verification/blob.go @@ -67,6 +67,8 @@ var InitsyncSidecarRequirements = requirementList(GossipSidecarRequirements).exc RequireSidecarProposerExpected, ) +var ELMemPoolRequirements = []Requirement{RequireSidecarKzgProofVerified} + // BackfillSidecarRequirements is the same as InitsyncSidecarRequirements. var BackfillSidecarRequirements = requirementList(InitsyncSidecarRequirements).excluding() diff --git a/beacon-chain/verification/mock.go b/beacon-chain/verification/mock.go index 8f956911de67..66c6e49071ff 100644 --- a/beacon-chain/verification/mock.go +++ b/beacon-chain/verification/mock.go @@ -18,11 +18,11 @@ type MockBlobVerifier struct { ErrSidecarInclusionProven error ErrSidecarKzgProofVerified error ErrSidecarProposerExpected error - cbVerifiedROBlob func() (blocks.VerifiedROBlob, error) + CbVerifiedROBlob func() (blocks.VerifiedROBlob, error) } func (m *MockBlobVerifier) VerifiedROBlob() (blocks.VerifiedROBlob, error) { - return m.cbVerifiedROBlob() + return m.CbVerifiedROBlob() } func (m *MockBlobVerifier) BlobIndexInBounds() (err error) { diff --git a/proto/engine/v1/execution_engine.pb.go b/proto/engine/v1/execution_engine.pb.go index 141fc09baf2d..74c7d1c267e4 100755 --- a/proto/engine/v1/execution_engine.pb.go +++ b/proto/engine/v1/execution_engine.pb.go @@ -1697,6 +1697,61 @@ func (x *Blob) GetData() []byte { return nil } +type BlobAndProof struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Blob []byte `protobuf:"bytes,1,opt,name=blob,proto3" json:"blob,omitempty" ssz-size:"131072"` + KzgProof []byte `protobuf:"bytes,2,opt,name=kzg_proof,json=kzgProof,proto3" json:"kzg_proof,omitempty" ssz-size:"48"` +} + +func (x *BlobAndProof) Reset() { + *x = BlobAndProof{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_engine_v1_execution_engine_proto_msgTypes[16] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BlobAndProof) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BlobAndProof) ProtoMessage() {} + +func (x *BlobAndProof) ProtoReflect() protoreflect.Message { + mi := &file_proto_engine_v1_execution_engine_proto_msgTypes[16] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BlobAndProof.ProtoReflect.Descriptor instead. +func (*BlobAndProof) Descriptor() ([]byte, []int) { + return file_proto_engine_v1_execution_engine_proto_rawDescGZIP(), []int{16} +} + +func (x *BlobAndProof) GetBlob() []byte { + if x != nil { + return x.Blob + } + return nil +} + +func (x *BlobAndProof) GetKzgProof() []byte { + if x != nil { + return x.KzgProof + } + return nil +} + var File_proto_engine_v1_execution_engine_proto protoreflect.FileDescriptor var file_proto_engine_v1_execution_engine_proto_rawDesc = []byte{ @@ -2072,17 +2127,23 @@ var file_proto_engine_v1_execution_engine_proto_rawDesc = []byte{ 0x31, 0x30, 0x37, 0x32, 0x92, 0xb5, 0x18, 0x04, 0x34, 0x30, 0x39, 0x36, 0x52, 0x05, 0x62, 0x6c, 0x6f, 0x62, 0x73, 0x22, 0x26, 0x0a, 0x04, 0x42, 0x6c, 0x6f, 0x62, 0x12, 0x1e, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x42, 0x0a, 0x8a, 0xb5, 0x18, 0x06, 0x31, - 0x33, 0x31, 0x30, 0x37, 0x32, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x96, 0x01, 0x0a, 0x16, - 0x6f, 0x72, 0x67, 0x2e, 0x65, 0x74, 0x68, 0x65, 0x72, 0x65, 0x75, 0x6d, 0x2e, 0x65, 0x6e, 0x67, - 0x69, 0x6e, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x14, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x69, 0x6f, - 0x6e, 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x3a, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, - 0x61, 0x74, 0x69, 0x63, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, - 0x35, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x2f, 0x76, - 0x31, 0x3b, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x76, 0x31, 0xaa, 0x02, 0x12, 0x45, 0x74, 0x68, - 0x65, 0x72, 0x65, 0x75, 0x6d, 0x2e, 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x2e, 0x56, 0x31, 0xca, - 0x02, 0x12, 0x45, 0x74, 0x68, 0x65, 0x72, 0x65, 0x75, 0x6d, 0x5c, 0x45, 0x6e, 0x67, 0x69, 0x6e, - 0x65, 0x5c, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x33, 0x31, 0x30, 0x37, 0x32, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x53, 0x0a, 0x0c, 0x42, + 0x6c, 0x6f, 0x62, 0x41, 0x6e, 0x64, 0x50, 0x72, 0x6f, 0x6f, 0x66, 0x12, 0x1e, 0x0a, 0x04, 0x62, + 0x6c, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x42, 0x0a, 0x8a, 0xb5, 0x18, 0x06, 0x31, + 0x33, 0x31, 0x30, 0x37, 0x32, 0x52, 0x04, 0x62, 0x6c, 0x6f, 0x62, 0x12, 0x23, 0x0a, 0x09, 0x6b, + 0x7a, 0x67, 0x5f, 0x70, 0x72, 0x6f, 0x6f, 0x66, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x42, 0x06, + 0x8a, 0xb5, 0x18, 0x02, 0x34, 0x38, 0x52, 0x08, 0x6b, 0x7a, 0x67, 0x50, 0x72, 0x6f, 0x6f, 0x66, + 0x42, 0x96, 0x01, 0x0a, 0x16, 0x6f, 0x72, 0x67, 0x2e, 0x65, 0x74, 0x68, 0x65, 0x72, 0x65, 0x75, + 0x6d, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x2e, 0x76, 0x31, 0x42, 0x14, 0x45, 0x78, 0x65, + 0x63, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x50, 0x01, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x70, 0x72, 0x79, 0x73, 0x6d, 0x61, 0x74, 0x69, 0x63, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, + 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x35, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x65, 0x6e, 0x67, + 0x69, 0x6e, 0x65, 0x2f, 0x76, 0x31, 0x3b, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x76, 0x31, 0xaa, + 0x02, 0x12, 0x45, 0x74, 0x68, 0x65, 0x72, 0x65, 0x75, 0x6d, 0x2e, 0x45, 0x6e, 0x67, 0x69, 0x6e, + 0x65, 0x2e, 0x56, 0x31, 0xca, 0x02, 0x12, 0x45, 0x74, 0x68, 0x65, 0x72, 0x65, 0x75, 0x6d, 0x5c, + 0x45, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x5c, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -2098,7 +2159,7 @@ func file_proto_engine_v1_execution_engine_proto_rawDescGZIP() []byte { } var file_proto_engine_v1_execution_engine_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_proto_engine_v1_execution_engine_proto_msgTypes = make([]protoimpl.MessageInfo, 16) +var file_proto_engine_v1_execution_engine_proto_msgTypes = make([]protoimpl.MessageInfo, 17) var file_proto_engine_v1_execution_engine_proto_goTypes = []interface{}{ (PayloadStatus_Status)(0), // 0: ethereum.engine.v1.PayloadStatus.Status (*ExecutionPayload)(nil), // 1: ethereum.engine.v1.ExecutionPayload @@ -2117,6 +2178,7 @@ var file_proto_engine_v1_execution_engine_proto_goTypes = []interface{}{ (*Withdrawal)(nil), // 14: ethereum.engine.v1.Withdrawal (*BlobsBundle)(nil), // 15: ethereum.engine.v1.BlobsBundle (*Blob)(nil), // 16: ethereum.engine.v1.Blob + (*BlobAndProof)(nil), // 17: ethereum.engine.v1.BlobAndProof } var file_proto_engine_v1_execution_engine_proto_depIdxs = []int32{ 14, // 0: ethereum.engine.v1.ExecutionPayloadCapella.withdrawals:type_name -> ethereum.engine.v1.Withdrawal @@ -2332,6 +2394,18 @@ func file_proto_engine_v1_execution_engine_proto_init() { return nil } } + file_proto_engine_v1_execution_engine_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BlobAndProof); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -2339,7 +2413,7 @@ func file_proto_engine_v1_execution_engine_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proto_engine_v1_execution_engine_proto_rawDesc, NumEnums: 1, - NumMessages: 16, + NumMessages: 17, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/engine/v1/execution_engine.proto b/proto/engine/v1/execution_engine.proto index 17ac3b258248..9aa06d72884c 100644 --- a/proto/engine/v1/execution_engine.proto +++ b/proto/engine/v1/execution_engine.proto @@ -224,3 +224,7 @@ message Blob { bytes data = 1 [(ethereum.eth.ext.ssz_size) = "blob.size"]; } +message BlobAndProof{ + bytes blob = 1 [(ethereum.eth.ext.ssz_size) = "blob.size"]; + bytes kzg_proof = 2 [(ethereum.eth.ext.ssz_size) = "48"]; +} diff --git a/proto/engine/v1/json_marshal_unmarshal.go b/proto/engine/v1/json_marshal_unmarshal.go index 13ed85726cad..8e94ab2160b6 100644 --- a/proto/engine/v1/json_marshal_unmarshal.go +++ b/proto/engine/v1/json_marshal_unmarshal.go @@ -830,6 +830,11 @@ func (b BlobBundleJSON) ToProto() *BlobsBundle { } } +type BlobAndProofJson struct { + Blob hexutil.Bytes `json:"blob"` + KzgProof hexutil.Bytes `json:"proof"` +} + // MarshalJSON -- func (e *ExecutionPayloadDeneb) MarshalJSON() ([]byte, error) { transactions := make([]hexutil.Bytes, len(e.Transactions)) @@ -1120,3 +1125,21 @@ func RecastHexutilByteSlice(h []hexutil.Bytes) [][]byte { } return r } + +// UnmarshalJSON implements the json unmarshaler interface for BlobAndProof. +func (b *BlobAndProof) UnmarshalJSON(enc []byte) error { + var dec *BlobAndProofJson + if err := json.Unmarshal(enc, &dec); err != nil { + return err + } + + blob := make([]byte, fieldparams.BlobLength) + copy(blob, dec.Blob) + b.Blob = blob + + proof := make([]byte, fieldparams.BLSPubkeyLength) + copy(proof, dec.KzgProof) + b.KzgProof = proof + + return nil +} diff --git a/proto/prysm/v1alpha1/light_client.pb.go b/proto/prysm/v1alpha1/light_client.pb.go index e0bf066767c8..34992141bc7c 100755 --- a/proto/prysm/v1alpha1/light_client.pb.go +++ b/proto/prysm/v1alpha1/light_client.pb.go @@ -7,13 +7,14 @@ package eth import ( + reflect "reflect" + sync "sync" + github_com_prysmaticlabs_prysm_v5_consensus_types_primitives "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" v1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" _ "github.com/prysmaticlabs/prysm/v5/proto/eth/ext" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" ) const (