Skip to content

Commit

Permalink
Merge branch 'master' into mycbeam313
Browse files Browse the repository at this point in the history
  • Loading branch information
amarouni committed Aug 26, 2016
2 parents 5a1796f + 5161937 commit d5615a1
Show file tree
Hide file tree
Showing 89 changed files with 1,050 additions and 481 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@
* that changing the default worker log level to TRACE or DEBUG will significantly increase
* the amount of logs output.
*
* <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
* overridden with {@code --inputFile}.
* <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
* and can be overridden with {@code --inputFile}.
*/
public class DebuggingWordCount {
/** A DoFn that filters for a specific key based upon a regular expression. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static void main(String[] args) {
// Concept #1: Apply a root transform to the pipeline; in this case, TextIO.Read to read a set
// of input text files. TextIO.Read returns a PCollection where each element is one line from
// the input text (a set of Shakespeare's texts).
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
// Concept #2: Apply a ParDo transform to our PCollection of text lines. This ParDo invokes a
// DoFn (defined in-line) on each element that tokenizes the text line into individual words.
// The ParDo returns a PCollection<String>, where each element is an individual word in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@
*
* <p>Optionally specify the input file path via:
* {@code --inputFile=gs://INPUT_PATH},
* which defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt}.
* which defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}.
*
* <p>Specify an output BigQuery dataset and optionally, a table for the output. If you don't
* specify the table, one will be created for you using the job name. If you don't specify the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@
* --output=gs://YOUR_OUTPUT_PREFIX
* }</pre>
*
* <p>The input file defaults to {@code gs://dataflow-samples/shakespeare/kinglear.txt} and can be
* overridden with {@code --inputFile}.
* <p>The input file defaults to {@code gs://apache-beam-samples/shakespeare/kinglear.txt}
* and can be overridden with {@code --inputFile}.
*/
public class WordCount {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public interface ExampleBigQueryTableOptions extends GcpOptions {
static class BigQueryTableFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
return options.as(ExampleOptions.class).getNormalizedUniqueName()
.replace('-', '_');
return options.getJobName().replace('-', '_');
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,9 @@
*/
package org.apache.beam.examples.common;

import com.google.common.base.MoreObjects;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

/**
* Options that can be used to configure the Beam examples.
Expand All @@ -42,39 +34,4 @@ public interface ExampleOptions extends PipelineOptions {
@Default.Integer(1)
int getInjectorNumWorkers();
void setInjectorNumWorkers(int numWorkers);

@Description("A normalized unique name that is used to name anything related to the pipeline."
+ "It defaults to ApplicationName-UserName-Date-RandomInteger")
@Default.InstanceFactory(NormalizedUniqueNameFactory.class)
String getNormalizedUniqueName();
void setNormalizedUniqueName(String numWorkers);

/**
* Returns a normalized unique name constructed from {@link ApplicationNameOptions#getAppName()},
* the local system user name (if available), the current time, and a random integer.
*
* <p>The normalization makes sure that the name matches the pattern of
* [a-z]([-a-z0-9]*[a-z0-9])?.
*/
public static class NormalizedUniqueNameFactory implements DefaultValueFactory<String> {
private static final DateTimeFormatter FORMATTER =
DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC);

@Override
public String create(PipelineOptions options) {
String appName = options.as(ApplicationNameOptions.class).getAppName();
String normalizedAppName = appName == null || appName.length() == 0 ? "BeamApp"
: appName.toLowerCase()
.replaceAll("[^a-z0-9]", "0")
.replaceAll("^[^a-z]", "a");
String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), "");
String normalizedUserName = userName.toLowerCase()
.replaceAll("[^a-z0-9]", "0");
String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis());

String randomPart = Integer.toHexString(ThreadLocalRandom.current().nextInt());
return String.format("%s-%s-%s-%s",
normalizedAppName, normalizedUserName, datePart, randomPart);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ static class PubsubSubscriptionFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
return "projects/" + options.as(GcpOptions.class).getProject()
+ "/subscriptions/" + options.as(ExampleOptions.class).getNormalizedUniqueName();
+ "/subscriptions/" + options.getJobName();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ static class PubsubTopicFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
return "projects/" + options.as(GcpOptions.class).getProject()
+ "/topics/" + options.as(ExampleOptions.class).getNormalizedUniqueName();
+ "/topics/" + options.getJobName();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Bound publish(String outputTopic) {
}
}

