forked from apache/beam
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add ConcurrentPartitionedPriorityQueueTest. More docs. Minor renaming.
- Loading branch information
Showing
9 changed files
with
392 additions
and
211 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,10 +26,8 @@ | |
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; | ||
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform; | ||
import com.google.cloud.dataflow.sdk.transforms.PTransform; | ||
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; | ||
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; | ||
import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger; | ||
import com.google.cloud.dataflow.sdk.util.ConcurrentPartitionedPriorityQueue; | ||
import com.google.cloud.dataflow.sdk.util.ExecutionContext; | ||
import com.google.cloud.dataflow.sdk.util.SideInputReader; | ||
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData; | ||
|
@@ -40,20 +38,14 @@ | |
import com.google.cloud.dataflow.sdk.values.PCollection; | ||
import com.google.cloud.dataflow.sdk.values.PCollectionView; | ||
import com.google.cloud.dataflow.sdk.values.PValue; | ||
import com.google.common.collect.ComparisonChain; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.collect.Iterables; | ||
import com.google.common.collect.Ordering; | ||
|
||
import org.joda.time.Instant; | ||
|
||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.Executors; | ||
|
||
import javax.annotation.Nullable; | ||
|
||
|
@@ -85,19 +77,17 @@ class InProcessEvaluationContext { | |
/** The current processing time and event time watermarks and timers. */ | ||
private final InMemoryWatermarkManager watermarkManager; | ||
|
||
private final ConcurrentPartitionedPriorityQueue<AppliedPTransform<?, ?, ?>, WatermarkCallback> | ||
pendingCallbacks; | ||
/** Executes callbacks based on the progression of the watermark. */ | ||
private final WatermarkCallbackExecutor callbackExecutor; | ||
|
||
// The stateInternals of the world, by applied PTransform and key. | ||
/** The stateInternals of the world, by applied PTransform and key. */ | ||
private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>> | ||
applicationStateInternals; | ||
|
||
private final InProcessSideInputContainer sideInputContainer; | ||
|
||
private final CounterSet mergedCounters; | ||
|
||
private final Executor callbackExecutor; | ||
|
||
public static InProcessEvaluationContext create( | ||
InProcessPipelineOptions options, | ||
Collection<AppliedPTransform<?, ?, ?>> rootTransforms, | ||
|
@@ -121,7 +111,6 @@ private InProcessEvaluationContext( | |
checkNotNull(views); | ||
this.stepNames = stepNames; | ||
|
||
this.pendingCallbacks = ConcurrentPartitionedPriorityQueue.create(new CallbackOrdering()); | ||
this.watermarkManager = | ||
InMemoryWatermarkManager.create( | ||
NanosOffsetClock.create(), rootTransforms, valueToConsumers); | ||
|
@@ -130,7 +119,7 @@ private InProcessEvaluationContext( | |
this.applicationStateInternals = new ConcurrentHashMap<>(); | ||
this.mergedCounters = new CounterSet(); | ||
|
||
this.callbackExecutor = Executors.newCachedThreadPool(); | ||
this.callbackExecutor = WatermarkCallbackExecutor.create(); | ||
} | ||
|
||
/** | ||
|
@@ -161,7 +150,7 @@ public synchronized Iterable<? extends CommittedBundle<?>> handleResult( | |
result.getTimerUpdate().withCompletedTimers(completedTimers), | ||
committedBundles, | ||
result.getWatermarkHold()); | ||
watermarksUpdated(); | ||
fireAllAvailableCallbacks(); | ||
// Update counters | ||
if (result.getCounters() != null) { | ||
mergedCounters.merge(result.getCounters()); | ||
|
@@ -200,36 +189,15 @@ private Iterable<? extends CommittedBundle<?>> commitBundles( | |
return completed.build(); | ||
} | ||
|
||
private void watermarksUpdated() { | ||
private void fireAllAvailableCallbacks() { | ||
for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) { | ||
checkCallbacks(transform); | ||
fireAvailableCallbacks(transform); | ||
} | ||
} | ||
|
||
private void checkCallbacks(AppliedPTransform<?, ?, ?> producingTransform) { | ||
private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransform) { | ||
TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform); | ||
ShouldFirePredicate shouldFire = new ShouldFirePredicate(watermarks.getOutputWatermark()); | ||
WatermarkCallback callback; | ||
do { | ||
callback = pendingCallbacks.pollIfSatisfies(producingTransform, shouldFire); | ||
if (callback != null) { | ||
callbackExecutor.execute(callback.getCallback()); | ||
} | ||
} while (callback != null); | ||
} | ||
|
||
private static class ShouldFirePredicate | ||
implements SerializableFunction<WatermarkCallback, Boolean> { | ||
private final Instant currentInstant; | ||
|
||
public ShouldFirePredicate(Instant currentInstant) { | ||
this.currentInstant = currentInstant; | ||
} | ||
|
||
@Override | ||
public Boolean apply(WatermarkCallback input) { | ||
return input.shouldFire(currentInstant); | ||
} | ||
callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark()); | ||
} | ||
|
||
/** | ||
|
@@ -273,23 +241,26 @@ public void add(Iterable<WindowedValue<ElemT>> values) { | |
} | ||
|
||
/** | ||
* Execute the specified callback after the watermark for a {@link PValue} passes the point at | ||
* Schedule a callback to be executed after output would be produced for the given window | ||
* if there had been input. | ||
* | ||
* <p>Output would be produced when the watermark for a {@link PValue} passes the point at | ||
* which the trigger for the specified window (with the specified windowing strategy) must have | ||
* fired from the perspective of that {@link PValue}, as specified by the value of | ||
* {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the | ||
* {@link WindowingStrategy}. When the callback has fired, either values will have been produced | ||
* for a key in that window, or the window is empty. | ||
* for a key in that window, or the window is empty. The callback will be executed regardless of | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
tgroh
Author
Owner
|
||
* whether values have been produced. | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong. |
||
*/ | ||
public void callAfterOutputMustHaveBeenProduced( | ||
public void scheduleAfterOutputWouldBeProduced( | ||
PValue value, | ||
BoundedWindow window, | ||
WindowingStrategy<?, ?> windowingStrategy, | ||
Runnable runnable) { | ||
WatermarkCallback callback = | ||
WatermarkCallback.onGuaranteedFiring(window, windowingStrategy, runnable); | ||
AppliedPTransform<?, ?, ?> producing = getProducing(value); | ||
pendingCallbacks.offer(producing, callback); | ||
checkCallbacks(producing); | ||
callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable); | ||
|
||
fireAvailableCallbacks(lookupProducing(value)); | ||
} | ||
|
||
private AppliedPTransform<?, ?, ?> getProducing(PValue value) { | ||
|
@@ -351,7 +322,7 @@ public String getStepName(AppliedPTransform<?, ?, ?> application) { | |
* {@link PCollectionView PCollectionViews} | ||
*/ | ||
public SideInputReader createSideInputReader(final List<PCollectionView<?>> sideInputs) { | ||
return sideInputContainer.withViews(sideInputs); | ||
return sideInputContainer.createReaderForViews(sideInputs); | ||
} | ||
|
||
/** | ||
|
@@ -374,47 +345,11 @@ public CounterSet getCounters() { | |
return mergedCounters; | ||
} | ||
|
||
private static class WatermarkCallback { | ||
public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring( | ||
BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) { | ||
@SuppressWarnings("unchecked") | ||
Instant firingAfter = | ||
strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window); | ||
return new WatermarkCallback(firingAfter, callback); | ||
} | ||
|
||
private final Instant fireAfter; | ||
private final Runnable callback; | ||
|
||
private WatermarkCallback(Instant fireAfter, Runnable callback) { | ||
this.fireAfter = fireAfter; | ||
this.callback = callback; | ||
} | ||
|
||
public boolean shouldFire(Instant currentWatermark) { | ||
return currentWatermark.isAfter(fireAfter) | ||
|| currentWatermark.equals(BoundedWindow.TIMESTAMP_MAX_VALUE); | ||
} | ||
|
||
public Runnable getCallback() { | ||
return callback; | ||
} | ||
} | ||
|
||
private static class CallbackOrdering extends Ordering<WatermarkCallback> { | ||
@Override | ||
public int compare(WatermarkCallback left, WatermarkCallback right) { | ||
return ComparisonChain.start() | ||
.compare(left.fireAfter, right.fireAfter) | ||
.compare(left.callback, right.callback, Ordering.arbitrary()) | ||
.result(); | ||
} | ||
} | ||
|
||
/** | ||
* Extracts all timers that have been fired and have not already been extracted. | ||
* | ||
* <p>This is a destructive operation. Timers will only appear in the result of this method once. | ||
* <p>This is a destructive operation. Timers will only appear in the result of this method once | ||
* for each time they are set. | ||
*/ | ||
public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() { | ||
return watermarkManager.extractFiredTimers(); | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
1 comment
on commit 2635f86
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
"or the window is empty" is not entirely accurate. It could also mean that all contents of the window were late, therefore the ON_TIME trigger would have fired earlier. New data could still show up.