Skip to content

Commit

Permalink
Merge pull request apache#6244: [BEAM-3371] Enable running integratio…
Browse files Browse the repository at this point in the history
…n tests on Spark
  • Loading branch information
aromanenko-dev authored Oct 2, 2018
2 parents 853758c + dd0d74b commit e79918c
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,17 @@ artifactId=${project.name}
testCompile it.project(path: ":beam-runners-flink_2.11", configuration: 'shadowTest')
}

// When the selected runner is Spark (case-insensitive match; a null runner is skipped by the
// safe-navigation operator), add the Spark runner's shadowTest configuration plus the Spark
// core and streaming libraries to the test compile classpath.
if (runner?.equalsIgnoreCase('spark')) {
testCompile it.project(path: ":beam-runners-spark", configuration: 'shadowTest')
testCompile project.library.java.spark_core
testCompile project.library.java.spark_streaming

// Testing the Spark runner causes a StackOverflowError if slf4j-jdk14 is on the classpath
// so that binding is excluded from the test runtime classpath.
project.configurations.testRuntimeClasspath {
exclude group: "org.slf4j", module: "slf4j-jdk14"
}
}

/* include dependencies required by filesystems */
if (filesystem?.equalsIgnoreCase('hdfs')) {
testCompile it.project(path: ":beam-sdks-java-io-hadoop-file-system", configuration: 'shadowTest')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,24 @@
*/
package org.apache.beam.runners.core.construction;

import static com.google.common.base.Preconditions.checkArgument;

import com.fasterxml.jackson.core.Base64Variants;
import com.google.common.base.Strings;
import com.google.common.hash.Funnels;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.sdk.util.ZipFiles;

/** Utilities for working with classpath resources for pipelines. */
public class PipelineResources {
Expand Down Expand Up @@ -57,4 +69,60 @@ public static List<String> detectClassPathResourcesToStage(ClassLoader classLoad
}
return files;
}

/**
 * Prepares resources for staging on runners that require filesToStage to be jars only.
 *
 * <p>Entries that do not exist on the local file system are dropped. Every existing directory is
 * packaged into a jar whose path is derived from {@code tmpJarLocation}; every other existing
 * entry is kept and reported by its absolute path. The order of the input list is preserved.
 *
 * @param resourcesToStage list of resources that need to be staged
 * @param tmpJarLocation temporary directory to store the jars; only consulted when at least one
 *     existing directory has to be packaged
 * @return paths of the prepared resources (packaged jars and absolute paths of existing files)
 */
public static List<String> prepareFilesForStaging(
    List<String> resourcesToStage, String tmpJarLocation) {
  List<String> prepared = new ArrayList<>();
  for (String resource : resourcesToStage) {
    File candidate = new File(resource);
    if (!candidate.exists()) {
      // Nonexistent entries are silently skipped.
      continue;
    }
    if (candidate.isDirectory()) {
      prepared.add(packageDirectoriesToStage(candidate, tmpJarLocation));
    } else {
      prepared.add(candidate.getAbsolutePath());
    }
  }
  return prepared;
}

/**
 * Packages a directory into a jar named after a hash of the directory's zipped content.
 *
 * @param directoryToStage directory to package
 * @param tmpJarLocation temporary location used to build the jar path
 * @return path of the jar that was written
 */
private static String packageDirectoriesToStage(File directoryToStage, String tmpJarLocation) {
  // The hash is computed first, so the location is validated only after hashing succeeded.
  String jarPath =
      getUniqueJarPath(calculateDirectoryContentHash(directoryToStage), tmpJarLocation);
  zipDirectory(directoryToStage, jarPath);
  return jarPath;
}

/**
 * Computes an MD5 hash over the zipped content of a directory.
 *
 * @param directoryToStage directory whose content is hashed
 * @return the hash encoded with the URL-safe Base64 variant
 * @throws RuntimeException wrapping any {@link IOException} raised while zipping
 */
private static String calculateDirectoryContentHash(File directoryToStage) {
  Hasher md5 = Hashing.md5().newHasher();
  try (OutputStream hashingSink = Funnels.asOutputStream(md5)) {
    // The zip output is funnelled straight into the hasher instead of being written to disk.
    ZipFiles.zipDirectory(directoryToStage, hashingSink);
    byte[] digest = md5.hash().asBytes();
    return Base64Variants.MODIFIED_FOR_URL.encode(digest);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

/**
 * Builds the path of the jar that will hold a packaged directory.
 *
 * <p>The jar is named {@code <contentHash>.jar} and is placed inside {@code tmpJarLocation}. A
 * file separator is inserted only when {@code tmpJarLocation} does not already end with one;
 * without it, a location such as {@code /tmp/stage} would yield {@code /tmp/stage<hash>.jar},
 * i.e. a file next to the temporary directory instead of inside it.
 *
 * @param contentHash hash identifying the directory content
 * @param tmpJarLocation temporary directory to store the jars
 * @return path of the jar inside {@code tmpJarLocation}
 * @throws IllegalArgumentException if {@code tmpJarLocation} is null or empty
 */
private static String getUniqueJarPath(String contentHash, String tmpJarLocation) {
  checkArgument(
      !Strings.isNullOrEmpty(tmpJarLocation),
      "Please provide temporary location for storing the jar files.");

  // Accept both "/" and the platform separator so an already terminated location is unchanged.
  boolean endsWithSeparator =
      tmpJarLocation.endsWith("/") || tmpJarLocation.endsWith(File.separator);
  return String.format(
      "%s%s%s.jar", tmpJarLocation, endsWithSeparator ? "" : File.separator, contentHash);
}

/**
 * Zips the content of a directory into a file at the given path.
 *
 * <p>The output stream is opened in a try-with-resources block so the file handle is released
 * even when zipping fails. NOTE(review): {@code ZipFiles.zipDirectory} is not visible here;
 * presumably it does not close the stream it is handed, and closing it here is harmless if it
 * does.
 *
 * @param directoryToStage directory to zip
 * @param uniqueDirectoryPath path of the jar file to write
 * @throws RuntimeException wrapping any {@link IOException} raised while opening, writing or
 *     closing the file
 */
private static void zipDirectory(File directoryToStage, String uniqueDirectoryPath) {
  try (OutputStream jarStream = new FileOutputStream(uniqueDirectoryPath)) {
    ZipFiles.zipDirectory(directoryToStage, jarStream);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@
*/
package org.apache.beam.runners.core.construction;

import static junit.framework.TestCase.assertTrue;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand Down Expand Up @@ -68,4 +75,43 @@ public void detectClassPathResourceWithNonFileResources() throws Exception {

PipelineResources.detectClassPathResourcesToStage(classLoader);
}

/** Nonexistent paths must be dropped while existing files are kept by absolute path. */
@Test
public void testRemovingNonexistentFilesFromFilesToStage() throws IOException {
  String missingPath = tmpFolder.getRoot().getPath() + "/nonexistent/file";
  String presentPath = tmpFolder.newFile("existingFile").getAbsolutePath();
  String jarLocation = tmpFolder.newFolder().getAbsolutePath();

  List<String> prepared =
      PipelineResources.prepareFilesForStaging(
          Arrays.asList(missingPath, presentPath), jarLocation);

  // Only the entry that exists on disk is expected to survive.
  assertThat(prepared, is(Arrays.asList(presentPath)));
}

/** A directory entry must be replaced by the path of an existing jar file. */
@Test
public void testPackagingDirectoryResourceToJarFile() throws IOException {
  String directoryPath = tmpFolder.newFolder().getAbsolutePath();
  String jarLocation = tmpFolder.newFolder().getAbsolutePath();

  List<String> prepared =
      PipelineResources.prepareFilesForStaging(Arrays.asList(directoryPath), jarLocation);

  String packagedPath = prepared.get(0);
  assertTrue(new File(packagedPath).exists());
  assertTrue(packagedPath.matches(".*\\.jar"));
}

/** Packaging a directory without a temporary jar location must be rejected. */
@Test
public void testIfThrowsWhenThereIsNoTemporaryFolderForJars() throws IOException {
  thrown.expect(IllegalArgumentException.class);
  thrown.expectMessage("Please provide temporary location for storing the jar files.");

  // A directory entry forces packaging, which is the step that needs the temporary location.
  String directoryPath = tmpFolder.newFolder().getAbsolutePath();

  PipelineResources.prepareFilesForStaging(Arrays.asList(directoryPath), null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,10 @@
*/
package org.apache.beam.runners.flink;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.fasterxml.jackson.core.Base64Variants;
import com.google.common.base.Strings;
import com.google.common.hash.Funnels;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.util.ZipFiles;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -95,73 +83,34 @@ public void translate(Pipeline pipeline) {
pipeline.replaceAll(
FlinkTransformOverrides.getDefaultOverrides(translationMode == TranslationMode.STREAMING));

// Local flink configurations work in the same JVM and have no problems with improperly
// formatted files on classpath (eg. directories with .class files or empty directories).
// prepareFilesToStage() only when using remote flink cluster.
List<String> filesToStage;
if (!options.getFlinkMaster().matches("\\[.*\\]")) {
filesToStage = prepareFilesToStage();
} else {
filesToStage = options.getFilesToStage();
}
prepareFilesToStageForRemoteClusterExecution(options);

FlinkPipelineTranslator translator;
if (translationMode == TranslationMode.STREAMING) {
this.flinkStreamEnv =
FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, filesToStage);
FlinkExecutionEnvironments.createStreamExecutionEnvironment(
options, options.getFilesToStage());
translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options);
} else {
this.flinkBatchEnv =
FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, filesToStage);
FlinkExecutionEnvironments.createBatchExecutionEnvironment(
options, options.getFilesToStage());
translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
}

translator.translate(pipeline);
}

private List<String> prepareFilesToStage() {
return options
.getFilesToStage()
.stream()
.map(File::new)
.filter(File::exists)
.map(file -> file.isDirectory() ? packageDirectoriesToStage(file) : file.getAbsolutePath())
.collect(Collectors.toList());
}

private String packageDirectoriesToStage(File directoryToStage) {
String hash = calculateDirectoryContentHash(directoryToStage);
String pathForJar = getUniqueJarPath(hash);
zipDirectory(directoryToStage, pathForJar);
return pathForJar;
}

private String calculateDirectoryContentHash(File directoryToStage) {
Hasher hasher = Hashing.md5().newHasher();
try (OutputStream hashStream = Funnels.asOutputStream(hasher)) {
ZipFiles.zipDirectory(directoryToStage, hashStream);
return Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private String getUniqueJarPath(String contentHash) {
String tempLocation = options.getTempLocation();

checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"Please provide \"tempLocation\" pipeline option. Flink runner needs it to store jars "
+ "made of directories that were on classpath.");

return String.format("%s%s.jar", tempLocation, contentHash);
}

private void zipDirectory(File directoryToStage, String uniqueDirectoryPath) {
try {
ZipFiles.zipDirectory(directoryToStage, new FileOutputStream(uniqueDirectoryPath));
} catch (IOException e) {
throw new RuntimeException(e);
/**
 * Replaces {@code filesToStage} in the options with resources prepared for staging, unless the
 * Flink master is one of {@code [auto]}, {@code [collection]} or {@code [local]}.
 *
 * <p>Local configurations work in the same JVM and have no problems with improperly formatted
 * files on classpath (eg. directories with .class files or empty directories), so preparation
 * happens only for a remote cluster, i.e. when the master address is passed explicitly.
 *
 * @param options pipeline options that are read and, for remote execution, updated in place
 */
private static void prepareFilesToStageForRemoteClusterExecution(FlinkPipelineOptions options) {
  boolean runsInLocalJvm =
      options.getFlinkMaster().matches("\\[auto\\]|\\[collection\\]|\\[local\\]");
  if (runsInLocalJvm) {
    return;
  }

  options.setFilesToStage(
      PipelineResources.prepareFilesForStaging(
          options.getFilesToStage(), options.getTempLocation()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,17 @@
*/
package org.apache.beam.runners.flink;

import static java.util.Arrays.asList;
import static org.apache.beam.sdk.testing.RegexMatcher.matches;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.core.Every.everyItem;
import static org.junit.Assert.assertThat;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.TextIO;
Expand All @@ -27,14 +37,18 @@
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link FlinkPipelineExecutionEnvironment}. */
@RunWith(JUnit4.class)
public class FlinkPipelineExecutionEnvironmentTest implements Serializable {

@Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();

@Test
public void shouldRecognizeAndTranslateStreamingPipeline() {
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
Expand All @@ -49,6 +63,7 @@ public void shouldRecognizeAndTranslateStreamingPipeline() {
.apply(
ParDo.of(
new DoFn<Long, String>() {

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(Long.toString(c.element()));
Expand All @@ -61,4 +76,63 @@ public void processElement(ProcessContext c) throws Exception {

// no exception should be thrown
}

/** With an explicit master address, the staged files are reduced to a single jar. */
@Test
public void shouldPrepareFilesToStageWhenFlinkMasterIsSetExplicitly() throws IOException {
  List<String> stagedFiles = testPreparingResourcesToStage("localhost:8081").getFilesToStage();

  assertThat(stagedFiles.size(), is(1));
  assertThat(stagedFiles.get(0), matches(".*\\.jar"));
}

/** With the {@code [auto]} master, the files to stage are left untouched. */
@Test
public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToAuto() throws IOException {
  List<String> stagedFiles = testPreparingResourcesToStage("[auto]").getFilesToStage();

  assertThat(stagedFiles.size(), is(2));
  assertThat(stagedFiles, everyItem(not(matches(".*\\.jar"))));
}

/** With the {@code [collection]} master, the files to stage are left untouched. */
@Test
public void shouldNotPrepareFilesToStagewhenFlinkMasterIsSetToCollection() throws IOException {
  List<String> stagedFiles = testPreparingResourcesToStage("[collection]").getFilesToStage();

  assertThat(stagedFiles.size(), is(2));
  assertThat(stagedFiles, everyItem(not(matches(".*\\.jar"))));
}

/** With the {@code [local]} master, the files to stage are left untouched. */
@Test
public void shouldNotPrepareFilesToStageWhenFlinkMasterIsSetToLocal() throws IOException {
  List<String> stagedFiles = testPreparingResourcesToStage("[local]").getFilesToStage();

  assertThat(stagedFiles.size(), is(2));
  assertThat(stagedFiles, everyItem(not(matches(".*\\.jar"))));
}

/**
 * Translates an empty pipeline with the given Flink master and returns the options afterwards.
 *
 * <p>The options are set up with two entries in {@code filesToStage}: an existing directory
 * that contains one file, and a path that does not exist.
 *
 * @param flinkMaster value for the {@code flinkMaster} option
 * @return the options instance that was handed to the execution environment
 * @throws IOException if the temporary folders or the file inside the directory cannot be
 *     created
 */
private FlinkPipelineOptions testPreparingResourcesToStage(String flinkMaster)
    throws IOException {
  Pipeline pipeline = Pipeline.create();
  String tempLocation = tmpFolder.newFolder().getAbsolutePath();

  File notEmptyDir = tmpFolder.newFolder();
  // Create the file INSIDE the directory: calling createNewFile() on the directory itself is a
  // no-op (the path already exists) and would leave the "not empty" directory empty.
  File fileInDir = new File(notEmptyDir, "file");
  if (!fileInDir.createNewFile()) {
    throw new IOException("Could not create file " + fileInDir);
  }
  String notEmptyDirPath = notEmptyDir.getAbsolutePath();
  String notExistingPath = "/path/to/not/existing/dir";

  FlinkPipelineOptions options =
      setPipelineOptions(flinkMaster, tempLocation, asList(notEmptyDirPath, notExistingPath));
  FlinkPipelineExecutionEnvironment flinkEnv = new FlinkPipelineExecutionEnvironment(options);
  flinkEnv.translate(pipeline);
  return options;
}

/**
 * Creates Flink pipeline options for {@link TestFlinkRunner} with the given settings.
 *
 * @param flinkMaster value for the {@code flinkMaster} option
 * @param tempLocation value for the {@code tempLocation} option
 * @param filesToStage value for the {@code filesToStage} option
 * @return the populated options
 */
private FlinkPipelineOptions setPipelineOptions(
    String flinkMaster, String tempLocation, List<String> filesToStage) {
  FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
  flinkOptions.setRunner(TestFlinkRunner.class);
  flinkOptions.setFlinkMaster(flinkMaster);
  flinkOptions.setTempLocation(tempLocation);
  flinkOptions.setFilesToStage(filesToStage);
  return flinkOptions;
}
}
Loading

0 comments on commit e79918c

Please sign in to comment.