/** A OldDoFn that publishes non-empty lines to Google Cloud PubSub. */
/** An {@link OldDoFn} that publishes non-empty lines to Google Cloud PubSub. */
public static class Bound extends OldDoFn<String, Void> {
private final String outputTopic;
private final String timestampLabelKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ static TableSchema getSchema() {
private interface StreamingWordExtractOptions
extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
@Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
* --output=gs://YOUR_OUTPUT_PREFIX
* }</pre>
*
* <p>The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with
* <p>The default input is {@code gs://apache-beam-samples/shakespeare/} and can be overridden with
* {@code --input}.
*/
public class TfIdf {
Expand All @@ -94,7 +94,7 @@ public class TfIdf {
*/
private static interface Options extends PipelineOptions {
@Description("Path to the directory or GCS prefix containing files to read from")
@Default.String("gs://dataflow-samples/shakespeare/")
@Default.String("gs://apache-beam-samples/shakespeare/")
String getInput();
void setInput(String value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
* --output=gs://YOUR_OUTPUT_PREFIX
* }</pre>
*
* <p>The default input is {@code gs://dataflow-samples/wikipedia_edits/*.json} and can be
* <p>The default input is {@code gs://apache-beam-samples/wikipedia_edits/*.json} and can be
* overridden with {@code --input}.
*
* <p>The input for this example is large enough that it's a good place to enable (experimental)
Expand All @@ -77,7 +77,8 @@
* This will automatically scale the number of workers up over time until the job completes.
*/
public class TopWikipediaSessions {
private static final String EXPORTED_WIKI_TABLE = "gs://dataflow-samples/wikipedia_edits/*.json";
private static final String EXPORTED_WIKI_TABLE =
"gs://apache-beam-samples/wikipedia_edits/*.json";

/**
* Extracts user and timestamp from a TableRow representing a Wikipedia edit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public PCollection<String> apply(PBegin begin) {
*/
private interface TrafficMaxLaneFlowOptions extends ExampleOptions, ExampleBigQueryTableOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/traffic_sensor/"
@Default.String("gs://apache-beam-samples/traffic_sensor/"
+ "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
String getInputFile();
void setInputFile(String value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public PCollection<String> apply(PBegin begin) {
*/
private interface TrafficRoutesOptions extends ExampleOptions, ExampleBigQueryTableOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/traffic_sensor/"
@Default.String("gs://apache-beam-samples/traffic_sensor/"
+ "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
String getInputFile();
void setInputFile(String value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void processElement(ProcessContext c) {
*/
public static interface Options extends PipelineOptions {
@Description("Path of the file to read from and store to Datastore")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
@Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
String getInput();
void setInput(String value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* and an output prefix on GCS:
* --output=gs://YOUR_OUTPUT_PREFIX
*
* <p>The input defaults to {@code gs://dataflow-samples/shakespeare/*} and can be
* <p>The input defaults to {@code gs://apache-beam-samples/shakespeare/*} and can be
* overridden with {@code --input}.
*/
public class DeDupExample {
Expand All @@ -59,7 +59,7 @@ public class DeDupExample {
*/
private static interface Options extends PipelineOptions {
@Description("Path to the directory or GCS prefix containing files to read from")
@Default.String("gs://dataflow-samples/shakespeare/*")
@Default.String("gs://apache-beam-samples/shakespeare/*")
String getInput();
void setInput(String value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public interface TrafficFlowOptions
extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {

@Description("Input file to read from")
@Default.String("gs://dataflow-samples/traffic_sensor/"
@Default.String("gs://apache-beam-samples/traffic_sensor/"
+ "Freeways-5Minaa2010-01-01_to_2010-02-15.csv")
String getInput();
void setInput(String value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static void main(String[] args) {

Pipeline p = Pipeline.create(options);

p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(TypeDescriptors.strings()))
.apply(Filter.by((String word) -> !word.isEmpty()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
* timestamped after 23:59 PST on 2015-10-18 should not be included in the analysis.
* To indicate a time before which data should be filtered out, include the {@code --startMin} arg.
* If you're using the default input specified in {@link UserScore},
* "gs://dataflow-samples/game/gaming_data*.csv", then
* "gs://apache-beam-samples/game/gaming_data*.csv", then
* {@code --startMin=2015-11-16-16-10 --stopMin=2015-11-17-16-10} are good values.
*/
public class HourlyTeamScore extends UserScore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public static interface Options extends PipelineOptions {
@Description("Path to the data file(s) containing game data.")
// The default maps to two large Google Cloud Storage files (each ~12GB) holding two subsequent
// days' worth (roughly) of data.
@Default.String("gs://dataflow-samples/game/gaming_data*.csv")
@Default.String("gs://apache-beam-samples/game/gaming_data*.csv")
String getInput();
void setInput(String value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testMinimalWordCountJava8() throws Exception {
Pipeline p = TestPipeline.create();
p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());

p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
.apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
.withOutputType(TypeDescriptors.strings()))
.apply(Filter.by((String word) -> !word.isEmpty()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
*/
public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {

/** The OldDoFn being run. */
/** The {@link OldDoFn} being run. */
public final OldDoFn<InputT, OutputT> fn;

/** The context used for running the OldDoFn. */
/** The context used for running the {@link OldDoFn}. */
public final DoFnContext<InputT, OutputT> context;

protected DoFnRunnerBase(
Expand Down Expand Up @@ -164,8 +164,8 @@ public void finishBundle() {
/**
* A concrete implementation of {@code OldDoFn.Context} used for running an {@link OldDoFn}.
*
* @param <InputT> the type of the OldDoFn's (main) input elements
* @param <OutputT> the type of the OldDoFn's (main) output elements
* @param <InputT> the type of the {@link OldDoFn} (main) input elements
* @param <OutputT> the type of the {@link OldDoFn} (main) output elements
*/
private static class DoFnContext<InputT, OutputT>
extends OldDoFn<InputT, OutputT>.Context {
Expand Down Expand Up @@ -350,7 +350,7 @@ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggreg
}

/**
* Returns a new {@code OldDoFn.ProcessContext} for the given element.
* Returns a new {@link OldDoFn.ProcessContext} for the given element.
*/
protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext(
WindowedValue<InputT> elem) {
Expand All @@ -366,11 +366,11 @@ private boolean isSystemDoFn() {
}

/**
* A concrete implementation of {@code OldDoFn.ProcessContext} used for
* A concrete implementation of {@link OldDoFn.ProcessContext} used for
* running an {@link OldDoFn} over a single element.
*
* @param <InputT> the type of the OldDoFn's (main) input elements
* @param <OutputT> the type of the OldDoFn's (main) output elements
* @param <InputT> the type of the {@link OldDoFn} (main) input elements
* @param <OutputT> the type of the {@link OldDoFn} (main) output elements
*/
static class DoFnProcessContext<InputT, OutputT>
extends OldDoFn<InputT, OutputT>.ProcessContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.beam.sdk.values.KV;

/**
* OldDoFn that merges windows and groups elements in those windows, optionally
* {@link OldDoFn} that merges windows and groups elements in those windows, optionally
* combining values.
*
* @param <K> key type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
Expand Down Expand Up @@ -177,8 +176,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
* Store the previously emitted pane (if any) for each window.
*
* <ul>
* <li>State: The previous {@link PaneInfo} passed to the user's {@link OldDoFn#processElement},
* if any.
* <li>State: The previous {@link PaneInfo} passed to the user's {@code DoFn.ProcessElement}
* method, if any.
* <li>State style: DIRECT
* <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}.
* Cleared when window is merged away.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
/**
* Runs an {@link OldDoFn} by constructing the appropriate contexts and passing them in.
*
* @param <InputT> the type of the OldDoFn's (main) input elements
* @param <OutputT> the type of the OldDoFn's (main) output elements
* @param <InputT> the type of the {@link OldDoFn} (main) input elements
* @param <OutputT> the type of the {@link OldDoFn} (main) output elements
*/
public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT>{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.IllegalMutationException;
import org.apache.beam.sdk.util.MutationDetector;
import org.apache.beam.sdk.util.MutationDetectors;
Expand All @@ -40,7 +40,7 @@
* elements added to the bundle will be encoded by the {@link Coder} of the underlying
* {@link PCollection}.
*
* <p>This catches errors during the execution of a {@link OldDoFn} caused by modifying an element
* <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element
* after it is added to an output {@link PCollection}.
*/
class ImmutabilityCheckingBundleFactory implements BundleFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;

/**
Expand All @@ -37,7 +36,7 @@ public interface TransformEvaluatorFactory {
* Create a new {@link TransformEvaluator} for the application of the {@link PTransform}.
*
* <p>Any work that must be done before input elements are processed (such as calling
* {@link OldDoFn#startBundle(OldDoFn.Context)}) must be done before the
* the {@code DoFn.StartBundle} method) must be done before the
* {@link TransformEvaluator} is made available to the caller.
*
* <p>May return null if the application cannot produce an evaluator (for example, it is a
Expand Down
Loading

0 comments on commit d5615a1

Please sign in to comment.