Skip to content

Commit

Permalink
Fix assemble
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Nov 23, 2024
1 parent 1bcd572 commit 5d3e269
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ public class Constants {
public static final int ATTESTATION_SUBNET_COUNT = 64;

// Teku Networking Specific
// Global RPC timeout for reading an incoming request or for receiving a response chunk after
// initiating a request
public static final Duration RPC_TIMEOUT = Duration.ofSeconds(10);
public static final int VALID_BLOCK_SET_SIZE = 1000;
// Target holding two slots worth of aggregators (16 aggregators, 64 committees and 2 slots)
public static final int VALID_AGGREGATE_SET_SIZE = 16 * 64 * 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@

package tech.pegasys.teku.networking.eth2.rpc.core;

import static tech.pegasys.teku.spec.config.Constants.RPC_TIMEOUT;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
Expand All @@ -35,6 +34,7 @@ public class Eth2IncomingRequestHandler<
TRequest extends RpcRequest & SszData, TResponse extends SszData>
implements RpcRequestHandler {
private static final Logger LOG = LogManager.getLogger();
private static final Duration RECEIVE_INCOMING_REQUEST_TIMEOUT = Duration.ofSeconds(10);

private final PeerLookup peerLookup;
private final LocalMessageHandler<TRequest, TResponse> localMessageHandler;
Expand Down Expand Up @@ -119,13 +119,13 @@ private void handleRequest(

private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) {
asyncRunner
.getDelayedFuture(RPC_TIMEOUT)
.getDelayedFuture(RECEIVE_INCOMING_REQUEST_TIMEOUT)
.thenAccept(
(__) -> {
if (!requestHandled.get()) {
LOG.debug(
"Failed to receive incoming request data within {} sec for protocol {}. Close stream.",
RPC_TIMEOUT.toSeconds(),
RECEIVE_INCOMING_REQUEST_TIMEOUT.toSeconds(),
protocolId);
stream.closeAbruptly().ifExceptionGetsHereRaiseABug();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.State.DATA_COMPLETED;
import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.State.EXPECT_DATA;
import static tech.pegasys.teku.networking.eth2.rpc.core.Eth2OutgoingRequestHandler.State.READ_COMPLETE;
import static tech.pegasys.teku.spec.config.Constants.RPC_TIMEOUT;

import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -55,46 +54,37 @@ enum State {
private static final Logger LOG = LogManager.getLogger();

private final AsyncRunner asyncRunner;
private final AsyncRunner timeoutRunner;
private final int maximumResponseChunks;
private final Eth2RpcResponseHandler<TResponse, ?> responseHandler;
private final ResponseStream<TResponse> responseStream;

private final AtomicBoolean hasReceivedInitialBytes = new AtomicBoolean(false);
private final AtomicInteger currentChunkCount = new AtomicInteger(0);
private final AtomicReference<State> state;
private final AtomicReference<AsyncResponseProcessor<TResponse>> responseProcessor =
new AtomicReference<>();

private final String protocolId;
private final RpcResponseDecoder<TResponse, ?> responseDecoder;
private final boolean shouldReceiveResponse;

public Eth2OutgoingRequestHandler(
final AsyncRunner asyncRunner,
final AsyncRunner timeoutRunner,
final String protocolId,
final RpcResponseDecoder<TResponse, ?> responseDecoder,
final boolean shouldReceiveResponse,
final TRequest request,
final Eth2RpcResponseHandler<TResponse, ?> responseHandler) {
this.asyncRunner = asyncRunner;
this.timeoutRunner = timeoutRunner;
this.maximumResponseChunks = request.getMaximumResponseChunks();
this.responseHandler = responseHandler;
responseStream = new ResponseStream<>(responseHandler);
this.responseDecoder = responseDecoder;
this.shouldReceiveResponse = shouldReceiveResponse;
this.protocolId = protocolId;
this.state = new AtomicReference<>(shouldReceiveResponse ? EXPECT_DATA : DATA_COMPLETED);
}

public void handleInitialPayloadSent(final RpcStream stream) {
// Close the write side of the stream
stream.closeWriteStream().ifExceptionGetsHereRaiseABug();
if (!shouldReceiveResponse) {
ensureReadCompleteArrivesInTime(stream);
}
}

@Override
Expand All @@ -111,8 +101,6 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By
throw new RpcException.ExtraDataAppendedException(" extra data: " + bufToString(data));
}

onFirstByteReceived(rpcStream);

List<TResponse> maybeResponses = responseDecoder.decodeNextResponses(data);
final int chunksReceived = currentChunkCount.addAndGet(maybeResponses.size());

Expand All @@ -123,16 +111,9 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By
for (TResponse maybeResponse : maybeResponses) {
getResponseProcessor(rpcStream).processResponse(maybeResponse);
}
if (chunksReceived < maximumResponseChunks) {
if (!maybeResponses.isEmpty()) {
ensureNextResponseChunkArrivesInTime(rpcStream, chunksReceived, currentChunkCount);
}
} else {
if (!transferToState(DATA_COMPLETED, List.of(EXPECT_DATA))) {
abortRequest(rpcStream, new IllegalStateException("Unexpected state: " + state));
return;
}
ensureReadCompleteArrivesInTime(rpcStream);
if (chunksReceived >= maximumResponseChunks
&& !transferToState(DATA_COMPLETED, List.of(EXPECT_DATA))) {
abortRequest(rpcStream, new IllegalStateException("Unexpected state: " + state));
}
} catch (final RpcException e) {
abortRequest(rpcStream, e);
Expand All @@ -144,14 +125,14 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By

private AsyncResponseProcessor<TResponse> getResponseProcessor(final RpcStream rpcStream) {
return responseProcessor.updateAndGet(
oldVal -> {
if (oldVal == null) {
return new AsyncResponseProcessor<>(
asyncRunner, responseStream, throwable -> abortRequest(rpcStream, throwable));
} else {
return oldVal;
}
});
oldVal ->
Objects.requireNonNullElseGet(
oldVal,
() ->
new AsyncResponseProcessor<>(
asyncRunner,
responseStream,
throwable -> abortRequest(rpcStream, throwable))));
}

private String bufToString(final ByteBuf buf) {
Expand Down Expand Up @@ -204,13 +185,6 @@ private boolean transferToState(final State toState, final Collection<State> fro
return false;
}

private void onFirstByteReceived(final RpcStream rpcStream) {
if (hasReceivedInitialBytes.compareAndSet(false, true)) {
// Setup initial chunk timeout
ensureNextResponseChunkArrivesInTime(rpcStream, currentChunkCount.get(), currentChunkCount);
}
}

private void completeRequest(final RpcStream rpcStream) {
getResponseProcessor(rpcStream)
.finishProcessing()
Expand Down Expand Up @@ -254,40 +228,6 @@ private void abortRequest(final RpcStream rpcStream, final Throwable error, fina
}
}

private void ensureNextResponseChunkArrivesInTime(
final RpcStream stream,
final int previousResponseCount,
final AtomicInteger currentResponseCount) {
timeoutRunner
.getDelayedFuture(RPC_TIMEOUT)
.thenAccept(
(__) -> {
if (previousResponseCount == currentResponseCount.get()) {
abortRequest(
stream,
new RpcTimeoutException(
"Timed out waiting for response chunk " + previousResponseCount,
RPC_TIMEOUT));
}
})
.ifExceptionGetsHereRaiseABug();
}

private void ensureReadCompleteArrivesInTime(final RpcStream stream) {
timeoutRunner
.getDelayedFuture(RPC_TIMEOUT)
.thenAccept(
(__) -> {
if (!(state.get() == READ_COMPLETE || state.get() == CLOSED)) {
abortRequest(
stream,
new RpcTimeoutException(
"Timed out waiting for read channel close", RPC_TIMEOUT));
}
})
.ifExceptionGetsHereRaiseABug();
}

@VisibleForTesting
State getState() {
return state.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public Eth2OutgoingRequestHandler<TRequest, TResponse> createOutgoingRequestHand
final TRequest request,
final Eth2RpcResponseHandler<TResponse, ?> responseHandler) {
return new Eth2OutgoingRequestHandler<>(
asyncRunner,
asyncRunner,
protocolId,
createResponseDecoder(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static tech.pegasys.teku.spec.config.Constants.RPC_TIMEOUT;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -96,7 +95,6 @@ public void setup() {
getRpcEncoding(), spec.getGenesisSchemaDefinitions().getSignedBeaconBlockSchema());
return new Eth2OutgoingRequestHandler<>(
asyncRequestRunner,
timeoutRunner,
method.getIds().get(0),
responseDecoder,
method.shouldReceiveResponse(),
Expand Down Expand Up @@ -263,33 +261,6 @@ public void shouldWorkWhenSendAllChunksPlusEmptyExtraChunk() throws Exception {
verify(rpcStream, never()).closeAbruptly();
}

@Test
public void disconnectsIfFirstChunkIsNotReceivedInTime() {
sendInitialPayload();

deliverInitialBytes();

// Run timeouts
timeProvider.advanceTimeByMillis(RPC_TIMEOUT.toMillis());
timeoutRunner.executeDueActions();
verify(rpcStream).closeAbruptly();
}

@Test
public void disconnectsIfSecondChunkNotReceivedInTime() {
sendInitialPayload();

timeProvider.advanceTimeByMillis(100);
deliverChunk(0);
asyncRequestRunner.executeQueuedActions();
assertThat(blocks.size()).isEqualTo(1);

// Run timeouts
timeProvider.advanceTimeByMillis(RPC_TIMEOUT.toMillis());
timeoutRunner.executeDueActions();
verify(rpcStream).closeAbruptly();
}

@Test
public void shouldCompleteExceptionallyWhenClosedWithTruncatedMessage() {
sendInitialPayload();
Expand All @@ -305,29 +276,6 @@ public void shouldCompleteExceptionallyWhenClosedWithTruncatedMessage() {
assertThat(finishedProcessingFuture).isCompletedExceptionally();
}

@Test
public void doNotDisconnectsIfSecondChunkReceivedInTime() {
sendInitialPayload();

timeProvider.advanceTimeByMillis(100);
deliverChunk(0);
asyncRequestRunner.executeQueuedActions();
assertThat(blocks.size()).isEqualTo(1);

// Second chunk is received just in time
timeProvider.advanceTimeByMillis(RPC_TIMEOUT.toMillis() - 1);
timeoutRunner.executeDueActions();
deliverChunk(1);
asyncRequestRunner.executeQueuedActions();

// Go past the time the second chunk would have timed out but not enough to trigger timeout on
// the third chunk and ensure the timeout never fires.
timeProvider.advanceTimeByMillis(RPC_TIMEOUT.toMillis() - 1);
timeoutRunner.executeDueActions();
verify(rpcStream, never()).closeAbruptly();
assertThat(blocks.size()).isEqualTo(2);
}

@Test
public void shouldWorkWhenInitialPayloadEventIsLate() throws Exception {
deliverChunk(0);
Expand Down

0 comments on commit 5d3e269

Please sign in to comment.