
make some repl components extensible #1342

Merged: 12 commits, Jul 1, 2015
@@ -25,7 +25,7 @@ import scala.concurrent.{ Future, ExecutionContext => ConcurrentExecutionContext
* Object containing various implicit conversions required to create Scalding flows in the REPL.
* Most of these conversions come from the [[com.twitter.scalding.Job]] class.
*/
object ReplImplicits extends FieldConversions {
trait BaseReplImplicits extends FieldConversions {

/** required for switching to hdfs local mode */
private val mr1Key = "mapred.job.tracker"
@@ -77,14 +77,15 @@ object ReplImplicits extends FieldConversions {
/* Using getter/setter here lets us get the correct defaults set by the mode
(and read from command-line, etc) while still allowing the user to customize it */
def config: Config = Config.defaultFrom(mode) ++ customConfig
def config_=(c: Config) { customConfig = c }

protected def shell: BaseScaldingShell = ScaldingShell
Collaborator comment:

seems strange to have this called BaseReplImplicits. The implicit part is about the imports. What is the core function here? If we are refactoring, I'd like to improve it; it seems we may be making it worse here.


/** Create config for execution. Tacks on a new jar for each execution. */
private[scalding] def executionConfig: Config = {
// Create a jar to hold compiled code for this REPL session in addition to
// "tempjars" which can be passed in from the command line, allowing code
// in the repl to be distributed for the Hadoop job to run.
val replCodeJar = ScaldingShell.createReplCodeJar()
val replCodeJar: Option[java.io.File] = shell.createReplCodeJar()
val tmpJarsConfig: Map[String, String] =
replCodeJar match {
case Some(jar) =>
@@ -245,6 +246,8 @@ object ReplImplicits extends FieldConversions {

}

object ReplImplicits extends BaseReplImplicits
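The diff above turns the `ReplImplicits` singleton into a trait plus a one-line default object. A minimal, self-contained sketch of that refactoring pattern (illustrative names, not the real Scalding API):

```scala
// Behavior moves from a singleton into a trait with overridable hooks; the
// original object name survives as a one-liner, so existing imports compile.
trait BaseReplSettings {
  // overridable hook, analogous to `protected def shell: BaseScaldingShell`
  protected def promptText: String = "scalding> "
  def prompt: String = Console.BLUE + promptText + Console.RESET
}

// default instance, keeping the old behavior under the old name
object ReplSettings extends BaseReplSettings

// downstream code can now customize by overriding the hook
object QuietReplSettings extends BaseReplSettings {
  override protected def promptText: String = "> "
}
```

The key point is that callers of `prompt` never change; only the protected hook is swapped.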

/**
* Implicit FlowDef and Mode, import in the REPL to have the global context implicitly
* used everywhere.
@@ -54,7 +54,12 @@ class ScaldingILoop
*
* @return a prompt string to use for this REPL.
*/
override def prompt: String = ScaldingShell.prompt()
override def prompt: String = shell.prompt()
Collaborator comment:

could this be in BaseScaldingShell? Seems to make sense there too.

/**
* Which shell instance to use. Override this for customized shells.
*/
protected def shell: BaseScaldingShell = ScaldingShell

private[this] def addImports(ids: String*): IR.Result =
if (ids.isEmpty) IR.Success
@@ -77,14 +82,16 @@
*/
override def commands: List[LoopCommand] = super.commands ++ scaldingCommands

protected def shellImports: List[String] = List(
Collaborator comment:

could this be moved to BaseScaldingShell so there is only one place to configure if it makes sense?
"com.twitter.scalding._",
"com.twitter.scalding.ReplImplicits._",
"com.twitter.scalding.ReplImplicitContext._")

override def createInterpreter() {
super.createInterpreter()
addThunk {
intp.beQuietDuring {
addImports(
"com.twitter.scalding._",
"com.twitter.scalding.ReplImplicits._",
"com.twitter.scalding.ReplImplicitContext._")
addImports(shellImports: _*)

// interpret all files named ".scalding_repl" from the current directory up to the root
findAllUpPath(".scalding_repl")
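The hunk above pulls the hard-coded import list out of `createInterpreter` into a protected `shellImports` method. A hypothetical sketch of why that matters (class names here are illustrative):

```scala
// With the imports behind a protected method, a subclass of the loop can
// extend the default list instead of duplicating createInterpreter.
class SketchLoop {
  protected def shellImports: List[String] = List(
    "com.twitter.scalding._",
    "com.twitter.scalding.ReplImplicits._",
    "com.twitter.scalding.ReplImplicitContext._")

  // stands in for the `addImports(shellImports: _*)` call in createInterpreter
  def importStatements: List[String] =
    shellImports.map(i => s"import $i")
}

class ExtendedSketchLoop extends SketchLoop {
  // hypothetical downstream package added on top of the defaults
  override protected def shellImports: List[String] =
    super.shellImports :+ "com.example.extras._"
}
```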
@@ -33,7 +33,7 @@ import com.google.common.io.Files
* A runner for a Scala REPL providing functionality extensions specific to working with
* Scalding.
*/
object ScaldingShell extends MainGenericRunner {
trait BaseScaldingShell extends MainGenericRunner {

/** Customizable prompt. */
var prompt: () => String = { () => Console.BLUE + "\nscalding> " + Console.RESET }
@@ -48,6 +48,10 @@
*/
private val conf: Configuration = new Configuration()

protected def replImplicits: BaseReplImplicits = ReplImplicits

protected def scaldingREPLProvider: () => ILoop = { () => new ScaldingILoop }

/**
* The main entry point for executing the REPL.
*
@@ -60,11 +64,11 @@
*/
override def process(args: Array[String]): Boolean = {
// Get the mode (hdfs or local), and initialize the configuration
val (mode, jobArgs) = parseModeArgs(args)
val (cfg, mode) = parseModeArgs(args)

// Process command line arguments into a settings object, and use that to start the REPL.
// We ignore params we don't care about - hence error function is empty
val command = new GenericRunnerCommand(jobArgs.toList, _ => ())
val command = new GenericRunnerCommand(List[String](), _ => ())
Collaborator comment:

seems we are changing the way this works. What's the story?

// inherit defaults for embedded interpreter (needed for running with SBT)
// (TypedPipe chosen arbitrarily, just needs to be something representative)
@@ -78,11 +82,16 @@
// Force the repl to be synchronous, so all cmds are executed in the same thread
command.settings.Yreplsync.value = true

scaldingREPL = Some(new ScaldingILoop)
ReplImplicits.mode = mode
scaldingREPL = Some(scaldingREPLProvider.apply())
replImplicits.mode = mode
replImplicits.customConfig = replImplicits.customConfig ++ (mode match {
case _: HadoopMode => cfg
case _ => Config.empty
})

// if in Hdfs mode, store the mode to enable switching between Local and Hdfs
mode match {
case m @ Hdfs(_, _) => ReplImplicits.storedHdfsMode = Some(m)
case m @ Hdfs(_, _) => replImplicits.storedHdfsMode = Some(m)
case _ => ()
}

@@ -102,9 +111,10 @@
* @param args from the command line.
* @return the default Config and the Mode for the job (e.g. local, hdfs)
*/
def parseModeArgs(args: Array[String]): (Mode, Array[String]) = {
def parseModeArgs(args: Array[String]): (Config, Mode) = {
val a = nonHadoopArgsFrom(args)
(Mode(Args(a), conf), a)
val mode = Mode(Args(a), conf)
(Config.defaultFrom(mode), mode)
}

/**
@@ -181,3 +191,5 @@
}
}
}

object ScaldingShell extends BaseScaldingShell
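The new `scaldingREPLProvider` hook above lets a custom shell substitute its own loop class. A sketch of that provider pattern with illustrative names (not the real API):

```scala
// stand-in for ScaldingILoop
class SketchILoop { def banner: String = "base loop" }

trait SketchShell {
  // factory hook, analogous to `scaldingREPLProvider: () => ILoop`
  protected def loopProvider: () => SketchILoop = { () => new SketchILoop }
  def start(): String = loopProvider.apply().banner
}

// default shell, mirroring `object ScaldingShell extends BaseScaldingShell`
object DefaultSketchShell extends SketchShell

// a custom shell swaps in its own loop without touching start()
object CustomSketchShell extends SketchShell {
  override protected def loopProvider: () => SketchILoop =
    { () => new SketchILoop { override def banner = "custom loop" } }
}
```

Using a `() => ILoop` thunk rather than a pre-built instance keeps construction lazy, so the loop is only created once `process` actually reaches that point.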