From 8893d41fb735aa8f727b1b40642f938bc5b7b525 Mon Sep 17 00:00:00 2001 From: jaroslawmalekcodete Date: Tue, 29 May 2018 14:21:30 +0200 Subject: [PATCH] jarek/7331: save/load spark UI defaults (#7438) * #7331: save spark UI defaults * #7331: remove sparkContext * #7331: remove getSparkContext --- beakerx/beakerx/environment.py | 3 +- .../magic/command/SparkMagicCommand.java | 27 +- .../beakerx/widget/SparkConfiguration.java | 40 ++- .../{SparkManager.java => SparkEngine.java} | 18 +- ...kManagerImpl.java => SparkEngineImpl.java} | 128 ++++---- .../com/twosigma/beakerx/widget/SparkUI.java | 277 ++++++++++++++++-- .../twosigma/beakerx/widget/SparkUIApi.java | 45 +++ .../beakerx/widget/SparkUIManager.java | 257 ---------------- .../beakerx/widget/SparkUiDefaults.java | 26 ++ .../beakerx/widget/SparkUiDefaultsImpl.java | 94 ++++++ .../beakerx/widget/SparkVariable.java | 25 +- .../widget/StartStopSparkListener.java | 8 +- .../SparkMagicCommandAutoConnectTest.java | 34 ++- .../magic/command/SparkMagicCommandTest.java | 118 ++++---- .../twosigma/beakerx/widget/SparkUITest.java | 133 +++++++++ .../widget/SparkUiDefaultsImplTest.java | 51 ++++ .../widget/StartStopSparkListenerTest.java | 126 ++++++++ .../src/test/resources/beakerxTest.json | 13 + 18 files changed, 965 insertions(+), 458 deletions(-) rename kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/{SparkManager.java => SparkEngine.java} (71%) rename kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/{SparkManagerImpl.java => SparkEngineImpl.java} (57%) create mode 100644 kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUIApi.java delete mode 100644 kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUIManager.java create mode 100644 kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaults.java create mode 100644 kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaultsImpl.java create mode 100644 kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUITest.java create mode 100644 kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUiDefaultsImplTest.java create mode 100644 kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/StartStopSparkListenerTest.java create mode 100644 kernel/sparkex/src/test/resources/beakerxTest.json diff --git a/beakerx/beakerx/environment.py b/beakerx/beakerx/environment.py index f19ce85d07..df568b59f1 100644 --- a/beakerx/beakerx/environment.py +++ b/beakerx/beakerx/environment.py @@ -34,7 +34,8 @@ "auto_save": true, "use_data_grid": true, "show_catalog": false - } + }, + "spark_options":{} } } """ diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommand.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommand.java index a5b1689c5d..838fae3e88 100644 --- a/kernel/sparkex/src/main/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommand.java +++ b/kernel/sparkex/src/main/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommand.java @@ -28,12 +28,14 @@ import com.twosigma.beakerx.kernel.msg.JupyterMessages; import com.twosigma.beakerx.message.Header; import com.twosigma.beakerx.message.Message; -import com.twosigma.beakerx.widget.SparkManager; -import com.twosigma.beakerx.widget.SparkManagerImpl; +import com.twosigma.beakerx.widget.SparkEngineImpl; import com.twosigma.beakerx.widget.SparkUI; +import com.twosigma.beakerx.widget.SparkUiDefaultsImpl; import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; +import 
java.io.File; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -47,19 +49,23 @@ public class SparkMagicCommand implements MagicCommandFunctionality { public static final String SPARK = "%%sparkRunner"; private KernelFunctionality kernel; private SparkUI.SparkUIFactory sparkUIFactory; - private SparkManager.SparkManagerFactory sparkManagerFactory; private SparkUI sparkUI; private Map sparkOptions; public SparkMagicCommand(KernelFunctionality kernel) { //constructor for reflection in LoadMagicMagicCommand - this(kernel, new SparkUI.SparkUIFactoryImpl(), new SparkManagerImpl.SparkManagerFactoryImpl()); + this( + kernel, + new SparkUI.SparkUIFactoryImpl( + new SparkEngineImpl.SparkEngineFactoryImpl(), + new SparkUiDefaultsImpl( + Paths.get(System.getProperty("user.home") + File.separator + ".jupyter" + File.separator + "beakerx.json")))); + } - SparkMagicCommand(KernelFunctionality kernel, SparkUI.SparkUIFactory sparkUIFactory, SparkManager.SparkManagerFactory sparkManagerFactory) { + SparkMagicCommand(KernelFunctionality kernel, SparkUI.SparkUIFactory sparkUIFactory) { this.kernel = kernel; this.sparkUIFactory = sparkUIFactory; - this.sparkManagerFactory = sparkManagerFactory; configureOptions(); } @@ -76,7 +82,7 @@ public String getMagicCommandName() { @Override public MagicCommandOutcomeItem execute(MagicCommandExecutionParam param) { - if (sparkUI != null && sparkUI.isSparkSessionIsActive()) { + if (sparkUI != null && sparkUI.isActive()) { return new MagicCommandOutput(MagicCommandOutput.Status.ERROR, "Active spark session exists. If you want to close it run 'spark.close()'"); } List options = getOptions(param); @@ -135,16 +141,13 @@ private MagicCommandOutcomeItem createSparkUIBasedOnUserSparkConfiguration(Magic } private MagicCommandOutcomeItem createSparkUI(SparkSession.Builder builder) { - SparkManager sparkManager = sparkManagerFactory.create(builder); - this.sparkUI = sparkUIFactory.create(sparkManager); + this.sparkUI = sparkUIFactory.create(builder); return displaySparkUI(sparkUI); } private MagicCommandOutcomeItem createSparkUI(SparkConf sparkConf) { SparkSession.Builder builder = SparkSession.builder().config(sparkConf); - SparkManager sparkManager = sparkManagerFactory.create(builder); - this.sparkUI = sparkUIFactory.create(sparkManager); - return displaySparkUI(sparkUI); + return createSparkUI(builder); } private MagicCommandOutcomeItem displaySparkUI(SparkUI sparkUI) { diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkConfiguration.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkConfiguration.java index 5100072ba9..a8c602c821 100644 --- a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkConfiguration.java +++ b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkConfiguration.java @@ -18,28 +18,35 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; - import static java.util.Collections.EMPTY_LIST; import static java.util.Collections.singletonList; public class SparkConfiguration extends VBox { - public static final String VIEW_NAME_VALUE = "SparkConfigurationView"; - public static final String MODEL_NAME_VALUE = "SparkConfigurationModel"; + static final String VIEW_NAME_VALUE = "SparkConfigurationView"; + static final String MODEL_NAME_VALUE = "SparkConfigurationModel"; private Button add; private HBox header; private Properties properties; - public 
SparkConfiguration() { + SparkConfiguration(Map advancedSettings) { super(new ArrayList<>()); this.add = createAddButton(); this.header = new HBox(singletonList(this.add)); - this.properties = new Properties(new ArrayList<>()); + List propertyItems = createPropertyItems(advancedSettings); + this.properties = new Properties(propertyItems); add(new VBox(Arrays.asList(this.header, this.properties.getWidget()))); } + private List createPropertyItems(Map advancedSettings) { + return advancedSettings.entrySet().stream() + .map(x -> createPropertyItem(x.getKey(), x.getValue())) + .collect(Collectors.toList()); + } + private Button createAddButton() { Button add = new Button(); add.setDescription("+"); @@ -48,15 +55,26 @@ private Button createAddButton() { } private void addProperty() { - Text name = new Text(); - name.setPlaceholder("name"); - Text value = new Text(); - value.setPlaceholder("value"); + PropertyItem propertyItem = createPropertyItem(new Text(), new Text()); + this.properties.add(propertyItem); + } + + private PropertyItem createPropertyItem(String name, String value) { + Text nameWidget = new Text(); + nameWidget.setValue(name); + Text valueWidget = new Text(); + valueWidget.setValue(value); + return createPropertyItem(nameWidget, valueWidget); + } + + private PropertyItem createPropertyItem(Text nameWidget, Text valueWidget) { + nameWidget.setPlaceholder("name"); + valueWidget.setPlaceholder("value"); Button remove = new Button(); remove.setDescription("-"); - PropertyItem propertyItem = new PropertyItem(name, value, remove); + PropertyItem propertyItem = new PropertyItem(nameWidget, valueWidget, remove); remove.registerOnClick((content, message) -> this.properties.getWidget().removeDOMWidget(propertyItem)); - this.properties.add(propertyItem); + return propertyItem; } @Override diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkManager.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkEngine.java similarity index 71% rename from kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkManager.java rename to kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkEngine.java index cb4ce03c31..5bbf668bca 100644 --- a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkManager.java +++ b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkEngine.java @@ -22,27 +22,25 @@ import org.apache.spark.SparkContext; import org.apache.spark.sql.SparkSession; -import java.util.List; +import java.util.Map; -public interface SparkManager { +public interface SparkEngine { - TryResult configure(KernelFunctionality kernel, SparkUIManager sparkContextManager, Message parentMessage); + TryResult configure(KernelFunctionality kernel, SparkUIApi sparkUI, Message parentMessage); SparkSession getOrCreate(); - SparkConf getSparkConf(List configurations); - - SparkContext sparkContext(); - - SparkSession.Builder getBuilder(); + SparkConf getSparkConf(); String getSparkAppId(); + Map getAdvanceSettings(); + String getSparkUiWebUrl(); String getSparkMasterUrl(); - interface SparkManagerFactory { - SparkManager create(SparkSession.Builder sparkSessionBuilder); + interface SparkEngineFactory { + SparkEngine create(SparkSession.Builder sparkSessionBuilder); } } diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkManagerImpl.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkEngineImpl.java similarity index 57% rename from 
kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkManagerImpl.java rename to kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkEngineImpl.java index f573f2656f..bf61b2a509 100644 --- a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkManagerImpl.java +++ b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkEngineImpl.java @@ -37,34 +37,40 @@ import scala.collection.Iterator; import java.lang.reflect.Field; +import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import static com.twosigma.beakerx.kernel.PlainCode.createSimpleEvaluationObject; -import static com.twosigma.beakerx.widget.SparkUIManager.SPARK_EXECUTOR_CORES; -import static com.twosigma.beakerx.widget.SparkUIManager.SPARK_EXECUTOR_MEMORY; -import static com.twosigma.beakerx.widget.SparkUIManager.SPARK_MASTER; -import static com.twosigma.beakerx.widget.SparkUIManager.SPARK_SESSION_NAME; +import static com.twosigma.beakerx.widget.SparkUI.BEAKERX_ID; +import static com.twosigma.beakerx.widget.SparkUI.SPARK_APP_NAME; +import static com.twosigma.beakerx.widget.SparkUI.SPARK_EXECUTOR_CORES; +import static com.twosigma.beakerx.widget.SparkUI.SPARK_EXECUTOR_MEMORY; +import static com.twosigma.beakerx.widget.SparkUI.SPARK_EXTRA_LISTENERS; +import static com.twosigma.beakerx.widget.SparkUI.SPARK_MASTER; +import static com.twosigma.beakerx.widget.SparkUI.SPARK_REPL_CLASS_OUTPUT_DIR; +import static com.twosigma.beakerx.widget.SparkUI.SPARK_SESSION_NAME; +import static com.twosigma.beakerx.widget.SparkUI.STANDARD_SETTINGS; +import static com.twosigma.beakerx.widget.StartStopSparkListener.START_STOP_SPARK_LISTENER; + +public class SparkEngineImpl implements SparkEngine { -public class SparkManagerImpl implements SparkManager { - - public static final String SPARK_APP_NAME = "spark.app.name"; - public static final String SPARK_CORES_MAX = "spark.cores.max"; - private SparkUIManager sparkContextManager; private SparkSession.Builder sparkSessionBuilder; - private SparkManagerImpl(SparkSession.Builder sparkSessionBuilder) { + private SparkEngineImpl(SparkSession.Builder sparkSessionBuilder) { this.sparkSessionBuilder = sparkSessionBuilder; + configureSparkSessionBuilder(this.sparkSessionBuilder); } @Override - public TryResult configure(KernelFunctionality kernel, SparkUIManager sparkContextManager, Message parentMessage) { - this.sparkContextManager = sparkContextManager; - SparkConf sparkConf = configureSparkConf(getSparkConf(sparkContextManager.getAdvancedOptions())); - sparkSessionBuilder.config(sparkConf); + public TryResult configure(KernelFunctionality kernel, SparkUIApi sparkUI, Message parentMessage) { + SparkConf sparkConf = createSparkConf(sparkUI.getAdvancedOptions(), getSparkConfBasedOn(this.sparkSessionBuilder)); + sparkConf = configureSparkConf(sparkConf, sparkUI); + this.sparkSessionBuilder = SparkSession.builder().config(sparkConf); SparkSession sparkSession = getOrCreate(); - addListener(getOrCreate().sparkContext()); - SparkVariable.putSparkContext(getOrCreate().sparkContext()); + addListener(getOrCreate().sparkContext(), sparkUI); SparkVariable.putSparkSession(sparkSession); TryResult tryResultSparkContext = initSparkContextInShell(kernel, parentMessage); if (!tryResultSparkContext.isError()) { @@ -95,16 +101,6 @@ public String getSparkMasterUrl() { return conf.getAll().get("spark.master").get(); } - @Override - public SparkContext sparkContext() { - return getOrCreate().sparkContext(); - } - - @Override 
- public SparkSession.Builder getBuilder() { - return this.sparkSessionBuilder; - } - private TryResult initSparkContextInShell(KernelFunctionality kernel, Message parent) { String addSc = String.format(("import com.twosigma.beakerx.widget.SparkVariable\n" + "val %s = SparkVariable.getSparkSession()\n" + @@ -118,57 +114,67 @@ private TryResult initSparkContextInShell(KernelFunctionality kernel, Message pa return kernel.executeCode(addSc, seo); } - private SparkConf createSparkConf(List configurations) { + private SparkConf createSparkConf(List configurations, SparkConf old) { SparkConf sparkConf = new SparkConf(); + sparkConf.set(SPARK_EXTRA_LISTENERS, old.get(SPARK_EXTRA_LISTENERS)); + sparkConf.set(BEAKERX_ID, old.get(BEAKERX_ID)); + configurations.forEach(x -> { + if (x.getName() != null) { + sparkConf.set(x.getName(), (x.getValue() != null) ? x.getValue() : ""); + } + }); + return sparkConf; + } + + public SparkConf getSparkConf() { + return getSparkConfBasedOn(this.sparkSessionBuilder); + } + + public static SparkConf getSparkConfBasedOn(SparkSession.Builder sparkSessionBuilder) { try { - Field options = this.sparkSessionBuilder.getClass().getDeclaredField("org$apache$spark$sql$SparkSession$Builder$$options"); + SparkConf sparkConf = new SparkConf(); + Field options = sparkSessionBuilder.getClass().getDeclaredField("org$apache$spark$sql$SparkSession$Builder$$options"); options.setAccessible(true); - Iterator iterator = ((scala.collection.mutable.HashMap) options.get(this.sparkSessionBuilder)).iterator(); + Iterator iterator = ((scala.collection.mutable.HashMap) options.get(sparkSessionBuilder)).iterator(); while (iterator.hasNext()) { Tuple2 x = (Tuple2) iterator.next(); sparkConf.set((String) (x)._1, (String) (x)._2); } - configurations.forEach(x -> { - if (x.getName() != null) { - sparkConf.set(x.getName(), (x.getValue() != null) ? 
x.getValue() : ""); - } - }); + return sparkConf; } catch (Exception e) { throw new RuntimeException(e); } - return sparkConf; } - public SparkConf getSparkConf(List configurations) { - return createSparkConf(configurations); + private SparkSession.Builder configureSparkSessionBuilder(SparkSession.Builder builder) { + builder.config(SPARK_EXTRA_LISTENERS, START_STOP_SPARK_LISTENER); + builder.config(BEAKERX_ID, UUID.randomUUID().toString()); + return builder; } - private SparkConf configureSparkConf(SparkConf sparkConf) { + private SparkConf configureSparkConf(SparkConf sparkConf, SparkUIApi sparkUI) { if (!sparkConf.contains(SPARK_APP_NAME)) { sparkConf.setAppName("beaker_" + UUID.randomUUID().toString()); } - if (this.sparkContextManager.getMasterURL().getValue() != null && !this.sparkContextManager.getMasterURL().getValue().isEmpty()) { - sparkConf.set(SPARK_MASTER, this.sparkContextManager.getMasterURL().getValue()); + if (sparkUI.getMasterURL().getValue() != null && !sparkUI.getMasterURL().getValue().isEmpty()) { + sparkConf.set(SPARK_MASTER, sparkUI.getMasterURL().getValue()); } if (!isLocalSpark(sparkConf)) { - sparkConf.set("spark.repl.class.outputDir", KernelManager.get().getOutDir()); + sparkConf.set(SPARK_REPL_CLASS_OUTPUT_DIR, KernelManager.get().getOutDir()); } - if (this.sparkContextManager.getExecutorMemory().getValue() != null && !this.sparkContextManager.getExecutorMemory().getValue().isEmpty()) { - sparkConf.set(SPARK_EXECUTOR_MEMORY, this.sparkContextManager.getExecutorMemory().getValue()); + if (sparkUI.getExecutorMemory().getValue() != null && !sparkUI.getExecutorMemory().getValue().isEmpty()) { + sparkConf.set(SPARK_EXECUTOR_MEMORY, sparkUI.getExecutorMemory().getValue()); } - if (this.sparkContextManager.getExecutorCores().getValue() != null && !this.sparkContextManager.getExecutorCores().getValue().isEmpty()) { - sparkConf.set(SPARK_EXECUTOR_CORES, this.sparkContextManager.getExecutorCores().getValue()); + if (sparkUI.getExecutorCores().getValue() != null && !sparkUI.getExecutorCores().getValue().isEmpty()) { + sparkConf.set(SPARK_EXECUTOR_CORES, sparkUI.getExecutorCores().getValue()); } - if (!sparkConf.contains(SPARK_CORES_MAX)) { - sparkConf.set(SPARK_CORES_MAX, "100"); - } return sparkConf; } - private SparkContext addListener(SparkContext sc) { + private SparkContext addListener(SparkContext sc, SparkUIApi sparkUIManager) { sc.addSparkListener(new SparkListener() { @Override @@ -184,42 +190,52 @@ public void onJobEnd(SparkListenerJobEnd jobEnd) { @Override public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { super.onStageSubmitted(stageSubmitted); - sparkContextManager.startStage(stageSubmitted.stageInfo().stageId(), stageSubmitted.stageInfo().numTasks()); + sparkUIManager.startStage(stageSubmitted.stageInfo().stageId(), stageSubmitted.stageInfo().numTasks()); } @Override public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { super.onStageCompleted(stageCompleted); - sparkContextManager.endStage(stageCompleted.stageInfo().stageId()); + sparkUIManager.endStage(stageCompleted.stageInfo().stageId()); } @Override public void onTaskStart(SparkListenerTaskStart taskStart) { super.onTaskStart(taskStart); - sparkContextManager.taskStart(taskStart.stageId(), taskStart.taskInfo().taskId()); + sparkUIManager.taskStart(taskStart.stageId(), taskStart.taskInfo().taskId()); } @Override public void onTaskEnd(SparkListenerTaskEnd taskEnd) { super.onTaskEnd(taskEnd); if (taskEnd.reason().toString().equals("Success")) { - 
sparkContextManager.taskEnd(taskEnd.stageId(), taskEnd.taskInfo().taskId()); + sparkUIManager.taskEnd(taskEnd.stageId(), taskEnd.taskInfo().taskId()); } } }); return sc; } + public Map getAdvanceSettings() { + return Arrays.stream(getSparkConf().getAll()) + .filter(x -> isAdvancedSettings(x._1)) + .collect(Collectors.toMap(Tuple2::_1, Tuple2::_2)); + } + + private boolean isAdvancedSettings(String name) { + return !STANDARD_SETTINGS.contains(name); + } + private static boolean isLocalSpark(SparkConf sparkConf) { return sparkConf.contains(SPARK_MASTER) && sparkConf.get(SPARK_MASTER) != null && sparkConf.get("spark.master").startsWith("local"); } - public static class SparkManagerFactoryImpl implements SparkManagerFactory { + public static class SparkEngineFactoryImpl implements SparkEngineFactory { @Override - public SparkManager create(SparkSession.Builder sparkSessionBuilder) { - return new SparkManagerImpl(sparkSessionBuilder); + public SparkEngine create(SparkSession.Builder sparkSessionBuilder) { + return new SparkEngineImpl(sparkSessionBuilder); } } diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUI.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUI.java index 0f4816ab3b..18a182a62b 100644 --- a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUI.java +++ b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUI.java @@ -15,39 +15,68 @@ */ package com.twosigma.beakerx.widget; +import com.twosigma.beakerx.TryResult; +import com.twosigma.beakerx.evaluator.InternalVariable; +import com.twosigma.beakerx.jvm.object.SimpleEvaluationObject; +import com.twosigma.beakerx.kernel.KernelFunctionality; +import com.twosigma.beakerx.kernel.KernelManager; +import com.twosigma.beakerx.message.Message; +import org.apache.spark.SparkConf; import org.apache.spark.sql.SparkSession; import java.util.ArrayList; import java.util.Arrays; -import java.util.UUID; +import java.util.HashMap; +import java.util.List; +import java.util.Map; -import static com.twosigma.beakerx.widget.StartStopSparkListener.START_STOP_SPARK_LISTENER; +import static com.twosigma.beakerx.kernel.PlainCode.createSimpleEvaluationObject; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; -public class SparkUI extends VBox { +public class SparkUI extends VBox implements SparkUIApi { + public static final String SPARK_REPL_CLASS_OUTPUT_DIR = "spark.repl.class.outputDir"; + public static final String SPARK_APP_NAME = "spark.app.name"; + public static final String SPARK_MASTER = "spark.master"; + public static final String SPARK_EXECUTOR_MEMORY = "spark.executor.memory"; + public static final String SPARK_EXECUTOR_CORES = "spark.executor.cores"; public static final String SPARK_EXTRA_LISTENERS = "spark.extraListeners"; + public static final String BEAKERX_ID = "beakerx.id"; + public static final List STANDARD_SETTINGS = Arrays.asList(SPARK_MASTER, SPARK_EXECUTOR_MEMORY, SPARK_EXECUTOR_CORES, SPARK_APP_NAME, BEAKERX_ID, SPARK_EXTRA_LISTENERS, SPARK_REPL_CLASS_OUTPUT_DIR); public static final String VIEW_NAME_VALUE = "SparkUIView"; public static final String MODEL_NAME_VALUE = "SparkUIModel"; - public static final String BEAKERX_ID = "beakerx.id"; - - private final SparkUIManager sparkUIManager; + static final String SPARK_SESSION_NAME = "spark"; + static final String CONNECT = "Start"; + private static final String SPARK_MASTER_DEFAULT = "local[*]"; + public static final String SPARK_APP_ID = "sparkAppId"; private VBox sparkConfig; 
private VBox sparkConfigPanel; private Button connectButton; private HBox statusPanel; + private Map progressBarMap = new HashMap<>(); + private Text masterURL; + private Text executorMemory; + private Text executorCores; + private SparkConfiguration advancedOption; + private boolean active = false; + private SparkFoldout jobPanel = null; + private Message currentParentHeader = null; - private static SparkUI create(SparkManager sparkManager) { - return new SparkUI(sparkManager); - } + private SparkEngine sparkEngine; + private SparkUiDefaults sparkUiDefaults; - private SparkUI(SparkManager sparkManager) { + SparkUI(SparkSession.Builder builder, SparkEngine.SparkEngineFactory sparkEngineFactory, SparkUiDefaults sparkUiDefaults) { super(new ArrayList<>()); - configureSparkSessionBuilder(sparkManager.getBuilder()); + this.sparkUiDefaults = sparkUiDefaults; + this.sparkUiDefaults.loadDefaults(builder); + this.sparkEngine = sparkEngineFactory.create(builder); this.sparkConfig = new VBox(new ArrayList<>()); - this.sparkConfigPanel = new VBox(Arrays.asList(sparkConfig)); + this.sparkConfigPanel = new VBox(singletonList(sparkConfig)); add(sparkConfigPanel); - this.sparkUIManager = new SparkUIManager(this, sparkManager); + SparkVariable.putSparkUI(this); + createSparkView(); } @Override @@ -70,14 +99,204 @@ public String getViewModuleValue() { return BeakerxWidget.VIEW_MODULE_VALUE; } - private SparkSession.Builder configureSparkSessionBuilder(SparkSession.Builder builder) { - builder.config(SPARK_EXTRA_LISTENERS, START_STOP_SPARK_LISTENER); - builder.config(BEAKERX_ID, UUID.randomUUID().toString()); - return builder; + private void createSparkView() { + this.masterURL = createMasterURL(); + this.executorMemory = createExecutorMemory(); + this.executorCores = createExecutorCores(); + this.addConnectButton(createConnectButton()); + this.addMasterUrl(masterURL); + this.addExecutorCores(executorCores); + this.addExecutorMemory(executorMemory); + this.advancedOption = new SparkConfiguration(sparkEngine.getAdvanceSettings()); + this.addAdvanceOptions(advancedOption); + } + + private Text createExecutorCores() { + Text cores = new Text(); + cores.setDescription("Executor cores"); + if (getSparkConf().contains(SPARK_EXECUTOR_CORES)) { + cores.setValue(getSparkConf().get(SPARK_EXECUTOR_CORES)); + } else { + cores.setValue("10"); + } + return cores; + } + + private SparkConf getSparkConf() { + return sparkEngine.getSparkConf(); + } + + private Text createExecutorMemory() { + Text masterURL = new Text(); + masterURL.setDescription("Executor Memory"); + if (getSparkConf().contains(SPARK_EXECUTOR_MEMORY)) { + masterURL.setValue(getSparkConf().get(SPARK_EXECUTOR_MEMORY)); + } else { + masterURL.setValue("8g"); + } + return masterURL; + } + + private Text createMasterURL() { + Text masterURL = new Text(); + masterURL.setDescription("Master URL"); + if (getSparkConf().contains(SPARK_MASTER)) { + masterURL.setValue(getSparkConf().get(SPARK_MASTER)); + } else { + masterURL.setValue(SPARK_MASTER_DEFAULT); + } + return masterURL; + } + + private Button createConnectButton() { + Button connect = new Button(); + connect.setDescription(CONNECT); + connect.registerOnClick((content, message) -> initSparkContext(message)); + return connect; + } + + private void initSparkContext(Message parentMessage) { + KernelFunctionality kernel = KernelManager.get(); + try { + TryResult configure = sparkEngine.configure(kernel, this, parentMessage); + if (configure.isError()) { + sendError(parentMessage, kernel, configure.error()); + } 
else { + active = true; + saveSparkConf(sparkEngine.getSparkConf()); + } + } catch (Exception e) { + sendError(parentMessage, kernel, e.getMessage()); + } + } + + private SparkSession getSparkSession() { + return sparkEngine.getOrCreate(); + } + + private void sendError(Message parentMessage, KernelFunctionality kernel, String message) { + SimpleEvaluationObject seo = createSimpleEvaluationObject("", kernel, parentMessage, 1); + seo.error(message); + } + + public void applicationStart() { + clearView(); + addStatusPanel(createStatusPanel()); + sendUpdate(SPARK_APP_ID, sparkEngine.getSparkAppId()); + sendUpdate("sparkUiWebUrl", sparkEngine.getSparkUiWebUrl()); + sendUpdate("sparkMasterUrl", sparkEngine.getSparkMasterUrl()); + } + + public void applicationEnd() { + removeStatusPanel(); + active = false; + addView(); + } + + private HBox createStatusPanel() { + Label appStatus = createAppStatus(); + Button disconnect = createDisconnectButton(); + HBox connectionPanel = new HBox(Arrays.asList(appStatus, disconnect)); + connectionPanel.setDomClasses(new ArrayList<>(Arrays.asList("bx-status-panel"))); + return connectionPanel; + } + + private Label createAppStatus() { + Label appStatus = new Label(); + appStatus.setValue("Connected"); + appStatus.setDomClasses(new ArrayList<>(Arrays.asList("bx-connection-status", "connected"))); + return appStatus; } - public boolean isSparkSessionIsActive() { - return sparkUIManager.isActive(); + private Button createDisconnectButton() { + Button disconnect = new Button(); + disconnect.registerOnClick((content, message) -> getSparkSession().sparkContext().stop()); + disconnect.setDomClasses(new ArrayList<>(Arrays.asList("bx-button", "icon-close"))); + return disconnect; + } + + public void startStage(int stageId, int numTasks) { + if (isStartStageFromNewCell()) { + jobPanel = createSparkFoldout(jobPanel); + } + SparkStateProgress intProgress = new SparkStateProgress(numTasks, stageId, stageId, jobLink(stageId), stageLink(stageId)); + intProgress.init(); + jobPanel.add(intProgress); + progressBarMap.put(stageId, intProgress); + } + + private boolean isStartStageFromNewCell() { + return InternalVariable.getParentHeader() != currentParentHeader; + } + + private SparkFoldout createSparkFoldout(SparkFoldout oldJobPanel) { + currentParentHeader = InternalVariable.getParentHeader(); + + if (oldJobPanel != null) { + oldJobPanel.getLayout().setDisplayNone(); + oldJobPanel.close(); + } + SparkFoldout.FoldoutOption foldoutOption = new SparkFoldout.FoldoutOption(); + foldoutOption.headerLabel = "Spark progress"; + SparkFoldout jobPanel = new SparkFoldout(new ArrayList<>(), foldoutOption); + jobPanel.display(); + return jobPanel; + } + + public void endStage(int stageId) { + SparkStateProgress sparkStateProgress = progressBarMap.get(stageId); + sparkStateProgress.hide(); + } + + public void taskStart(int stageId, long taskId) { + SparkStateProgress intProgress = progressBarMap.get(stageId); + intProgress.addActive(); + } + + public void taskEnd(int stageId, long taskId) { + SparkStateProgress intProgress = progressBarMap.get(stageId); + intProgress.addDone(); + } + + private String stageLink(int stageId) { + if (getSparkSession().sparkContext().uiWebUrl().isDefined()) { + return getSparkSession().sparkContext().uiWebUrl().get() + "/stages/stage/?id=" + stageId + "&attempt=0"; + } else { + return ""; + } + } + + private String jobLink(int jobId) { + if (getSparkSession().sparkContext().uiWebUrl().isDefined()) { + return getSparkSession().sparkContext().uiWebUrl().get() + 
"/jobs/job/?id=" + jobId; + } else { + return ""; + } + } + + public void cancelAllJobs() { + getSparkSession().sparkContext().cancelAllJobs(); + } + + @Override + public boolean isActive() { + return active; + } + + public Text getMasterURL() { + return masterURL; + } + + public Text getExecutorMemory() { + return executorMemory; + } + + public Text getExecutorCores() { + return executorCores; + } + + public List getAdvancedOptions() { + return this.advancedOption.getConfiguration(); } public void addMasterUrl(Text masterURL) { @@ -103,7 +322,7 @@ public void clearView() { } public void addView() { - this.sparkConfigPanel = new VBox(Arrays.asList(sparkConfig)); + this.sparkConfigPanel = new VBox(asList(sparkConfig)); add(sparkConfigPanel); } @@ -127,14 +346,26 @@ public void removeStatusPanel() { } } + void saveSparkConf(SparkConf sparkConf) { + sparkUiDefaults.saveSparkConf(sparkConf); + } + public interface SparkUIFactory { - SparkUI create(SparkManager sparkManager); + SparkUI create(SparkSession.Builder builder); } public static class SparkUIFactoryImpl implements SparkUIFactory { + SparkEngine.SparkEngineFactory sparkEngineFactory; + SparkUiDefaults sparkUiDefaults; + + public SparkUIFactoryImpl(SparkEngine.SparkEngineFactory sparkEngineFactory, SparkUiDefaults sparkUiDefaults) { + this.sparkEngineFactory = sparkEngineFactory; + this.sparkUiDefaults = sparkUiDefaults; + } + @Override - public SparkUI create(SparkManager sparkManager) { - return SparkUI.create(sparkManager); + public SparkUI create(SparkSession.Builder builder) { + return new SparkUI(builder, sparkEngineFactory, sparkUiDefaults); } } } diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUIApi.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUIApi.java new file mode 100644 index 0000000000..48da26939e --- /dev/null +++ b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUIApi.java @@ -0,0 +1,45 @@ +/* + * Copyright 2018 TWO SIGMA OPEN SOURCE, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twosigma.beakerx.widget; + +import java.util.List; + +public interface SparkUIApi { + + List getAdvancedOptions(); + + Text getMasterURL(); + + Text getExecutorMemory(); + + Text getExecutorCores(); + + void startStage(int stageId, int numTasks); + + void endStage(int stageId); + + void taskStart(int stageId, long taskId); + + void taskEnd(int stageId, long taskId); + + void applicationStart(); + + void applicationEnd(); + + void cancelAllJobs(); + + boolean isActive(); +} diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUIManager.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUIManager.java deleted file mode 100644 index 7144481b09..0000000000 --- a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUIManager.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Copyright 2018 TWO SIGMA OPEN SOURCE, LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twosigma.beakerx.widget; - -import com.twosigma.beakerx.TryResult; -import com.twosigma.beakerx.evaluator.InternalVariable; -import com.twosigma.beakerx.jvm.object.SimpleEvaluationObject; -import com.twosigma.beakerx.kernel.KernelFunctionality; -import com.twosigma.beakerx.kernel.KernelManager; -import com.twosigma.beakerx.message.Message; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.SparkSession; - -import java.util.*; - -import static com.twosigma.beakerx.kernel.PlainCode.createSimpleEvaluationObject; - -public class SparkUIManager { - - public static final String SPARK_MASTER = "spark.master"; - public static final String SPARK_EXECUTOR_MEMORY = "spark.executor.memory"; - - public static final String SPARK_EXECUTOR_CORES = "spark.executor.cores"; - public static final String SPARK_SESSION_NAME = "spark"; - public static final String CONNECT = "Start"; - private static final String SPARK_MASTER_DEFAULT = "local[*]"; - - private final SparkUI sparkUI; - private Map progressBarMap = new HashMap<>(); - private Text masterURL; - private Text executorMemory; - private Text executorCores; - private SparkConfiguration advancedOption; - private boolean active = false; - private SparkManager sparkManager; - private SparkFoldout jobPanel = null; - private Message currentParentHeader = null; - - public SparkUIManager(SparkUI sparkUI, SparkManager sparkManager) { - this.sparkUI = sparkUI; - this.sparkManager = sparkManager; - SparkVariable.putSparkUIManager(this); - createSparkView(); - } - - private void createSparkView() { - this.masterURL = createMasterURL(); - this.executorMemory = createExecutorMemory(); - this.executorCores = createExecutorCores(); - this.sparkUI.addConnectButton(createConnectButton()); - this.sparkUI.addMasterUrl(masterURL); - this.sparkUI.addExecutorCores(executorCores); - this.sparkUI.addExecutorMemory(executorMemory); - this.advancedOption = new SparkConfiguration(); - this.sparkUI.addAdvanceOptions(advancedOption); - } - - private Text createExecutorCores() { - Text cores = new Text(); - 
cores.setDescription("Executor cores"); - if (getSparkConf().contains(SPARK_EXECUTOR_CORES)) { - cores.setValue(getSparkConf().get(SPARK_EXECUTOR_CORES)); - } else { - cores.setValue("10"); - } - return cores; - } - - private SparkConf getSparkConf() { - List list = (this.advancedOption != null) ? this.advancedOption.getConfiguration() : Collections.EMPTY_LIST; - return sparkManager.getSparkConf(list); - } - - private Text createExecutorMemory() { - Text masterURL = new Text(); - masterURL.setDescription("Executor Memory"); - if (getSparkConf().contains(SPARK_EXECUTOR_MEMORY)) { - masterURL.setValue(getSparkConf().get(SPARK_EXECUTOR_MEMORY)); - } else { - masterURL.setValue("8g"); - } - return masterURL; - } - - private Text createMasterURL() { - Text masterURL = new Text(); - masterURL.setDescription("Master URL"); - if (getSparkConf().contains(SPARK_MASTER)) { - masterURL.setValue(getSparkConf().get(SPARK_MASTER)); - } else { - masterURL.setValue(SPARK_MASTER_DEFAULT); - } - return masterURL; - } - - private Button createConnectButton() { - Button connect = new Button(); - connect.setDescription(CONNECT); - connect.registerOnClick((content, message) -> initSparkContext(message)); - return connect; - } - - private void initSparkContext(Message parentMessage) { - KernelFunctionality kernel = KernelManager.get(); - try { - TryResult configure = sparkManager.configure(kernel, this, parentMessage); - if (configure.isError()) { - sendError(parentMessage, kernel, configure.error()); - } else { - active = true; - } - } catch (Exception e) { - sendError(parentMessage, kernel, e.getMessage()); - } - } - - private SparkSession getSparkSession() { - return sparkManager.getOrCreate(); - } - - private void sendError(Message parentMessage, KernelFunctionality kernel, String message) { - SimpleEvaluationObject seo = createSimpleEvaluationObject("", kernel, parentMessage, 1); - seo.error(message); - } - - public void applicationStart() { - sparkUI.clearView(); - sparkUI.addStatusPanel(createStatusPanel()); - sparkUI.sendUpdate("sparkAppId", sparkManager.getSparkAppId()); - sparkUI.sendUpdate("sparkUiWebUrl", sparkManager.getSparkUiWebUrl()); - sparkUI.sendUpdate("sparkMasterUrl", sparkManager.getSparkMasterUrl()); - } - - public void applicationEnd() { - sparkUI.removeStatusPanel(); - active = false; - sparkUI.addView(); - } - - private HBox createStatusPanel() { - Label appStatus = createAppStatus(); - Button disconnect = createDisconnectButton(); - HBox connectionPanel = new HBox(Arrays.asList(appStatus, disconnect)); - connectionPanel.setDomClasses(new ArrayList<>(Arrays.asList("bx-status-panel"))); - return connectionPanel; - } - - private Label createAppStatus() { - Label appStatus = new Label(); - appStatus.setValue("Connected"); - appStatus.setDomClasses(new ArrayList<>(Arrays.asList("bx-connection-status", "connected"))); - return appStatus; - } - - private Button createDisconnectButton() { - Button disconnect = new Button(); - disconnect.registerOnClick((content, message) -> getSparkSession().sparkContext().stop()); - disconnect.setDomClasses(new ArrayList<>(Arrays.asList("bx-button", "icon-close"))); - return disconnect; - } - - void startStage(int stageId, int numTasks) { - if (isStartStageFromNewCell()) { - jobPanel = createSparkFoldout(jobPanel); - } - SparkStateProgress intProgress = new SparkStateProgress(numTasks, stageId, stageId, jobLink(stageId), stageLink(stageId)); - intProgress.init(); - jobPanel.add(intProgress); - progressBarMap.put(stageId, intProgress); - } - - private boolean 
isStartStageFromNewCell() { - return InternalVariable.getParentHeader() != currentParentHeader; - } - - private SparkFoldout createSparkFoldout(SparkFoldout oldJobPanel) { - currentParentHeader = InternalVariable.getParentHeader(); - - if (oldJobPanel != null) { - oldJobPanel.getLayout().setDisplayNone(); - oldJobPanel.close(); - } - SparkFoldout.FoldoutOption foldoutOption = new SparkFoldout.FoldoutOption(); - foldoutOption.headerLabel = "Spark progress"; - SparkFoldout jobPanel = new SparkFoldout(new ArrayList<>(), foldoutOption); - jobPanel.display(); - return jobPanel; - } - - void endStage(int stageId) { - SparkStateProgress sparkStateProgress = progressBarMap.get(stageId); - sparkStateProgress.hide(); - } - - void taskStart(int stageId, long taskId) { - SparkStateProgress intProgress = progressBarMap.get(stageId); - intProgress.addActive(); - } - - void taskEnd(int stageId, long taskId) { - SparkStateProgress intProgress = progressBarMap.get(stageId); - intProgress.addDone(); - } - - private String stageLink(int stageId) { - if (getSparkSession().sparkContext().uiWebUrl().isDefined()) { - return getSparkSession().sparkContext().uiWebUrl().get() + "/stages/stage/?id=" + stageId + "&attempt=0"; - } else { - return ""; - } - } - - private String jobLink(int jobId) { - if (getSparkSession().sparkContext().uiWebUrl().isDefined()) { - return getSparkSession().sparkContext().uiWebUrl().get() + "/jobs/job/?id=" + jobId; - } else { - return ""; - } - } - - public void cancelAllJobs() { - getSparkSession().sparkContext().cancelAllJobs(); - } - - public boolean isActive() { - return active; - } - - public Text getMasterURL() { - return masterURL; - } - - public Text getExecutorMemory() { - return executorMemory; - } - - public Text getExecutorCores() { - return executorCores; - } - - public List getAdvancedOptions() { - return this.advancedOption.getConfiguration(); - } -} diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaults.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaults.java new file mode 100644 index 0000000000..e20249df35 --- /dev/null +++ b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaults.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018 TWO SIGMA OPEN SOURCE, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twosigma.beakerx.widget; + +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; + +public interface SparkUiDefaults { + + void saveSparkConf(SparkConf sparkConf); + + void loadDefaults(SparkSession.Builder builder); +} diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaultsImpl.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaultsImpl.java new file mode 100644 index 0000000000..2ee936091b --- /dev/null +++ b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkUiDefaultsImpl.java @@ -0,0 +1,94 @@ +/* + * Copyright 2018 TWO SIGMA OPEN SOURCE, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twosigma.beakerx.widget; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static com.twosigma.beakerx.widget.SparkUI.BEAKERX_ID; +import static com.twosigma.beakerx.widget.SparkUI.SPARK_APP_NAME; +import static com.twosigma.beakerx.widget.SparkUI.SPARK_EXTRA_LISTENERS; + +public class SparkUiDefaultsImpl implements SparkUiDefaults { + + public static final String SPARK_OPTIONS = "spark_options"; + public static final String BEAKERX = "beakerx"; + private Gson gson = new GsonBuilder().setPrettyPrinting().create(); + private Path path; + + public SparkUiDefaultsImpl(Path path) { + this.path = path; + } + + @Override + public void saveSparkConf(SparkConf sparkConf) { + Map newSparkConf = getNewSparkConf(sparkConf); + try { + Map map = beakerxJson(path); + map.get(BEAKERX).put(SPARK_OPTIONS, newSparkConf); + String content = gson.toJson(map); + Files.write(path, content.getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void loadDefaults(SparkSession.Builder builder) { + SparkConf sparkConf = SparkEngineImpl.getSparkConfBasedOn(builder); + try { + Map beakerxJson = beakerxJson(path); + Map map = (Map) beakerxJson.get(BEAKERX).get(SPARK_OPTIONS); + if (map != null) { + map.entrySet().stream() + .filter(x -> !sparkConf.contains(x.getKey())) + .forEach(x -> builder.config(x.getKey(), x.getValue())); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Map beakerxJson(Path path) throws IOException { + String pathToFile = new String(Files.readAllBytes(path), StandardCharsets.UTF_8); + return toMap(pathToFile); + } + + private Map toMap(String pathToFile) { + return (Map) gson.fromJson(pathToFile, Map.class); + } + + private Map getNewSparkConf(SparkConf sparkConf) { + Map result = new HashMap<>(); + Arrays.stream(sparkConf.getAll()) + .filter(x -> !x._1.equals(SPARK_EXTRA_LISTENERS)) + .filter(x -> !x._1.equals(BEAKERX_ID)) + .filter(x -> 
!x._1.equals(SPARK_APP_NAME)) + .forEach(x -> result.put(x._1, x._2)); + return result; + } + +} diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkVariable.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkVariable.java index f285e17f24..493a21aca4 100644 --- a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkVariable.java +++ b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/SparkVariable.java @@ -15,33 +15,23 @@ */ package com.twosigma.beakerx.widget; -import org.apache.spark.SparkContext; import org.apache.spark.sql.SparkSession; public class SparkVariable { - private static SparkUIManager manager = null; - private static SparkContext sparkContext; - private static SparkSession sparkSession; + private static SparkUIApi sparkUI = null; + private static SparkSession sparkSession = null; - static void putSparkUIManager(SparkUIManager sparkUIManager) { - manager = sparkUIManager; + static void putSparkUI(SparkUIApi ui) { + sparkUI = ui; } - public static SparkUIManager getSparkUIManager() { - return manager; - } - - static void putSparkContext(SparkContext sc) { - sparkContext = sc; - } - - public static SparkContext getSparkContext() { - return sparkContext; + public static SparkUIApi getSparkUI() { + return sparkUI; } public static void cancelAllJobs() { - manager.cancelAllJobs(); + sparkUI.cancelAllJobs(); } public static void putSparkSession(SparkSession sSession) { @@ -49,6 +39,7 @@ public static void putSparkSession(SparkSession sSession) { } public static SparkSession getSparkSession() { + //method is used by initSparkContextInShell return sparkSession; } } diff --git a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/StartStopSparkListener.java b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/StartStopSparkListener.java index 4ff0098039..bb06ce150d 100644 --- a/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/StartStopSparkListener.java +++ b/kernel/sparkex/src/main/java/com/twosigma/beakerx/widget/StartStopSparkListener.java @@ -29,14 +29,14 @@ public StartStopSparkListener() { @Override public void onApplicationStart(SparkListenerApplicationStart applicationStart) { super.onApplicationStart(applicationStart); - SparkUIManager sparkUIManager = SparkVariable.getSparkUIManager(); - sparkUIManager.applicationStart(); + SparkUIApi sparkUI = SparkVariable.getSparkUI(); + sparkUI.applicationStart(); } @Override public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { super.onApplicationEnd(applicationEnd); - SparkUIManager sparkUIManager = SparkVariable.getSparkUIManager(); - sparkUIManager.applicationEnd(); + SparkUIApi sparkUI = SparkVariable.getSparkUI(); + sparkUI.applicationEnd(); } } \ No newline at end of file diff --git a/kernel/sparkex/src/test/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommandAutoConnectTest.java b/kernel/sparkex/src/test/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommandAutoConnectTest.java index 73a6721979..ce0ce6f460 100644 --- a/kernel/sparkex/src/test/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommandAutoConnectTest.java +++ b/kernel/sparkex/src/test/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommandAutoConnectTest.java @@ -19,16 +19,17 @@ import com.twosigma.beakerx.kernel.Code; import com.twosigma.beakerx.kernel.magic.command.MagicCommandExecutionParam; import com.twosigma.beakerx.kernel.magic.command.outcome.MagicCommandOutcomeItem; -import com.twosigma.beakerx.message.Message; -import 
com.twosigma.beakerx.widget.SparkManager; +import com.twosigma.beakerx.widget.SparkEngine; import com.twosigma.beakerx.widget.SparkUI; +import com.twosigma.beakerx.widget.SparkUiDefaults; +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; import org.junit.Before; import org.junit.Test; import java.util.ArrayList; import static com.twosigma.beakerx.MessageFactorTest.commMsg; -import static com.twosigma.beakerx.scala.magic.command.SparkMagicCommandTest.createSparkManagerFactory; import static org.assertj.core.api.Assertions.assertThat; public class SparkMagicCommandAutoConnectTest { @@ -38,9 +39,8 @@ public class SparkMagicCommandAutoConnectTest { @Before public void setUp() { - SparkManager.SparkManagerFactory sparkManagerFactory = createSparkManagerFactory(); - SparkUI.SparkUIFactory sparkUIFactory = createSparkUIFactory(); - sparkMagicCommand = new SparkMagicCommand(new KernelTest(), sparkUIFactory, sparkManagerFactory); + SparkUI.SparkUIFactory sparkUIFactory = createSparkUIFactory(new SparkMagicCommandTest.SparkManagerFactoryTest()); + sparkMagicCommand = new SparkMagicCommand(new KernelTest(), sparkUIFactory); } @Test @@ -60,7 +60,7 @@ public void autoConnectToSpark_by_connect_option() { MagicCommandOutcomeItem execute = createSparkUi("--connect"); //then assertThat(execute.getStatus()).isEqualTo(MagicCommandOutcomeItem.Status.OK); - assertThat(sparkUI.isSparkSessionIsActive()).isTrue(); + assertThat(sparkUI.isActive()).isTrue(); } @Test @@ -70,7 +70,7 @@ public void autoConnectToSpark_by_c_option() { MagicCommandOutcomeItem execute = createSparkUi("-c"); //then assertThat(execute.getStatus()).isEqualTo(MagicCommandOutcomeItem.Status.OK); - assertThat(sparkUI.isSparkSessionIsActive()).isTrue(); + assertThat(sparkUI.isActive()).isTrue(); } private MagicCommandOutcomeItem createSparkUi(String option) { @@ -80,13 +80,23 @@ private MagicCommandOutcomeItem createSparkUi(String option) { return execute; } - private SparkUI.SparkUIFactory createSparkUIFactory() { + private SparkUI.SparkUIFactory createSparkUIFactory(SparkEngine.SparkEngineFactory sparkManagerFactory) { return new SparkUI.SparkUIFactory() { - private SparkUI.SparkUIFactoryImpl factory = new SparkUI.SparkUIFactoryImpl(); + private SparkUI.SparkUIFactoryImpl factory = new SparkUI.SparkUIFactoryImpl(sparkManagerFactory, new SparkUiDefaults() { + @Override + public void saveSparkConf(SparkConf sparkConf) { + + } + + @Override + public void loadDefaults(SparkSession.Builder builder) { + + } + }); @Override - public SparkUI create(SparkManager sparkManager) { - sparkUI = factory.create(sparkManager); + public SparkUI create(SparkSession.Builder builder) { + sparkUI = factory.create(builder); return sparkUI; } }; diff --git a/kernel/sparkex/src/test/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommandTest.java b/kernel/sparkex/src/test/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommandTest.java index 038c925a7d..7aac1e3e3d 100644 --- a/kernel/sparkex/src/test/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommandTest.java +++ b/kernel/sparkex/src/test/java/com/twosigma/beakerx/scala/magic/command/SparkMagicCommandTest.java @@ -22,10 +22,10 @@ import com.twosigma.beakerx.kernel.magic.command.MagicCommandExecutionParam; import com.twosigma.beakerx.kernel.magic.command.outcome.MagicCommandOutcomeItem; import com.twosigma.beakerx.message.Message; -import com.twosigma.beakerx.widget.SparkConfiguration; -import com.twosigma.beakerx.widget.SparkUIManager; -import 
com.twosigma.beakerx.widget.SparkManager; +import com.twosigma.beakerx.widget.SparkUIApi; +import com.twosigma.beakerx.widget.SparkEngine; import com.twosigma.beakerx.widget.SparkUI; +import com.twosigma.beakerx.widget.SparkUiDefaults; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.sql.SparkSession; @@ -34,7 +34,7 @@ import java.util.ArrayList; import java.util.HashMap; -import java.util.List; +import java.util.Map; import static com.twosigma.beakerx.MessageFactorTest.commMsg; import static org.assertj.core.api.Assertions.assertThat; @@ -46,9 +46,8 @@ public class SparkMagicCommandTest { @Before public void setUp() { - SparkManager.SparkManagerFactory sparkManagerFactory = createSparkManagerFactory(); SparkUI.SparkUIFactory sparkUIFactory = createSparkUIFactory(); - sparkMagicCommand = new SparkMagicCommand(new KernelTest(), sparkUIFactory, sparkManagerFactory); + sparkMagicCommand = new SparkMagicCommand(new KernelTest(), sparkUIFactory); } @Test @@ -71,78 +70,87 @@ public void returnErrorWhenConnectToExistingSparkSession() { } private MagicCommandOutcomeItem connectToSparkSecondTime() { - Code code2 = Code.createCode("", new ArrayList<>(), new ArrayList<>(),commMsg()); + Code code2 = Code.createCode("", new ArrayList<>(), new ArrayList<>(), commMsg()); MagicCommandExecutionParam param2 = new MagicCommandExecutionParam("", "", 2, code2, true); return sparkMagicCommand.execute(param2); } private MagicCommandOutcomeItem createSparkUiAndConnectToSession() { - Code code = Code.createCode("%%spark", new ArrayList<>(), new ArrayList<>(),commMsg()); + Code code = Code.createCode("%%spark", new ArrayList<>(), new ArrayList<>(), commMsg()); MagicCommandExecutionParam param = new MagicCommandExecutionParam("%%spark", "", 1, code, true); MagicCommandOutcomeItem execute = sparkMagicCommand.execute(param); assertThat(execute.getStatus()).isEqualTo(MagicCommandOutcomeItem.Status.OK); - assertThat(sparkUI.isSparkSessionIsActive()).isFalse(); + assertThat(sparkUI.isActive()).isFalse(); sparkUI.getConnectButton().onClick(new HashMap(), commMsg()); return execute; } private SparkUI.SparkUIFactory createSparkUIFactory() { return new SparkUI.SparkUIFactory() { - private SparkUI.SparkUIFactoryImpl factory = new SparkUI.SparkUIFactoryImpl(); - @Override - public SparkUI create(SparkManager sparkManager) { - sparkUI = factory.create(sparkManager); - return sparkUI; - } - }; - } - - public static SparkManager.SparkManagerFactory createSparkManagerFactory() { - return sparkSessionBuilder -> new SparkManager() { - SparkConf sparkConf = new SparkConf(); - SparkSession.Builder builder = SparkSession.builder().config(sparkConf); + private SparkUI.SparkUIFactoryImpl factory = new SparkUI.SparkUIFactoryImpl(new SparkManagerFactoryTest(), new SparkUiDefaults() { + @Override + public void saveSparkConf(SparkConf sparkConf) { - @Override - public TryResult configure(KernelFunctionality kernel, SparkUIManager sparkContextManager, Message parent) { - return TryResult.createResult("ok"); - } + } - @Override - public SparkSession getOrCreate() { - return null; - } + @Override + public void loadDefaults(SparkSession.Builder builder) { - @Override - public SparkConf getSparkConf(List configurations) { - return sparkConf; - } + } + }); @Override - public SparkContext sparkContext() { - return null; - } - - @Override - public SparkSession.Builder getBuilder() { - return builder; + public SparkUI create(SparkSession.Builder builder) { + sparkUI = factory.create(builder); + return 
sparkUI; } + }; + } - @Override - public String getSparkAppId() { - return "sparkAppId1"; - } - @Override - public String getSparkUiWebUrl() { - return "sparkUiWebUrl"; - } - - @Override - public String getSparkMasterUrl() { - return "sparkMasterUrl"; - } - }; + static class SparkManagerFactoryTest implements SparkEngine.SparkEngineFactory { + + @Override + public SparkEngine create(SparkSession.Builder sparkSessionBuilder) { + return new SparkEngine() { + @Override + public TryResult configure(KernelFunctionality kernel, SparkUIApi spark, Message parentMessage) { + return TryResult.createResult("ok"); + } + + @Override + public SparkSession getOrCreate() { + return null; + } + + @Override + public SparkConf getSparkConf() { + return new SparkConf(); + } + + @Override + public String getSparkAppId() { + return "sparkAppId1"; + } + + @Override + public Map getAdvanceSettings() { + return new HashMap<>(); + } + + @Override + public String getSparkUiWebUrl() { + return ""; + } + + @Override + public String getSparkMasterUrl() { + return ""; + } + }; + } } + } \ No newline at end of file diff --git a/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUITest.java b/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUITest.java new file mode 100644 index 0000000000..142db32284 --- /dev/null +++ b/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUITest.java @@ -0,0 +1,133 @@ +/* + * Copyright 2018 TWO SIGMA OPEN SOURCE, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twosigma.beakerx.widget; + +import com.twosigma.beakerx.KernelTest; +import com.twosigma.beakerx.TryResult; +import com.twosigma.beakerx.kernel.KernelFunctionality; +import com.twosigma.beakerx.kernel.KernelManager; +import com.twosigma.beakerx.message.Message; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.sql.SparkSession; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static com.twosigma.beakerx.MessageFactorTest.commMsg; +import static org.assertj.core.api.Assertions.assertThat; + +public class SparkUITest { + + SparkUI sparkUI; + SparkUiDefaultsImplMock sparkUiDefaults; + KernelTest kernel; + + @Before + public void setUp() throws Exception { + kernel = new KernelTest(); + KernelManager.register(kernel); + sparkUiDefaults = new SparkUiDefaultsImplMock(); + sparkUI = new SparkUI(SparkSession.builder(), sparkSessionBuilder -> new SparkManagerImplTest(), sparkUiDefaults); + } + + @After + public void tearDown() throws Exception { + KernelManager.register(null); + } + + @Test + public void loadDefaultsWhenCreateSparkUI() { + //given + SparkUiDefaultsImplMock sparkUiDefaults = new SparkUiDefaultsImplMock(); + //when + new SparkUI(SparkSession.builder(), sparkSessionBuilder -> new SparkManagerImplTest(), sparkUiDefaults); + //then + assertThat(sparkUiDefaults.loaded).isTrue(); + } + + @Test + public void saveDefaultsWhenConnectToSparkSession() { + //given + //when + sparkUI.getConnectButton().onClick(new HashMap(), commMsg()); + //then + assertThat(sparkUiDefaults.saved).isTrue(); + } + + static class SparkUiDefaultsImplMock implements SparkUiDefaults { + + public boolean saved = false; + public boolean loaded = false; + + @Override + public void saveSparkConf(SparkConf sparkConf) { + saved = true; + } + + @Override + public void loadDefaults(SparkSession.Builder builder) { + loaded = true; + } + } + + static class SparkManagerImplTest implements SparkEngine { + + public static final String SPARK_APP_ID_MOCK = "SparkAppId1"; + + @Override + public TryResult configure(KernelFunctionality kernel, SparkUIApi sparkContextManager, Message parentMessage) { + return TryResult.createResult("ok"); + } + + @Override + public SparkSession getOrCreate() { + return SparkSession.builder().config(getSparkConf()).getOrCreate(); + } + + @Override + public SparkConf getSparkConf() { + SparkConf sparkConf = new SparkConf(); + sparkConf.setMaster("local[1]"); + sparkConf.setAppName("appName1"); + return sparkConf; + } + + @Override + public String getSparkAppId() { + return SPARK_APP_ID_MOCK; + } + + @Override + public Map getAdvanceSettings() { + return new HashMap<>(); + } + + @Override + public String getSparkUiWebUrl() { + return ""; + } + + @Override + public String getSparkMasterUrl() { + return ""; + } + } + +} \ No newline at end of file diff --git a/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUiDefaultsImplTest.java b/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUiDefaultsImplTest.java new file mode 100644 index 0000000000..097cc05cf1 --- /dev/null +++ b/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/SparkUiDefaultsImplTest.java @@ -0,0 +1,51 @@ + +/* + * Copyright 2018 TWO SIGMA OPEN SOURCE, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twosigma.beakerx.widget; + +import org.apache.spark.SparkConf; +import org.apache.spark.sql.SparkSession; +import org.junit.Before; +import org.junit.Test; + +import java.nio.file.Paths; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SparkUiDefaultsImplTest { + + private SparkUiDefaultsImpl sut; + + @Before + public void setUp() { + String path = SparkUiDefaultsImplTest.class.getClassLoader().getResource("beakerxTest.json").getPath(); + this.sut = new SparkUiDefaultsImpl(Paths.get(path)); + } + + @Test + public void saveAndLoadDefaults() { + //given + SparkConf sparkConf = new SparkConf(); + sparkConf.set("sparkOption2", "sp2"); + //when + sut.saveSparkConf(sparkConf); + //then + SparkSession.Builder builder = SparkSession.builder(); + sut.loadDefaults(builder); + SparkConf sparkConfBasedOn = SparkEngineImpl.getSparkConfBasedOn(builder); + assertThat(sparkConfBasedOn.get("sparkOption2")).isEqualTo("sp2"); + } +} \ No newline at end of file diff --git a/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/StartStopSparkListenerTest.java b/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/StartStopSparkListenerTest.java new file mode 100644 index 0000000000..c2e57aa7bc --- /dev/null +++ b/kernel/sparkex/src/test/java/com/twosigma/beakerx/widget/StartStopSparkListenerTest.java @@ -0,0 +1,126 @@ +/* + * Copyright 2018 TWO SIGMA OPEN SOURCE, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twosigma.beakerx.widget; + +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerApplicationStart; +import org.assertj.core.api.Assertions; +import org.junit.Before; +import org.junit.Test; +import scala.Option; + +import java.util.List; + +public class StartStopSparkListenerTest { + + SparkUIMock sparkUIMock; + + @Before + public void setUp() throws Exception { + sparkUIMock = new SparkUIMock(); + SparkVariable.putSparkUI(sparkUIMock); + } + + @Test + public void onApplicationStart() { + //given + StartStopSparkListener listener = new StartStopSparkListener(); + //when + listener.onApplicationStart(eventStart()); + //then + Assertions.assertThat(sparkUIMock.started).isTrue(); + } + + private SparkListenerApplicationStart eventStart() { + return new SparkListenerApplicationStart("an1", Option.empty(), 111, "su1", Option.empty(), Option.empty()); + } + + @Test + public void onApplicationEnd() { + //given + StartStopSparkListener listener = new StartStopSparkListener(); + //when + listener.onApplicationEnd(new SparkListenerApplicationEnd(123)); + //then + Assertions.assertThat(sparkUIMock.ended).isTrue(); + } + + static class SparkUIMock implements SparkUIApi { + + private boolean started = false; + private boolean ended = false; + + @Override + public List getAdvancedOptions() { + return null; + } + + @Override + public Text getMasterURL() { + return null; + } + + @Override + public Text getExecutorMemory() { + return null; + } + + @Override + public Text getExecutorCores() { + return null; + } + + @Override + public void startStage(int stageId, int numTasks) { + + } + + @Override + public void endStage(int stageId) { + + } + + @Override + public void taskStart(int stageId, long taskId) { + + } + + @Override + public void taskEnd(int stageId, long taskId) { + + } + + @Override + public void applicationStart() { + started = true; + } + + @Override + public void applicationEnd() { + ended = true; + } + + @Override + public void cancelAllJobs() { + + } + + @Override + public boolean isActive() { + return false; + } + } +} \ No newline at end of file diff --git a/kernel/sparkex/src/test/resources/beakerxTest.json b/kernel/sparkex/src/test/resources/beakerxTest.json new file mode 100644 index 0000000000..f7b9aaaa3f --- /dev/null +++ b/kernel/sparkex/src/test/resources/beakerxTest.json @@ -0,0 +1,13 @@ +{ + "beakerx": { + "jvm_options": { + "other": [], + "properties": {} + }, + "spark_options": {}, + "ui_options": { + "auto_close": true + }, + "version": 2.0 + } +} \ No newline at end of file
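
For reference, the round trip exercised by SparkUiDefaultsImplTest and beakerxTest.json above amounts to persisting SparkConf entries under the "spark_options" key of a beakerx.json-style file and replaying them onto a SparkSession.Builder. The sketch below only illustrates that idea under stated assumptions; it is not the PR's SparkUiDefaultsImpl (whose source appears elsewhere in this patch). It assumes Jackson's ObjectMapper is on the classpath, and the class name SparkOptionsRoundTripSketch is made up for this example.

// Hypothetical sketch of the save/load round trip, not the PR's SparkUiDefaultsImpl.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class SparkOptionsRoundTripSketch {

  private final File beakerxJson;
  private final ObjectMapper mapper = new ObjectMapper();

  public SparkOptionsRoundTripSketch(File beakerxJson) {
    this.beakerxJson = beakerxJson;
  }

  // Persist every SparkConf entry under beakerx -> spark_options.
  @SuppressWarnings("unchecked")
  public void save(SparkConf sparkConf) throws IOException {
    Map<String, Object> root = new HashMap<>();
    if (beakerxJson.exists()) {
      root = mapper.readValue(beakerxJson, Map.class);
    }
    Map<String, Object> beakerx =
            (Map<String, Object>) root.computeIfAbsent("beakerx", k -> new HashMap<String, Object>());
    Map<String, Object> sparkOptions = new HashMap<>();
    for (Tuple2<String, String> entry : sparkConf.getAll()) {
      sparkOptions.put(entry._1(), entry._2());
    }
    beakerx.put("spark_options", sparkOptions);
    mapper.writerWithDefaultPrettyPrinter().writeValue(beakerxJson, root);
  }

  // Replay previously saved options onto a SparkSession.Builder.
  @SuppressWarnings("unchecked")
  public void load(SparkSession.Builder builder) throws IOException {
    if (!beakerxJson.exists()) {
      return;
    }
    Map<String, Object> root = mapper.readValue(beakerxJson, Map.class);
    Map<String, Object> beakerx =
            (Map<String, Object>) root.getOrDefault("beakerx", new HashMap<String, Object>());
    Map<String, Object> sparkOptions =
            (Map<String, Object>) beakerx.getOrDefault("spark_options", new HashMap<String, Object>());
    sparkOptions.forEach((key, value) -> builder.config(key, String.valueOf(value)));
  }
}

In SparkUiDefaultsImplTest the same idea is verified the other way around: options written from a SparkConf are read back through a SparkSession.Builder and inspected via SparkEngineImpl.getSparkConfBasedOn(builder) against a copy of beakerxTest.json.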