Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept spark-submit arguments on the command-line #1455

Merged
merged 6 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ final case class SharedRunOptions(
@ExtraName("spark")
sparkSubmit: Option[Boolean] = None,
@Group("Run")
@Hidden
@HelpMessage("[experimental] spark-submit arguments")
@ExtraName("submitArg")
submitArgument: List[String] = Nil,
@Group("Run")
@HelpMessage("[experimental] Run as a Spark job, using a vanilla Spark distribution downloaded by Scala CLI")
@ExtraName("sparkStandalone")
standaloneSpark: Option[Boolean] = None,
Expand Down
50 changes: 33 additions & 17 deletions modules/cli/src/main/scala/scala/cli/commands/Repl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import scala.build.options.{BuildOptions, JavaOpt, Scope}
import scala.cli.CurrentParams
import scala.cli.commands.Run.{maybePrintSimpleScalacOutput, orPythonDetectionError}
import scala.cli.commands.publish.ConfigUtil.*
import scala.cli.commands.run.RunMode
import scala.cli.commands.util.CommonOps.*
import scala.cli.commands.util.SharedOptionsUtil.*
import scala.cli.config.{ConfigDb, Keys}
Expand Down Expand Up @@ -52,6 +53,9 @@ object Repl extends ScalaCommand[ReplOptions] {
)
}

private def runMode(options: ReplOptions): RunMode.HasRepl =
RunMode.Default

