diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
new file mode 100644
index 0000000000..07609ba716
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.cloud.dataflow.sdk.io.CountingSource.NowTimestampFn;
+import com.google.cloud.dataflow.sdk.io.Read.Unbounded;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.PBegin;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
+import com.google.common.base.Optional;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A {@link PTransform} that produces longs. When used to produce a
+ * {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0}
+ * and counts up to a specified maximum. When used to produce an
+ * {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE}
+ * and then never produces more output. (In practice, this limit should never be reached.)
+ *
+ * <p>The bounded {@link CountingInput} is implemented based on {@link OffsetBasedSource} and
+ * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it
+ * supports dynamic work rebalancing.
+ *
+ * <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}:
+ *
+ * <pre>{@code
+ * Pipeline p = ...
+ * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
+ * PCollection<Long> bounded = p.apply(producer);
+ * }</pre>
+ *
+ * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()},
+ * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
+ * with timestamps other than {@link Instant#now}.
+ *
+ * <pre>{@code
+ * Pipeline p = ...
+ *
+ * // To create an unbounded producer that uses processing time as the element timestamp.
+ * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
+ * // Or, to create an unbounded source that uses a provided function to set the element timestamp.
+ * PCollection<Long> unboundedWithTimestamps =
+ *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
+ * }</pre>
+ */
+public class CountingInput {
+  /**
+   * Creates a {@link BoundedCountingInput} that will produce the specified number of elements,
+   * from {@code 0} to {@code numElements - 1}. {@code numElements} must be greater than 0.
+   */
+  public static BoundedCountingInput upTo(long numElements) {
+    checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
+    return new BoundedCountingInput(numElements);
+  }
+
+  /**
+   * Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up
+   * to {@link Long#MAX_VALUE}.
+   *
+   * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
+   * limit should never be reached.)
+   *
+   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
+   * timestamps corresponding to processing time at element generation, provided by
+   * {@link Instant#now}. Use the transform returned by
+   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
+   * timestamps.
+   */
+  public static UnboundedCountingInput unbounded() {
+    return new UnboundedCountingInput(
+        new NowTimestampFn(), Optional.<Long>absent(), Optional.<Duration>absent());
+  }
+
+  /**
+   * A {@link PTransform} that will produce a specified number of {@link Long Longs} starting from
+   * 0.
+   */
+  public static class BoundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
+    private final long numElements;
+
+    private BoundedCountingInput(long numElements) {
+      this.numElements = numElements;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public PCollection<Long> apply(PBegin begin) {
+      return begin.apply(Read.from(CountingSource.upTo(numElements)));
+    }
+  }
+
+  /**
+   * A {@link PTransform} that will produce numbers starting from {@code 0} up to
+   * {@link Long#MAX_VALUE}.
+   *
+   * <p>After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this
+   * limit should never be reached.)
+   *
+   * <p>Elements in the resulting {@link PCollection PCollection&lt;Long&gt;} will by default have
+   * timestamps corresponding to processing time at element generation, provided by
+   * {@link Instant#now}. Use the transform returned by
+   * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output
+   * timestamps.
+   */
+  public static class UnboundedCountingInput extends PTransform<PBegin, PCollection<Long>> {
+    private final SerializableFunction<Long, Instant> timestampFn;
+    private final Optional<Long> maxNumRecords;
+    private final Optional<Duration> maxReadTime;
+
+    private UnboundedCountingInput(
+        SerializableFunction<Long, Instant> timestampFn,
+        Optional<Long> maxNumRecords,
+        Optional<Duration> maxReadTime) {
+      this.timestampFn = timestampFn;
+      this.maxNumRecords = maxNumRecords;
+      this.maxReadTime = maxReadTime;
+    }
+
+    /**
+     * Returns an {@link UnboundedCountingInput} like this one, but where output elements have the
+     * timestamp specified by the timestampFn.
+     *
+     * <p>Note that the timestamps produced by {@code timestampFn} may not decrease.
+     */
+    public UnboundedCountingInput withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
+      return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime);
+    }
+
+    /**
+     * Returns an {@link UnboundedCountingInput} like this one, but that will read at most the
+     * specified number of elements.
+     *
+     * <p>A bounded amount of elements will be produced by the result transform, and the result
+     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
+     */
+    public UnboundedCountingInput withMaxNumRecords(long maxRecords) {
+      checkArgument(
+          maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords);
+      return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime);
+    }
+
+    /**
+     * Returns an {@link UnboundedCountingInput} like this one, but that will read for at most the
+     * specified amount of time.
+     *
+     * <p>A bounded amount of elements will be produced by the result transform, and the result
+     * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}.
+     */
+    public UnboundedCountingInput withMaxReadTime(Duration readTime) {
+      checkNotNull(readTime, "ReadTime cannot be null");
+      return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime));
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public PCollection<Long> apply(PBegin begin) {
+      Unbounded<Long> read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn));
+      if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
+        return begin.apply(read);
+      } else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) {
+        return begin.apply(read.withMaxNumRecords(maxNumRecords.get()));
+      } else if (!maxNumRecords.isPresent() && maxReadTime.isPresent()) {
+        return begin.apply(read.withMaxReadTime(maxReadTime.get()));
+      } else {
+        return begin.apply(
+            read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get()));
+      }
+    }
+  }
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
index 2938534168..412f3a7ec9 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java
@@ -22,6 +22,7 @@
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
+import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
@@ -48,29 +49,33 @@
*
* {@code
* Pipeline p = ...
- * BoundedSource source = CountingSource.upTo(1000);
- * PCollection bounded = p.apply(Read.from(source));
+ * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
+ * PCollection<Long> bounded = p.apply(producer);
* }
*
- * To produce an unbounded {@code PCollection}, use {@link CountingSource#unbounded} or
- * {@link CountingSource#unboundedWithTimestampFn}:
+ * To produce an unbounded {@code PCollection}, use {@link CountingInput#unbounded()},
+ * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values
+ * with timestamps other than {@link Instant#now}.
*
* {@code
* Pipeline p = ...
*
- * // To create an unbounded source that uses processing time as the element timestamp.
- * UnboundedSource source = CountingSource.unbounded();
+ * // To create an unbounded PCollection that uses processing time as the element timestamp.
+ * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
* // Or, to create an unbounded source that uses a provided function to set the element timestamp.
- * UnboundedSource source = CountingSource.unboundedWithTimestampFn(someFn);
+ * PCollection<Long> unboundedWithTimestamps =
+ *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
*
- * PCollection unbounded = p.apply(Read.from(source));
* }
*/
public class CountingSource {
/**
* Creates a {@link BoundedSource} that will produce the specified number of elements,
* from {@code 0} to {@code numElements - 1}.
+ *
+ * @deprecated use {@link CountingInput#upTo(long)} instead
*/
+ @Deprecated
public static BoundedSource upTo(long numElements) {
checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements);
return new BoundedCountingSource(0, numElements);
@@ -85,7 +90,10 @@ public static BoundedSource upTo(long numElements) {
*
* Elements in the resulting {@link PCollection PCollection<Long>} will have timestamps
* corresponding to processing time at element generation, provided by {@link Instant#now}.
+ *
+ * @deprecated use {@link CountingInput#unbounded()} instead
*/
+ @Deprecated
public static UnboundedSource unbounded() {
return unboundedWithTimestampFn(new NowTimestampFn());
}
@@ -98,7 +106,11 @@ public static UnboundedSource unbounded() {
* limit should never be reached.)
*
* Note that the timestamps produced by {@code timestampFn} may not decrease.
+ *
+ * @deprecated use {@link CountingInput#unbounded()} and call
+ * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} instead
*/
+ @Deprecated
public static UnboundedSource unboundedWithTimestampFn(
SerializableFunction timestampFn) {
return new UnboundedCountingSource(0, 1, timestampFn);
@@ -109,11 +121,10 @@ public static UnboundedSource unboundedWithTimestampFn(
/** Prevent instantiation. */
private CountingSource() {}
-
/**
* A function that returns {@link Instant#now} as the timestamp for each generated element.
*/
- private static class NowTimestampFn implements SerializableFunction {
+ static class NowTimestampFn implements SerializableFunction {
@Override
public Instant apply(Long input) {
return Instant.now();
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
new file mode 100644
index 0000000000..948a892b58
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.io;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput;
+import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
+import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.Count;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.Max;
+import com.google.cloud.dataflow.sdk.transforms.Min;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
+import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests for {@link CountingInput}.
+ */
+public class CountingInputTest {
+  public static void addCountingAsserts(PCollection<Long> input, long numElements) {
+    // Count == numElements
+    DataflowAssert.thatSingleton(input.apply("Count", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Unique count == numElements
+    DataflowAssert.thatSingleton(
+            input
+                .apply(RemoveDuplicates.<Long>create())
+                .apply("UniqueCount", Count.<Long>globally()))
+        .isEqualTo(numElements);
+    // Min == 0
+    DataflowAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(0L);
+    // Max == numElements-1
+    DataflowAssert.thatSingleton(input.apply("Max", Max.<Long>globally()))
+        .isEqualTo(numElements - 1);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBoundedInput() {
+    Pipeline p = TestPipeline.create();
+    long numElements = 1000;
+    PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedInput() {
+    Pipeline p = TestPipeline.create();
+    long numElements = 1000;
+
+    PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  private static class ElementValueDiff extends DoFn<Long, Long> {
+    @Override
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(c.element() - c.timestamp().getMillis());
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testUnboundedInputTimestamps() {
+    Pipeline p = TestPipeline.create();
+    long numElements = 1000;
+
+    PCollection<Long> input =
+        p.apply(
+            CountingInput.unbounded()
+                .withTimestampFn(new ValueAsTimestampFn())
+                .withMaxNumRecords(numElements));
+    addCountingAsserts(input, numElements);
+
+    PCollection<Long> diffs =
+        input
+            .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+            .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+    // Each element equals its own timestamp, so the only distinct difference must be 0.
+    DataflowAssert.thatSingleton(diffs).isEqualTo(0L);
+
+    p.run();
+  }
+
+  /**
+   * A timestamp function that uses the given value as the timestamp. Because the input values will
+   * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out
+   * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}.
+   */
+  private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> {
+    @Override
+    public Instant apply(Long input) {
+      return new Instant(input);
+    }
+  }
+}