Skip to content

Commit

Permalink
#7471: store spark props (#7504)
Browse files Browse the repository at this point in the history
  • Loading branch information
jaroslawmalekcodete authored and scottdraves committed Jun 8, 2018
1 parent 69f03b2 commit b2ff61c
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,29 @@
import com.google.gson.GsonBuilder;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.twosigma.beakerx.widget.SparkUI.BEAKERX_ID;
import static com.twosigma.beakerx.widget.SparkUI.SPARK_APP_NAME;
import static com.twosigma.beakerx.widget.SparkUI.SPARK_EXTRA_LISTENERS;
import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_EXECUTOR_CORES;
import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_EXECUTOR_MEMORY;
import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_MASTER;

public class SparkUiDefaultsImpl implements SparkUiDefaults {

public static final String NAME = "name";
public static final String VALUE = "value";
public static final String PROPERTIES = "properties";
public static final String SPARK_OPTIONS = "spark_options";
public static final String BEAKERX = "beakerx";
private Gson gson = new GsonBuilder().setPrettyPrinting().create();
Expand All @@ -44,10 +52,11 @@ public SparkUiDefaultsImpl(Path path) {
}

@Override
@SuppressWarnings("unchecked")
public void saveSparkConf(SparkConf sparkConf) {
Map<String, String> newSparkConf = getNewSparkConf(sparkConf);
Map<String, Object> newSparkConf = toMap(sparkConf);
try {
Map<String, Map> map = beakerxJson(path);
Map<String, Map> map = beakerxJsonAsMap(path);
map.get(BEAKERX).put(SPARK_OPTIONS, newSparkConf);
String content = gson.toJson(map);
Files.write(path, content.getBytes(StandardCharsets.UTF_8));
Expand All @@ -59,36 +68,76 @@ public void saveSparkConf(SparkConf sparkConf) {
@Override
public void loadDefaults(SparkSession.Builder builder) {
SparkConf sparkConf = SparkEngineImpl.getSparkConfBasedOn(builder);
try {
Map<String, Map> beakerxJson = beakerxJson(path);
Map<String, String> map = (Map<String, String>) beakerxJson.get(BEAKERX).get(SPARK_OPTIONS);
if (map != null) {
map.entrySet().stream()
.filter(x -> !sparkConf.contains(x.getKey()))
.forEach(x -> builder.config(x.getKey(), x.getValue()));
}
} catch (Exception e) {
throw new RuntimeException(e);
Map<String, Map> beakerxJson = beakerxJsonAsMap(path);
Map<String, Object> map = getOptions(beakerxJson);
if (map != null) {
map.entrySet().stream()
.filter(x -> !sparkConf.contains(x.getKey()))
.forEach(x -> addToBuilder(builder, x.getKey(), x.getValue()));
}
}

private Map<String, Map> beakerxJson(Path path) throws IOException {
String pathToFile = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
return toMap(pathToFile);
@SuppressWarnings("unchecked")
private void addToBuilder(SparkSession.Builder builder, String key, Object value) {
if (isOneOfMainProp(key)) {
builder.config(key, (String) value);
} else if (key.equals(PROPERTIES)) {
List<Map<String, String>> props = (List<Map<String, String>>) value;
props.forEach(x -> builder.config(x.get(NAME), x.get(VALUE)));
}
}

private Map<String, Map> toMap(String pathToFile) {
return (Map<String, Map>) gson.fromJson(pathToFile, Map.class);
@SuppressWarnings("unchecked")
private Map<String, Object> getOptions(Map<String, Map> beakerxJson) {
return (Map<String, Object>) beakerxJson.get(BEAKERX).get(SPARK_OPTIONS);
}

private boolean isOneOfMainProp(String key) {
return key.equals(SPARK_MASTER) || key.equals(SPARK_EXECUTOR_MEMORY) || key.equals(SPARK_EXECUTOR_CORES);
}

@SuppressWarnings("unchecked")
Map<String, Map> beakerxJsonAsMap(Path path) {
String jsonAsString = null;
try {
jsonAsString = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
} catch (Exception e) {
throw new RuntimeException(e);
}
return (Map<String, Map>) gson.fromJson(jsonAsString, Map.class);
}

private Map<String, String> getNewSparkConf(SparkConf sparkConf) {
Map<String, String> result = new HashMap<>();
private Map<String, Object> toMap(SparkConf sparkConf) {
Map<String, Object> result = new HashMap<>();
Arrays.stream(sparkConf.getAll())
.filter(x -> !x._1.equals(SPARK_EXTRA_LISTENERS))
.filter(x -> !x._1.equals(BEAKERX_ID))
.filter(x -> !x._1.equals(SPARK_APP_NAME))
.forEach(x -> result.put(x._1, x._2));
.forEach(x -> addToMap(result, x));
return result;
}

private void addToMap(Map<String, Object> result, Tuple2<String, String> x) {
if (isOneOfMainProp(x._1)) {
result.put(x._1, x._2);
} else {
List<Map<String, String>> props = getProps(result);
Map<String, String> e = new HashMap<>();
e.put(NAME, x._1);
e.put(VALUE, x._2);
props.add(e);
}
}

@SuppressWarnings("unchecked")
private List<Map<String, String>> getProps(Map<String, Object> result) {
Object propsAsObject = result.get(PROPERTIES);
if (propsAsObject == null) {
List<Map<String, String>> props = new ArrayList<>();
result.put(PROPERTIES, props);
return props;
}
return (List<Map<String, String>>) propsAsObject;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,112 @@
import org.apache.spark.sql.SparkSession;
import org.junit.Before;
import org.junit.Test;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_EXECUTOR_CORES;
import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_EXECUTOR_MEMORY;
import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_MASTER;
import static com.twosigma.beakerx.widget.SparkUiDefaultsImpl.BEAKERX;
import static com.twosigma.beakerx.widget.SparkUiDefaultsImpl.NAME;
import static com.twosigma.beakerx.widget.SparkUiDefaultsImpl.PROPERTIES;
import static com.twosigma.beakerx.widget.SparkUiDefaultsImpl.SPARK_OPTIONS;
import static com.twosigma.beakerx.widget.SparkUiDefaultsImpl.VALUE;
import static org.assertj.core.api.Assertions.assertThat;

public class SparkUiDefaultsImplTest {

private SparkUiDefaultsImpl sut;
private Path pathToBeakerxTestJson;

@Before
public void setUp() {
String path = SparkUiDefaultsImplTest.class.getClassLoader().getResource("beakerxTest.json").getPath();
this.sut = new SparkUiDefaultsImpl(Paths.get(path));
public void setUp() {
String path = this.getClass().getClassLoader().getResource("beakerxTest.json").getPath();
this.pathToBeakerxTestJson = Paths.get(path);
this.sut = new SparkUiDefaultsImpl(pathToBeakerxTestJson);
}

@Test
public void saveMasterURL() {
//given
SparkConf sparkConf = new SparkConf();
sparkConf.set(SPARK_MASTER, "local[4]");
//when
sut.saveSparkConf(sparkConf);
//then
Map options = getOptions();
String prop = (String) options.get(SPARK_MASTER);
assertThat(prop).isEqualTo("local[4]");
}

@Test
public void saveExecutorMemory() {
//given
SparkConf sparkConf = new SparkConf();
sparkConf.set(SPARK_EXECUTOR_MEMORY, "8g");
//when
sut.saveSparkConf(sparkConf);
//then
Map options = getOptions();
String prop = (String) options.get(SPARK_EXECUTOR_MEMORY);
assertThat(prop).isEqualTo("8g");
}

@Test
public void saveCores() {
//given
SparkConf sparkConf = new SparkConf();
sparkConf.set(SPARK_EXECUTOR_CORES, "10");
//when
sut.saveSparkConf(sparkConf);
//then
Map options = getOptions();
String prop = (String) options.get(SPARK_EXECUTOR_CORES);
assertThat(prop).isEqualTo("10");
}

@Test
public void saveAsProp() {
//given
SparkConf sparkConf = new SparkConf();
sparkConf.set("sparkOption2", "sp2");
//when
sut.saveSparkConf(sparkConf);
//then
List<Object> props = getProps();
assertThat(props).isNotEmpty();
Map prop = (Map) props.get(0);
assertThat(prop.get(NAME)).isEqualTo("sparkOption2");
assertThat(prop.get(VALUE)).isEqualTo("sp2");
}

@SuppressWarnings("unchecked")
private List<Object> getProps() {
Map options = getOptions();
return (List<Object>) options.get(PROPERTIES);
}

@SuppressWarnings("unchecked")
private Map getOptions() {
Map<String, Map> beakerxTestJson = sut.beakerxJsonAsMap(this.pathToBeakerxTestJson).get(BEAKERX);
return beakerxTestJson.get(SPARK_OPTIONS);
}

@Test
public void saveAndLoadDefaults() {
//given
SparkConf sparkConf = new SparkConf();
sparkConf.set("sparkOption2", "sp2");
sparkConf.set(SPARK_MASTER, "local[4]");
//when
sut.saveSparkConf(sparkConf);
//then
SparkSession.Builder builder = SparkSession.builder();
sut.loadDefaults(builder);
SparkConf sparkConfBasedOn = SparkEngineImpl.getSparkConfBasedOn(builder);
assertThat(sparkConfBasedOn.get("sparkOption2")).isEqualTo("sp2");
assertThat(sparkConfBasedOn.get(SPARK_MASTER)).isEqualTo("local[4]");
}
}

0 comments on commit b2ff61c

Please sign in to comment.