Skip to content

Commit

Permalink
Closes #17
Browse files Browse the repository at this point in the history
  • Loading branch information
dhalperi committed Mar 30, 2016
2 parents ffb7002 + b21a0be commit 9793fa2
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ public static BoundedCountingInput upTo(long numElements) {
*/
public static UnboundedCountingInput unbounded() {
return new UnboundedCountingInput(
new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent());
new NowTimestampFn(),
1L /* Elements per period */,
Duration.ZERO /* Period length */,
Optional.<Long>absent() /* Maximum number of records */,
Optional.<Duration>absent() /* Maximum read duration */);
}

/**
Expand Down Expand Up @@ -126,14 +130,20 @@ public PCollection<Long> apply(PBegin begin) {
*/
public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
private final SerializableFunction<Long, Instant> timestampFn;
private final long elementsPerPeriod;
private final Duration period;
private final Optional<Long> maxNumRecords;
private final Optional<Duration> maxReadTime;

/**
* Stores the given configuration as-is; none of the arguments are validated here.
*
* @param timestampFn function applied to each produced value to obtain its timestamp
* @param elementsPerPeriod number of elements to produce per {@code period}
* @param period length of the rate-limiting period
* @param maxNumRecords maximum number of records to read, if present
* @param maxReadTime maximum read duration, if present
*/
private UnboundedCountingInput(
SerializableFunction<Long, Instant> timestampFn,
long elementsPerPeriod,
Duration period,
Optional<Long> maxNumRecords,
Optional<Duration> maxReadTime) {
this.timestampFn = timestampFn;
this.elementsPerPeriod = elementsPerPeriod;
this.period = period;
this.maxNumRecords = maxNumRecords;
this.maxReadTime = maxReadTime;
}
Expand All @@ -145,7 +155,8 @@ private UnboundedCountingInput(
* <p>Note that the timestamps produced by {@code timestampFn} must never decrease.
*/
public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime);
return new UnboundedCountingInput(
timestampFn, elementsPerPeriod, period, maxNumRecords, maxReadTime);
}

/**
Expand All @@ -158,7 +169,23 @@ public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant
public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
checkArgument(
maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords);
return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime);
return new UnboundedCountingInput(
timestampFn, elementsPerPeriod, period, Optional.of(maxRecords), maxReadTime);
}

/**
 * Returns an {@link UnboundedCountingInput} like this one, but with output production limited
 * to an aggregate rate of no more than {@code numElements} elements per {@code periodLength}.
 *
 * <p>Note that when there are multiple splits, each split outputs independently. This may lead
 * to elements not being produced evenly across time, though the aggregate rate will still
 * approach the specified rate.
 *
 * <p>A duration of {@link Duration#ZERO} will produce output as fast as possible.
 *
 * @param numElements the number of elements to produce per period; must be positive
 * @param periodLength the length of a period; must be non-null and non-negative
 * @throws IllegalArgumentException if {@code numElements} is not positive or
 *     {@code periodLength} is negative
 * @throws NullPointerException if {@code periodLength} is null
 */
public UnboundedCountingInput withRate(long numElements, Duration periodLength) {
  // Validate here so a bad rate is reported where it was specified, rather than later when the
  // underlying source is constructed while the transform is applied.
  checkArgument(
      numElements > 0L, "Must produce at least one element per period, got %s", numElements);
  checkNotNull(periodLength, "PeriodLength cannot be null");
  checkArgument(
      periodLength.getMillis() >= 0L,
      "Must have a non-negative period length, got %s",
      periodLength);
  return new UnboundedCountingInput(
      timestampFn, numElements, periodLength, maxNumRecords, maxReadTime);
}

/**
Expand All @@ -170,13 +197,18 @@ public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
*/
public UnboundedCountingInput withMaxReadTime(Duration readTime) {
checkNotNull(readTime, "ReadTime cannot be null");
return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime));
return new UnboundedCountingInput(
timestampFn, elementsPerPeriod, period, maxNumRecords, Optional.of(readTime));
}

