Skip to content

Commit

Permalink
SparkUI - changed config structure, restored default config
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukasz Mitusinski committed Jun 15, 2018
1 parent 3016fb7 commit c608461
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 144 deletions.
2 changes: 1 addition & 1 deletion beakerx/beakerx/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"use_data_grid": true,
"show_catalog": false
},
"spark_options":{}
"spark_profiles": []
}
}
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ private SparkConf createSparkConf(List<SparkConfiguration.Configuration> configu
SparkConf sparkConf = new SparkConf();
sparkConf.set(SPARK_EXTRA_LISTENERS, old.get(SPARK_EXTRA_LISTENERS));
sparkConf.set(BEAKERX_ID, old.get(BEAKERX_ID));
if (old.contains(SPARK_APP_NAME)) {
sparkConf.set(SPARK_APP_NAME, old.get(SPARK_APP_NAME));
}
configurations.forEach(x -> {
if (x.getName() != null) {
sparkConf.set(x.getName(), (x.getValue() != null) ? x.getValue() : "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.twosigma.beakerx.kernel.KernelManager;
import com.twosigma.beakerx.kernel.msg.StacktraceHtmlPrinter;
import com.twosigma.beakerx.message.Message;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

import java.util.ArrayList;
Expand Down Expand Up @@ -117,6 +116,7 @@ private void configureSparkContext(Message parentMessage, KernelFunctionality ke
this.sparkUIForm.sendError(StacktraceHtmlPrinter.printRedBold(ERROR_CREATING_SPARK_SESSION));
} else {
singleSparkSession.active();
sparkUIForm.saveDefaults();
applicationStart();
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.twosigma.beakerx.widget.SparkUI.SPARK_EXECUTOR_CORES;
import static com.twosigma.beakerx.widget.SparkUI.SPARK_EXECUTOR_MEMORY;
import static com.twosigma.beakerx.widget.SparkUI.SPARK_MASTER;
import static com.twosigma.beakerx.widget.SparkUI.SPARK_MASTER_DEFAULT;
import static com.twosigma.beakerx.widget.SparkUIApi.SPARK_ADVANCED_OPTIONS;
import static com.twosigma.beakerx.widget.SparkUiDefaults.DEFAULT_PROFILE;
import static java.util.Arrays.asList;

public class SparkUIForm extends VBox {
Expand Down Expand Up @@ -76,10 +77,11 @@ private void createSparkView() {
}

private HBox createProfileManagement() {
List profiles = sparkUiDefaults.getProfiles();
ComboBox profileComboBox = new ComboBox(true);
profileComboBox.setEditable(true);
profileComboBox.setOptions(this.sparkUiDefaults.getProfiles());
profileComboBox.setValue(profileComboBox.getOptions().length > 0 ? profileComboBox.getOptions()[0] : "");
profileComboBox.setOptions(this.sparkUiDefaults.getProfileNames());
profileComboBox.setValue(profiles == null || profiles.size() ==0 ? "" : sparkUiDefaults.getProfiles().get(0).get("name"));
profileComboBox.setDescription(PROFILE_DESC);
profileComboBox.register(this::loadProfile);
profileComboBox.setDomClasses(asList("bx-spark-config"));
Expand All @@ -99,10 +101,14 @@ private HBox createProfileManagement() {

private void loadProfile() {
String profileName = this.profile.getValue();
if (!this.sparkUiDefaults.getProfiles().contains(profileName)) {
if (!this.sparkUiDefaults.getProfileNames().contains(profileName)) {
return;
}
Map<String, Object> profileData = this.sparkUiDefaults.loadProfile(profileName);
sparkUiDefaults.loadProfiles();
Map<String, Object> profileData =
(Map<String, Object>) sparkUiDefaults
.getProfileByName(profileName)
.getOrDefault("spark_options", new HashMap<>());
if (profileData.size() > 0) {
this.masterURL.setValue(profileData.getOrDefault(SparkUI.SPARK_MASTER, SparkUI.SPARK_MASTER_DEFAULT));
this.executorCores.setValue(profileData.getOrDefault(SparkUI.SPARK_EXECUTOR_CORES, SparkUI.SPARK_EXECUTOR_CORES_DEFAULT));
Expand All @@ -118,19 +124,44 @@ private void loadProfile() {
}

/**
 * Persists the current form values as a named profile.
 * The reserved "Default" profile is written automatically on connect
 * (see saveDefaults), so saving under that name is a no-op.
 */
private void saveProfile() {
  if (profile.getValue().equals(DEFAULT_PROFILE)) {
    return; // the Default profile is managed by saveDefaults(), not the user
  }
  HashMap<String, Object> sparkConfig = getCurrentConfig();
  HashMap<String, Object> sparkProfile = new HashMap<>();
  sparkProfile.put("name", profile.getValue());
  sparkProfile.put("spark_options", sparkConfig);
  sparkUiDefaults.saveProfile(sparkProfile);
  // refresh the combo box so a newly created profile shows up immediately
  profile.setOptions(sparkUiDefaults.getProfileNames());
}

/**
 * Snapshot of the Spark settings currently entered in the form,
 * keyed by Spark property name.
 */
private HashMap<String, Object> getCurrentConfig() {
  HashMap<String, Object> config = new HashMap<>();
  config.put(SparkUI.SPARK_MASTER, getMasterURL().getValue());
  config.put(SparkUI.SPARK_EXECUTOR_CORES, getExecutorCores().getValue());
  config.put(SparkUI.SPARK_EXECUTOR_MEMORY, getExecutorMemory().getValue());
  config.put(SparkUI.SPARK_ADVANCED_OPTIONS, getAdvancedOptions());
  return config;
}


/**
 * Stores the current form values under the reserved "Default" profile so
 * they can pre-populate the form next time. Invoked after a Spark session
 * is successfully created.
 */
public void saveDefaults() {
  // parameterized type instead of the raw HashMap used previously
  HashMap<String, Object> sparkConfig = getCurrentConfig();
  HashMap<String, Object> sparkProfile = new HashMap<>();
  sparkProfile.put("name", DEFAULT_PROFILE);
  sparkProfile.put("spark_options", sparkConfig);
  sparkUiDefaults.saveProfile(sparkProfile);
  profile.setOptions(sparkUiDefaults.getProfileNames());
}

/**
 * Deletes the currently selected profile and falls back to the reserved
 * "Default" profile. The Default profile itself cannot be removed.
 */
private void removeProfile() {
  if (profile.getValue().equals(DEFAULT_PROFILE)) {
    return; // never delete the reserved Default profile
  }
  sparkUiDefaults.removeSparkConf(profile.getValue());
  profile.setOptions(sparkUiDefaults.getProfileNames());
  profile.setValue(DEFAULT_PROFILE);
  loadProfile(); // repopulate the form from the Default profile
}

private void addConnectButton(Button connect, HBox errors) {
Expand Down Expand Up @@ -225,4 +256,5 @@ public void clearErrors() {
/** @return the form's connect button widget */
public Button getConnectButton() {
return connectButton;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,26 @@

import org.apache.spark.sql.SparkSession;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public interface SparkUiDefaults {

void saveSparkConf(HashMap sparkConf, String profileName);
String DEFAULT_PROFILE = "Default";

void saveSparkConf(List<Map<String, Object>> sparkConf);

void loadDefaults(SparkSession.Builder builder);

Set<String> getProfiles();
List<Map<String, Object>> getProfiles();

Map<String, Object> getProfileByName(String name);

void removeSparkConf(String profileName);

Map<String, Object> loadProfile(String profileName);
void loadProfiles();

void saveProfile(Map<String, Object> profile);

List<String> getProfileNames();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static com.twosigma.beakerx.widget.SparkUI.BEAKERX_ID;
import static com.twosigma.beakerx.widget.SparkUI.SPARK_APP_NAME;
Expand All @@ -47,73 +49,87 @@ public class SparkUiDefaultsImpl implements SparkUiDefaults {
public static final String PROPERTIES = "properties";
public static final String SPARK_OPTIONS = "spark_options";
public static final String BEAKERX = "beakerx";
// key under "beakerx" that holds the profile list (was mistyped as Object)
private static final String SPARK_PROFILES = "spark_profiles";

// in-memory copy of the profile list, kept in sync with the settings file
private List<Map<String, Object>> profiles = new ArrayList<>();
private Gson gson = new GsonBuilder().setPrettyPrinting().create();
private Path path;

public SparkUiDefaultsImpl(Path path) {
  this.path = path;
}

@Override
public void saveSparkConf(HashMap sparkConf, String profileName) {
public void saveSparkConf(List<Map<String, Object>> profiles) {
try {
Map<String, Map> map = beakerxJsonAsMap(path);
Map<String, Map> sparkOptions = (Map<String, Map>) map.get(BEAKERX).get(SPARK_OPTIONS);
if (sparkOptions == null) {
sparkOptions = new HashMap<>();
map.get(BEAKERX).put(SPARK_OPTIONS, sparkOptions);
}
sparkOptions.put(profileName, sparkConf);
map.get(BEAKERX).put(SPARK_PROFILES, profiles == null ? new ArrayList<>() : profiles);
String content = gson.toJson(map);
Files.write(path, content.getBytes(StandardCharsets.UTF_8));
profiles.add(profileName);
} catch (Exception e) {
throw new RuntimeException(e);
this.profiles = profiles;
} catch (IOException e) {
e.printStackTrace();
}
}

/**
 * Copies the Default profile's spark options into the session builder,
 * skipping any key the builder's SparkConf already defines.
 */
@Override
public void loadDefaults(SparkSession.Builder builder) {
  SparkConf sparkConf = SparkEngineImpl.getSparkConfBasedOn(builder);
  loadProfiles(); // refresh from the settings file first
  Map<String, Object> map =
      (Map<String, Object>) getProfileByName(DEFAULT_PROFILE).get(SPARK_OPTIONS);
  if (map != null) {
    map.entrySet().stream()
        .filter(x -> !sparkConf.contains(x.getKey()))
        .forEach(x -> addToBuilder(builder, x.getKey(), x.getValue()));
  }
}

public Set<String> getProfiles() {
@Override
public List<Map<String, Object>> getProfiles() {
return profiles;
}

@Override
public Map<String, Object> loadProfile(String profileName) {
public Map<String, Object> getProfileByName(String name) {
Map<String, Object> profile = new HashMap<>();
return profiles.stream().filter(x -> x.get("name").equals(name)).findFirst().orElse(profile);
}

@Override
public void loadProfiles() {
Map<String, Map> beakerxJson = beakerxJsonAsMap(path);
Map<String, Object> sparkConfProfiles = getOptions(beakerxJson);
if (sparkConfProfiles != null && sparkConfProfiles.keySet().size() > 0) {
profiles = new HashSet<>(sparkConfProfiles.keySet());
String profileKey = profileName != null && profiles.contains(profileName) ? profileName : profiles.iterator().next();
profile = (Map<String, Object>) sparkConfProfiles.get(profileKey);
List<Map<String, Object>> profiles = (List<Map<String, Object>>) beakerxJson.get(BEAKERX).get(SPARK_PROFILES);
if (profiles == null) {
Map<String, Object> defaultProfile = new HashMap<>();
defaultProfile.put("name", DEFAULT_PROFILE);
defaultProfile.put(SPARK_OPTIONS, new HashMap<>());
profiles = new ArrayList<>();
profiles.add(defaultProfile);
}
return profile;
this.profiles = profiles;
}

@Override
public void removeSparkConf(String profileName) {
try {
Map<String, Map> map = beakerxJsonAsMap(path);
Map<String, Map> sparkOptions = (Map<String, Map>) map.get(BEAKERX).get(SPARK_OPTIONS);
sparkOptions.remove(profileName);
String content = gson.toJson(map);
Files.write(path, content.getBytes(StandardCharsets.UTF_8));
profiles.remove(profileName);
} catch (IOException e) {
throw new RuntimeException(e);
public void saveProfile(Map<String, Object> profile) {
int idx = IntStream.range(0, profiles.size())
.filter(i -> profile.get("name").equals(profiles.get(i).get("name")))
.findFirst().orElse(-1);
if (idx == -1) {
profiles.add(profile);
} else {
profiles.set(idx, profile);
}
saveSparkConf(profiles);

}

/** @return the "name" entry of every loaded profile, in list order */
@Override
public List<String> getProfileNames() {
  List<String> names = new ArrayList<>(profiles.size());
  for (Map<String, Object> profile : profiles) {
    names.add((String) profile.get("name"));
  }
  return names;
}

/**
 * Removes every profile whose "name" entry equals profileName and persists
 * the remaining list.
 */
@Override
public void removeSparkConf(String profileName) {
  // null-safe: a profile map without a "name" entry must not throw NPE
  profiles.removeIf(x -> profileName == null
      ? x.get("name") == null
      : profileName.equals(x.get("name")));
  saveSparkConf(profiles);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import com.twosigma.beakerx.widget.SparkEngine;
import com.twosigma.beakerx.widget.SparkUI;
import com.twosigma.beakerx.widget.SparkUiDefaults;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.twosigma.beakerx.MessageFactorTest.commMsg;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -89,8 +87,9 @@ private MagicCommandOutcomeItem createSparkUi(String option) {
private SparkUI.SparkUIFactory createSparkUIFactory(SparkEngine.SparkEngineFactory sparkManagerFactory, SingleSparkSession singleSparkSession) {
return new SparkUI.SparkUIFactory() {
private SparkUI.SparkUIFactoryImpl factory = new SparkUI.SparkUIFactoryImpl(sparkManagerFactory, new SparkUiDefaults() {

@Override
public void saveSparkConf(HashMap sparkConf, String profileName) {
public void saveSparkConf(List<Map<String, Object>> sparkConf) {

}

Expand All @@ -100,7 +99,12 @@ public void loadDefaults(SparkSession.Builder builder) {
}

@Override
public Set<String> getProfiles() {
public List<Map<String, Object>> getProfiles() {
return null;
}

@Override
public Map<String, Object> getProfileByName(String name) {
return null;
}

Expand All @@ -110,9 +114,20 @@ public void removeSparkConf(String profileName) {
}

@Override
public Map<String, Object> loadProfile(String profileName) {
public void loadProfiles() {

}

@Override
public void saveProfile(Map<String, Object> profile) {

}

@Override
public List<String> getProfileNames() {
return null;
}

}, singleSparkSession);

@Override
Expand Down
Loading

0 comments on commit c608461

Please sign in to comment.