diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/ValuePipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/ValuePipe.scala index be829ecb09..91ea53b8d9 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/ValuePipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/ValuePipe.scala @@ -47,8 +47,28 @@ sealed trait ValuePipe[+T] extends java.io.Serializable { def map[U](fn: T => U): ValuePipe[U] def filter(fn: T => Boolean): ValuePipe[T] + /** + * Identical to toOptionExecution.map(_.get) + * The result will be an exception if there is no value. + * The name here follows the convention of adding + * Execution to the name so in the repl in is removed + */ + def getExecution: Execution[T] = toOptionExecution.map(_.get) + /** + * Like the above, but with a lazy parameter that is evaluated + * if the value pipe is empty + * The name here follows the convention of adding + * Execution to the name so in the repl in is removed + */ + def getOrElseExecution[U >: T](t: => U): Execution[U] = toOptionExecution.map(_.getOrElse(t)) def toTypedPipe: TypedPipe[T] + /** + * Convert this value to an Option. It is an error if somehow + * this is not either empty or has one value. + * The name here follows the convention of adding + * Execution to the name so in the repl in is removed + */ def toOptionExecution: Execution[Option[T]] = toTypedPipe.toIterableExecution.map { it => it.iterator.take(2).toList match { @@ -65,6 +85,7 @@ case object EmptyValue extends ValuePipe[Nothing] { override def map[U](fn: Nothing => U): ValuePipe[U] = this override def filter(fn: Nothing => Boolean) = this override def toTypedPipe: TypedPipe[Nothing] = TypedPipe.empty + override def toOptionExecution = Execution.from(None) def debug: ValuePipe[Nothing] = { println("EmptyValue") @@ -75,6 +96,7 @@ case class LiteralValue[T](value: T) extends ValuePipe[T] { override def map[U](fn: T => U) = LiteralValue(fn(value)) override def filter(fn: T => Boolean) = if (fn(value)) this else EmptyValue override def toTypedPipe = TypedPipe.from(Iterable(value)) + override def toOptionExecution = Execution.from(Some(value)) def debug: ValuePipe[T] = map { v => println("LiteralValue(" + v.toString + ")") diff --git a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala index ac0430f0cc..02520d8ffc 100644 --- a/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala +++ b/scalding-repl/src/main/scala/com/twitter/scalding/ShellPipe.scala @@ -60,10 +60,8 @@ class ShellTypedPipe[T](pipe: TypedPipe[T]) { class ShellValuePipe[T](vp: ValuePipe[T]) { import ReplImplicits.execute - def toOption: Option[T] = vp match { - case EmptyValue => None - case LiteralValue(v) => Some(v) - // (only take 2 from iterator to avoid blowing out memory in case there's some bug) - case _ => execute(vp.toOptionExecution) - } + // This might throw if the value is empty + def get: T = execute(vp.getExecution) + def getOrElse(t: => T): T = execute(vp.getOrElseExecution(t)) + def toOption: Option[T] = execute(vp.toOptionExecution) }