Skip to content

Commit

Permalink
Merge pull request #29162: [runners-spark] Do not set accTimestamp to…
Browse files Browse the repository at this point in the history
… null in SparkCombineFn (#28256)
  • Loading branch information
je-ik committed Oct 31, 2023
2 parents ba71422 + 524a7bf commit 6a66b72
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.beam.runners.spark.util.SideInputBroadcast;
import org.apache.beam.runners.spark.util.SparkSideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineWithContext;
Expand Down Expand Up @@ -101,7 +100,7 @@ void add(WindowedValue<InputT> value, SparkCombineFn<InputT, ValueT, AccumT, ?>
throws Exception;

/**
* Merge other acccumulator into this one.
* Merge other accumulator into this one.
*
* @param other the other accumulator to merge
*/
Expand Down Expand Up @@ -173,7 +172,7 @@ static <InputT, ValueT, AccumT> SingleWindowWindowedAccumulator<InputT, ValueT,
return new SingleWindowWindowedAccumulator<>(toValue);
}

static <InputT, ValueT, AccumT> WindowedAccumulator<InputT, ValueT, AccumT, ?> create(
static <InputT, ValueT, AccumT> SingleWindowWindowedAccumulator<InputT, ValueT, AccumT> create(
Function<InputT, ValueT> toValue, WindowedValue<AccumT> accumulator) {
return new SingleWindowWindowedAccumulator<>(toValue, accumulator);
}
Expand All @@ -191,10 +190,7 @@ static <InputT, ValueT, AccumT> SingleWindowWindowedAccumulator<InputT, ValueT,
Function<InputT, ValueT> toValue, WindowedValue<AccumT> accumulator) {
this.toValue = toValue;
this.windowAccumulator = accumulator.getValue();
this.accTimestamp =
accumulator.getTimestamp().equals(BoundedWindow.TIMESTAMP_MIN_VALUE)
? null
: accumulator.getTimestamp();
this.accTimestamp = accumulator.getTimestamp();
this.accWindow = getWindow(accumulator);
}

Expand Down Expand Up @@ -247,7 +243,7 @@ public void merge(
@Override
public Collection<WindowedValue<AccumT>> extractOutput() {
if (windowAccumulator != null) {
return Arrays.asList(
return Collections.singletonList(
WindowedValue.of(
windowAccumulator, accTimestamp, accWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING));
}
Expand Down Expand Up @@ -516,7 +512,8 @@ static class WindowedAccumulatorCoder<InputT, ValueT, AccumT>

@Override
public void encode(WindowedAccumulator<InputT, ValueT, AccumT, ?> value, OutputStream outStream)
throws CoderException, IOException {
throws IOException {

if (type.isMapBased()) {
wrap.encode(((MapBasedWindowedAccumulator<?, ?, AccumT, ?>) value).map.values(), outStream);
} else {
Expand All @@ -536,7 +533,8 @@ public void encode(WindowedAccumulator<InputT, ValueT, AccumT, ?> value, OutputS

@Override
public WindowedAccumulator<InputT, ValueT, AccumT, ?> decode(InputStream inStream)
throws CoderException, IOException {
throws IOException {

if (type.isMapBased()) {
return WindowedAccumulator.create(toValue, type, wrap.decode(inStream), windowComparator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.WindowedValue;
Expand Down Expand Up @@ -219,6 +220,34 @@ public void testSlidingCombineFnExplode() throws Exception {
result);
}

@Test
public void testGlobalWindowMergeAccumulatorsWithEarliestCombiner() throws Exception {
SparkCombineFn<KV<String, Integer>, Integer, Long, Long> sparkCombineFn =
SparkCombineFn.keyed(
combineFn,
opts,
Collections.emptyMap(),
WindowingStrategy.globalDefault().withTimestampCombiner(TimestampCombiner.EARLIEST));

Instant ts = BoundedWindow.TIMESTAMP_MIN_VALUE;
WindowedValue<KV<String, Integer>> first = input("key", 1, ts);
WindowedValue<KV<String, Integer>> second = input("key", 2, ts);
WindowedValue<KV<String, Integer>> third = input("key", 3, ts);
WindowedValue<Long> accumulator = WindowedValue.valueInGlobalWindow(0L);
SparkCombineFn.SingleWindowWindowedAccumulator<KV<String, Integer>, Integer, Long> acc1 =
SparkCombineFn.SingleWindowWindowedAccumulator.create(KV::getValue, accumulator);
SparkCombineFn.SingleWindowWindowedAccumulator<KV<String, Integer>, Integer, Long> acc2 =
SparkCombineFn.SingleWindowWindowedAccumulator.create(KV::getValue, accumulator);
SparkCombineFn.SingleWindowWindowedAccumulator<KV<String, Integer>, Integer, Long> acc3 =
SparkCombineFn.SingleWindowWindowedAccumulator.create(KV::getValue, accumulator);
acc1.add(first, sparkCombineFn);
acc2.add(second, sparkCombineFn);
acc3.merge(acc1, sparkCombineFn);
acc3.merge(acc2, sparkCombineFn);
acc3.add(third, sparkCombineFn);
assertEquals(6, (long) Iterables.getOnlyElement(sparkCombineFn.extractOutput(acc3)).getValue());
}

private static Combine.CombineFn<Integer, Long, Long> getSumFn() {
return new Combine.CombineFn<Integer, Long, Long>() {

Expand Down

0 comments on commit 6a66b72

Please sign in to comment.