From 5e942ae3790bc95148413c43ab7e43a01a2d82ae Mon Sep 17 00:00:00 2001 From: scwhittle Date: Wed, 21 Jun 2023 02:19:32 +0200 Subject: [PATCH] Ensure that the BeamFnLoggingClient terminates process if stream breaks (#25186) * Ensure that a failure of the logging stream in BeamFnLoggingClient does not cause logging to block indefinitely but instead triggers SDK teardown. * fix test * fix racy test * address comments --- .../sdk/fn/stream/DirectStreamObserver.java | 24 +- .../logging/BeamFnLoggingClientBenchmark.java | 2 +- .../org/apache/beam/fn/harness/FnHarness.java | 7 +- .../harness/control/BeamFnControlClient.java | 5 +- .../harness/logging/BeamFnLoggingClient.java | 454 +++++++++++------- .../control/BeamFnControlClientTest.java | 4 +- .../logging/BeamFnLoggingClientTest.java | 200 +++++++- 7 files changed, 497 insertions(+), 199 deletions(-) diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java index 16677a18e3964..99a831c20ee6c 100644 --- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java +++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java @@ -33,7 +33,8 @@ *

Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser} * which waits for advancement of the phase if the {@link CallStreamObserver} is not ready. Creator * is expected to advance the {@link Phaser} whenever the underlying {@link CallStreamObserver} - * becomes ready. + * becomes ready. If the {@link Phaser} is terminated, {@link DirectStreamObserver.onNext(T)} + * will no longer wait for the {@link CallStreamObserver} to become ready. */ @ThreadSafe public final class DirectStreamObserver implements StreamObserver { @@ -70,24 +71,27 @@ public void onNext(T value) { synchronized (lock) { if (++numMessages >= maxMessagesBeforeCheck) { numMessages = 0; - int waitTime = 1; - int totalTimeWaited = 0; + int waitSeconds = 1; + int totalSecondsWaited = 0; int phase = phaser.getPhase(); // Record the initial phase in case we are in the inbound gRPC thread where the phase won't // advance. int initialPhase = phase; - while (!outboundObserver.isReady()) { + // A negative phase indicates that the phaser is terminated. + while (phase >= 0 && !outboundObserver.isReady()) { try { - phase = phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS); + phase = phaser.awaitAdvanceInterruptibly(phase, waitSeconds, TimeUnit.SECONDS); } catch (TimeoutException e) { - totalTimeWaited += waitTime; - waitTime = waitTime * 2; + totalSecondsWaited += waitSeconds; + // Double the backoff for re-evaluating the isReady bit up to a maximum of once per + // minute. This bounds the waiting if the onReady callback is not called as expected. + waitSeconds = Math.min(waitSeconds * 2, 60); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } } - if (totalTimeWaited > 0) { + if (totalSecondsWaited > 0) { // If the phase didn't change, this means that the installed onReady callback had not // been invoked. if (initialPhase == phase) { @@ -95,12 +99,12 @@ public void onNext(T value) { "Output channel stalled for {}s, outbound thread {}. See: " + "https://issues.apache.org/jira/browse/BEAM-4280 for the history for " + "this issue.", - totalTimeWaited, + totalSecondsWaited, Thread.currentThread().getName()); } else { LOG.debug( "Output channel stalled for {}s, outbound thread {}.", - totalTimeWaited, + totalSecondsWaited, Thread.currentThread().getName()); } } diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java index d7c18059dc20a..1a5558ed67000 100644 --- a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java +++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/logging/BeamFnLoggingClientBenchmark.java @@ -98,7 +98,7 @@ public ManageLoggingClientAndService() { .build(); server.start(); loggingClient = - new BeamFnLoggingClient( + BeamFnLoggingClient.createAndStart( PipelineOptionsFactory.create(), apiServiceDescriptor, managedChannelFactory::forDescriptor); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java index 59d21f2539124..0d25137beefb7 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.EnumMap; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Function; import javax.annotation.Nullable; @@ -228,7 +229,7 @@ public static void main( // The logging client variable is not used per se, but during its lifetime (until close()) it // intercepts logging and sends it to the logging service. try (BeamFnLoggingClient logging = - new BeamFnLoggingClient( + BeamFnLoggingClient.createAndStart( options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) { LOG.info("Fn Harness started"); // Register standard file systems. @@ -353,11 +354,13 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) { outboundObserverFactory, executorService, handlers); - control.waitForTermination(); + CompletableFuture.anyOf(control.terminationFuture(), logging.terminationFuture()).get(); if (beamFnStatusClient != null) { beamFnStatusClient.close(); } processBundleHandler.shutdown(); + } catch (Exception e) { + System.out.println("Shutting down harness due to exception: " + e.toString()); } finally { System.out.println("Shutting SDK harness down."); executionStateSampler.stop(); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java index ca4428a34628a..ffac6d7aa98e9 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BeamFnControlClient.java @@ -21,7 +21,6 @@ import java.util.EnumMap; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC; import org.apache.beam.model.fnexecution.v1.BeamFnApi; @@ -139,8 +138,8 @@ public void onCompleted() { } /** This method blocks until the control stream has completed. */ - public void waitForTermination() throws InterruptedException, ExecutionException { - onFinish.get(); + public CompletableFuture terminationFuture() { + return onFinish; } public BeamFnApi.InstructionResponse delegateOnInstructionRequestType( diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java index 68ad6727c4d55..3b76f7fc4d01f 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java @@ -17,22 +17,21 @@ */ package org.apache.beam.fn.harness.logging; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables.getStackTraceAsString; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.logging.Formatter; import java.util.logging.Handler; @@ -47,6 +46,8 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.LogEntry; import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc; import org.apache.beam.model.pipeline.v1.Endpoints; +import org.apache.beam.sdk.fn.stream.AdvancingPhaser; +import org.apache.beam.sdk.fn.stream.DirectStreamObserver; import org.apache.beam.sdk.options.ExecutorOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.SdkHarnessOptions; @@ -54,13 +55,15 @@ import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Value; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel; -import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.CallStreamObserver; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientCallStreamObserver; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientResponseObserver; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.initialization.qual.UnderInitialization; import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.RequiresNonNull; import org.slf4j.MDC; /** @@ -76,6 +79,10 @@ public class BeamFnLoggingClient implements AutoCloseable { .put(Level.FINE, BeamFnApi.LogEntry.Severity.Enum.DEBUG) .put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE) .build(); + private static final ImmutableMap REVERSE_LOG_LEVEL_MAP = + ImmutableMap.builder() + .putAll(LOG_LEVEL_MAP.asMultimap().inverse().entries()) + .build(); private static final Formatter DEFAULT_FORMATTER = new SimpleFormatter(); @@ -87,28 +94,98 @@ public class BeamFnLoggingClient implements AutoCloseable { private static final Object COMPLETED = new Object(); + private final Endpoints.ApiServiceDescriptor apiServiceDescriptor; + + private final StreamWriter streamWriter; + + private final LogRecordHandler logRecordHandler; + /* We need to store a reference to the configured loggers so that they are not * garbage collected. java.util.logging only has weak references to the loggers * so if they are garbage collected, our hierarchical configuration will be lost. */ - private final Collection configuredLoggers; - private final Endpoints.ApiServiceDescriptor apiServiceDescriptor; - private final ManagedChannel channel; - private final CallStreamObserver outboundObserver; - private final LogControlObserver inboundObserver; - private final LogRecordHandler logRecordHandler; - private final CompletableFuture inboundObserverCompletion; - private final Phaser phaser; + private final Collection configuredLoggers = new ArrayList<>(); + private @Nullable ProcessBundleHandler processBundleHandler; - public BeamFnLoggingClient( + private final BlockingQueue bufferedLogEntries = + new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT); + + /** + * Future that completes with the background thread consuming logs from bufferedLogEntries. + * Completes with COMPLETED or with exception. + */ + private final CompletableFuture bufferedLogConsumer; + + /** + * Safe object publishing is not required since we only care if the thread that set this field is + * equal to the thread also attempting to add a log entry. + */ + private @Nullable Thread logEntryHandlerThread = null; + + public static BeamFnLoggingClient createAndStart( PipelineOptions options, Endpoints.ApiServiceDescriptor apiServiceDescriptor, Function channelFactory) { + BeamFnLoggingClient client = + new BeamFnLoggingClient( + apiServiceDescriptor, + new StreamWriter(channelFactory.apply(apiServiceDescriptor)), + options.as(SdkHarnessOptions.class).getLogMdc(), + options.as(ExecutorOptions.class).getScheduledExecutorService(), + options.as(SdkHarnessOptions.class)); + return client; + } + + private BeamFnLoggingClient( + Endpoints.ApiServiceDescriptor apiServiceDescriptor, + StreamWriter streamWriter, + boolean logMdc, + ScheduledExecutorService executorService, + SdkHarnessOptions options) { this.apiServiceDescriptor = apiServiceDescriptor; - this.inboundObserverCompletion = new CompletableFuture<>(); - this.phaser = new Phaser(1); - this.channel = channelFactory.apply(apiServiceDescriptor); + this.streamWriter = streamWriter; + this.logRecordHandler = new LogRecordHandler(logMdc); + logRecordHandler.setLevel(Level.ALL); + logRecordHandler.setFormatter(DEFAULT_FORMATTER); + CompletableFuture started = new CompletableFuture<>(); + this.bufferedLogConsumer = + CompletableFuture.supplyAsync( + () -> { + try { + logEntryHandlerThread = Thread.currentThread(); + installLogging(options); + started.complete(COMPLETED); + + // Logging which occurs in this thread will attempt to publish log entries into the + // above handler which should never block if the queue is full otherwise + // this thread will get stuck. + streamWriter.drainQueueToStream(bufferedLogEntries); + } finally { + restoreLoggers(); + // Now that loggers are restored, do a final flush of any buffered logs + // in case they help with understanding above failures. + flushFinalLogs(); + } + return COMPLETED; + }, + executorService); + try { + // Wait for the thread to be running and log handlers installed or an error with the thread + // that is supposed to be consuming logs. + CompletableFuture.anyOf(this.bufferedLogConsumer, started).get(); + } catch (ExecutionException e) { + throw new RuntimeException("Error starting background log thread " + e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + @RequiresNonNull("logRecordHandler") + @RequiresNonNull("configuredLoggers") + private void installLogging( + @UnderInitialization BeamFnLoggingClient this, SdkHarnessOptions options) { // Reset the global log manager, get the root logger and remove the default log handlers. LogManager logManager = LogManager.getLogManager(); logManager.reset(); @@ -117,50 +194,205 @@ public BeamFnLoggingClient( rootLogger.removeHandler(handler); } // configure loggers from default sdk harness log level and log level overrides - this.configuredLoggers = - SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class)); + this.configuredLoggers.addAll(SdkHarnessOptions.getConfiguredLoggerFromOptions(options)); - BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(channel); - inboundObserver = new LogControlObserver(); - logRecordHandler = new LogRecordHandler(); - logRecordHandler.setLevel(Level.ALL); - logRecordHandler.setFormatter(DEFAULT_FORMATTER); - logRecordHandler.executeOn(options.as(ExecutorOptions.class).getScheduledExecutorService()); - boolean logMdc = options.as(SdkHarnessOptions.class).getLogMdc(); - logRecordHandler.setLogMdc(logMdc); - QuotaEvent.setEnabled(logMdc); - outboundObserver = (CallStreamObserver) stub.logging(inboundObserver); + // Install a handler that queues to the buffer read by the background thread. rootLogger.addHandler(logRecordHandler); } - public void setProcessBundleHandler(ProcessBundleHandler processBundleHandler) { - this.processBundleHandler = processBundleHandler; + private static class StreamWriter { + private final ManagedChannel channel; + private final StreamObserver outboundObserver; + private final LogControlObserver inboundObserver; + + private final CompletableFuture inboundObserverCompletion; + private final AdvancingPhaser streamPhaser; + + // Used to note we are attempting to close the logging client and to gracefully drain the + // current logs to the stream. + private final CompletableFuture softClosing = new CompletableFuture<>(); + + public StreamWriter(ManagedChannel channel) { + this.inboundObserverCompletion = new CompletableFuture<>(); + this.streamPhaser = new AdvancingPhaser(1); + this.channel = channel; + + BeamFnLoggingGrpc.BeamFnLoggingStub stub = BeamFnLoggingGrpc.newStub(channel); + this.inboundObserver = new LogControlObserver(); + this.outboundObserver = + new DirectStreamObserver( + this.streamPhaser, + (CallStreamObserver) stub.logging(inboundObserver)); + } + + public void drainQueueToStream(BlockingQueue bufferedLogEntries) { + Throwable thrown = null; + try { + List additionalLogEntries = + new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT); + // As long as we haven't yet terminated the stream, then attempt to send on it. + while (!streamPhaser.isTerminated()) { + // We wait for a limited period so that we can evaluate if the stream closed or if + // we are gracefully closing the client. + BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, SECONDS); + if (logEntry == null) { + if (softClosing.isDone()) { + break; + } + continue; + } + + // Batch together as many log messages as possible that are held within the buffer + BeamFnApi.LogEntry.List.Builder builder = + BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry); + bufferedLogEntries.drainTo(additionalLogEntries); + builder.addAllLogEntries(additionalLogEntries); + outboundObserver.onNext(builder.build()); + additionalLogEntries.clear(); + } + if (inboundObserverCompletion.isDone()) { + try { + // If the inbound observer failed with an exception, get() will throw an + // ExecutionException. + inboundObserverCompletion.get(); + // Otherwise it is an error for the server to close the stream before we closed our end. + throw new IllegalStateException( + "Logging stream terminated unexpectedly with success before it was closed by the client."); + } catch (ExecutionException e) { + throw new IllegalStateException( + "Logging stream terminated unexpectedly before it was closed by the client with error: " + + e.getCause()); + } catch (InterruptedException e) { + // Should never happen because of the isDone check. + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } catch (Throwable t) { + thrown = t; + throw new RuntimeException(t); + } finally { + if (thrown == null) { + outboundObserver.onCompleted(); + } else { + outboundObserver.onError(thrown); + } + channel.shutdown(); + boolean shutdownFinished = false; + try { + shutdownFinished = channel.awaitTermination(10, SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + if (!shutdownFinished) { + channel.shutdownNow(); + } + } + } + } + + public void softClose() { + softClosing.complete(COMPLETED); + } + + public void hardClose() { + streamPhaser.forceTermination(); + } + + private class LogControlObserver + implements ClientResponseObserver { + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + requestStream.setOnReadyHandler(streamPhaser::arrive); + } + + @Override + public void onNext(BeamFnApi.LogControl value) {} + + @Override + public void onError(Throwable t) { + inboundObserverCompletion.completeExceptionally(t); + hardClose(); + } + + @Override + public void onCompleted() { + inboundObserverCompletion.complete(COMPLETED); + hardClose(); + } + } } @Override public void close() throws Exception { + checkNotNull(bufferedLogConsumer, "BeamFnLoggingClient not fully started"); try { - // Reset the logging configuration to what it is at startup - for (Logger logger : configuredLoggers) { - logger.setLevel(null); + try { + // Wait for buffered log messages to drain for a short period. + streamWriter.softClose(); + bufferedLogConsumer.get(10, SECONDS); + } catch (TimeoutException e) { + // Terminate the phaser that we block on when attempting to honor flow control on the + // outbound observer. + streamWriter.hardClose(); + // Wait for the sending thread to exit. + bufferedLogConsumer.get(); } - configuredLoggers.clear(); - LogManager.getLogManager().readConfiguration(); + } catch (ExecutionException e) { + if (e.getCause() instanceof Exception) { + throw (Exception) e.getCause(); + } + throw e; + } + } - // Hang up with the server - logRecordHandler.close(); + public void setProcessBundleHandler(ProcessBundleHandler processBundleHandler) { + this.processBundleHandler = processBundleHandler; + } + + // Reset the logging configuration to what it is at startup. + @RequiresNonNull("configuredLoggers") + @RequiresNonNull("logRecordHandler") + private void restoreLoggers(@UnderInitialization BeamFnLoggingClient this) { + for (Logger logger : configuredLoggers) { + logger.setLevel(null); + // Explicitly remove the installed handler in case reading the configuration fails. + logger.removeHandler(logRecordHandler); + } + configuredLoggers.clear(); + LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).removeHandler(logRecordHandler); + try { + LogManager.getLogManager().readConfiguration(); + } catch (IOException e) { + System.out.print("Unable to restore log managers from configuration: " + e.toString()); + } + } - // Wait for the server to hang up - inboundObserverCompletion.get(); - } finally { - // Shut the channel down - channel.shutdown(); - if (!channel.awaitTermination(10, TimeUnit.SECONDS)) { - channel.shutdownNow(); + @RequiresNonNull("bufferedLogEntries") + void flushFinalLogs(@UnderInitialization BeamFnLoggingClient this) { + List finalLogEntries = new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT); + bufferedLogEntries.drainTo(finalLogEntries); + for (BeamFnApi.LogEntry logEntry : finalLogEntries) { + LogRecord logRecord = + new LogRecord(REVERSE_LOG_LEVEL_MAP.get(logEntry.getSeverity()), logEntry.getMessage()); + logRecord.setLoggerName(logEntry.getLogLocation()); + logRecord.setMillis( + logEntry.getTimestamp().getSeconds() * 1000 + + logEntry.getTimestamp().getNanos() / 1_000_000); + logRecord.setThreadID(Integer.parseInt(logEntry.getThread())); + if (!logEntry.getTrace().isEmpty()) { + logRecord.setThrown(new Throwable(logEntry.getTrace())); } + LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).log(logRecord); } } + public CompletableFuture terminationFuture() { + checkNotNull(bufferedLogConsumer, "BeamFnLoggingClient not fully started"); + return bufferedLogConsumer; + } + @Override public String toString() { return MoreObjects.toStringHelper(BeamFnLoggingClient.class) @@ -168,24 +400,11 @@ public String toString() { .toString(); } - private class LogRecordHandler extends Handler implements Runnable { - private final BlockingQueue bufferedLogEntries = - new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT); - private @Nullable Future bufferedLogWriter = null; - /** - * Safe object publishing is not required since we only care if the thread that set this field - * is equal to the thread also attempting to add a log entry. - */ - private @Nullable Thread logEntryHandlerThread = null; - - private boolean logMdc = true; - - private void setLogMdc(boolean value) { - logMdc = value; - } + private class LogRecordHandler extends Handler { + private final boolean logMdc; - private void executeOn(ExecutorService executorService) { - bufferedLogWriter = executorService.submit(this); + LogRecordHandler(boolean logMdc) { + this.logMdc = logMdc; } @Override @@ -242,7 +461,6 @@ public void publish(LogRecord record) { } // The thread that sends log records should never perform a blocking publish and - // only insert log records best effort. if (Thread.currentThread() != logEntryHandlerThread) { // Blocks caller till enough space exists to publish this log entry. try { @@ -257,114 +475,14 @@ public void publish(LogRecord record) { } } - @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_BAD_PRACTICE") - private void dropIfBufferFull(BeamFnApi.LogEntry logEntry) { - bufferedLogEntries.offer(logEntry); - } - - @Override - public void run() { - // Logging which occurs in this thread will attempt to publish log entries into the - // above handler which should never block if the queue is full otherwise - // this thread will get stuck. - logEntryHandlerThread = Thread.currentThread(); - - List additionalLogEntries = new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT); - Throwable thrown = null; - try { - // As long as we haven't yet terminated, then attempt - while (!phaser.isTerminated()) { - // Try to wait for a message to show up. - BeamFnApi.LogEntry logEntry = bufferedLogEntries.poll(1, TimeUnit.SECONDS); - // If we don't have a message then we need to try this loop again. - if (logEntry == null) { - continue; - } - - // Attempt to honor flow control. Phaser termination causes await advance to return - // immediately. - int phase = phaser.getPhase(); - if (!outboundObserver.isReady()) { - phaser.awaitAdvance(phase); - } - - // Batch together as many log messages as possible that are held within the buffer - BeamFnApi.LogEntry.List.Builder builder = - BeamFnApi.LogEntry.List.newBuilder().addLogEntries(logEntry); - bufferedLogEntries.drainTo(additionalLogEntries); - builder.addAllLogEntries(additionalLogEntries); - outboundObserver.onNext(builder.build()); - additionalLogEntries.clear(); - } - - // Perform one more final check to see if there are any log entries to guarantee that - // if a log entry was added on the thread performing termination that we will send it. - bufferedLogEntries.drainTo(additionalLogEntries); - if (!additionalLogEntries.isEmpty()) { - outboundObserver.onNext( - BeamFnApi.LogEntry.List.newBuilder().addAllLogEntries(additionalLogEntries).build()); - } - } catch (Throwable t) { - thrown = t; - } - if (thrown != null) { - outboundObserver.onError( - Status.INTERNAL.withDescription(getStackTraceAsString(thrown)).asException()); - throw new IllegalStateException(thrown); - } else { - outboundObserver.onCompleted(); - } + private boolean dropIfBufferFull(BeamFnApi.LogEntry logEntry) { + return bufferedLogEntries.offer(logEntry); } @Override public void flush() {} @Override - public synchronized void close() { - // If we are done, then a previous caller has already shutdown the queue processing thread - // hence we don't need to do it again. - if (phaser.isTerminated()) { - return; - } - - // Terminate the phaser that we block on when attempting to honor flow control on the - // outbound observer. - phaser.forceTermination(); - - if (bufferedLogWriter != null) { - try { - bufferedLogWriter.get(); - } catch (CancellationException e) { - // Ignore cancellations - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - } - } - } - - private class LogControlObserver - implements ClientResponseObserver { - - @Override - public void beforeStart(ClientCallStreamObserver requestStream) { - requestStream.setOnReadyHandler(phaser::arrive); - } - - @Override - public void onNext(BeamFnApi.LogControl value) {} - - @Override - public void onError(Throwable t) { - inboundObserverCompletion.completeExceptionally(t); - } - - @Override - public void onCompleted() { - inboundObserverCompletion.complete(COMPLETED); - } + public synchronized void close() {} } } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java index ad763a588875f..5157e124b3190 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/BeamFnControlClientTest.java @@ -166,7 +166,7 @@ public StreamObserver control( // Ensure that the server completing the stream translates to the completable future // being completed allowing for a successful shutdown of the client. outboundServerObserver.onCompleted(); - client.waitForTermination(); + client.terminationFuture().get(); } finally { server.shutdownNow(); } @@ -236,7 +236,7 @@ public StreamObserver control( // Ensure that the client shuts down when an Error is thrown from the harness try { - client.waitForTermination(); + client.terminationFuture().get(); throw new IllegalStateException("The future should have terminated with an error"); } catch (ExecutionException errorWrapper) { assertThat(errorWrapper.getCause().getMessage(), containsString("Test Error")); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java index 66b9246d6193f..1fd8e249dd05b 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientTest.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Phaser; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Handler; @@ -44,7 +45,13 @@ import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Struct; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Value; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.CallOptions; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ClientCall; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ClientInterceptor; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingClientCall; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel; +import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.MethodDescriptor; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Server; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status; import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.inprocess.InProcessChannelBuilder; @@ -157,7 +164,7 @@ public StreamObserver logging( try { BeamFnLoggingClient client = - new BeamFnLoggingClient( + BeamFnLoggingClient.createAndStart( PipelineOptionsFactory.fromArgs( new String[] { "--defaultSdkHarnessLogLevel=OFF", @@ -223,6 +230,12 @@ public void testWhenServerFailsThatClientIsAbleToCleanup() throws Exception { values.addAll(logEntries.getLogEntriesList())) .build(); + // Keep a strong reference to the loggers. Otherwise the call to client.close() + // removes the only reference and the logger may get GC'd before the assertions (BEAM-4136). + Logger rootLogger = null; + Logger configuredLogger = null; + Phaser streamBlocker = new Phaser(1); + Endpoints.ApiServiceDescriptor apiServiceDescriptor = Endpoints.ApiServiceDescriptor.newBuilder() .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()) @@ -234,6 +247,9 @@ public void testWhenServerFailsThatClientIsAbleToCleanup() throws Exception { @Override public StreamObserver logging( StreamObserver outboundObserver) { + // Block before returning an error on the stream so that we can observe the + // loggers before they are reset. + streamBlocker.awaitAdvance(1); outboundServerObserver.set(outboundObserver); outboundObserver.onError( Status.INTERNAL.withDescription("TEST ERROR").asException()); @@ -244,15 +260,9 @@ public StreamObserver logging( server.start(); ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); - - // Keep a strong reference to the loggers. Otherwise the call to client.close() - // removes the only reference and the logger may get GC'd before the assertions (BEAM-4136). - Logger rootLogger = null; - Logger configuredLogger = null; - try { BeamFnLoggingClient client = - new BeamFnLoggingClient( + BeamFnLoggingClient.createAndStart( PipelineOptionsFactory.fromArgs( new String[] { "--defaultSdkHarnessLogLevel=OFF", @@ -261,15 +271,16 @@ public StreamObserver logging( .create(), apiServiceDescriptor, (Endpoints.ApiServiceDescriptor descriptor) -> channel); - + // The loggers should be installed before createAndStart returns. rootLogger = LogManager.getLogManager().getLogger(""); configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger"); - + // Allow the stream to return with an error. + assertEquals(0, streamBlocker.arrive()); thrown.expectMessage("TEST ERROR"); client.close(); } finally { assertNotNull("rootLogger should be initialized before exception", rootLogger); - assertNotNull("configuredLogger should be initialized before exception", rootLogger); + assertNotNull("configuredLogger should be initialized before exception", configuredLogger); // Verify that after close, log levels are reset. assertEquals(Level.INFO, rootLogger.getLevel()); @@ -311,11 +322,12 @@ public StreamObserver logging( }) .build(); server.start(); + thrown.expectMessage("Logging stream terminated unexpectedly"); ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); try { BeamFnLoggingClient client = - new BeamFnLoggingClient( + BeamFnLoggingClient.createAndStart( PipelineOptionsFactory.fromArgs( new String[] { "--defaultSdkHarnessLogLevel=OFF", @@ -329,7 +341,6 @@ public StreamObserver logging( // removes the only reference and the logger may get GC'd before the assertions (BEAM-4136). Logger rootLogger = LogManager.getLogManager().getLogger(""); Logger configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger"); - client.close(); // Verify that after close, log levels are reset. @@ -341,4 +352,167 @@ public StreamObserver logging( server.shutdownNow(); } } + + @Test + public void testClosableWhenBlockingForOnReady() throws Exception { + BeamFnLoggingMDC.setInstructionId("instruction-1"); + Collection values = new ConcurrentLinkedQueue<>(); + AtomicReference> outboundServerObserver = + new AtomicReference<>(); + + final AtomicBoolean elementsAllowed = new AtomicBoolean(true); + CallStreamObserver inboundServerObserver = + TestStreams.withOnNext( + (BeamFnApi.LogEntry.List logEntries) -> + values.addAll(logEntries.getLogEntriesList())) + .build(); + + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + Endpoints.ApiServiceDescriptor.newBuilder() + .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()) + .build(); + Server server = + InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()) + .addService( + new BeamFnLoggingGrpc.BeamFnLoggingImplBase() { + @Override + public StreamObserver logging( + StreamObserver outboundObserver) { + outboundServerObserver.set(outboundObserver); + return inboundServerObserver; + } + }) + .build(); + server.start(); + + ManagedChannel channel = + InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()) + .intercept( + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + ClientCall delegate = next.newCall(method, callOptions); + return new ForwardingClientCall.SimpleForwardingClientCall( + delegate) { + @Override + public boolean isReady() { + return elementsAllowed.get(); + } + }; + } + }) + .build(); + + // Keep a strong reference to the loggers. Otherwise the call to client.close() + // removes the only reference and the logger may get GC'd before the assertions (BEAM-4136). + Logger rootLogger = null; + Logger configuredLogger = null; + + try { + BeamFnLoggingClient client = + BeamFnLoggingClient.createAndStart( + PipelineOptionsFactory.fromArgs( + new String[] { + "--defaultSdkHarnessLogLevel=OFF", + "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}" + }) + .create(), + apiServiceDescriptor, + (Endpoints.ApiServiceDescriptor descriptor) -> channel); + + rootLogger = LogManager.getLogManager().getLogger(""); + configuredLogger = LogManager.getLogManager().getLogger("ConfiguredLogger"); + + long numEntries = 2000; + for (int i = 0; i < numEntries; ++i) { + configuredLogger.log(TEST_RECORD); + } + // Measure how long it takes all the logs to appear. + int sleepTime = 0; + while (values.size() < numEntries) { + ++sleepTime; + Thread.sleep(1); + } + // Attempt to enter the blocking state by pushing back on the stream, publishing records and + // then giving them time for it to block. + elementsAllowed.set(false); + for (int i = 0; i < numEntries; ++i) { + configuredLogger.log(TEST_RECORD); + } + Thread.sleep(sleepTime * 3); + // At this point, the background thread is either blocking as intended or the background + // thread hasn't yet observed all the input. In either case the test should pass. + assertTrue(values.size() < numEntries * 2); + + client.close(); + + assertNotNull("rootLogger should be initialized before exception", rootLogger); + assertNotNull("configuredLogger should be initialized before exception", rootLogger); + + // Verify that after stream terminates, log levels are reset. + assertEquals(Level.INFO, rootLogger.getLevel()); + assertNull(configuredLogger.getLevel()); + + assertTrue(channel.isShutdown()); + } finally { + server.shutdownNow(); + } + } + + @Test + public void testServerCloseNotifiesTermination() throws Exception { + BeamFnLoggingMDC.setInstructionId("instruction-1"); + Collection values = new ConcurrentLinkedQueue<>(); + AtomicReference> outboundServerObserver = + new AtomicReference<>(); + CallStreamObserver inboundServerObserver = + TestStreams.withOnNext( + (BeamFnApi.LogEntry.List logEntries) -> + values.addAll(logEntries.getLogEntriesList())) + .build(); + + Endpoints.ApiServiceDescriptor apiServiceDescriptor = + Endpoints.ApiServiceDescriptor.newBuilder() + .setUrl(this.getClass().getName() + "-" + UUID.randomUUID().toString()) + .build(); + Server server = + InProcessServerBuilder.forName(apiServiceDescriptor.getUrl()) + .addService( + new BeamFnLoggingGrpc.BeamFnLoggingImplBase() { + @Override + public StreamObserver logging( + StreamObserver outboundObserver) { + outboundServerObserver.set(outboundObserver); + outboundObserver.onCompleted(); + return inboundServerObserver; + } + }) + .build(); + server.start(); + + ManagedChannel channel = InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build(); + try { + BeamFnLoggingClient client = + BeamFnLoggingClient.createAndStart( + PipelineOptionsFactory.fromArgs( + new String[] { + "--defaultSdkHarnessLogLevel=OFF", + "--sdkHarnessLogLevelOverrides={\"ConfiguredLogger\": \"DEBUG\"}" + }) + .create(), + apiServiceDescriptor, + (Endpoints.ApiServiceDescriptor descriptor) -> channel); + + thrown.expectMessage("Logging stream terminated unexpectedly"); + client.terminationFuture().get(); + } finally { + // Verify that after termination, log levels are reset. + assertEquals(Level.INFO, LogManager.getLogManager().getLogger("").getLevel()); + assertNull(LogManager.getLogManager().getLogger("ConfiguredLogger").getLevel()); + + assertTrue(channel.isShutdown()); + server.shutdownNow(); + } + } }