Use engine api get-blobs for block subscriber #14513

Conversation
Force-pushed from f52c455 to 7f73d11
```go
// Initialize KZG hashes and retrieve blobs
kzgHashes := make([]common.Hash, len(kzgCommitments))
blobs, err := s.GetBlobs(ctx, kzgHashes)
```
Can we check if the EL supports this API (cached exchange capabilities result?) before attempting to call it? Otherwise the logs could be noisy for anyone who upgrades to this version with an EL that does not support getBlobs.
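A minimal sketch of the guard being suggested, assuming a self-locking capabilities cache on the service (the cache type and the `GetBlobsV1` constant appear later in this thread; the field name `capabilityCache` is illustrative):

```go
// GetBlobs is a no-op when the EL never advertised engine_getBlobsV1 in its
// exchange-capabilities response, which keeps logs quiet on older ELs.
func (s *Service) GetBlobs(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProof, error) {
	if !s.capabilityCache.Has(GetBlobsV1) {
		return nil, nil
	}
	result := make([]*pb.BlobAndProof, len(versionedHashes))
	err := s.rpcClient.CallContext(ctx, &result, GetBlobsV1, versionedHashes)
	return result, err
}
```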
```go
defer span.End()

result := make([]*pb.BlobAndProof, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV1, versionedHashes)
```
I don't see the JSON method set defined on this type; you probably need to define unmarshaling for the encoding to come through correctly.
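A sketch of the kind of unmarshaling being asked for, assuming the engine API's hex-encoded blob and proof fields (per engine_getBlobsV1) and that the code lives in the same package as the generated type; the Go field names `Blob` and `KzgProof` are assumptions:

```go
import (
	"encoding/json"

	"github.com/ethereum/go-ethereum/common/hexutil"
)

// blobAndProofJSON mirrors one engine_getBlobsV1 response item, whose
// fields arrive as 0x-prefixed hex strings.
type blobAndProofJSON struct {
	Blob  hexutil.Bytes `json:"blob"`
	Proof hexutil.Bytes `json:"proof"`
}

// UnmarshalJSON decodes the hex-encoded fields into the raw byte slices
// on the generated BlobAndProof type.
func (b *BlobAndProof) UnmarshalJSON(data []byte) error {
	var dec blobAndProofJSON
	if err := json.Unmarshal(data, &dec); err != nil {
		return err
	}
	b.Blob = []byte(dec.Blob)
	b.KzgProof = []byte(dec.Proof)
	return nil
}
```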
```go
if err != nil {
	return nil, errors.Wrap(err, "could not create RO blob with root")
}
verifiedBlobs = append(verifiedBlobs, blocks.NewVerifiedROBlob(roBlob))
```
You're calling NewVerifiedROBlob and minting VerifiedROBlobs here, rather than getting those values from a verifier. That is risky. Would it be possible to define a verifier for this? Maybe something like the initial sync verifier that can deal with a batch.
On that note, I don't see you verifying the commitment inclusion proof anywhere in this PR.
Terence reminded me we are constructing the proof so we don't have to verify it, derp.
```go
	log.WithFields(blobFields(sidecar.ROBlob)).WithError(err).Error("Failed to receive blob")
}

if err := s.cfg.p2p.BroadcastBlob(ctx, sidecar.Index, sidecar.BlobSidecar); err != nil {
```
Would it be safe to broadcast before we call ReceiveBlob? I think it would be best to do these steps across multiple loops (see the sketch after this list), like:
- Verify all of the blobs. (I actually think we should do this inside ReconstructBlobSidecars, since that method returns VerifiedROBlobs.)
- Call BroadcastBlob for all the blobs. This ensures we get the most benefit from IDONTWANT. I think calling broadcast is non-blocking; IO with the individual peers happens in separate libp2p threads. We don't need to worry about "rebroadcasting" blobs whose index is already on disk, because libp2p handles that.
- Call ReceiveBlob for each blob. Do this after all the broadcasts, because it saves the blobs to disk, which can block, especially with fsync enabled.
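A minimal sketch of the suggested loop split, under the assumption that the sidecars were already verified upstream (helper names follow the code quoted elsewhere in this thread):

```go
// Pass 1: broadcast every verified sidecar first. Broadcast is
// non-blocking, so peers get IDONTWANT hints as early as possible.
for _, sc := range verifiedSidecars {
	if err := s.cfg.p2p.BroadcastBlob(ctx, sc.Index, sc.BlobSidecar); err != nil {
		log.WithError(err).Error("Failed to broadcast blob sidecar")
	}
}

// Pass 2: persist and import. ReceiveBlob writes to disk and can block
// (especially with fsync enabled), so it runs after all broadcasts.
for _, sc := range verifiedSidecars {
	if err := s.ReceiveBlob(ctx, sc); err != nil {
		log.WithError(err).Error("Failed to receive blob")
	}
}
```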
Force-pushed from f75b0e0 to e64a726

Force-pushed from 82b87b9 to a7ed3fa
```go
	return err
}

blob := make([]byte, fieldparams.BlobLength)
```
You can use the function bytesutil.DecodeHexWithLength(dec.Blob, fieldparams.BlobLength) to make sure there is a length check for dec.Blob.
I'm sort of leaning against doing it because, at first glance, DecodeHexWithLength looks more tedious, and it does an unnecessary round trip of encoding for this usage. We trust the length from the EL in many instances here...
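For reference, a sketch of what the helper-based version would look like, assuming `bytesutil.DecodeHexWithLength(s string, length int) ([]byte, error)` errors when the decoded length differs from the expected one:

```go
// Decode the hex-encoded blob and enforce its expected length in one call.
blob, err := bytesutil.DecodeHexWithLength(dec.Blob, fieldparams.BlobLength)
if err != nil {
	return errors.Wrap(err, "invalid blob")
}
```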
```go
func (s *Service) reconstructAndBroadcastBlobs(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock) {
	startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), block.Block().Slot())
	if err != nil {
		log.WithError(err).Error("Failed to convert slot to time")
```
why do we not return here if it errors?
It's only used for a log field, and it doesn't feel right to ditch the rest of the process just because we can't log one field correctly.
```go
	return
}
if len(blobSidecars) == 0 {
	return
```
is it worth adding a debug log here?
Not sure what we gain from a debug log here. ReconstructBlobSidecars could no-op often. Let me know if there's something specific you are thinking of.
```go
// Broadcast blob sidecars first, then save them to the db
for _, sidecar := range blobSidecars {
	if indices[sidecar.Index] {
```
Can this panic if the index is way out of range of the indices array?
it can't, but will add an extra condition
```go
}

// Refresh indices as new blobs may have been added to the db
indices, err = s.cfg.blobStorage.Indices(blockRoot)
```
this way of calling it twice sort of bothers me but don't have a better solution to suggest
```go
if indices[sidecar.Index] {
	continue
}
if err := s.cfg.p2p.BroadcastBlob(ctx, sidecar.Index, sidecar.BlobSidecar); err != nil {
```
Why do we not need to update the indices map as we loop here, if it's not found in indices?
I don't follow this question. Indices is what's saved in the DB; we shouldn't update them here.
```go
if err != nil {
	return nil, errors.Wrap(err, "could not get blobs")
}
if blobs == nil {
```
Will this ever be nil? getBlobs always makes a []*pb.BlobAndProof even if the length is 0; maybe a len(blobs) == 0 check instead?
```go
blob := blobs[blobIndex]
blobIndex++
if blob == nil {
```
similar comment to the blobs comment
```go
	continue
}

blob := blobs[blobIndex]
```
Is it worth having a length check between kzgCommitments and the returned blobs beforehand?
I think it's fine to compare the returned blobs with the KZG hashes, but the point is there could be a nil in the middle, so we need to track i and blobIndex separately. From the spec:

> Client software MUST place responses in the order given in the request, using null for any missing blobs. For instance, if the request is [A_versioned_hash, B_versioned_hash, C_versioned_hash] and client software has data for blobs A and C, but doesn't have data for B, the response MUST be [A, null, C].
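A minimal sketch of the two-counter pattern described above, assuming an `exists` array marking blobs already on disk (for which nothing was requested) and nil response entries for blobs the EL lacks; names are illustrative:

```go
// reconstructOrder maps engine_getBlobsV1 response entries (indexed by
// blobIndex) back onto commitment positions (indexed by i).
func reconstructOrder(kzgCommitments [][]byte, exists []bool, blobs []*pb.BlobAndProof) []*pb.BlobAndProof {
	ordered := make([]*pb.BlobAndProof, len(kzgCommitments))
	blobIndex := 0
	for i := range kzgCommitments {
		if exists[i] { // already stored locally; it was never requested
			continue
		}
		blob := blobs[blobIndex]
		blobIndex++
		if blob == nil { // the EL's mempool did not have this blob
			continue
		}
		ordered[i] = blob
	}
	return ordered
}
```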
beacon-chain/execution/service.go (outdated)
```go
capabilities     []string
capabilitiesLock sync.RWMutex
```
Instead of having these two exposed and having to repeat the following kind of logic on each call:

```go
s.capabilitiesLock.RLock()
if !slices.Contains(s.capabilities, GetBlobsV1) {
	s.capabilitiesLock.RUnlock()
	return nil, nil
}
s.capabilitiesLock.RUnlock()
```

it's better to have a simple cache (you can copy any of the latest caches that are a struct with a lock inside) and call s.capabilitiesCache.Get() / Set(), which are self-locking.
Force-pushed from 3004aeb to 1fb8b91
```go
// Broadcast blob sidecars first, then save them to the db
for _, sidecar := range blobSidecars {
	if sidecar.Index >= uint64(len(indices)) || indices[sidecar.Index] {
		continue
	}
	if err := s.cfg.p2p.BroadcastBlob(ctx, sidecar.Index, sidecar.BlobSidecar); err != nil {
		log.WithFields(blobFields(sidecar.ROBlob)).WithError(err).Error("Failed to broadcast blob sidecar")
	}
}

for _, sidecar := range blobSidecars {
	if sidecar.Index >= uint64(len(indices)) || indices[sidecar.Index] {
		blobExistedInDBCount.Inc()
		continue
	}
	if err := s.subscribeBlob(ctx, sidecar); err != nil {
		log.WithFields(blobFields(sidecar.ROBlob)).WithError(err).Error("Failed to receive blob")
		continue
	}

	blobRecoveredFromELCount.Inc()
	fields := blobFields(sidecar.ROBlob)
	fields["sinceSlotStartTime"] = s.cfg.clock.Now().Sub(startTime)
	log.WithFields(fields).Debug("Processed blob sidecar from EL")
}
```
Could this all be grouped like so?

```go
// Broadcast blob sidecars first, then save them to the db
for _, sidecar := range blobSidecars {
	if sidecar.Index >= uint64(len(indices)) || indices[sidecar.Index] {
		blobExistedInDBCount.Inc()
		continue
	}
	if err := s.cfg.p2p.BroadcastBlob(ctx, sidecar.Index, sidecar.BlobSidecar); err != nil {
		log.WithFields(blobFields(sidecar.ROBlob)).WithError(err).Error("Failed to broadcast blob sidecar")
	}
	if err := s.subscribeBlob(ctx, sidecar); err != nil {
		log.WithFields(blobFields(sidecar.ROBlob)).WithError(err).Error("Failed to receive blob")
		continue
	}
	blobRecoveredFromELCount.Inc()
	fields := blobFields(sidecar.ROBlob)
	fields["sinceSlotStartTime"] = s.cfg.clock.Now().Sub(startTime)
	log.WithFields(fields).Debug("Processed blob sidecar from EL")
}
```
Partial review... will resume later tonight
CHANGELOG.md (outdated)
```diff
@@ -76,6 +76,7 @@ Updating to this release is recommended at your convenience.
 - fastssz version bump (better error messages).
 - SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413)
 - Added GetPoolAttesterSlashingsV2 endpoint.
+- Use engine api get-blobs for block subscriber
```
Can you add a bit more to this changelog entry? The PR description claims this could be quite impactful, but the changelog entry is rather insignificant looking
```go
// It retrieves the KZG commitments from the block body, fetches the associated blobs and proofs,
// and constructs the corresponding verified read-only blob sidecars.
//
// The 'exists' argument is a boolean array of length 6, where each element corresponds to whether a
```
What is this magic number 6?
return nil, errors.Wrap(err, "could not get header") | ||
} | ||
|
||
// Reconstruct verify blob sidecars |
Suggested change:

```diff
-// Reconstruct verify blob sidecars
+// Reconstruct verified blob sidecars
```
```diff
@@ -224,3 +224,7 @@ message Blob {
   bytes data = 1 [(ethereum.eth.ext.ssz_size) = "blob.size"];
 }

+message BlobAndProof{
```
Suggested change:

```diff
-message BlobAndProof{
+message BlobAndProof {
```
beacon-chain/sync/metrics.go (outdated)
Name: "blob_recovered_from_el_count", | ||
Help: "Count the number of times blobs have been recovered from the execution layer.", | ||
}, | ||
) | ||
|
||
blobExistedInDBCount = promauto.NewCounter( | ||
prometheus.CounterOpts{ | ||
Name: "blob_existed_in_db_count", | ||
Help: "Count the number of times blobs have been found in the database.", |
Name: "blob_recovered_from_el_count", | |
Help: "Count the number of times blobs have been recovered from the execution layer.", | |
}, | |
) | |
blobExistedInDBCount = promauto.NewCounter( | |
prometheus.CounterOpts{ | |
Name: "blob_existed_in_db_count", | |
Help: "Count the number of times blobs have been found in the database.", | |
Name: "blob_recovered_from_el_total", | |
Help: "The number of times blobs have been recovered from the execution layer.", | |
}, | |
) | |
blobExistedInDBCount = promauto.NewCounter( | |
prometheus.CounterOpts{ | |
Name: "blob_existed_in_db_total", | |
Help: "The number of times blobs have been found in the database.", |
Please use _total
suffix. See https://prometheus.io/docs/practices/naming/
beacon-chain/execution/service.go (outdated)
```go
type capabilityCache struct {
	capabilities     []string
	capabilitiesLock sync.RWMutex
}

func (c *capabilityCache) Save(cs []string) {
	c.capabilitiesLock.Lock()
	defer c.capabilitiesLock.Unlock()

	c.capabilities = cs
}

func (c *capabilityCache) Has(capability string) bool {
	c.capabilitiesLock.RLock()
	defer c.capabilitiesLock.RUnlock()

	for _, existingCapability := range c.capabilities {
		if existingCapability == capability {
			return true
		}
	}
	return false
}
```
Why not use a map? O(n) storage, O(1) lookup.

Suggested change:

```go
type capabilityCache struct {
	capabilities     map[string]interface{}
	capabilitiesLock sync.RWMutex
}

func (c *capabilityCache) Save(cs []string) {
	c.capabilitiesLock.Lock()
	defer c.capabilitiesLock.Unlock()

	if c.capabilities == nil {
		c.capabilities = make(map[string]interface{})
	}
	// Record each capability string as a key.
	for _, capability := range cs {
		c.capabilities[capability] = nil
	}
}

func (c *capabilityCache) Has(capability string) bool {
	c.capabilitiesLock.RLock()
	defer c.capabilitiesLock.RUnlock()

	if c.capabilities == nil {
		return false
	}
	_, ok := c.capabilities[capability]
	return ok
}
```
map is good too!
Commits: Debug · changelog · add proto marshal and unmarshal · Kasey's feedback
Force-pushed from 1fb8b91 to 9f0449c

Force-pushed from 9f0449c to 3780805
Reference: ethereum/execution-apis#559
Background
The engine_getBlobsV1 function allows the CL to retrieve blobs and their proofs from the EL by providing the block body's KZG versioned hashes. If the blobs and proofs are already available in the local mempool, the EL will return them. Once the CL receives the blobs and proofs, it can import the block and run the fork choice algorithm to update the head. In the best-case scenario, this process is faster than waiting for blobs to propagate through the network, assuming the following condition holds:

$T_{BlockArrival} + \triangle_{BlobsOverEngineAPI} < T_{BlockArrival} + \triangle_{BlobsOverP2P}$

which simplifies to:

$\triangle_{BlobsOverEngineAPI} < \triangle_{BlobsOverP2P}$

The benefits include:
Prysm Implementation
Now we will discuss how Prysm may implement this optimization. Feel free to stop reading here if you're not interested in the Prysm side of things.
One way to study this is by reviewing the following task timeline:
Implementation Details

If the EL does not support the get blobs endpoint, it will be a no-op instead of returning an error. We added exchange capabilities caching and a check beforehand.

The preliminary result is documented here: https://hackmd.io/@ttsao/get-blobs-early-results