diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java b/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java index e3702c2880a18..a7cb21d95f537 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java @@ -66,7 +66,7 @@ protected void setSelectionKey(SelectionKey selectionKey) { * @throws IOException during channel / context close */ public void closeFromSelector() throws IOException { - if (closeContext.isDone() == false) { + if (isOpen()) { try { rawChannel.close(); closeContext.complete(null); diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java b/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java index 3c52423c7aff3..87a2489fdbc27 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java @@ -159,8 +159,7 @@ protected void listenerException(Exception exception) { } /** - * This method is called after ready events (READ, ACCEPT, WRITE, CONNECT) have been handled for a - * channel. + * This method is called after events (READ, WRITE, CONNECT) have been handled for a channel. * * @param context that was handled */ diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java index ab6709bcc5bd4..9f82cc2c50d44 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java @@ -43,9 +43,6 @@ * {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing * of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by * this selector. - *

- * Children of this class should implement the specific {@link #processKey(SelectionKey)}, - * {@link #preSelect()}, and {@link #cleanup()} functionality. */ public class NioSelector implements Closeable { @@ -65,7 +62,7 @@ public NioSelector(EventHandler eventHandler) throws IOException { this(eventHandler, Selector.open()); } - public NioSelector(EventHandler eventHandler, Selector selector) throws IOException { + public NioSelector(EventHandler eventHandler, Selector selector) { this.selector = selector; this.eventHandler = eventHandler; } @@ -165,7 +162,7 @@ void singleLoop() { } void cleanupAndCloseChannels() { - cleanup(); + cleanupPendingWrites(); channelsToClose.addAll(channelsToRegister); channelsToRegister.clear(); channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext) sk.attachment()).collect(Collectors.toList())); @@ -234,16 +231,6 @@ void preSelect() { handleQueuedWrites(); } - /** - * Called once as the selector is being closed. - */ - void cleanup() { - WriteOperation op; - while ((op = queuedWrites.poll()) != null) { - executeFailedListener(op.getListener(), new ClosedSelectorException()); - } - } - /** * Queues a write operation to be handled by the event loop. This can be called by any thread and is the * api available for non-selector threads to schedule writes. @@ -284,20 +271,31 @@ public void scheduleForRegistration(NioChannel channel) { } /** - * Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed - * by the selector thread. As a result, this method should only be called by the selector thread. + * Queues a write operation directly in a channel's buffer. If this channel does not have pending writes + * already, the channel will be flushed. Channel buffers are only safe to be accessed by the selector + * thread. As a result, this method should only be called by the selector thread. If this channel does + * not have pending writes already, the channel will be flushed. * * @param writeOperation to be queued in a channel's buffer */ - public void queueWriteInChannelBuffer(WriteOperation writeOperation) { + public void writeToChannel(WriteOperation writeOperation) { assertOnSelectorThread(); SocketChannelContext context = writeOperation.getChannel(); + // If the channel does not currently have anything that is ready to flush, we should flush after + // the write operation is queued. + boolean shouldFlushAfterQueuing = context.readyForFlush() == false; try { SelectionKeyUtils.setWriteInterested(context.getSelectionKey()); context.queueWriteOperation(writeOperation); } catch (Exception e) { + shouldFlushAfterQueuing = false; executeFailedListener(writeOperation.getListener(), e); } + + if (shouldFlushAfterQueuing) { + handleWrite(context); + eventHandler.postHandling(context); + } } /** @@ -332,6 +330,13 @@ public void executeFailedListener(BiConsumer listener, Excepti } } + private void cleanupPendingWrites() { + WriteOperation op; + while ((op = queuedWrites.poll()) != null) { + executeFailedListener(op.getListener(), new ClosedSelectorException()); + } + } + private void wakeup() { // TODO: Do we need the wakeup optimizations that some other libraries use? selector.wakeup(); @@ -394,7 +399,7 @@ private void handleQueuedWrites() { WriteOperation writeOperation; while ((writeOperation = queuedWrites.poll()) != null) { if (writeOperation.getChannel().isOpen()) { - queueWriteInChannelBuffer(writeOperation); + writeToChannel(writeOperation); } else { executeFailedListener(writeOperation.getListener(), new ClosedChannelException()); } diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java index 53be0e7f89fe0..53fb0da432f48 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java @@ -135,7 +135,7 @@ public void sendMessage(Object message, BiConsumer listener) { return; } - selector.queueWriteInChannelBuffer(writeOperation); + selector.writeToChannel(writeOperation); } public void queueWriteOperation(WriteOperation writeOperation) { @@ -164,7 +164,7 @@ protected FlushOperation getPendingFlush() { @Override public void closeFromSelector() throws IOException { getSelector().assertOnSelectorThread(); - if (channel.isOpen()) { + if (isOpen()) { ArrayList closingExceptions = new ArrayList<>(3); try { super.closeFromSelector(); diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java index dd3fea8bf50e8..bd5f1c1eb346f 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java @@ -262,11 +262,28 @@ public void testQueueWriteSuccessful() throws Exception { public void testQueueDirectlyInChannelBufferSuccessful() throws Exception { WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener); - assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0); + assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE)); - selector.queueWriteInChannelBuffer(writeOperation); + when(channelContext.readyForFlush()).thenReturn(true); + selector.writeToChannel(writeOperation); verify(channelContext).queueWriteOperation(writeOperation); + verify(eventHandler, times(0)).handleWrite(channelContext); + verify(eventHandler, times(0)).postHandling(channelContext); + assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0); + } + + public void testShouldFlushIfNoPendingFlushes() throws Exception { + WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener); + + assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE)); + + when(channelContext.readyForFlush()).thenReturn(false); + selector.writeToChannel(writeOperation); + + verify(channelContext).queueWriteOperation(writeOperation); + verify(eventHandler).handleWrite(channelContext); + verify(eventHandler).postHandling(channelContext); assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0); } @@ -277,10 +294,13 @@ public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws CancelledKeyException cancelledKeyException = new CancelledKeyException(); when(channelContext.getSelectionKey()).thenReturn(selectionKey); + when(channelContext.readyForFlush()).thenReturn(false); when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException); - selector.queueWriteInChannelBuffer(writeOperation); + selector.writeToChannel(writeOperation); verify(channelContext, times(0)).queueWriteOperation(writeOperation); + verify(eventHandler, times(0)).handleWrite(channelContext); + verify(eventHandler, times(0)).postHandling(channelContext); verify(listener).accept(null, cancelledKeyException); } diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java index fdb4a77b922e2..dee50724f34c9 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java @@ -170,7 +170,7 @@ public void testSendMessageFromSameThreadIsQueuedInChannel() { when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation); context.sendMessage(buffers, listener); - verify(selector).queueWriteInChannelBuffer(writeOpCaptor.capture()); + verify(selector).writeToChannel(writeOpCaptor.capture()); WriteOperation writeOp = writeOpCaptor.getValue(); assertSame(writeOperation, writeOp); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java index 95af766515777..2170c55ee0192 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java @@ -145,7 +145,7 @@ public void closeChannel() { selector.queueWrite(writeOperation); return; } - selector.queueWriteInChannelBuffer(writeOperation); + selector.writeToChannel(writeOperation); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java index 14a22d300d12d..bfee50b65bff4 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContextTests.java @@ -345,7 +345,7 @@ public void testInitiateCloseFromSameThreadSchedulesCloseNotify() { context.closeChannel(); ArgumentCaptor captor = ArgumentCaptor.forClass(WriteOperation.class); - verify(selector).queueWriteInChannelBuffer(captor.capture()); + verify(selector).writeToChannel(captor.capture()); context.queueWriteOperation(captor.getValue()); verify(sslDriver).initiateClose();