Skip to content

Commit

Permalink
Add UnboundedCountingInput#withPeriod
Browse files Browse the repository at this point in the history
The period between elements controls the rate at which
UnboundedCountingInput will output elements. This is an aggregate rate
across all instances of the source, and thus elements will not
necessarily be output "smoothly", or within the first period. The
aggregate rate, however, will be approximately equal to the provided
rate.

Add package-private CountingSource#createUnbounded() to expose the
UnboundedCountingSource type. Make UnboundedCountingSource
package-private.
  • Loading branch information
tgroh committed Mar 4, 2016
1 parent 0c6c833 commit fe366ea
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Period;

/**
* A {@link PTransform} that produces longs. When used to produce a
Expand Down Expand Up @@ -89,7 +90,7 @@ public static BoundedCountingInput upTo(long numElements) {
*/
public static UnboundedCountingInput unbounded() {
return new UnboundedCountingInput(
new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent());
new NowTimestampFn(), Period.ZERO, Optional.<Long>absent(), Optional.<Duration>absent());
}

/**
Expand Down Expand Up @@ -125,14 +126,17 @@ public PCollection<Long> apply(PBegin begin) {
*/
public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
private final SerializableFunction<Long, Instant> timestampFn;
private final Period period;
private final Optional<Long> maxNumRecords;
private final Optional<Duration> maxReadTime;

private UnboundedCountingInput(
SerializableFunction<Long, Instant> timestampFn,
Period period,
Optional<Long> maxNumRecords,
Optional<Duration> maxReadTime) {
this.timestampFn = timestampFn;
this.period = period;
this.maxNumRecords = maxNumRecords;
this.maxReadTime = maxReadTime;
}
Expand All @@ -144,7 +148,7 @@ private UnboundedCountingInput(
* <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
*/
public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime);
return new UnboundedCountingInput(timestampFn, period, maxNumRecords, maxReadTime);
}

/**
Expand All @@ -157,7 +161,18 @@ public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant
public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
checkArgument(
maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords);
return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime);
return new UnboundedCountingInput(timestampFn, period, Optional.of(maxRecords), maxReadTime);
}

/**
* Returns an {@link UnboundedCountingInput} like this one, but with output production limited
* to an aggregate rate of no more than one element per period.
*
* <p>Note that this period is taken in aggregate across all instances of the
* {@link PTransform}, which may cause elements to be produced in bunches.
*/
public UnboundedCountingInput withPeriod(Period period) {
return new UnboundedCountingInput(timestampFn, period, maxNumRecords, maxReadTime);
}

/**
Expand All @@ -169,13 +184,15 @@ public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
*/
public UnboundedCountingInput withMaxReadTime(Duration readTime) {
checkNotNull(readTime, "ReadTime cannot be null");
return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime));
return new UnboundedCountingInput(timestampFn, period, maxNumRecords, Optional.of(readTime));
}

@SuppressWarnings("deprecation")
@Override
public PCollection<Long> apply(PBegin begin) {
Unbounded<Long> read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn));
Unbounded<Long> read =
Read.from(
CountingSource.createUnbounded().withTimestampFn(timestampFn).withPeriod(period));
if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
return begin.apply(read);
} else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.dataflow.sdk.io;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
Expand All @@ -30,6 +31,7 @@
import com.google.common.collect.ImmutableList;

import org.joda.time.Instant;
import org.joda.time.Period;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -81,6 +83,14 @@ public static BoundedSource<Long> upTo(long numElements) {
return new BoundedCountingSource(0, numElements);
}

/**
* Create a new {@link UnboundedCountingSource}.
*/
// package-private to return a typed UnboundedCountingSource rather than the UnboundedSource type.
static UnboundedCountingSource createUnbounded() {
return new UnboundedCountingSource(0, 1, Period.ZERO, new NowTimestampFn());
}

