Skip to content

Commit

Permalink
Use engine api get-blobs for block subscriber
Browse files Browse the repository at this point in the history
Debug

changelog

add proto marshal and unmarshal

Kasey's feedback
  • Loading branch information
terencechain committed Oct 18, 2024
1 parent 073cf19 commit a7ed3fa
Show file tree
Hide file tree
Showing 30 changed files with 759 additions and 213 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Updating to this release is recommended at your convenience.
- fastssz version bump (better error messages).
- SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413)
- Added GetPoolAttesterSlashingsV2 endpoint.
- Use engine api get-blobs for block subscriber

### Changed

Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/execution/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
Expand Down Expand Up @@ -105,8 +106,11 @@ go_test(
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/execution/testing:go_default_library",
"//beacon-chain/execution/types:go_default_library",
"//beacon-chain/forkchoice:go_default_library",
"//beacon-chain/forkchoice/doubly-linked-tree:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
Expand Down
127 changes: 123 additions & 4 deletions beacon-chain/execution/engine_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/big"
"slices"
"strings"
"time"

Expand All @@ -14,6 +15,7 @@ import (
"github.com/holiman/uint256"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
pb "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -79,6 +82,8 @@ const (
GetPayloadBodiesByRangeV1 = "engine_getPayloadBodiesByRangeV1"
// ExchangeCapabilities request string for JSON-RPC.
ExchangeCapabilities = "engine_exchangeCapabilities"
// GetBlobsV1 request string for JSON-RPC.
GetBlobsV1 = "engine_getBlobsV1"
// Defines the seconds before timing out engine endpoints with non-block execution semantics.
defaultEngineTimeout = time.Second
)
Expand All @@ -93,16 +98,15 @@ type ForkchoiceUpdatedResponse struct {
ValidationError string `json:"validationError"`
}

// PayloadReconstructor defines a service that can reconstruct a full beacon
// block with an execution payload from a signed beacon block and a connection
// to an execution client's engine API.
type PayloadReconstructor interface {
// Reconstructor defines a service responsible for reconstructing full beacon chain objects by utilizing the execution API and making requests through the execution client.
type Reconstructor interface {
ReconstructFullBlock(
ctx context.Context, blindedBlock interfaces.ReadOnlySignedBeaconBlock,
) (interfaces.SignedBeaconBlock, error)
ReconstructFullBellatrixBlockBatch(
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, indices [6]bool) ([]blocks.VerifiedROBlob, error)
}

// EngineCaller defines a client that can interact with an Ethereum
Expand Down Expand Up @@ -494,6 +498,23 @@ func (s *Service) HeaderByNumber(ctx context.Context, number *big.Int) (*types.H
return hdr, err
}

// GetBlobs returns the blob and proof from the execution engine for the given versioned hashes.
func (s *Service) GetBlobs(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProof, error) {
ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobs")
defer span.End()
// If the execution engine does not support `GetBlobsV1`, return early to prevent encountering an error later.
s.capabilitiesLock.RLock()
if !slices.Contains(s.capabilities, GetBlobsV1) {
s.capabilitiesLock.RUnlock()
return nil, nil
}
s.capabilitiesLock.RUnlock()

result := make([]*pb.BlobAndProof, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV1, versionedHashes)
return result, handleRPCError(err)
}

// ReconstructFullBlock takes in a blinded beacon block and reconstructs
// a beacon block with a full execution payload via the engine API.
func (s *Service) ReconstructFullBlock(
Expand Down Expand Up @@ -522,6 +543,104 @@ func (s *Service) ReconstructFullBellatrixBlockBatch(
return unb, nil
}

// ReconstructBlobSidecars reconstructs the verified blob sidecars for a given beacon block.
// It retrieves the KZG commitments from the block body, fetches the associated blobs and proofs,
// and constructs the corresponding verified read-only blob sidecars.
//
// The 'exists' argument is a boolean array of length 6, where each element corresponds to whether a
// particular blob sidecar already exists. If exists[i] is true, the blob for the i-th KZG commitment
// has already been retrieved and does not need to be fetched again from the execution layer (EL).
//
// For example:
// - If exists = [true, false, true, false, true, false], the function will fetch the blobs
// associated with indices 1, 3, and 5 (since those are marked as non-existent).
// - If exists = [false ... x 6], the function will attempt to fetch all blobs.
//
// Only the blobs that do not already exist (where exists[i] is false) are fetched using the KZG commitments from block body.
func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, exists [6]bool) ([]blocks.VerifiedROBlob, error) {
blockBody := block.Block().Body()
kzgCommitments, err := blockBody.BlobKzgCommitments()
if err != nil {
return nil, errors.Wrap(err, "could not get blob KZG commitments")
}

// Collect KZG hashes for non-existing blobs
var kzgHashes []common.Hash
for i, commitment := range kzgCommitments {
if !exists[i] {
kzgHashes = append(kzgHashes, primitives.ConvertKzgCommitmentToVersionedHash(commitment))
}
}
if len(kzgHashes) == 0 {
return nil, nil
}

// Fetch blobs from EL
blobs, err := s.GetBlobs(ctx, kzgHashes)
if err != nil {
return nil, errors.Wrap(err, "could not get blobs")
}
if blobs == nil {
return nil, nil
}

header, err := block.Header()
if err != nil {
return nil, errors.Wrap(err, "could not get header")
}

// Reconstruct verify blob sidecars
var verifiedBlobs []blocks.VerifiedROBlob
for i, blobIndex := 0, 0; i < len(kzgCommitments); i++ {
if exists[i] {
continue
}

blob := blobs[blobIndex]
blobIndex++
if blob == nil {
continue
}

proof, err := blocks.MerkleProofKZGCommitment(blockBody, i)
if err != nil {
log.WithError(err).WithField("index", i).Error("failed to get Merkle proof for KZG commitment")
continue
}
sidecar := &ethpb.BlobSidecar{
Index: uint64(i),
Blob: blob.Blob,
KzgCommitment: kzgCommitments[i],
KzgProof: blob.KzgProof,
SignedBlockHeader: header,
CommitmentInclusionProof: proof,
}

roBlob, err := blocks.NewROBlobWithRoot(sidecar, blockRoot)
if err != nil {
log.WithError(err).WithField("index", i).Error("failed to create RO blob with root")
continue
}

// Verify the sidecar KZG proof
v := s.blobVerifier(roBlob, verification.ELMemPoolRequirements)
if err := v.SidecarKzgProofVerified(); err != nil {
log.WithError(err).WithField("index", i).Error("failed to verify KZG proof for sidecar")
continue
}

verifiedBlob, err := v.VerifiedROBlob()
if err != nil {
log.WithError(err).WithField("index", i).Error("failed to verify RO blob")
continue
}

verifiedBlobs = append(verifiedBlobs, verifiedBlob)
}

return verifiedBlobs, nil
}

func fullPayloadFromPayloadBody(
header interfaces.ExecutionData, body *pb.ExecutionPayloadBody, bVersion int,
) (interfaces.ExecutionData, error) {
Expand Down
111 changes: 109 additions & 2 deletions beacon-chain/execution/engine_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package execution

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/holiman/uint256"
"github.com/pkg/errors"
mocks "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
Expand All @@ -37,9 +39,9 @@ import (
)

var (
_ = PayloadReconstructor(&Service{})
_ = Reconstructor(&Service{})
_ = EngineCaller(&Service{})
_ = PayloadReconstructor(&Service{})
_ = Reconstructor(&Service{})
_ = EngineCaller(&mocks.EngineClient{})
)

Expand Down Expand Up @@ -2390,3 +2392,108 @@ func Test_ExchangeCapabilities(t *testing.T) {
}
})
}

