From 6bf5f12fa46cb40822fef80a156a07f0c9e6f087 Mon Sep 17 00:00:00 2001
From: Jan Lukavsky
Date: Wed, 11 Dec 2024 09:44:32 +0100
Subject: [PATCH] [proxima-beam-core] #344 add FilterLatecomers transform

---
 .../proxima/beam/util/FilterLatecomers.java   | 122 ++++++++++++++++++
 .../beam/util/FilterLatecomersTest.java       |  53 ++++++++
 2 files changed, 175 insertions(+)
 create mode 100644 beam/core/src/main/java/cz/o2/proxima/beam/util/FilterLatecomers.java
 create mode 100644 beam/core/src/test/java/cz/o2/proxima/beam/util/FilterLatecomersTest.java

diff --git a/beam/core/src/main/java/cz/o2/proxima/beam/util/FilterLatecomers.java b/beam/core/src/main/java/cz/o2/proxima/beam/util/FilterLatecomers.java
new file mode 100644
index 000000000..1313cc440
--- /dev/null
+++ b/beam/core/src/main/java/cz/o2/proxima/beam/util/FilterLatecomers.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2017-2024 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.beam.util;
+
+import cz.o2.proxima.beam.util.state.ExcludeExternal;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Instant;
+
+/**
+ * Split input {@link PCollection} into late and on-time elements.
+ *
+ * <p>An element is routed to the late output when its timestamp is strictly before the current
+ * time of an event-time timer at the moment the element is processed; all other elements go to
+ * the on-time output. Use {@link #getOnTime} and {@link #getLate} to read the two outputs from
+ * the resulting {@link PCollectionTuple}.
+ *
+ * @param <T> type parameter
+ */
+public class FilterLatecomers<T> extends PTransform<PCollection<T>, PCollectionTuple> {
+
+  /** Tag of the main output, which receives elements that are not late. */
+  private static final TupleTag<?> ON_TIME_TAG = new TupleTag<>();
+
+  /** Tag of the additional output, which receives late elements. */
+  private static final TupleTag<?> LATE_TAG = new TupleTag<>();
+
+  /**
+   * Create the transform.
+   *
+   * @param <T> type of input elements
+   * @return new {@link FilterLatecomers} instance
+   */
+  public static <T> FilterLatecomers<T> of() {
+    return new FilterLatecomers<>();
+  }
+
+  /**
+   * Retrieve the on-time elements from the result of applying this transform.
+   *
+   * @param tuple result of applying {@link FilterLatecomers}
+   * @param type type descriptor of the elements, also used to look up their coder
+   * @param <T> type of elements
+   * @return {@link PCollection} of on-time elements with type descriptor and coder set
+   * @throws IllegalStateException when the coder registry cannot provide a coder for {@code type}
+   */
+  public static <T> PCollection<T> getOnTime(PCollectionTuple tuple, TypeDescriptor<T> type) {
+    return getOutput(tuple, ON_TIME_TAG, type);
+  }
+
+  /**
+   * Retrieve the late elements from the result of applying this transform.
+   *
+   * @param tuple result of applying {@link FilterLatecomers}
+   * @param type type descriptor of the elements, also used to look up their coder
+   * @param <T> type of elements
+   * @return {@link PCollection} of late elements with type descriptor and coder set
+   * @throws IllegalStateException when the coder registry cannot provide a coder for {@code type}
+   */
+  public static <T> PCollection<T> getLate(PCollectionTuple tuple, TypeDescriptor<T> type) {
+    return getOutput(tuple, LATE_TAG, type);
+  }
+
+  /** Read the output identified by {@code tag} and set its type descriptor and coder. */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static <T> PCollection<T> getOutput(
+      PCollectionTuple tuple, TupleTag<?> tag, TypeDescriptor<T> type) {
+
+    final Coder<T> coder = getCoder(tuple, type);
+    // the tags are wildcard-typed, so descriptor and coder have to be set through raw types
+    return (PCollection<T>)
+        tuple.get(tag).setTypeDescriptor((TypeDescriptor) type).setCoder(coder);
+  }
+
+  /** Look up the coder for {@code type} in the coder registry of the tuple's pipeline. */
+  private static <T> Coder<T> getCoder(PCollectionTuple tuple, TypeDescriptor<T> type) {
+    try {
+      return tuple.getPipeline().getCoderRegistry().getCoder(type);
+    } catch (CannotProvideCoderException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Function routing each value either to the late or to the on-time output.
+   *
+   * <p>The event-time timer is declared only so that its current time can be read while
+   * processing an element. It is never set here, hence the timer callback is empty.
+   */
+  @ExcludeExternal
+  private static class FilterFn<T> extends DoFn<KV<Integer, T>, T> {
+
+    private final TypeDescriptor<T> inputDescriptor;
+
+    @TimerId("watermark")
+    private final TimerSpec watermarkSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private FilterFn(TypeDescriptor<T> inputDescriptor) {
+      this.inputDescriptor = inputDescriptor;
+    }
+
+    /**
+     * Output the value of {@code elem} to the late output when {@code ts} is strictly before the
+     * current time of the timer, otherwise to the on-time output. The key is dropped.
+     */
+    @ProcessElement
+    @SuppressWarnings("unchecked")
+    public void process(
+        @Element KV<Integer, T> elem,
+        @Timestamp Instant ts,
+        @TimerId("watermark") Timer watermark,
+        MultiOutputReceiver output) {
+
+      // the tags are declared with a wildcard type, so they are cast to the element type
+      if (ts.isBefore(watermark.getCurrentRelativeTime())) {
+        output.get((TupleTag<T>) LATE_TAG).output(elem.getValue());
+      } else {
+        output.get((TupleTag<T>) ON_TIME_TAG).output(elem.getValue());
+      }
+    }
+
+    /** Outputs carry the same type as the values of the input. */
+    @Override
+    public TypeDescriptor<T> getOutputTypeDescriptor() {
+      return inputDescriptor;
+    }
+
+    /** Intentionally empty, the timer is never set. */
+    @OnTimer("watermark")
+    public void timer() {}
+  }
+
+  /**
+   * Key the input and apply the filtering function with two outputs.
+   *
+   * @param input elements to split
+   * @return tuple holding the on-time (main) and late (additional) outputs
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public PCollectionTuple expand(PCollection<T> input) {
+    // timers require keyed input; the key itself carries no meaning and is dropped on output
+    final PCollection<KV<Integer, T>> withKeys =
+        input.apply(
+            WithKeys.<Integer, T>of(Object::hashCode).withKeyType(TypeDescriptors.integers()));
+    final TupleTag<T> mainTag = (TupleTag<T>) ON_TIME_TAG;
+    return withKeys.apply(
+        "filter",
+        ParDo.of(new FilterFn<>(input.getTypeDescriptor()))
+            .withOutputTags(mainTag, TupleTagList.of(LATE_TAG)));
+  }
+}
diff --git a/beam/core/src/test/java/cz/o2/proxima/beam/util/FilterLatecomersTest.java b/beam/core/src/test/java/cz/o2/proxima/beam/util/FilterLatecomersTest.java
new file mode 100644
index 000000000..86264b40b
--- /dev/null
+++ b/beam/core/src/test/java/cz/o2/proxima/beam/util/FilterLatecomersTest.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2017-2024 O2 Czech Republic, a.s.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package cz.o2.proxima.beam.util;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/** Test suite for {@link FilterLatecomers}. */
+public class FilterLatecomersTest {
+
+  /**
+   * Elements 1, 2 and 3 are added with timestamps at or after the watermark and are expected on
+   * the on-time output. Element 0 is added with a timestamp before the already advanced watermark
+   * and is expected on the late output.
+   */
+  @Test
+  public void testFiltering() {
+    Instant now = new Instant(0);
+    Pipeline p = Pipeline.create();
+    PCollection<Integer> input =
+        p.apply(
+            TestStream.create(VarIntCoder.of())
+                .advanceWatermarkTo(now)
+                .addElements(
+                    TimestampedValue.of(1, now),
+                    TimestampedValue.of(2, now.plus(1)),
+                    TimestampedValue.of(3, now.plus(2)))
+                .advanceWatermarkTo(now.plus(5))
+                // timestamp is behind the watermark set above
+                .addElements(TimestampedValue.of(0, now.minus(1)))
+                .advanceWatermarkToInfinity());
+    PCollectionTuple result = input.apply(FilterLatecomers.of());
+    PAssert.that(FilterLatecomers.getOnTime(result, TypeDescriptors.integers()))
+        .containsInAnyOrder(1, 2, 3);
+    PAssert.that(FilterLatecomers.getLate(result, TypeDescriptors.integers()))
+        .containsInAnyOrder(0);
+    p.run();
+  }
+}