Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate direct path/fan out logic #31504

Closed
wants to merge 13 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.Duration;
Expand Down Expand Up @@ -221,13 +222,11 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {

void setWindmillServiceStreamMaxBackoffMillis(int value);

@Description(
"If true, Dataflow streaming pipeline will be running in direct path mode."
+ " VMs must have IPv6 enabled for this to work.")
@Default.Boolean(false)
@Description("Enables direct path mode for streaming engine.")
@Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class)
boolean getIsWindmillServiceDirectPathEnabled();
scwhittle marked this conversation as resolved.
Show resolved Hide resolved

void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled);
void setIsWindmillServiceDirectPathEnabled(boolean value);

/**
* Factory for creating local Windmill address. Reads from system propery 'windmill.hostport' for
Expand Down Expand Up @@ -302,4 +301,12 @@ public Integer create(PipelineOptions options) {
return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 1;
}
}

/** EnableStreamingEngine defaults to false unless one of the two experiments is set. */
class EnableWindmillServiceDirectPathFactory implements DefaultValueFactory<Boolean> {
@Override
public Boolean create(PipelineOptions options) {
return ExperimentalOptions.hasExperiment(options, "enable_windmill_service_direct_path");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,32 @@
package org.apache.beam.runners.dataflow.worker;

import com.google.auto.value.AutoBuilder;
import com.google.common.collect.Iterables;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamClosedException;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.SettableFuture;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Wrapper around a {@link WindmillServerStub} that tracks metrics for the number of in-flight
Expand All @@ -49,80 +57,51 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class MetricTrackingWindmillServerStub {
private static final Logger LOG = LoggerFactory.getLogger(MetricTrackingWindmillServerStub.class);
private static final String FAN_OUT_REFRESH_WORK_EXECUTOR_NAME =
"FanOutActiveWorkRefreshExecutor";

private static final int MAX_READS_PER_BATCH = 60;
private static final int MAX_ACTIVE_READS = 10;
private static final Duration STREAM_TIMEOUT = Duration.standardSeconds(30);
private final AtomicInteger activeSideInputs = new AtomicInteger();
private final AtomicInteger activeStateReads = new AtomicInteger();
private final AtomicInteger activeHeartbeats = new AtomicInteger();
private final WindmillServerStub server;
private final MemoryMonitor gcThrashingMonitor;
private final boolean useStreamingRequests;

private final WindmillStreamPool<GetDataStream> getDataStreamPool;

// This may be the same instance as getDataStreamPool based upon options.
private final WindmillStreamPool<GetDataStream> heartbeatStreamPool;
private final @Nullable WindmillStreamPool<GetDataStream> getDataStreamPool;
private final ExecutorService fanOutActiveWorkRefreshExecutor;

@GuardedBy("this")
private final List<ReadBatch> pendingReadBatches;

@GuardedBy("this")
private int activeReadThreads = 0;

@Internal
@AutoBuilder(ofClass = MetricTrackingWindmillServerStub.class)
public abstract static class Builder {

abstract Builder setServer(WindmillServerStub server);

abstract Builder setGcThrashingMonitor(MemoryMonitor gcThrashingMonitor);

abstract Builder setUseStreamingRequests(boolean useStreamingRequests);

abstract Builder setUseSeparateHeartbeatStreams(boolean useSeparateHeartbeatStreams);

abstract Builder setNumGetDataStreams(int numGetDataStreams);

abstract MetricTrackingWindmillServerStub build();
}

public static Builder builder(WindmillServerStub server, MemoryMonitor gcThrashingMonitor) {
return new AutoBuilder_MetricTrackingWindmillServerStub_Builder()
.setServer(server)
.setGcThrashingMonitor(gcThrashingMonitor)
.setUseStreamingRequests(false)
.setUseSeparateHeartbeatStreams(false)
.setNumGetDataStreams(1);
}

MetricTrackingWindmillServerStub(
WindmillServerStub server,
MemoryMonitor gcThrashingMonitor,
boolean useStreamingRequests,
boolean useSeparateHeartbeatStreams,
int numGetDataStreams) {
@Nullable WindmillStreamPool<GetDataStream> getDataStreamPool) {
this.server = server;
this.gcThrashingMonitor = gcThrashingMonitor;
this.useStreamingRequests = useStreamingRequests;
if (useStreamingRequests) {
getDataStreamPool =
WindmillStreamPool.create(
Math.max(1, numGetDataStreams), STREAM_TIMEOUT, this.server::getDataStream);
if (useSeparateHeartbeatStreams) {
heartbeatStreamPool =
WindmillStreamPool.create(1, STREAM_TIMEOUT, this.server::getDataStream);
} else {
heartbeatStreamPool = getDataStreamPool;
}
} else {
getDataStreamPool = heartbeatStreamPool = null;
}
this.fanOutActiveWorkRefreshExecutor =
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat(FAN_OUT_REFRESH_WORK_EXECUTOR_NAME).build());
this.getDataStreamPool = getDataStreamPool;
// This is used as a queue but is expected to be less than 10 batches.
this.pendingReadBatches = new ArrayList<>();
}

public static Builder builder(WindmillServerStub server, MemoryMonitor gcThrashingMonitor) {
return new AutoBuilder_MetricTrackingWindmillServerStub_Builder()
.setServer(server)
.setGcThrashingMonitor(gcThrashingMonitor)
.setUseStreamingRequests(false)
.setGetDataStreamPool(null);
}

// Adds the entry to a read batch for sending to the windmill server. If a non-null batch is
// returned, this thread will be responsible for sending the batch and should wait for the batch
// startRead to be notified.
Expand Down Expand Up @@ -254,6 +233,27 @@ public Windmill.KeyedGetDataResponse getStateData(
}
}

public Windmill.KeyedGetDataResponse getStateData(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is now a little confusing, it is named stub and takes a single stub which is to a particular endpoint. But by exposing these methods we may be using a stream to another unrelated endpoint. I think that it would be better to extract the blocking for resources and metric tracking to a separate object that the MetricTrackingWindmilServerStub and the StreamingEngineClient could both use.

Something where you can get closables to track something would be convenient.
I think you could have something like

AutoClosable enterGetData() {
waitfor resources, increment counter, return closable that decrements
}

and then caller could do
try (enterGetData()) {
stream.getDataStream(...);
}

Perhaps that refactoring could be done in a separate PR which we can submit before this one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm might be helpful to include the work provider implementations we were talking about in a PR as well

then we don't need a nullable heartbeatSender and work committer and can hide those in the impls

GetDataStream getDataStream, String computation, Windmill.KeyedGetDataRequest request) {
gcThrashingMonitor.waitForResources("GetStateData");
if (getDataStream.isShutdown()) {
throw new WorkItemCancelledException(request.getShardingKey());
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
}

try {
activeStateReads.getAndIncrement();
return getDataStream.requestKeyedData(computation, request);
} catch (Exception e) {
if (WindmillStreamClosedException.wasCauseOf(e)) {
LOG.debug("Tried to fetch keyed data from a closed stream. Work has been cancelled.", e);
throw new WorkItemCancelledException(request.getShardingKey());
}
throw new RuntimeException(e);
} finally {
activeStateReads.getAndDecrement();
}
}

public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request) {
gcThrashingMonitor.waitForResources("GetSideInputData");
activeSideInputs.getAndIncrement();
Expand All @@ -278,47 +278,80 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request)
}
}