func TestReconstructBlobSidecars(t *testing.T) {
client := &Service{}
b := util.NewBeaconBlockDeneb()
kzgCommitments := createRandomKzgCommitments(t, 6)

b.Block.Body.BlobKzgCommitments = kzgCommitments
r, err := b.Block.HashTreeRoot()
require.NoError(t, err)
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)

ctx := context.Background()
t.Run("all seen", func(t *testing.T) {
exists := [6]bool{true, true, true, true, true, true}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists)
require.NoError(t, err)
require.Equal(t, 0, len(verifiedBlobs))
})

t.Run("get-blobs end point is not supported", func(t *testing.T) {
exists := [6]bool{true, true, true, true, true, false}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists)
require.NoError(t, err)
require.Equal(t, 0, len(verifiedBlobs))
})

t.Run("recovered 6 missing blobs", func(t *testing.T) {
srv := createBlobServer(t, 6)
defer srv.Close()

rpcClient, client := setupRpcClient(t, srv.URL, client)
defer rpcClient.Close()

exists := [6]bool{}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists)
require.NoError(t, err)
require.Equal(t, 6, len(verifiedBlobs))
})

t.Run("recovered 3 missing blobs", func(t *testing.T) {
srv := createBlobServer(t, 3)
defer srv.Close()

rpcClient, client := setupRpcClient(t, srv.URL, client)
defer rpcClient.Close()

exists := [6]bool{true, false, true, false, true, false}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists)
require.NoError(t, err)
require.Equal(t, 3, len(verifiedBlobs))
})
}

