Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/Refactor GetData interfaces for Direct Path integration #31784

Merged
merged 8 commits into from
Jul 30, 2024

Conversation

m-trieu
Copy link
Contributor

@m-trieu m-trieu commented Jul 5, 2024

GetData/RefreshWork changes as part of breaking up #31504

Remove MetricTrackingWindmillServerStub and replace w/ GetDataClient and WorkRefreshClient
GetDataClient has 3 implementations Appliance, StreamingEngine, and Direct
WorkRefreshClient also has 3 implementations Appliance, StreamingEngine, and FanOut

R: @scwhittle


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link
Contributor

github-actions bot commented Jul 5, 2024

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for separating this out! It's a lot easier to review than mixed in with the rest of the direct path changes.

Nothing too major. Since I'm out next week feel free to ask someone else to finish up reviewing.


/** Returns a new {@link Work} instance with the same state and a different failure handler. */
public Work withFailureHandler(Runnable onFailed) {
return new Work(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert there was not already a failure handler? maybe just have a setfailurehandler method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dropped this to add in a different PR

@@ -85,7 +88,7 @@ public static StreamingEngineWorkCommitter create(
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void start() {
if (!commitSenders.isShutdown()) {
if (isRunning.compareAndSet(false, true) && !commitSenders.isShutdown()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we call start multiple times?
seems simpler if we just fail on it called multiple times and remove if it is possible

Copy link
Contributor Author

@m-trieu m-trieu Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not
was just opting for this route to not use locks and to future proof incase changes are made in the future where someone intentionally or not calls start() multiple times. In this case start() and stop() logic are only executed once, no matter how many times called.

I don't think we should throw an exception since we will have to handle it in a caller or else crash, but if this is best can update thie code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for some things like this which we expect to just be started once and live for ever, it can be simpler to just prevent some behavior instead of trying to support it. If start can be called multiple times, one caller could stop while the other doesn't but hte end state is very different
start1/start2/stop1 -> stopped
start1/stop1/start2 -> started
So it seems safer to just prevent it since we only really expect these methods to be called by the main startup path and then in teardown for tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack updating

@@ -104,17 +116,17 @@ public long currentActiveCommitBytes() {

@Override
public void stop() {
if (!commitSenders.isTerminated()) {
if (isRunning.compareAndSet(true, false) && !commitSenders.isTerminated()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, can we just enforce stop called once and after start?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return;
}

for (Map.Entry<HeartbeatSender, Heartbeat> heartbeatToSend : heartbeats.entrySet()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should jsut be a single sender in this case, could assert and simplify code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return () ->
getDataMetrics
.activeHeartbeats()
.getAndUpdate(existing -> Math.max(existing - numHeartbeats, 0));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think you can get rid of the max now that we're no longer setting to 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


@Override
public int hashCode() {
return getDataStream.hashCode();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could use Objects.hash(DirectHeartbeatSender.class, getDataStream);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

* stream instance.
*/
@Internal
public final class DirectHeartbeatSender implements HeartbeatSender {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe FixedStreamHeartbeatSender/ SingleStreamHeartbeatSender ?

Since the code woudl work if the stream was direct or to the dispatcher

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


/** Heartbeat requests and the work that was used to generate the heartbeat requests. */
@AutoValue
public abstract class Heartbeat {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like this shoudl be Heartbeats or HeartbeatsBatch , something to indicate it is multiple.

Can we change the colleciton of work and requests to be immutable?
Since just created one place you could use some ImmutalbeMap builder there and change create to take those immutable maps.

Or could add some builder class for this that has immutable map builder inside of it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// Get heartbeat requests for computation's current active work, aggregated by GetDataStream
// to correctly fan-out the heartbeat requests.
Table<HeartbeatSender, RefreshableWork, HeartbeatRequest> heartbeats =
HeartbeatRequests.getRefreshableKeyHeartbeats(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to be the only use of this and it doesn't really use the table, just iterates over the cells. Can this method instead be changed to add to the heartbeatsender -> heartbeats builder? (see other comment)

Seems like we're giong to have a lot of unneeded allocations to build the rows and tables instead of just building into what we want in the end.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @damondouglas added as fallback since no labels match configuration

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@@ -147,16 +150,25 @@ private static LatencyAttribution.Builder createLatencyAttributionWithActiveLate
return latencyAttribution;
}

public RefreshableWork refreshableView() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this? can it just auto-cast? or in the case where perhaps it isn't for streams just do it via a cast there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Instant::now,
Collections.emptyList());
}

private static GetDataClient createMockGetDataClient() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does mock(GetDataClient) not work?
ditto for other tests

if it doesn't for some reason, can we have a shared factory for this for tests?


ShardedKey getShardedKey();

boolean isRefreshable(Instant refreshDeadline);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the meaning of refreshDeadline isn't particularly clear. We could add a comment but I'm wondering if it makes more sense to have this determination outside of refreshablework class and instead when we are generating refreshablework.

Maybe should we just expose the start time or last refresh time and just compare when generating refreshablework?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

void shutdown();

/** Reflects that {@link #shutdown()} was explicitly called. */
boolean isShutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be removed? shutdown indicates that no further interaction with the stream should happen after shutdown. So if external callers are checking shutdown state before doing something it seems racy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

/** Reflects that {@link #shutdown()} was explicitly called. */
boolean isShutdown();

Type streamType();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed? I don't see any callers and
generally it is a code-smell because it means that if we add a type we have to update scattered usages of the type where if instead whatever callers are switching on the type of was part of the interface everything is grouped.

https://code-smells.com/object-orientation-abusers/switch-statement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -94,7 +97,16 @@ public void start() {

@Override
public void commit(Commit commit) {
commitQueue.put(commit);
if (commit.work().isFailed() || !isRunning.get()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the log should only be if !isRunning

the commit work could be failed due to heartbeat response and the log woudl be inaccurate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -85,7 +88,7 @@ public static StreamingEngineWorkCommitter create(
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void start() {
if (!commitSenders.isShutdown()) {
if (isRunning.compareAndSet(false, true) && !commitSenders.isShutdown()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for some things like this which we expect to just be started once and live for ever, it can be simpler to just prevent some behavior instead of trying to support it. If start can be called multiple times, one caller could stop while the other doesn't but hte end state is very different
start1/start2/stop1 -> stopped
start1/stop1/start2 -> started
So it seems safer to just prevent it since we only really expect these methods to be called by the main startup path and then in teardown for tests.

@m-trieu m-trieu force-pushed the mt-get-data branch 7 times, most recently from 34d960b to 15e982d Compare July 27, 2024 03:44
Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more comments but nothing major!

return Type.GET_WORKER_METADATA;
interface GetWorkerMetadataStream extends WindmillStream {}

class WindmillStreamShutdownException extends RuntimeException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this be moved into AbstractWindmillStream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@scwhittle scwhittle merged commit 570f2f8 into apache:master Jul 30, 2024
17 checks passed
}
}

sendHeartbeatSafely(firstHeartbeat);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

firstHeartbeat here will be null, if heartbeatsBySender is empty

I see exceptions like

java.lang.NullPointerException: Cannot invoke "java.util.Map$Entry.getValue()" because "heartbeat" is null
	at org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefresher.sendHeartbeatSafely(ActiveWorkRefresher.java:182)
	at org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefresher.refreshActiveWork(ActiveWorkRefresher.java:149)
	at org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefresher.lambda$start$0(ActiveWorkRefresher.java:94)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops we lost the empty case in refactoring, sending PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sgtm! I'll approve

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants