From 83ccf082ceb129f536e8a01b755c15ec0f5c2495 Mon Sep 17 00:00:00 2001 From: Pei He Date: Wed, 24 Aug 2016 17:26:50 -0700 Subject: [PATCH 1/8] Move the samples data to gs://apache-beam-samples/ --- .../java/org/apache/beam/examples/DebuggingWordCount.java | 4 ++-- .../main/java/org/apache/beam/examples/MinimalWordCount.java | 2 +- .../java/org/apache/beam/examples/WindowedWordCount.java | 2 +- .../src/main/java/org/apache/beam/examples/WordCount.java | 4 ++-- .../apache/beam/examples/complete/StreamingWordExtract.java | 2 +- .../main/java/org/apache/beam/examples/complete/TfIdf.java | 4 ++-- .../apache/beam/examples/complete/TopWikipediaSessions.java | 5 +++-- .../apache/beam/examples/complete/TrafficMaxLaneFlow.java | 2 +- .../org/apache/beam/examples/complete/TrafficRoutes.java | 2 +- .../apache/beam/examples/cookbook/DatastoreWordCount.java | 2 +- .../java/org/apache/beam/examples/cookbook/DeDupExample.java | 4 ++-- .../org/apache/beam/examples/cookbook/TriggerExample.java | 2 +- .../java/org/apache/beam/examples/MinimalWordCountJava8.java | 2 +- .../apache/beam/examples/complete/game/HourlyTeamScore.java | 2 +- .../org/apache/beam/examples/complete/game/UserScore.java | 2 +- .../org/apache/beam/examples/MinimalWordCountJava8Test.java | 2 +- 16 files changed, 22 insertions(+), 21 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java index 5a0930cd35f5b..be3aa419b5379 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java @@ -100,8 +100,8 @@ * that changing the default worker log level to TRACE or DEBUG will significantly increase * the amount of logs output. * - *

The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be - * overridden with {@code --inputFile}. + *