func createRandomKzgCommitments(t *testing.T, num int) [][]byte {
kzgCommitments := make([][]byte, num)
for i := range kzgCommitments {
kzgCommitments[i] = make([]byte, 48)
_, err := rand.Read(kzgCommitments[i])
require.NoError(t, err)
}
return kzgCommitments
}

func createBlobServer(t *testing.T, numBlobs int) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
defer func() {
require.NoError(t, r.Body.Close())
}()

blobs := make([]pb.BlobAndProofJson, numBlobs)
for i := range blobs {
blobs[i] = pb.BlobAndProofJson{Blob: []byte(fmt.Sprintf("blob%d", i+1)), KzgProof: []byte(fmt.Sprintf("proof%d", i+1))}
}

respJSON := map[string]interface{}{
"jsonrpc": "2.0",
"id": 1,
"result": blobs,
}
require.NoError(t, json.NewEncoder(w).Encode(respJSON))
}))
}

func setupRpcClient(t *testing.T, url string, client *Service) (*rpc.Client, *Service) {
rpcClient, err := rpc.DialHTTP(url)
require.NoError(t, err)

client.rpcClient = rpcClient
client.capabilities = []string{GetBlobsV1}
client.blobVerifier = testNewBlobVerifier()

return rpcClient, client
}

func testNewBlobVerifier() verification.NewBlobVerifier {
return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier {
return &verification.MockBlobVerifier{
CbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
return blocks.VerifiedROBlob{}, nil
},
}
}
}
9 changes: 9 additions & 0 deletions beacon-chain/execution/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/network"
"github.com/prysmaticlabs/prysm/v5/network/authorization"
)
Expand Down Expand Up @@ -115,3 +116,11 @@ func WithJwtId(jwtId string) Option {
return nil
}
}

// WithVerifierWaiter gives the sync package direct access to the verifier waiter.
func WithVerifierWaiter(v *verification.InitializerWaiter) Option {
return func(s *Service) error {
s.verifierWaiter = v
return nil
}
}
9 changes: 9 additions & 0 deletions beacon-chain/execution/rpc_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ func (s *Service) pollConnectionStatus(ctx context.Context) {
currClient.Close()
}
log.WithField("endpoint", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url)).Info("Connected to new endpoint")

c, err := s.ExchangeCapabilities(ctx)
if err != nil {
errorLogger(err, "Could not exchange capabilities with execution client")
}
s.capabilitiesLock.Lock()
s.capabilities = c
s.capabilitiesLock.Unlock()

return
case <-s.ctx.Done():
log.Debug("Received cancelled context,closing existing powchain service")
Expand Down
Loading

0 comments on commit a7ed3fa

Please sign in to comment.