Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Investigate chain halts when syncing #7162

Merged
merged 26 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c094ee4
disconnect peer when too many tries, and add logging
pinges Jun 2, 2024
1c08b74
increase timeout to give request a chance to timeout
pinges Jun 3, 2024
8073b52
let the task time out and make sure the connection is closed
pinges Jun 3, 2024
91ff91f
disconnect if needed and add more logging
pinges Jun 4, 2024
223eb27
Merge branch 'main' of github.com:hyperledger/besu into chainHalt
pinges Jun 4, 2024
5f10205
disable verification
pinges Jun 4, 2024
8e21f2a
Merge branch 'main' of github.com:hyperledger/besu into chainHalt
pinges Jun 6, 2024
c954f32
set retries to 5
pinges Jun 7, 2024
90ae742
Merge branch 'main' of github.com:hyperledger/besu into chainHalt
pinges Jun 7, 2024
c5aafce
cleanup
pinges Jun 7, 2024
95adfaa
more cleanup
pinges Jun 7, 2024
36890d0
fix another timeout
pinges Jun 10, 2024
7ea854e
fix unit test
pinges Jun 11, 2024
0142c32
Merge branch 'main' of github.com:hyperledger/besu into chainHalt
pinges Jun 12, 2024
e6a2522
clean up
pinges Jun 12, 2024
c4fdc17
Merge branch 'main' into chainHalt
pinges Jun 12, 2024
36c11d1
Merge branch 'main' of github.com:hyperledger/besu into chainHalt
pinges Jun 18, 2024
816f050
Update ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/s…
pinges Jun 18, 2024
c98084a
Update ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/s…
pinges Jun 18, 2024
7dded30
Update ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/s…
pinges Jun 18, 2024
661704a
Merge branch 'main' into chainHalt
pinges Jun 18, 2024
7601084
add CHANGELOG entry
pinges Jun 19, 2024
35d8ba9
Merge branch 'main' of github.com:hyperledger/besu into chainHalt
pinges Jun 19, 2024
aafb194
Merge branch 'chainHalt' of github.com:pinges/besu into chainHalt
pinges Jun 19, 2024
4bf18f3
updates after review
pinges Jun 20, 2024
75cfaa7
add comments
pinges Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected EthProtocolManager createEthProtocolManager(
var mergeBestPeerComparator =
new TransitionBestPeerComparator(
genesisConfigOptions.getTerminalTotalDifficulty().map(Difficulty::of).orElseThrow());
ethPeers.setBestChainComparator(mergeBestPeerComparator);
ethPeers.setBestPeerComparator(mergeBestPeerComparator);
mergeContext.observeNewIsPostMergeState(mergeBestPeerComparator);

Optional<MergePeerFilter> filterToUse = Optional.of(new MergePeerFilter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,12 @@ private boolean registerDisconnect(final EthPeer peer, final PeerConnection conn
peer.handleDisconnect();
abortPendingRequestsAssignedToDisconnectedPeers();
if (peer.getReputation().getScore() > USEFULL_PEER_SCORE_THRESHOLD) {
LOG.debug("Disconnected USEFULL peer {}", peer);
LOG.atDebug().setMessage("Disconnected USEFULL peer {}").addArgument(peer).log();
} else {
LOG.debug("Disconnected EthPeer {}", peer.getLoggableId());
LOG.atDebug()
.setMessage("Disconnected EthPeer {}")
.addArgument(peer.getLoggableId())
.log();
}
}
}
Expand Down Expand Up @@ -318,11 +321,11 @@ public Stream<EthPeer> streamAvailablePeers() {
public Stream<EthPeer> streamBestPeers() {
return streamAvailablePeers()
.filter(EthPeer::isFullyValidated)
.sorted(getBestChainComparator().reversed());
.sorted(getBestPeerComparator().reversed());
}

public Optional<EthPeer> bestPeer() {
return streamAvailablePeers().max(getBestChainComparator());
return streamAvailablePeers().max(getBestPeerComparator());
}

public Optional<EthPeer> bestPeerWithHeightEstimate() {
Expand All @@ -331,15 +334,15 @@ public Optional<EthPeer> bestPeerWithHeightEstimate() {
}

public Optional<EthPeer> bestPeerMatchingCriteria(final Predicate<EthPeer> matchesCriteria) {
return streamAvailablePeers().filter(matchesCriteria).max(getBestChainComparator());
return streamAvailablePeers().filter(matchesCriteria).max(getBestPeerComparator());
}

public void setBestChainComparator(final Comparator<EthPeer> comparator) {
public void setBestPeerComparator(final Comparator<EthPeer> comparator) {
LOG.info("Updating the default best peer comparator");
bestPeerComparator = comparator;
}

public Comparator<EthPeer> getBestChainComparator() {
public Comparator<EthPeer> getBestPeerComparator() {
return bestPeerComparator;
}

Expand Down Expand Up @@ -394,8 +397,7 @@ public boolean shouldConnect(final Peer peer, final boolean inbound) {

public void disconnectWorstUselessPeer() {
streamAvailablePeers()
.sorted(getBestChainComparator())
.findFirst()
.min(getBestPeerComparator())
.ifPresent(
peer -> {
LOG.atDebug()
Expand Down Expand Up @@ -551,29 +553,40 @@ private boolean addPeerToEthPeers(final EthPeer peer) {
if (!randomPeerPriority) {
// Disconnect if too many peers
if (!canExceedPeerLimits(id) && peerCount() >= peerUpperBound) {
LOG.trace(
"Too many peers. Disconnect connection: {}, max connections {}",
connection,
peerUpperBound);
LOG.atTrace()
.setMessage("Too many peers. Disconnect connection: {}, max connections {}")
.addArgument(connection)
.addArgument(peerUpperBound)
.log();
connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
return false;
}
// Disconnect if too many remotely-initiated connections
if (connection.inboundInitiated()
&& !canExceedPeerLimits(id)
&& remoteConnectionLimitReached()) {
LOG.trace(
"Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}",
connection,
maxRemotelyInitiatedConnections);
LOG.atTrace()
.setMessage(
"Too many remotely-initiated connections. Disconnect incoming connection: {}, maxRemote={}")
.addArgument(connection)
.addArgument(maxRemotelyInitiatedConnections)
.log();
connection.disconnect(DisconnectMessage.DisconnectReason.TOO_MANY_PEERS);
return false;
}
final boolean added = (completeConnections.putIfAbsent(id, peer) == null);
if (added) {
LOG.trace("Added peer {} with connection {} to completeConnections", id, connection);
LOG.atTrace()
.setMessage("Added peer {} with connection {} to completeConnections")
.addArgument(id)
.addArgument(connection)
.log();
} else {
LOG.trace("Did not add peer {} with connection {} to completeConnections", id, connection);
LOG.atTrace()
.setMessage("Did not add peer {} with connection {} to completeConnections")
.addArgument(id)
.addArgument(connection)
.log();
}
return added;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
public class EthScheduler {
private static final Logger LOG = LoggerFactory.getLogger(EthScheduler.class);

private final Duration defaultTimeout = Duration.ofSeconds(5);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final CountDownLatch shutdown = new CountDownLatch(1);
private static final int TX_WORKER_CAPACITY = 1_000;
Expand Down Expand Up @@ -219,10 +218,6 @@ public CompletableFuture<Void> scheduleBlockCreationTask(final Runnable task) {
return CompletableFuture.runAsync(task, blockCreationExecutor);
}

public <T> CompletableFuture<T> timeout(final EthTask<T> task) {
return timeout(task, defaultTimeout);
}

public <T> CompletableFuture<T> timeout(final EthTask<T> task, final Duration timeout) {
final CompletableFuture<T> future = task.run();
final CompletableFuture<T> result = timeout(future, timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
public class RetryingGetAccountRangeFromPeerTask
extends AbstractRetryingPeerTask<AccountRangeMessage.AccountRangeData> {

public static final int MAX_RETRIES = 5;
jframe marked this conversation as resolved.
Show resolved Hide resolved
private final EthContext ethContext;
private final Bytes32 startKeyHash;
private final Bytes32 endKeyHash;
Expand All @@ -43,7 +44,10 @@ private RetryingGetAccountRangeFromPeerTask(
final BlockHeader blockHeader,
final MetricsSystem metricsSystem) {
super(
ethContext, 4, data -> data.accounts().isEmpty() && data.proofs().isEmpty(), metricsSystem);
ethContext,
MAX_RETRIES,
data -> data.accounts().isEmpty() && data.proofs().isEmpty(),
metricsSystem);
this.ethContext = ethContext;
this.startKeyHash = startKeyHash;
this.endKeyHash = endKeyHash;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,18 @@ protected void handleTaskError(final Throwable error) {
() ->
ethContext
.getScheduler()
// wait for a new peer for up to 5 seconds
.timeout(waitTask, Duration.ofSeconds(5))
// execute the task again
.whenComplete((r, t) -> executeTaskTimed()));
return;
}

LOG.debug(
"Retrying after recoverable failure from peer task {}: {}",
this.getClass().getSimpleName(),
cause.getMessage());
LOG.atDebug()
.setMessage("Retrying after recoverable failure from peer task {}: {}")
.addArgument(this.getClass().getSimpleName())
.addArgument(cause.getMessage())
.log();
// Wait before retrying on failure
executeSubTask(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class PivotBlockRetriever {

private static final Logger LOG = LoggerFactory.getLogger(PivotBlockRetriever.class);
public static final int MAX_QUERY_RETRIES_PER_PEER = 4;
public static final int MAX_QUERY_RETRIES_PER_PEER = 5;
private static final int DEFAULT_MAX_PIVOT_BLOCK_RESETS = 250;
private static final int SUSPICIOUS_NUMBER_OF_RETRIES = 5;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator;
import org.hyperledger.besu.plugin.services.MetricsSystem;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -93,7 +94,9 @@ protected CompletableFuture<Optional<EthPeer>> selectBestAvailableSyncTarget() {
return completedFuture(Optional.empty());
} else {
final EthPeer bestPeer = maybeBestPeer.get();
if (bestPeer.chainState().getEstimatedHeight() < pivotBlockHeader.getNumber()) {
// Do not check the best peers estimated height if we are doing PoS
if (!protocolSchedule.getByBlockHeader(pivotBlockHeader).isPoS()
&& bestPeer.chainState().getEstimatedHeight() < pivotBlockHeader.getNumber()) {
LOG.info(
"Best peer {} has chain height {} below pivotBlock height {}. Waiting for better peers. Current {} of max {}",
maybeBestPeer.map(EthPeer::getLoggableId).orElse("none"),
Expand Down Expand Up @@ -121,7 +124,8 @@ private CompletableFuture<Optional<EthPeer>> confirmPivotBlockHeader(final EthPe
task.assignPeer(bestPeer);
return ethContext
.getScheduler()
.timeout(task)
.timeout(task, Duration.ofSeconds(MAX_QUERY_RETRIES_PER_PEER * 5 + 1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the 5 that is referred to here coming from the timeout in AbstractRetryingPeerTask? It would be nicer to create a constant there and use it here instead of having a magic value.

// 5 because there is a 5 sec timout per request
.thenCompose(
result -> {
if (peerHasDifferentPivotBlock(result)) {
Expand All @@ -147,11 +151,13 @@ private CompletableFuture<Optional<EthPeer>> confirmPivotBlockHeader(final EthPe
})
.exceptionally(
error -> {
LOG.debug(
"Could not confirm best peer {} had pivot block {}",
bestPeer.getLoggableId(),
pivotBlockHeader.getNumber(),
error);
LOG.atDebug()
.setMessage("Could not confirm best peer {} had pivot block {}, {}")
.addArgument(bestPeer.getLoggableId())
.addArgument(pivotBlockHeader.getNumber())
.addArgument(error)
.log();
bestPeer.disconnect(DisconnectReason.USELESS_PEER_CANNOT_CONFIRM_PIVOT_BLOCK);
return Optional.empty();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public boolean shouldSwitchSyncTarget(final EthPeer currentSyncTarget) {
return maybeBestPeer
.map(
bestPeer -> {
if (ethPeers.getBestChainComparator().compare(bestPeer, currentSyncTarget) <= 0) {
if (ethPeers.getBestPeerComparator().compare(bestPeer, currentSyncTarget) <= 0) {
// Our current target is better or equal to the best peer
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public RangeHeadersFetcher(

public CompletableFuture<List<BlockHeader>> getNextRangeHeaders(
final EthPeer peer, final BlockHeader previousRangeHeader) {
LOG.atTrace()
.setMessage("Requesting next range headers from peer {}")
.addArgument(peer.getLoggableId())
.log();
final int skip = syncConfig.getDownloaderChainSegmentSize() - 1;
final int maximumHeaderRequestSize = syncConfig.getDownloaderHeaderRequestSize();
final long previousRangeNumber = previousRangeHeader.getNumber();
Expand All @@ -78,11 +82,19 @@ public CompletableFuture<List<BlockHeader>> getNextRangeHeaders(
final BlockHeader targetHeader = finalRangeHeader.get();
final long blocksUntilTarget = targetHeader.getNumber() - previousRangeNumber;
if (blocksUntilTarget <= 0) {
LOG.atTrace()
.setMessage("Requesting next range headers: no blocks until target: {}")
.addArgument(blocksUntilTarget)
.log();
return completedFuture(emptyList());
}
final long maxHeadersToRequest = blocksUntilTarget / (skip + 1);
additionalHeaderCount = (int) Math.min(maxHeadersToRequest, maximumHeaderRequestSize);
if (additionalHeaderCount == 0) {
LOG.atTrace()
.setMessage("Requesting next range headers: additional header count is 0, blocks until target: {}")
.addArgument(blocksUntilTarget)
.log();
return completedFuture(singletonList(targetHeader));
}
} else {
Expand All @@ -97,11 +109,12 @@ private CompletableFuture<List<BlockHeader>> requestHeaders(
final BlockHeader referenceHeader,
final int headerCount,
final int skip) {
LOG.trace(
"Requesting {} range headers, starting from {}, {} blocks apart",
headerCount,
referenceHeader.getNumber(),
skip);
LOG.atTrace()
.setMessage("Requesting {} range headers, starting from {}, {} blocks apart")
.addArgument(headerCount)
.addArgument(referenceHeader.getNumber())
.addArgument(skip)
.log();
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
ethContext,
Expand All @@ -114,7 +127,19 @@ private CompletableFuture<List<BlockHeader>> requestHeaders(
.assignPeer(peer)
.run()
.thenApply(PeerTaskResult::getResult)
.thenApply(headers -> stripExistingRangeHeaders(referenceHeader, headers));
.thenApply(
headers -> {
if (headers.size() < headerCount) {
LOG.atTrace()
.setMessage(
"Peer {} returned fewer headers than requested. Expected: {}, Actual: {}")
.addArgument(peer)
.addArgument(headerCount)
.addArgument(headers.size())
.log();
}
return stripExistingRangeHeaders(referenceHeader, headers);
});
}

private List<BlockHeader> stripExistingRangeHeaders(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
public class SyncTargetRangeSource implements Iterator<SyncTargetRange> {
private static final Logger LOG = LoggerFactory.getLogger(SyncTargetRangeSource.class);
private static final Duration RETRY_DELAY_DURATION = Duration.ofSeconds(2);
public static final int DEFAULT_TIME_TO_WAIT_IN_SECONDS = 6;
jframe marked this conversation as resolved.
Show resolved Hide resolved

private final RangeHeadersFetcher fetcher;
private final SyncTargetChecker syncTargetChecker;
Expand Down Expand Up @@ -70,7 +71,7 @@ public SyncTargetRangeSource(
peer,
commonAncestor,
retriesPermitted,
Duration.ofSeconds(5),
Duration.ofSeconds(DEFAULT_TIME_TO_WAIT_IN_SECONDS),
terminationCondition);
}

Expand Down Expand Up @@ -153,7 +154,7 @@ private SyncTargetRange getRangeFromPendingRequest() {
if (retryCount >= retriesPermitted) {
LOG.atDebug()
.setMessage(
"Disconnecting target peer for providing useless or empty range header: {}.")
"Disconnecting target peer {} for providing useless or empty range headers.")
.addArgument(peer)
.log();
peer.disconnect(DisconnectMessage.DisconnectReason.USELESS_PEER_USELESS_RESPONSES);
Expand All @@ -169,12 +170,20 @@ private SyncTargetRange getRangeFromPendingRequest() {
} catch (final InterruptedException e) {
LOG.trace("Interrupted while waiting for new range headers", e);
return null;
} catch (final ExecutionException e) {
LOG.debug("Failed to retrieve new range headers", e);
this.pendingRequests = Optional.empty();
} catch (final ExecutionException | TimeoutException e) {
if (e instanceof ExecutionException) {
this.pendingRequests = Optional.empty();
}
retryCount++;
return null;
} catch (final TimeoutException e) {
if (retryCount >= retriesPermitted) {
LOG.atDebug()
.setMessage(
"Disconnecting target peer {} for not providing useful range headers: Exception: {}.")
.addArgument(peer)
.addArgument(e)
.log();
peer.disconnect(DisconnectMessage.DisconnectReason.USELESS_PEER_USELESS_RESPONSES);
}
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class CompleteBlocksTask extends AbstractRetryingPeerTask<List<Block>> {
private static final Logger LOG = LoggerFactory.getLogger(CompleteBlocksTask.class);

private static final int MIN_SIZE_INCOMPLETE_LIST = 1;
private static final int DEFAULT_RETRIES = 4;
private static final int DEFAULT_RETRIES = 5;

private final EthContext ethContext;
private final ProtocolSchedule protocolSchedule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
*/
public class DownloadHeaderSequenceTask extends AbstractRetryingPeerTask<List<BlockHeader>> {
private static final Logger LOG = LoggerFactory.getLogger(DownloadHeaderSequenceTask.class);
private static final int DEFAULT_RETRIES = 4;
private static final int DEFAULT_RETRIES = 5;

private final EthContext ethContext;
private final ProtocolContext protocolContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public class GetReceiptsForHeadersTask
extends AbstractRetryingPeerTask<Map<BlockHeader, List<TransactionReceipt>>> {
private static final Logger LOG = LoggerFactory.getLogger(GetReceiptsForHeadersTask.class);
private static final int DEFAULT_RETRIES = 4;
private static final int DEFAULT_RETRIES = 5;

private final EthContext ethContext;

Expand Down
Loading
Loading