Skip to content

Commit

Permalink
Move P2P, RPC, historical sync to new BlobSidecar (#7724)
Browse files Browse the repository at this point in the history
* Move P2P, RPC, historical sync to new BlobSidecar

* Fix JavaCase warning
  • Loading branch information
zilm13 committed Nov 20, 2023
1 parent f3300d8 commit 94d3782
Show file tree
Hide file tree
Showing 56 changed files with 664 additions and 715 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.peers.Eth2Peer;
import tech.pegasys.teku.networking.p2p.network.P2PNetwork;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;

public class FetchBlobSidecarTask extends AbstractFetchTask<BlobIdentifier, BlobSidecarOld> {
public class FetchBlobSidecarTask extends AbstractFetchTask<BlobIdentifier, BlobSidecar> {

private static final Logger LOG = LogManager.getLogger();

Expand All @@ -49,7 +49,7 @@ public BlobIdentifier getKey() {
}

@Override
SafeFuture<FetchResult<BlobSidecarOld>> fetch(final Eth2Peer peer) {
SafeFuture<FetchResult<BlobSidecar>> fetch(final Eth2Peer peer) {
return peer.requestBlobSidecarByRoot(blobIdentifier)
.thenApply(
maybeBlobSidecar ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.peers.SyncSource;
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
Expand Down Expand Up @@ -60,7 +60,7 @@ public BatchImporter(
public SafeFuture<BatchImportResult> importBatch(final Batch batch) {
// Copy the data from batch as we're going to use them from off the event thread.
final List<SignedBeaconBlock> blocks = new ArrayList<>(batch.getBlocks());
final Map<Bytes32, List<BlobSidecarOld>> blobSidecarsByBlockRoot =
final Map<Bytes32, List<BlobSidecar>> blobSidecarsByBlockRoot =
Map.copyOf(batch.getBlobSidecarsByBlockRoot());

final Optional<SyncSource> source = batch.getSource();
Expand Down Expand Up @@ -103,20 +103,20 @@ public SafeFuture<BatchImportResult> importBatch(final Batch batch) {

private SafeFuture<BlockImportResult> importBlockAndBlobSidecars(
final SignedBeaconBlock block,
final Map<Bytes32, List<BlobSidecarOld>> blobSidecarsByBlockRoot,
final Map<Bytes32, List<BlobSidecar>> blobSidecarsByBlockRoot,
final SyncSource source) {
final Bytes32 blockRoot = block.getRoot();
if (!blobSidecarsByBlockRoot.containsKey(blockRoot)) {
return importBlock(block, source);
}
final List<BlobSidecarOld> blobSidecars = blobSidecarsByBlockRoot.get(blockRoot);
final List<BlobSidecar> blobSidecars = blobSidecarsByBlockRoot.get(blockRoot);
LOG.debug(
"Sending {} blob sidecars to the pool for block with root {}",
blobSidecars.size(),
blockRoot);
// Add blob sidecars to the pool in order for them to be available when the block is being
// imported
blobSidecarPool.onCompletedBlockAndBlobSidecarsOld(block, blobSidecars);
blobSidecarPool.onCompletedBlockAndBlobSidecars(block, blobSidecars);
return importBlock(block, source);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.peers.SyncSource;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

/** A section of a particular target chain that can be downloaded in parallel. */
Expand All @@ -37,7 +37,7 @@ public interface Batch {

List<SignedBeaconBlock> getBlocks();

Map<Bytes32, List<BlobSidecarOld>> getBlobSidecarsByBlockRoot();
Map<Bytes32, List<BlobSidecar>> getBlobSidecarsByBlockRoot();

Optional<SyncSource> getSource();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.peers.SyncSource;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

public class EventThreadOnlyBatch implements Batch {
Expand Down Expand Up @@ -71,7 +71,7 @@ public List<SignedBeaconBlock> getBlocks() {
}

@Override
public Map<Bytes32, List<BlobSidecarOld>> getBlobSidecarsByBlockRoot() {
public Map<Bytes32, List<BlobSidecar>> getBlobSidecarsByBlockRoot() {
eventThread.checkOnEventThread();
return delegate.getBlobSidecarsByBlockRoot();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlocksByRangeResponseInvalidResponseException;
import tech.pegasys.teku.networking.p2p.peer.PeerDisconnectedException;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.BeaconBlockBodyDeneb;
Expand All @@ -65,7 +65,7 @@ public class SyncSourceBatch implements Batch {
private boolean awaitingBlocks = false;

private final List<SignedBeaconBlock> blocks = new ArrayList<>();
private final Map<Bytes32, List<BlobSidecarOld>> blobSidecarsByBlockRoot = new HashMap<>();
private final Map<Bytes32, List<BlobSidecar>> blobSidecarsByBlockRoot = new HashMap<>();

SyncSourceBatch(
final EventThread eventThread,
Expand Down Expand Up @@ -117,7 +117,7 @@ public List<SignedBeaconBlock> getBlocks() {
}

@Override
public Map<Bytes32, List<BlobSidecarOld>> getBlobSidecarsByBlockRoot() {
public Map<Bytes32, List<BlobSidecar>> getBlobSidecarsByBlockRoot() {
return blobSidecarsByBlockRoot;
}

Expand Down Expand Up @@ -318,7 +318,7 @@ private void onRequestComplete(
blocks.addAll(newBlocks);

if (maybeBlobSidecarRequestHandler.isPresent()) {
final Map<Bytes32, List<BlobSidecarOld>> newBlobSidecarsByBlockRoot =
final Map<Bytes32, List<BlobSidecar>> newBlobSidecarsByBlockRoot =
maybeBlobSidecarRequestHandler.get().complete();
if (!validateNewBlobSidecars(newBlocks, newBlobSidecarsByBlockRoot)) {
markAsInvalid();
Expand Down Expand Up @@ -373,11 +373,11 @@ private boolean validateNewBlocks(final List<SignedBeaconBlock> newBlocks) {

private boolean validateNewBlobSidecars(
final List<SignedBeaconBlock> newBlocks,
final Map<Bytes32, List<BlobSidecarOld>> newBlobSidecarsByBlockRoot) {
final Map<Bytes32, List<BlobSidecar>> newBlobSidecarsByBlockRoot) {
final Set<Bytes32> blockRootsWithKzgCommitments = new HashSet<>(newBlocks.size());
for (final SignedBeaconBlock block : newBlocks) {
final Bytes32 blockRoot = block.getRoot();
final List<BlobSidecarOld> blobSidecars =
final List<BlobSidecar> blobSidecars =
newBlobSidecarsByBlockRoot.getOrDefault(blockRoot, List.of());
final int numberOfKzgCommitments =
block
Expand All @@ -399,7 +399,7 @@ private boolean validateNewBlobSidecars(
return false;
}
final UInt64 blockSlot = block.getSlot();
for (final BlobSidecarOld blobSidecar : blobSidecars) {
for (final BlobSidecar blobSidecar : blobSidecars) {
if (!blobSidecar.getSlot().equals(blockSlot)) {
LOG.debug(
"Marking batch invalid because blob sidecar for root {} was received with slot {} which is different than the block slot {}",
Expand Down Expand Up @@ -446,19 +446,19 @@ public List<SignedBeaconBlock> complete() {
}
}

private static class BlobSidecarRequestHandler implements RpcResponseListener<BlobSidecarOld> {
private final Map<Bytes32, List<BlobSidecarOld>> blobSidecarsByBlockRoot = new HashMap<>();
private static class BlobSidecarRequestHandler implements RpcResponseListener<BlobSidecar> {
private final Map<Bytes32, List<BlobSidecar>> blobSidecarsByBlockRoot = new HashMap<>();

@Override
public SafeFuture<?> onResponse(final BlobSidecarOld blobSidecar) {
final List<BlobSidecarOld> blobSidecars =
public SafeFuture<?> onResponse(final BlobSidecar blobSidecar) {
final List<BlobSidecar> blobSidecars =
blobSidecarsByBlockRoot.computeIfAbsent(
blobSidecar.getBlockRoot(), __ -> new ArrayList<>());
blobSidecars.add(blobSidecar);
return SafeFuture.COMPLETE;
}

public Map<Bytes32, List<BlobSidecarOld>> complete() {
public Map<Bytes32, List<BlobSidecar>> complete() {
return blobSidecarsByBlockRoot;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
import tech.pegasys.teku.networking.p2p.reputation.ReputationAdjustment;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;

public class ThrottlingSyncSource implements SyncSource {
Expand Down Expand Up @@ -72,9 +72,7 @@ public SafeFuture<Void> requestBlocksByRange(

@Override
public SafeFuture<Void> requestBlobSidecarsByRange(
final UInt64 startSlot,
final UInt64 count,
final RpcResponseListener<BlobSidecarOld> listener) {
final UInt64 startSlot, final UInt64 count, final RpcResponseListener<BlobSidecar> listener) {
if (blobSidecarsRateTracker.approveObjectsRequest(count.longValue()).isPresent()) {
LOG.debug("Sending request for {} blob sidecars", count);
return delegate.requestBlobSidecarsByRange(startSlot, count, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import tech.pegasys.teku.networking.eth2.rpc.core.RpcException;
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager;
Expand Down Expand Up @@ -194,7 +194,7 @@ private SafeFuture<PeerSyncResult> executeSync(
requestContext.count,
block -> {
// at this point, blob sidecars (if any) have been received
final Optional<List<BlobSidecarOld>> blobSidecars =
final Optional<List<BlobSidecar>> blobSidecars =
blobSidecarListener.getReceivedBlobSidecars(block.getSlot());
return importBlock(block, blobSidecars);
});
Expand Down Expand Up @@ -336,14 +336,14 @@ private RequestContext(final UInt64 startSlot, final UInt64 count, final UInt64
}

private SafeFuture<Void> importBlock(
final SignedBeaconBlock block, final Optional<List<BlobSidecarOld>> maybeBlobSidecars) {
final SignedBeaconBlock block, final Optional<List<BlobSidecar>> maybeBlobSidecars) {
if (stopped.get()) {
throw new CancellationException("Peer sync was cancelled");
}
// Add blob sidecars to the pool in order for them to be available when the block is being
// imported
maybeBlobSidecars.ifPresent(
blobSidecars -> blobSidecarPool.onCompletedBlockAndBlobSidecarsOld(block, blobSidecars));
blobSidecars -> blobSidecarPool.onCompletedBlockAndBlobSidecars(block, blobSidecars));

return blockImporter
.importBlock(block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;

public class PeerSyncBlobSidecarListener implements RpcResponseListener<BlobSidecarOld> {
public class PeerSyncBlobSidecarListener implements RpcResponseListener<BlobSidecar> {

private final Map<UInt64, List<BlobSidecarOld>> blobSidecarsBySlot = new HashMap<>();
private final Map<UInt64, List<BlobSidecar>> blobSidecarsBySlot = new HashMap<>();

private final UInt64 startSlot;
private final UInt64 endSlot;
Expand All @@ -36,7 +36,7 @@ public PeerSyncBlobSidecarListener(final UInt64 startSlot, final UInt64 endSlot)
}

@Override
public SafeFuture<?> onResponse(final BlobSidecarOld blobSidecar) {
public SafeFuture<?> onResponse(final BlobSidecar blobSidecar) {
final UInt64 sidecarSlot = blobSidecar.getSlot();
if (sidecarSlot.isLessThan(startSlot) || sidecarSlot.isGreaterThan(endSlot)) {
final String exceptionMessage =
Expand All @@ -45,13 +45,13 @@ public SafeFuture<?> onResponse(final BlobSidecarOld blobSidecar) {
sidecarSlot, startSlot, endSlot);
return SafeFuture.failedFuture(new IllegalArgumentException(exceptionMessage));
}
final List<BlobSidecarOld> blobSidecars =
final List<BlobSidecar> blobSidecars =
blobSidecarsBySlot.computeIfAbsent(sidecarSlot, __ -> new ArrayList<>());
blobSidecars.add(blobSidecar);
return SafeFuture.COMPLETE;
}

public Optional<List<BlobSidecarOld>> getReceivedBlobSidecars(final UInt64 slot) {
public Optional<List<BlobSidecar>> getReceivedBlobSidecars(final UInt64 slot) {
return Optional.ofNullable(blobSidecarsBySlot.get(slot));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

package tech.pegasys.teku.beacon.sync.gossip.blobs;

import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;

public interface BlobSidecarSubscriber {

void onBlobSidecar(BlobSidecarOld blobSidecar);
void onBlobSidecar(BlobSidecar blobSidecar);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobIdentifier;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;

public class RecentBlobSidecarsFetchService
extends AbstractFetchService<BlobIdentifier, FetchBlobSidecarTask, BlobSidecarOld>
extends AbstractFetchService<BlobIdentifier, FetchBlobSidecarTask, BlobSidecar>
implements RecentBlobSidecarsFetcher {

private static final Logger LOG = LogManager.getLogger();
Expand Down Expand Up @@ -127,7 +127,7 @@ public FetchBlobSidecarTask createTask(final BlobIdentifier key) {
}

@Override
public void processFetchedResult(final FetchBlobSidecarTask task, final BlobSidecarOld result) {
public void processFetchedResult(final FetchBlobSidecarTask task, final BlobSidecar result) {
LOG.trace("Successfully fetched blob sidecar: {}", result);
blobSidecarSubscribers.forEach(s -> s.onBlobSidecar(result));
// After retrieved blob sidecar has been processed, stop tracking it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.SpecConfig;
import tech.pegasys.teku.spec.constants.Domain;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecarOld;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlockSummary;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
Expand Down Expand Up @@ -74,7 +74,7 @@ public class HistoricalBatchFetcher {
private final BlobSidecarManager blobSidecarManager;
private final SafeFuture<BeaconBlockSummary> future = new SafeFuture<>();
private final Deque<SignedBeaconBlock> blocksToImport = new ConcurrentLinkedDeque<>();
private final Map<SlotAndBlockRoot, List<BlobSidecarOld>> blobSidecarsBySlotToImport =
private final Map<SlotAndBlockRoot, List<BlobSidecar>> blobSidecarsBySlotToImport =
new ConcurrentHashMap<>();
private Optional<UInt64> maybeEarliestBlobSidecarSlot = Optional.empty();
private final AtomicInteger requestCount = new AtomicInteger(0);
Expand Down Expand Up @@ -247,7 +247,7 @@ private SafeFuture<Boolean> requestBlocksAndBlobSidecarsByRange() {
.thenApply(__ -> shouldRetryBlocksAndBlobSidecarsByRangeRequest());
}

private void processBlobSidecar(final BlobSidecarOld blobSidecar) {
private void processBlobSidecar(final BlobSidecar blobSidecar) {
blobSidecarsBySlotToImport
.computeIfAbsent(blobSidecar.getSlotAndBlockRoot(), __ -> new ArrayList<>())
.add(blobSidecar);
Expand Down Expand Up @@ -399,12 +399,12 @@ private void validateBlobSidecars(
}

private void validateBlobSidecars(final SignedBeaconBlock block) {
final List<BlobSidecarOld> blobSidecars =
final List<BlobSidecar> blobSidecars =
blobSidecarsBySlotToImport.getOrDefault(
block.getSlotAndBlockRoot(), Collections.emptyList());
LOG.trace("Validating {} blob sidecars for block {}", blobSidecars.size(), block.getRoot());
final BlobSidecarsAndValidationResult validationResult =
blobSidecarManager.createAvailabilityCheckerAndValidateImmediatelyOld(block, blobSidecars);
blobSidecarManager.createAvailabilityCheckerAndValidateImmediately(block, blobSidecars);

if (validationResult.isFailure()) {
final String causeMessage =
Expand Down Expand Up @@ -455,7 +455,7 @@ private static class RequestManager {
private final Bytes32 lastBlockRoot;
private final Optional<SignedBeaconBlock> previousBlock;
private final Consumer<SignedBeaconBlock> blockProcessor;
private final Consumer<BlobSidecarOld> blobSidecarProcessor;
private final Consumer<BlobSidecar> blobSidecarProcessor;

private final AtomicInteger blocksReceived = new AtomicInteger(0);
private final AtomicBoolean foundLastBlock = new AtomicBoolean(false);
Expand All @@ -464,7 +464,7 @@ private RequestManager(
final Bytes32 lastBlockRoot,
final Optional<SignedBeaconBlock> previousBlock,
final Consumer<SignedBeaconBlock> blockProcessor,
final Consumer<BlobSidecarOld> blobSidecarProcessor) {
final Consumer<BlobSidecar> blobSidecarProcessor) {
this.lastBlockRoot = lastBlockRoot;
this.previousBlock = previousBlock;
this.blockProcessor = blockProcessor;
Expand Down Expand Up @@ -493,7 +493,7 @@ private SafeFuture<?> processBlock(final SignedBeaconBlock block) {
});
}

private SafeFuture<?> processBlobSidecar(final BlobSidecarOld blobSidecar) {
private SafeFuture<?> processBlobSidecar(final BlobSidecar blobSidecar) {
blobSidecarProcessor.accept(blobSidecar);
return SafeFuture.COMPLETE;
}
Expand Down
Loading

0 comments on commit 94d3782

Please sign in to comment.