Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <yangkr920208@gmail.com>
  • Loading branch information
KeranYang committed Nov 7, 2024
1 parent fc9f1ac commit f522b23
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 27 deletions.
10 changes: 5 additions & 5 deletions src/main/java/io/numaproj/numaflow/batchmapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ public Server(BatchMapper batchMapper, GRPCConfig grpcConfig) {
*/
public void start() throws Exception {
GrpcServerUtils.writeServerInfo(
serverInfoAccessor,
grpcConfig.getSocketPath(),
grpcConfig.getInfoFilePath(),
this.serverInfoAccessor,
this.grpcConfig.getSocketPath(),
this.grpcConfig.getInfoFilePath(),
ContainerType.MAPPER,
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));

this.server.start();

log.info("server started, listening on socket path: {}", grpcConfig.getSocketPath());
log.info("server started, listening on socket path: {}", this.grpcConfig.getSocketPath());

// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Expand All @@ -85,7 +85,7 @@ public void start() throws Exception {
}));

// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
this.shutdownSignal.whenCompleteAsync((v, e) -> {
if (e != null) {
System.err.println("*** shutting down batch map gRPC server because of an exception - " + e.getMessage());
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package io.numaproj.numaflow.shared;

// Currently each of the UDFs (Mapper, BatchMapper, Sourcer) has its own GrpcConfig class.
// To start a gRPC server, we need to pass gRPC configurations to the GrpcServerWrapper.
// In order to make the GrpcServerWrapper more generic, we create this GrpcConfigRetriever interface,
// which is implemented by the UDFs' GrpcConfig classes.
public interface GrpcConfigRetriever {
String getSocketPath();
int getMaxMessageSize();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
package io.numaproj.numaflow.shared;

import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import io.grpc.netty.NettyServerBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerDomainSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.Language;
import io.numaproj.numaflow.info.Protocol;
Expand Down
25 changes: 12 additions & 13 deletions src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
import io.netty.channel.unix.DomainSocketAddress;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_END;
import static io.numaproj.numaflow.shared.GrpcServerUtils.DATUM_METADATA_WIN_START;
import static io.numaproj.numaflow.shared.GrpcServerUtils.WINDOW_END_TIME;
import static io.numaproj.numaflow.shared.GrpcServerUtils.WINDOW_START_TIME;

/**
* GrpcServerWrapper is a wrapper class for gRPC server.
* It takes care of creating, starting and gracefully shutting down the server.
*/
@Slf4j
public class GrpcServerWrapper {
private final Server server;
Expand Down Expand Up @@ -70,8 +73,12 @@ public void start() throws Exception {

public void awaitTermination() throws InterruptedException {
this.server.awaitTermination();

Check warning on line 75 in src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java#L75

Added line #L75 was not covered by tests
this.workerEventLoopGroup.awaitTermination(30, TimeUnit.SECONDS);
this.bossEventLoopGroup.awaitTermination(30, TimeUnit.SECONDS);
// if the server has been terminated, we should expect the event loop groups to be terminated as well.
if (!(this.workerEventLoopGroup.awaitTermination(30, TimeUnit.SECONDS) &&
this.bossEventLoopGroup.awaitTermination(30, TimeUnit.SECONDS))) {
log.error("Timed out to gracefully shutdown event loop groups");
throw new InterruptedException("Timed out to gracefully shutdown event loop groups");

Check warning on line 80 in src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java#L79-L80

Added lines #L79 - L80 were not covered by tests
}
}

Check warning on line 82 in src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/shared/GrpcServerWrapper.java#L82

Added line #L82 was not covered by tests

public void gracefullyShutdown() throws InterruptedException{
Expand All @@ -87,17 +94,9 @@ public void gracefullyShutdown() throws InterruptedException{
this.gracefullyShutdownEventLoopGroups();
}

public void gracefullyShutdownEventLoopGroups() {
private void gracefullyShutdownEventLoopGroups() {
if (this.bossEventLoopGroup != null) {
Future<?> bossFuture = this.bossEventLoopGroup.shutdownGracefully();
while (!bossFuture.isDone()) {
try {
log.info("waiting for boss event loop group to shutdown...");
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
log.error("Interrupted while waiting for boss event loop group to shutdown", e);
}
}
this.bossEventLoopGroup.shutdownGracefully();
}
if (this.workerEventLoopGroup != null) {
this.workerEventLoopGroup.shutdownGracefully();
Expand Down

0 comments on commit f522b23

Please sign in to comment.