Skip to content

Commit

Permalink
[BEAM-313] Code review corrections
Browse files Browse the repository at this point in the history
  • Loading branch information
amarouni committed Aug 26, 2016
1 parent 2eabfaa commit 5a1796f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions,
Long getBatchIntervalMillis();
void setBatchIntervalMillis(Long batchInterval);

@Override
@Default.String("spark dataflow pipeline job")
String getAppName();

@Description("If the spark runner will be initialized with a provided Spark Context")
@Default.Boolean(false)
boolean getUsesProvidedSparkContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public static SparkRunner create() {

/**
* Creates and returns a new SparkRunner with specified options.
*
* @param options The SparkPipelineOptions to use when executing the job.
* @return A pipeline runner that will execute with specified options.
*/
Expand Down Expand Up @@ -140,16 +141,17 @@ private SparkRunner(SparkPipelineOptions options) {
public EvaluationResult run(Pipeline pipeline) {
try {
LOG.info("Executing pipeline using the SparkRunner.");
JavaSparkContext jsc = mOptions.getProvidedSparkContext();
if (mOptions.getUsesProvidedSparkContext() && jsc != null) {
JavaSparkContext jsc;
if (mOptions.getUsesProvidedSparkContext()) {
LOG.info("Using a provided Spark Context");
if (jsc.sc().isStopped()){
jsc = mOptions.getProvidedSparkContext();
if (jsc == null || jsc.sc().isStopped()){
LOG.error("The provided Spark context "
+ jsc + " is stopped");
throw new RuntimeException("The provided Spark context is stopped");
+ jsc + " was not created or was stopped");
throw new RuntimeException("The provided Spark context was not created or was stopped");
}
} else {
LOG.info("Creating a new Spark Java Context");
LOG.info("Creating a new Spark Context");
jsc = SparkContextFactory.getSparkContext(mOptions.getSparkMaster(), mOptions.getAppName());
}
if (mOptions.isStreaming()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,38 @@ public void testWithProvidedContext() throws Exception {
// Run test from pipeline
p.run();

// Run test from Runner
EvaluationResult res = SparkRunner.create(options).run(p);
res.close();

jsc.stop();
}

/**
 * A SparkRunner with a null provided Spark context cannot run pipelines.
 *
 * <p>With {@code usesProvidedSparkContext} set but no context supplied, {@code run()}
 * must fail fast rather than NPE on the missing context.
 *
 * @throws Exception if the pipeline fails for a reason other than the expected one.
 */
@Test
public void testWithNullContext() throws Exception {
  // Deliberately null: the runner is told to use a provided context that was never created.
  JavaSparkContext jsc = null;

  SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
  options.setRunner(SparkRunner.class);
  options.setUsesProvidedSparkContext(true);
  options.setProvidedSparkContext(jsc);

  Pipeline p = Pipeline.create(options);
  PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder
      .of()));
  PCollection<String> output = inputWords.apply(new WordCount.CountWords())
      .apply(MapElements.via(new WordCount.FormatAsTextFn()));

  PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);

  try {
    p.run();
    fail("Should throw an exception when the provided Spark context is null");
  } catch (RuntimeException e){
    // Must match the message thrown by SparkRunner.run() for a null/stopped provided context.
    assert(e.getMessage().contains("The provided Spark context was not created or was stopped"));
  }
}

/**
* A SparkRunner with a stopped provided Spark context cannot run pipelines.
* @throws Exception
Expand Down Expand Up @@ -104,7 +129,7 @@ public void testWithStoppedProvidedContext() throws Exception {
p.run();
fail("Should throw an exception when The provided Spark context is stopped");
} catch (RuntimeException e){
assert(e.getMessage().contains("The provided Spark context is stopped"));
assert(e.getMessage().contains("The provided Spark context was not created or was stopped"));
}
}

Expand Down

0 comments on commit 5a1796f

Please sign in to comment.