
Some Execution fixes
johnynek committed Aug 11, 2014
1 parent 4d006b8 commit 15a97e4
Showing 2 changed files with 42 additions and 3 deletions.
36 changes: 34 additions & 2 deletions scalding-core/src/main/scala/com/twitter/scalding/Execution.scala
@@ -46,6 +46,16 @@ import cascading.flow.{ FlowDef, Flow }
sealed trait Execution[+T] {
import Execution.{ FactoryExecution, FlatMapped, MapCounters, Mapped, OnComplete, RecoverWith, Zipped }

/**
* Scala uses the filter method in for syntax for pattern matches that can fail.
* If the predicate returns false, the result of run will be a failed Future.
*/
def filter(pred: T => Boolean): Execution[T] =
flatMap {
case good if pred(good) => Execution.from(good)
case failed => Execution.from(sys.error("Filter failed on: " + failed.toString))
}
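
A usage sketch (not part of the commit; only Execution.from and the filter method above are assumed) showing that the failure is deferred until the Execution is actually run:

import com.twitter.scalding.Execution

// The predicate holds, so running this yields 1.
val kept: Execution[Int] = Execution.from(1).filter(_ > 0)
// The predicate fails, so running this completes with a failed Future
// rather than throwing at construction time.
val dropped: Execution[Int] = Execution.from(1).filter(_ < 0)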

/**
* First run this Execution, then move to the result
* of the function
@@ -143,6 +153,13 @@ sealed trait Execution[+T] {
scala.concurrent.duration.Duration.Inf))
}

/**
* This is here to silence warnings in for comprehensions, but is
* identical to .filter.
*
* Users should not call this method directly; call .filter instead.
*/
def withFilter(p: T => Boolean): Execution[T] = filter(p)
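
The warnings mentioned above come from scalac falling back to filter when withFilter is missing; a sketch of a for comprehension that triggers the withFilter call (the Result case class is made up for the example):

import com.twitter.scalding.Execution

case class Result(name: String, count: Long)

// The pattern on the left of <- makes the compiler insert a withFilter call;
// a non-matching value would surface as a failed Future when run.
val rows: Execution[Long] =
  for {
    Result(_, n) <- Execution.from(Result("rows", 10L))
  } yield n
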
/*
* run this and that in parallel, without any dependency. This will
* be done in a single cascading flow if possible.
@@ -231,15 +248,24 @@ object Execution {
*/
override def zip[U](that: Execution[U]): Execution[(T, U)] =
that match {
/*
* There is an issue in cascading where two Pipes with the same
* name have to be referentially equal; since Pipes are immutable,
* there is no way to change the name.
*
* This merging parallelism only works if the names of the
* sources are distinct. As this code is designed
* now, by the time you have the flowDef, it is too
* late to merge.
*
case FlowDefExecution(result2) =>
FlowDefExecution({ (conf, m) =>
val (fd1, fn1) = result(conf, m)
val (fd2, fn2) = result2(conf, m)

val merged = fd1.copy
merged.mergeFrom(fd2)
(merged, { (js: JobStats) => fn1(js).zip(fn2(js)) })
})
}) */
case _ => super.zip(that)
}
}
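
Since the merge path is commented out, zip always falls through to super.zip here; a sketch of what callers still get (names are illustrative, nothing beyond Execution.from and zip is assumed):

import com.twitter.scalding.Execution

// The two Executions have no dependency on each other; zip pairs their
// results, even though the FlowDef-merging optimization above is disabled.
val pair: Execution[(Int, String)] =
  Execution.from(1).zip(Execution.from("one"))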
@@ -305,6 +331,12 @@ object Execution {
fn: (Config, Mode) => ((FlowDef, JobStats => Future[T]))): Execution[T] =
FlowDefExecution(fn)

/**
* Use this to read the configuration, which may contain Args or options
* that describe the input to run on
*/
def getConfig: Execution[Config] = factory { case (conf, _) => from(conf) }
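
A sketch of combining getConfig with flatMap; the Config.get(key): Option[String] accessor used below is an assumption about the Config API, not something shown in this diff:

import com.twitter.scalding.{ Config, Execution }

// Inspect the configuration before deciding what to run; a missing key
// falls back to a default path. (Config.get is assumed here.)
val input: Execution[String] =
  Execution.getConfig.flatMap { conf =>
    Execution.from(conf.get("my.input.path").getOrElse("hdfs:///tmp/default"))
  }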

/**
* Use this to use counters/stats with Execution. You do this:
* Execution.withId { implicit uid =>
9 changes: 8 additions & 1 deletion (second changed file, containing TypedSequenceFile; path not shown)
@@ -25,11 +25,18 @@ import com.twitter.scalding.SequenceFile
* Not to be used for permanent storage: uses Kryo serialization which may not be
* consistent across JVM instances. Use Thrift sources instead.
*/
class TypedSequenceFile[T](path: String) extends SequenceFile(path, Fields.FIRST) with Mappable[T] with TypedSink[T] {
class TypedSequenceFile[T](val path: String) extends SequenceFile(path, Fields.FIRST) with Mappable[T] with TypedSink[T] {
override def converter[U >: T] =
TupleConverter.asSuperConverter[T, U](TupleConverter.singleConverter[T])
override def setter[U <: T] =
TupleSetter.asSubSetter[T, U](TupleSetter.singleSetter[T])
override def toString: String = "TypedSequenceFile(%s)".format(path)
override def equals(that: Any): Boolean = that match {
case null => false
case t: TypedSequenceFile[_] => t.p == p // p is the (poorly named) path field of the SequenceFile case class
case _ => false
}
override def hashCode = path.hashCode
}
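
With path promoted to a val and equals/hashCode keyed on it, two instances built from the same path now compare equal; a sketch (import omitted because the enclosing package is not shown in this hunk):

// Equality and hashing are by path, so repeated references to the same
// on-disk location behave sensibly, e.g. as keys in a Map or Set.
val a = new TypedSequenceFile[Int]("/tmp/ints")
val b = new TypedSequenceFile[Int]("/tmp/ints")
assert(a == b && a.hashCode == b.hashCode)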

object TypedSequenceFile {
