Skip to content

Commit

Permalink
Correctly also run SSL tasks when produced while sending data (java-n…
Browse files Browse the repository at this point in the history
…ative-access#446)

Motivation:

We also need to ensure we run all SSL tasks when sending data as otherwise we might stale.

Modifications:

- Correctly run all tasks in all cases
- Reenable usage of executor during tests

Result:

Correctly handling tasks offloading for all cases
  • Loading branch information
normanmaurer authored Sep 29, 2022
1 parent 1a721e9 commit b5b0ddf
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,39 @@ private void fireExceptionEvents(Throwable cause) {
pipeline().fireExceptionCaught(cause);
}

private boolean runTasksDirectly() {
return sslTaskExecutor == null || sslTaskExecutor == ImmediateExecutor.INSTANCE ||
sslTaskExecutor == ImmediateEventExecutor.INSTANCE;
}

private void runAllTaskSend(Executor sslTaskExecutor, Runnable task) {
sslTaskExecutor.execute(decorateTaskSend(sslTaskExecutor, task));
}

private Runnable decorateTaskSend(Executor sslTaskExecutor, Runnable task) {
return () -> {
try {
task.run();
} finally {
// Move back to the EventLoop.
eventLoop().execute(() -> {
if (connection != null) {
Runnable nextTask = connection.sslTask();
// Consume all tasks before moving back to the EventLoop.
if (nextTask == null) {
// Call connection send to continue handshake if needed.
if (connectionSend()) {
forceFlushParent();
}
} else {
sslTaskExecutor.execute(decorateTaskSend(sslTaskExecutor, nextTask));
}
}
});
}
};
}

private boolean connectionSendSegments(SegmentedDatagramPacketAllocator segmentedDatagramPacketAllocator) {
List<ByteBuf> bufferList = new ArrayList<>(segmentedDatagramPacketAllocator.maxNumSegments());
long connAddr = connection.address();
Expand Down Expand Up @@ -1177,6 +1210,23 @@ private boolean connectionSend() {
} else {
packetWasWritten = connectionSendSimple();
}

// Process / schedule all tasks that were created.
Runnable task = connection.sslTask();
if (task != null) {
if (runTasksDirectly()) {
// Consume all tasks
do {
task.run();
} while ((task = connection.sslTask()) != null);

// Let's try again sending after we did process all tasks.
return packetWasWritten | connectionSend();
} else {
runAllTaskSend(sslTaskExecutor, task);
}
}

if (packetWasWritten) {
timeoutHandler.scheduleTimeout();
}
Expand Down Expand Up @@ -1326,80 +1376,27 @@ void connectionRecv(InetSocketAddress recipient, InetSocketAddress sender, ByteB
fireExceptionEvents(e);
}

// Process / schedule all tasks that were created.
Runnable task = connection.sslTask();
if (task != null) {
if (sslTaskExecutor == null || sslTaskExecutor == ImmediateExecutor.INSTANCE ||
sslTaskExecutor == ImmediateEventExecutor.INSTANCE) {
if (runTasksDirectly()) {
// Consume all tasks
do {
task.run();
} while ((task = connection.sslTask()) != null);
processReceived(connAddr);
} else {
Runnable finalTask = task;
sslTaskExecutor.execute(() -> {
try {
finalTask.run();
} finally {
// Move back to the EventLoop.
eventLoop().execute(() -> {
if (connection != null) {
Runnable nextTask = connection.sslTask();
// Consume all tasks before moving back to the EventLoop.
if (nextTask == null) {
// Call connection send to continue handshake if needed.
if (connectionSend()) {
forceFlushParent();
}
} else {
sslTaskExecutor.execute(nextTask);
}
}
});
}
});
}
}

// Handle pending channelActive if needed.
if (handlePendingChannelActive()) {
// Connection was closed right away.
return;
}

notifyAboutHandshakeCompletionIfNeeded(null);

if (Quiche.quiche_conn_is_established(connAddr) ||
Quiche.quiche_conn_is_in_early_data(connAddr)) {
long uniLeftOld = uniStreamsLeft;
long bidiLeftOld = bidiStreamsLeft;
// Only fetch new stream info when we used all our credits
if (uniLeftOld == 0 || bidiLeftOld == 0) {
long uniLeft = Quiche.quiche_conn_peer_streams_left_uni(connAddr);
long bidiLeft = Quiche.quiche_conn_peer_streams_left_bidi(connAddr);
uniStreamsLeft = uniLeft;
bidiStreamsLeft = bidiLeft;
if (uniLeftOld != uniLeft || bidiLeftOld != bidiLeft) {
pipeline().fireUserEventTriggered(QuicStreamLimitChangedEvent.INSTANCE);
}
}

if (handleWritableStreams()) {
// Some data was produced, let's flush.
flushParent();
runAllTaskRecv(sslTaskExecutor, task);
}

datagramReadable = true;
streamReadable = true;
recvDatagram();
recvStream();
} else {
processReceived(connAddr);
}

if (done) {
break;
} else {
memoryAddress += res;
bufferReadable -= res;
}
memoryAddress += res;
bufferReadable -= res;
} while (bufferReadable > 0);
} finally {
buffer.skipBytes((int) (memoryAddress - Quiche.memoryAddress(buffer)));
Expand All @@ -1416,6 +1413,71 @@ void connectionRecv(InetSocketAddress recipient, InetSocketAddress sender, ByteB
}
}

private void processReceived(long connAddr) {
// Handle pending channelActive if needed.
if (handlePendingChannelActive()) {
// Connection was closed right away.
return;
}

notifyAboutHandshakeCompletionIfNeeded(null);

if (Quiche.quiche_conn_is_established(connAddr) ||
Quiche.quiche_conn_is_in_early_data(connAddr)) {
long uniLeftOld = uniStreamsLeft;
long bidiLeftOld = bidiStreamsLeft;
// Only fetch new stream info when we used all our credits
if (uniLeftOld == 0 || bidiLeftOld == 0) {
long uniLeft = Quiche.quiche_conn_peer_streams_left_uni(connAddr);
long bidiLeft = Quiche.quiche_conn_peer_streams_left_bidi(connAddr);
uniStreamsLeft = uniLeft;
bidiStreamsLeft = bidiLeft;
if (uniLeftOld != uniLeft || bidiLeftOld != bidiLeft) {
pipeline().fireUserEventTriggered(QuicStreamLimitChangedEvent.INSTANCE);
}
}

if (handleWritableStreams()) {
// Some data was produced, let's flush.
flushParent();
}

datagramReadable = true;
streamReadable = true;
recvDatagram();
recvStream();
}
}

private void runAllTaskRecv(Executor sslTaskExecutor, Runnable task) {
sslTaskExecutor.execute(decorateTaskRecv(sslTaskExecutor, task));
}

private Runnable decorateTaskRecv(Executor sslTaskExecutor, Runnable task) {
return () -> {
try {
task.run();
} finally {
// Move back to the EventLoop.
eventLoop().execute(() -> {
if (connection != null) {
Runnable nextTask = connection.sslTask();
// Consume all tasks before moving back to the EventLoop.
if (nextTask == null) {
processReceived(connection.address());

// Call connection send to continue handshake if needed.
if (connectionSend()) {
forceFlushParent();
}
} else {
sslTaskExecutor.execute(decorateTaskRecv(sslTaskExecutor, nextTask));
}
}
});
}
};
}
void recv() {
if ((reantranceGuard & IN_RECV) != 0 || isConnDestroyed()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,12 @@ Runnable sslTask() {
if (task == null) {
return null;
}

return () -> {
if (connection == -1) {
return;
}

task.run();
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ public static void shutdownExecutor() {
public static void createExecutors() {
executors = new Executor[] {
ImmediateExecutor.INSTANCE,
// TODO: Investigate
//Executors.newCachedThreadPool()
Executors.newCachedThreadPool()
};
}

Expand Down

0 comments on commit b5b0ddf

Please sign in to comment.