
[BEAM-294] This closes #884
jbonofre committed Aug 26, 2016
2 parents b21c35d + e233e5f commit 2046783
Showing 14 changed files with 21 additions and 21 deletions.
2 changes: 1 addition & 1 deletion runners/spark/pom.xml
@@ -330,7 +330,7 @@
</goals>
<configuration>
<relocations>
- <!-- relocate Guava used by Dataflow (v18) since it conflicts with
+ <!-- relocate Guava used by Beam (v18) since it conflicts with
version used by Hadoop (v11) -->
<relocation>
<pattern>com.google.common</pattern>
@@ -50,7 +50,7 @@
/**
* The SparkRunner translate operations defined on a pipeline to a representation
* executable by Spark, and then submitting the job to Spark to be executed. If we wanted to run
- * a dataflow pipeline with the default options of a single threaded spark instance in local mode,
+ * a Beam pipeline with the default options of a single threaded spark instance in local mode,
* we would do the following:
*
* {@code
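The {@code ...} sample in the Javadoc above is cut off in this view. A minimal sketch of such a launch, assuming the Spark runner's public API of the time (SparkPipelineOptions, SparkRunner, and setSparkMaster are assumed names, not shown in this diff):

```java
// Sketch: run a Beam pipeline on the Spark runner with a single-threaded local master.
// SparkPipelineOptions, SparkRunner, and setSparkMaster are assumed API names,
// not part of this commit.
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LocalSparkLaunch {
  public static void main(String[] args) {
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setSparkMaster("local[1]"); // single-threaded Spark instance in local mode

    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.Read.from("/tmp/input.txt"))
     .apply(TextIO.Write.to("/tmp/output"));
    p.run();
  }
}
```

Setting the master to local[1] gives the single-threaded local Spark instance that the Javadoc refers to.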
@@ -28,7 +28,7 @@

/**
* The SparkRunner translate operations defined on a pipeline to a representation executable
- * by Spark, and then submitting the job to Spark to be executed. If we wanted to run a dataflow
+ * by Spark, and then submitting the job to Spark to be executed. If we wanted to run a Beam
* pipeline with the default options of a single threaded spark instance in local mode, we would do
* the following:
*
@@ -32,7 +32,7 @@

/**
* This class wraps a map of named aggregators. Spark expects that all accumulators be declared
- * before a job is launched. Dataflow allows aggregators to be used and incremented on the fly.
+ * before a job is launched. Beam allows aggregators to be used and incremented on the fly.
* We create a map of named aggregators and instantiate in the the spark context before the job
* is launched. We can then add aggregators on the fly in Spark.
*/
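A simplified sketch of the wrapper idea that Javadoc describes: one serializable container of named values, registered with Spark before launch, into which named aggregators can be added and merged on the fly (the class below is illustrative, not the runner's actual NamedAggregators):

```java
// Sketch of a "map of named aggregators": register one container up front,
// add named values while the job runs, and merge containers the way a Spark
// accumulator merges partial values on the driver. Names are illustrative.
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class NamedCounters implements Serializable {
  private final Map<String, Long> counters = new HashMap<>();

  // Create or increment a named counter on the fly.
  public void add(String name, long delta) {
    Long current = counters.get(name);
    counters.put(name, current == null ? delta : current + delta);
  }

  // Merge another container, as an accumulator's addInPlace would.
  public NamedCounters merge(NamedCounters other) {
    for (Map.Entry<String, Long> e : other.counters.entrySet()) {
      add(e.getKey(), e.getValue());
    }
    return this;
  }

  public long value(String name) {
    Long v = counters.get(name);
    return v == null ? 0L : v;
  }
}
```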
@@ -110,7 +110,7 @@ public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
*/
public static interface WordCountOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
@Default.String("gs://beam-samples/shakespeare/kinglear.txt")
String getInputFile();
void setInputFile(String value);

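For context, such a PipelineOptions interface is normally materialized from command-line arguments through PipelineOptionsFactory; a small self-contained sketch using an interface that mirrors the hunk above:

```java
// Sketch: build WordCountOptions from command-line args. Without --inputFile,
// getInputFile() returns the @Default.String path renamed by this commit.
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ParseOptions {
  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://beam-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    System.out.println(options.getInputFile());
  }
}
```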
@@ -36,9 +36,9 @@ public final class ShardNameTemplateHelper {

private static final Logger LOG = LoggerFactory.getLogger(ShardNameTemplateHelper.class);

public static final String OUTPUT_FILE_PREFIX = "spark.dataflow.fileoutputformat.prefix";
public static final String OUTPUT_FILE_TEMPLATE = "spark.dataflow.fileoutputformat.template";
public static final String OUTPUT_FILE_SUFFIX = "spark.dataflow.fileoutputformat.suffix";
public static final String OUTPUT_FILE_PREFIX = "spark.beam.fileoutputformat.prefix";
public static final String OUTPUT_FILE_TEMPLATE = "spark.beam.fileoutputformat.template";
public static final String OUTPUT_FILE_SUFFIX = "spark.beam.fileoutputformat.suffix";

private ShardNameTemplateHelper() {
}
@@ -31,7 +31,7 @@
import org.slf4j.LoggerFactory;

/**
- * Dataflow's Do functions correspond to Spark's FlatMap functions.
+ * Beam's Do functions correspond to Spark's FlatMap functions.
*
* @param <InputT> Input element type.
* @param <OutputT> Output element type.
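The correspondence is that a DoFn may emit zero or more outputs for each input element, which is exactly Spark's flatMap contract. A plain-Java sketch of that adapter idea (illustrative stand-ins, not the runner's actual DoFnFunction):

```java
// Sketch: a DoFn-style function that emits any number of outputs per input,
// applied through a flatMap-style adapter. Illustrative only.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DoFnAsFlatMap {
  // Stand-in for a DoFn: one input element, zero or more output elements.
  interface SimpleDoFn<I, O> {
    void processElement(I input, List<O> out);
  }

  // The flatMap contract: each input expands to an iterator of outputs.
  static <I, O> Iterator<O> flatMapOnce(SimpleDoFn<I, O> fn, I input) {
    List<O> out = new ArrayList<>();
    fn.processElement(input, out);
    return out.iterator();
  }

  public static void main(String[] args) {
    SimpleDoFn<String, String> splitWords = new SimpleDoFn<String, String>() {
      @Override
      public void processElement(String line, List<String> out) {
        out.addAll(Arrays.asList(line.split(" ")));
      }
    };
    Iterator<String> words = flatMapOnce(splitWords, "hello beam on spark");
    while (words.hasNext()) {
      System.out.println(words.next());
    }
  }
}
```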
@@ -20,7 +20,7 @@
import org.apache.beam.sdk.transforms.PTransform;

/**
- * Translator to support translation between Dataflow transformations and Spark transformations.
+ * Translator to support translation between Beam transformations and Spark transformations.
*/
public interface SparkPipelineTranslator {

@@ -57,7 +57,7 @@ public class SparkRuntimeContext implements Serializable {
private final String serializedPipelineOptions;

/**
- * Map fo names to dataflow aggregators.
+ * Map fo names to Beam aggregators.
*/
private final Map<String, Aggregator<?, ?>> aggregators = new HashMap<>();
private transient CoderRegistry coderRegistry;
@@ -94,7 +94,7 @@
import scala.Tuple2;

/**
- * Supports translation between a DataFlow transform, and Spark's operations on RDDs.
+ * Supports translation between a Beam transform, and Spark's operations on RDDs.
*/
public final class TransformTranslator {

@@ -895,7 +895,7 @@ private static Map<TupleTag<?>, BroadcastHelper<?>> getSideInputs(
}

/**
- * Translator matches Dataflow transformation with the appropriate evaluator.
+ * Translator matches Beam transformation with the appropriate evaluator.
*/
public static class Translator implements SparkPipelineTranslator {

@@ -68,7 +68,7 @@


/**
- * Supports translation between a DataFlow transform, and Spark's operations on DStreams.
+ * Supports translation between a Beam transform, and Spark's operations on DStreams.
*/
public final class StreamingTransformTranslator {

@@ -349,13 +349,13 @@ public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
(TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
if (transform == null) {
if (UNSUPPORTED_EVALUATORS.contains(clazz)) {
throw new UnsupportedOperationException("Dataflow transformation " + clazz
throw new UnsupportedOperationException("Beam transformation " + clazz
.getCanonicalName()
+ " is currently unsupported by the Spark streaming pipeline");
}
// DStream transformations will transform an RDD into another RDD
// Actions will create output
- // In Dataflow it depends on the PTransform's Input and Output class
+ // In Beam it depends on the PTransform's Input and Output class
Class<?> pTOutputClazz = getPTransformOutputClazz(clazz);
if (PDone.class.equals(pTOutputClazz)) {
return foreachRDD(rddTranslator);
@@ -373,7 +373,7 @@ public void evaluate(Window.Bound<T> transform, EvaluationContext context) {
}

/**
- * Translator matches Dataflow transformation with the appropriate Spark streaming evaluator.
+ * Translator matches Beam transformation with the appropriate Spark streaming evaluator.
* rddTranslator uses Spark evaluators in transform/foreachRDD to evaluate the transformation
*/
public static class Translator implements SparkPipelineTranslator {
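The dispatch in the hunk above keys off the PTransform's output type: a transform whose output is PDone is evaluated as an action (foreachRDD), anything else as a DStream-to-DStream transformation. A compact sketch of that decision using stand-in types:

```java
// Sketch: choose between an "action" and a "transformation" evaluator based on
// the transform's declared output class, mirroring the PDone check above.
// The types and evaluator names here are stand-ins, not the runner's classes.
public class EvaluatorDispatchSketch {

  // Stand-in for Beam's PDone: a transform that produces no downstream output.
  static final class Done {}

  enum EvaluatorKind { ACTION_FOREACH_RDD, DSTREAM_TRANSFORMATION }

  static EvaluatorKind chooseEvaluator(Class<?> transformOutputClass) {
    if (Done.class.equals(transformOutputClass)) {
      return EvaluatorKind.ACTION_FOREACH_RDD;    // terminal: create output or side effects
    }
    return EvaluatorKind.DSTREAM_TRANSFORMATION;  // otherwise: RDD -> RDD inside transform()
  }

  public static void main(String[] args) {
    System.out.println(chooseEvaluator(Done.class));   // ACTION_FOREACH_RDD
    System.out.println(chooseEvaluator(String.class)); // DSTREAM_TRANSFORMATION
  }
}
```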
@@ -34,12 +34,12 @@
public abstract class BroadcastHelper<T> implements Serializable {

/**
- * If the property {@code dataflow.spark.directBroadcast} is set to
+ * If the property {@code beam.spark.directBroadcast} is set to
* {@code true} then Spark serialization (Kryo) will be used to broadcast values
* in View objects. By default this property is not set, and values are coded using
* the appropriate {@link Coder}.
*/
public static final String DIRECT_BROADCAST = "dataflow.spark.directBroadcast";
public static final String DIRECT_BROADCAST = "beam.spark.directBroadcast";

private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);

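A sketch of the choice that Javadoc describes: with beam.spark.directBroadcast set to true the raw value is handed to Spark (Kryo) for broadcasting, otherwise it is encoded with its Coder first (the encoder interface below is a stand-in, not the runner's BroadcastHelper):

```java
// Sketch: pick the broadcast payload based on the beam.spark.directBroadcast
// system property, as described in the Javadoc above. The Encoder stands in
// for Coder-based serialization; it is not the runner's actual code.
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class BroadcastChoiceSketch {
  public static final String DIRECT_BROADCAST = "beam.spark.directBroadcast";

  interface Encoder<T> extends Serializable {
    byte[] encode(T value);
  }

  static <T> Object payloadFor(T value, Encoder<T> coder) {
    if (Boolean.parseBoolean(System.getProperty(DIRECT_BROADCAST))) {
      return value;               // rely on Spark's (Kryo) serialization directly
    }
    return coder.encode(value);   // otherwise broadcast the Coder-encoded bytes
  }

  public static void main(String[] args) {
    Encoder<String> utf8 = new Encoder<String>() {
      @Override
      public byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
      }
    };
    // Property unset: prints "byte[]", i.e. the Coder-encoded form is broadcast.
    System.out.println(payloadFor("hello", utf8).getClass().getSimpleName());
  }
}
```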
@@ -56,7 +56,7 @@ public class TransformTranslatorTest {

/**
* Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
- * in DirectRunner and on SparkRunner, with the mapped dataflow-to-spark
+ * in DirectRunner and on SparkRunner, with the mapped beam-to-spark
* transforms. Finally it makes sure that the results are the same for both runs.
*/
@Test
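The test described above follows a common cross-runner pattern: build the same pipeline for the DirectRunner and for the SparkRunner and assert identical contents. A compact sketch of that pattern with PAssert (the pipeline content is illustrative; only the runner wiring would differ between the two runs):

```java
// Sketch: the same assertion run under a runner. PAssert fails the pipeline if
// the contents differ, so passing on both the DirectRunner and the SparkRunner
// implies the mapped beam-to-spark transforms produce matching results.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class CrossRunnerSketch {
  // Build the pipeline once per runner and assert on its output.
  static void runAndCheck(PipelineOptions options) {
    Pipeline p = Pipeline.create(options);
    PCollection<KV<String, Long>> counts =
        p.apply(Create.of("a", "b", "a")).apply(Count.<String>perElement());
    PAssert.that(counts).containsInAnyOrder(KV.of("a", 2L), KV.of("b", 1L));
    p.run();
  }

  public static void main(String[] args) {
    // Default options use the DirectRunner; a second call would pass options
    // with setRunner(SparkRunner.class) to exercise the Spark translation.
    runAndCheck(PipelineOptionsFactory.create());
  }
}
```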
@@ -56,7 +56,7 @@ public class KafkaStreamingTest {
new EmbeddedKafkaCluster.EmbeddedZookeeper();
private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER =
new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties());
private static final String TOPIC = "kafka_dataflow_test_topic";
private static final String TOPIC = "kafka_beam_test_topic";
private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of(
"k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4"
);
