Skip to content

Commit

Permalink
add more visibility via debug capture pages
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Jun 26, 2024
1 parent 519435e commit 95a7f2e
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,15 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
"<table border=\"1\" "
+ "style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
writer.println(
"<tr><th>Key</th><th>Token</th><th>Queued</th><th>Active For</th><th>State</th><th>State Active For</th></tr>");
"<tr>"
+ "<th>Key</th>"
+ "<th>Token</th>"
+ "<th>Queued</th>"
+ "<th>Active For</th>"
+ "<th>State</th>"
+ "<th>State Active For</th>"
+ "<th>Backend Worker Token</th>"
+ "</tr>");
// Use StringBuilder because we are appending in a loop.
StringBuilder activeWorkStatus = new StringBuilder();
int commitsPendingCount = 0;
Expand All @@ -353,6 +361,8 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
activeWorkStatus.append(activeWork.getState());
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(elapsedString(activeWork.getStateStartTime(), now));
activeWorkStatus.append("</td><td>");
activeWorkStatus.append(activeWork.backendWorkerToken());
activeWorkStatus.append("</td></tr>\n");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ public static ProcessingContext createProcessingContext(
BiFunction<String, KeyedGetDataRequest, KeyedGetDataResponse> getKeyedDataFn,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender) {
return ProcessingContext.create(computationId, getKeyedDataFn, workCommitter, heartbeatSender);
return ProcessingContext.create(computationId, getKeyedDataFn, workCommitter, heartbeatSender)
.build();
}

