Skip to content

Commit

Permalink
Ensure that the BeamFnLoggingClient terminates process if stream brea…
Browse files Browse the repository at this point in the history
…ks (#25186)

* Ensure that a failure of the logging stream in BeamFnLoggingClient
does not cause logging to block indefinitely but instead triggers
SDK teardown.

* fix test

* fix racy test

* address comments
  • Loading branch information
scwhittle committed Jun 21, 2023
1 parent f2240ff commit 5e942ae
Show file tree
Hide file tree
Showing 7 changed files with 497 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
* <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
* which waits for advancement of the phase if the {@link CallStreamObserver} is not ready. Creator
* is expected to advance the {@link Phaser} whenever the underlying {@link CallStreamObserver}
* becomes ready.
* becomes ready. If the {@link Phaser} is terminated, {@link DirectStreamObserver<T>.onNext(T)}
* will no longer wait for the {@link CallStreamObserver} to become ready.
*/
@ThreadSafe
public final class DirectStreamObserver<T> implements StreamObserver<T> {
Expand Down Expand Up @@ -70,37 +71,40 @@ public void onNext(T value) {
synchronized (lock) {
if (++numMessages >= maxMessagesBeforeCheck) {
numMessages = 0;
int waitTime = 1;
int totalTimeWaited = 0;
int waitSeconds = 1;
int totalSecondsWaited = 0;
int phase = phaser.getPhase();
// Record the initial phase in case we are in the inbound gRPC thread where the phase won't
// advance.
int initialPhase = phase;
while (!outboundObserver.isReady()) {
// A negative phase indicates that the phaser is terminated.
while (phase >= 0 && !outboundObserver.isReady()) {
try {
phase = phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
phase = phaser.awaitAdvanceInterruptibly(phase, waitSeconds, TimeUnit.SECONDS);
} catch (TimeoutException e) {
totalTimeWaited += waitTime;
waitTime = waitTime * 2;
totalSecondsWaited += waitSeconds;
// Double the backoff for re-evaluating the isReady bit up to a maximum of once per
// minute. This bounds the waiting if the onReady callback is not called as expected.
waitSeconds = Math.min(waitSeconds * 2, 60);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
if (totalTimeWaited > 0) {
if (totalSecondsWaited > 0) {
// If the phase didn't change, this means that the installed onReady callback had not
// been invoked.
if (initialPhase == phase) {
LOG.info(
"Output channel stalled for {}s, outbound thread {}. See: "
+ "https://issues.apache.org/jira/browse/BEAM-4280 for the history for "
+ "this issue.",
totalTimeWaited,
totalSecondsWaited,
Thread.currentThread().getName());
} else {
LOG.debug(
"Output channel stalled for {}s, outbound thread {}.",
totalTimeWaited,
totalSecondsWaited,
Thread.currentThread().getName());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public ManageLoggingClientAndService() {
.build();
server.start();
loggingClient =
new BeamFnLoggingClient(
BeamFnLoggingClient.createAndStart(
PipelineOptionsFactory.create(),
apiServiceDescriptor,
managedChannelFactory::forDescriptor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Collections;
import java.util.EnumMap;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -228,7 +229,7 @@ public static void main(
// The logging client variable is not used per se, but during its lifetime (until close()) it
// intercepts logging and sends it to the logging service.
try (BeamFnLoggingClient logging =
new BeamFnLoggingClient(
BeamFnLoggingClient.createAndStart(
options, loggingApiServiceDescriptor, channelFactory::forDescriptor)) {
LOG.info("Fn Harness started");
// Register standard file systems.
Expand Down Expand Up @@ -353,11 +354,13 @@ private BeamFnApi.ProcessBundleDescriptor loadDescriptor(String id) {
outboundObserverFactory,
executorService,
handlers);
control.waitForTermination();
CompletableFuture.anyOf(control.terminationFuture(), logging.terminationFuture()).get();
if (beamFnStatusClient != null) {
beamFnStatusClient.close();
}
processBundleHandler.shutdown();
} catch (Exception e) {
System.out.println("Shutting down harness due to exception: " + e.toString());
} finally {
System.out.println("Shutting SDK harness down.");
executionStateSampler.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.util.EnumMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
Expand Down Expand Up @@ -139,8 +138,8 @@ public void onCompleted() {
}

/** This method blocks until the control stream has completed. */
public void waitForTermination() throws InterruptedException, ExecutionException {
onFinish.get();
public CompletableFuture<Object> terminationFuture() {
return onFinish;
}

public BeamFnApi.InstructionResponse delegateOnInstructionRequestType(
Expand Down
Loading

0 comments on commit 5e942ae

Please sign in to comment.