[BEAM-3371] Fix directories not being staged to classpath issue
Directories are now packaged into .jar files and placed in a temporary location, from which the runner can stage them later.
lgajowy committed Sep 3, 2018
1 parent 5bb935e commit 6bdb72e
Showing 1 changed file with 16 additions and 1 deletion.
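
To illustrate the approach the commit message describes, here is a minimal, hypothetical sketch of how directory entries on the classpath could be packaged into temporary .jar files before staging. The class and method names (StagingSketch, prepareForStaging, zipDirectory) are invented for illustration; this is not the actual PipelineResources.prepareFilesForStaging implementation.

import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.stream.Stream;

public class StagingSketch {

  /** Replaces each directory entry with a freshly created .jar in tmpDir; plain files pass through. */
  static List<String> prepareForStaging(List<String> resources, String tmpDir) throws IOException {
    List<String> staged = new ArrayList<>();
    for (String resource : resources) {
      Path path = Paths.get(resource);
      if (Files.isDirectory(path)) {
        Path jar = Files.createTempFile(Paths.get(tmpDir), path.getFileName().toString(), ".jar");
        zipDirectory(path, jar);
        staged.add(jar.toString());
      } else {
        staged.add(resource);
      }
    }
    return staged;
  }

  /** Writes every regular file under dir into jar, preserving paths relative to dir. */
  static void zipDirectory(Path dir, Path jar) throws IOException {
    try (OutputStream out = Files.newOutputStream(jar);
        JarOutputStream jarOut = new JarOutputStream(out);
        Stream<Path> files = Files.walk(dir)) {
      for (Path file : (Iterable<Path>) files.filter(Files::isRegularFile)::iterator) {
        // Jar entry names use forward slashes regardless of the host OS.
        jarOut.putNextEntry(new JarEntry(dir.relativize(file).toString().replace('\\', '/')));
        Files.copy(file, jarOut);
        jarOut.closeEntry();
      }
    }
  }
}
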
@@ -19,6 +19,7 @@
package org.apache.beam.runners.spark;

import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+import static org.apache.beam.runners.core.construction.PipelineResources.prepareFilesForStaging;

import com.google.common.collect.Iterables;
import java.util.Collection;
@@ -164,6 +165,8 @@ public SparkPipelineResult run(final Pipeline pipeline) {

pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(mOptions.isStreaming()));

+prepareFilesToStageForRemoteClusterExecution();
+
if (mOptions.isStreaming()) {
CheckpointDir checkpointDir = new CheckpointDir(mOptions.getCheckpointDir());
SparkRunnerStreamingContextFactory streamingContextFactory =
@@ -272,6 +275,18 @@ private void detectTranslationMode(Pipeline pipeline) {
}
}

+/**
+ * Local configurations run in the same JVM and have no problems with improperly formatted files
+ * on the classpath (e.g. directories with .class files or empty directories), so files are
+ * prepared for staging only when a remote cluster is used.
+ */
+private void prepareFilesToStageForRemoteClusterExecution() {
+  if (!mOptions.getSparkMaster().matches("local\\[?\\d*\\]?")) {
+    mOptions.setFilesToStage(
+        prepareFilesForStaging(mOptions.getFilesToStage(), mOptions.getTempLocation()));
+  }
+}
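
For illustration only (not part of this commit's diff), a small sketch of how the local-master check in the method above behaves, assuming Java's String#matches semantics:

public class LocalMasterCheck {
  public static void main(String[] args) {
    String localMaster = "local\\[?\\d*\\]?";
    System.out.println("local".matches(localMaster));              // true  -> staging preparation skipped
    System.out.println("local[4]".matches(localMaster));           // true  -> staging preparation skipped
    System.out.println("spark://host:7077".matches(localMaster));  // false -> files prepared for staging
    System.out.println("yarn".matches(localMaster));               // false -> files prepared for staging
  }
}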

/** Evaluator that update/populate the cache candidates. */
public static void updateCacheCandidates(
Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext evaluationContext) {
@@ -435,7 +450,7 @@ <TransformT extends PTransform<? super PInput, POutput>> void doVisitTransform(
protected <TransformT extends PTransform<? super PInput, POutput>>
TransformEvaluator<TransformT> translate(
TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) {
-//--- determine if node is bounded/unbounded.
+// --- determine if node is bounded/unbounded.
// usually, the input determines if the PCollection to apply the next transformation to
// is BOUNDED or UNBOUNDED, meaning RDD/DStream.
Map<TupleTag<?>, PValue> pValues;