@SuppressWarnings("deprecation")
@Override
public PCollection<Long> apply(PBegin begin) {
Unbounded<Long> read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn));
Unbounded<Long> read =
Read.from(
CountingSource.createUnbounded()
.withTimestampFn(timestampFn)
.withRate(elementsPerPeriod, period));
if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
return begin.apply(read);
} else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.google.cloud.dataflow.sdk.io;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
Expand All @@ -30,6 +31,7 @@
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.collect.ImmutableList;

import org.joda.time.Duration;
import org.joda.time.Instant;

import java.io.IOException;
Expand Down Expand Up @@ -82,6 +84,14 @@ public static BoundedSource<Long> upTo(long numElements) {
return new BoundedCountingSource(0, numElements);
}

/**
* Create a new {@link UnboundedCountingSource}.
*
* <p>The returned source starts at {@code 0} with a stride of {@code 1}, produces one element per
* period with a period of {@link Duration#ZERO}, and timestamps elements with a new
* {@code NowTimestampFn}. Use {@code withRate} and {@code withTimestampFn} on the result to
* change those settings.
*/
// package-private to return a typed UnboundedCountingSource rather than the UnboundedSource type.
static UnboundedCountingSource createUnbounded() {
// Arguments, in order: start, stride, elementsPerPeriod, period, timestampFn.
return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, new NowTimestampFn());
}

/**
* Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
* {@link Long#MAX_VALUE}.
Expand Down Expand Up @@ -114,7 +124,7 @@ public static UnboundedSource<Long, CounterMark> unbounded() {
@Deprecated
public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
SerializableFunction<Long, Instant> timestampFn) {
return new UnboundedCountingSource(0, 1, timestampFn);
return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, timestampFn);
}

/////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -227,11 +237,15 @@ public void close() throws IOException {}
/**
* An implementation of {@link CountingSource} that produces an unbounded {@link PCollection}.
*/
private static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
/** The first number (>= 0) generated by this {@link UnboundedCountingSource}. */
private final long start;
/** The interval between numbers generated by this {@link UnboundedCountingSource}. */
private final long stride;
/** The number of elements to produce each period. */
private final long elementsPerPeriod;
/** The time between producing numbers from this {@link UnboundedCountingSource}. */
private final Duration period;
/** The function used to produce timestamps for the generated elements. */
private final SerializableFunction<Long, Instant> timestampFn;

