diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
index 16677a18e3964..99a831c20ee6c 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java
@@ -33,7 +33,8 @@
*
Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
* which waits for advancement of the phase if the {@link CallStreamObserver} is not ready. Creator
* is expected to advance the {@link Phaser} whenever the underlying {@link CallStreamObserver}
- * becomes ready.
+ * becomes ready. If the {@link Phaser} is terminated, {@link DirectStreamObserver.onNext(T)}
+ * will no longer wait for the {@link CallStreamObserver} to become ready.
*/
@ThreadSafe
public final class DirectStreamObserver implements StreamObserver {
@@ -70,24 +71,27 @@ public void onNext(T value) {
synchronized (lock) {
if (++numMessages >= maxMessagesBeforeCheck) {
numMessages = 0;
- int waitTime = 1;
- int totalTimeWaited = 0;
+ int waitSeconds = 1;
+ int totalSecondsWaited = 0;
int phase = phaser.getPhase();
// Record the initial phase in case we are in the inbound gRPC thread where the phase won't
// advance.
int initialPhase = phase;
- while (!outboundObserver.isReady()) {
+ // A negative phase indicates that the phaser is terminated.
+ while (phase >= 0 && !outboundObserver.isReady()) {
try {
- phase = phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
+ phase = phaser.awaitAdvanceInterruptibly(phase, waitSeconds, TimeUnit.SECONDS);
} catch (TimeoutException e) {
- totalTimeWaited += waitTime;
- waitTime = waitTime * 2;
+ totalSecondsWaited += waitSeconds;
+ // Double the backoff for re-evaluating the isReady bit up to a maximum of once per
+ // minute. This bounds the waiting if the onReady callback is not called as expected.
+ waitSeconds = Math.min(waitSeconds * 2, 60);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
- if (totalTimeWaited > 0) {
+ if (totalSecondsWaited > 0) {
// If the phase didn't change, this means that the installed onReady callback had not
// been invoked.
if (initialPhase == phase) {
@@ -95,12 +99,12 @@ public void onNext(T value) {
"Output channel stalled for {}s, outbound thread {}. See: "
+ "https://issues.apache.org/jira/browse/BEAM-4280 for the history for "
+ "this issue.",
- totalTimeWaited,
+ totalSecondsWaited,
Thread.currentThread().getName());
} else {
LOG.debug(
"Output channel stalled for {}s, outbound thread {}.",
- totalTimeWaited,
+ totalSecondsWaited,
Thread.currentThread().getName());
}
}
diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java
index d7c18059dc20a..1a5558ed67000 100644
--- a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java
+++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java
@@ -98,7 +98,7 @@ public ManageLoggingClientAndService() {
.build();
server.start();
loggingClient =
- new BeamFnLoggingClient(
+ BeamFnLoggingClient.createAndStart(
PipelineOptionsFactory.create(),
apiServiceDescriptor,
managedChannelFactory::forDescriptor);
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 59d21f2539124..0d25137beefb7 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.EnumMap;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import javax.annotation.Nullable;
@@ -228,7 +229,7 @@ public static void main(
// The logging client variable is not used per se, but during its lifetime (until close()) it
// intercepts logging and sends it to the logging service.
try (BeamFnLoggingClient logging =
- new BeamFnLoggingClient(
+ BeamFnLoggingClient.createAndStart(
options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) {
LOG.info("Fn Harness started");
// Register standard file systems.
@@ -353,11 +354,13 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
outboundObserverFactory,
executorService,
handlers);
- control.waitForTermination();
+ CompletableFuture.anyOf(control.terminationFuture(), logging.terminationFuture()).get();
if (beamFnStatusClient != null) {
beamFnStatusClient.close();
}
processBundleHandler.shutdown();
+ } catch (Exception e) {
+ System.out.println("Shutting down harness due to exception: " + e.toString());
} finally {
System.out.println("Shutting SDK harness down.");
executionStateSampler.stop();
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
index ca4428a34628a..ffac6d7aa98e9 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java
@@ -21,7 +21,6 @@
import java.util.EnumMap;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -139,8 +138,8 @@ public void onCompleted() {
}
/** This method blocks until the control stream has completed. */
- public void waitForTermination() throws InterruptedException, ExecutionException {
- onFinish.get();
+ public CompletableFuture terminationFuture() {
+ return onFinish;
}
public BeamFnApi.InstructionResponse delegateOnInstructionRequestType(
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index 68ad6727c4d55..3b76f7fc4d01f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -17,22 +17,21 @@
*/
package org.apache.beam.fn.harness.logging;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables.getStackTraceAsString;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Formatter;
import java.util.logging.Handler;
@@ -47,6 +46,8 @@
import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
+import org.apache.beam.sdk.fn.stream.DirectStreamObserver;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
@@ -54,13 +55,15 @@
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Value;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
-import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientResponseObserver;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.initialization.qual.UnderInitialization;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
import org.slf4j.MDC;
/**
@@ -76,6 +79,10 @@ public class BeamFnLoggingClient implements AutoCloseable {
.put(Level.FINE, BeamFnApi.LogEntry.Severity.Enum.DEBUG)
.put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE)
.build();
+ private static final ImmutableMap REVERSE_LOG_LEVEL_MAP =
+ ImmutableMap.builder()
+ .putAll(LOG_LEVEL_MAP.asMultimap().inverse().entries())
+ .build();
private static final Formatter DEFAULT_FORMATTER = new SimpleFormatter();
@@ -87,28 +94,98 @@ public class BeamFnLoggingClient implements AutoCloseable {
private static final Object COMPLETED = new Object();
+ private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
+
+ private final StreamWriter streamWriter;
+
+ private final LogRecordHandler logRecordHandler;
+
/* We need to store a reference to the configured loggers so that they are not
* garbage collected. java.util.logging only has weak references to the loggers
* so if they are garbage collected, our hierarchical configuration will be lost. */
- private final Collection configuredLoggers;
- private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
- private final ManagedChannel channel;
- private final CallStreamObserver outboundObserver;
- private final LogControlObserver inboundObserver;
- private final LogRecordHandler logRecordHandler;
- private final CompletableFuture inboundObserverCompletion;
- private final Phaser phaser;
+ private final Collection configuredLoggers = new ArrayList<>();
+
private @Nullable ProcessBundleHandler processBundleHandler;
- public BeamFnLoggingClient(
+ private final BlockingQueue bufferedLogEntries =
+ new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+
+ /**
+ * Future that completes with the background thread consuming logs from bufferedLogEntries.
+ * Completes with COMPLETED or with exception.
+ */
+ private final CompletableFuture> bufferedLogConsumer;
+
+ /**
+ * Safe object publishing is not required since we only care if the thread that set this field is
+ * equal to the thread also attempting to add a log entry.
+ */
+ private @Nullable Thread logEntryHandlerThread = null;
+
+ public static BeamFnLoggingClient createAndStart(
PipelineOptions options,
Endpoints.ApiServiceDescriptor apiServiceDescriptor,
Function channelFactory) {
+ BeamFnLoggingClient client =
+ new BeamFnLoggingClient(
+ apiServiceDescriptor,
+ new StreamWriter(channelFactory.apply(apiServiceDescriptor)),
+ options.as(SdkHarnessOptions.class).getLogMdc(),
+ options.as(ExecutorOptions.class).getScheduledExecutorService(),
+ options.as(SdkHarnessOptions.class));
+ return client;
+ }
+
+ private BeamFnLoggingClient(
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor,
+ StreamWriter streamWriter,
+ boolean logMdc,
+ ScheduledExecutorService executorService,
+ SdkHarnessOptions options) {
this.apiServiceDescriptor = apiServiceDescriptor;
- this.inboundObserverCompletion = new CompletableFuture<>();
- this.phaser = new Phaser(1);
- this.channel = channelFactory.apply(apiServiceDescriptor);
+ this.streamWriter = streamWriter;
+ this.logRecordHandler = new LogRecordHandler(logMdc);
+ logRecordHandler.setLevel(Level.ALL);
+ logRecordHandler.setFormatter(DEFAULT_FORMATTER);
+ CompletableFuture started = new CompletableFuture<>();
+ this.bufferedLogConsumer =
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ logEntryHandlerThread = Thread.currentThread();
+ installLogging(options);
+ started.complete(COMPLETED);
+
+ // Logging which occurs in this thread will attempt to publish log entries into the
+ // above handler which should never block if the queue is full otherwise
+ // this thread will get stuck.
+ streamWriter.drainQueueToStream(bufferedLogEntries);
+ } finally {
+ restoreLoggers();
+ // Now that loggers are restored, do a final flush of any buffered logs
+ // in case they help with understanding above failures.
+ flushFinalLogs();
+ }
+ return COMPLETED;
+ },
+ executorService);
+ try {
+ // Wait for the thread to be running and log handlers installed or an error with the thread
+ // that is supposed to be consuming logs.
+ CompletableFuture.anyOf(this.bufferedLogConsumer, started).get();
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Error starting background log thread " + e.getCause());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+
+ @RequiresNonNull("logRecordHandler")
+ @RequiresNonNull("configuredLoggers")
+ private void installLogging(
+ @UnderInitialization BeamFnLoggingClient this, SdkHarnessOptions options) {
// Reset the global log manager, get the root logger and remove the default log handlers.
LogManager logManager = LogManager.getLogManager();
logManager.reset();
@@ -117,50 +194,205 @@ public BeamFnLoggingClient(
rootLogger.removeHandler(handler);
}
// configure loggers from default sdk harness log level and log level overrides
- this.configuredLoggers =
- SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
+ this.configuredLoggers.addAll(SdkHarnessOptions.getConfiguredLoggerFromOptions(options));
- BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(channel);
- inboundObserver = new LogControlObserver();
- logRecordHandler = new LogRecordHandler();
- logRecordHandler.setLevel(Level.ALL);
- logRecordHandler.setFormatter(DEFAULT_FORMATTER);
- logRecordHandler.executeOn(options.as(ExecutorOptions.class).getScheduledExecutorService());
- boolean logMdc = options.as(SdkHarnessOptions.class).getLogMdc();
- logRecordHandler.setLogMdc(logMdc);
- QuotaEvent.setEnabled(logMdc);
- outboundObserver = (CallStreamObserver) stub.logging(inboundObserver);
+ // Install a handler that queues to the buffer read by the background thread.
rootLogger.addHandler(logRecordHandler);
}
- public void setProcessBundleHandler(ProcessBundleHandler processBundleHandler) {
- this.processBundleHandler = processBundleHandler;
+ private static class StreamWriter {
+ private final ManagedChannel channel;
+ private final StreamObserver outboundObserver;
+ private final LogControlObserver inboundObserver;
+
+ private final CompletableFuture inboundObserverCompletion;
+ private final AdvancingPhaser streamPhaser;
+
+ // Used to note we are attempting to close the logging client and to gracefully drain the
+ // current logs to the stream.
+ private final CompletableFuture softClosing = new CompletableFuture<>();
+
+ public StreamWriter(ManagedChannel channel) {
+ this.inboundObserverCompletion = new CompletableFuture<>();
+ this.streamPhaser = new AdvancingPhaser(1);
+ this.channel = channel;
+
+ BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(channel);
+ this.inboundObserver = new LogControlObserver();
+ this.outboundObserver =
+ new DirectStreamObserver(
+ this.streamPhaser,
+ (CallStreamObserver) stub.logging(inboundObserver));
+ }
+
+ public void drainQueueToStream(BlockingQueue bufferedLogEntries) {
+ Throwable thrown = null;
+ try {
+ List additionalLogEntries =
+ new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+ // As long as we haven't yet terminated the stream, then attempt to send on it.
+ while (!streamPhaser.isTerminated()) {
+ // We wait for a limited period so that we can evaluate if the stream closed or if
+ // we are gracefully closing the client.
+ BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, SECONDS);
+ if (logEntry == null) {
+ if (softClosing.isDone()) {
+ break;
+ }
+ continue;
+ }
+
+ // Batch together as many log messages as possible that are held within the buffer
+ BeamFnApi.LogEntry.List.Builder builder =
+ BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
+ bufferedLogEntries.drainTo(additionalLogEntries);
+ builder.addAllLogEntries(additionalLogEntries);
+ outboundObserver.onNext(builder.build());
+ additionalLogEntries.clear();
+ }
+ if (inboundObserverCompletion.isDone()) {
+ try {
+ // If the inbound observer failed with an exception, get() will throw an
+ // ExecutionException.
+ inboundObserverCompletion.get();
+ // Otherwise it is an error for the server to close the stream before we closed our end.
+ throw new IllegalStateException(
+ "Logging stream terminated unexpectedly with success before it was closed by the client.");
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "Logging stream terminated unexpectedly before it was closed by the client with error: "
+ + e.getCause());
+ } catch (InterruptedException e) {
+ // Should never happen because of the isDone check.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ } catch (Throwable t) {
+ thrown = t;
+ throw new RuntimeException(t);
+ } finally {
+ if (thrown == null) {
+ outboundObserver.onCompleted();
+ } else {
+ outboundObserver.onError(thrown);
+ }
+ channel.shutdown();
+ boolean shutdownFinished = false;
+ try {
+ shutdownFinished = channel.awaitTermination(10, SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ if (!shutdownFinished) {
+ channel.shutdownNow();
+ }
+ }
+ }
+ }
+
+ public void softClose() {
+ softClosing.complete(COMPLETED);
+ }
+
+ public void hardClose() {
+ streamPhaser.forceTermination();
+ }
+
+ private class LogControlObserver
+ implements ClientResponseObserver {
+
+ @Override
+ public void beforeStart(ClientCallStreamObserver requestStream) {
+ requestStream.setOnReadyHandler(streamPhaser::arrive);
+ }
+
+ @Override
+ public void onNext(BeamFnApi.LogControl value) {}
+
+ @Override
+ public void onError(Throwable t) {
+ inboundObserverCompletion.completeExceptionally(t);
+ hardClose();
+ }
+
+ @Override
+ public void onCompleted() {
+ inboundObserverCompletion.complete(COMPLETED);
+ hardClose();
+ }
+ }
}
@Override
public void close() throws Exception {
+ checkNotNull(bufferedLogConsumer, "BeamFnLoggingClient not fully started");
try {
- // Reset the logging configuration to what it is at startup
- for (Logger logger : configuredLoggers) {
- logger.setLevel(null);
+ try {
+ // Wait for buffered log messages to drain for a short period.
+ streamWriter.softClose();
+ bufferedLogConsumer.get(10, SECONDS);
+ } catch (TimeoutException e) {
+ // Terminate the phaser that we block on when attempting to honor flow control on the
+ // outbound observer.
+ streamWriter.hardClose();
+ // Wait for the sending thread to exit.
+ bufferedLogConsumer.get();
}
- configuredLoggers.clear();
- LogManager.getLogManager().readConfiguration();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof Exception) {
+ throw (Exception) e.getCause();
+ }
+ throw e;
+ }
+ }
- // Hang up with the server
- logRecordHandler.close();
+ public void setProcessBundleHandler(ProcessBundleHandler processBundleHandler) {
+ this.processBundleHandler = processBundleHandler;
+ }
+
+ // Reset the logging configuration to what it is at startup.
+ @RequiresNonNull("configuredLoggers")
+ @RequiresNonNull("logRecordHandler")
+ private void restoreLoggers(@UnderInitialization BeamFnLoggingClient this) {
+ for (Logger logger : configuredLoggers) {
+ logger.setLevel(null);
+ // Explicitly remove the installed handler in case reading the configuration fails.
+ logger.removeHandler(logRecordHandler);
+ }
+ configuredLoggers.clear();
+ LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).removeHandler(logRecordHandler);
+ try {
+ LogManager.getLogManager().readConfiguration();
+ } catch (IOException e) {
+ System.out.print("Unable to restore log managers from configuration: " + e.toString());
+ }
+ }
- // Wait for the server to hang up
- inboundObserverCompletion.get();
- } finally {
- // Shut the channel down
- channel.shutdown();
- if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
- channel.shutdownNow();
+ @RequiresNonNull("bufferedLogEntries")
+ void flushFinalLogs(@UnderInitialization BeamFnLoggingClient this) {
+ List finalLogEntries = new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
+ bufferedLogEntries.drainTo(finalLogEntries);
+ for (BeamFnApi.LogEntry logEntry : finalLogEntries) {
+ LogRecord logRecord =
+ new LogRecord(REVERSE_LOG_LEVEL_MAP.get(logEntry.getSeverity()), logEntry.getMessage());
+ logRecord.setLoggerName(logEntry.getLogLocation());
+ logRecord.setMillis(
+ logEntry.getTimestamp().getSeconds() * 1000
+ + logEntry.getTimestamp().getNanos() / 1_000_000);
+ logRecord.setThreadID(Integer.parseInt(logEntry.getThread()));
+ if (!logEntry.getTrace().isEmpty()) {
+ logRecord.setThrown(new Throwable(logEntry.getTrace()));
}
+ LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).log(logRecord);
}
}
+ public CompletableFuture> terminationFuture() {
+ checkNotNull(bufferedLogConsumer, "BeamFnLoggingClient not fully started");
+ return bufferedLogConsumer;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(BeamFnLoggingClient.class)
@@ -168,24 +400,11 @@ public String toString() {
.toString();
}
- private class LogRecordHandler extends Handler implements Runnable {
- private final BlockingQueue bufferedLogEntries =
- new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
- private @Nullable Future> bufferedLogWriter = null;
- /**
- * Safe object publishing is not required since we only care if the thread that set this field
- * is equal to the thread also attempting to add a log entry.
- */
- private @Nullable Thread logEntryHandlerThread = null;
-
- private boolean logMdc = true;
-
- private void setLogMdc(boolean value) {
- logMdc = value;
- }
+ private class LogRecordHandler extends Handler {
+ private final boolean logMdc;
- private void executeOn(ExecutorService executorService) {
- bufferedLogWriter = executorService.submit(this);
+ LogRecordHandler(boolean logMdc) {
+ this.logMdc = logMdc;
}
@Override
@@ -242,7 +461,6 @@ public void publish(LogRecord record) {
}
// The thread that sends log records should never perform a blocking publish and
- // only insert log records best effort.
if (Thread.currentThread() != logEntryHandlerThread) {
// Blocks caller till enough space exists to publish this log entry.
try {
@@ -257,114 +475,14 @@ public void publish(LogRecord record) {
}
}
- @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE")
- private void dropIfBufferFull(BeamFnApi.LogEntry logEntry) {
- bufferedLogEntries.offer(logEntry);
- }
-
- @Override
- public void run() {
- // Logging which occurs in this thread will attempt to publish log entries into the
- // above handler which should never block if the queue is full otherwise
- // this thread will get stuck.
- logEntryHandlerThread = Thread.currentThread();
-
- List additionalLogEntries = new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
- Throwable thrown = null;
- try {
- // As long as we haven't yet terminated, then attempt
- while (!phaser.isTerminated()) {
- // Try to wait for a message to show up.
- BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, TimeUnit.SECONDS);
- // If we don't have a message then we need to try this loop again.
- if (logEntry == null) {
- continue;
- }
-
- // Attempt to honor flow control. Phaser termination causes await advance to return
- // immediately.
- int phase = phaser.getPhase();
- if (!outboundObserver.isReady()) {
- phaser.awaitAdvance(phase);
- }
-
- // Batch together as many log messages as possible that are held within the buffer
- BeamFnApi.LogEntry.List.Builder builder =
- BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry);
- bufferedLogEntries.drainTo(additionalLogEntries);
- builder.addAllLogEntries(additionalLogEntries);
- outboundObserver.onNext(builder.build());
- additionalLogEntries.clear();
- }
-
- // Perform one more final check to see if there are any log entries to guarantee that
- // if a log entry was added on the thread performing termination that we will send it.
- bufferedLogEntries.drainTo(additionalLogEntries);
- if (!additionalLogEntries.isEmpty()) {
- outboundObserver.onNext(
- BeamFnApi.LogEntry.List.newBuilder().addAllLogEntries(additionalLogEntries).build());
- }
- } catch (Throwable t) {
- thrown = t;
- }
- if (thrown != null) {
- outboundObserver.onError(
- Status.INTERNAL.withDescription(getStackTraceAsString(thrown)).asException());
- throw new IllegalStateException(thrown);
- } else {
- outboundObserver.onCompleted();
- }
+ private boolean dropIfBufferFull(BeamFnApi.LogEntry logEntry) {
+ return bufferedLogEntries.offer(logEntry);
}
@Override
public void flush() {}
@Override
- public synchronized void close() {
- // If we are done, then a previous caller has already shutdown the queue processing thread
- // hence we don't need to do it again.
- if (phaser.isTerminated()) {
- return;
- }
-
- // Terminate the phaser that we block on when attempting to honor flow control on the
- // outbound observer.
- phaser.forceTermination();
-
- if (bufferedLogWriter != null) {
- try {
- bufferedLogWriter.get();
- } catch (CancellationException e) {
- // Ignore cancellations
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- private class LogControlObserver
- implements ClientResponseObserver {
-
- @Override
- public void beforeStart(ClientCallStreamObserver requestStream) {
- requestStream.setOnReadyHandler(phaser::arrive);
- }
-
- @Override
- public void onNext(BeamFnApi.LogControl value) {}
-
- @Override
- public void onError(Throwable t) {
- inboundObserverCompletion.completeExceptionally(t);
- }
-
- @Override
- public void onCompleted() {
- inboundObserverCompletion.complete(COMPLETED);
- }
+ public synchronized void close() {}
}
}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
index ad763a588875f..5157e124b3190 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java
@@ -166,7 +166,7 @@ public StreamObserver control(
// Ensure that the server completing the stream translates to the completable future
// being completed allowing for a successful shutdown of the client.
outboundServerObserver.onCompleted();
- client.waitForTermination();
+ client.terminationFuture().get();
} finally {
server.shutdownNow();
}
@@ -236,7 +236,7 @@ public StreamObserver control(
// Ensure that the client shuts down when an Error is thrown from the harness
try {
- client.waitForTermination();
+ client.terminationFuture().get();
throw new IllegalStateException("The future should have terminated with an error");
} catch (ExecutionException errorWrapper) {
assertThat(errorWrapper.getCause().getMessage(), containsString("Test Error"));
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
index 66b9246d6193f..1fd8e249dd05b 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java
@@ -28,6 +28,7 @@
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Handler;
@@ -44,7 +45,13 @@
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Value;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.CallOptions;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ClientCall;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ClientInterceptor;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingClientCall;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
+import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.MethodDescriptor;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder;
@@ -157,7 +164,7 @@ public StreamObserver logging(
try {
BeamFnLoggingClient client =
- new BeamFnLoggingClient(
+ BeamFnLoggingClient.createAndStart(
PipelineOptionsFactory.fromArgs(
new String[] {
"--defaultSdkHarnessLogLevel=OFF",
@@ -223,6 +230,12 @@ public void testWhenServerFailsThatClientIsAbleToCleanup() throws Exception {
values.addAll(logEntries.getLogEntriesList()))
.build();
+ // Keep a strong reference to the loggers. Otherwise the call to client.close()
+ // removes the only reference and the logger may get GC'd before the assertions (BEAM-4136).
+ Logger rootLogger = null;
+ Logger configuredLogger = null;
+ Phaser streamBlocker = new Phaser(1);
+
Endpoints.ApiServiceDescriptor apiServiceDescriptor =
Endpoints.ApiServiceDescriptor.newBuilder()
.setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
@@ -234,6 +247,9 @@ public void testWhenServerFailsThatClientIsAbleToCleanup() throws Exception {
@Override
public StreamObserver logging(
StreamObserver outboundObserver) {
+ // Block before returning an error on the stream so that we can observe the
+ // loggers before they are reset.
+ streamBlocker.awaitAdvance(1);
outboundServerObserver.set(outboundObserver);
outboundObserver.onError(
Status.INTERNAL.withDescription("TEST ERROR").asException());
@@ -244,15 +260,9 @@ public StreamObserver logging(
server.start();
ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
-
- // Keep a strong reference to the loggers. Otherwise the call to client.close()
- // removes the only reference and the logger may get GC'd before the assertions (BEAM-4136).
- Logger rootLogger = null;
- Logger configuredLogger = null;
-
try {
BeamFnLoggingClient client =
- new BeamFnLoggingClient(
+ BeamFnLoggingClient.createAndStart(
PipelineOptionsFactory.fromArgs(
new String[] {
"--defaultSdkHarnessLogLevel=OFF",
@@ -261,15 +271,16 @@ public StreamObserver logging(
.create(),
apiServiceDescriptor,
(Endpoints.ApiServiceDescriptor descriptor) -> channel);
-
+ // The loggers should be installed before createAndStart returns.
rootLogger = LogManager.getLogManager().getLogger("");
configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger");
-
+ // Allow the stream to return with an error.
+ assertEquals(0, streamBlocker.arrive());
thrown.expectMessage("TEST ERROR");
client.close();
} finally {
assertNotNull("rootLogger should be initialized before exception", rootLogger);
- assertNotNull("configuredLogger should be initialized before exception", rootLogger);
+ assertNotNull("configuredLogger should be initialized before exception", configuredLogger);
// Verify that after close, log levels are reset.
assertEquals(Level.INFO, rootLogger.getLevel());
@@ -311,11 +322,12 @@ public StreamObserver logging(
})
.build();
server.start();
+ thrown.expectMessage("Logging stream terminated unexpectedly");
ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
try {
BeamFnLoggingClient client =
- new BeamFnLoggingClient(
+ BeamFnLoggingClient.createAndStart(
PipelineOptionsFactory.fromArgs(
new String[] {
"--defaultSdkHarnessLogLevel=OFF",
@@ -329,7 +341,6 @@ public StreamObserver logging(
// removes the only reference and the logger may get GC'd before the assertions (BEAM-4136).
Logger rootLogger = LogManager.getLogManager().getLogger("");
Logger configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger");
-
client.close();
// Verify that after close, log levels are reset.
@@ -341,4 +352,167 @@ public StreamObserver logging(
server.shutdownNow();
}
}
+
+ @Test
+ public void testClosableWhenBlockingForOnReady() throws Exception {
+ BeamFnLoggingMDC.setInstructionId("instruction-1");
+ Collection values = new ConcurrentLinkedQueue<>();
+ AtomicReference> outboundServerObserver =
+ new AtomicReference<>();
+
+ final AtomicBoolean elementsAllowed = new AtomicBoolean(true);
+ CallStreamObserver inboundServerObserver =
+ TestStreams.withOnNext(
+ (BeamFnApi.LogEntry.List logEntries) ->
+ values.addAll(logEntries.getLogEntriesList()))
+ .build();
+
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+ Endpoints.ApiServiceDescriptor.newBuilder()
+ .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+ .build();
+ Server server =
+ InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(
+ new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+ @Override
+ public StreamObserver logging(
+ StreamObserver outboundObserver) {
+ outboundServerObserver.set(outboundObserver);
+ return inboundServerObserver;
+ }
+ })
+ .build();
+ server.start();
+
+ ManagedChannel channel =
+ InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl())
+ .intercept(
+ new ClientInterceptor() {
+ @Override
+ public ClientCall interceptCall(
+ MethodDescriptor method, CallOptions callOptions, Channel next) {
+ ClientCall delegate = next.newCall(method, callOptions);
+ return new ForwardingClientCall.SimpleForwardingClientCall(
+ delegate) {
+ @Override
+ public boolean isReady() {
+ return elementsAllowed.get();
+ }
+ };
+ }
+ })
+ .build();
+
+ // Keep a strong reference to the loggers. Otherwise the call to client.close()
+ // removes the only reference and the logger may get GC'd before the assertions (BEAM-4136).
+ Logger rootLogger = null;
+ Logger configuredLogger = null;
+
+ try {
+ BeamFnLoggingClient client =
+ BeamFnLoggingClient.createAndStart(
+ PipelineOptionsFactory.fromArgs(
+ new String[] {
+ "--defaultSdkHarnessLogLevel=OFF",
+ "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+ })
+ .create(),
+ apiServiceDescriptor,
+ (Endpoints.ApiServiceDescriptor descriptor) -> channel);
+
+ rootLogger = LogManager.getLogManager().getLogger("");
+ configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger");
+
+ long numEntries = 2000;
+ for (int i = 0; i < numEntries; ++i) {
+ configuredLogger.log(TEST_RECORD);
+ }
+ // Measure how long it takes all the logs to appear.
+ int sleepTime = 0;
+ while (values.size() < numEntries) {
+ ++sleepTime;
+ Thread.sleep(1);
+ }
+ // Attempt to enter the blocking state by pushing back on the stream, publishing records and
+ // then giving them time for it to block.
+ elementsAllowed.set(false);
+ for (int i = 0; i < numEntries; ++i) {
+ configuredLogger.log(TEST_RECORD);
+ }
+ Thread.sleep(sleepTime * 3);
+ // At this point, the background thread is either blocking as intended or the background
+ // thread hasn't yet observed all the input. In either case the test should pass.
+ assertTrue(values.size() < numEntries * 2);
+
+ client.close();
+
+ assertNotNull("rootLogger should be initialized before exception", rootLogger);
+ assertNotNull("configuredLogger should be initialized before exception", rootLogger);
+
+ // Verify that after stream terminates, log levels are reset.
+ assertEquals(Level.INFO, rootLogger.getLevel());
+ assertNull(configuredLogger.getLevel());
+
+ assertTrue(channel.isShutdown());
+ } finally {
+ server.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testServerCloseNotifiesTermination() throws Exception {
+ BeamFnLoggingMDC.setInstructionId("instruction-1");
+ Collection values = new ConcurrentLinkedQueue<>();
+ AtomicReference> outboundServerObserver =
+ new AtomicReference<>();
+ CallStreamObserver inboundServerObserver =
+ TestStreams.withOnNext(
+ (BeamFnApi.LogEntry.List logEntries) ->
+ values.addAll(logEntries.getLogEntriesList()))
+ .build();
+
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+ Endpoints.ApiServiceDescriptor.newBuilder()
+ .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString())
+ .build();
+ Server server =
+ InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(
+ new BeamFnLoggingGrpc.BeamFnLoggingImplBase() {
+ @Override
+ public StreamObserver logging(
+ StreamObserver outboundObserver) {
+ outboundServerObserver.set(outboundObserver);
+ outboundObserver.onCompleted();
+ return inboundServerObserver;
+ }
+ })
+ .build();
+ server.start();
+
+ ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+ try {
+ BeamFnLoggingClient client =
+ BeamFnLoggingClient.createAndStart(
+ PipelineOptionsFactory.fromArgs(
+ new String[] {
+ "--defaultSdkHarnessLogLevel=OFF",
+ "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}"
+ })
+ .create(),
+ apiServiceDescriptor,
+ (Endpoints.ApiServiceDescriptor descriptor) -> channel);
+
+ thrown.expectMessage("Logging stream terminated unexpectedly");
+ client.terminationFuture().get();
+ } finally {
+ // Verify that after termination, log levels are reset.
+ assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel());
+ assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel());
+
+ assertTrue(channel.isShutdown());
+ server.shutdownNow();
+ }
+ }
}