diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index a7903fcd44..f3e1983598 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -46,6 +46,16 @@ import cascading.flow.{ FlowDef, Flow } sealed trait Execution[+T] { import Execution.{ FactoryExecution, FlatMapped, MapCounters, Mapped, OnComplete, RecoverWith, Zipped } + /** + * Scala uses the filter method in for syntax for pattern matches that can fail. + * If this filter is false, the result of run will be an exception in the future + */ + def filter(pred: T => Boolean): Execution[T] = + flatMap { + case good if pred(good) => Execution.from(good) + case failed => Execution.from(sys.error("Filter failed on: " + failed.toString)) + } + /** * First run this Execution, then move to the result * of the function @@ -143,6 +153,13 @@ sealed trait Execution[+T] { scala.concurrent.duration.Duration.Inf)) } + /** + * This is here to silence warnings in for comprehensions, but is + * identical to .filter. + * + * Users should never directly call this method, call .filter + */ + def withFilter(p: T => Boolean): Execution[T] = filter(p) /* * run this and that in parallel, without any dependency. This will * be done in a single cascading flow if possible. @@ -231,15 +248,24 @@ object Execution { */ override def zip[U](that: Execution[U]): Execution[(T, U)] = that match { + /* + * There is an issue in cascading where + * two Pipes with the same name have to be referentially equivalent + * since Pipes are immutable, there is no way to change the name. + * + * This merging parallelism only works if the names of the + * sources are distinct. As this code is designed + * now, by the time you have the flowDef, it is too + * late to merge. 
+ * case FlowDefExecution(result2) => FlowDefExecution({ (conf, m) => val (fd1, fn1) = result(conf, m) val (fd2, fn2) = result2(conf, m) - val merged = fd1.copy merged.mergeFrom(fd2) (merged, { (js: JobStats) => fn1(js).zip(fn2(js)) }) - }) + }) */ case _ => super.zip(that) } } @@ -305,6 +331,12 @@ object Execution { fn: (Config, Mode) => ((FlowDef, JobStats => Future[T]))): Execution[T] = FlowDefExecution(fn) + /** + * Use this to read the configuration, which may contain Args or options + * which describe input on which to run + */ + def getConfig: Execution[Config] = factory { case (conf, _) => from(conf) } + /** * Use this to use counters/stats with Execution. You do this: * Execution.withId { implicit uid => diff --git a/scalding-core/src/main/scala/com/twitter/scalding/source/TypedSequenceFile.scala b/scalding-core/src/main/scala/com/twitter/scalding/source/TypedSequenceFile.scala index 206f6b9599..a31eb1bc82 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/source/TypedSequenceFile.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/source/TypedSequenceFile.scala @@ -25,11 +25,18 @@ import com.twitter.scalding.SequenceFile * Not to be used for permanent storage: uses Kryo serialization which may not be * consistent across JVM instances. Use Thrift sources instead. 
*/ -class TypedSequenceFile[T](path: String) extends SequenceFile(path, Fields.FIRST) with Mappable[T] with TypedSink[T] { +class TypedSequenceFile[T](val path: String) extends SequenceFile(path, Fields.FIRST) with Mappable[T] with TypedSink[T] { override def converter[U >: T] = TupleConverter.asSuperConverter[T, U](TupleConverter.singleConverter[T]) override def setter[U <: T] = TupleSetter.asSubSetter[T, U](TupleSetter.singleSetter[T]) + override def toString: String = "TypedSequenceFile(%s)".format(path) + override def equals(that: Any): Boolean = that match { + case null => false + case t: TypedSequenceFile[_] => t.p == p // `p` is the tersely named path field of the SequenceFile case class + case _ => false + } + override def hashCode = path.hashCode } object TypedSequenceFile {