[BEAM-294] Rename dataflow references to beam #884

Closed
wants to merge 1 commit
runners/spark/pom.xml (2 changes: 1 addition & 1 deletion)
@@ -334,7 +334,7 @@
</goals>
<configuration>
<relocations>
-<!-- relocate Guava used by Dataflow (v18) since it conflicts with
+<!-- relocate Guava used by Beam (v18) since it conflicts with
version used by Hadoop (v11) -->
<relocation>
<pattern>com.google.common</pattern>
@@ -50,7 +50,7 @@
/**
* The SparkRunner translate operations defined on a pipeline to a representation
* executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run
-* a dataflow pipeline with the default options of a single threaded spark instance in local mode,
+* a Beam pipeline with the default options of a single threaded spark instance in local mode,
* we would do the following:
*
* {@code
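The {@code ...} example that follows in the full javadoc is cut off by this hunk. As a rough sketch only, running a Beam pipeline on the Spark runner with a single-threaded local master could look like the lines below; the SparkPipelineOptions class and its setSparkMaster setter are assumptions here, since they do not appear in this diff, and TextIO.Read/TextIO.Write follow the pre-2.0 style referenced in the test javadoc later in this PR.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch only: SparkPipelineOptions / setSparkMaster are assumed names.
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);      // use the Spark runner described above
options.setSparkMaster("local[1]");        // single threaded Spark instance in local mode

Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from("/tmp/input.txt"))
 .apply(TextIO.Write.to("/tmp/output"));
p.run();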
@@ -28,7 +28,7 @@

/**
* The SparkRunner translate operations defined on a pipeline to a representation executable
-* by Spark, and then submitting the job to Spark to be executed. If we wanted to run a dataflow
+* by Spark, and then submitting the job to Spark to be executed. If we wanted to run a Beam
* pipeline with the default options of a single threaded spark instance in local mode, we would do
* the following:
*
@@ -32,7 +32,7 @@

/**
* This class wraps a map of named aggregators. Spark expects that all accumulators be declared
-* before a job is launched. Dataflow allows aggregators to be used and incremented on the fly.
+* before a job is launched. Beam allows aggregators to be used and incremented on the fly.
* We create a map of named aggregators and instantiate in the the spark context before the job
* is launched. We can then add aggregators on the fly in Spark.
*/
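To make that pattern concrete without reproducing the runner's actual class, here is a sketch of a map of named accumulators that are declared up front on the driver and incremented later from tasks, assuming the Spark 1.x JavaSparkContext.accumulator(int, String) overload:

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch only, not the runner's implementation: Spark needs the accumulators
// created on the driver before the job starts, so we register them by name
// here and let Beam-style aggregators increment them on the fly afterwards.
class NamedCounters implements Serializable {
  private final Map<String, Accumulator<Integer>> counters = new HashMap<>();

  void register(JavaSparkContext jsc, String name) {
    counters.put(name, jsc.accumulator(0, name));
  }

  void add(String name, int value) {
    counters.get(name).add(value);
  }
}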
@@ -110,7 +110,7 @@ public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
*/
public static interface WordCountOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
@Default.String("gs://beam-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);

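For reference, an options interface like WordCountOptions is normally bound to command-line flags through PipelineOptionsFactory; a small illustrative usage follows (the setup is generic Beam SDK usage, not code from this PR):

// Parse --inputFile=... style flags into the strongly typed options interface.
WordCountOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(WordCountOptions.class);

// getInputFile() returns the flag value, or the @Default.String above
// ("gs://beam-samples/shakespeare/kinglear.txt") when no flag is given.
Pipeline p = Pipeline.create(options);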
@@ -36,9 +36,9 @@ public final class ShardNameTemplateHelper {

private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class);

-public static final String OUTPUT_FILE_PREFIX = "spark.dataflow.fileoutputformat.prefix";
-public static final String OUTPUT_FILE_TEMPLATE = "spark.dataflow.fileoutputformat.template";
-public static final String OUTPUT_FILE_SUFFIX = "spark.dataflow.fileoutputformat.suffix";
+public static final String OUTPUT_FILE_PREFIX = "spark.beam.fileoutputformat.prefix";
+public static final String OUTPUT_FILE_TEMPLATE = "spark.beam.fileoutputformat.template";
+public static final String OUTPUT_FILE_SUFFIX = "spark.beam.fileoutputformat.suffix";

private ShardNameTemplateHelper() {
}
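These constants are plain configuration keys. As an illustration of how a job might populate them on a Hadoop Configuration (the values are made-up examples, and whether the runner reads them from a Hadoop Configuration specifically is an assumption not shown in this hunk):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// Keys come from ShardNameTemplateHelper; the values are examples only.
conf.set(ShardNameTemplateHelper.OUTPUT_FILE_PREFIX, "part");
conf.set(ShardNameTemplateHelper.OUTPUT_FILE_TEMPLATE, "-SSSSS-of-NNNNN");
conf.set(ShardNameTemplateHelper.OUTPUT_FILE_SUFFIX, ".txt");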
@@ -31,7 +31,7 @@
import org.slf4j.LoggerFactory;

/**
-* Dataflow's Do functions correspond to Spark's FlatMap functions.
+* Beam's Do functions correspond to Spark's FlatMap functions.
*
* @param <InputT> Input element type.
* @param <OutputT> Output element type.
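To illustrate the correspondence in that comment (this is not the runner's wrapper class, just a sketch using the Spark 1.x Java API, where FlatMapFunction.call returns an Iterable): a Do function may emit zero or more outputs per input element, which is exactly the contract of Spark's FlatMap.

import java.util.Arrays;
import org.apache.spark.api.java.function.FlatMapFunction;

// A word-splitting "Do function" emits 0..n outputs per input line; expressed
// directly as a Spark FlatMapFunction it looks like this.
FlatMapFunction<String, String> splitWords = new FlatMapFunction<String, String>() {
  @Override
  public Iterable<String> call(String line) {
    return Arrays.asList(line.split("\\s+"));
  }
};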
@@ -20,7 +20,7 @@
import org.apache.beam.sdk.transforms.PTransform;

/**
-* Translator to support translation between Dataflow transformations and Spark transformations.
+* Translator to support translation between Beam transformations and Spark transformations.
*/
public interface SparkPipelineTranslator {

@@ -57,7 +57,7 @@ public class SparkRuntimeContext implements Serializable {
private final String serializedPipelineOptions;

/**
-* Map fo names to dataflow aggregators.
+* Map of names to Beam aggregators.
*/
private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>();
private transient CoderRegistry coderRegistry;
@@ -94,7 +94,7 @@
import scala.Tuple2;

/**
-* Supports translation between a DataFlow transform, and Spark's operations on RDDs.
+* Supports translation between a Beam transform, and Spark's operations on RDDs.
*/
public final class TransformTranslator {

@@ -895,7 +895,7 @@ private static Map<TupleTag<?>, BroadcastHelper<?>> getSideInputs(
}

/**
-* Translator matches Dataflow transformation with the appropriate evaluator.
+* Translator matches Beam transformation with the appropriate evaluator.
*/
public static class Translator implements SparkPipelineTranslator {

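The matching described in that comment is essentially a lookup from a transform's class to its evaluator. A simplified sketch of the pattern follows; TransformEvaluator refers to the runner's interface, while the registry class itself is illustrative, not the actual implementation.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.PTransform;

// Illustrative registry: each supported Beam transform class maps to the code
// that knows how to express it as Spark RDD operations.
final class EvaluatorRegistry {
  private final Map<Class<? extends PTransform<?, ?>>, TransformEvaluator<?>> evaluators =
      new HashMap<>();

  <T extends PTransform<?, ?>> void register(Class<T> clazz, TransformEvaluator<T> evaluator) {
    evaluators.put(clazz, evaluator);
  }

  @SuppressWarnings("unchecked")
  <T extends PTransform<?, ?>> TransformEvaluator<T> translate(Class<T> clazz) {
    TransformEvaluator<T> evaluator = (TransformEvaluator<T>) evaluators.get(clazz);
    if (evaluator == null) {
      throw new IllegalStateException("No Spark evaluator registered for " + clazz.getName());
    }
    return evaluator;
  }
}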
@@ -68,7 +68,7 @@


/**
-* Supports translation between a DataFlow transform, and Spark's operations on DStreams.
+* Supports translation between a Beam transform, and Spark's operations on DStreams.
*/
public final class StreamingTransformTranslator {

@@ -349,13 +349,13 @@ public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
(TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
if (transform == null) {
if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
throw new UnsupportedOperationException("Dataflow transformation " + clazz
throw new UnsupportedOperationException("Beam transformation " + clazz
.getCanonicalName()
+ " is currently unsupported by the Spark streaming pipeline");
}
// DStream transformations will transform an RDD into another RDD
// Actions will create output
-// In Dataflow it depends on the PTransform's Input and Output class
+// In Beam it depends on the PTransform's Input and Output class
Class<?> pTOutputClazz = getPTransformOutputClazz(clazz);
if (PDone.class.equals(pTOutputClazz)) {
return foreachRDD(rddTranslator);
@@ -373,7 +373,7 @@ public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
}

/**
-* Translator matches Dataflow transformation with the appropriate Spark streaming evaluator.
+* Translator matches Beam transformation with the appropriate Spark streaming evaluator.
* rddTranslator uses Spark evaluators in transform/foreachRDD to evaluate the transformation
*/
public static class Translator implements SparkPipelineTranslator {
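The streaming dispatch shown in the -349 hunk reduces to a choice based on the transform's output type; a condensed sketch of that decision follows (rddTransform is a hypothetical name for the non-terminal branch, which this hunk does not show):

// Sketch of the dispatch described above: transforms whose output is PDone are
// terminal and become a Spark action over each micro-batch (foreachRDD), while
// everything else stays a DStream-to-DStream transformation.
Class<?> pTOutputClazz = getPTransformOutputClazz(clazz);
if (org.apache.beam.sdk.values.PDone.class.equals(pTOutputClazz)) {
  return foreachRDD(rddTranslator);      // produce output
} else {
  return rddTransform(rddTranslator);    // hypothetical: wrap as a DStream transformation
}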
@@ -34,12 +34,12 @@
public abstract class BroadcastHelper<T> implements Serializable {

/**
-* If the property {@code dataflow.spark.directBroadcast} is set to
+* If the property {@code beam.spark.directBroadcast} is set to
* {@code true} then Spark serialization (Kryo) will be used to broadcast values
* in View objects. By default this property is not set, and values are coded using
* the appropriate {@link Coder}.
*/
-public static final String DIRECT_BROADCAST = "dataflow.spark.directBroadcast";
+public static final String DIRECT_BROADCAST = "beam.spark.directBroadcast";

private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);

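Since the renamed flag is just a property string, opting into Kryo-based direct broadcast might look like the line below; treating it as a JVM system property is an assumption, as the reading side is not visible in this hunk.

// Assumption: the flag is read as a JVM system property; set it before
// broadcast values are created so Kryo serialization is used instead of a Coder.
System.setProperty(BroadcastHelper.DIRECT_BROADCAST, "true");  // "beam.spark.directBroadcast"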
@@ -56,7 +56,7 @@ public class TransformTranslatorTest {

/**
* Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
-* in DirectRunner and on SparkRunner, with the mapped dataflow-to-spark
+* in DirectRunner and on SparkRunner, with the mapped beam-to-spark
* transforms. Finally it makes sure that the results are the same for both runs.
*/
@Test
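As a rough outline of the comparison that javadoc describes (runWordCount and readLines are hypothetical helpers, shown only to convey the shape of the test):

@Test
public void directAndSparkRunnersAgree() throws Exception {
  // Run the same TextIO.Read -> TextIO.Write pipeline under each runner,
  // then require identical output. Both helpers are hypothetical.
  List<String> directOutput = readLines(runWordCount(DirectRunner.class, "/tmp/out-direct"));
  List<String> sparkOutput = readLines(runWordCount(SparkRunner.class, "/tmp/out-spark"));
  assertEquals(directOutput, sparkOutput);
}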
@@ -56,7 +56,7 @@ public class KafkaStreamingTest {
new EmbeddedKafkaCluster.EmbeddedZookeeper();
private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties());
-private static final String TOPIC = "kafka_dataflow_test_topic";
+private static final String TOPIC = "kafka_beam_test_topic";
private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
"k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
);