Skip to content

Commit

Permalink
Merge pull request apache#81 from seznam/vanekjar/61/AllowedLateness
Browse files Browse the repository at this point in the history
apache#61 Allowed lateness unit test
  • Loading branch information
xitep authored Apr 10, 2017
2 parents 1293823 + 542f3c6 commit 59b0679
Showing 1 changed file with 45 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,5 +226,50 @@ private List<String> fmt(List<Triple<TimeInterval, String, String>> xs) {
.collect(Collectors.toList());
}

@Test
public void testEventWindowingWithAllowedLateness() throws Exception {
ListDataSink<Triple<TimeInterval, String, Long>> output = ListDataSink.get(1);

ListDataSource<Pair<String, Integer>> source =
ListDataSource.unbounded(
asList(
Pair.of("one", 1),
Pair.of("one", 2),
Pair.of("two", 7),
Pair.of("two", 3), // latecomer, but in limit of allowed lateness
Pair.of("two", 1), // latecomer, will be dropped
Pair.of("two", 8),
Pair.of("three", 8)))
.withReadDelay(Duration.ofMillis(200));

Flow f = Flow.create("test-attached-windowing");
Dataset<Pair<String, Long>> reduced =
ReduceByKey.of(f.createInput(source))
.keyBy(Pair::getFirst)
.valueBy(e -> 1L)
.combineBy(Sums.ofLongs())
.windowBy(Time.of(Duration.ofMillis(5)),
// ~ event time
e -> (long) e.getSecond())
.setNumPartitions(1)
.output();

Util.extractWindows(reduced, TimeInterval.class).persist(output);

new TestFlinkExecutor()
.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoint"))
.setAllowedLateness(Duration.ofMillis(4))
.submit(f)
.get();

assertEquals(
asList("0:one-2", "0:two-1", "5:three-1", "5:two-2"),
output.getOutput(0)
.stream()
.map(p -> p.getFirst().getStartMillis() + ":" + p.getSecond() + "-" + p.getThird())
.sorted()
.collect(Collectors.toList()));

}

}

0 comments on commit 59b0679

Please sign in to comment.