Skip to content

Commit

Permalink
cluster/spec: remove tispark_{config,env} form worker spec
Browse files Browse the repository at this point in the history
  • Loading branch information
AstroProfundis committed Jul 16, 2020
1 parent 75f40d1 commit 0bf52f3
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 56 deletions.
26 changes: 0 additions & 26 deletions examples/topology.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -231,33 +231,7 @@ tispark_masters:
# NOTE: multiple worker nodes on the same host is not supported
tispark_workers:
- host: 10.0.1.22
#spark_config:
# spark.driver.memory: "2g"
# spark.eventLog.enabled: "False"
# spark.tispark.grpc.framesize: 268435456
# spark.tispark.grpc.timeout_in_sec: 100
# spark.tispark.meta.reload_period_in_sec: 60
# spark.tispark.request.command.priority: "Low"
# spark.tispark.table.scan_concurrency: 256
#spark_env:
# SPARK_EXECUTOR_CORES: 5
# SPARK_EXECUTOR_MEMORY: "10g"
# SPARK_WORKER_CORES: 5
# SPARK_WORKER_MEMORY: "10g"
- host: 10.0.1.23
#spark_config:
# spark.driver.memory: "2g"
# spark.eventLog.enabled: "False"
# spark.tispark.grpc.framesize: 268435456
# spark.tispark.grpc.timeout_in_sec: 100
# spark.tispark.meta.reload_period_in_sec: 60
# spark.tispark.request.command.priority: "Low"
# spark.tispark.table.scan_concurrency: 256
#spark_env:
# SPARK_EXECUTOR_CORES: 5
# SPARK_EXECUTOR_MEMORY: "10g"
# SPARK_WORKER_CORES: 5
# SPARK_WORKER_MEMORY: "10g"

monitoring_servers:
- host: 10.0.1.11
Expand Down
40 changes: 10 additions & 30 deletions pkg/cluster/spec/tispark.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,14 @@ func (s TiSparkMasterSpec) Status(pdList ...string) string {

// TiSparkWorkerSpec is the topology specification for TiSpark slave nodes
type TiSparkWorkerSpec struct {
Host string `yaml:"host"`
SSHPort int `yaml:"ssh_port,omitempty"`
Imported bool `yaml:"imported,omitempty"`
Port int `yaml:"port" default:"7078"`
WebPort int `yaml:"web_port" default:"8081"`
DeployDir string `yaml:"deploy_dir,omitempty"`
SparkConfigs map[string]interface{} `yaml:"spark_config,omitempty"`
SparkEnvs map[string]string `yaml:"spark_env,omitempty"`
Arch string `yaml:"arch,omitempty"`
OS string `yaml:"os,omitempty"`
Host string `yaml:"host"`
SSHPort int `yaml:"ssh_port,omitempty"`
Imported bool `yaml:"imported,omitempty"`
Port int `yaml:"port" default:"7078"`
WebPort int `yaml:"web_port" default:"8081"`
DeployDir string `yaml:"deploy_dir,omitempty"`
Arch string `yaml:"arch,omitempty"`
OS string `yaml:"os,omitempty"`
}

// Role returns the component role of the instance
Expand Down Expand Up @@ -287,24 +285,6 @@ type TiSparkWorkerInstance struct {
instance
}

// GetCustomFields get custom spark configs of the instance
func (i *TiSparkWorkerInstance) GetCustomFields() map[string]interface{} {
v := reflect.ValueOf(i.InstanceSpec).FieldByName("SparkConfigs")
if !v.IsValid() {
return nil
}
return v.Interface().(map[string]interface{})
}

// GetCustomEnvs get custom spark envionment variables of the instance
func (i *TiSparkWorkerInstance) GetCustomEnvs() map[string]string {
v := reflect.ValueOf(i.InstanceSpec).FieldByName("SparkEnvs")
if !v.IsValid() {
return nil
}
return v.Interface().(map[string]string)
}

// InitConfig implement Instance interface
func (i *TiSparkWorkerInstance) InitConfig(e executor.Executor, clusterName, clusterVersion, deployUser string, paths meta.DirPaths) error {
// generate systemd service to invoke spark's start/stop scripts
Expand Down Expand Up @@ -338,7 +318,7 @@ func (i *TiSparkWorkerInstance) InitConfig(e executor.Executor, clusterName, clu
}

cfg := config.NewTiSparkConfig(pdList).WithMasters(strings.Join(masterList, ",")).
WithCustomFields(i.GetCustomFields())
WithCustomFields(i.instance.topo.TiSparkMasters[0].SparkConfigs)
// transfer spark-defaults.conf
fp := filepath.Join(paths.Cache, fmt.Sprintf("spark-defaults-%s-%d.conf", host, port))
if err := cfg.ConfigToFile(fp); err != nil {
Expand All @@ -352,7 +332,7 @@ func (i *TiSparkWorkerInstance) InitConfig(e executor.Executor, clusterName, clu
env := scripts.NewTiSparkEnv(i.topo.TiSparkMasters[0].Host).
WithMasterPorts(i.topo.TiSparkMasters[0].Port, i.topo.TiSparkMasters[0].WebPort).
WithWorkerPorts(i.usedPorts[0], i.usedPorts[1]).
WithCustomEnv(i.GetCustomEnvs())
WithCustomEnv(i.instance.topo.TiSparkMasters[0].SparkEnvs)
// transfer spark-env.sh file
fp = filepath.Join(paths.Cache, fmt.Sprintf("spark-env-%s-%d.sh", host, port))
if err := env.ScriptToFile(fp); err != nil {
Expand Down

0 comments on commit 0bf52f3

Please sign in to comment.