Skip to content

Commit

Permalink
Move nio ip filter rule to be a channel handler (#43507)
Browse files Browse the repository at this point in the history
Currently nio implements ip filtering at the channel context level. This
is kind of a hack as the application logic should be implemented at the
handler level. This commit moves the ip filtering into a channel
handler. This requires adding an indicator to the channel handler to
show when a channel should be closed.
  • Loading branch information
Tim-Brooks committed Jun 24, 2019
1 parent fac7efb commit 38516a4
Show file tree
Hide file tree
Showing 15 changed files with 194 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,12 @@

import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class BytesChannelContext extends SocketChannelContext {

public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
ReadWriteHandler handler, InboundChannelBuffer channelBuffer) {
this(channel, selector, exceptionHandler, handler, channelBuffer, ALWAYS_ALLOW_CHANNEL);
}

public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
ReadWriteHandler handler, InboundChannelBuffer channelBuffer,
Predicate<NioSocketChannel> allowChannelPredicate) {
super(channel, selector, exceptionHandler, handler, channelBuffer, allowChannelPredicate);
NioChannelHandler handler, InboundChannelBuffer channelBuffer) {
super(channel, selector, exceptionHandler, handler, channelBuffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.List;
import java.util.function.BiConsumer;

public abstract class BytesWriteHandler implements ReadWriteHandler {
public abstract class BytesWriteHandler implements NioChannelHandler {

private static final List<FlushOperation> EMPTY_LIST = Collections.emptyList();

Expand All @@ -48,6 +48,11 @@ public List<FlushOperation> pollFlushOperations() {
return EMPTY_LIST;
}

@Override
public boolean closeNow() {
return false;
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.nio;

import java.io.IOException;
import java.util.List;
import java.util.function.BiConsumer;

public abstract class DelegatingHandler implements NioChannelHandler {

private NioChannelHandler delegate;

public DelegatingHandler(NioChannelHandler delegate) {
this.delegate = delegate;
}

@Override
public void channelRegistered() {
this.delegate.channelRegistered();
}

@Override
public WriteOperation createWriteOperation(SocketChannelContext context, Object message, BiConsumer<Void, Exception> listener) {
return delegate.createWriteOperation(context, message, listener);
}

@Override
public List<FlushOperation> writeToBytes(WriteOperation writeOperation) {
return delegate.writeToBytes(writeOperation);
}

@Override
public List<FlushOperation> pollFlushOperations() {
return delegate.pollFlushOperations();
}

@Override
public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException {
return delegate.consumeReads(channelBuffer);
}

@Override
public boolean closeNow() {
return delegate.closeNow();
}

@Override
public void close() throws IOException {
delegate.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import java.util.function.BiConsumer;

/**
* Implements the application specific logic for handling inbound and outbound messages for a channel.
* Implements the application specific logic for handling channel operations.
*/
public interface ReadWriteHandler {
public interface NioChannelHandler {

/**
* This method is called when the channel is registered with its selector.
Expand Down Expand Up @@ -72,5 +72,12 @@ public interface ReadWriteHandler {
*/
int consumeReads(InboundChannelBuffer channelBuffer) throws IOException;

/**
* This method indicates if the underlying channel should be closed.
*
* @return if the channel should be closed
*/
boolean closeNow();

void close() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* This context should implement the specific logic for a channel. When a channel receives a notification
Expand All @@ -45,28 +44,23 @@
*/
public abstract class SocketChannelContext extends ChannelContext<SocketChannel> {

protected static final Predicate<NioSocketChannel> ALWAYS_ALLOW_CHANNEL = (c) -> true;

protected final NioSocketChannel channel;
protected final InboundChannelBuffer channelBuffer;
protected final AtomicBoolean isClosing = new AtomicBoolean(false);
private final ReadWriteHandler readWriteHandler;
private final Predicate<NioSocketChannel> allowChannelPredicate;
private final NioChannelHandler readWriteHandler;
private final NioSelector selector;
private final CompletableContext<Void> connectContext = new CompletableContext<>();
private final LinkedList<FlushOperation> pendingFlushes = new LinkedList<>();
private boolean closeNow;
private Exception connectException;

protected SocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer,
Predicate<NioSocketChannel> allowChannelPredicate) {
NioChannelHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
super(channel.getRawChannel(), exceptionHandler);
this.selector = selector;
this.channel = channel;
this.readWriteHandler = readWriteHandler;
this.channelBuffer = channelBuffer;
this.allowChannelPredicate = allowChannelPredicate;
}

@Override
Expand Down Expand Up @@ -171,9 +165,6 @@ protected FlushOperation getPendingFlush() {
protected void register() throws IOException {
super.register();
readWriteHandler.channelRegistered();
if (allowChannelPredicate.test(channel) == false) {
closeNow = true;
}
}

@Override
Expand Down Expand Up @@ -233,7 +224,7 @@ public boolean readyForFlush() {
public abstract boolean selectorShouldClose();

protected boolean closeNow() {
return closeNow;
return closeNow || readWriteHandler.closeNow();
}

protected void setCloseNow() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/**
* This is a basic write operation that can be queued with a channel. The only requirements of a write
* operation is that is has a listener and a reference to its channel. The actual conversion of the write
* operation implementation to bytes will be performed by the {@link ReadWriteHandler}.
* operation implementation to bytes will be performed by the {@link NioChannelHandler}.
*/
public interface WriteOperation {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class EventHandlerTests extends ESTestCase {
private Consumer<Exception> channelExceptionHandler;
private Consumer<Exception> genericExceptionHandler;

private ReadWriteHandler readWriteHandler;
private NioChannelHandler readWriteHandler;
private EventHandler handler;
private DoNotRegisterSocketContext context;
private DoNotRegisterServerContext serverContext;
Expand All @@ -56,7 +56,7 @@ public class EventHandlerTests extends ESTestCase {
public void setUpHandler() throws IOException {
channelExceptionHandler = mock(Consumer.class);
genericExceptionHandler = mock(Consumer.class);
readWriteHandler = mock(ReadWriteHandler.class);
readWriteHandler = mock(NioChannelHandler.class);
channelFactory = mock(ChannelFactory.class);
NioSelector selector = mock(NioSelector.class);
ArrayList<NioSelector> selectors = new ArrayList<>();
Expand Down Expand Up @@ -260,7 +260,7 @@ private class DoNotRegisterSocketContext extends BytesChannelContext {


DoNotRegisterSocketContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
ReadWriteHandler handler) {
NioChannelHandler handler) {
super(channel, selector, exceptionHandler, handler, InboundChannelBuffer.allocatingInstance());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.function.Predicate;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
Expand All @@ -54,7 +53,7 @@ public class SocketChannelContextTests extends ESTestCase {
private NioSocketChannel channel;
private BiConsumer<Void, Exception> listener;
private NioSelector selector;
private ReadWriteHandler readWriteHandler;
private NioChannelHandler readWriteHandler;
private ByteBuffer ioBuffer = ByteBuffer.allocate(1024);

@SuppressWarnings("unchecked")
Expand All @@ -68,7 +67,7 @@ public void setup() throws Exception {
when(channel.getRawChannel()).thenReturn(rawChannel);
exceptionHandler = mock(Consumer.class);
selector = mock(NioSelector.class);
readWriteHandler = mock(ReadWriteHandler.class);
readWriteHandler = mock(NioChannelHandler.class);
InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);

Expand Down Expand Up @@ -102,22 +101,6 @@ public void testSignalWhenPeerClosed() throws IOException {
assertTrue(context.closeNow());
}

public void testValidateInRegisterCanSucceed() throws IOException {
InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, (c) -> true);
assertFalse(context.closeNow());
context.register();
assertFalse(context.closeNow());
}

public void testValidateInRegisterCanFail() throws IOException {
InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance();
context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, (c) -> false);
assertFalse(context.closeNow());
context.register();
assertTrue(context.closeNow());
}

public void testConnectSucceeds() throws IOException {
AtomicBoolean listenerCalled = new AtomicBoolean(false);
when(rawChannel.finishConnect()).thenReturn(false, true);
Expand Down Expand Up @@ -394,14 +377,8 @@ public void testFlushBuffersHandlesIOExceptionSecondTimeThroughLoop() throws IOE
private static class TestSocketChannelContext extends SocketChannelContext {

private TestSocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
this(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, ALWAYS_ALLOW_CHANNEL);
}

private TestSocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer<Exception> exceptionHandler,
ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer,
Predicate<NioSocketChannel> allowChannelPredicate) {
super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, allowChannelPredicate);
NioChannelHandler readWriteHandler, InboundChannelBuffer channelBuffer) {
super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.elasticsearch.http.nio.cors.NioCorsHandler;
import org.elasticsearch.nio.FlushOperation;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.ReadWriteHandler;
import org.elasticsearch.nio.NioChannelHandler;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.TaskScheduler;
import org.elasticsearch.nio.WriteOperation;
Expand All @@ -50,7 +50,7 @@
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;

public class HttpReadWriteHandler implements ReadWriteHandler {
public class HttpReadWriteHandler implements NioChannelHandler {

private final NettyAdaptor adaptor;
private final NioHttpChannel nioHttpChannel;
Expand Down Expand Up @@ -140,6 +140,11 @@ public List<FlushOperation> pollFlushOperations() {
return copiedOperations;
}

@Override
public boolean closeNow() {
return false;
}

@Override
public void close() throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ReadWriteHandler;
import org.elasticsearch.nio.NioChannelHandler;
import org.elasticsearch.nio.SocketChannelContext;
import org.elasticsearch.nio.WriteOperation;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -207,7 +207,7 @@ public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSo
}
}

private static class HttpClientHandler implements ReadWriteHandler {
private static class HttpClientHandler implements NioChannelHandler {

private final NettyAdaptor adaptor;
private final CountDownLatch latch;
Expand Down Expand Up @@ -277,6 +277,11 @@ public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException {
return bytesConsumed;
}

@Override
public boolean closeNow() {
return false;
}

@Override
public void close() throws IOException {
try {
Expand Down
Loading

0 comments on commit 38516a4

Please sign in to comment.