diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java b/libs/nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java index a82d381951b76..9984f1a18e5f9 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/BytesChannelContext.java @@ -21,12 +21,19 @@ import java.io.IOException; import java.util.function.Consumer; +import java.util.function.Predicate; public class BytesChannelContext extends SocketChannelContext { public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, ReadWriteHandler handler, InboundChannelBuffer channelBuffer) { - super(channel, selector, exceptionHandler, handler, channelBuffer); + this(channel, selector, exceptionHandler, handler, channelBuffer, ALWAYS_ALLOW_CHANNEL); + } + + public BytesChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, + ReadWriteHandler handler, InboundChannelBuffer channelBuffer, + Predicate allowChannelPredicate) { + super(channel, selector, exceptionHandler, handler, channelBuffer, allowChannelPredicate); } @Override @@ -77,7 +84,7 @@ public void closeChannel() { @Override public boolean selectorShouldClose() { - return isPeerClosed() || hasIOException() || isClosing.get(); + return closeNow() || isClosing.get(); } /** diff --git a/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java b/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java index a7cb21d95f537..b26636cb1581e 100644 --- a/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java +++ b/libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java @@ -47,6 +47,11 @@ public abstract class ChannelContext { + public static final Predicate ALWAYS_ALLOW_CHANNEL = (c) -> true; + protected final NioSocketChannel channel; protected final InboundChannelBuffer channelBuffer; protected final AtomicBoolean isClosing = new AtomicBoolean(false); private final ReadWriteHandler readWriteHandler; + private final Predicate allowChannelPredicate; private final NioSelector selector; private final CompletableContext connectContext = new CompletableContext<>(); private final LinkedList pendingFlushes = new LinkedList<>(); - private boolean ioException; - private boolean peerClosed; + private boolean closeNow; private Exception connectException; protected SocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, - ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) { + ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer, + Predicate allowChannelPredicate) { super(channel.getRawChannel(), exceptionHandler); this.selector = selector; this.channel = channel; this.readWriteHandler = readWriteHandler; this.channelBuffer = channelBuffer; + this.allowChannelPredicate = allowChannelPredicate; } @Override @@ -161,6 +166,14 @@ protected FlushOperation getPendingFlush() { return pendingFlushes.peekFirst(); } + @Override + protected void register() throws IOException { + super.register(); + if (allowChannelPredicate.test(channel) == false) { + closeNow = true; + } + } + @Override public void closeFromSelector() throws IOException { getSelector().assertOnSelectorThread(); @@ -217,24 +230,20 @@ public boolean readyForFlush() { */ public abstract boolean selectorShouldClose(); - protected boolean hasIOException() { - return ioException; - } - - protected boolean isPeerClosed() { - return peerClosed; + protected boolean closeNow() { + return closeNow; } protected int readFromChannel(ByteBuffer buffer) throws IOException { try { int bytesRead = rawChannel.read(buffer); if (bytesRead < 0) { - peerClosed = true; + closeNow = true; bytesRead = 0; } return bytesRead; } catch (IOException e) { - ioException = true; + closeNow = true; throw e; } } @@ -243,12 +252,12 @@ protected int readFromChannel(ByteBuffer[] buffers) throws IOException { try { int bytesRead = (int) rawChannel.read(buffers); if (bytesRead < 0) { - peerClosed = true; + closeNow = true; bytesRead = 0; } return bytesRead; } catch (IOException e) { - ioException = true; + closeNow = true; throw e; } } @@ -257,7 +266,7 @@ protected int flushToChannel(ByteBuffer buffer) throws IOException { try { return rawChannel.write(buffer); } catch (IOException e) { - ioException = true; + closeNow = true; throw e; } } @@ -266,7 +275,7 @@ protected int flushToChannel(ByteBuffer[] buffers) throws IOException { try { return (int) rawChannel.write(buffers); } catch (IOException e) { - ioException = true; + closeNow = true; throw e; } } diff --git a/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java b/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java index dee50724f34c9..bc9a7c33f0f77 100644 --- a/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java +++ b/libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.function.Supplier; import static org.mockito.Matchers.any; @@ -77,7 +78,7 @@ public void testIOExceptionSetIfEncountered() throws IOException { when(rawChannel.write(any(ByteBuffer.class))).thenThrow(new IOException()); when(rawChannel.read(any(ByteBuffer[].class), anyInt(), anyInt())).thenThrow(new IOException()); when(rawChannel.read(any(ByteBuffer.class))).thenThrow(new IOException()); - assertFalse(context.hasIOException()); + assertFalse(context.closeNow()); expectThrows(IOException.class, () -> { if (randomBoolean()) { context.read(); @@ -85,15 +86,31 @@ public void testIOExceptionSetIfEncountered() throws IOException { context.flushChannel(); } }); - assertTrue(context.hasIOException()); + assertTrue(context.closeNow()); } public void testSignalWhenPeerClosed() throws IOException { when(rawChannel.read(any(ByteBuffer[].class), anyInt(), anyInt())).thenReturn(-1L); when(rawChannel.read(any(ByteBuffer.class))).thenReturn(-1); - assertFalse(context.isPeerClosed()); + assertFalse(context.closeNow()); context.read(); - assertTrue(context.isPeerClosed()); + assertTrue(context.closeNow()); + } + + public void testValidateInRegisterCanSucceed() throws IOException { + InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance(); + context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, (c) -> true); + assertFalse(context.closeNow()); + context.register(); + assertFalse(context.closeNow()); + } + + public void testValidateInRegisterCanFail() throws IOException { + InboundChannelBuffer channelBuffer = InboundChannelBuffer.allocatingInstance(); + context = new TestSocketChannelContext(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, (c) -> false); + assertFalse(context.closeNow()); + context.register(); + assertTrue(context.closeNow()); } public void testConnectSucceeds() throws IOException { @@ -277,7 +294,13 @@ private static class TestSocketChannelContext extends SocketChannelContext { private TestSocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) { - super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer); + this(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, ALWAYS_ALLOW_CHANNEL); + } + + private TestSocketChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, + ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer, + Predicate allowChannelPredicate) { + super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, allowChannelPredicate); } @Override @@ -309,6 +332,11 @@ public boolean selectorShouldClose() { public void closeChannel() { isClosing.set(true); } + + @Override + void doSelectorRegister() { + // We do not want to call the actual register with selector method as it will throw a NPE + } } private static byte[] createMessage(int length) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityField.java index 8d813925e33dc..610876e1c54ec 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityField.java @@ -13,6 +13,7 @@ public final class SecurityField { public static final String NAME4 = XPackField.SECURITY + "4"; + public static final String NIO = XPackField.SECURITY + "-nio"; public static final Setting> USER_SETTING = new Setting<>(setting("user"), (String) null, Optional::ofNullable, Setting.Property.NodeScope); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecuritySettings.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecuritySettings.java index c48245c054fb8..bceb1de29491d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecuritySettings.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecuritySettings.java @@ -19,9 +19,10 @@ public static Settings addTransportSettings(final Settings settings) { final Settings.Builder builder = Settings.builder(); if (NetworkModule.TRANSPORT_TYPE_SETTING.exists(settings)) { final String transportType = NetworkModule.TRANSPORT_TYPE_SETTING.get(settings); - if (SecurityField.NAME4.equals(transportType) == false) { + if (SecurityField.NAME4.equals(transportType) == false && SecurityField.NIO.equals(transportType) == false) { throw new IllegalArgumentException("transport type setting [" + NetworkModule.TRANSPORT_TYPE_KEY - + "] must be [" + SecurityField.NAME4 + "] but is [" + transportType + "]"); + + "] must be [" + SecurityField.NAME4 + "] or [" + SecurityField.NIO + "]" + " but is [" + + transportType + "]"); } } else { // default to security4 @@ -39,7 +40,7 @@ public static Settings addUserSettings(final Settings settings) { final int i = userSetting.indexOf(":"); if (i < 0 || i == userSetting.length() - 1) { throw new IllegalArgumentException("invalid [" + SecurityField.USER_SETTING.getKey() - + "] setting. must be in the form of \":\""); + + "] setting. must be in the form of \":\""); } String username = userSetting.substring(0, i); String password = userSetting.substring(i + 1); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 5b4f8cbbdef68..596acaeeac6a0 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -203,6 +203,7 @@ import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport; import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4ServerTransport; import org.elasticsearch.xpack.core.template.TemplateUtils; +import org.elasticsearch.xpack.security.transport.nio.SecurityNioTransport; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -846,8 +847,14 @@ public Map> getTransports(Settings settings, ThreadP if (transportClientMode || enabled == false) { // don't register anything if we are not enabled, or in transport client mode return Collections.emptyMap(); } - return Collections.singletonMap(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, threadPool, - networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); + + Map> transports = new HashMap<>(); + transports.put(SecurityField.NAME4, () -> new SecurityNetty4ServerTransport(settings, threadPool, + networkService, bigArrays, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); + transports.put(SecurityField.NIO, () -> new SecurityNioTransport(settings, threadPool, + networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, ipFilter.get(), getSslService())); + + return Collections.unmodifiableMap(transports); } @Override diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java index 2170c55ee0192..da348ea1f78e1 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SSLChannelContext.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Predicate; /** * Provides a TLS/SSL read/write layer over a channel. This context will use a {@link SSLDriver} to handshake @@ -30,7 +31,13 @@ public final class SSLChannelContext extends SocketChannelContext { SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, SSLDriver sslDriver, ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer) { - super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer); + this(channel, selector, exceptionHandler, sslDriver, readWriteHandler, channelBuffer, ALWAYS_ALLOW_CHANNEL); + } + + SSLChannelContext(NioSocketChannel channel, NioSelector selector, Consumer exceptionHandler, SSLDriver sslDriver, + ReadWriteHandler readWriteHandler, InboundChannelBuffer channelBuffer, + Predicate allowChannelPredicate) { + super(channel, selector, exceptionHandler, readWriteHandler, channelBuffer, allowChannelPredicate); this.sslDriver = sslDriver; } @@ -52,7 +59,7 @@ public void queueWriteOperation(WriteOperation writeOperation) { @Override public void flushChannel() throws IOException { - if (hasIOException()) { + if (closeNow()) { return; } // If there is currently data in the outbound write buffer, flush the buffer. @@ -116,7 +123,7 @@ public boolean readyForFlush() { @Override public int read() throws IOException { int bytesRead = 0; - if (hasIOException()) { + if (closeNow()) { return bytesRead; } bytesRead = readFromChannel(sslDriver.getNetworkReadBuffer()); @@ -133,7 +140,7 @@ public int read() throws IOException { @Override public boolean selectorShouldClose() { - return isPeerClosed() || hasIOException() || sslDriver.isClosed(); + return closeNow() || sslDriver.isClosed(); } @Override diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java index 874dc36a31cce..1e00019793025 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/nio/SecurityNioTransport.java @@ -5,30 +5,39 @@ */ package org.elasticsearch.xpack.security.transport.nio; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.network.CloseableChannel; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.nio.BytesChannelContext; import org.elasticsearch.nio.InboundChannelBuffer; import org.elasticsearch.nio.NioSelector; import org.elasticsearch.nio.NioSocketChannel; import org.elasticsearch.nio.ServerChannelContext; +import org.elasticsearch.nio.SocketChannelContext; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.nio.NioTcpChannel; import org.elasticsearch.transport.nio.NioTcpServerChannel; import org.elasticsearch.transport.nio.NioTransport; import org.elasticsearch.transport.nio.TcpReadWriteHandler; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.security.transport.SSLExceptionHelper; import org.elasticsearch.xpack.core.security.transport.netty4.SecurityNetty4Transport; import org.elasticsearch.xpack.core.ssl.SSLConfiguration; import org.elasticsearch.xpack.core.ssl.SSLService; +import org.elasticsearch.xpack.security.transport.filter.IPFilter; import javax.net.ssl.SSLEngine; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; @@ -36,6 +45,7 @@ import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.function.Supplier; import static org.elasticsearch.xpack.core.security.SecurityField.setting; @@ -45,42 +55,83 @@ * protocol that allows two channels to go through a handshake process prior to application data being * exchanged. The handshake process enables the channels to exchange parameters that will allow them to * encrypt the application data they exchange. - * + *

* The specific SSL/TLS parameters and configurations are setup in the {@link SSLService} class. The actual * implementation of the SSL/TLS layer is in the {@link SSLChannelContext} and {@link SSLDriver} classes. */ public class SecurityNioTransport extends NioTransport { - private final SSLConfiguration sslConfiguration; + private final IPFilter authenticator; private final SSLService sslService; private final Map profileConfiguration; private final boolean sslEnabled; - SecurityNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, - PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, - CircuitBreakerService circuitBreakerService, SSLService sslService) { + public SecurityNioTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, + PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry, + CircuitBreakerService circuitBreakerService, @Nullable final IPFilter authenticator, + SSLService sslService) { super(settings, threadPool, networkService, bigArrays, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService); + this.authenticator = authenticator; this.sslService = sslService; this.sslEnabled = XPackSettings.TRANSPORT_SSL_ENABLED.get(settings); final Settings transportSSLSettings = settings.getByPrefix(setting("transport.ssl.")); if (sslEnabled) { - this.sslConfiguration = sslService.sslConfiguration(transportSSLSettings, Settings.EMPTY); Map profileSettingsMap = settings.getGroups("transport.profiles.", true); Map profileConfiguration = new HashMap<>(profileSettingsMap.size() + 1); for (Map.Entry entry : profileSettingsMap.entrySet()) { Settings profileSettings = entry.getValue(); final Settings profileSslSettings = SecurityNetty4Transport.profileSslSettings(profileSettings); - SSLConfiguration configuration = sslService.sslConfiguration(profileSslSettings, transportSSLSettings); + SSLConfiguration configuration = sslService.sslConfiguration(profileSslSettings, transportSSLSettings); profileConfiguration.put(entry.getKey(), configuration); } if (profileConfiguration.containsKey(TcpTransport.DEFAULT_PROFILE) == false) { - profileConfiguration.put(TcpTransport.DEFAULT_PROFILE, sslConfiguration); + profileConfiguration.put(TcpTransport.DEFAULT_PROFILE, sslService.sslConfiguration(transportSSLSettings, Settings.EMPTY)); } this.profileConfiguration = Collections.unmodifiableMap(profileConfiguration); } else { - throw new IllegalArgumentException("Currently only support SSL enabled."); + profileConfiguration = Collections.emptyMap(); + } + } + + @Override + protected void doStart() { + super.doStart(); + if (authenticator != null) { + authenticator.setBoundTransportAddress(boundAddress(), profileBoundAddresses()); + } + } + + @Override + public void onException(TcpChannel channel, Exception e) { + if (!lifecycle.started()) { + // just close and ignore - we are already stopped and just need to make sure we release all resources + CloseableChannel.closeChannel(channel); + } else if (SSLExceptionHelper.isNotSslRecordException(e)) { + if (logger.isTraceEnabled()) { + logger.trace( + new ParameterizedMessage("received plaintext traffic on an encrypted channel, closing connection {}", channel), e); + } else { + logger.warn("received plaintext traffic on an encrypted channel, closing connection {}", channel); + } + CloseableChannel.closeChannel(channel); + } else if (SSLExceptionHelper.isCloseDuringHandshakeException(e)) { + if (logger.isTraceEnabled()) { + logger.trace(new ParameterizedMessage("connection {} closed during ssl handshake", channel), e); + } else { + logger.warn("connection {} closed during handshake", channel); + } + CloseableChannel.closeChannel(channel); + } else if (SSLExceptionHelper.isReceivedCertificateUnknownException(e)) { + if (logger.isTraceEnabled()) { + logger.trace(new ParameterizedMessage("client did not trust server's certificate, closing connection {}", channel), e); + } else { + logger.warn("client did not trust this server's certificate, closing connection {}", channel); + } + CloseableChannel.closeChannel(channel); + } else { + super.onException(channel, e); } } @@ -89,9 +140,13 @@ protected TcpChannelFactory channelFactory(ProfileSettings profileSettings, bool return new SecurityTcpChannelFactory(profileSettings, isClient); } - @Override - protected void acceptChannel(NioSocketChannel channel) { - super.acceptChannel(channel); + private boolean validateChannel(NioSocketChannel channel) { + if (authenticator != null) { + NioTcpChannel nioTcpChannel = (NioTcpChannel) channel; + return authenticator.accept(nioTcpChannel.getProfile(), nioTcpChannel.getRemoteAddress()); + } else { + return true; + } } private class SecurityTcpChannelFactory extends TcpChannelFactory { @@ -101,30 +156,46 @@ private class SecurityTcpChannelFactory extends TcpChannelFactory { private SecurityTcpChannelFactory(ProfileSettings profileSettings, boolean isClient) { super(new RawChannelFactory(profileSettings.tcpNoDelay, - profileSettings.tcpKeepAlive, - profileSettings.reuseAddress, - Math.toIntExact(profileSettings.sendBufferSize.getBytes()), - Math.toIntExact(profileSettings.receiveBufferSize.getBytes()))); + profileSettings.tcpKeepAlive, + profileSettings.reuseAddress, + Math.toIntExact(profileSettings.sendBufferSize.getBytes()), + Math.toIntExact(profileSettings.receiveBufferSize.getBytes()))); this.profileName = profileSettings.profileName; this.isClient = isClient; } @Override public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException { - SSLConfiguration defaultConfig = profileConfiguration.get(TcpTransport.DEFAULT_PROFILE); - SSLEngine sslEngine = sslService.createSSLEngine(profileConfiguration.getOrDefault(profileName, defaultConfig), null, -1); - SSLDriver sslDriver = new SSLDriver(sslEngine, isClient); NioTcpChannel nioChannel = new NioTcpChannel(profileName, channel); + SocketChannelContext context; Supplier pageSupplier = () -> { Recycler.V bytes = pageCacheRecycler.bytePage(false); return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close); }; - TcpReadWriteHandler readWriteHandler = new TcpReadWriteHandler(nioChannel, SecurityNioTransport.this); InboundChannelBuffer buffer = new InboundChannelBuffer(pageSupplier); Consumer exceptionHandler = (e) -> onException(nioChannel, e); - SSLChannelContext context = new SSLChannelContext(nioChannel, selector, exceptionHandler, sslDriver, readWriteHandler, buffer); + Predicate filter = SecurityNioTransport.this::validateChannel; + + if (sslEnabled) { + SSLEngine sslEngine; + SSLConfiguration defaultConfig = profileConfiguration.get(TcpTransport.DEFAULT_PROFILE); + SSLConfiguration sslConfig = profileConfiguration.getOrDefault(profileName, defaultConfig); + boolean hostnameVerificationEnabled = sslConfig.verificationMode().isHostnameVerificationEnabled(); + if (hostnameVerificationEnabled) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.getRemoteAddress(); + // we create the socket based on the name given. don't reverse DNS + sslEngine = sslService.createSSLEngine(sslConfig, inetSocketAddress.getHostString(), inetSocketAddress.getPort()); + } else { + sslEngine = sslService.createSSLEngine(sslConfig, null, -1); + } + SSLDriver sslDriver = new SSLDriver(sslEngine, isClient); + context = new SSLChannelContext(nioChannel, selector, exceptionHandler, sslDriver, readWriteHandler, buffer, filter); + } else { + context = new BytesChannelContext(nioChannel, selector, exceptionHandler, readWriteHandler, buffer, filter); + } nioChannel.setContext(context); + return nioChannel; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecurityIntegTestCase.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecurityIntegTestCase.java index 97dd7866dc006..e6db3407496eb 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecurityIntegTestCase.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecurityIntegTestCase.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.NetworkAddress; +import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; @@ -242,6 +243,7 @@ protected Settings nodeSettings(int nodeOrdinal) { Settings customSettings = customSecuritySettingsSource.nodeSettings(nodeOrdinal); builder.put(customSettings, false); // handle secure settings separately builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); + builder.put(NetworkModule.TRANSPORT_TYPE_KEY, randomBoolean() ? SecurityField.NAME4 : SecurityField.NIO); Settings.Builder customBuilder = Settings.builder().put(customSettings); if (customBuilder.getSecureSettings() != null) { SecuritySettingsSource.addSecureSettings(builder, secureSettings -> @@ -262,6 +264,7 @@ protected Path nodeConfigPath(int nodeOrdinal) { @Override protected Settings transportClientSettings() { return Settings.builder().put(super.transportClientSettings()) + .put(NetworkModule.TRANSPORT_TYPE_KEY, SecurityField.NIO) .put(customSecuritySettingsSource.transportClientSettings()) .build(); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecuritySettingsSource.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecuritySettingsSource.java index cc8c61a5c32e4..2e0662264a248 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecuritySettingsSource.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/test/SecuritySettingsSource.java @@ -21,12 +21,12 @@ import org.elasticsearch.transport.Netty4Plugin; import org.elasticsearch.xpack.core.XPackClientPlugin; import org.elasticsearch.xpack.core.XPackSettings; -import org.elasticsearch.xpack.security.LocalStateSecurity; import org.elasticsearch.xpack.core.security.SecurityField; -import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail; import org.elasticsearch.xpack.core.security.authc.esnative.NativeRealmSettings; import org.elasticsearch.xpack.core.security.authc.file.FileRealmSettings; import org.elasticsearch.xpack.core.security.authc.support.Hasher; +import org.elasticsearch.xpack.security.LocalStateSecurity; +import org.elasticsearch.xpack.security.audit.logfile.LoggingAuditTrail; import java.io.IOException; import java.io.InputStream; @@ -125,6 +125,7 @@ public Settings nodeSettings(int nodeOrdinal) { Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal)) .put(XPackSettings.SECURITY_ENABLED.getKey(), true) + .put(NetworkModule.TRANSPORT_TYPE_KEY, randomBoolean() ? SecurityField.NAME4 : SecurityField.NIO) //TODO: for now isolate security tests from watcher & monitoring (randomize this later) .put(XPackSettings.WATCHER_ENABLED.getKey(), false) .put(XPackSettings.MONITORING_ENABLED.getKey(), false) diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java index 2e2a931f78f87..cb1b69708bdf2 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/index/IndexAuditTrailTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.transport.TransportMessage; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.security.SecurityField; import org.elasticsearch.xpack.core.security.authc.Authentication; import org.elasticsearch.xpack.core.security.authc.Authentication.RealmRef; import org.elasticsearch.xpack.core.security.authc.AuthenticationToken; @@ -187,7 +188,9 @@ public Settings nodeSettings(int nodeOrdinal) { // Disable native ML autodetect_process as the c++ controller won't be available // .put(MachineLearningField.AUTODETECT_PROCESS.getKey(), false) .put(XPackSettings.SECURITY_ENABLED.getKey(), useSecurity); - if (useSecurity == false && builder.get(NetworkModule.TRANSPORT_TYPE_KEY) == null) { + String transport = builder.get(NetworkModule.TRANSPORT_TYPE_KEY); + if (useSecurity == false && (transport == null || SecurityField.NAME4.equals(transport) + || SecurityField.NIO.equals(transport))) { builder.put(NetworkModule.TRANSPORT_TYPE_KEY, getTestTransportType()); } return builder.build(); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java index c5a6a525d4e10..835fcf302c727 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java @@ -80,7 +80,7 @@ public MockTransportService nioFromThreadPool(Settings settings, ThreadPool thre .put("xpack.security.transport.ssl.enabled", true).build(); Transport transport = new SecurityNioTransport(settings1, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, new MockPageCacheRecycler(settings), namedWriteableRegistry, - new NoneCircuitBreakerService(), createSSLService()) { + new NoneCircuitBreakerService(), null, createSSLService()) { @Override protected Version executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout) throws IOException,