Move toIterableExecution and forceToDiskExecution into Execution (#1682)
* Pull cascading flowDef thread into cascading_backend

* Make ToWrite public

* remove unused imports

* Move toIterableExecution and forceToDiskExecution into Execution

* use Writer API a bit more
johnynek authored May 10, 2017
1 parent 396498a commit 6166921
Showing 5 changed files with 295 additions and 169 deletions.
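
At a glance, the change moves materialization behind two new Execution combinators that TypedPipe now delegates to. A minimal usage sketch, assuming the existing scalding API (Config.default, Local, Execution.waitFor) and a made-up pipe:

    import com.twitter.scalding._

    val pipe = TypedPipe.from(List(1, 2, 3))

    // Now plans a ToWrite.ToIterable, batched through the Writer and read back:
    val elems: Iterable[Int] =
      pipe.toIterableExecution.waitFor(Config.default, Local(true)).get

    // Now plans a ToWrite.Force: materialize once, reuse downstream:
    val cached: Execution[TypedPipe[Int]] = pipe.forceToDiskExecution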
143 changes: 90 additions & 53 deletions scalding-core/src/main/scala/com/twitter/scalding/Execution.scala
@@ -21,6 +21,7 @@ import com.twitter.algebird.{ Monoid, Monad, Semigroup }
import com.twitter.scalding.cascading_interop.FlowListenerPromise
import com.twitter.scalding.filecache.{CachedFile, DistributedCacheFile}
import com.twitter.scalding.typed.cascading_backend.AsyncFlowDefRunner
import java.util.UUID
import scala.collection.mutable
import scala.concurrent.{ Await, Future, ExecutionContext => ConcurrentExecutionContext, Promise }
import scala.runtime.ScalaRunTime
@@ -142,15 +143,15 @@ sealed trait Execution[+T] extends java.io.Serializable { self: Product =>
* Seriously: pro-style is for this to be called only once in a program.
*/
final def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext): Future[T] = {
val runner = new AsyncFlowDefRunner
val ec = new EvalCache(runner)
val confWithId = conf.setScaldingExecutionId(java.util.UUID.randomUUID.toString)
val writer: Execution.Writer = new AsyncFlowDefRunner
val ec = new EvalCache(writer)
val confWithId = conf.setScaldingExecutionId(UUID.randomUUID.toString)
// get on Trampoline
val result = runStats(confWithId, mode, ec)(cec).get.map(_._1)
// When the final future is complete we stop the submit thread
result.onComplete { _ => runner.finished(mode) }
result.onComplete { _ => writer.finished() }
// wait till the end to start the thread in case the above throws
runner.start()
writer.start()
result
}

@@ -308,7 +309,7 @@ object Execution {
* This is mutable state kept internal to an execution
* as it evaluates.
*/
private[scalding] class EvalCache(val runner: AsyncFlowDefRunner) {
private[scalding] class EvalCache(val writer: Execution.Writer) {

type Counters = Map[Long, ExecutionCounters]
private[this] val cache = new FutureCache[(Config, Execution[Any]), (Any, Counters)]
@@ -317,7 +318,7 @@
// This method will return a 'clean' cache that shares
// the underlying thread and message queue of the parent evalCache
def cleanCache: EvalCache =
new EvalCache(runner)
new EvalCache(writer)

def getOrLock(cfg: Config, write: ToWrite): Either[Promise[Counters], Future[Counters]] =
toWriteCache.getOrPromise((cfg, write))
@@ -363,6 +364,7 @@
fut.map { case (s, stats) => (fn(s), stats) })
}
}

private case class GetCounters[T](prev: Execution[T]) extends Execution[(T, ExecutionCounters)] {
protected def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
Trampoline.call(prev.runStats(conf, mode, cache)).map { fut =>
@@ -403,9 +405,7 @@
*/
private case class WithNewCache[T](prev: Execution[T]) extends Execution[T] {
protected def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = {
// Share the runner thread, but have own cache
val ec = cache.cleanCache

Trampoline.call(prev.runStats(conf, mode, ec))
}
}
@@ -531,10 +531,16 @@
*/
private case class FlowDefExecution(result: (Config, Mode) => FlowDef) extends Execution[Unit] {
protected def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = {
lazy val future = cache
.runner
.validateAndRun(conf, mode)(result)
.map { m => ((), m) }
lazy val future = {
cache.writer match {
case ar: AsyncFlowDefRunner =>
ar.validateAndRun(conf, mode)(result).map { m => ((), Map(m)) }
case other =>
Future.failed(
new Exception(
s"requires AsyncFlowDefRunner, found ${other.getClass}: $other"))
}
}

Trampoline(cache.getOrElseInsert(conf, this, future))
}
@@ -547,29 +553,65 @@

sealed trait ToWrite
object ToWrite {
case class Force[T](pipe: TypedPipe[T]) extends ToWrite
case class ToIterable[T](pipe: TypedPipe[T]) extends ToWrite
case class SimpleWrite[T](pipe: TypedPipe[T], sink: TypedSink[T]) extends ToWrite
case class PreparedWrite[T](fn: (Config, Mode) => SimpleWrite[T]) extends ToWrite
}

/**
* Something that can handle a batch of writes that may be optimized
* before running. Each run returns a unique Long id and its Counters.
*/
trait Writer {
/**
* This is called by an Execution to begin processing
*/
def start(): Unit

/**
* This is called by an Execution to end processing
*/
def finished(): Unit

/**
* do a batch of writes, possibly optimizing, and return a new unique
* Long.
*
* empty writes are legitimate and should still return a Long
*/
def execute(
conf: Config,
mode: Mode,
head: ToWrite,
rest: List[ToWrite])(implicit cec: ConcurrentExecutionContext): Future[Map[Long, ExecutionCounters]]
writes: List[ToWrite])(implicit cec: ConcurrentExecutionContext): Future[(Long, ExecutionCounters)]

/**
* This should only be called after a call to execute
*/
private[Execution] def getForced[T](
conf: Config,
mode: Mode,
initial: TypedPipe[T]
)(implicit cec: ConcurrentExecutionContext): Future[TypedPipe[T]]

/**
* This should only be called after a call to execute
*/
private[Execution] def getIterable[T](
conf: Config,
mode: Mode,
initial: TypedPipe[T]
)(implicit cec: ConcurrentExecutionContext): Future[Iterable[T]]
}
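
// A hedged sketch of the protocol a run follows against a Writer, mirroring
// run() and WriteExecution.runStats in this diff (writer, conf, mode, writes,
// pipe and the implicit ExecutionContext are assumed to be in scope):
//
//   writer.start()                        // once, before any work is submitted
//   val fut: Future[(Long, ExecutionCounters)] =
//     writer.execute(conf, mode, writes)  // one optimizable batch of ToWrites
//   // getForced/getIterable are only valid after the corresponding execute:
//   val read: Future[Iterable[T]] =
//     fut.flatMap { _ => writer.getIterable(conf, mode, pipe) }
//   read.onComplete { _ => writer.finished() } // once, when the whole run ends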

/**
* This is the fundamental execution that actually happens in TypedPipes; all the rest
* are based on this one. By keeping the Pipe and the Sink, we can inspect the Execution
* DAG and optimize it later (a goal, but not done yet).
*/
private case class WriteExecution[T](head: ToWrite, tail: List[ToWrite], fn: (Config, Mode) => T, tempFilesToCleanup: (Config, Mode) => Set[String] = (_, _) => Set()) extends Execution[T] {
private case class WriteExecution[T](
head: ToWrite,
tail: List[ToWrite],
result: ((Config, Mode, Writer, ConcurrentExecutionContext)) => Future[T]) extends Execution[T] {

/**
* Apply a pure function to the result. This may not
@@ -579,7 +621,9 @@ object Execution {
* Here we inline the map operation into the presentation function so we can zip after map.
*/
override def map[U](mapFn: T => U): Execution[U] =
WriteExecution(head, tail, { (conf: Config, mode: Mode) => mapFn(fn(conf, mode)) })
WriteExecution(head,
tail,
{ tup => result(tup).map(mapFn)(tup._4) })

def unwrapListEither[A, B, C](it: List[(A, Either[B, C])]): (List[(A, B)], List[(A, C)]) = it match {
case (a, Left(b)) :: tail =>
@@ -596,7 +640,6 @@ object Execution {
// Anything not already run we run as part of a single flow def, using their combined counters for the others
protected def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = {
Trampoline(cache.getOrElseInsert(conf, this, {
cache.runner.addFilesToCleanup(tempFilesToCleanup(conf, mode))
val cacheLookup: List[(ToWrite, Either[Promise[Map[Long, ExecutionCounters]], Future[Map[Long, ExecutionCounters]]])] =
(head :: tail).map{ tw => (tw, cache.getOrLock(conf, tw)) }
val (weDoOperation, someoneElseDoesOperation) = unwrapListEither(cacheLookup)
@@ -609,7 +652,10 @@
weDoOperation match {
case all @ (h :: tail) =>
val futCounters: Future[Map[Long, ExecutionCounters]] =
cache.runner.execute(conf, mode, h._1, tail.map(_._1))
cache.writer
.execute(conf, mode, all.map(_._1))
.map(Map(_))

// Complete all of the promises we put into the cache
// with this future counters set
all.foreach {
@@ -622,11 +668,12 @@ object Execution {
Future.successful(Map.empty)
}

failFastZip(otherResult, localFlowDefCountersFuture).map {
case (lCounters, fdCounters) =>
val summedCounters = (fdCounters :: lCounters).reduce(_ ++ _)
(fn(conf, mode), summedCounters)
}
val bothFutures = failFastZip(otherResult, localFlowDefCountersFuture)
for {
(lCounters, fdCounters) <- bothFutures
t <- result((conf, mode, cache.writer, cec))
summedCounters = (fdCounters :: lCounters).reduce(_ ++ _)
} yield (t, summedCounters)
}
}))
}
Expand All @@ -639,15 +686,11 @@ object Execution {
*/
override def zip[U](that: Execution[U]): Execution[(T, U)] =
that match {
case WriteExecution(h, t, otherFn, tempFiles) =>
val newFn = { (conf: Config, mode: Mode) =>
(fn(conf, mode), otherFn(conf, mode))
}
val newTempFilesFn = { (conf: Config, mode: Mode) =>
tempFilesToCleanup(conf, mode) ++ tempFiles(conf, mode)
case WriteExecution(h, t, other) =>
val newFn = { tup: (Config, Mode, Writer, ConcurrentExecutionContext) =>
(failFastZip(result(tup), other(tup))(tup._4))
}

WriteExecution(head, h :: t ::: tail, newFn, newTempFilesFn)
WriteExecution(head, h :: t ::: tail, newFn)
case o => Zipped(this, that)
}
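
Because zip concatenates the two ToWrite lists into a single head :: tail, independent sinks end up in one batch that the Writer can plan as a single flow. A hedged sketch, assuming pipe1, pipe2 and matching sinks (writeExecution is the existing TypedPipe method):

    val w1: Execution[Unit] = pipe1.writeExecution(sink1)
    val w2: Execution[Unit] = pipe2.writeExecution(sink2)
    // One WriteExecution carrying both ToWrites, not two separate flows:
    val both: Execution[(Unit, Unit)] = w1.zip(w2)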

@@ -706,14 +749,17 @@ object Execution {
def fromFn(fn: (Config, Mode) => FlowDef): Execution[Unit] =
FlowDefExecution(fn)

/**
* Creates an Execution to do a write
*
* This variant allows the user to supply a method using the config and mode to build a new
* type U for the resultant execution.
*/
private[scalding] def write[T, U](pipe: TypedPipe[T], sink: TypedSink[T], generatorFn: (Config, Mode) => U): Execution[U] =
WriteExecution(ToWrite.SimpleWrite(pipe, sink), Nil, generatorFn)
def forceToDisk[T](t: TypedPipe[T]): Execution[TypedPipe[T]] =
WriteExecution(
ToWrite.Force(t),
Nil,
{ case (conf, mode, w, cec) => w.getForced(conf, mode, t)(cec) })

def toIterable[T](t: TypedPipe[T]): Execution[Iterable[T]] =
WriteExecution(
ToWrite.ToIterable(t),
Nil,
{ case (conf, mode, w, cec) => w.getIterable(conf, mode, t)(cec) })

/**
* The simplest form, just sink the typed pipe into the sink and get a unit execution back
@@ -722,18 +768,9 @@
write(pipe, sink, ())

private[scalding] def write[T, U](pipe: TypedPipe[T], sink: TypedSink[T], presentType: => U): Execution[U] =
WriteExecution(ToWrite.SimpleWrite(pipe, sink), Nil, { (_: Config, _: Mode) => presentType })

/**
* Here we allow both the targets to write and the sources to be generated from the config and mode.
* This allows us to merge things looking for the config and mode without using flatmap.
*/
private[scalding] def write[T, U](fn: (Config, Mode) => (TypedPipe[T], TypedSink[T]), generatorFn: (Config, Mode) => U,
tempFilesToCleanup: (Config, Mode) => Set[String] = (_, _) => Set()): Execution[U] =
WriteExecution(ToWrite.PreparedWrite({ (cfg: Config, m: Mode) =>
val r = fn(cfg, m)
ToWrite.SimpleWrite(r._1, r._2)
}), Nil, generatorFn, tempFilesToCleanup)
WriteExecution(ToWrite.SimpleWrite(pipe, sink),
Nil,
{ tup => Future(presentType)(tup._4) })

/**
* Convenience method to get the Args
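Since forceToDisk returns an Execution[TypedPipe[T]], an expensive intermediate can be materialized once and consumed by several downstream executions. A sketch under assumed names (bigPipe, expensive):

    val shared: Execution[TypedPipe[Int]] =
      Execution.forceToDisk(bigPipe.map(expensive))

    val both: Execution[(Iterable[Int], Iterable[Int])] =
      shared.flatMap { cached =>
        Execution.toIterable(cached.filter(_ > 0))
          .zip(Execution.toIterable(cached.filter(_ < 0)))
      }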
scalding-core/src/main/scala/com/twitter/scalding/JobStats.scala
@@ -21,6 +21,7 @@ import cascading.stats.{ CascadeStats, CascadingStats, FlowStats }
import scala.util.{ Failure, Try }

object JobStats {
def empty: JobStats = new JobStats(Map("counters" -> Map.empty))
def apply(stats: CascadingStats): JobStats = {
val m: Map[String, Any] = statsMap(stats)
new JobStats(
scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala
@@ -20,8 +20,7 @@ import cascading.flow.FlowDef
import cascading.pipe.Pipe
import cascading.tuple.Fields
import com.twitter.algebird.{ Aggregator, Batched, Monoid, Semigroup }
import com.twitter.scalding.TupleConverter.{ singleConverter, tuple2Converter }
import com.twitter.scalding.TupleSetter.{ singleSetter, tup2Setter }
import com.twitter.scalding.TupleConverter.singleConverter
import com.twitter.scalding._
import com.twitter.scalding.serialization.OrderedSerialization
import com.twitter.scalding.serialization.OrderedSerialization.Result
@@ -528,54 +527,8 @@ sealed trait TypedPipe[+T] extends Serializable {
* This writes the current TypedPipe into a temporary file
* and then opens it after complete so that you can continue from that point
*/
def forceToDiskExecution: Execution[TypedPipe[T]] = {
val cachedRandomUUID = java.util.UUID.randomUUID
lazy val inMemoryDest = new MemorySink[T]

def temporaryPath(conf: Config, uuid: UUID): String = {
val tmpDir = conf.get("hadoop.tmp.dir")
.orElse(conf.get("cascading.tmp.dir"))
.getOrElse("/tmp")

tmpDir + "/scalding/snapshot-" + uuid + ".seq"
}

def hadoopTypedSource(conf: Config): Mappable[T] with TypedSink[T] = {
// come up with unique temporary filename, use the config here
// TODO: refactor into TemporarySequenceFile class
val tmpSeq = temporaryPath(conf, cachedRandomUUID)
source.TypedSequenceFile[T](tmpSeq)

}
val writeFn = { (conf: Config, mode: Mode) =>
mode match {
case _: CascadingLocal => // Local or Test mode
(this, inMemoryDest)
case _: HadoopMode =>
(this, hadoopTypedSource(conf))
}
}

val readFn = { (conf: Config, mode: Mode) =>
mode match {
case _: CascadingLocal => // Local or Test mode
TypedPipe.from(inMemoryDest.readResults)
case _: HadoopMode =>
TypedPipe.from(hadoopTypedSource(conf))
}
}

val filesToDeleteFn = { (conf: Config, mode: Mode) =>
mode match {
case _: CascadingLocal => // Local or Test mode
Set[String]()
case _: HadoopMode =>
Set(temporaryPath(conf, cachedRandomUUID))
}
}

Execution.write(writeFn, readFn, filesToDeleteFn)
}
def forceToDiskExecution: Execution[TypedPipe[T]] =
Execution.forceToDisk(this)

/**
* This gives an Execution that when run evaluates the TypedPipe,
@@ -585,19 +538,8 @@
* the Iterable forces a read of the entire thing. If you need it to
* be lazy, call .iterator and use the Iterator inside instead.
*/
def toIterableExecution: Execution[Iterable[T]] = this match {
case TypedPipe.EmptyTypedPipe => Execution.from(Nil)
case TypedPipe.IterablePipe(iter) => Execution.from(iter)
case TypedPipe.SourcePipe(src: Mappable[T]) =>
Execution.getConfigMode.map { case (conf, mode) =>
new Iterable[T] {
def iterator = src.toIterator(conf, mode)
}
}
case other =>
// after we force, we have a source pipe or an Iterable Pipe
forceToDiskExecution.flatMap(_.toIterableExecution)
}
def toIterableExecution: Execution[Iterable[T]] =
Execution.toIterable(this)

/** use a TupleUnpacker to flatten U out into a cascading Tuple */
def unpackToPipe[U >: T](fieldNames: Fields)(implicit fd: FlowDef, mode: Mode, up: TupleUnpacker[U]): Pipe = {