The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt} + * and can be overridden with {@code --inputFile}. */ public class DebuggingWordCount { /** A DoFn that filters for a specific key based upon a regular expression. */ diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java index df725e3f15858..f28a20cf94491 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java @@ -87,7 +87,7 @@ public static void main(String[] args) { // Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set // of input text files. TextIO.Read returns a PCollection where each element is one line from // the input text (a set of Shakespeare's texts). - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) + p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) // Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a // DoFn (defined in-line) on each element that tokenizes the text line into individual words. // The ParDo returns a PCollection, where each element is an individual word in diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java index 5f60524209c19..7af354cee0fab 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java @@ -82,7 +82,7 @@ * *

Optionally specify the input file path via: * {@code --inputFile=gs://INPUT_PATH}, - * which defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt}. + * which defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}. * *

Specify an output BigQuery dataset and optionally, a table for the output. If you don't * specify the table, one will be created for you using the job name. If you don't specify the diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java index d42d6214973d3..0275651bf288b 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java @@ -89,8 +89,8 @@ * --output=gs://YOUR_OUTPUT_PREFIX * } * - *

The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be - * overridden with {@code --inputFile}. + *

The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt} + * and can be overridden with {@code --inputFile}. */ public class WordCount { diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java index 348bab84b996d..869ea69b8ae47 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java @@ -104,7 +104,7 @@ static TableSchema getSchema() { private interface StreamingWordExtractOptions extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions { @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") + @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java index a5a939263ee4e..87023edac29ac 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java @@ -83,7 +83,7 @@ * --output=gs://YOUR_OUTPUT_PREFIX * } * - *

The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with + *

The default input is {@code gs://apache-beam-samples/shakespeare/} and can be overridden with * {@code --input}. */ public class TfIdf { @@ -94,7 +94,7 @@ public class TfIdf { */ private static interface Options extends PipelineOptions { @Description("Path to the directory or GCS prefix containing files to read from") - @Default.String("gs://dataflow-samples/shakespeare/") + @Default.String("gs://apache-beam-samples/shakespeare/") String getInput(); void setInput(String value); diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java index 1b2064ad068ee..d597258d3b923 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java @@ -64,7 +64,7 @@ * --output=gs://YOUR_OUTPUT_PREFIX * } * - *

The default input is {@code gs://dataflow-samples/wikipedia_edits/*.json} and can be + *

The default input is {@code gs://apache-beam-samples/wikipedia_edits/*.json} and can be * overridden with {@code --input}. * *

The input for this example is large enough that it's a good place to enable (experimental) @@ -77,7 +77,8 @@ * This will automatically scale the number of workers up over time until the job completes. */ public class TopWikipediaSessions { - private static final String EXPORTED_WIKI_TABLE = "gs://dataflow-samples/wikipedia_edits/*.json"; + private static final String EXPORTED_WIKI_TABLE = + "gs://apache-beam-samples/wikipedia_edits/*.json"; /** * Extracts user and timestamp from a TableRow representing a Wikipedia edit. diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java index 1b27e650f03e9..e4569600c05c6 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java @@ -303,7 +303,7 @@ public PCollection apply(PBegin begin) { */ private interface TrafficMaxLaneFlowOptions extends ExampleOptions, ExampleBigQueryTableOptions { @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/traffic_sensor/" + @Default.String("gs://apache-beam-samples/traffic_sensor/" + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv") String getInputFile(); void setInputFile(String value); diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java index f3c2d3936ee74..95336c644a79f 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java @@ -313,7 +313,7 @@ public PCollection apply(PBegin begin) { */ private interface TrafficRoutesOptions extends ExampleOptions, ExampleBigQueryTableOptions { @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/traffic_sensor/" + @Default.String("gs://apache-beam-samples/traffic_sensor/" + "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv") String getInputFile(); void setInputFile(String value); diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java index 9a9e79968670e..eb2165f91e19b 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java @@ -151,7 +151,7 @@ public void processElement(ProcessContext c) { */ public static interface Options extends PipelineOptions { @Description("Path of the file to read from and store to Datastore") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") + @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt") String getInput(); void setInput(String value); diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java index d573bcd9d295e..57917109addd1 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java @@ -47,7 +47,7 @@ * and an output prefix on GCS: * --output=gs://YOUR_OUTPUT_PREFIX * - *

The input defaults to {@code gs://dataflow-samples/shakespeare/*} and can be + *

The input defaults to {@code gs://apache-beam-samples/shakespeare/*} and can be * overridden with {@code --input}. */ public class DeDupExample { @@ -59,7 +59,7 @@ public class DeDupExample { */ private static interface Options extends PipelineOptions { @Description("Path to the directory or GCS prefix containing files to read from") - @Default.String("gs://dataflow-samples/shakespeare/*") + @Default.String("gs://apache-beam-samples/shakespeare/*") String getInput(); void setInput(String value); diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index db59435555458..263054138dd64 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -420,7 +420,7 @@ public interface TrafficFlowOptions extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions { @Description("Input file to read from") - @Default.String("gs://dataflow-samples/traffic_sensor/" + @Default.String("gs://apache-beam-samples/traffic_sensor/" + "Freeways-5Minaa2010-01-01_to_2010-02-15.csv") String getInput(); void setInput(String value); diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index a49da7bdfbb65..24dd6f9b2c483 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -55,7 +55,7 @@ public static void main(String[] args) { Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) + p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) .withOutputType(TypeDescriptors.strings())) .apply(Filter.by((String word) -> !word.isEmpty())) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java index d408e2132dabf..cf1389981fc5b 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java @@ -70,7 +70,7 @@ * timestamped after 23:59 PST on 2015-10-18 should not be included in the analysis. * To indicate a time before which data should be filtered out, include the {@code --startMin} arg. * If you're using the default input specified in {@link UserScore}, - * "gs://dataflow-samples/game/gaming_data*.csv", then + * "gs://apache-beam-samples/game/gaming_data*.csv", then * {@code --startMin=2015-11-16-16-10 --stopMin=2015-11-17-16-10} are good values. */ public class HourlyTeamScore extends UserScore { diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java index 65036cee6b922..f05879f93106f 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java @@ -182,7 +182,7 @@ public static interface Options extends PipelineOptions { @Description("Path to the data file(s) containing game data.") // The default maps to two large Google Cloud Storage files (each ~12GB) holding two subsequent // day's worth (roughly) of data. - @Default.String("gs://dataflow-samples/game/gaming_data*.csv") + @Default.String("gs://apache-beam-samples/game/gaming_data*.csv") String getInput(); void setInput(String value); diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java index 85841a7801474..181921920b002 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java @@ -60,7 +60,7 @@ public void testMinimalWordCountJava8() throws Exception { Pipeline p = TestPipeline.create(); p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil()); - p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*")) + p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) .withOutputType(TypeDescriptors.strings())) .apply(Filter.by((String word) -> !word.isEmpty())) From 4e6230cc734ab3dba081e04d135a285b73008270 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Wed, 17 Aug 2016 14:38:36 -0700 Subject: [PATCH 2/8] Update DoFn javadocs to remove references to OldDoFn and Dataflow --- .../examples/common/PubsubFileInjector.java | 2 +- .../apache/beam/sdk/util/DoFnRunnerBase.java | 16 +- .../beam/sdk/util/GroupAlsoByWindowsDoFn.java | 2 +- .../apache/beam/sdk/util/ReduceFnRunner.java | 5 +- .../beam/sdk/util/SimpleDoFnRunner.java | 4 +- .../ImmutabilityCheckingBundleFactory.java | 4 +- .../direct/TransformEvaluatorFactory.java | 3 +- .../beam/runners/dataflow/util/DoFnInfo.java | 7 +- .../translation/MultiOutputWordCountTest.java | 2 +- .../spark/translation/SerializationTest.java | 4 +- .../org/apache/beam/sdk/AggregatorValues.java | 4 +- .../beam/sdk/transforms/Aggregator.java | 14 +- .../beam/sdk/transforms/CombineFns.java | 18 +- .../org/apache/beam/sdk/transforms/DoFn.java | 23 +- .../beam/sdk/transforms/DoFnTester.java | 62 ++-- .../beam/sdk/transforms/GroupByKey.java | 7 +- .../beam/sdk/transforms/PTransform.java | 2 +- .../org/apache/beam/sdk/transforms/ParDo.java | 306 +++++++++--------- .../beam/sdk/transforms/SimpleFunction.java | 6 +- .../sdk/transforms/windowing/PaneInfo.java | 10 +- .../beam/sdk/util/BaseExecutionContext.java | 4 +- .../util/ReifyTimestampAndWindowsDoFn.java | 4 +- .../beam/sdk/util/SerializableUtils.java | 2 +- .../beam/sdk/util/SystemDoFnInternal.java | 7 +- .../beam/sdk/util/WindowingInternals.java | 3 +- .../DoFnDelegatingAggregatorTest.java | 2 +- .../beam/sdk/transforms/DoFnTesterTest.java | 3 +- .../beam/sdk/transforms/NoOpOldDoFn.java | 2 +- 28 files changed, 263 insertions(+), 265 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java index e6a1495e545d5..4634159826d35 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java +++ b/examples/java/src/main/java/org/apache/beam/examples/common/PubsubFileInjector.java @@ -69,7 +69,7 @@ public Bound publish(String outputTopic) { } } - /** A OldDoFn that publishes non-empty lines to Google Cloud PubSub. */ + /** A {@link OldDoFn} that publishes non-empty lines to Google Cloud PubSub. */ public static class Bound extends OldDoFn { private final String outputTopic; private final String timestampLabelKey; diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java index 8a0f6bf868d91..04a0978b60301 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java @@ -58,10 +58,10 @@ */ public abstract class DoFnRunnerBase implements DoFnRunner { - /** The OldDoFn being run. */ + /** The {@link OldDoFn} being run. */ public final OldDoFn fn; - /** The context used for running the OldDoFn. */ + /** The context used for running the {@link OldDoFn}. */ public final DoFnContext context; protected DoFnRunnerBase( @@ -164,8 +164,8 @@ public void finishBundle() { /** * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}. * - * @param the type of the OldDoFn's (main) input elements - * @param the type of the OldDoFn's (main) output elements + * @param the type of the {@link OldDoFn} (main) input elements + * @param the type of the {@link OldDoFn} (main) output elements */ private static class DoFnContext extends OldDoFn.Context { @@ -350,7 +350,7 @@ protected Aggregator createAggreg } /** - * Returns a new {@code OldDoFn.ProcessContext} for the given element. + * Returns a new {@link OldDoFn.ProcessContext} for the given element. */ protected OldDoFn.ProcessContext createProcessContext( WindowedValue elem) { @@ -366,11 +366,11 @@ private boolean isSystemDoFn() { } /** - * A concrete implementation of {@code OldDoFn.ProcessContext} used for + * A concrete implementation of {@link OldDoFn.ProcessContext} used for * running a {@link OldDoFn} over a single element. * - * @param the type of the OldDoFn's (main) input elements - * @param the type of the OldDoFn's (main) output elements + * @param the type of the {@link OldDoFn} (main) input elements + * @param the type of the {@link OldDoFn} (main) output elements */ static class DoFnProcessContext extends OldDoFn.ProcessContext { diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java index f82e5dfe32e7b..f386dfba12201 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java @@ -26,7 +26,7 @@ import org.apache.beam.sdk.values.KV; /** - * OldDoFn that merges windows and groups elements in those windows, optionally + * {@link OldDoFn} that merges windows and groups elements in those windows, optionally * combining values. * * @param key type diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java index 61e5b21ebfd93..7c3e4d749a1f3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java @@ -33,7 +33,6 @@ import javax.annotation.Nullable; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -177,8 +176,8 @@ public class ReduceFnRunner { * Store the previously emitted pane (if any) for each window. * *