diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RBKTimeWindowTest.java b/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RBKTimeWindowTest.java index 7981b28e358c9..a3369b9f9fc4f 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RBKTimeWindowTest.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RBKTimeWindowTest.java @@ -226,5 +226,50 @@ private List fmt(List> xs) { .collect(Collectors.toList()); } + @Test + public void testEventWindowingWithAllowedLateness() throws Exception { + ListDataSink> output = ListDataSink.get(1); + + ListDataSource> source = + ListDataSource.unbounded( + asList( + Pair.of("one", 1), + Pair.of("one", 2), + Pair.of("two", 7), + Pair.of("two", 3), // latecomer, but in limit of allowed lateness + Pair.of("two", 1), // latecomer, will be dropped + Pair.of("two", 8), + Pair.of("three", 8))) + .withReadDelay(Duration.ofMillis(200)); + + Flow f = Flow.create("test-attached-windowing"); + Dataset> reduced = + ReduceByKey.of(f.createInput(source)) + .keyBy(Pair::getFirst) + .valueBy(e -> 1L) + .combineBy(Sums.ofLongs()) + .windowBy(Time.of(Duration.ofMillis(5)), + // ~ event time + e -> (long) e.getSecond()) + .setNumPartitions(1) + .output(); + + Util.extractWindows(reduced, TimeInterval.class).persist(output); + + new TestFlinkExecutor() + .setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoint")) + .setAllowedLateness(Duration.ofMillis(4)) + .submit(f) + .get(); + + assertEquals( + asList("0:one-2", "0:two-1", "5:three-1", "5:two-2"), + output.getOutput(0) + .stream() + .map(p -> p.getFirst().getStartMillis() + ":" + p.getSecond() + "-" + p.getThird()) + .sorted() + .collect(Collectors.toList())); + + } }