Skip to content

Commit

Permalink
refactor: abstract gRPC server (#156)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <yangkr920208@gmail.com>
  • Loading branch information
KeranYang authored Nov 8, 2024
1 parent ea910b3 commit 8241fdf
Show file tree
Hide file tree
Showing 46 changed files with 435 additions and 882 deletions.
24 changes: 24 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.map.evenodd.EvenOddFunction
Expand All @@ -156,6 +159,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>io.numaproj.numaflow.examples.sink.simple.SimpleSink
</mainClass>
Expand All @@ -172,6 +178,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.reduce.sum.SumFactory
Expand All @@ -189,6 +198,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.reducestreamer.sum.SumFactory
Expand Down Expand Up @@ -228,6 +240,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.reduce.count.CounterFactory
Expand All @@ -245,6 +260,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.sideinput.simple.SimpleSideInput
Expand All @@ -263,6 +281,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.sideinput.udf.SimpleMapWithSideInput
Expand Down Expand Up @@ -302,6 +323,9 @@
<goal>dockerBuild</goal>
</goals>
<configuration>
<from>
<image>amazoncorretto:11</image>
</from>
<container>
<mainClass>
io.numaproj.numaflow.examples.reducesession.counter.CountFactory
Expand Down
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-inprocess</artifactId>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.batchmapper;

import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;

Expand All @@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
public class GRPCConfig {
public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;

Expand Down
78 changes: 22 additions & 56 deletions src/main/java/io/numaproj/numaflow/batchmapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import io.numaproj.numaflow.shared.GrpcServerWrapper;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
* Server is the gRPC server for executing batch map operation.
Expand All @@ -24,8 +23,7 @@ public class Server {
private final Service service;
private final CompletableFuture<Void> shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;
private final GrpcServerHelper grpcServerHelper;
private final GrpcServerWrapper server;

/**
* constructor to create sink gRPC server.
Expand All @@ -36,6 +34,14 @@ public Server(BatchMapper batchMapper) {
this(batchMapper, GRPCConfig.defaultGrpcConfig());
}

@VisibleForTesting
protected Server(GRPCConfig grpcConfig, BatchMapper service, ServerInterceptor interceptor, String serverName) {
this.grpcConfig = grpcConfig;
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(service, this.shutdownSignal);
this.server = new GrpcServerWrapper(interceptor, serverName, this.service);
}

/**
* constructor to create sink gRPC server with gRPC config.
*
Expand All @@ -46,7 +52,7 @@ public Server(BatchMapper batchMapper, GRPCConfig grpcConfig) {
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(batchMapper, this.shutdownSignal);
this.grpcConfig = grpcConfig;
this.grpcServerHelper = new GrpcServerHelper();
this.server = new GrpcServerWrapper(this.grpcConfig, this.service);
}

/**
Expand All @@ -56,56 +62,34 @@ public Server(BatchMapper batchMapper, GRPCConfig grpcConfig) {
*/
public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath(),
this.serverInfoAccessor,
this.grpcConfig.getSocketPath(),
this.grpcConfig.getInfoFilePath(),
ContainerType.MAPPER,
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));

if (this.server == null) {
this.server = grpcServerHelper.createServer(
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort(),
this.service);
}

server.start();
this.server.start();

log.info(
"server started, listening on socket path: " + grpcConfig.getSocketPath());
log.info("server started, listening on socket path: {}", this.grpcConfig.getSocketPath());

// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server != null && server.isTerminated()) {
return;
}
try {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
this.stop();
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
}
}));

// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
if (server != null && server.isTerminated()) {
return;
}

this.shutdownSignal.whenCompleteAsync((v, e) -> {
if (e != null) {
System.err.println("*** shutting down batch map gRPC server because of an exception - " + e.getMessage());
try {
log.info("stopping server");
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
this.stop();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);
Expand All @@ -123,7 +107,7 @@ public void start() throws Exception {
*/
public void awaitTermination() throws InterruptedException {
log.info("batch map server is waiting for termination");
server.awaitTermination();
this.server.awaitTermination();
log.info("batch map server has terminated");
}

Expand All @@ -134,25 +118,7 @@ public void awaitTermination() throws InterruptedException {
* @throws InterruptedException if shutdown is interrupted
*/
public void stop() throws InterruptedException {
this.server.gracefullyShutdown();
this.service.shutDown();
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
// force shutdown if not terminated
if (!server.isTerminated()) {
server.shutdownNow();
}
}
}

/**
* Set server builder for testing.
*
* @param serverBuilder in process server builder can be used for testing
*/
@VisibleForTesting
public void setServerBuilder(ServerBuilder<?> serverBuilder) {
this.server = serverBuilder
.addService(this.service)
.build();
}
}
7 changes: 4 additions & 3 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,16 @@ private HandlerDatum constructHandlerDatum(MapOuterClass.MapRequest d) {
);
}

// Shuts down the executor service which is used for batch map
// Shuts down the executor service
public void shutDown() {
this.mapTaskExecutor.shutdown();
try {
if (!mapTaskExecutor.awaitTermination(SHUTDOWN_TIME, TimeUnit.SECONDS)) {
log.error("BatchMap executor did not terminate in the specified time.");
List<Runnable> droppedTasks = mapTaskExecutor.shutdownNow();
log.error("BatchMap executor was abruptly shut down. " + droppedTasks.size()
+ " tasks will not be executed.");
log.error(
"BatchMap executor was abruptly shut down. {} tasks will not be executed.",
droppedTasks.size());
} else {
log.info("BatchMap executor was terminated.");
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/numaproj/numaflow/mapper/GRPCConfig.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.numaproj.numaflow.mapper;

import io.numaproj.numaflow.shared.GrpcConfigRetriever;
import lombok.Builder;
import lombok.Getter;

Expand All @@ -8,7 +9,7 @@
*/
@Getter
@Builder(builderMethodName = "newBuilder")
public class GRPCConfig {
public class GRPCConfig implements GrpcConfigRetriever {
@Builder.Default
private String socketPath = Constants.DEFAULT_SOCKET_PATH;

Expand Down
Loading

0 comments on commit 8241fdf

Please sign in to comment.