-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-94] Add UnboundedCountingInput#withRate #17
Changes from 1 commit
e2f9688
9470cf2
2dc8d5a
3708e03
ec37b48
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,7 +89,15 @@ public static BoundedCountingInput upTo(long numElements) { | |
*/ | ||
public static UnboundedCountingInput unbounded() { | ||
return new UnboundedCountingInput( | ||
new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent()); | ||
new NowTimestampFn(), | ||
// Elements per period | ||
1L, | ||
// period length | ||
Duration.ZERO, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add comments /* elementsPerPeriod */ ... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
// max num records | ||
Optional.<Long>absent(), | ||
// max read duration | ||
Optional.<Duration>absent()); | ||
} | ||
|
||
/** | ||
|
@@ -125,14 +133,20 @@ public PCollection<Long> apply(PBegin begin) { | |
*/ | ||
public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> { | ||
private final SerializableFunction<Long, Instant> timestampFn; | ||
private final long elementsPerPeriod; | ||
private final Duration period; | ||
private final Optional<Long> maxNumRecords; | ||
private final Optional<Duration> maxReadTime; | ||
|
||
private UnboundedCountingInput( | ||
SerializableFunction<Long, Instant> timestampFn, | ||
long elementsPerPeriod, | ||
Duration period, | ||
Optional<Long> maxNumRecords, | ||
Optional<Duration> maxReadTime) { | ||
this.timestampFn = timestampFn; | ||
this.elementsPerPeriod = elementsPerPeriod; | ||
this.period = period; | ||
this.maxNumRecords = maxNumRecords; | ||
this.maxReadTime = maxReadTime; | ||
} | ||
|
@@ -144,7 +158,8 @@ private UnboundedCountingInput( | |
* <p>Note that the timestamps produced by {@code timestampFn} may not decrease. | ||
*/ | ||
public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) { | ||
return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime); | ||
return new UnboundedCountingInput( | ||
timestampFn, elementsPerPeriod, period, maxNumRecords, maxReadTime); | ||
} | ||
|
||
/** | ||
|
@@ -157,7 +172,22 @@ public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant | |
public UnboundedCountingInput withMaxNumRecords(long maxRecords) { | ||
checkArgument( | ||
maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords); | ||
return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime); | ||
return new UnboundedCountingInput( | ||
timestampFn, elementsPerPeriod, period, Optional.of(maxRecords), maxReadTime); | ||
} | ||
|
||
/** | ||
* Returns an {@link UnboundedCountingInput} like this one, but with output production limited | ||
* to an aggregate rate of no more than the number of elements per the period length. | ||
* | ||
* <p>Note that this period is taken in aggregate across all instances of the | ||
* {@link PTransform}, which may cause elements to be produced in bunches. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The bunches behavior is less there now, right? How about something like
Also, can we attempt to explicitly synchronize them by offseting the start time? That way if I have 1 every s split 10 ways, the start time is 1s later for the split that produces element 2, etc. Of course we'll still have the offset in worker starting time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We would prefer to use the point at which the pipeline starts to decide the actual starting point, but we don't have access to that value. We shouldn't use the time at which the source is created because there is an unknown amount of time between when the source is first created and when it is run. |
||
* | ||
* <p>A duration of {@link Duration#ZERO} will produce output as fast as possbile. | ||
*/ | ||
public UnboundedCountingInput withRate(long numElements, Duration periodLength) { | ||
return new UnboundedCountingInput( | ||
timestampFn, numElements, periodLength, maxNumRecords, maxReadTime); | ||
} | ||
|
||
/** | ||
|
@@ -169,13 +199,18 @@ public UnboundedCountingInput withMaxNumRecords(long maxRecords) { | |
*/ | ||
public UnboundedCountingInput withMaxReadTime(Duration readTime) { | ||
checkNotNull(readTime, "ReadTime cannot be null"); | ||
return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime)); | ||
return new UnboundedCountingInput( | ||
timestampFn, elementsPerPeriod, period, maxNumRecords, Optional.of(readTime)); | ||
} | ||
|
||
@SuppressWarnings("deprecation") | ||
@Override | ||
public PCollection<Long> apply(PBegin begin) { | ||
Unbounded<Long> read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn)); | ||
Unbounded<Long> read = | ||
Read.from( | ||
CountingSource.createUnbounded() | ||
.withTimestampFn(timestampFn) | ||
.withRate(elementsPerPeriod, period)); | ||
if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) { | ||
return begin.apply(read); | ||
} else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
package com.google.cloud.dataflow.sdk.io; | ||
|
||
import static com.google.common.base.Preconditions.checkArgument; | ||
import static com.google.common.base.Preconditions.checkNotNull; | ||
|
||
import com.google.cloud.dataflow.sdk.coders.AvroCoder; | ||
import com.google.cloud.dataflow.sdk.coders.Coder; | ||
|
@@ -28,7 +29,9 @@ | |
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; | ||
import com.google.cloud.dataflow.sdk.values.PCollection; | ||
import com.google.common.collect.ImmutableList; | ||
import com.google.common.math.LongMath; | ||
|
||
import org.joda.time.Duration; | ||
import org.joda.time.Instant; | ||
|
||
import java.io.IOException; | ||
|
@@ -81,6 +84,14 @@ public static BoundedSource<Long> upTo(long numElements) { | |
return new BoundedCountingSource(0, numElements); | ||
} | ||
|
||
/** | ||
* Create a new {@link UnboundedCountingSource}. | ||
*/ | ||
// package-private to return a typed UnboundedCountingSource rather than the UnboundedSource type. | ||
static UnboundedCountingSource createUnbounded() { | ||
return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, new NowTimestampFn()); | ||
} | ||
|
||
/** | ||
* Creates an {@link UnboundedSource} that will produce numbers starting from {@code 0} up to | ||
* {@link Long#MAX_VALUE}. | ||
|
@@ -113,7 +124,7 @@ public static UnboundedSource<Long, CounterMark> unbounded() { | |
@Deprecated | ||
public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn( | ||
SerializableFunction<Long, Instant> timestampFn) { | ||
return new UnboundedCountingSource(0, 1, timestampFn); | ||
return new UnboundedCountingSource(0, 1, 1L, Duration.ZERO, timestampFn); | ||
} | ||
|
||
///////////////////////////////////////////////////////////////////////////////////////////// | ||
|
@@ -226,11 +237,15 @@ public void close() throws IOException {} | |
/** | ||
* An implementation of {@link CountingSource} that produces an unbounded {@link PCollection}. | ||
*/ | ||
private static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> { | ||
static class UnboundedCountingSource extends UnboundedSource<Long, CounterMark> { | ||
/** The first number (>= 0) generated by this {@link UnboundedCountingSource}. */ | ||
private final long start; | ||
/** The interval between numbers generated by this {@link UnboundedCountingSource}. */ | ||
private final long stride; | ||
/** The number of elements to produce each period. */ | ||
private final long elementsPerPeriod; | ||
/** The time between producing numbers from this {@link UnboundedCountingSource}. */ | ||
private final Duration period; | ||
/** The function used to produce timestamps for the generated elements. */ | ||
private final SerializableFunction<Long, Instant> timestampFn; | ||
|
||
|
@@ -243,13 +258,47 @@ private static class UnboundedCountingSource extends UnboundedSource<Long, Count | |
* | ||
* <p>Note that the timestamps produced by {@code timestampFn} may not decrease. | ||
*/ | ||
public UnboundedCountingSource( | ||
long start, long stride, SerializableFunction<Long, Instant> timestampFn) { | ||
private UnboundedCountingSource( | ||
long start, | ||
long stride, | ||
long elementsPerPeriod, | ||
Duration period, | ||
SerializableFunction<Long, Instant> timestampFn) { | ||
this.start = start; | ||
this.stride = stride; | ||
this.elementsPerPeriod = elementsPerPeriod; | ||
this.period = period; | ||
this.timestampFn = timestampFn; | ||
} | ||
|
||
/** | ||
* Returns an {@link UnboundedCountingSource} like this one with the specified period. Elements | ||
* will be produced with an interval between them equal to the period. | ||
*/ | ||
public UnboundedCountingSource withRate(long elementsPerPeriod, Duration period) { | ||
checkArgument( | ||
elementsPerPeriod > 0, | ||
"elements produced per period must be a positive value, got %s", | ||
elementsPerPeriod); | ||
checkArgument( | ||
period.getMillis() >= 0, | ||
"Period must be a non-negative value, got %s", | ||
period.getMillis()); | ||
return new UnboundedCountingSource(start, stride, elementsPerPeriod, period, timestampFn); | ||
} | ||
|
||
/** | ||
* Returns an {@link UnboundedCountingSource} like this one where the timestamp of output | ||
* elements are supplied by the specified function. | ||
* | ||
* <p>Note that timestamps produced by {@code timestampFn} may not decrease. | ||
*/ | ||
public UnboundedCountingSource withTimestampFn( | ||
SerializableFunction<Long, Instant> timestampFn) { | ||
checkNotNull(timestampFn); | ||
return new UnboundedCountingSource(start, stride, elementsPerPeriod, period, timestampFn); | ||
} | ||
|
||
/** | ||
* Splits an unbounded source {@code desiredNumSplits} ways by giving each split every | ||
* {@code desiredNumSplits}th element that this {@link UnboundedCountingSource} | ||
|
@@ -265,12 +314,15 @@ public List<? extends UnboundedSource<Long, CountingSource.CounterMark>> generat | |
int desiredNumSplits, PipelineOptions options) throws Exception { | ||
// Using Javadoc example, stride 2 with 3 splits becomes stride 6. | ||
long newStride = stride * desiredNumSplits; | ||
Duration newPeriod = period.multipliedBy(desiredNumSplits); | ||
|
||
ImmutableList.Builder<UnboundedCountingSource> splits = ImmutableList.builder(); | ||
for (int i = 0; i < desiredNumSplits; ++i) { | ||
// Starts offset by the original stride. Using Javadoc example, this generates starts of | ||
// 0, 2, and 4. | ||
splits.add(new UnboundedCountingSource(start + i * stride, newStride, timestampFn)); | ||
splits.add( | ||
new UnboundedCountingSource( | ||
start + i * stride, newStride, elementsPerPeriod, newPeriod, timestampFn)); | ||
} | ||
return splits.build(); | ||
} | ||
|
@@ -287,7 +339,9 @@ public Coder<CountingSource.CounterMark> getCheckpointMarkCoder() { | |
} | ||
|
||
@Override | ||
public void validate() {} | ||
public void validate() { | ||
checkArgument(period.getMillis() >= 0L); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. error message. Is this necessary, or can we check in the constructor? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Constructor is better; done |
||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. revert? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
@Override | ||
public Coder<Long> getDefaultOutputCoder() { | ||
|
@@ -302,17 +356,27 @@ public Coder<Long> getDefaultOutputCoder() { | |
*/ | ||
private static class UnboundedCountingReader extends UnboundedReader<Long> { | ||
private UnboundedCountingSource source; | ||
|
||
private long current; | ||
private Instant currentTimestamp; | ||
|
||
private Instant started; | ||
private long totalProduced; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. new blank lines necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no, reverted. |
||
public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) { | ||
this.source = source; | ||
if (mark == null) { | ||
// Because we have not emitted an element yet, and start() calls advance, we need to | ||
// "un-advance" so that start() produces the correct output. | ||
this.current = source.start - source.stride; | ||
// Do not produce an element immediately, to ensure that we do not exceed the requested | ||
// period due to splits. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment does not line up with the code. And shouldn't this be in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rm'd the comment; moved to start (have to add a null check to make sure we don't overwrite our checkpoint) |
||
this.started = Instant.now(); | ||
this.totalProduced = 0L; | ||
} else { | ||
this.current = mark.getLastEmitted(); | ||
this.started = mark.getStartTime(); | ||
this.totalProduced = mark.getElementsProduced(); | ||
} | ||
} | ||
|
||
|
@@ -327,19 +391,51 @@ public boolean advance() throws IOException { | |
if (Long.MAX_VALUE - source.stride < current) { | ||
return false; | ||
} | ||
if (getSplitBacklogElements() <= 0) { | ||
return false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Return false will indicate no more input available. (Based on advance() javadoc) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's the correct behavior - if we're not at or beyond the next time we should produce, we have no input available, but may in the future. |
||
} | ||
current += source.stride; | ||
currentTimestamp = source.timestampFn.apply(current); | ||
totalProduced++; | ||
return true; | ||
} | ||
|
||
/** | ||
* Gets the size of the split backlog in number of elements. | ||
*/ | ||
private long getSplitBacklogElements() { | ||
if (source.period.getMillis() == 0) { | ||
return Long.MAX_VALUE; | ||
} | ||
long msElapsed = Instant.now().getMillis() - started.getMillis(); | ||
long expectedOutputs = | ||
LongMath.checkedMultiply(source.elementsPerPeriod, msElapsed) / source.period.getMillis(); | ||
return expectedOutputs - totalProduced; | ||
} | ||
|
||
@Override | ||
public long getSplitBacklogBytes() { | ||
long backlogElems = getSplitBacklogElements(); | ||
if (backlogElems == Long.MAX_VALUE) { | ||
return BACKLOG_UNKNOWN; | ||
} | ||
// 8 == Long.BYTES | ||
long backlogBytes = backlogElems * 8; | ||
// overflow protection | ||
if (backlogBytes < backlogElems) { | ||
return Long.MAX_VALUE; | ||
} | ||
return backlogBytes; | ||
} | ||
|
||
@Override | ||
public Instant getWatermark() { | ||
return source.timestampFn.apply(current); | ||
} | ||
|
||
@Override | ||
public CounterMark getCheckpointMark() { | ||
return new CounterMark(current); | ||
return new CounterMark(current, started, totalProduced); | ||
} | ||
|
||
@Override | ||
|
@@ -369,12 +465,16 @@ public void close() throws IOException {} | |
public static class CounterMark implements UnboundedSource.CheckpointMark { | ||
/** The last value emitted. */ | ||
private final long lastEmitted; | ||
private final Instant startTime; | ||
private final long totalProduced; | ||
|
||
/** | ||
* Creates a checkpoint mark reflecting the last emitted value. | ||
*/ | ||
public CounterMark(long lastEmitted) { | ||
public CounterMark(long lastEmitted, Instant startTime, long totalProduced) { | ||
this.lastEmitted = lastEmitted; | ||
this.startTime = startTime; | ||
this.totalProduced = totalProduced; | ||
} | ||
|
||
/** | ||
|
@@ -384,11 +484,24 @@ public long getLastEmitted() { | |
return lastEmitted; | ||
} | ||
|
||
/** | ||
* Returns the total number of elements already produced by this source. | ||
*/ | ||
public long getElementsProduced() { | ||
return totalProduced; | ||
} | ||
|
||
public Instant getStartTime() { | ||
return startTime; | ||
} | ||
|
||
///////////////////////////////////////////////////////////////////////////////////// | ||
|
||
@SuppressWarnings("unused") // For AvroCoder | ||
private CounterMark() { | ||
this.lastEmitted = 0L; | ||
this.startTime = Instant.now(); | ||
this.totalProduced = 0L; | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typically easier to read as
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done