diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaultsImpl.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaultsImpl.java index 2ee936091b..b70ccb76ea 100644 --- a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaultsImpl.java +++ b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaultsImpl.java @@ -19,21 +19,29 @@ import com.google.gson.GsonBuilder; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; +import scala.Tuple2; -import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import static com.twosigma.beakerx.widget.SparkUI.BEAKERX_ID; import static com.twosigma.beakerx.widget.SparkUI.SPARK_APP_NAME; import static com.twosigma.beakerx.widget.SparkUI.SPARK_EXTRA_LISTENERS; +import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_EXECUTOR_CORES; +import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_EXECUTOR_MEMORY; +import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_MASTER; public class SparkUiDefaultsImpl implements SparkUiDefaults { + public static final String NAME = "name"; + public static final String VALUE = "value"; + public static final String PROPERTIES = "properties"; public static final String SPARK_OPTIONS = "spark_options"; public static final String BEAKERX = "beakerx"; private Gson gson = new GsonBuilder().setPrettyPrinting().create(); @@ -44,10 +52,11 @@ public SparkUiDefaultsImpl(Path path) { } @Override + @SuppressWarnings("unchecked") public void saveSparkConf(SparkConf sparkConf) { - Map newSparkConf = getNewSparkConf(sparkConf); + Map newSparkConf = toMap(sparkConf); try { - Map map = beakerxJson(path); + Map map = beakerxJsonAsMap(path); map.get(BEAKERX).put(SPARK_OPTIONS, newSparkConf); String content = gson.toJson(map); Files.write(path, content.getBytes(StandardCharsets.UTF_8)); @@ -59,36 +68,76 @@ public void saveSparkConf(SparkConf sparkConf) { @Override public void loadDefaults(SparkSession.Builder builder) { SparkConf sparkConf = SparkEngineImpl.getSparkConfBasedOn(builder); - try { - Map beakerxJson = beakerxJson(path); - Map map = (Map) beakerxJson.get(BEAKERX).get(SPARK_OPTIONS); - if (map != null) { - map.entrySet().stream() - .filter(x -> !sparkConf.contains(x.getKey())) - .forEach(x -> builder.config(x.getKey(), x.getValue())); - } - } catch (Exception e) { - throw new RuntimeException(e); + Map beakerxJson = beakerxJsonAsMap(path); + Map map = getOptions(beakerxJson); + if (map != null) { + map.entrySet().stream() + .filter(x -> !sparkConf.contains(x.getKey())) + .forEach(x -> addToBuilder(builder, x.getKey(), x.getValue())); } } - private Map beakerxJson(Path path) throws IOException { - String pathToFile = new String(Files.readAllBytes(path), StandardCharsets.UTF_8); - return toMap(pathToFile); + @SuppressWarnings("unchecked") + private void addToBuilder(SparkSession.Builder builder, String key, Object value) { + if (isOneOfMainProp(key)) { + builder.config(key, (String) value); + } else if (key.equals(PROPERTIES)) { + List> props = (List>) value; + props.forEach(x -> builder.config(x.get(NAME), x.get(VALUE))); + } } - private Map toMap(String pathToFile) { - return (Map) gson.fromJson(pathToFile, Map.class); + @SuppressWarnings("unchecked") + private Map getOptions(Map beakerxJson) { + return (Map) beakerxJson.get(BEAKERX).get(SPARK_OPTIONS); + } + + private boolean isOneOfMainProp(String key) { + return key.equals(SPARK_MASTER) || key.equals(SPARK_EXECUTOR_MEMORY) || key.equals(SPARK_EXECUTOR_CORES); + } + + @SuppressWarnings("unchecked") + Map beakerxJsonAsMap(Path path) { + String jsonAsString = null; + try { + jsonAsString = new String(Files.readAllBytes(path), StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException(e); + } + return (Map) gson.fromJson(jsonAsString, Map.class); } - private Map getNewSparkConf(SparkConf sparkConf) { - Map result = new HashMap<>(); + private Map toMap(SparkConf sparkConf) { + Map result = new HashMap<>(); Arrays.stream(sparkConf.getAll()) .filter(x -> !x._1.equals(SPARK_EXTRA_LISTENERS)) .filter(x -> !x._1.equals(BEAKERX_ID)) .filter(x -> !x._1.equals(SPARK_APP_NAME)) - .forEach(x -> result.put(x._1, x._2)); + .forEach(x -> addToMap(result, x)); return result; } + private void addToMap(Map result, Tuple2 x) { + if (isOneOfMainProp(x._1)) { + result.put(x._1, x._2); + } else { + List> props = getProps(result); + Map e = new HashMap<>(); + e.put(NAME, x._1); + e.put(VALUE, x._2); + props.add(e); + } + } + + @SuppressWarnings("unchecked") + private List> getProps(Map result) { + Object propsAsObject = result.get(PROPERTIES); + if (propsAsObject == null) { + List> props = new ArrayList<>(); + result.put(PROPERTIES, props); + return props; + } + return (List>) propsAsObject; + } + } diff --git a/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUiDefaultsImplTest.java b/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUiDefaultsImplTest.java index 097cc05cf1..fa781fa7e6 100644 --- a/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUiDefaultsImplTest.java +++ b/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUiDefaultsImplTest.java @@ -20,19 +20,97 @@ import org.apache.spark.sql.SparkSession; import org.junit.Before; import org.junit.Test; - +import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_EXECUTOR_CORES; +import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_EXECUTOR_MEMORY; +import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_MASTER; +import static com.twosigma.beakerx.widget.SparkUiDefaultsImpl.BEAKERX; +import static com.twosigma.beakerx.widget.SparkUiDefaultsImpl.NAME; +import static com.twosigma.beakerx.widget.SparkUiDefaultsImpl.PROPERTIES; +import static com.twosigma.beakerx.widget.SparkUiDefaultsImpl.SPARK_OPTIONS; +import static com.twosigma.beakerx.widget.SparkUiDefaultsImpl.VALUE; import static org.assertj.core.api.Assertions.assertThat; public class SparkUiDefaultsImplTest { private SparkUiDefaultsImpl sut; + private Path pathToBeakerxTestJson; @Before - public void setUp() { - String path = SparkUiDefaultsImplTest.class.getClassLoader().getResource("beakerxTest.json").getPath(); - this.sut = new SparkUiDefaultsImpl(Paths.get(path)); + public void setUp() { + String path = this.getClass().getClassLoader().getResource("beakerxTest.json").getPath(); + this.pathToBeakerxTestJson = Paths.get(path); + this.sut = new SparkUiDefaultsImpl(pathToBeakerxTestJson); + } + + @Test + public void saveMasterURL() { + //given + SparkConf sparkConf = new SparkConf(); + sparkConf.set(SPARK_MASTER, "local[4]"); + //when + sut.saveSparkConf(sparkConf); + //then + Map options = getOptions(); + String prop = (String) options.get(SPARK_MASTER); + assertThat(prop).isEqualTo("local[4]"); + } + + @Test + public void saveExecutorMemory() { + //given + SparkConf sparkConf = new SparkConf(); + sparkConf.set(SPARK_EXECUTOR_MEMORY, "8g"); + //when + sut.saveSparkConf(sparkConf); + //then + Map options = getOptions(); + String prop = (String) options.get(SPARK_EXECUTOR_MEMORY); + assertThat(prop).isEqualTo("8g"); + } + + @Test + public void saveCores() { + //given + SparkConf sparkConf = new SparkConf(); + sparkConf.set(SPARK_EXECUTOR_CORES, "10"); + //when + sut.saveSparkConf(sparkConf); + //then + Map options = getOptions(); + String prop = (String) options.get(SPARK_EXECUTOR_CORES); + assertThat(prop).isEqualTo("10"); + } + + @Test + public void saveAsProp() { + //given + SparkConf sparkConf = new SparkConf(); + sparkConf.set("sparkOption2", "sp2"); + //when + sut.saveSparkConf(sparkConf); + //then + List props = getProps(); + assertThat(props).isNotEmpty(); + Map prop = (Map) props.get(0); + assertThat(prop.get(NAME)).isEqualTo("sparkOption2"); + assertThat(prop.get(VALUE)).isEqualTo("sp2"); + } + + @SuppressWarnings("unchecked") + private List getProps() { + Map options = getOptions(); + return (List) options.get(PROPERTIES); + } + + @SuppressWarnings("unchecked") + private Map getOptions() { + Map beakerxTestJson = sut.beakerxJsonAsMap(this.pathToBeakerxTestJson).get(BEAKERX); + return beakerxTestJson.get(SPARK_OPTIONS); } @Test @@ -40,6 +118,7 @@ public void saveAndLoadDefaults() { //given SparkConf sparkConf = new SparkConf(); sparkConf.set("sparkOption2", "sp2"); + sparkConf.set(SPARK_MASTER, "local[4]"); //when sut.saveSparkConf(sparkConf); //then @@ -47,5 +126,6 @@ public void saveAndLoadDefaults() { sut.loadDefaults(builder); SparkConf sparkConfBasedOn = SparkEngineImpl.getSparkConfBasedOn(builder); assertThat(sparkConfBasedOn.get("sparkOption2")).isEqualTo("sp2"); + assertThat(sparkConfBasedOn.get(SPARK_MASTER)).isEqualTo("local[4]"); } } \ No newline at end of file