Expand All @@ -244,13 +258,45 @@ private static class UnboundedCountingSource extends UnboundedSource<Long, Count
*
* <p>Note that the timestamps produced by {@code timestampFn} must never decrease.
*/
public UnboundedCountingSource(
long start, long stride, SerializableFunction<Long, Instant> timestampFn) {
private UnboundedCountingSource(
    long start,
    long stride,
    long elementsPerPeriod,
    Duration period,
    SerializableFunction<Long, Instant> timestampFn) {
  // Reject an invalid rate configuration up front, before any state is recorded.
  checkArgument(
      elementsPerPeriod > 0L,
      "Must produce at least one element per period, got %s",
      elementsPerPeriod);
  checkArgument(
      period.getMillis() >= 0L, "Must have a non-negative period length, got %s", period);

  // Position within the sequence.
  this.start = start;
  this.stride = stride;
  // Rate limiting: elementsPerPeriod elements every period.
  this.elementsPerPeriod = elementsPerPeriod;
  this.period = period;
  // Timestamp assignment.
  this.timestampFn = timestampFn;
}

/**
* Returns an {@link UnboundedCountingSource} like this one, but limited to producing
* {@code elementsPerPeriod} elements per {@code period}.
*
* <p>The start, stride and timestamp function of this source are carried over unchanged. A
* period of zero milliseconds disables rate limiting.
*
* @param elementsPerPeriod the number of elements to produce each period; must be positive
* @param period the length of a period; must be non-negative
* @throws IllegalArgumentException if {@code elementsPerPeriod} is not positive or
*     {@code period} is negative
*/
public UnboundedCountingSource withRate(long elementsPerPeriod, Duration period) {
return new UnboundedCountingSource(start, stride, elementsPerPeriod, period, timestampFn);
}

/**
 * Returns a copy of this {@link UnboundedCountingSource} whose output elements are timestamped
 * by the given function. The start, stride and rate settings are carried over unchanged.
 *
 * <p>Note that the timestamps produced by {@code timestampFn} must never decrease.
 *
 * @param timestampFn the function mapping each produced value to its timestamp; must not be null
 * @throws NullPointerException if {@code timestampFn} is null
 */
public UnboundedCountingSource withTimestampFn(
    SerializableFunction<Long, Instant> timestampFn) {
  // checkNotNull returns its argument, so the null check happens before the copy is built.
  return new UnboundedCountingSource(
      start, stride, elementsPerPeriod, period, checkNotNull(timestampFn));
}

/**
* Splits an unbounded source {@code desiredNumSplits} ways by giving each split every
* {@code desiredNumSplits}th element that this {@link UnboundedCountingSource}
Expand All @@ -271,7 +317,9 @@ public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> generat
for (int i = 0; i < desiredNumSplits; ++i) {
// Starts offset by the original stride. Using Javadoc example, this generates starts of
// 0, 2, and 4.
splits.add(new UnboundedCountingSource(start + i * stride, newStride, timestampFn));
splits.add(
new UnboundedCountingSource(
start + i * stride, newStride, elementsPerPeriod, period, timestampFn));
}
return splits.build();
}
Expand Down Expand Up @@ -305,6 +353,7 @@ private static class UnboundedCountingReader extends UnboundedReader<Long> {
private UnboundedCountingSource source;
private long current;
private Instant currentTimestamp;
private Instant firstStarted;

public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) {
this.source = source;
Expand All @@ -314,11 +363,15 @@ public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark)
this.current = source.start - source.stride;
} else {
this.current = mark.getLastEmitted();
this.firstStarted = mark.getStartTime();
}
}

/**
* Records the time this reader first started, if it is not already known, and then attempts to
* advance to the first element.
*/
@Override
public boolean start() throws IOException {
// A reader built from a checkpoint mark takes its start time from that mark in the
// constructor; only set it here when no start time is available, so the rate computation
// keeps measuring elapsed time from the original start.
if (firstStarted == null) {
this.firstStarted = Instant.now();
}
return advance();
}

Expand All @@ -328,19 +381,33 @@ public boolean advance() throws IOException {
if (Long.MAX_VALUE - source.stride < current) {
return false;
}
current += source.stride;
long nextValue = current + source.stride;
if (expectedValue() < nextValue) {
return false;
}
current = nextValue;
currentTimestamp = source.timestampFn.apply(current);
return true;
}

/**
 * Returns the upper bound on the value this reader may have reached by now under the
 * configured rate: {@code elementsPerPeriod} multiplied by the (fractional) number of periods
 * elapsed since {@code firstStarted}, truncated to a {@code long}.
 *
 * <p>A period of zero milliseconds means no rate limit, reported as {@link Long#MAX_VALUE}.
 */
private long expectedValue() {
  long periodMillis = source.period.getMillis();
  if (periodMillis == 0L) {
    return Long.MAX_VALUE;
  }
  long elapsedMillis = Instant.now().getMillis() - firstStarted.getMillis();
  // Divide in floating point so that partially elapsed periods count proportionally.
  double periodsElapsed = elapsedMillis / (double) periodMillis;
  return (long) (source.elementsPerPeriod * periodsElapsed);
}

/**
* Returns the watermark, computed by applying the source's timestamp function to
* {@code current}.
*/
@Override
public Instant getWatermark() {
// NOTE(review): this re-evaluates timestampFn on every call instead of returning the cached
// currentTimestamp; for a clock-based timestampFn the result moves forward even when no new
// element has been read. Presumably intentional -- confirm.
return source.timestampFn.apply(current);
}

@Override
public CounterMark getCheckpointMark() {
return new CounterMark(current);
return new CounterMark(current, firstStarted);
}

@Override
Expand All @@ -360,6 +427,12 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {

@Override
public void close() throws IOException {}

/**
 * Returns an estimate, in bytes, of how far this split is behind its rate schedule, counting 8
 * bytes per element that could already have been produced but has not been.
 *
 * <p>An unthrottled source (zero period) has no schedule to fall behind, so it reports no
 * backlog. The arithmetic saturates at {@link Long#MAX_VALUE} instead of overflowing.
 */
@Override
public long getSplitBacklogBytes() {
  if (source.period.getMillis() == 0L) {
    // expectedValue() is Long.MAX_VALUE here. Multiplying the distance to it by 8 used to
    // overflow, so the reported backlog depended on wrap-around (in practice it came out as 0).
    // Make that outcome explicit.
    return 0L;
  }
  long expected = expectedValue();
  if (expected <= current) {
    return 0L;
  }
  long behind = expected - current;
  if (behind < 0L) {
    // The subtraction overflowed (very large expected value with a negative current).
    behind = Long.MAX_VALUE;
  }
  if (behind <= Long.MAX_VALUE / 8) {
    // No overflow possible: keep the original multiply-then-divide rounding.
    return 8 * behind / source.stride;
  }
  // Divide first so the multiplication can be checked against overflow.
  long backlogElements = behind / source.stride;
  return backlogElements > Long.MAX_VALUE / 8 ? Long.MAX_VALUE : 8 * backlogElements;
}
}

/**
Expand All @@ -370,12 +443,14 @@ public void close() throws IOException {}
public static class CounterMark implements UnboundedSource.CheckpointMark {
/** The last value emitted. */
private final long lastEmitted;
private final Instant startTime;

/**
* Creates a checkpoint mark reflecting the last emitted value and the time the reader was
* started.
*/
public CounterMark(long lastEmitted) {
public CounterMark(long lastEmitted, Instant startTime) {
this.lastEmitted = lastEmitted;
this.startTime = startTime;
}

/**
Expand All @@ -385,11 +460,19 @@ public long getLastEmitted() {
return lastEmitted;
}

/**
* Returns the time the reader was started.
*
* <p>A reader created from this mark adopts this value as its own start time, so its rate
* limiting keeps measuring elapsed time from the original start.
*/
public Instant getStartTime() {
return startTime;
}

/////////////////////////////////////////////////////////////////////////////////////

// No-argument constructor required for AvroCoder. The final fields still have to be assigned, so
// they receive placeholder values (0 and the current time); presumably these are replaced with
// the encoded values when a mark is decoded -- confirm against AvroCoder.
@SuppressWarnings("unused") // For AvroCoder
private CounterMark() {
this.lastEmitted = 0L;
this.startTime = Instant.now();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package com.google.cloud.dataflow.sdk.io;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
Expand All @@ -32,6 +35,7 @@
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.values.PCollection;

import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -83,6 +87,27 @@ public void testUnboundedInput() {
p.run();
}

/**
* Checks that a rate-limited unbounded input produces the expected elements and that the
* pipeline takes at least as long as the configured rate requires.
*/
@Test
public void testUnboundedInputRate() {
Pipeline p = TestPipeline.create();
long numElements = 5000;

// 10 elements every 8 ms.
long elemsPerPeriod = 10L;
Duration periodLength = Duration.millis(8);
PCollection<Long> input =
p.apply(
CountingInput.unbounded()
.withRate(elemsPerPeriod, periodLength)
.withMaxNumRecords(numElements));

addCountingAsserts(input, numElements);
// Minimum wall-clock time to emit numElements at the configured rate:
// 8 ms * 5000 / 10 = 4000 ms. This is a lower bound only; no upper bound is asserted.
long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod;
Instant startTime = Instant.now();
p.run();
Instant endTime = Instant.now();
// NOTE(review): this makes the test run for at least ~4 seconds and compares wall-clock times
// with a strict "after"; confirm that is acceptable for the test suite.
assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
}

private static class ElementValueDiff extends DoFn<Long, Long> {
@Override
public void processElement(ProcessContext c) throws Exception {
Expand Down
Loading

0 comments on commit 9793fa2

Please sign in to comment.