Skip to content

Commit

Permalink
For now use first port/bind host listed
Browse files Browse the repository at this point in the history
Signed-off-by: Finn Carroll <carrofin@amazon.com>
  • Loading branch information
finnegancarroll committed Oct 29, 2024
1 parent da4d892 commit 91f7025
Showing 1 changed file with 18 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,77 +3,66 @@
import io.grpc.netty.NettyServerBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.lifecycle.Lifecycle;
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.common.network.NetworkService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
import org.opensearch.core.common.transport.BoundTransportAddress;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.grpc.AbstractGrpcServerTransport;
import org.opensearch.grpc.GrpcInfo;
import org.opensearch.grpc.GrpcStats;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.NettyAllocator;
import org.opensearch.transport.SharedGroupFactory;
import io.netty.channel.EventLoopGroup;

import io.grpc.Server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import static org.opensearch.grpc.GrpcTransportSettings.SETTING_GRPC_PORT;
import static org.opensearch.grpc.GrpcTransportSettings.SETTING_GRPC_BIND_HOST;

public class Netty4GrpcServerTransport extends AbstractGrpcServerTransport {
private static final Logger logger = LogManager.getLogger(Netty4GrpcServerTransport.class);

public static final Setting<Integer> SETTING_GRPC_PORT =
Setting.intSetting("grpc.port", 9400, Property.NodeScope);

public static final Setting<ByteSizeValue> SETTING_GRPC_MAX_MESSAGE_SIZE =
Setting.byteSizeSetting("grpc.max_message_size", new ByteSizeValue(100 * 1024 * 1024), Property.NodeScope);

public static final Setting<Integer> SETTING_GRPC_WORKER_COUNT =
Setting.intSetting("grpc.worker_count", 1, Property.NodeScope);
Setting.intSetting("grpc.worker_count", 1, Setting.Property.NodeScope);

private final SharedGroupFactory sharedGroupFactory;
private final int maxMessageSize;
private final int port;
private final String host;
private volatile Server grpcServer;
private volatile SharedGroupFactory.SharedGroup sharedGroup;

public Netty4GrpcServerTransport(
Settings settings,
NetworkService networkService,
BigArrays bigArrays,
ThreadPool threadPool,
ClusterSettings clusterSettings,
SharedGroupFactory sharedGroupFactory
) {
this.sharedGroupFactory = sharedGroupFactory;
this.maxMessageSize = Math.toIntExact(SETTING_GRPC_MAX_MESSAGE_SIZE.get(settings).getBytes());
this.port = SETTING_GRPC_PORT.get(settings);
this.port = SETTING_GRPC_PORT.get(settings).ports()[0]; // TODO: HANDLE PORT RANGE
this.host = SETTING_GRPC_BIND_HOST.get(settings).get(0); // TODO: HANDLE BIND_HOST LIST
}

@Override
protected void doStart() {
boolean success = false;
try {
sharedGroup = sharedGroupFactory.getGRPCGroup();
EventLoopGroup workerGroup = sharedGroup.getLowLevelGroup();

grpcServer = NettyServerBuilder.forPort(port)
.workerEventLoopGroup(workerGroup)
.maxInboundMessageSize(maxMessageSize)
// gRPC service definitions need to be injected here?
// .addService(new GrpcQueryServiceImpl(this))
EventLoopGroup eventLoopGroup = sharedGroup.getLowLevelGroup();

grpcServer = NettyServerBuilder
.forAddress(new InetSocketAddress(host, port))
.bossEventLoopGroup(eventLoopGroup)
.workerEventLoopGroup(eventLoopGroup)
.channelType(NettyAllocator.getServerChannelType())
// TODO: INJECT SERVICE DEFINITIONS // .addService(new GrpcQueryServiceImpl(this))
.build();

// Start the server
grpcServer.start();

// Register shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (grpcServer != null) {
grpcServer.shutdown();
Expand Down Expand Up @@ -115,6 +104,6 @@ protected void doStop() {

@Override
protected void doClose() {

grpcServer.shutdown();
}
}

0 comments on commit 91f7025

Please sign in to comment.