Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support BFT mining coordinator being temporarily stopped while syncing #7657

Merged
merged 10 commits into from
Oct 9, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- Fix reading `tx-pool-min-score` option from configuration file [#7623](https://github.com/hyperledger/besu/pull/7623)
- Fix an unhandled PeerTable exception [#7733](https://github.com/hyperledger/besu/issues/7733)
- Fix RocksDBException: Busy leading to MerkleTrieException: Unable to load trie node value [#7745](https://github.com/hyperledger/besu/pull/7745)
- If a BFT validator node is syncing, pause block production until sync has completed [#7657](https://github.com/hyperledger/besu/pull/7657)

## 24.9.1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,18 @@ protected MiningCoordinator createMiningCoordinator(
.getValue()
.getBlockPeriodSeconds()));

if (syncState.isInitialSyncPhaseDone()) {
ibftMiningCoordinator.enable();
}
syncState.subscribeSyncStatus(
syncStatus -> {
if (syncState.syncTarget().isPresent()) {
// We're syncing so stop doing other stuff
LOG.info("Stopping IBFT mining coordinator while we are syncing");
ibftMiningCoordinator.stop();
} else {
LOG.info("Starting IBFT mining coordinator following sync");
ibftMiningCoordinator.enable();
ibftMiningCoordinator.start();
}
});

syncState.subscribeCompletionReached(
new BesuEvents.InitialSyncCompletionListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,18 @@ protected MiningCoordinator createMiningCoordinator(
.getEmptyBlockPeriodSeconds());
});

if (syncState.isInitialSyncPhaseDone()) {
miningCoordinator.enable();
}
syncState.subscribeSyncStatus(
syncStatus -> {
if (syncState.syncTarget().isPresent()) {
// We're syncing so stop doing other stuff
LOG.info("Stopping QBFT mining coordinator while we are syncing");
miningCoordinator.stop();
} else {
LOG.info("Starting QBFT mining coordinator following sync");
miningCoordinator.enable();
miningCoordinator.start();
}
});

syncState.subscribeCompletionReached(
new BesuEvents.InitialSyncCompletionListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public void start() {
started.set(true);
}

/** Stop the event queue. No events will be queued for processing until it is started. */
public void stop() {
started.set(false);
}

private boolean isStarted() {
return started.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static BftExecutors create(

/** Start. */
public synchronized void start() {
if (state != State.IDLE) {
if (state != State.IDLE && state != State.STOPPED) {
// Nothing to do
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,13 @@ public BftProcessor(final BftEventQueue incomingQueue, final EventMultiplexer ev
this.eventMultiplexer = eventMultiplexer;
}

/** Indicate to the processor that it can be started */
public synchronized void start() {
shutdown = false;
}

/** Indicate to the processor that it should gracefully stop at its next opportunity */
public void stop() {
public synchronized void stop() {
shutdown = true;
}

Expand All @@ -67,6 +72,8 @@ public void run() {
while (!shutdown) {
nextEvent().ifPresent(eventMultiplexer::handleBftEvent);
}

incomingQueue.stop();
} catch (final Throwable t) {
LOG.error("BFT Mining thread has suffered a fatal error, mining has been halted", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ public BftMiningCoordinator(

@Override
public void start() {
if (state.compareAndSet(State.IDLE, State.RUNNING)) {
if (state.compareAndSet(State.IDLE, State.RUNNING)
|| state.compareAndSet(State.STOPPED, State.RUNNING)) {
bftProcessor.start();
bftExecutors.start();
blockAddedObserverId = blockchain.observeBlockAdded(this);
eventHandler.start();
Expand All @@ -110,7 +112,7 @@ public void stop() {
try {
bftProcessor.awaitStop();
} catch (final InterruptedException e) {
LOG.debug("Interrupted while waiting for IbftProcessor to stop.", e);
LOG.debug("Interrupted while waiting for BftProcessor to stop.", e);
Thread.currentThread().interrupt();
}
bftExecutors.stop();
Expand All @@ -135,12 +137,17 @@ public boolean enable() {

@Override
public boolean disable() {
if (state.get() == State.PAUSED
|| state.compareAndSet(State.IDLE, State.PAUSED)
|| state.compareAndSet(State.RUNNING, State.PAUSED)) {
return true;
}
return false;
}

@Override
public boolean isMining() {
return true;
return state.get() == State.RUNNING;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,40 @@ public void doNotAddUntilStarted() throws InterruptedException {
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
}

@Test
public void supportsStopAndRestart() throws InterruptedException {
final BftEventQueue queue = new BftEventQueue(MAX_QUEUE_SIZE);
queue.start();

assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
final DummyBftEvent dummyMessageEvent = new DummyBftEvent();
final DummyRoundExpiryBftEvent dummyRoundTimerEvent = new DummyRoundExpiryBftEvent();
final DummyNewChainHeadBftEvent dummyNewChainHeadEvent = new DummyNewChainHeadBftEvent();

queue.add(dummyMessageEvent);
queue.add(dummyRoundTimerEvent);
queue.add(dummyNewChainHeadEvent);
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();

queue.stop();
queue.add(dummyMessageEvent);
queue.add(dummyRoundTimerEvent);
queue.add(dummyNewChainHeadEvent);
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();

queue.start();
queue.add(dummyMessageEvent);
queue.add(dummyRoundTimerEvent);
queue.add(dummyNewChainHeadEvent);
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNotNull();
assertThat(queue.poll(0, TimeUnit.MICROSECONDS)).isNull();
}

@Test
public void alwaysAddBlockTimerExpiryEvents() throws InterruptedException {
final BftEventQueue queue = new BftEventQueue(MAX_QUEUE_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;

import org.hyperledger.besu.consensus.common.bft.BftEventQueue;
import org.hyperledger.besu.consensus.common.bft.BftExecutors;
Expand Down Expand Up @@ -82,6 +83,28 @@ public void stopsMining() {
verify(bftProcessor).stop();
}

@Test
public void restartsMiningAfterStop() {
assertThat(bftMiningCoordinator.isMining()).isFalse();
bftMiningCoordinator.stop();
verify(bftProcessor, never()).stop();

bftMiningCoordinator.enable();
bftMiningCoordinator.start();
assertThat(bftMiningCoordinator.isMining()).isTrue();

bftMiningCoordinator.stop();
assertThat(bftMiningCoordinator.isMining()).isFalse();
verify(bftProcessor).stop();

bftMiningCoordinator.start();
assertThat(bftMiningCoordinator.isMining()).isTrue();

// BFT processor should be started once for every time the mining
// coordinator is restarted
verify(bftProcessor, times(2)).start();
}

@Test
public void getsMinTransactionGasPrice() {
final Wei minGasPrice = Wei.of(10);
Expand Down
Loading