From 1736b9c21b563bc1f1b663c210d520fd5c95f251 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Tue, 18 Dec 2018 14:57:57 +0100 Subject: [PATCH] Prevent concurrent JobServer shutdown issue --- .../beam/runners/flink/FlinkJobServerDriver.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java index de30eda2a8bc4..be36a5dc94fc1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java @@ -49,8 +49,8 @@ public class FlinkJobServerDriver implements Runnable { @VisibleForTesting ServerConfiguration configuration; private final ServerFactory jobServerFactory; private final ServerFactory artifactServerFactory; - private GrpcFnServer jobServer; - private GrpcFnServer artifactStagingServer; + private volatile GrpcFnServer jobServer; + private volatile GrpcFnServer artifactStagingServer; /** Configuration for the jobServer. */ public static class ServerConfiguration { @@ -184,12 +184,16 @@ public void run() { } } + // This method is executed by TestPortableRunner via Reflection public String start() throws IOException { jobServer = createJobServer(); return jobServer.getApiServiceDescriptor().getUrl(); } - public void stop() { + // This method is executed by TestPortableRunner via Reflection + // Needs to be synchronized to prevent concurrency issues in testing shutdown + @SuppressWarnings("WeakerAccess") + public synchronized void stop() { if (jobServer != null) { try { jobServer.close();