/** Tells windmill processing is ongoing for the given keys. */
public void refreshActiveWork(Map<String, List<HeartbeatRequest>> heartbeats) {
public Windmill.GlobalData getSideInputData(
GetDataStream getDataStream, Windmill.GlobalDataRequest request) {
gcThrashingMonitor.waitForResources("GetSideInputData");
activeSideInputs.getAndIncrement();
try {
return getDataStream.requestGlobalData(request);
} catch (Exception e) {
scwhittle marked this conversation as resolved.
Show resolved Hide resolved
if (WindmillStreamClosedException.wasCauseOf(e)) {
LOG.error("Tried to fetch global data from a closed stream. Work has been cancelled", e);
throw new WorkItemCancelledException("Failed to get side input.", e);
}
throw new RuntimeException("Failed to get side input: ", e);
} finally {
activeSideInputs.getAndDecrement();
}
}

/**
* Attempts to refresh active work, fanning out to each {@link GetDataStream} in parallel.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update comment based upon heartbeat sender

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*
* @implNote Skips closed {@link GetDataStream}(s).
*/
public void refreshActiveWork(
Map<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeats) {
if (heartbeats.isEmpty()) {
return;
}
activeHeartbeats.set(heartbeats.size());

try {
if (useStreamingRequests) {
GetDataStream stream = heartbeatStreamPool.getStream();
try {
stream.refreshActiveWork(heartbeats);
} finally {
heartbeatStreamPool.releaseStream(stream);
}
if (heartbeats.size() == 1) {
// There is 1 destination to send heartbeat requests.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not setting active heartbeats to one here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Map.Entry<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeat =
Iterables.getOnlyElement(heartbeats.entrySet());
HeartbeatSender sender = heartbeat.getKey();
sender.sendHeartbeats(heartbeat.getValue());
} else {
// This code path is only used by appliance which sends heartbeats (used to refresh active
// work) as KeyedGetDataRequests. So we must translate the HeartbeatRequest to a
// KeyedGetDataRequest here regardless of the value of sendKeyedGetDataRequests.
Windmill.GetDataRequest.Builder builder = Windmill.GetDataRequest.newBuilder();
for (Map.Entry<String, List<HeartbeatRequest>> entry : heartbeats.entrySet()) {
Windmill.ComputationGetDataRequest.Builder perComputationBuilder =
Windmill.ComputationGetDataRequest.newBuilder();
perComputationBuilder.setComputationId(entry.getKey());
for (HeartbeatRequest request : entry.getValue()) {
perComputationBuilder.addRequests(
Windmill.KeyedGetDataRequest.newBuilder()
.setShardingKey(request.getShardingKey())
.setWorkToken(request.getWorkToken())
.setCacheToken(request.getCacheToken())
.addAllLatencyAttribution(request.getLatencyAttributionList())
.build());
}
builder.addRequests(perComputationBuilder.build());
}
server.getData(builder.build());
// There are multiple destinations to send heartbeat requests. Fan out requests in parallel.
refreshActiveWorkWithFanOut(heartbeats);
}
} finally {
activeHeartbeats.set(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe goes away with the refactoring, but this set(0) seems like it will hide leaked threads etc. It would be better to have the try/finally scoped just around various increments and just decrement the matching amount in finally. No max needed either then.

}
}

private void refreshActiveWorkWithFanOut(
Map<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeats) {
List<CompletableFuture<Void>> fanOutRefreshActiveWork = new ArrayList<>();
for (Map.Entry<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeat :
heartbeats.entrySet()) {
fanOutRefreshActiveWork.add(sendHeartbeatOnStreamFuture(heartbeat));
}

// Don't block until we kick off all the refresh active work RPCs.
@SuppressWarnings("rawtypes")
CompletableFuture<Void> parallelFanOutRefreshActiveWork =
CompletableFuture.allOf(fanOutRefreshActiveWork.toArray(new CompletableFuture[0]));
parallelFanOutRefreshActiveWork.join();
}

private CompletableFuture<Void> sendHeartbeatOnStreamFuture(
Map.Entry<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeat) {
return CompletableFuture.runAsync(
() -> {
activeHeartbeats.getAndUpdate(existing -> existing + heartbeat.getValue().size());
HeartbeatSender sender = heartbeat.getKey();
Map<String, List<HeartbeatRequest>> heartbeatRequests = heartbeat.getValue();
sender.sendHeartbeats(heartbeatRequests);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as follow up might be nice if this returned a completable future instead of blocking, then we wouldn't have to have a thread blocked per-destination for heartbeats

Don't do in this PR, it's big enough. Could put todo for now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// Active heartbeats should never drop below 0.
activeHeartbeats.getAndUpdate(
existing -> Math.max(existing - heartbeat.getValue().size(), 0));
},
fanOutActiveWorkRefreshExecutor);
}

public void printHtml(PrintWriter writer) {
writer.println("Active Fetches:");
writer.println(" Side Inputs: " + activeSideInputs.get());
Expand All @@ -332,6 +365,22 @@ public void printHtml(PrintWriter writer) {
writer.println("Heartbeat Keys Active: " + activeHeartbeats.get());
}

@Internal
@AutoBuilder(ofClass = MetricTrackingWindmillServerStub.class)
public abstract static class Builder {

abstract Builder setServer(WindmillServerStub server);

abstract Builder setGcThrashingMonitor(MemoryMonitor gcThrashingMonitor);

abstract Builder setGetDataStreamPool(
@Nullable WindmillStreamPool<GetDataStream> getDataStreamPool);

abstract Builder setUseStreamingRequests(boolean useStreamingRequests);

abstract MetricTrackingWindmillServerStub build();
}

private static final class ReadBatch {
ArrayList<QueueEntry> reads = new ArrayList<>();
SettableFuture<Boolean> startRead = SettableFuture.create();
Expand Down
Loading
Loading