From f041893f99c529189f61567b57777b4f0dc050ae Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Wed, 2 Mar 2016 14:03:28 -0800 Subject: [PATCH] Add CountingInput as a PTransform This transform produces an unbounded PCollection containing longs based on a CountingSource. Deprecate methods producing a Source in CountingSource. --- .../cloud/dataflow/sdk/io/CountingInput.java | 191 ++++++++++++++++++ .../cloud/dataflow/sdk/io/CountingSource.java | 31 ++- .../dataflow/sdk/io/CountingInputTest.java | 122 +++++++++++ 3 files changed, 334 insertions(+), 10 deletions(-) create mode 100644 sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java create mode 100644 sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java new file mode 100644 index 0000000000000..07609ba716eb0 --- /dev/null +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java @@ -0,0 +1,191 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.sdk.io; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.cloud.dataflow.sdk.io.CountingSource.NowTimestampFn; +import com.google.cloud.dataflow.sdk.io.Read.Unbounded; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; +import com.google.common.base.Optional; + +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * A {@link PTransform} that produces longs. When used to produce a + * {@link IsBounded#BOUNDED bounded} {@link PCollection}, {@link CountingInput} starts at {@code 0} + * and counts up to a specified maximum. When used to produce an + * {@link IsBounded#UNBOUNDED unbounded} {@link PCollection}, it counts up to {@link Long#MAX_VALUE} + * and then never produces more output. (In practice, this limit should never be reached.) + * + *

<p>The bounded {@link CountingInput} is implemented based on {@link OffsetBasedSource} and + * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it + * supports dynamic work rebalancing. + * + *

<p>To produce a bounded {@code PCollection<Long>}, use {@link CountingInput#upTo(long)}: + * + *

<pre>{@code
+ * Pipeline p = ...
+ * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
+ * PCollection<Long> bounded = p.apply(producer);
+ * }</pre>
+ * + *

<p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()}, + * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values + * with timestamps other than {@link Instant#now}. + * + *

<pre>{@code
+ * Pipeline p = ...
+ *
+ * // To create an unbounded producer that uses processing time as the element timestamp.
+ * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
+ * // Or, to create an unbounded producer that sets the element timestamp with a provided function.
+ * PCollection<Long> unboundedWithTimestamps =
+ *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
+ * }</pre>
+ */ +public class CountingInput { + /** + * Creates a {@link BoundedCountingInput} that will produce the specified number of elements, + * from {@code 0} to {@code numElements - 1}. + */ + public static BoundedCountingInput upTo(long numElements) { + checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements); + return new BoundedCountingInput(numElements); + } + + /** + * Creates an {@link UnboundedCountingInput} that will produce numbers starting from {@code 0} up + * to {@link Long#MAX_VALUE}. + * + *

After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this + * limit should never be reached.) + * + *

Elements in the resulting {@link PCollection PCollection<Long>} will by default have + * timestamps corresponding to processing time at element generation, provided by + * {@link Instant#now}. Use the transform returned by + * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output + * timestamps. + */ + public static UnboundedCountingInput unbounded() { + return new UnboundedCountingInput( + new NowTimestampFn(), Optional.absent(), Optional.absent()); + } + + /** + * A {@link PTransform} that will produce a specified number of {@link Long Longs} starting from + * 0. + */ + public static class BoundedCountingInput extends PTransform> { + private final long numElements; + + private BoundedCountingInput(long numElements) { + this.numElements = numElements; + } + + @SuppressWarnings("deprecation") + @Override + public PCollection apply(PBegin begin) { + return begin.apply(Read.from(CountingSource.upTo(numElements))); + } + } + + /** + * A {@link PTransform} that will produce numbers starting from {@code 0} up to + * {@link Long#MAX_VALUE}. + * + *

After {@link Long#MAX_VALUE}, the transform never produces more output. (In practice, this + * limit should never be reached.) + * + *

Elements in the resulting {@link PCollection PCollection<Long>} will by default have + * timestamps corresponding to processing time at element generation, provided by + * {@link Instant#now}. Use the transform returned by + * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to control the output + * timestamps. + */ + public static class UnboundedCountingInput extends PTransform> { + private final SerializableFunction timestampFn; + private final Optional maxNumRecords; + private final Optional maxReadTime; + + private UnboundedCountingInput( + SerializableFunction timestampFn, + Optional maxNumRecords, + Optional maxReadTime) { + this.timestampFn = timestampFn; + this.maxNumRecords = maxNumRecords; + this.maxReadTime = maxReadTime; + } + + /** + * Returns an {@link UnboundedCountingInput} like this one, but where output elements have the + * timestamp specified by the timestampFn. + * + *

Note that the timestamps produced by {@code timestampFn} may not decrease. + */ + public UnboundedCountingInput withTimestampFn(SerializableFunction timestampFn) { + return new UnboundedCountingInput(timestampFn, maxNumRecords, maxReadTime); + } + + /** + * Returns an {@link UnboundedCountingInput} like this one, but that will read at most the + * specified number of elements. + * + *

A bounded amount of elements will be produced by the result transform, and the result + * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}. + */ + public UnboundedCountingInput withMaxNumRecords(long maxRecords) { + checkArgument( + maxRecords > 0, "MaxRecords must be a positive (nonzero) value. Got %s", maxRecords); + return new UnboundedCountingInput(timestampFn, Optional.of(maxRecords), maxReadTime); + } + + /** + * Returns an {@link UnboundedCountingInput} like this one, but that will read for at most the + * specified amount of time. + * + *

A bounded amount of elements will be produced by the result transform, and the result + * {@link PCollection} will be {@link IsBounded#BOUNDED bounded}. + */ + public UnboundedCountingInput withMaxReadTime(Duration readTime) { + checkNotNull(readTime, "ReadTime cannot be null"); + return new UnboundedCountingInput(timestampFn, maxNumRecords, Optional.of(readTime)); + } + + @SuppressWarnings("deprecation") + @Override + public PCollection apply(PBegin begin) { + Unbounded read = Read.from(CountingSource.unboundedWithTimestampFn(timestampFn)); + if (!maxNumRecords.isPresent() && !maxReadTime.isPresent()) { + return begin.apply(read); + } else if (maxNumRecords.isPresent() && !maxReadTime.isPresent()) { + return begin.apply(read.withMaxNumRecords(maxNumRecords.get())); + } else if (!maxNumRecords.isPresent() && maxReadTime.isPresent()) { + return begin.apply(read.withMaxReadTime(maxReadTime.get())); + } else { + return begin.apply( + read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get())); + } + } + } +} diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java index 2938534168abd..412f3a7ec9c61 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/CountingSource.java @@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.DefaultCoder; import com.google.cloud.dataflow.sdk.coders.VarLongCoder; +import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput; import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; @@ -48,29 +49,33 @@ * *

