Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-94] Add UnboundedCountingInput#withRate #17

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ public static BoundedCountingInput upTo(long numElements) {
*/
public static UnboundedCountingInput unbounded() {
return new UnboundedCountingInput(
new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent());
new NowTimestampFn(),
1L /* Elements per period */,
Duration.ZERO /* Period length */,
Optional.<Long>absent() /* Maximum number of records */,
Optional.<Duration>absent() /* Maximum read duration */);
}

/**
Expand Down Expand Up @@ -125,14 +129,20 @@ public PCollection<Long> apply(PBegin begin) {
*/
public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
private final SerializableFunction<Long, Instant> timestampFn;
private final long elementsPerPeriod;
private final Duration period;
private final Optional<Long> maxNumRecords;
private final Optional<Duration> maxReadTime;

private UnboundedCountingInput(
SerializableFunction<Long, Instant> timestampFn,
long elementsPerPeriod,
Duration period,
Optional<Long> maxNumRecords,
Optional<Duration> maxReadTime) {
this.timestampFn = timestampFn;
this.elementsPerPeriod = elementsPerPeriod;
this.period = period;
this.maxNumRecords = maxNumRecords;
this.maxReadTime = maxReadTime;
}
Expand All @@ -144,7 +154,8 @@ private UnboundedCountingInput(
* <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
*/
public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime);
return new UnboundedCountingInput(
timestampFn, elementsPerPeriod, period, maxNumRecords, maxReadTime);
}

/**
Expand All @@ -157,7 +168,23 @@ public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant
public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
checkArgument(
maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords);
return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime);
return new UnboundedCountingInput(
timestampFn, elementsPerPeriod, period, Optional.of(maxRecords), maxReadTime);
}

/**
* Returns an {@link UnboundedCountingInput} like this one, but with output production limited
* to an aggregate rate of no more than the number of elements per the period length.
*
* <p>Note that when there are multiple splits, each split outputs independently. This may lead
* to elements not being produced evenly across time, though the aggregate rate will still
* approach the specified rate.
*
* <p>A duration of {@link Duration#ZERO} will produce output as fast as possible.
*/
public UnboundedCountingInput withRate(long numElements, Duration periodLength) {
return new UnboundedCountingInput(
timestampFn, numElements, periodLength, maxNumRecords, maxReadTime);
}

/**
Expand All @@ -169,13 +196,18 @@ public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
*/
public UnboundedCountingInput withMaxReadTime(Duration readTime) {
checkNotNull(readTime, "ReadTime cannot be null");
return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime));
return new UnboundedCountingInput(
timestampFn, elementsPerPeriod, period, maxNumRecords, Optional.of(readTime));
}

@SuppressWarnings("deprecation")
@Override
public PCollection<Long> apply(PBegin begin) {
Unbounded<Long> read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn));
Unbounded<Long> read =
Read.from(
CountingSource.createUnbounded()
.withTimestampFn(timestampFn)
.withRate(elementsPerPeriod, period));
if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
return begin.apply(read);
} else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.dataflow.sdk.io;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
Expand All @@ -29,6 +30,7 @@
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.collect.ImmutableList;

import org.joda.time.Duration;
import org.joda.time.Instant;

import java.io.IOException;
Expand Down Expand Up @@ -81,6 +83,14 @@ public static BoundedSource<Long> upTo(long numElements) {
return new BoundedCountingSource(0, numElements);
}

/**
* Create a new {@link UnboundedCountingSource}.
*/
// package-private to return a typed UnboundedCountingSource rather than the UnboundedSource type.
static UnboundedCountingSource createUnbounded() {
return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, new NowTimestampFn());
}

