-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Integrate direct path/fan out logic #31504
Conversation
f6cf353
to
e41988f
Compare
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
R: @scwhittle |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
...src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
Show resolved
Hide resolved
@@ -49,6 +56,8 @@ | |||
"nullness" // TODO(https://github.com/apache/beam/issues/20497) | |||
}) | |||
public class MetricTrackingWindmillServerStub { | |||
private static final Logger LOG = LoggerFactory.getLogger(MetricTrackingWindmillServerStub.class); | |||
private static final String FAN_OUT_REFRESH_WORK_EXECUTOR = "FanOutActiveWorkRefreshExecutor"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
append NAME to variable name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
.../src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
Show resolved
Hide resolved
worker.start(); | ||
} catch (Throwable e) { | ||
LOG.error("Harness shutting down due to uncaught exception.", e); | ||
worker.stop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just throw the exception instead of trying to tear-down cleanly?
As is the process exit status will be ok since the exception is being caught, and stop() could get stuck somehow and we'd rather just teardown possibly uncleanly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) { | ||
boolean isIpV6Enabled = options.getDataflowServiceOptions().contains(ENABLE_IPV6_EXPERIMENT); | ||
if (options.isEnableWindmillServiceDirectPath() && !isIpV6Enabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log only if isEnableStreamingEngine is also set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
long secondsWaited = 0; | ||
long waitFor = 10; | ||
while (!batchSent) { | ||
if (isClosed()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by polling we could add latency in cases where the threads are tied up waiting for reads and thus new work is not processed.
Do we need some finally (batch.countDown()) above to handle exceptions in sendBatch if the stream is closed? In that case it seems like it woudl unspool without needing this isClosed check here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
messaged offline
* @implNote Used to create stream observers for direct path streams that do not share any | ||
* underlying resources between threads. | ||
*/ | ||
private StreamObserverFactory newSimpleStreamObserverFactory() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we may still want the isReady checking, as otherwise if outgoing sending is falling behind it just buffers more and more in grpc output stream buffer and can oom.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gotcha we may want to bump the deadline then
"Exceeded timeout waiting for the outboundObserver to become ready meaning "
+ "that the stream deadline was not respected."
was getting a lot of these errors in testing.
reverted
@@ -209,11 +261,29 @@ public GetWorkerMetadataStream createGetWorkerMetadataStream( | |||
onNewWindmillEndpoints); | |||
} | |||
|
|||
private StreamObserverFactory newStreamObserverFactory() { | |||
private StreamObserverFactory newBufferringStreamObserverFactory() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is confusing name since it doesn't buffer beyond what grpc stream internally does. It just respects grpc flow control which it seems like we'd want to do in the direct path case as well.
There is a different BufferingStreamObserver for fnapi which does do buffering with a queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted
.setGlobalDataStreams( | ||
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints())) | ||
.build(); | ||
|
||
closeStaleStreams.join(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does building the connection state take any time? Or is there a reasno we can't move this join to after updating connections and the budget refresh below so we start using hte new streams earlier?
Otherwise it seems simpler to just keep the closeStaleStreamsAndCreateNewStreams which internally does them in parallel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point moved
.overrideAuthority(authenticatedGcpServiceAddress.authenticatingService()), | ||
windmillServiceRpcChannelTimeoutSec) | ||
AuthenticatedGcpServiceAddress authenticatedGcpServiceAddress) { | ||
return AltsChannelBuilder.forAddress( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we still want the withDefaultChannelOptions
Probably want the flow control window and ssl context from createRemoteChannel unless we've tested otherwise.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted
de5266c
to
080a511
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry meant to send these yesterday but they were still pending
} | ||
|
||
/** Returns whether an exception was caused by a {@link WindmillStreamClosedException}. */ | ||
public static boolean isWindmillStreamCancelledException(Throwable t) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update method name, actually since this is static and nested under class name maybe the method should just be wasCausedOf so it looks like:
WindmillStreamClosedException.wasCauseOf(e);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
int stuckCommitDurationMillis = | ||
windmillServiceEnabled && options.getStuckCommitDurationMillis() > 0 | ||
? options.getStuckCommitDurationMillis() | ||
: 0; | ||
if (isDirectPathPipeline(options)) { | ||
this.streamingEngineClient = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For SE non-direct path, can we still use StreamingEngineClient but just hard-code the single dispatcher as the worker endpoint? Seems like it would allow us to remove streamingDispatchLoop and other duplication and also seems like we want that to work if we need to fallback from non direct path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can
We can probably just inject the endpoints manually
might be better to do in a separate PR since we would have to change the way we test in StreamingDataflowWorker as that uses FakeWindmillServer and StreamingEngineClient does not.
keyedRequest -> | ||
"KeyedGetState=[" | ||
+ "key=" | ||
+ keyedRequest.getKey() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoid logging customer keys, they might be PII and they can also be large
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But also not sure it's worth having all the information in the exception, each key can lot itself when it's read finishes with the error instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
// Stream has been explicitly closed. Drain pending input streams and request batches. | ||
// Future calls to send RPCs will fail. | ||
pending.values().forEach(AppendableInputStream::cancel); | ||
pending.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clearing pending here differs from cancellation in onNewStream and I think doing so could lead to verify errors in onResponse.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when we receive a response after the stream is explictly closed what should we do? just drop it correct as that data/state would be invalid (except maybe side input state?)
@@ -308,7 +364,12 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon | |||
} | |||
} | |||
|
|||
private void queueRequestAndWait(QueuedRequest request) throws InterruptedException { | |||
private void tryQueueRequestAndWaitForSend(QueuedRequest request) throws InterruptedException { | |||
if (isClosed()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move beneath syncrhonized blcok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reverted this code
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
Show resolved
Hide resolved
.../src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java
Show resolved
Hide resolved
sender.sendHeartbeats(heartbeat.getValue()); | ||
} | ||
|
||
// There are multiple destinations to send heartbeat requests. Fan out requests in parallel. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think
else {
// comment
is easier to read
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
public interface HeartbeatSender { | ||
void sendHeartbeats(Map<String, List<Windmill.HeartbeatRequest>> heartbeats); | ||
|
||
default boolean isInvalid() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we remove this and just handle it internally to the stream?
In either case it's racy checking this and then sending on stream so the stream will have to handle it anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wanted to find a way to mark the work as failed for the stream since in this case we will skip, then still try to process the work
will think about another way to propagate this information
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added late binding of an onStreamClosed handler for DirectHeartbeatSender
2ba55f7
to
5d7cfc2
Compare
95a7f2e
to
bca34e1
Compare
fix get data stream
…ce the stream is shutdown, no more interactions should be performed on the stream.
bca34e1
to
50b5da6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sending before looking at tests
} | ||
|
||
/** | ||
* Attempts to refresh active work, fanning out to each {@link GetDataStream} in parallel. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update comment based upon heartbeat sender
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
heartbeatStreamPool.releaseStream(stream); | ||
} | ||
if (heartbeats.size() == 1) { | ||
// There is 1 destination to send heartbeat requests. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not setting active heartbeats to one here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
activeHeartbeats.getAndUpdate(existing -> existing + heartbeat.getValue().size()); | ||
HeartbeatSender sender = heartbeat.getKey(); | ||
Map<String, List<HeartbeatRequest>> heartbeatRequests = heartbeat.getValue(); | ||
sender.sendHeartbeats(heartbeatRequests); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as follow up might be nice if this returned a completable future instead of blocking, then we wouldn't have to have a thread blocked per-destination for heartbeats
Don't do in this PR, it's big enough. Could put todo for now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
.orElse(false); | ||
if (options.isEnableStreamingEngine() | ||
&& options.getIsWindmillServiceDirectPathEnabled() | ||
&& !isIpV6Enabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about structuring to avoid duplication of conditions. Seems less brittle since prevents accidents like just modifying one of the conditions.
if (options.isEnableStreamingEngine()
&& options.getIsWindmillServiceDirectPathEnabled()) {
if (isIpV6Enabled) {
return true;
}
LOG.warn(...);
}
return false;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
dispatchThread.join(); | ||
workCommitter.stop(); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't add comment at the line below, but can you rename dispatchLoop to applianceDispatchLoop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn) { | ||
while (true) { | ||
// Handle stream closure during loop. | ||
if (isShutdown() || isClosed()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to other comments, I'd remove this since queueReuqstAndWait already needs to handle it and having another check here is just more possible codepaths.
|
||
@Override | ||
public void sendHeartbeats(Map<String, List<HeartbeatRequest>> heartbeats) { | ||
if (getDataStream.isShutdown() || getDataStream.isClosed()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whe have shutdown||closed in the other file too, can we just always close if we shutdown and simplify these?
} | ||
|
||
public HeartbeatSender withStreamClosedHandler(Runnable onStreamClosed) { | ||
return new DirectHeartbeatSender(getDataStream, onStreamClosed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think this shoudl fail if one was already set (but see other comment, I think that we can have this noticed more from the top to avoid instance of checks)
/** Interface for sending heartbeats. */ | ||
@FunctionalInterface | ||
public interface HeartbeatSender { | ||
void sendHeartbeats(Map<String, List<Windmill.HeartbeatRequest>> heartbeats); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment on string
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
/** StreamingEngine stream pool based implementation of {@link HeartbeatSender}. */ | ||
@Internal | ||
public final class PoolBackedHeartbeatSender implements HeartbeatSender { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamPoolHeartbeatSender?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions. |
Integrate direct path/fan out logic
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.