Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share netty event loops between transports #46346

Merged
merged 10 commits into from
May 11, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
Expand Down Expand Up @@ -62,14 +61,14 @@
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.NettyAllocator;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
Expand Down Expand Up @@ -126,9 +125,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
// Netty's CompositeByteBuf implementation does not allow less than two components.
}, s -> Setting.parseInt(s, 2, Integer.MAX_VALUE, SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS), Property.NodeScope);

public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count",
(s) -> Integer.toString(EsExecutors.allocatedProcessors(s) * 2),
(s) -> Setting.parseInt(s, 1, "http.netty.worker_count"), Property.NodeScope);
public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = Setting.intSetting("http.netty.worker_count", 0, Property.NodeScope);

public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE =
Setting.byteSizeSetting("http.netty.receive_predictor_size", new ByteSizeValue(64, ByteSizeUnit.KB), Property.NodeScope);
Expand All @@ -137,29 +134,30 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final ByteSizeValue maxHeaderSize;
private final ByteSizeValue maxChunkSize;

private final int workerCount;

private final int pipeliningMaxEvents;

private final SharedGroupFactory sharedGroupFactory;
private final RecvByteBufAllocator recvByteBufAllocator;
private final int readTimeoutMillis;

private final int maxCompositeBufferComponents;

private volatile ServerBootstrap serverBootstrap;
private volatile SharedGroupFactory.SharedGroup sharedGroup;

public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings) {
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher, ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory) {
super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings);
Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
this.sharedGroupFactory = sharedGroupFactory;

this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);

this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);

this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());

Expand All @@ -180,10 +178,10 @@ public Settings settings() {
protected void doStart() {
boolean success = false;
try {
sharedGroup = sharedGroupFactory.getHttpGroup();
serverBootstrap = new ServerBootstrap();

serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
serverBootstrap.group(sharedGroup.getLowLevelGroup());

// NettyAllocator will return the channel type designed to work with the configuredAllocator
serverBootstrap.channel(NettyAllocator.getServerChannelType());
Expand Down Expand Up @@ -260,9 +258,9 @@ protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Excepti

@Override
protected void stopInternal() {
if (serverBootstrap != null) {
serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
serverBootstrap = null;
if (sharedGroup != null) {
sharedGroup.shutdown();
sharedGroup = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
Expand Down Expand Up @@ -48,6 +49,8 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin {
public static final String NETTY_TRANSPORT_NAME = "netty4";
public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4";

private final SetOnce<SharedGroupFactory> groupFactory = new SetOnce<>();

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
Expand Down Expand Up @@ -77,7 +80,7 @@ public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadP
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService));
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService, getSharedGroupFactory(settings)));
}

@Override
Expand All @@ -90,6 +93,17 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings set
ClusterSettings clusterSettings) {
return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher,
clusterSettings));
clusterSettings, getSharedGroupFactory(settings)));
}

private SharedGroupFactory getSharedGroupFactory(Settings settings) {
SharedGroupFactory groupFactory = this.groupFactory.get();
if (groupFactory != null) {
assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided";
return groupFactory;
} else {
this.groupFactory.set(new SharedGroupFactory(settings));
return this.groupFactory.get();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
import org.elasticsearch.transport.netty4.Netty4Transport;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;

/**
* Creates and returns {@link io.netty.channel.EventLoopGroup} instances. It will return a shared group for
* both {@link #getHttpGroup()} and {@link #getTransportGroup()} if
* {@link org.elasticsearch.http.netty4.Netty4HttpServerTransport#SETTING_HTTP_WORKER_COUNT} is configured to be 0.
* If that setting is not 0, then it will return a different group in the {@link #getHttpGroup()} call.
*/
public final class SharedGroupFactory {

private static final Logger logger = LogManager.getLogger(SharedGroupFactory.class);

private final Settings settings;
private final int workerCount;
private final int httpWorkerCount;

private RefCountedGroup genericGroup;
private SharedGroup dedicatedHttpGroup;

public SharedGroupFactory(Settings settings) {
this.settings = settings;
this.workerCount = Netty4Transport.WORKER_COUNT.get(settings);
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
this.httpWorkerCount = Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT.get(settings);
}

public Settings getSettings() {
return settings;
}

public int getTransportWorkerCount() {
return workerCount;
}

public synchronized SharedGroup getTransportGroup() {
return getGenericGroup();
}

public synchronized SharedGroup getHttpGroup() {
if (httpWorkerCount == 0) {
return getGenericGroup();
} else {
if (dedicatedHttpGroup == null) {
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(httpWorkerCount,
daemonThreadFactory(settings, HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX));
dedicatedHttpGroup = new SharedGroup(new RefCountedGroup(eventLoopGroup));
}
return dedicatedHttpGroup;
}
}

private SharedGroup getGenericGroup() {
if (genericGroup == null) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(workerCount,
daemonThreadFactory(settings, TcpTransport.TRANSPORT_WORKER_THREAD_NAME_PREFIX));
this.genericGroup = new RefCountedGroup(eventLoopGroup);
} else {
genericGroup.incRef();
}
return new SharedGroup(genericGroup);
}

private static class RefCountedGroup extends AbstractRefCounted {

public static final String NAME = "ref-counted-event-loop-group";
private final EventLoopGroup eventLoopGroup;

private RefCountedGroup(EventLoopGroup eventLoopGroup) {
super(NAME);
this.eventLoopGroup = eventLoopGroup;
}

@Override
protected void closeInternal() {
Future<?> shutdownFuture = eventLoopGroup.shutdownGracefully(0, 5, TimeUnit.SECONDS);
shutdownFuture.awaitUninterruptibly();
if (shutdownFuture.isSuccess() == false) {
logger.warn("Error closing netty event loop group", shutdownFuture.cause());
}
}
}

/**
* Wraps the {@link RefCountedGroup}. Calls {@link RefCountedGroup#decRef()} on close. After close,
* this wrapped instance can no longer be used.
*/
public static class SharedGroup {

private final RefCountedGroup refCountedGroup;

private final AtomicBoolean isOpen = new AtomicBoolean(true);

private SharedGroup(RefCountedGroup refCountedGroup) {
this.refCountedGroup = refCountedGroup;
}

public EventLoopGroup getLowLevelGroup() {
return refCountedGroup.eventLoopGroup;
}

public void shutdown() {
if (isOpen.compareAndSet(true, false)) {
refCountedGroup.decRef();
}
}
}
}
Loading