Skip to content

Commit

Permalink
fixup! Add UnboundedCountingInpu
Browse files Browse the repository at this point in the history
Documenting is great
  • Loading branch information
tgroh committed Mar 29, 2016
1 parent 5163a7e commit c57ee1b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,13 @@ public static BoundedCountingInput upTo(long numElements) {
public static UnboundedCountingInput unbounded() {
return new UnboundedCountingInput(
new NowTimestampFn(),
// Elements per period
1L,
// period length
Duration.ZERO,
// max num records
Optional.<Long>absent(),
// max read duration
Optional.<Duration>absent());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,20 @@ public void testUnboundedInputRate() {
Pipeline p = TestPipeline.create();
long numElements = 5000;

long elemsPerPeriod = 10L;
Duration periodLength = Duration.millis(8);
PCollection<Long> input =
p.apply(
CountingInput.unbounded()
.withRate(10L, Duration.millis(8))
.withRate(elemsPerPeriod, periodLength)
.withMaxNumRecords(numElements));

addCountingAsserts(input, numElements);
long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod;
Instant startTime = Instant.now();
p.run();
assertThat(Instant.now().isAfter(startTime.plus(Duration.millis(4000))), is(true));
Instant endTime = Instant.now();
assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
}

private static class ElementValueDiff extends DoFn<Long, Long> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public void testUnboundedSourceRateSplits() throws Exception {
Instant startTime = Instant.now();
p.run();
Instant endTime = Instant.now();
// 5s
// 500 ms if the readers are all initialized in parallel; 5000 ms if they are evaluated serially
long expectedMinimumMillis = (numElements * period.getMillis()) / elementsPerPeriod;
assertThat(expectedMinimumMillis, lessThan(endTime.getMillis() - startTime.getMillis()));
}
Expand Down

0 comments on commit c57ee1b

Please sign in to comment.