private static LatencyAttribution.Builder createLatencyAttributionWithActiveLatencyBreakdown(
Expand Down Expand Up @@ -344,6 +345,10 @@ public boolean isFailed() {
return isFailed;
}

/**
 * Returns the backend worker token by delegating to {@code processingContext}.
 *
 * @return the value of {@code processingContext.backendWorkerToken()}
 */
public String backendWorkerToken() {
return processingContext.backendWorkerToken();
}

boolean isStuckCommittingAt(Instant stuckCommitDeadline) {
return currentState.state() == Work.State.COMMITTING
&& currentState.startTime().isBefore(stuckCommitDeadline);
Expand Down Expand Up @@ -435,21 +440,24 @@ private boolean isCommitPending() {

@AutoValue
public abstract static class ProcessingContext {
private static final String UNKNOWN_BACKEND_WORKER_TOKEN = "UNKNOWN";

private static ProcessingContext create(
private static ProcessingContext.Builder create(
String computationId,
BiFunction<String, KeyedGetDataRequest, KeyedGetDataResponse> getKeyedDataFn,
Consumer<Commit> workCommitter,
HeartbeatSender heartbeatSender) {
return new AutoValue_Work_ProcessingContext.Builder()
.setBackendWorkerToken(UNKNOWN_BACKEND_WORKER_TOKEN)
.setComputationId(computationId)
.setHeartbeatSender(heartbeatSender)
.setWorkCommitter(workCommitter)
.setKeyedDataFetcher(
request -> Optional.ofNullable(getKeyedDataFn.apply(computationId, request)))
.build();
request -> Optional.ofNullable(getKeyedDataFn.apply(computationId, request)));
}

abstract String backendWorkerToken();

/** Computation that the {@link Work} belongs to. */
public abstract String computationId();

Expand All @@ -465,10 +473,12 @@ private static ProcessingContext create(

public abstract HeartbeatSender heartbeatSender();

abstract Builder toBuilder();
public abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {
public abstract static class Builder {
public abstract Builder setBackendWorkerToken(String value);

abstract Builder setComputationId(String value);

abstract Builder setKeyedDataFetcher(
Expand All @@ -478,7 +488,7 @@ abstract Builder setKeyedDataFetcher(

abstract Builder setHeartbeatSender(HeartbeatSender value);

abstract ProcessingContext build();
public abstract ProcessingContext build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ public void start(DataflowWorkerHarnessOptions options) {
statusPages.addStatusDataProvider(
"exception", "Last Exception", new LastExceptionDataProvider());
statusPages.addStatusDataProvider("cache", "State Cache", stateCache);
statusPages.addStatusDataProvider("activeGetWorkBudget", "Active GetWork Budget", writer -> {});
statusPages.addStatusDataProvider(
"activeGetWorkBudget",
"Active GetWork Budget",
writer ->
writer.format("<b>%s</b>", computationStateCache.totalCurrentActiveGetWorkBudget()));

if (isStreamingEngine()) {
addStreamingEngineStatusPages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class);
protected final AtomicBoolean clientClosed;
private final String streamId;
protected final WindmillStream.Id streamId;
private final AtomicLong lastSendTimeMs;
private final Executor executor;
private final ExecutorService sendExecutor;
private final ExecutorService requestSender;
private final BackOff backoff;
private final AtomicLong startTimeMs;
private final AtomicLong lastResponseTimeMs;
Expand All @@ -97,11 +97,11 @@ protected AbstractWindmillStream(
int logEveryNStreamFailures,
String backendWorkerToken) {
this.streamId =
WindmillStream.Id.create(this, backendWorkerToken, backendWorkerToken.isEmpty()).toString();
WindmillStream.Id.create(this, backendWorkerToken, !backendWorkerToken.isEmpty());
this.executor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(streamId + "-thread").build());
this.sendExecutor =
this.requestSender =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
Expand All @@ -128,11 +128,8 @@ protected AbstractWindmillStream(
new AbstractWindmillStream<RequestT, ResponseT>.ResponseObserver()));
}

private static long debugDuration(long nowMs, long startMs) {
if (startMs <= 0) {
return -1;
}
return Math.max(0, nowMs - startMs);
private static String debugDuration(long nowMs, long startMs) {
return (startMs <= 0 ? -1 : Math.max(0, nowMs - startMs)) + "ms";
}

/** Called on each response from the server. */
Expand All @@ -152,14 +149,15 @@ private static long debugDuration(long nowMs, long startMs) {
protected abstract void startThrottleTimer();

/** Send a request to the server. */
@SuppressWarnings("FutureReturnValueIgnored")
protected final void send(RequestT request) {
lastSendTimeMs.set(Instant.now().getMillis());
synchronized (this) {
if (streamClosed.get()) {
throw new IllegalStateException("Send called on a client closed stream.");
}

sendExecutor.submit(
requestSender.submit(
() -> {
if (isClosed()) {
return;
Expand Down Expand Up @@ -225,33 +223,65 @@ public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {

protected abstract void sendHealthCheck();

// Care is taken that synchronization on this is unnecessary for all status page information.
// Blocking sends are made beneath this stream object's lock which could block status page
// rendering.
/**
* @implNote Care is taken that synchronization on this is unnecessary for all status page
* information. Blocking sends are made beneath this stream object's lock which could block
* status page rendering.
*/
public final void appendSummaryHtml(PrintWriter writer) {
writer.format("<h3>%s</h3>:", streamId);
appendSpecificHtml(writer);
writer.format("id: %s; ", streamId);
writer.println("<strong>Status:</strong>");
writer.println(
"<table border=\"1\" "
+ "style=\"border-collapse:collapse;padding:5px;border-spacing:5px;border:1px\">");
writer.println(
"<tr>"
+ "<th>Error Count</th>"
+ "<th>Last Error</th>"
+ "<th>Last Error Received Time</th>"
+ "<th>Is Client Closed</th>"
+ "<th>Is Closed</th>"
+ "<th>BackOff Remaining</th>"
+ "<th>Current Stream Age Millis</th>"
+ "<th>Last Request Sent Time</th>"
+ "<th>Last Received Response Time</th>"
+ "</tr>");

StringBuilder statusString = new StringBuilder();
statusString.append("<tr>");
statusString.append("<td>");
if (errorCount.get() > 0) {
writer.format(
", %d errors, last error [ %s ] at [%s]",
errorCount.get(), lastError.get(), lastErrorTime.get());
}
if (clientClosed.get()) {
writer.write(", client closed");
statusString.append(errorCount.get());
statusString.append("</td><td>");
statusString.append(lastError.get());
statusString.append("</td><td>");
statusString.append(lastErrorTime.get());
} else {
statusString.append(0);
statusString.append("</td><td>");
statusString.append("N/A");
statusString.append("</td><td>");
statusString.append("N/A");
}
statusString.append("</td><td>");

writer.format(", isClosed=[%s]", isClosed());
statusString.append(clientClosed.get());
statusString.append("</td><td>");
statusString.append(isClosed());
statusString.append("</td><td>");
long nowMs = Instant.now().getMillis();
long sleepLeft = sleepUntil.get() - nowMs;
if (sleepLeft > 0) {
writer.format(", %dms backoff remaining", sleepLeft);
}
writer.format(
", current stream is %dms old, last send %dms, last response %dms, closed: %s",
debugDuration(nowMs, startTimeMs.get()),
debugDuration(nowMs, lastSendTimeMs.get()),
debugDuration(nowMs, lastResponseTimeMs.get()),
streamClosed.get());
statusString.append(Math.max(sleepLeft, 0));
statusString.append("</td><td>");
statusString.append(debugDuration(nowMs, startTimeMs.get()));
statusString.append("</td><td>");
statusString.append(debugDuration(nowMs, lastSendTimeMs.get()));
statusString.append("</td><td>");
statusString.append(debugDuration(nowMs, lastResponseTimeMs.get()));
statusString.append("</td></tr>\n");
writer.print(statusString);
writer.println("</table>");
}

// Don't require synchronization on stream, see the appendSummaryHtml comment.
Expand Down Expand Up @@ -354,7 +384,7 @@ private void onStreamFinished(@Nullable Throwable t) {
if (clientClosed.get() && !hasPendingRequests()) {
streamRegistry.remove(AbstractWindmillStream.this);
finishLatch.countDown();
sendExecutor.shutdownNow();
requestSender.shutdownNow();
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,20 +111,25 @@ interface GetWorkerMetadataStream extends WindmillStream {}

@AutoValue
abstract class Id {
public static Id create(WindmillStream stream, String backendWorkerToken, boolean isDirect) {
private static final String GET_WORK_STREAM_TYPE = "GetWork";
private static final String GET_DATA_STREAM_TYPE = "GetData";
private static final String GET_WORKER_METADATA_STREAM_TYPE = "GetWorkerMetadata";
private static final String COMMIT_WORK_STREAM_TYPE = "CommitWork";

static Id create(WindmillStream stream, String backendWorkerToken, boolean isDirect) {
return new AutoValue_WindmillStream_Id(
Id.getStreamType(stream), backendWorkerToken, isDirect);
}

private static String getStreamType(WindmillStream windmillStream) {
if (windmillStream instanceof GetWorkStream) {
return "GetWork-";
return GET_WORK_STREAM_TYPE;
} else if (windmillStream instanceof GetWorkerMetadataStream) {
return "GetWorkerMetadata-";
return GET_WORKER_METADATA_STREAM_TYPE;
} else if (windmillStream instanceof GetDataStream) {
return "GetData-";
return GET_DATA_STREAM_TYPE;
} else if (windmillStream instanceof CommitWorkStream) {
return "CommitWork-";
return COMMIT_WORK_STREAM_TYPE;
}

// Should not happen; the conditions above are exhaustive.
Expand All @@ -133,17 +138,17 @@ private static String getStreamType(WindmillStream windmillStream) {

abstract String streamType();

abstract String backendWorkerToken();
public abstract String backendWorkerToken();

abstract boolean isDirect();

@Override
public String toString() {
public final String toString() {
return String.format(
"[%s]-[%s]-[%s]",
streamType(),
isDirect() ? "direct" : "dispatched",
backendWorkerToken().isEmpty() ? "no_worker_token" : backendWorkerToken());
isDirect() ? "Direct" : "Dispatched",
backendWorkerToken().isEmpty() ? "Unknown" : backendWorkerToken());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkStreamTimingInfo.Event;
Expand Down Expand Up @@ -170,20 +171,41 @@ List<LatencyAttribution> getLatencyAttributions() {
return latencyAttributions;
}

/**
 * Returns a read-only view of {@code aggregatedGetWorkStreamLatencies}.
 *
 * <p>The result wraps the underlying map rather than copying it, so callers cannot modify it
 * but will observe later changes to the underlying map (for example after {@code reset()}).
 */
Map<State, SumAndMaxDurations> getAggregatedGetWorkStreamLatencies() {
return Collections.unmodifiableMap(aggregatedGetWorkStreamLatencies);
}

/**
 * Returns the current value of {@code workItemCreationEndTime}.
 *
 * <p>{@code reset()} sets this field to {@code Instant.EPOCH}.
 */
Instant getWorkItemCreationEndTime() {
return workItemCreationEndTime;
}

/**
 * Returns the current value of {@code workItemLastChunkReceivedByWorkerTime}.
 *
 * <p>{@code reset()} sets this field to {@code Instant.EPOCH}.
 */
Instant getWorkItemLastChunkReceivedByWorkerTime() {
return workItemLastChunkReceivedByWorkerTime;
}

/**
 * Returns {@code workItemCreationLatency} wrapped in an {@link Optional}.
 *
 * @return an empty {@link Optional} when the field is {@code null} (the value {@code reset()}
 *     assigns), otherwise an {@link Optional} containing the field's value
 */
Optional<LatencyAttribution> getWorkItemCreationLatency() {
return Optional.ofNullable(workItemCreationLatency);
}

/**
 * Resets the tracked GetWork stream timing state.
 *
 * <p>Clears {@code aggregatedGetWorkStreamLatencies}, sets both {@code workItemCreationEndTime}
 * and {@code workItemLastChunkReceivedByWorkerTime} to {@code Instant.EPOCH}, and sets {@code
 * workItemCreationLatency} to {@code null}.
 */
void reset() {
this.aggregatedGetWorkStreamLatencies.clear();
this.workItemCreationEndTime = Instant.EPOCH;
this.workItemLastChunkReceivedByWorkerTime = Instant.EPOCH;
// Null here surfaces as Optional.empty() from getWorkItemCreationLatency().
this.workItemCreationLatency = null;
}

private static class SumAndMaxDurations {
static class SumAndMaxDurations {
private Duration sum;
private Duration max;

private SumAndMaxDurations(Duration sum, Duration max) {
this.sum = sum;
this.max = max;
}

@Override
public String toString() {
return "Latency{" + "sum=" + sum + ", max=" + max + '}';
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public static GrpcCommitWorkStream create(

@Override
public void appendSpecificHtml(PrintWriter writer) {
writer.format("CommitWorkStream: %d pending", pendingRequests.size());
writer.format("<br>%d Pending Requests<br>", pendingRequests.size());
}

@Override
Expand Down
Loading

0 comments on commit 95a7f2e

Please sign in to comment.