REPL: Add toIterator (and related methods) #929

Merged: 29 commits, Jul 3, 2014

Commits:
b768ce4
mostly working toIterator
bholt Jun 30, 2014
c2ed7b3
refactor to use TypedSequenceFile (single field), which requires havi…
bholt Jul 1, 2014
6e365d4
fix tuple reading for Hadoop mode
Jul 1, 2014
9121d4a
Merge branch 'repl+execution' into repl+toiterator
Jul 2, 2014
9848c61
Merge branch 'repl+execution' into repl+toiterator
Jul 2, 2014
5e2801d
pattern match for head pipe with Converter
Jul 2, 2014
b886748
add TextLine.toIterator test
Jul 2, 2014
5cb3605
get rid of unnecessary implicit Manifest, TupleConverter, TupleSetters
Jul 2, 2014
906fd18
add tests for toIterator, 'tuple' one currently breaks it because of …
Jul 2, 2014
81f8088
snapshot: use MemorySink for Local mode, handle this case in toIterator
Jul 2, 2014
4962ac0
add implicit to enrich ValuePipe -> ShellPipe
Jul 2, 2014
d88284e
test that implicit snapshot on `toIterator` works in all the cases
Jul 2, 2014
2624481
add comment explaining TypedSequenceFile
Jul 2, 2014
ccebbb0
fixes and cleanup from @johnynek's feedback
Jul 2, 2014
37a6c09
document toIterator, toList, and dump
Jul 2, 2014
9fc9a53
working on making running tests in Local & Hadoop modes
Jul 3, 2014
b5ade0e
make ReplImplicit.{flowDef,mode} non-implicit, plumb implicit FlowDef…
bholt Jul 3, 2014
52fc649
different paths for local/hdfs modes
bholt Jul 3, 2014
0f16fa2
put implicit versions of FlowDef and Mode in a separate object so you…
bholt Jul 3, 2014
e442240
add imports to make './sbt scalding-repl/console' work
bholt Jul 3, 2014
64331ba
some cleanup, fix path suffix
Jul 3, 2014
26d9b69
add 'ValuePipe.toOption' instead of toIterator
Jul 3, 2014
de55cb3
merge develop (includes Execution improvements)
Jul 3, 2014
5aa2a12
move TypedSequenceFile to com.twitter.scalding.source
Jul 3, 2014
326e9ad
merge with 'develop' to pick up more ExecutionContext changes
Jul 3, 2014
ea6971b
make implicits 'defs' in ReplImplicitContext
Jul 3, 2014
0006f73
handle EmptyPipe
Jul 3, 2014
044e5ca
take 2 on ValuePipe iterator to be safe, throw error if more elements
Jul 3, 2014
dcf97e1
merge 'develop', conflicted with project/Build on my additions to 're…
Jul 3, 2014
5 changes: 5 additions & 0 deletions project/Build.scala
@@ -282,6 +282,11 @@ object ScaldingBuild extends Build {
).settings(
name := "scalding-repl",
previousArtifact := None,
initialCommands in console := """
import com.twitter.scalding._
import com.twitter.scalding.ReplImplicits._
import com.twitter.scalding.ReplImplicitContext._
""",
libraryDependencies <++= (scalaVersion) { scalaVersion => Seq(
"org.scala-lang" % "jline" % scalaVersion,
"org.scala-lang" % "scala-compiler" % scalaVersion,
2 changes: 2 additions & 0 deletions scalding-core/src/main/scala/com/twitter/scalding/Mode.scala
@@ -120,6 +120,8 @@ trait HadoopMode extends Mode {
override def openForRead(tap: Tap[_, _, _]) = {
val htap = tap.asInstanceOf[Tap[JobConf, _, _]]
val conf = new JobConf(jobConf)
// copy over Config defaults
Config.default.toMap.foreach{ case (k, v) => conf.set(k, v) }
val fp = new HadoopFlowProcess(conf)
htap.retrieveSourceFields(fp)
htap.sourceConfInit(fp, conf)
@@ -0,0 +1,37 @@
/*
Copyright 2014 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.twitter.scalding.source

import cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.SequenceFile

/**
* SequenceFile with explicit types. Useful for debugging flows using the Typed API.
* Not to be used for permanent storage: uses Kryo serialization which may not be
* consistent across JVM instances. Use Thrift sources instead.
*/
class TypedSequenceFile[T](path: String) extends SequenceFile(path, Fields.FIRST) with Mappable[T] with TypedSink[T] {
override def converter[U >: T] =
TupleConverter.asSuperConverter[T, U](TupleConverter.singleConverter[T])
override def setter[U <: T] =
TupleSetter.asSubSetter[T, U](TupleSetter.singleSetter[T])
}

object TypedSequenceFile {
def apply[T](path: String): TypedSequenceFile[T] = new TypedSequenceFile[T](path)
}
@@ -25,10 +25,11 @@ import scala.util.{ Failure, Success }
* Most of these conversions come from the [[com.twitter.scalding.Job]] class.
*/
object ReplImplicits extends FieldConversions {

/** Implicit flowDef for this Scalding shell session. */
implicit var flowDef: FlowDef = getEmptyFlowDef
var flowDef: FlowDef = getEmptyFlowDef
/** Defaults to running in local mode if no mode is specified. */
implicit var mode: Mode = com.twitter.scalding.Local(false)
var mode: Mode = com.twitter.scalding.Local(false)

/**
* Sets the flow definition in implicit scope to an empty flow definition.
@@ -53,8 +54,7 @@ object ReplImplicits extends FieldConversions {
*
* Automatically cleans up the flowDef to include only sources upstream from tails.
*/
def run(implicit flowDef: FlowDef): Option[JobStats] = {
import Dsl.flowDefToRichFlowDef
def run(implicit fd: FlowDef, md: Mode): Option[JobStats] = {

def config = {
val conf = Config.default
@@ -81,7 +81,7 @@
}

// TODO: This is not getting any UniqueID, so counters will not work with REPL
ExecutionContext.newContext(config, flowDef, mode).waitFor match {
ExecutionContext.newContext(config, fd, md).waitFor match {
case Success(stats) => Some(stats)
case Failure(e) =>
println("Flow execution failed!")
@@ -149,7 +149,7 @@ object ReplImplicits extends FieldConversions {
*/
implicit def iterableToPipe[T](
iterable: Iterable[T])(implicit setter: TupleSetter[T],
converter: TupleConverter[T]): Pipe = {
converter: TupleConverter[T], fd: FlowDef, md: Mode): Pipe = {
iterableToSource(iterable)(setter, converter).read
}

@@ -164,8 +164,8 @@
*/
implicit def iterableToRichPipe[T](
iterable: Iterable[T])(implicit setter: TupleSetter[T],
converter: TupleConverter[T]): RichPipe = {
RichPipe(iterableToPipe(iterable)(setter, converter))
converter: TupleConverter[T], fd: FlowDef, md: Mode): RichPipe = {
RichPipe(iterableToPipe(iterable)(setter, converter, fd, md))
}

/**
@@ -181,4 +181,22 @@
implicit def typedPipeToShellTypedPipe[T](pipe: TypedPipe[T]): ShellTypedPipe[T] =
new ShellTypedPipe[T](pipe)

/**
* Enrich ValuePipe for the shell
* (e.g. allows .toOption to be called on it)
*/
implicit def valuePipeToShellValuePipe[T](pipe: ValuePipe[T]): ShellValuePipe[T] =
new ShellValuePipe[T](pipe)

}

/**
* Implicit FlowDef and Mode, import in the REPL to have the global context implicitly
* used everywhere.
*/
object ReplImplicitContext {
/** Implicit flowDef for this Scalding shell session. */
implicit var fd = ReplImplicits.flowDef
Collaborator commented: why aren't these `implicit def flowDef: FlowDef = ....`? I like to minimize the vars. Can we just make the ones in ReplImplicit vars?

Author (Contributor) replied: Oops, definitely didn't intend to make them "var".

/** Defaults to running in local mode if no mode is specified. */
implicit var md = ReplImplicits.mode
}
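The split this diff introduces (mutable state in ReplImplicits, implicit access via ReplImplicitContext) and the reviewer's `implicit def` suggestion can be sketched in isolation. The following is a hypothetical miniature with stand-in types, not the real classes: it shows why an `implicit def` tracks later updates to the underlying var, while an `implicit var` initialized from it would capture the value only once.

```scala
// Hypothetical miniature of the ReplImplicits / ReplImplicitContext split.
object ShellState {
  var mode: String = "local" // stands in for ReplImplicits.mode
}

object ShellContext {
  // An implicit def re-reads the var on every implicit lookup; an
  // `implicit var md = ShellState.mode` would copy it once at init.
  implicit def md: String = ShellState.mode
}

object ContextDemo {
  def describe(implicit m: String): String = s"running in $m mode"

  def main(args: Array[String]): Unit = {
    import ShellContext._ // brings the implicit into scope, REPL-style
    assert(describe == "running in local mode")
    ShellState.mode = "hdfs"
    assert(describe == "running in hdfs mode") // picks up the update
  }
}
```

This is the substance of the review exchange above: `implicit var md = ReplImplicits.mode` snapshots the mode at initialization, whereas an `implicit def` would stay in sync.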
@@ -60,7 +60,10 @@ class ScaldingILoop

addThunk {
intp.beQuietDuring {
intp.addImports("com.twitter.scalding._", "com.twitter.scalding.ReplImplicits._")
intp.addImports(
"com.twitter.scalding._",
"com.twitter.scalding.ReplImplicits._",
"com.twitter.scalding.ReplImplicitContext._")
}
}
}
97 changes: 74 additions & 23 deletions scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala
@@ -15,54 +15,105 @@

package com.twitter.scalding

import cascading.flow.Flow
import cascading.flow.FlowDef
import cascading.pipe.Pipe
import java.util.UUID
import com.twitter.scalding.ReplImplicits._

import cascading.flow.FlowDef
import cascading.tuple.Fields
import com.twitter.scalding.typed._
import scala.collection.JavaConverters._
import com.twitter.scalding.source.TypedSequenceFile

/**
* Enrichment on TypedPipes allowing them to be run locally, independent of the overall flow.
* @param pipe to wrap
*/
class ShellTypedPipe[T](pipe: TypedPipe[T]) {
import Dsl.flowDefToRichFlowDef
import ReplImplicits._

/**
* Shorthand for .write(dest).run
*/
def save(dest: TypedSink[T] with Mappable[T]): TypedPipe[T] = {
def save(dest: TypedSink[T] with Mappable[T])(implicit fd: FlowDef, md: Mode): TypedPipe[T] = {

val p = pipe.toPipe(dest.sinkFields)(dest.setter)

val localFlow = flowDef.onlyUpstreamFrom(p)
dest.writeFrom(p)(localFlow, mode)
run(localFlow)
val localFlow = fd.onlyUpstreamFrom(p)
dest.writeFrom(p)(localFlow, md)
run(localFlow, md)

TypedPipe.from(dest)
TypedPipe.from(dest)(fd, md)
}

/**
* Save snapshot of a typed pipe to a temporary sequence file.
* @return A TypedPipe to a new Source, reading from the sequence file.
*/
def snapshot: TypedPipe[T] = {

// come up with unique temporary filename
// TODO: refactor into TemporarySequenceFile class
val tmpSeq = "/tmp/scalding-repl/snapshot-" + UUID.randomUUID() + ".seq"
val dest = SequenceFile(tmpSeq, 'record)
val p = pipe.toPipe('record)

val localFlow = flowDef.onlyUpstreamFrom(p)
dest.writeFrom(p)(localFlow, mode)
run(localFlow)
def snapshot(implicit fd: FlowDef, md: Mode): TypedPipe[T] = {
val p = pipe.toPipe(0)
val localFlow = fd.onlyUpstreamFrom(p)
md match {
case _: CascadingLocal => // Local or Test mode
val dest = new MemorySink[T]
dest.writeFrom(p)(localFlow, md)
run(localFlow, md)
TypedPipe.from(dest.readResults)(fd, md)
case _: HadoopMode =>
// come up with unique temporary filename
// TODO: refactor into TemporarySequenceFile class
val tmpSeq = "/tmp/scalding-repl/snapshot-" + UUID.randomUUID + ".seq"
val dest = TypedSequenceFile[T](tmpSeq)
dest.writeFrom(p)(localFlow, md)
run(localFlow, md)
TypedPipe.from(dest)(fd, md)
}
}

TypedPipe.fromSingleField[T](SequenceFile(tmpSeq))
/**
* Create a (local) iterator over the pipe. For non-trivial pipes (anything except
* a head-pipe reading from a source), a snapshot is automatically created and
* iterated over.
* @return local iterator
*/
def toIterator(implicit fd: FlowDef, md: Mode): Iterator[T] = pipe match {
// if this is just a Converter on a head pipe
// (true for the first pipe on a source, e.g. a snapshot pipe)
case TypedPipeInst(p, fields, Converter(conv)) if p.getPrevious.isEmpty =>
val srcs = fd.getSources
if (srcs.containsKey(p.getName)) {
val tap = srcs.get(p.getName)
md.openForRead(tap).asScala.map(tup => conv(tup.selectEntry(fields)))
} else {
sys.error("Invalid head: pipe has no previous, but there is no registered source.")
}
// if it's already just a wrapped iterable (MemorySink), just return it
case IterablePipe(iter, _, _) => iter.toIterator
Collaborator commented: should we add the EmptyPipe case?

// otherwise, snapshot the pipe and get an iterator on that
case _ =>
pipe.snapshot.toIterator
}

// TODO: add back `toList` based on `snapshot` this time
/**
* Create a list from the pipe in memory. Uses `ShellTypedPipe.toIterator`.
* Warning: user must ensure that the results will actually fit in memory.
*/
def toList(implicit fd: FlowDef, md: Mode): List[T] = toIterator.toList

// TODO: add `dump` to view contents without reading into memory
/**
* Print the contents of a pipe to stdout. Uses `ShellTypedPipe.toIterator`.
*/
def dump(implicit fd: FlowDef, md: Mode): Unit = toIterator.foreach(println(_))

}
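The toIterator dispatch above (read directly from a head pipe on a source or from an in-memory iterable, otherwise snapshot first and iterate over that) can be sketched with simplified stand-in types. This is a hypothetical miniature, not Scalding's actual pipe hierarchy:

```scala
// Hypothetical miniature of ShellTypedPipe.toIterator's dispatch.
sealed trait MiniPipe[T]
case class MiniSource[T](items: Seq[T]) extends MiniPipe[T]       // head pipe on a source
case class MiniIterable[T](iter: Iterable[T]) extends MiniPipe[T] // already in memory
case class MiniMapped[T](up: MiniPipe[T], f: T => T) extends MiniPipe[T] // non-trivial pipe

object MiniPipe {
  // "snapshot": materialize a non-trivial pipe into an in-memory pipe
  def snapshot[T](p: MiniPipe[T]): MiniPipe[T] = p match {
    case MiniMapped(up, f) => MiniIterable(toIterator(up).map(f).toList)
    case trivial           => trivial
  }

  def toIterator[T](p: MiniPipe[T]): Iterator[T] = p match {
    case MiniSource(items)  => items.iterator // read straight from the source
    case MiniIterable(iter) => iter.iterator  // wrapped iterable: return as-is
    case nonTrivial         => toIterator(snapshot(nonTrivial)) // snapshot, then iterate
  }
}
```

As in the real code, only trivial pipes are read directly; everything else pays the cost of one materialization so the iterator never has to re-run upstream computation.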

class ShellValuePipe[T](vp: ValuePipe[T]) {
import ReplImplicits.typedPipeToShellTypedPipe
def toOption(implicit fd: FlowDef, md: Mode): Option[T] = vp match {
case _: EmptyValue => None
case LiteralValue(v) => Some(v)
case ComputedValue(tp) => tp.snapshot.toList match {
Collaborator commented: `tp.snapshot.toIterator.take(2).toList` is a bit safer, in case there is some bug. It will still error, but it won't blow up the memory if there are 2 or more items in the list.

Collaborator commented: actually, why not `tp.toIterator.take(2).toList`? Why should we explicitly call snapshot?

Author (Contributor) replied: It is an error to have more than one value in a ValuePipe, right? In the match, should I do a sys.error for case _?


case Nil => None
case v :: Nil => Some(v)
}
}
}
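The bounded-read pattern suggested in the review (`take(2).toList` instead of `toList` when at most one element is expected) works with any iterator. A generic sketch, plain Scala rather than Scalding code:

```scala
// When at most one element is expected, take(2) bounds memory even if a bug
// yields many (or unboundedly many) elements: we pull at most two, which is
// enough to distinguish "none", "exactly one", and "too many".
object SingleValueDemo {
  def expectAtMostOne[T](it: Iterator[T]): Option[T] =
    it.take(2).toList match {
      case Nil      => None
      case v :: Nil => Some(v)
      case _        => sys.error("invariant violated: more than one value")
    }

  def main(args: Array[String]): Unit = {
    assert(expectAtMostOne(Iterator.empty) == None)
    assert(expectAtMostOne(Iterator(5)) == Some(5))
    // even an infinite iterator fails fast without materializing everything:
    try { expectAtMostOne(Iterator.continually(1)); assert(false) }
    catch { case _: RuntimeException => () }
  }
}
```

This matches the `case _` / `sys.error` direction discussed in the thread: the error still surfaces, but without first building an unbounded list in memory.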