/**
* Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to
* {@link Long#MAX_VALUE}.
Expand Down Expand Up @@ -113,7 +123,7 @@ public static UnboundedSource<Long, CounterMark> unbounded() {
@Deprecated
public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(
SerializableFunction<Long, Instant> timestampFn) {
return new UnboundedCountingSource(0, 1, timestampFn);
return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, timestampFn);
}

/////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -226,11 +236,15 @@ public void close() throws IOException {}
/**
* An implementation of {@link CountingSource} that produces an unbounded {@link PCollection}.
*/
private static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> {
/** The first number (>= 0) generated by this {@link UnboundedCountingSource}. */
private final long start;
/** The interval between numbers generated by this {@link UnboundedCountingSource}. */
private final long stride;
/** The number of elements to produce each period. */
private final long elementsPerPeriod;
/** The time between producing numbers from this {@link UnboundedCountingSource}. */
private final Duration period;
/** The function used to produce timestamps for the generated elements. */
private final SerializableFunction<Long, Instant> timestampFn;

Expand All @@ -243,13 +257,45 @@ private static class UnboundedCountingSource extends UnboundedSource<Long, Count
*
* <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
*/
public UnboundedCountingSource(
long start, long stride, SerializableFunction<Long, Instant> timestampFn) {
private UnboundedCountingSource(
long start,
long stride,
long elementsPerPeriod,
Duration period,
SerializableFunction<Long, Instant> timestampFn) {
this.start = start;
this.stride = stride;
checkArgument(
elementsPerPeriod > 0L,
"Must produce at least one element per period, got %s",
elementsPerPeriod);
this.elementsPerPeriod = elementsPerPeriod;
checkArgument(
period.getMillis() >= 0L, "Must have a non-negative period length, got %s", period);
this.period = period;
this.timestampFn = timestampFn;
}

/**
* Returns an {@link UnboundedCountingSource} like this one with the specified period. Elements
* will be produced with an interval between them equal to the period.
*/
public UnboundedCountingSource withRate(long elementsPerPeriod, Duration period) {
return new UnboundedCountingSource(start, stride, elementsPerPeriod, period, timestampFn);
}

/**
* Returns an {@link UnboundedCountingSource} like this one where the timestamp of output
* elements are supplied by the specified function.
*
* <p>Note that timestamps produced by {@code timestampFn} may not decrease.
*/
public UnboundedCountingSource withTimestampFn(
SerializableFunction<Long, Instant> timestampFn) {
checkNotNull(timestampFn);
return new UnboundedCountingSource(start, stride, elementsPerPeriod, period, timestampFn);
}

/**
* Splits an unbounded source {@code desiredNumSplits} ways by giving each split every
* {@code desiredNumSplits}th element that this {@link UnboundedCountingSource}
Expand All @@ -270,7 +316,9 @@ public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> generat
for (int i = 0; i < desiredNumSplits; ++i) {
// Starts offset by the original stride. Using Javadoc example, this generates starts of
// 0, 2, and 4.
splits.add(new UnboundedCountingSource(start + i * stride, newStride, timestampFn));
splits.add(
new UnboundedCountingSource(
start + i * stride, newStride, elementsPerPeriod, period, timestampFn));
}
return splits.build();
}
Expand Down Expand Up @@ -304,6 +352,7 @@ private static class UnboundedCountingReader extends UnboundedReader<Long> {
private UnboundedCountingSource source;
private long current;
private Instant currentTimestamp;
private Instant firstStarted;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new blank lines necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, reverted.

public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) {
this.source = source;
Expand All @@ -313,11 +362,15 @@ public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark)
this.current = source.start - source.stride;
} else {
this.current = mark.getLastEmitted();
this.firstStarted = mark.getStartTime();
}
}

@Override
public boolean start() throws IOException {
if (firstStarted == null) {
this.firstStarted = Instant.now();
}
return advance();
}

Expand All @@ -327,19 +380,33 @@ public boolean advance() throws IOException {
if (Long.MAX_VALUE - source.stride < current) {
return false;
}
current += source.stride;
long nextValue = current + source.stride;
if (expectedValue() < nextValue) {
return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return false will indicate no more input available. (Based on advance() javadoc)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the correct behavior - if we're not at or beyond the next time we should produce, we have no input available, but may in the future.

}
current = nextValue;
currentTimestamp = source.timestampFn.apply(current);
return true;
}

private long expectedValue() {
if (source.period.getMillis() == 0L) {
return Long.MAX_VALUE;
}
double periodsElapsed =
(Instant.now().getMillis() - firstStarted.getMillis())
/ (double) source.period.getMillis();
return (long) (source.elementsPerPeriod * periodsElapsed);
}

@Override
public Instant getWatermark() {
return source.timestampFn.apply(current);
}

@Override
public CounterMark getCheckpointMark() {
return new CounterMark(current);
return new CounterMark(current, firstStarted);
}

@Override
Expand All @@ -359,6 +426,12 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {

@Override
public void close() throws IOException {}

@Override
public long getSplitBacklogBytes() {
long expected = expectedValue();
return Math.max(0L, 8 * (expected - current) / source.stride);
}
}

/**
Expand All @@ -369,12 +442,14 @@ public void close() throws IOException {}
public static class CounterMark implements UnboundedSource.CheckpointMark {
/** The last value emitted. */
private final long lastEmitted;
private final Instant startTime;

/**
* Creates a checkpoint mark reflecting the last emitted value.
*/
public CounterMark(long lastEmitted) {
public CounterMark(long lastEmitted, Instant startTime) {
this.lastEmitted = lastEmitted;
this.startTime = startTime;
}

/**
Expand All @@ -384,11 +459,19 @@ public long getLastEmitted() {
return lastEmitted;
}

/**
* Returns the time the reader was started.
*/
public Instant getStartTime() {
return startTime;
}

/////////////////////////////////////////////////////////////////////////////////////

@SuppressWarnings("unused") // For AvroCoder
private CounterMark() {
this.lastEmitted = 0L;
this.startTime = Instant.now();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package com.google.cloud.dataflow.sdk.io;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
Expand All @@ -31,6 +34,7 @@
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.values.PCollection;

import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -82,6 +86,27 @@ public void testUnboundedInput() {
p.run();
}

@Test
public void testUnboundedInputRate() {
Pipeline p = TestPipeline.create();
long numElements = 5000;

long elemsPerPeriod = 10L;
Duration periodLength = Duration.millis(8);
PCollection<Long> input =
p.apply(
CountingInput.unbounded()
.withRate(elemsPerPeriod, periodLength)
.withMaxNumRecords(numElements));

addCountingAsserts(input, numElements);
long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod;
Instant startTime = Instant.now();
p.run();
Instant endTime = Instant.now();
assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
}

private static class ElementValueDiff extends DoFn<Long, Long> {
@Override
public void processElement(ProcessContext c) throws Exception {
Expand Down
Loading