Skip to content

Commit

Permalink
[proxima-beam-core] #345 add WatermarkShift transform
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Dec 11, 2024
1 parent 6bf5f12 commit 416aad5
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,13 @@ public static <T> FilterLatecomers<T> of() {
}

@SuppressWarnings("unchecked")
public static <T> PCollection<T> getOnTime(PCollectionTuple tuple, TypeDescriptor<T> type) {
Coder<T> coder = getCoder(tuple, type);
return (PCollection<T>)
tuple.get(ON_TIME_TAG).setTypeDescriptor((TypeDescriptor) type).setCoder(coder);
public static <T> PCollection<T> getOnTime(PCollectionTuple tuple) {
return (PCollection<T>) tuple.get(ON_TIME_TAG);
}

@SuppressWarnings("unchecked")
public static <T> PCollection<T> getLate(PCollectionTuple tuple, TypeDescriptor<T> type) {
final Coder<T> coder = getCoder(tuple, type);
return (PCollection<T>)
tuple.get(LATE_TAG).setTypeDescriptor((TypeDescriptor) type).setCoder(coder);
public static <T> PCollection<T> getLate(PCollectionTuple tuple) {
return (PCollection<T>) tuple.get(LATE_TAG);
}

private static <T> Coder<T> getCoder(PCollectionTuple tuple, TypeDescriptor<T> type) {
Expand Down Expand Up @@ -107,16 +103,21 @@ public TypeDescriptor<T> getOutputTypeDescriptor() {
public void timer() {}
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public PCollectionTuple expand(PCollection<T> input) {
PCollection<KV<Integer, T>> withKeys =
input.apply(
WithKeys.<Integer, T>of(Object::hashCode).withKeyType(TypeDescriptors.integers()));
TupleTag<T> mainTag = (TupleTag<T>) ON_TIME_TAG;
return withKeys.apply(
"filter",
ParDo.of(new FilterFn<>(input.getTypeDescriptor()))
.withOutputTags(mainTag, TupleTagList.of(LATE_TAG)));
PCollectionTuple tuple =
withKeys.apply(
"filter",
ParDo.of(new FilterFn<>(input.getTypeDescriptor()))
.withOutputTags(mainTag, TupleTagList.of(LATE_TAG)));
final Coder<T> coder = input.getCoder();
tuple.get(LATE_TAG).setCoder((Coder) coder).setTypeDescriptor(input.getTypeDescriptor());
tuple.get(ON_TIME_TAG).setCoder((Coder) coder).setTypeDescriptor(input.getTypeDescriptor());
return tuple;
}
}
102 changes: 102 additions & 0 deletions beam/core/src/main/java/cz/o2/proxima/beam/util/WatermarkShift.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2017-2024 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.beam.util;

import cz.o2.proxima.beam.util.state.ExcludeExternal;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
* Shift watermark of input {@link PCollection} by given duration back in time.
*
* @param <T> type parameter
*/
public class WatermarkShift<T> extends PTransform<PCollection<T>, PCollection<T>> {

public static <T> WatermarkShift<T> of(Duration duration) {
return new WatermarkShift<>(duration);
}

private final Duration shiftDuration;

public WatermarkShift(Duration shiftDuration) {
this.shiftDuration = shiftDuration;
}

@Override
public PCollection<T> expand(PCollection<T> input) {
PCollection<byte[]> impulse = input.getPipeline().apply(Impulse.create());
// filter elements out, just take watermark
PCollection<byte[]> originalWatermark =
input
.apply(Filter.by(e -> false))
.apply(MapElements.into(impulse.getTypeDescriptor()).via(e -> new byte[0]));
PCollection<T> holdWatermark =
PCollectionList.of(impulse)
.and(originalWatermark)
.apply(Flatten.pCollections())
.apply(WithKeys.of(""))
.apply("hold", ParDo.of(new HoldFn()))
.apply(MapElements.into(input.getTypeDescriptor()).via(e -> null))
.setCoder(input.getCoder());
return PCollectionList.of(input)
.and(holdWatermark)
.apply(Flatten.pCollections())
.setCoder(input.getCoder())
.setTypeDescriptor(input.getTypeDescriptor());
}

@ExcludeExternal
private class HoldFn extends DoFn<KV<String, byte[]>, byte[]> {

private final Instant minInstant = BoundedWindow.TIMESTAMP_MIN_VALUE.plus(shiftDuration);

@TimerId("holder")
private final TimerSpec holderTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void processImpulse(@TimerId("holder") Timer holder) {
// start the timer
holder.withOutputTimestamp(BoundedWindow.TIMESTAMP_MIN_VALUE).set(minInstant);
}

@OnTimer("holder")
public void onTimer(@TimerId("holder") Timer holder) {
Instant current = holder.getCurrentRelativeTime();
if (current.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
Instant shifted = current.minus(shiftDuration);
holder.withOutputTimestamp(shifted).offset(Duration.ZERO).setRelative();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Instant;
import org.junit.Test;

Expand All @@ -44,10 +43,8 @@ public void testFiltering() {
.addElements(TimestampedValue.of(0, now.minus(1)))
.advanceWatermarkToInfinity());
PCollectionTuple result = input.apply(FilterLatecomers.of());
PAssert.that(FilterLatecomers.getOnTime(result, TypeDescriptors.integers()))
.containsInAnyOrder(1, 2, 3);
PAssert.that(FilterLatecomers.getLate(result, TypeDescriptors.integers()))
.containsInAnyOrder(0);
PAssert.that(FilterLatecomers.getOnTime(result)).containsInAnyOrder(1, 2, 3);
PAssert.that(FilterLatecomers.getLate(result)).containsInAnyOrder(0);
p.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2017-2024 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.beam.util;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;

public class WatermarkShiftTest {

@Test
public void testWatermarkShift() {
Pipeline p = Pipeline.create();
Instant now = new Instant(0);
PCollection<Integer> input =
p.apply(
TestStream.create(VarIntCoder.of())
.advanceWatermarkTo(now)
.addElements(
TimestampedValue.of(1, now.plus(1)), TimestampedValue.of(2, now.plus(2)))
.advanceWatermarkTo(now.plus(5))
.addElements(TimestampedValue.of(0, now.minus(1)))
.advanceWatermarkToInfinity());
PCollectionTuple filtered = input.apply(FilterLatecomers.of());
PCollection<Integer> onTimeBeforeShift = FilterLatecomers.getOnTime(filtered);
filtered = input.apply(WatermarkShift.of(Duration.millis(6))).apply(FilterLatecomers.of());
PCollection<Integer> onTimeAfterShift = FilterLatecomers.getOnTime(filtered);
PCollection<KV<Integer, Long>> counts =
PCollectionList.of(onTimeAfterShift)
.and(onTimeBeforeShift)
.apply(Flatten.pCollections())
.apply(
Window.<Integer>into(new GlobalWindows())
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes())
.apply(Count.perElement());
// assert that 1, 2 are always on time, but 0 only after the shift
PAssert.that(counts).containsInAnyOrder(KV.of(1, 2L), KV.of(2, 2L), KV.of(0, 1L));
p.run();
}
}

0 comments on commit 416aad5

Please sign in to comment.