Skip to content

Commit

Permalink
Memoize some objects for timer processing to reduce overhead. (#16207)
Browse files Browse the repository at this point in the history
  • Loading branch information
scwhittle authored Feb 25, 2022
1 parent 47ab260 commit c75807c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;

Expand Down Expand Up @@ -64,12 +65,16 @@ public Map<String, ByteString> getExecutionTimeMonitoringData(ShortIdMap shortId
for (SimpleExecutionState state : executionStates) {
if (state.getTotalMillis() != 0) {
String shortId = state.getTotalMillisShortId(shortIds);
if (result.containsKey(shortId)) {
// This can happen due to flatten unzipping.
result.put(shortId, state.mergeTotalMillisPayload(result.get(shortId)));
} else {
result.put(shortId, state.getTotalMillisPayload());
}
result.compute(
shortId,
(String k, @Nullable ByteString existing) -> {
if (existing != null) {
// This can happen due to flatten unzipping.
return state.mergeTotalMillisPayload(existing);
} else {
return state.getTotalMillisPayload();
}
});
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
context.getWindowingStrategies(),
context::addStartBundleFunction,
context::addFinishBundleFunction,
context::addResetFunction,
context::addTearDownFunction,
context::getPCollectionConsumer,
(pCollectionId, consumer, valueCoder) ->
Expand Down Expand Up @@ -242,6 +243,7 @@ static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT,
private final OnTimerContext<?> onTimerContext;
private final OnWindowExpirationContext<?> onWindowExpirationContext;
private final FinishBundleArgumentProvider finishBundleArgumentProvider;
private final Duration allowedLateness;

/**
* Used to guarantee a consistent view of this {@link FnApiDoFnRunner} while setting up for {@link
Expand Down Expand Up @@ -344,6 +346,7 @@ private interface TriFunction<FirstT, SecondT, ThirdT> {
Map<String, RunnerApi.WindowingStrategy> windowingStrategies,
Consumer<ThrowingRunnable> addStartFunction,
Consumer<ThrowingRunnable> addFinishFunction,
Consumer<ThrowingRunnable> addResetFunction,
Consumer<ThrowingRunnable> addTearDownFunction,
Function<String, FnDataReceiver<WindowedValue<?>>> getPCollectionConsumer,
TriFunction<String, FnDataReceiver<WindowedValue<?>>, Coder<?>> addPCollectionConsumer,
Expand Down Expand Up @@ -457,6 +460,13 @@ private interface TriFunction<FirstT, SecondT, ThirdT> {
}
timerFamilyInfos = timerFamilyInfosBuilder.build();

this.mainInputId = ParDoTranslation.getMainInputName(pTransform);
this.allowedLateness =
rehydratedComponents
.getPCollection(pTransform.getInputsOrThrow(mainInputId))
.getWindowingStrategy()
.getAllowedLateness();

} catch (IOException exn) {
throw new IllegalArgumentException("Malformed ParDoPayload", exn);
}
Expand All @@ -473,12 +483,11 @@ private interface TriFunction<FirstT, SecondT, ThirdT> {
this.bundleFinalizer = bundleFinalizer;
this.onTimerContext = new OnTimerContext();
this.onWindowExpirationContext = new OnWindowExpirationContext<>();
this.timerBundleTracker =
new FnApiTimerBundleTracker(
keyCoder, windowCoder, this::getCurrentKey, () -> currentWindow);
addResetFunction.accept(timerBundleTracker::reset);

try {
this.mainInputId = ParDoTranslation.getMainInputName(pTransform);
} catch (IOException e) {
throw new RuntimeException(e);
}
this.mainOutputConsumers =
(Collection<FnDataReceiver<WindowedValue<OutputT>>>)
(Collection) localNameToConsumer.get(mainOutputTag.getId());
Expand Down Expand Up @@ -756,9 +765,6 @@ private Object getCurrentKey() {
}

private void startBundle() {
timerBundleTracker =
new FnApiTimerBundleTracker(
keyCoder, windowCoder, this::getCurrentKey, () -> currentWindow);
doFnInvoker.invokeStartBundle(startBundleArgumentProvider);
}

Expand Down Expand Up @@ -1694,14 +1700,9 @@ private <K> void processTimer(
// The timerIdOrTimerFamilyId contains either a timerId from timer declaration or
// timerFamilyId
// from timer family declaration.
String timerId =
timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)
? ""
: timerIdOrTimerFamilyId;
String timerFamilyId =
timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)
? timerIdOrTimerFamilyId
: "";
boolean isFamily = timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX);
String timerId = isFamily ? "" : timerIdOrTimerFamilyId;
String timerFamilyId = isFamily ? timerIdOrTimerFamilyId : "";
processTimerDirect(timerFamilyId, timerId, timeDomain, timer);
}
}
Expand Down Expand Up @@ -1778,7 +1779,6 @@ private class FnApiTimer<K> implements org.apache.beam.sdk.state.Timer {
private final K userKey;
private final String dynamicTimerTag;
private final TimeDomain timeDomain;
private final Duration allowedLateness;
private final Instant fireTimestamp;
private final Instant elementTimestampOrTimerHoldTimestamp;
private final BoundedWindow boundedWindow;
Expand Down Expand Up @@ -1817,18 +1817,6 @@ private class FnApiTimer<K> implements org.apache.beam.sdk.state.Timer {
throw new IllegalArgumentException(
String.format("Unknown or unsupported time domain %s", timeDomain));
}

try {
this.allowedLateness =
rehydratedComponents
.getPCollection(
pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName(pTransform)))
.getWindowingStrategy()
.getAllowedLateness();
} catch (IOException e) {
throw new IllegalArgumentException(
String.format("Unable to get allowed lateness for timer %s", timerIdOrFamily));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ public FnApiTimerBundleTracker(
});
}

public void reset() {
timerModifications.clear();
}

public void timerModified(String timerFamilyOrId, TimeDomain timeDomain, Timer<K> timer) {
ByteString keyString = encodedCurrentKeySupplier.get();
ByteString windowString = encodedCurrentWindowSupplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ public void testOrderOfSetupTeardownCalls() throws Exception {
PCollection.newBuilder()
.setWindowingStrategyId("window-strategy")
.setCoderId("2L-output-coder")
.setIsBounded(IsBounded.Enum.BOUNDED)
.build())
.putWindowingStrategies(
"window-strategy",
Expand Down

0 comments on commit c75807c

Please sign in to comment.