{@code
  * Pipeline p = ...
- * BoundedSource source = CountingSource.upTo(1000);
- * PCollection bounded = p.apply(Read.from(source));
+ * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000);
+ * PCollection<Long> bounded = p.apply(producer);
  * }
* - *

To produce an unbounded {@code PCollection}, use {@link CountingSource#unbounded} or - * {@link CountingSource#unboundedWithTimestampFn}: + *

To produce an unbounded {@code PCollection}, use {@link CountingInput#unbounded()}, + * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values + * with timestamps other than {@link Instant#now}. * *

{@code
  * Pipeline p = ...
  *
- * // To create an unbounded source that uses processing time as the element timestamp.
- * UnboundedSource source = CountingSource.unbounded();
+ * // To create an unbounded PCollection that uses processing time as the element timestamp.
+ * PCollection<Long> unbounded = p.apply(CountingInput.unbounded());
  * // Or, to create an unbounded source that uses a provided function to set the element timestamp.
- * UnboundedSource source = CountingSource.unboundedWithTimestampFn(someFn);
+ * PCollection<Long> unboundedWithTimestamps =
+ *     p.apply(CountingInput.unbounded().withTimestampFn(someFn));
  *
- * PCollection unbounded = p.apply(Read.from(source));
  * }
*/ public class CountingSource { /** * Creates a {@link BoundedSource} that will produce the specified number of elements, * from {@code 0} to {@code numElements - 1}. + * + * @deprecated use {@link CountingInput#upTo(long)} instead */ + @Deprecated public static BoundedSource upTo(long numElements) { checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements); return new BoundedCountingSource(0, numElements); @@ -85,7 +90,10 @@ public static BoundedSource upTo(long numElements) { * *

Elements in the resulting {@link PCollection PCollection<Long>} will have timestamps * corresponding to processing time at element generation, provided by {@link Instant#now}. + * + * @deprecated use {@link CountingInput#unbounded()} instead */ + @Deprecated public static UnboundedSource unbounded() { return unboundedWithTimestampFn(new NowTimestampFn()); } @@ -98,7 +106,11 @@ public static UnboundedSource unbounded() { * limit should never be reached.) * *

Note that the timestamps produced by {@code timestampFn} may not decrease. + * + * @deprecated use {@link CountingInput#unbounded()} and call + * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} instead */ + @Deprecated public static UnboundedSource unboundedWithTimestampFn( SerializableFunction timestampFn) { return new UnboundedCountingSource(0, 1, timestampFn); @@ -109,11 +121,10 @@ public static UnboundedSource unboundedWithTimestampFn( /** Prevent instantiation. */ private CountingSource() {} - /** * A function that returns {@link Instant#now} as the timestamp for each generated element. */ - private static class NowTimestampFn implements SerializableFunction { + static class NowTimestampFn implements SerializableFunction { @Override public Instant apply(Long input) { return Instant.now(); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java new file mode 100644 index 0000000000000..948a892b58a92 --- /dev/null +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.sdk.io; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.CountingInput.UnboundedCountingInput; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.RunnableOnService; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Count; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.Max; +import com.google.cloud.dataflow.sdk.transforms.Min; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Tests for {@link CountingInput}. + */ +public class CountingInputTest { + public static void addCountingAsserts(PCollection input, long numElements) { + // Count == numElements + DataflowAssert.thatSingleton(input.apply("Count", Count.globally())) + .isEqualTo(numElements); + // Unique count == numElements + DataflowAssert.thatSingleton( + input + .apply(RemoveDuplicates.create()) + .apply("UniqueCount", Count.globally())) + .isEqualTo(numElements); + // Min == 0 + DataflowAssert.thatSingleton(input.apply("Min", Min.globally())).isEqualTo(0L); + // Max == numElements-1 + DataflowAssert.thatSingleton(input.apply("Max", Max.globally())) + .isEqualTo(numElements - 1); + } + + @Test + @Category(RunnableOnService.class) + public void testBoundedInput() { + Pipeline p = TestPipeline.create(); + long numElements = 1000; + PCollection input = p.apply(CountingInput.upTo(numElements)); + + addCountingAsserts(input, numElements); + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testUnboundedInput() { + Pipeline 
p = TestPipeline.create(); + long numElements = 1000; + + PCollection input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements)); + + addCountingAsserts(input, numElements); + p.run(); + } + + private static class ElementValueDiff extends DoFn { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element() - c.timestamp().getMillis()); + } + } + + @Test + @Category(RunnableOnService.class) + public void testUnboundedInputTimestamps() { + Pipeline p = TestPipeline.create(); + long numElements = 1000; + + PCollection input = + p.apply( + CountingInput.unbounded() + .withTimestampFn(new ValueAsTimestampFn()) + .withMaxNumRecords(numElements)); + addCountingAsserts(input, numElements); + + PCollection diffs = + input + .apply("TimestampDiff", ParDo.of(new ElementValueDiff())) + .apply("RemoveDuplicateTimestamps", RemoveDuplicates.create()); + // This assert also confirms that diffs only has one unique value. + DataflowAssert.thatSingleton(diffs).isEqualTo(0L); + + p.run(); + } + + /** + * A timestamp function that uses the given value as the timestamp. Because the input values will + * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out + * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}. + */ + private static class ValueAsTimestampFn implements SerializableFunction { + @Override + public Instant apply(Long input) { + return new Instant(input); + } + } +}