From 90e573f8185fb2ab9e227739abdfec976c16f46e Mon Sep 17 00:00:00 2001 From: Ruban Monu Date: Sat, 27 Jun 2015 09:36:26 -0700 Subject: [PATCH 1/7] make some repl components extensible --- .../com/twitter/scalding/ReplImplicits.scala | 11 ++++--- .../com/twitter/scalding/ScaldingILoop.scala | 17 +++++++--- .../com/twitter/scalding/ScaldingShell.scala | 32 +++++++++++++------ 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala index 6bdb89f9d5..726c0d3817 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala @@ -25,7 +25,7 @@ import scala.concurrent.{ Future, ExecutionContext => ConcurrentExecutionContext * Object containing various implicit conversions required to create Scalding flows in the REPL. * Most of these conversions come from the [[com.twitter.scalding.Job]] class. */ -object ReplImplicits extends FieldConversions { +trait BaseReplImplicits extends FieldConversions { /** required for switching to hdfs local mode */ private val mr1Key = "mapred.job.tracker" @@ -40,7 +40,7 @@ object ReplImplicits extends FieldConversions { * If the repl is started in Hdfs mode, this field is used to preserve the settings * when switching Modes. */ - private[scalding] var storedHdfsMode: Option[Hdfs] = None + var storedHdfsMode: Option[Hdfs] = None /** Switch to Local mode */ def useLocalMode() { mode = Local(false) } @@ -77,14 +77,15 @@ object ReplImplicits extends FieldConversions { /* Using getter/setter here lets us get the correct defaults set by the mode (and read from command-line, etc) while still allowing the user to customize it */ def config: Config = Config.defaultFrom(mode) ++ customConfig - def config_=(c: Config) { customConfig = c } + + protected def shell: BaseScaldingShell = ScaldingShell /** Create config for execution. Tacks on a new jar for each execution. */ private[scalding] def executionConfig: Config = { // Create a jar to hold compiled code for this REPL session in addition to // "tempjars" which can be passed in from the command line, allowing code // in the repl to be distributed for the Hadoop job to run. - val replCodeJar = ScaldingShell.createReplCodeJar() + val replCodeJar: Option[java.io.File] = shell.createReplCodeJar() val tmpJarsConfig: Map[String, String] = replCodeJar match { case Some(jar) => @@ -245,6 +246,8 @@ object ReplImplicits extends FieldConversions { } +object ReplImplicits extends BaseReplImplicits + /** * Implicit FlowDef and Mode, import in the REPL to have the global context implicitly * used everywhere. diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala index 7e5c6bcf08..ddb2e6a98f 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala @@ -54,7 +54,12 @@ class ScaldingILoop * * @return a prompt string to use for this REPL. */ - override def prompt: String = ScaldingShell.prompt() + override def prompt: String = shell.prompt() + + /** + * Which shell instance to use. Override this for customized shells. + */ + protected def shell: BaseScaldingShell = ScaldingShell private[this] def addImports(ids: String*): IR.Result = if (ids.isEmpty) IR.Success @@ -77,14 +82,16 @@ class ScaldingILoop */ override def commands: List[LoopCommand] = super.commands ++ scaldingCommands + protected def shellImports: List[String] = List( + "com.twitter.scalding._", + "com.twitter.scalding.ReplImplicits._", + "com.twitter.scalding.ReplImplicitContext._") + override def createInterpreter() { super.createInterpreter() addThunk { intp.beQuietDuring { - addImports( - "com.twitter.scalding._", - "com.twitter.scalding.ReplImplicits._", - "com.twitter.scalding.ReplImplicitContext._") + addImports(shellImports: _*) // interpret all files named ".scalding_repl" from the current directory up to the root findAllUpPath(".scalding_repl") diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala index e7f0dc03ca..04c63a7dc7 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala @@ -33,7 +33,7 @@ import com.google.common.io.Files * A runner for a Scala REPL providing functionality extensions specific to working with * Scalding. */ -object ScaldingShell extends MainGenericRunner { +trait BaseScaldingShell extends MainGenericRunner { /** Customizable prompt. */ var prompt: () => String = { () => Console.BLUE + "\nscalding> " + Console.RESET } @@ -48,6 +48,10 @@ object ScaldingShell extends MainGenericRunner { */ private val conf: Configuration = new Configuration() + protected def replImplicits: BaseReplImplicits = ReplImplicits + + protected def scaldingREPLProvider: () => ILoop = { () => new ScaldingILoop } + /** * The main entry point for executing the REPL. * @@ -60,29 +64,34 @@ object ScaldingShell extends MainGenericRunner { */ override def process(args: Array[String]): Boolean = { // Get the mode (hdfs or local), and initialize the configuration - val (mode, jobArgs) = parseModeArgs(args) + val (cfg, mode) = parseModeArgs(args) // Process command line arguments into a settings object, and use that to start the REPL. // We ignore params we don't care about - hence error function is empty - val command = new GenericRunnerCommand(jobArgs.toList, _ => ()) + val command = new GenericRunnerCommand(List[String](), _ => ()) // inherit defaults for embedded interpretter (needed for running with SBT) // (TypedPipe chosen arbitrarily, just needs to be something representative) command.settings.embeddedDefaults[TypedPipe[String]] // if running from the assembly, need to explicitly tell it to use java classpath - if (args.contains("--repl")) command.settings.usejavacp.value = true + command.settings.usejavacp.value = true - command.settings.classpath.append(System.getProperty("java.class.path")) + if (args.contains("--repl")) command.settings.classpath.append(System.getProperty("java.class.path")) // Force the repl to be synchronous, so all cmds are executed in the same thread command.settings.Yreplsync.value = true - scaldingREPL = Some(new ScaldingILoop) - ReplImplicits.mode = mode + scaldingREPL = Some(scaldingREPLProvider.apply()) + replImplicits.mode = mode + replImplicits.customConfig = replImplicits.customConfig ++ (mode match { + case _: HadoopMode => cfg + case _ => Config.empty + }) + // if in Hdfs mode, store the mode to enable switching between Local and Hdfs mode match { - case m @ Hdfs(_, _) => ReplImplicits.storedHdfsMode = Some(m) + case m @ Hdfs(_, _) => replImplicits.storedHdfsMode = Some(m) case _ => () } @@ -102,9 +111,10 @@ object ScaldingShell extends MainGenericRunner { * @param args from the command line. * @return a Mode for the job (e.g. local, hdfs), and the non-hadoop params */ - def parseModeArgs(args: Array[String]): (Mode, Array[String]) = { + def parseModeArgs(args: Array[String]): (Config, Mode) = { val a = nonHadoopArgsFrom(args) - (Mode(Args(a), conf), a) + val mode = Mode(Args(a), conf) + (Config.defaultFrom(mode), mode) } /** @@ -181,3 +191,5 @@ object ScaldingShell extends MainGenericRunner { } } } + +object ScaldingShell extends BaseScaldingShell From c004fabd252a3aac279603e34e5ded7ff62d15d0 Mon Sep 17 00:00:00 2001 From: Ruban Monu Date: Sat, 27 Jun 2015 10:31:43 -0700 Subject: [PATCH 2/7] undo some changes --- .../src/main/scala/com/twitter/scalding/ReplImplicits.scala | 2 +- .../src/main/scala/com/twitter/scalding/ScaldingShell.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala index 726c0d3817..ddfc3f9f88 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala @@ -40,7 +40,7 @@ trait BaseReplImplicits extends FieldConversions { * If the repl is started in Hdfs mode, this field is used to preserve the settings * when switching Modes. */ - var storedHdfsMode: Option[Hdfs] = None + private[scalding] var storedHdfsMode: Option[Hdfs] = None /** Switch to Local mode */ def useLocalMode() { mode = Local(false) } diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala index 04c63a7dc7..fc769b61dd 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala @@ -75,9 +75,9 @@ trait BaseScaldingShell extends MainGenericRunner { command.settings.embeddedDefaults[TypedPipe[String]] // if running from the assembly, need to explicitly tell it to use java classpath - command.settings.usejavacp.value = true + if (args.contains("--repl")) command.settings.usejavacp.value = true - if (args.contains("--repl")) command.settings.classpath.append(System.getProperty("java.class.path")) + command.settings.classpath.append(System.getProperty("java.class.path")) // Force the repl to be synchronous, so all cmds are executed in the same thread command.settings.Yreplsync.value = true From b6178b8f80832fb6f20825ca0ed8f3bc03920554 Mon Sep 17 00:00:00 2001 From: Ruban Monu Date: Mon, 29 Jun 2015 17:48:26 -0700 Subject: [PATCH 3/7] separate repl implicits and repl state --- .../com/twitter/scalding/ReplImplicits.scala | 25 +++++++++++-------- .../com/twitter/scalding/ScaldingILoop.scala | 7 +----- .../com/twitter/scalding/ScaldingShell.scala | 14 ++++++++--- .../com/twitter/scalding/ShellPipe.scala | 4 +-- .../scala/com/twitter/scalding/ReplTest.scala | 7 +++--- 5 files changed, 32 insertions(+), 25 deletions(-) diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala index ddfc3f9f88..4a24e2f3c7 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala @@ -25,7 +25,7 @@ import scala.concurrent.{ Future, ExecutionContext => ConcurrentExecutionContext * Object containing various implicit conversions required to create Scalding flows in the REPL. * Most of these conversions come from the [[com.twitter.scalding.Job]] class. */ -trait BaseReplImplicits extends FieldConversions { +trait BaseReplState { /** required for switching to hdfs local mode */ private val mr1Key = "mapred.job.tracker" @@ -145,6 +145,9 @@ trait BaseReplImplicits extends FieldConversions { */ def execute[T](execution: Execution[T]): T = execution.waitFor(executionConfig, mode).get +} + +object ReplImplicits extends FieldConversions { /** * Converts a Cascading Pipe to a Scalding RichPipe. This method permits implicit conversions from @@ -171,7 +174,8 @@ trait BaseReplImplicits extends FieldConversions { * @param source to convert to a RichPipe. * @return a RichPipe wrapping the result of reading the specified Source. */ - implicit def sourceToRichPipe(source: Source): RichPipe = RichPipe(source.read(flowDef, mode)) + implicit def sourceToRichPipe(source: Source)(implicit flowDef: FlowDef, mode: Mode): RichPipe = + RichPipe(source.read(flowDef, mode)) /** * Converts a Source to a Pipe. This method permits implicit conversions from Source to Pipe. @@ -179,7 +183,8 @@ trait BaseReplImplicits extends FieldConversions { * @param source to convert to a Pipe. * @return a Pipe that is the result of reading the specified Source. */ - implicit def sourceToPipe(source: Source): Pipe = source.read(flowDef, mode) + implicit def sourceToPipe(source: Source)(implicit flowDef: FlowDef, mode: Mode): Pipe = + source.read(flowDef, mode) /** * Converts an iterable into a Source with index (int-based) fields. @@ -205,7 +210,7 @@ trait BaseReplImplicits extends FieldConversions { */ implicit def iterableToPipe[T]( iterable: Iterable[T])(implicit setter: TupleSetter[T], - converter: TupleConverter[T], fd: FlowDef, md: Mode): Pipe = { + converter: TupleConverter[T], flowDef: FlowDef, mode: Mode): Pipe = { iterableToSource(iterable)(setter, converter).read } @@ -220,8 +225,8 @@ trait BaseReplImplicits extends FieldConversions { */ implicit def iterableToRichPipe[T]( iterable: Iterable[T])(implicit setter: TupleSetter[T], - converter: TupleConverter[T], fd: FlowDef, md: Mode): RichPipe = { - RichPipe(iterableToPipe(iterable)(setter, converter, fd, md)) + converter: TupleConverter[T], flowDef: FlowDef, mode: Mode): RichPipe = { + RichPipe(iterableToPipe(iterable)(setter, converter, flowDef, mode)) } /** @@ -246,7 +251,7 @@ trait BaseReplImplicits extends FieldConversions { } -object ReplImplicits extends BaseReplImplicits +object ReplState extends BaseReplState /** * Implicit FlowDef and Mode, import in the REPL to have the global context implicitly @@ -256,8 +261,8 @@ object ReplImplicitContext { /** Implicit execution context for using the Execution monad */ implicit val executionContext = ConcurrentExecutionContext.global /** Implicit flowDef for this Scalding shell session. */ - implicit def flowDefImpl = ReplImplicits.flowDef + implicit def flowDefImpl = ReplState.flowDef /** Defaults to running in local mode if no mode is specified. */ - implicit def modeImpl = ReplImplicits.mode - implicit def configImpl = ReplImplicits.config + implicit def modeImpl = ReplState.mode + implicit def configImpl = ReplState.config } diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala index ddb2e6a98f..c6b796dc97 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala @@ -82,16 +82,11 @@ class ScaldingILoop */ override def commands: List[LoopCommand] = super.commands ++ scaldingCommands - protected def shellImports: List[String] = List( - "com.twitter.scalding._", - "com.twitter.scalding.ReplImplicits._", - "com.twitter.scalding.ReplImplicitContext._") - override def createInterpreter() { super.createInterpreter() addThunk { intp.beQuietDuring { - addImports(shellImports: _*) + addImports(shell.imports: _*) // interpret all files named ".scalding_repl" from the current directory up to the root findAllUpPath(".scalding_repl") diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala index fc769b61dd..3c539d2c2a 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala @@ -48,10 +48,16 @@ trait BaseScaldingShell extends MainGenericRunner { */ private val conf: Configuration = new Configuration() - protected def replImplicits: BaseReplImplicits = ReplImplicits + protected def replState: BaseReplState = ReplState protected def scaldingREPLProvider: () => ILoop = { () => new ScaldingILoop } + def imports: List[String] = List( + "com.twitter.scalding._", + "com.twitter.scalding.ReplImplicits._", + "com.twitter.scalding.ReplImplicitContext._", + "com.twitter.scalding.ReplState._") + /** * The main entry point for executing the REPL. * @@ -83,15 +89,15 @@ trait BaseScaldingShell extends MainGenericRunner { command.settings.Yreplsync.value = true scaldingREPL = Some(scaldingREPLProvider.apply()) - replImplicits.mode = mode - replImplicits.customConfig = replImplicits.customConfig ++ (mode match { + replState.mode = mode + replState.customConfig = replState.customConfig ++ (mode match { case _: HadoopMode => cfg case _ => Config.empty }) // if in Hdfs mode, store the mode to enable switching between Local and Hdfs mode match { - case m @ Hdfs(_, _) => replImplicits.storedHdfsMode = Some(m) + case m @ Hdfs(_, _) => replState.storedHdfsMode = Some(m) case _ => () } diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala index f223625119..a1693f7a20 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala @@ -20,7 +20,7 @@ package com.twitter.scalding * @param pipe to wrap */ class ShellTypedPipe[T](pipe: TypedPipe[T]) { - import ReplImplicits.execute + import ReplState.execute /** * Shorthand for .write(dest).run @@ -57,7 +57,7 @@ class ShellTypedPipe[T](pipe: TypedPipe[T]) { } class ShellValuePipe[T](vp: ValuePipe[T]) { - import ReplImplicits.execute + import ReplState.execute // This might throw if the value is empty def dump: Unit = println(toOption) def get: T = execute(vp.getExecution) diff --git a/scalding-repl/src/test/scala/com/twitter/scalding/ReplTest.scala b/scalding-repl/src/test/scala/com/twitter/scalding/ReplTest.scala index 36a0a61182..674a7bc3ea 100644 --- a/scalding-repl/src/test/scala/com/twitter/scalding/ReplTest.scala +++ b/scalding-repl/src/test/scala/com/twitter/scalding/ReplTest.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.mapred.JobConf class ReplTest extends WordSpec { import ReplImplicits._ import ReplImplicitContext._ + import ReplState._ val tutorialData = "../tutorial/data" val helloPath = tutorialData + "/hello.txt" @@ -146,7 +147,7 @@ class ReplTest extends WordSpec { val out = TypedTsv[String](testPath + "words.tsv") hello.write(out) - ReplImplicits.run + ReplState.run val words = out.toIterator.toSet assert(words === Set("hello", "world", "goodbye")) @@ -177,12 +178,12 @@ class ReplTest extends WordSpec { } "REPL in Local mode" should { - ReplImplicits.mode = Local(strictSources = true) + mode = Local(strictSources = true) test() } "REPL in Hadoop mode" should { - ReplImplicits.mode = Hdfs(strict = true, new JobConf) + mode = Hdfs(strict = true, new JobConf) test() } } From 09982377cd32a7c41f90633002c64a2c038f05cf Mon Sep 17 00:00:00 2001 From: Ruban Monu Date: Tue, 30 Jun 2015 13:12:28 -0700 Subject: [PATCH 4/7] move imports, prompt to ILoop --- .../com/twitter/scalding/ScaldingILoop.scala | 15 ++++++++------- .../com/twitter/scalding/ScaldingShell.scala | 9 --------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala index c6b796dc97..a2fe6b6049 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingILoop.scala @@ -54,12 +54,7 @@ class ScaldingILoop * * @return a prompt string to use for this REPL. */ - override def prompt: String = shell.prompt() - - /** - * Which shell instance to use. Override this for customized shells. - */ - protected def shell: BaseScaldingShell = ScaldingShell + override def prompt: String = Console.BLUE + "\nscalding> " + Console.RESET private[this] def addImports(ids: String*): IR.Result = if (ids.isEmpty) IR.Success @@ -82,11 +77,17 @@ class ScaldingILoop */ override def commands: List[LoopCommand] = super.commands ++ scaldingCommands + protected def imports: List[String] = List( + "com.twitter.scalding._", + "com.twitter.scalding.ReplImplicits._", + "com.twitter.scalding.ReplImplicitContext._", + "com.twitter.scalding.ReplState._") + override def createInterpreter() { super.createInterpreter() addThunk { intp.beQuietDuring { - addImports(shell.imports: _*) + addImports(imports: _*) // interpret all files named ".scalding_repl" from the current directory up to the root findAllUpPath(".scalding_repl") diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala index 3c539d2c2a..71ce9d7113 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala @@ -35,9 +35,6 @@ import com.google.common.io.Files */ trait BaseScaldingShell extends MainGenericRunner { - /** Customizable prompt. */ - var prompt: () => String = { () => Console.BLUE + "\nscalding> " + Console.RESET } - /** * An instance of the Scala REPL the user will interact with. */ @@ -52,12 +49,6 @@ trait BaseScaldingShell extends MainGenericRunner { protected def scaldingREPLProvider: () => ILoop = { () => new ScaldingILoop } - def imports: List[String] = List( - "com.twitter.scalding._", - "com.twitter.scalding.ReplImplicits._", - "com.twitter.scalding.ReplImplicitContext._", - "com.twitter.scalding.ReplState._") - /** * The main entry point for executing the REPL. * From d904f8460e393aca2936f801933058618e5f68fa Mon Sep 17 00:00:00 2001 From: Ruban Monu Date: Tue, 30 Jun 2015 13:22:59 -0700 Subject: [PATCH 5/7] remove unneeded repl implicit --- .../main/scala/com/twitter/scalding/ReplImplicits.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala index 4a24e2f3c7..229037061f 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala @@ -177,15 +177,6 @@ object ReplImplicits extends FieldConversions { implicit def sourceToRichPipe(source: Source)(implicit flowDef: FlowDef, mode: Mode): RichPipe = RichPipe(source.read(flowDef, mode)) - /** - * Converts a Source to a Pipe. This method permits implicit conversions from Source to Pipe. - * - * @param source to convert to a Pipe. - * @return a Pipe that is the result of reading the specified Source. - */ - implicit def sourceToPipe(source: Source)(implicit flowDef: FlowDef, mode: Mode): Pipe = - source.read(flowDef, mode) - /** * Converts an iterable into a Source with index (int-based) fields. * From 5d0cf21446126c2fd7f8bacb256070140436fa92 Mon Sep 17 00:00:00 2001 From: Ruban Monu Date: Tue, 30 Jun 2015 15:56:57 -0700 Subject: [PATCH 6/7] fix cmd line args --- .../scala/com/twitter/scalding/ScaldingShell.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala index 71ce9d7113..269736bfae 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ScaldingShell.scala @@ -29,6 +29,8 @@ import scala.tools.nsc.io.VirtualDirectory import com.google.common.io.Files +case class ShellArgs(cfg: Config, mode: Mode, cmdArgs: List[String]) + /** * A runner for a Scala REPL providing functionality extensions specific to working with * Scalding. @@ -61,11 +63,11 @@ trait BaseScaldingShell extends MainGenericRunner { */ override def process(args: Array[String]): Boolean = { // Get the mode (hdfs or local), and initialize the configuration - val (cfg, mode) = parseModeArgs(args) + val ShellArgs(cfg, mode, cmdArgs) = parseModeArgs(args) // Process command line arguments into a settings object, and use that to start the REPL. // We ignore params we don't care about - hence error function is empty - val command = new GenericRunnerCommand(List[String](), _ => ()) + val command = new GenericRunnerCommand(cmdArgs, _ => ()) // inherit defaults for embedded interpretter (needed for running with SBT) // (TypedPipe chosen arbitrarily, just needs to be something representative) @@ -106,12 +108,12 @@ trait BaseScaldingShell extends MainGenericRunner { * and returns all the non-hadoop arguments. * * @param args from the command line. - * @return a Mode for the job (e.g. local, hdfs), and the non-hadoop params + * @return a Mode for the job (e.g. local, hdfs), config and the non-hadoop params */ - def parseModeArgs(args: Array[String]): (Config, Mode) = { + def parseModeArgs(args: Array[String]): ShellArgs = { val a = nonHadoopArgsFrom(args) val mode = Mode(Args(a), conf) - (Config.defaultFrom(mode), mode) + ShellArgs(Config.defaultFrom(mode), mode, a.toList) } /** From 7b23b22086ad611cf2f3209d99e08a007a90f7a5 Mon Sep 17 00:00:00 2001 From: Ruban Monu Date: Tue, 30 Jun 2015 16:57:18 -0700 Subject: [PATCH 7/7] make shellpipe take in implicit replstate --- .../scala/com/twitter/scalding/ReplImplicits.scala | 13 ++++++++----- .../main/scala/com/twitter/scalding/ShellPipe.scala | 8 ++++---- .../test/scala/com/twitter/scalding/ReplTest.scala | 4 +++- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala index 229037061f..2d00f3c22a 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ReplImplicits.scala @@ -224,21 +224,22 @@ object ReplImplicits extends FieldConversions { * Convert KeyedListLike to enriched ShellTypedPipe * (e.g. allows .snapshot to be called on Grouped, CoGrouped, etc) */ - implicit def keyedListLikeToShellTypedPipe[K, V, T[K, +V] <: KeyedListLike[K, V, T]](kll: KeyedListLike[K, V, T]) = new ShellTypedPipe(kll.toTypedPipe) + implicit def keyedListLikeToShellTypedPipe[K, V, T[K, +V] <: KeyedListLike[K, V, T]](kll: KeyedListLike[K, V, T])(implicit state: BaseReplState) = + new ShellTypedPipe(kll.toTypedPipe)(state) /** * Enrich TypedPipe for the shell * (e.g. allows .snapshot to be called on it) */ - implicit def typedPipeToShellTypedPipe[T](pipe: TypedPipe[T]): ShellTypedPipe[T] = - new ShellTypedPipe[T](pipe) + implicit def typedPipeToShellTypedPipe[T](pipe: TypedPipe[T])(implicit state: BaseReplState): ShellTypedPipe[T] = + new ShellTypedPipe[T](pipe)(state) /** * Enrich ValuePipe for the shell * (e.g. allows .toOption to be called on it) */ - implicit def valuePipeToShellValuePipe[T](pipe: ValuePipe[T]): ShellValuePipe[T] = - new ShellValuePipe[T](pipe) + implicit def valuePipeToShellValuePipe[T](pipe: ValuePipe[T])(implicit state: BaseReplState): ShellValuePipe[T] = + new ShellValuePipe[T](pipe)(state) } @@ -251,6 +252,8 @@ object ReplState extends BaseReplState object ReplImplicitContext { /** Implicit execution context for using the Execution monad */ implicit val executionContext = ConcurrentExecutionContext.global + /** Implicit repl state used for ShellPipes */ + implicit def stateImpl = ReplState /** Implicit flowDef for this Scalding shell session. */ implicit def flowDefImpl = ReplState.flowDef /** Defaults to running in local mode if no mode is specified. */ diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala index a1693f7a20..2db40cad90 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala @@ -19,8 +19,8 @@ package com.twitter.scalding * Enrichment on TypedPipes allowing them to be run locally, independent of the overall flow. * @param pipe to wrap */ -class ShellTypedPipe[T](pipe: TypedPipe[T]) { - import ReplState.execute +class ShellTypedPipe[T](pipe: TypedPipe[T])(implicit state: BaseReplState) { + import state.execute /** * Shorthand for .write(dest).run @@ -56,8 +56,8 @@ class ShellTypedPipe[T](pipe: TypedPipe[T]) { def dump: Unit = toIterator.foreach(println(_)) } -class ShellValuePipe[T](vp: ValuePipe[T]) { - import ReplState.execute +class ShellValuePipe[T](vp: ValuePipe[T])(implicit state: BaseReplState) { + import state.execute // This might throw if the value is empty def dump: Unit = println(toOption) def get: T = execute(vp.getExecution) diff --git a/scalding-repl/src/test/scala/com/twitter/scalding/ReplTest.scala b/scalding-repl/src/test/scala/com/twitter/scalding/ReplTest.scala index 674a7bc3ea..f041e8f4df 100644 --- a/scalding-repl/src/test/scala/com/twitter/scalding/ReplTest.scala +++ b/scalding-repl/src/test/scala/com/twitter/scalding/ReplTest.scala @@ -23,7 +23,9 @@ import org.apache.hadoop.mapred.JobConf class ReplTest extends WordSpec { import ReplImplicits._ import ReplImplicitContext._ - import ReplState._ + + val state = implicitly[BaseReplState] + import state._ val tutorialData = "../tutorial/data" val helloPath = tutorialData + "/hello.txt"