/**
* Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
* {@link Long#MAX_VALUE}.
Expand Down Expand Up @@ -113,7 +123,7 @@ public static UnboundedSource<Long, CounterMark> unbounded() {
@Deprecated
public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
SerializableFunction<Long, Instant> timestampFn) {
return new UnboundedCountingSource(0, 1, timestampFn);
return new UnboundedCountingSource(0, 1, Period.ZERO, timestampFn);
}

/////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -226,11 +236,13 @@ public void close() throws IOException {}
/**
* An implementation of {@link CountingSource} that produces an unbounded {@link PCollection}.
*/
private static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
/** The first number (>= 0) generated by this {@link UnboundedCountingSource}. */
private final long start;
/** The interval between numbers generated by this {@link UnboundedCountingSource}. */
private final long stride;
/** The time between producing numbers from this {@link UnboundedCountingSource}. */
private final Period period;
/** The function used to produce timestamps for the generated elements. */
private final SerializableFunction<Long, Instant> timestampFn;

Expand All @@ -243,13 +255,37 @@ private static class UnboundedCountingSource extends UnboundedSource<Long, Count
*
* <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
*/
public UnboundedCountingSource(
long start, long stride, SerializableFunction<Long, Instant> timestampFn) {
private UnboundedCountingSource(
long start, long stride, Period period, SerializableFunction<Long, Instant> timestampFn) {
this.start = start;
this.stride = stride;
this.period = period;
this.timestampFn = timestampFn;
}

/**
* Returns an {@link UnboundedCountingSource} like this one with the specified period. Elements
* will be produced with an interval between them equal to the period.
*/
public UnboundedCountingSource withPeriod(Period period) {
checkArgument(
period.getMillis() >= 0,
"Period must be a non-negative value, got " + period.getMillis());
return new UnboundedCountingSource(start, stride, period, timestampFn);
}

/**
* Returns an {@link UnboundedCountingSource} like this one where the timestamp of output
* elements are supplied by the specified function.
*
* <p>Note that timestamps produced by {@code timestampFn} may not decrease.
*/
public UnboundedCountingSource withTimestampFn(
SerializableFunction<Long, Instant> timestampFn) {
checkNotNull(timestampFn);
return new UnboundedCountingSource(start, stride, period, timestampFn);
}

/**
* Splits an unbounded source {@code desiredNumSplits} ways by giving each split every
* {@code desiredNumSplits}th element that this {@link UnboundedCountingSource}
Expand All @@ -265,12 +301,14 @@ public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> generat
int desiredNumSplits, PipelineOptions options) throws Exception {
// Using Javadoc example, stride 2 with 3 splits becomes stride 6.
long newStride = stride * desiredNumSplits;
Period newPeriod = period.multipliedBy(desiredNumSplits);

ImmutableList.Builder<UnboundedCountingSource> splits = ImmutableList.builder();
for (int i = 0; i < desiredNumSplits; ++i) {
// Starts offset by the original stride. Using Javadoc example, this generates starts of
// 0, 2, and 4.
splits.add(new UnboundedCountingSource(start + i * stride, newStride, timestampFn));
splits.add(
new UnboundedCountingSource(start + i * stride, newStride, newPeriod, timestampFn));
}
return splits.build();
}
Expand All @@ -287,7 +325,9 @@ public Coder<CountingSource.CounterMark> getCheckpointMarkCoder() {
}

@Override
public void validate() {}
public void validate() {
checkArgument(period.getMillis() >= 0L);
}

@Override
public Coder<Long> getDefaultOutputCoder() {
Expand All @@ -304,15 +344,20 @@ private static class UnboundedCountingReader extends UnboundedReader<Long> {
private UnboundedCountingSource source;
private long current;
private Instant currentTimestamp;
private Instant lastTimeProduced;

public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) {
this.source = source;
if (mark == null) {
// Because we have not emitted an element yet, and start() calls advance, we need to
// "un-advance" so that start() produces the correct output.
this.current = source.start - source.stride;
// Do not produce an element immediately, to ensure that we do not exceed the requested
// period due to splits.
this.lastTimeProduced = Instant.now();
} else {
this.current = mark.getLastEmitted();
this.lastTimeProduced = mark.getLastTimeProduced();
}
}

Expand All @@ -327,8 +372,15 @@ public boolean advance() throws IOException {
if (Long.MAX_VALUE - source.stride < current) {
return false;
}
Instant nextTimeToProduce =
lastTimeProduced.plus(source.period.toDurationFrom(lastTimeProduced));
if (Instant.now().isBefore(nextTimeToProduce)) {
return false;
}
current += source.stride;
currentTimestamp = source.timestampFn.apply(current);
// Advance the time by at most the period, so if we're behind we can catch up.
lastTimeProduced = nextTimeToProduce;
return true;
}

Expand All @@ -339,7 +391,7 @@ public Instant getWatermark() {

@Override
public CounterMark getCheckpointMark() {
return new CounterMark(current);
return new CounterMark(current, lastTimeProduced);
}

@Override
Expand Down Expand Up @@ -369,12 +421,14 @@ public void close() throws IOException {}
public static class CounterMark implements UnboundedSource.CheckpointMark {
/** The last value emitted. */
private final long lastEmitted;
private final Instant lastProduced;

/**
* Creates a checkpoint mark reflecting the last emitted value.
*/
public CounterMark(long lastEmitted) {
public CounterMark(long lastEmitted, Instant lastProduced) {
this.lastEmitted = lastEmitted;
this.lastProduced = lastProduced;
}

/**
Expand All @@ -384,11 +438,19 @@ public long getLastEmitted() {
return lastEmitted;
}

/**
* Returns the last time elements were produced by this source.
*/
public Instant getLastTimeProduced() {
return lastProduced;
}

/////////////////////////////////////////////////////////////////////////////////////

@SuppressWarnings("unused") // For AvroCoder
private CounterMark() {
this.lastEmitted = 0L;
this.lastProduced = Instant.now();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.google.cloud.dataflow.sdk.io;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
Expand All @@ -30,7 +33,9 @@
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.values.PCollection;

import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Period;
import org.junit.Test;
import org.junit.experimental.categories.Category;

Expand Down Expand Up @@ -78,6 +83,21 @@ public void testUnboundedInput() {
p.run();
}

@Test
public void testUnboundedInputPeriod() {
Pipeline p = TestPipeline.create();
long numElements = 1000;

PCollection<Long> input =
p.apply(
CountingInput.unbounded().withPeriod(Period.millis(3)).withMaxNumRecords(numElements));

addCountingAsserts(input, numElements);
Instant startTime = Instant.now();
p.run();
assertThat(Instant.now().isAfter(startTime.plus(Duration.millis(3000))), is(true));
}

private static class ElementValueDiff extends DoFn<Long, Long> {
@Override
public void processElement(ProcessContext c) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package com.google.cloud.dataflow.sdk.io;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import com.google.cloud.dataflow.sdk.Pipeline;
Expand All @@ -37,7 +39,9 @@
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionList;

import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Period;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -149,6 +153,40 @@ public void testUnboundedSourceTimestamps() {
p.run();
}

@Test
public void testUnboundedSourceWithPeriod() {
Pipeline p = TestPipeline.create();

Period period = Period.millis(2);
long numElements = 1000L;

PCollection<Long> input =
p.apply(
Read.from(
CountingSource.createUnbounded()
.withTimestampFn(new ValueAsTimestampFn())
.withPeriod(period))
.withMaxNumRecords(numElements));
addCountingAsserts(input, numElements);

PCollection<Long> diffs =
input
.apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
.apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
// This assert also confirms that diffs only has one unique value.
DataflowAssert.thatSingleton(diffs).isEqualTo(0L);

Instant started = Instant.now();
p.run();
Instant finished = Instant.now();
Duration expectedDuration = period.multipliedBy((int) numElements).toDurationFrom(started);
assertThat(
started
.plus(expectedDuration)
.isBefore(finished),
is(true));
}

@Test
@Category(RunnableOnService.class)
public void testUnboundedSourceSplits() throws Exception {
Expand Down

0 comments on commit fe366ea

Please sign in to comment.