Skip to content

Commit

Permalink
map streamer
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <yangkr920208@gmail.com>
  • Loading branch information
KeranYang committed Nov 6, 2024
1 parent 561dc5d commit b63faee
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 24 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/numaproj/numaflow/batchmapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void start() throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server.isTerminated()) {
if (server != null && server.isTerminated()) {
return;
}
try {
Expand All @@ -95,7 +95,7 @@ public void start() throws Exception {

// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
if (server.isTerminated()) {
if (server != null && server.isTerminated()) {
return;

Check warning on line 99 in src/main/java/io/numaproj/numaflow/batchmapper/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/batchmapper/Server.java#L99

Added line #L99 was not covered by tests
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/mapper/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void start() throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server.isTerminated()) {
if (server != null && server.isTerminated()) {
return;
}
try {
Expand Down
52 changes: 40 additions & 12 deletions src/main/java/io/numaproj/numaflow/mapstreamer/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
import io.numaproj.numaflow.shared.GrpcServerHelper;
import io.numaproj.numaflow.shared.GrpcServerUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -20,8 +22,10 @@ public class Server {

private final GRPCConfig grpcConfig;
private final Service service;
private final CompletableFuture<Void> shutdownSignal;
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
private io.grpc.Server server;
private final GrpcServerHelper grpcServerHelper;

/**
* constructor to create sink gRPC server.
Expand All @@ -39,8 +43,10 @@ public Server(MapStreamer mapStreamer) {
* @param mapStreamer to process the message
*/
public Server(MapStreamer mapStreamer, GRPCConfig grpcConfig) {
this.service = new Service(mapStreamer);
this.shutdownSignal = new CompletableFuture<>();
this.service = new Service(mapStreamer, this.shutdownSignal);
this.grpcConfig = grpcConfig;
this.grpcServerHelper = new GrpcServerHelper();
}

/**
Expand All @@ -57,35 +63,55 @@ public void start() throws Exception {
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));

if (this.server == null) {
// create server builder
ServerBuilder<?> serverBuilder = GrpcServerUtils.createServerBuilder(
this.server = this.grpcServerHelper.createServer(

Check warning on line 66 in src/main/java/io/numaproj/numaflow/mapstreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Server.java#L66

Added line #L66 was not covered by tests
grpcConfig.getSocketPath(),
grpcConfig.getMaxMessageSize(),
grpcConfig.isLocal(),
grpcConfig.getPort());
// build server
this.server = serverBuilder
.addService(this.service)
.build();
grpcConfig.getPort(),

Check warning on line 70 in src/main/java/io/numaproj/numaflow/mapstreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Server.java#L70

Added line #L70 was not covered by tests
this.service);
}

// start server
server.start();

log.info(
"Server started, listening on socket path: " + grpcConfig.getSocketPath());
"server started, listening on socket path: " + grpcConfig.getSocketPath());

// register shutdown hook
// register shutdown hook to gracefully shut down the server
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
System.err.println("*** shutting down map streamer gRPC server since JVM is shutting down");
if (server != null && server.isTerminated()) {
return;
}
try {
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();

Check warning on line 89 in src/main/java/io/numaproj/numaflow/mapstreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Server.java#L88-L89

Added lines #L88 - L89 were not covered by tests
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace(System.err);
}
}));

// if there are any exceptions, shutdown the server gracefully.
shutdownSignal.whenCompleteAsync((v, e) -> {
if (server.isTerminated()) {
return;

Check warning on line 99 in src/main/java/io/numaproj/numaflow/mapstreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Server.java#L99

Added line #L99 was not covered by tests
}

if (e != null) {
System.err.println("*** shutting down map streamer gRPC server because of an exception - " + e.getMessage());
try {
log.info("stopping server");
Server.this.stop();
log.info("gracefully shutting down event loop groups");
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
} catch (InterruptedException ex) {
Thread.interrupted();
ex.printStackTrace(System.err);

Check warning on line 111 in src/main/java/io/numaproj/numaflow/mapstreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Server.java#L109-L111

Added lines #L109 - L111 were not covered by tests
}
}
});
}

/**
Expand All @@ -96,7 +122,9 @@ public void start() throws Exception {
* @throws InterruptedException if the current thread is interrupted while waiting
*/
public void awaitTermination() throws InterruptedException {
log.info("sink server is waiting for termination");

Check warning on line 125 in src/main/java/io/numaproj/numaflow/mapstreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Server.java#L125

Added line #L125 was not covered by tests
server.awaitTermination();
log.info("sink server has terminated");

Check warning on line 127 in src/main/java/io/numaproj/numaflow/mapstreamer/Server.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Server.java#L127

Added line #L127 was not covered by tests
}

/**
Expand Down
17 changes: 11 additions & 6 deletions src/main/java/io/numaproj/numaflow/mapstreamer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lombok.extern.slf4j.Slf4j;

import java.time.Instant;
import java.util.concurrent.CompletableFuture;

import static io.numaproj.numaflow.map.v1.MapGrpc.getMapFnMethod;

Expand All @@ -17,6 +18,7 @@
class Service extends MapGrpc.MapImplBase {

private final MapStreamer mapStreamer;
private final CompletableFuture<Void> shutdownSignal;

@Override
public StreamObserver<MapOuterClass.MapRequest> mapFn(StreamObserver<MapOuterClass.MapResponse> responseObserver) {
Expand Down Expand Up @@ -57,9 +59,11 @@ public void onNext(MapOuterClass.MapRequest request) {
constructHandlerDatum(request),
new OutputObserverImpl(responseObserver));
} catch (Exception e) {
log.error("Error processing message", e);
responseObserver.onError(Status.UNKNOWN
log.error("Encountered error in mapFn onNext - {}", e.getMessage());
shutdownSignal.completeExceptionally(e);
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
return;
}
Expand All @@ -76,11 +80,12 @@ public void onNext(MapOuterClass.MapRequest request) {

@Override
public void onError(Throwable throwable) {
log.error("Error Encountered in mapStream Stream", throwable);
var status = Status.UNKNOWN
log.error("Encountered error in mapStream Stream - {}", throwable.getMessage());
shutdownSignal.completeExceptionally(throwable);
responseObserver.onError(Status.UNKNOWN

Check warning on line 85 in src/main/java/io/numaproj/numaflow/mapstreamer/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Service.java#L83-L85

Added lines #L83 - L85 were not covered by tests
.withDescription(throwable.getMessage())
.withCause(throwable);
responseObserver.onError(status.asException());
.withCause(throwable)
.asException());

Check warning on line 88 in src/main/java/io/numaproj/numaflow/mapstreamer/Service.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/mapstreamer/Service.java#L87-L88

Added lines #L87 - L88 were not covered by tests
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/sideinput/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void start() throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
if (server.isTerminated()) {
if (server != null && server.isTerminated()) {
return;
}
try {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/sinker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void start() throws Exception {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down sink gRPC server since JVM is shutting down");
if (server.isTerminated()) {
if (server != null && server.isTerminated()) {
return;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void TestMapStreamerErr() {
fail("Should have thrown an exception");
} catch (Exception e) {
assertEquals(
"io.grpc.StatusRuntimeException: UNKNOWN: unknown exception",
"io.grpc.StatusRuntimeException: INTERNAL: unknown exception",
e.getMessage());
}
}
Expand Down

0 comments on commit b63faee

Please sign in to comment.