override def runCommand(options: ReplOptions, args: RemainingArgs): Unit = {
val initialBuildOptions = buildOptionsOrExit(options)
def default = Inputs.default().getOrElse {
Expand Down Expand Up @@ -85,6 +89,7 @@ object Repl extends ScalaCommand[ReplOptions] {
artifacts: Artifacts,
classDir: Option[os.Path],
allowExit: Boolean,
runMode: RunMode.HasRepl,
buildOpt: Option[Build.Successful]
): Unit = {
val res = runRepl(
Expand All @@ -96,6 +101,7 @@ object Repl extends ScalaCommand[ReplOptions] {
logger,
allowExit = allowExit,
options.sharedRepl.replDryRun,
runMode,
buildOpt
)
res match {
Expand All @@ -107,13 +113,15 @@ object Repl extends ScalaCommand[ReplOptions] {
}
def doRunReplFromBuild(
build: Build.Successful,
allowExit: Boolean
allowExit: Boolean,
runMode: RunMode.HasRepl
): Unit =
doRunRepl(
build.options,
build.artifacts,
build.outputOpt,
allowExit,
runMode,
Some(build)
)

Expand All @@ -135,6 +143,7 @@ object Repl extends ScalaCommand[ReplOptions] {
artifacts,
None,
allowExit = !options.sharedRepl.watch.watchMode,
runMode = runMode(options),
buildOpt = None
)
}
Expand All @@ -160,9 +169,10 @@ object Repl extends ScalaCommand[ReplOptions] {
) { res =>
for (builds <- res.orReport(logger))
builds.main match {
case s: Build.Successful => doRunReplFromBuild(s, allowExit = false)
case _: Build.Failed => buildFailed(allowExit = false)
case _: Build.Cancelled => buildCancelled(allowExit = false)
case s: Build.Successful =>
doRunReplFromBuild(s, allowExit = false, runMode = runMode(options))
case _: Build.Failed => buildFailed(allowExit = false)
case _: Build.Cancelled => buildCancelled(allowExit = false)
}
}
try WatchUtil.waitForCtrlC(() => watcher.schedule())
Expand All @@ -183,9 +193,10 @@ object Repl extends ScalaCommand[ReplOptions] {
)
.orExit(logger)
builds.main match {
case s: Build.Successful => doRunReplFromBuild(s, allowExit = true)
case _: Build.Failed => buildFailed(allowExit = true)
case _: Build.Cancelled => buildCancelled(allowExit = true)
case s: Build.Successful =>
doRunReplFromBuild(s, allowExit = true, runMode = runMode(options))
case _: Build.Failed => buildFailed(allowExit = true)
case _: Build.Cancelled => buildCancelled(allowExit = true)
}
}
}
Expand All @@ -208,6 +219,7 @@ object Repl extends ScalaCommand[ReplOptions] {
logger: Logger,
allowExit: Boolean,
dryRun: Boolean,
runMode: RunMode.HasRepl,
buildOpt: Option[Build.Successful]
): Either[BuildException, Unit] = either {

Expand Down Expand Up @@ -363,16 +375,20 @@ object Repl extends ScalaCommand[ReplOptions] {
case other => other
}

if (shouldUseAmmonite) {
val replArtifacts = value(ammoniteArtifacts())
val replArgs = ammoniteAdditionalArgs() ++ programArgs
maybeRunRepl(replArtifacts, replArgs)
}
else {
val replArtifacts = value(defaultArtifacts())
val replArgs = additionalArgs ++ programArgs
maybeRunRepl(replArtifacts, replArgs)
}
if (shouldUseAmmonite)
runMode match {
case RunMode.Default =>
val replArtifacts = value(ammoniteArtifacts())
val replArgs = ammoniteAdditionalArgs() ++ programArgs
maybeRunRepl(replArtifacts, replArgs)
}
else
runMode match {
case RunMode.Default =>
val replArtifacts = value(defaultArtifacts())
val replArgs = additionalArgs ++ programArgs
maybeRunRepl(replArtifacts, replArgs)
}
}

final class ReplError(retCode: Int)
Expand Down
18 changes: 10 additions & 8 deletions modules/cli/src/main/scala/scala/cli/commands/Run.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ object Run extends ScalaCommand[RunOptions] with BuildCommandHelpers {
options.sharedRun.standaloneSpark.getOrElse(false) &&
!options.sharedRun.sparkSubmit.contains(false)
)
RunMode.StandaloneSparkSubmit
RunMode.StandaloneSparkSubmit(options.sharedRun.submitArgument)
else if (options.sharedRun.sparkSubmit.getOrElse(false))
RunMode.SparkSubmit
RunMode.SparkSubmit(options.sharedRun.submitArgument)
else if (options.sharedRun.hadoopJar)
RunMode.HadoopJar
else
Expand Down Expand Up @@ -69,7 +69,7 @@ object Run extends ScalaCommand[RunOptions] with BuildCommandHelpers {
sharedJava.allJavaOpts.map(JavaOpt(_)).map(Positioned.commandLine),
jvmIdOpt = baseOptions.javaOptions.jvmIdOpt.orElse {
runMode(options) match {
case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar =>
case _: RunMode.Spark | RunMode.HadoopJar =>
Some("8")
case RunMode.Default => None
}
Expand All @@ -78,8 +78,8 @@ object Run extends ScalaCommand[RunOptions] with BuildCommandHelpers {
internal = baseOptions.internal.copy(
keepResolution = baseOptions.internal.keepResolution || {
runMode(options) match {
case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar => true
case RunMode.Default => false
case _: RunMode.Spark | RunMode.HadoopJar => true
case RunMode.Default => false
}
}
),
Expand All @@ -90,7 +90,7 @@ object Run extends ScalaCommand[RunOptions] with BuildCommandHelpers {
scalaPyVersion = options.sharedRun.sharedPython.scalaPyVersion,
addRunnerDependencyOpt = baseOptions.notForBloopOptions.addRunnerDependencyOpt.orElse {
runMode(options) match {
case RunMode.StandaloneSparkSubmit | RunMode.SparkSubmit | RunMode.HadoopJar =>
case _: RunMode.Spark | RunMode.HadoopJar =>
Some(false)
case RunMode.Default => None
}
Expand Down Expand Up @@ -472,24 +472,26 @@ object Run extends ScalaCommand[RunOptions] with BuildCommandHelpers {
)
Right((proc, None))
}
case RunMode.SparkSubmit =>
case mode: RunMode.SparkSubmit =>
value {
RunSpark.run(
build,
mainClass,
args,
mode.submitArgs,
logger,
allowExecve,
showCommand,
scratchDirOpt
)
}
case RunMode.StandaloneSparkSubmit =>
case mode: RunMode.StandaloneSparkSubmit =>
value {
RunSpark.runStandalone(
build,
mainClass,
args,
mode.submitArgs,
logger,
allowExecve,
showCommand,
Expand Down
21 changes: 17 additions & 4 deletions modules/cli/src/main/scala/scala/cli/commands/run/RunMode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,21 @@ package scala.cli.commands.run
sealed abstract class RunMode extends Product with Serializable

object RunMode {
case object Default extends RunMode
case object SparkSubmit extends RunMode
case object StandaloneSparkSubmit extends RunMode
case object HadoopJar extends RunMode

sealed abstract class HasRepl extends RunMode
sealed abstract class Spark extends RunMode {
def submitArgs: Seq[String]
def withSubmitArgs(args: Seq[String]): Spark
}

case object Default extends HasRepl
final case class SparkSubmit(submitArgs: Seq[String]) extends Spark {
def withSubmitArgs(args: Seq[String]): SparkSubmit =
copy(submitArgs = args)
}
final case class StandaloneSparkSubmit(submitArgs: Seq[String]) extends Spark {
def withSubmitArgs(args: Seq[String]): StandaloneSparkSubmit =
copy(submitArgs = args)
}
case object HadoopJar extends RunMode
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ object RunSpark {
build: Build.Successful,
mainClass: String,
args: Seq[String],
submitArgs: Seq[String],
logger: Logger,
allowExecve: Boolean,
showCommand: Boolean,
Expand Down Expand Up @@ -54,6 +55,7 @@ object RunSpark {
Seq(submitCommand, "--class", mainClass) ++
jarsArgs ++
javaOpts.flatMap(opt => Seq("--driver-java-options", opt)) ++
submitArgs ++
Seq(library.toString) ++
args
val envUpdates = javaHomeInfo.envUpdates(sys.env)
Expand All @@ -77,6 +79,7 @@ object RunSpark {
build: Build.Successful,
mainClass: String,
args: Seq[String],
submitArgs: Seq[String],
logger: Logger,
allowExecve: Boolean,
showCommand: Boolean,
Expand Down Expand Up @@ -107,6 +110,7 @@ object RunSpark {
Seq("--class", mainClass) ++
jarsArgs ++
javaOpts.flatMap(opt => Seq("--driver-java-options", opt)) ++
submitArgs ++
Seq(library.toString) ++
args
val envUpdates = javaHomeInfo.envUpdates(sys.env)
Expand Down
Loading