From bfacc4c099cf48c6b4fa71b7a859c8f57a89ddd1 Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Sat, 3 Jun 2017 13:46:26 -1000 Subject: [PATCH 1/4] First draft of memory backend --- .../com/twitter/scalding/Execution.scala | 2 +- .../scala/com/twitter/scalding/Mode.scala | 34 +- .../typed/memory_backend/MemoryBackend.scala | 493 ++++++++++++++++++ .../typed/memory_backend/MemoryTest.scala | 53 ++ 4 files changed, 566 insertions(+), 16 deletions(-) create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/typed/memory_backend/MemoryTest.scala diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index fc65408718..b00d6ca338 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -143,7 +143,7 @@ sealed trait Execution[+T] extends java.io.Serializable { self: Product => * Seriously: pro-style is for this to be called only once in a program. */ final def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext): Future[T] = { - val writer: Execution.Writer = new AsyncFlowDefRunner + val writer: Execution.Writer = mode.newWriter() val ec = new EvalCache(writer) val confWithId = conf.setScaldingExecutionId(UUID.randomUUID.toString) // get on Trampoline diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala index 9103e7dcf5..0b89259db0 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala @@ -15,30 +15,23 @@ limitations under the License. 
*/ package com.twitter.scalding +import cascading.flow.local.{ LocalFlowConnector, LocalFlowProcess } +import cascading.flow.{ FlowProcess, FlowConnector, FlowDef, Flow } +import cascading.property.AppProps +import cascading.tap.Tap +import cascading.tuple.{ Tuple, TupleEntryIterator } +import com.twitter.scalding.typed.cascading_backend.AsyncFlowDefRunner import java.io.File import java.util.{ UUID, Properties } - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{ FileSystem, Path } import org.apache.hadoop.mapred.JobConf - -import cascading.flow.{ FlowProcess, FlowConnector, FlowDef, Flow } -import cascading.flow.local.LocalFlowConnector -import cascading.flow.local.LocalFlowProcess -import cascading.property.AppProps -import cascading.tap.Tap -import cascading.tuple.Tuple -import cascading.tuple.TupleEntryIterator - +import org.slf4j.LoggerFactory import scala.annotation.tailrec import scala.collection.JavaConverters._ -import scala.collection.mutable.Buffer -import scala.collection.mutable.{ Map => MMap } -import scala.collection.mutable.{ Set => MSet } +import scala.collection.mutable.{ Buffer, Map => MMap, Set => MSet } import scala.util.{ Failure, Success } -import org.slf4j.LoggerFactory - case class ModeException(message: String) extends RuntimeException(message) case class ModeLoadException(message: String, origin: ClassNotFoundException) extends RuntimeException(origin) @@ -105,6 +98,11 @@ object Mode { } trait Mode extends java.io.Serializable { + + /** + * Make the Execution.Writer for this platform + */ + def newWriter(): Execution.Writer /* * Using a new FlowProcess, which is only suitable for reading outside * of a map/reduce job, open a given tap and return the TupleEntryIterator @@ -154,6 +152,9 @@ trait HadoopMode extends Mode { } } + def newWriter(): Execution.Writer = + new AsyncFlowDefRunner + // TODO unlike newFlowConnector, this does not look at the Job.config override def openForRead(config: Config, tap: Tap[_, _, _]) 
= { val htap = tap.asInstanceOf[Tap[JobConf, _, _]] @@ -183,6 +184,9 @@ trait CascadingLocal extends Mode { override def newFlowConnector(conf: Config) = new LocalFlowConnector(conf.toMap.toMap[AnyRef, AnyRef].asJava) // linter:ignore + def newWriter(): Execution.Writer = + new AsyncFlowDefRunner + override def openForRead(config: Config, tap: Tap[_, _, _]) = { val ltap = tap.asInstanceOf[Tap[Properties, _, _]] val props = new java.util.Properties diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala new file mode 100644 index 0000000000..0ff52276b4 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala @@ -0,0 +1,493 @@ +package com.twitter.scalding.typed.memory_backend + +import cascading.flow.{ FlowDef, FlowConnector } +import cascading.pipe.Pipe +import cascading.tap.Tap +import cascading.tuple.{ Fields, TupleEntryIterator } +import com.twitter.scalding.typed._ +import com.twitter.scalding.{ Config, Execution, ExecutionCounters, Mode } +import java.util.concurrent.atomic.AtomicReference +import java.util.{ ArrayList, Collections, UUID } +import scala.concurrent.{ Future, ExecutionContext => ConcurrentExecutionContext, Promise } +import scala.collection.mutable.{ ArrayBuffer, Map => MMap } +import scala.collection.JavaConverters._ + +import Execution.{ ToWrite, Writer } + +class AtomicBox[T <: AnyRef](init: T) { + private[this] val ref = new AtomicReference[T](init) + + def lazySet(t: T): Unit = + ref.lazySet(t) + + /** + * use a pure function to update the state. 
+ * fn may be called more than once + */ + def update[R](fn: T => (T, R)): R = { + + @annotation.tailrec + def loop(): R = { + val init = ref.get + val (next, res) = fn(init) + if (ref.compareAndSet(init, next)) res + else loop() + } + + loop() + } + + def get(): T = ref.get +} + +final class MemoryMode private (srcs: Map[TypedSource[_], Iterable[_]], sinks: Map[TypedSink[_], AtomicBox[Option[Iterable[_]]]]) extends Mode { + + def newWriter(): Writer = + new MemoryWriter(this) + + def openForRead(config: Config, tap: Tap[_, _, _]): TupleEntryIterator = ??? + def fileExists(filename: String): Boolean = ??? + def newFlowConnector(props: Config): FlowConnector = ??? + + def addSource[T](src: TypedSource[T], ts: Iterable[T]): MemoryMode = + new MemoryMode(srcs + (src -> ts), sinks) + + def addSink[T](sink: TypedSink[T]): MemoryMode = + new MemoryMode(srcs, sinks + (sink -> new AtomicBox[Option[Iterable[_]]](None))) + + /** + * This has a side effect of mutating this MemoryMode + */ + def writeSink[T](t: TypedSink[T], iter: Iterable[T]): Unit = + sinks(t).lazySet(Some(iter)) + + def readSink[T](t: TypedSink[T]): Option[Iterable[T]] = + sinks.get(t).flatMap(_.get).asInstanceOf[Option[Iterable[T]]] + + def readSource[T](t: TypedSource[T]): Option[Iterable[T]] = + srcs.get(t).asInstanceOf[Option[Iterable[T]]] +} + +object MemoryMode { + def empty: MemoryMode = new MemoryMode(Map.empty, Map.empty) +} + +object MemoryPlanner { + + sealed trait Op[+O] { + def result(implicit cec: ConcurrentExecutionContext): Future[ArrayBuffer[_ <: O]] + + def flatMap[O1](fn: O => TraversableOnce[O1]): Op[O1] = + Op.Map(this, fn) + + def mapAll[O1 >: O, O2](fn: IndexedSeq[O1] => ArrayBuffer[O2]): Op[O2] = + Op.MapAll[O1, O2](this, fn) + } + object Op { + def source[I](i: Iterable[I]): Op[I] = Source(_ => Future.successful(i)) + def empty[I]: Op[I] = source(Nil) + + case class Source[I](input: ConcurrentExecutionContext => Future[Iterable[I]]) extends Op[I] { + private[this] val promise: 
Promise[ArrayBuffer[I]] = Promise() + + def result(implicit cec: ConcurrentExecutionContext): Future[ArrayBuffer[I]] = { + if (!promise.isCompleted) { + promise.tryCompleteWith { + val iter = input(cec) + iter.map { i => ArrayBuffer.concat(i) } + } + } + + promise.future + } + } + + case class Materialize[O](op: Op[O]) extends Op[O] { + private[this] val promise: Promise[ArrayBuffer[_ <: O]] = Promise() + + def result(implicit cec: ConcurrentExecutionContext) = { + if (!promise.isCompleted) { + promise.tryCompleteWith(op.result) + } + + promise.future + } + } + + case class Concat[O](left: Op[O], right: Op[O]) extends Op[O] { + def result(implicit cec: ConcurrentExecutionContext) = + for { + l <- left.result + r <- right.result + } yield { + ArrayBuffer.concat(l, r) + } + } + + case class Map[I, O](input: Op[I], fn: I => TraversableOnce[O]) extends Op[O] { + def result(implicit cec: ConcurrentExecutionContext): Future[ArrayBuffer[O]] = + input.result.map { array => + val res = ArrayBuffer[O]() + val it = array.iterator + while(it.hasNext) { + val i = it.next + fn(i).foreach(res += _) + } + res + } + } + + case class MapAll[I, O](input: Op[I], fn: IndexedSeq[I] => ArrayBuffer[O]) extends Op[O] { + def result(implicit cec: ConcurrentExecutionContext) = + input.result.map(fn) + } + + case class Reduce[K, V1, V2]( + input: Op[(K, V1)], + fn: (K, Iterator[V1]) => Iterator[V2], + ord: Option[Ordering[_ >: V1]] + ) extends Op[(K, V2)] { + + def result(implicit cec: ConcurrentExecutionContext): Future[ArrayBuffer[(K, V2)]] = + input.result.map { kvs => + val m = MMap[K, ArrayList[V1]]() + def add(kv: (K, V1)): Unit = { + val vs = m.getOrElseUpdate(kv._1, new ArrayList[V1]()) + vs.add(kv._2) + } + kvs.foreach(add) + + /* + * This portion could be parallelized for each key, or we could split + * the keys into as many groups as there are CPUs and process that way + */ + val res = ArrayBuffer[(K, V2)]() + m.foreach { case (k, vs) => + ord.foreach(Collections.sort[V1](vs, _)) + 
val v2iter = fn(k, vs.iterator.asScala) + while(v2iter.hasNext) { + res += ((k, v2iter.next)) + } + } + res + } + } + + case class Join[A, B, C]( + opA: Op[A], + opB: Op[B], + fn: (IndexedSeq[A], IndexedSeq[B]) => ArrayBuffer[C]) extends Op[C] { + + def result(implicit cec: ConcurrentExecutionContext) = + for { + as <- opA.result + bs <- opB.result + } yield fn(as, bs) + } + } + + /** + * Memoize previously planned TypedPipes so we don't replan too much + * + * ideally we would look at a series of writes and first look for all + * the fan-outs and only memoize at the spots where we have to force + * materializations, but this is just a demo for now + */ + case class Memo(planned: Map[TypedPipe[_], Op[_]]) { + def plan[T](t: TypedPipe[T])(op: => (Memo, Op[T])): (Memo, Op[T]) = + planned.get(t) match { + case Some(op) => (this, op.asInstanceOf[Op[T]]) + case None => + val (m1, newOp) = op + (m1.copy(planned = m1.planned.updated(t, newOp)), newOp) + } + } + + object Memo { + def empty: Memo = Memo(Map.empty) + } + +} + +/** + * These are just used as type markers which are connected + * to inputs via the MemoryMode + */ +case class SourceT[T](ident: String) extends TypedSource[T] { + def converter[U >: T] = ??? + def read(implicit flowDef: FlowDef, mode: Mode): Pipe = ??? +} + +/** + * These are just used as type markers which are connected + * to outputs via the MemoryMode + */ +case class SinkT[T](indent: String) extends TypedSink[T] { + def setter[U <: T] = ??? + def writeFrom(pipe: Pipe)(implicit flowDef: FlowDef, mode: Mode): Pipe = ??? 
+} + +class MemoryWriter(mem: MemoryMode) extends Writer { + + import MemoryPlanner.{ Memo, Op } + import TypedPipe._ + + def start(): Unit = () + /** + * This is called by an Execution to end processing + */ + def finished(): Unit = () + + def plan[T](m: Memo, tp: TypedPipe[T]): (Memo, Op[T]) = + m.plan(tp) { + tp match { + case cp@CrossPipe(_, _) => + plan(m, cp.viaHashJoin) + + case cv@CrossValue(_, _) => + plan(m, cv.viaHashJoin) + + case DebugPipe(p) => + // There is really little that can be done here but println + plan(m, p.map { t => println(t); t }) + + case EmptyTypedPipe => + // just use an empty iterable pipe. + // Note, rest is irrelevant + (m, Op.empty[T]) + + case fk@FilterKeys(_, _) => + def go[K, V](node: FilterKeys[K, V]): (Memo, Op[(K, V)]) = { + val FilterKeys(pipe, fn) = node + val (m1, op) = plan(m, pipe) + (m1, op.flatMap { case (k, v) => if (fn(k)) { (k, v) :: Nil } else Nil }) + } + go(fk) + + case f@Filter(_, _) => + def go[T](f: Filter[T]): (Memo, Op[T]) = { + val Filter(p, fn) = f + val (m1, op) = plan(m, f) + (m1, op.flatMap { t => if (fn(t)) t :: Nil else Nil }) + } + go(f) + + case f@FlatMapValues(_, _) => + def go[K, V, U](node: FlatMapValues[K, V, U]) = { + // don't capture node, which is a TypedPipe, which we avoid serializing + val fn = node.fn + val (m1, op) = plan(m, node.input) + (m1, op.flatMap { case (k, v) => fn(v).map((k, _)) }) + } + + go(f) + + case FlatMapped(prev, fn) => + val (m1, op) = plan(m, prev) + (m1, op.flatMap(fn)) + + case ForceToDisk(pipe) => + val (m1, op) = plan(m, pipe) + (m1, Op.Materialize(op)) + + case Fork(pipe) => + val (m1, op) = plan(m, pipe) + (m1, Op.Materialize(op)) + + case IterablePipe(iterable) => + (m, Op.source(iterable)) + + case f@MapValues(_, _) => + def go[K, V, U](node: MapValues[K, V, U]) = { + // don't capture node, which is a TypedPipe, which we avoid serializing + val mvfn = node.fn + val (m1, op) = plan(m, node.input) + (m1, op.flatMap { case (k, v) => Iterator.single((k, mvfn(v))) 
}) + } + + go(f) + + case Mapped(input, fn) => + val (m1, op) = plan(m, input) + (m1, op.flatMap { t => fn(t) :: Nil }) + + case MergedTypedPipe(left, right) => + val (m1, op1) = plan(m, left) + val (m2, op2) = plan(m1, right) + (m2, Op.Concat(op1, op2)) + + case SourcePipe(src) => + (m, Op.Source({ cec => + mem.readSource(src) match { + case Some(iter) => Future.successful(iter) + case None => Future.failed(new Exception(s"Source: $src not wired")) + } + })) + + case slk@SumByLocalKeys(_, _) => + def sum[K, V](sblk: SumByLocalKeys[K, V]) = { + val SumByLocalKeys(p, sg) = sblk + + val (m1, op) = plan(m, p) + (m1, op.mapAll[(K, V), (K, V)] { kvs => + val map = collection.mutable.Map.empty[K, V] + val iter = kvs.iterator + while(iter.hasNext) { + val (k, v) = iter.next + map(k) = map.get(k) match { + case None => v + case Some(v1) => sg.plus(v1, v) + } + } + val res = new ArrayBuffer[(K, V)](map.size) + map.foreach { res += _ } + res + }) + } + sum(slk) + + case tp@TrappedPipe(_, _, _) => ??? + // this can be interpreted as catching any exception + // on the map-phase until the next partition, so it can + // be made to work, but skipping for now + + case WithDescriptionTypedPipe(pipe, description, dedup) => + plan(m, pipe) + + case WithOnComplete(pipe, fn) => ??? + + case hcg@HashCoGroup(_, _, _) => + def go[K, V1, V2, R](hcg: HashCoGroup[K, V1, V2, R]) = { + val (m1, leftOp) = plan(m, hcg.left) + val (m2, rightOp) = planHashJoinable(m1, hcg.right) + (m2, Op.Join[(K, V1), (K, V2), (K, R)](leftOp, rightOp, { (v1s, v2s) => + val kv2 = v2s.groupBy(_._1) + val result = new ArrayBuffer[(K, R)]() + v1s.foreach { case (k, v1) => + val v2 = kv2.getOrElse(k, Nil).map(_._2) + result ++= hcg.joiner(k, v1, v2).map((k, _)) + } + result + })) + } + go(hcg) + + case cgp@CoGroupedPipe(_) => ??? 
+ + case ReduceStepPipe(IdentityReduce(_, pipe, _, descriptions)) => + plan(m, pipe) + case ReduceStepPipe(UnsortedIdentityReduce(_, pipe, _, descriptions)) => + plan(m, pipe) + case ReduceStepPipe(IdentityValueSortedReduce(_, pipe, ord, _, _)) => + def go[K, V](p: TypedPipe[(K, V)], ord: Ordering[_ >: V]) = { + val (m1, op) = plan(m, p) + (m1, Op.Reduce[K, V, V](op, { (k, vs) => vs }, Some(ord))) + } + go(pipe, ord) + case ReduceStepPipe(ValueSortedReduce(_, pipe, ord, fn, _, _)) => + val (m1, op) = plan(m, pipe) + (m1, Op.Reduce(op, fn, Some(ord))) + case ReduceStepPipe(IteratorMappedReduce(_, pipe, fn, _, _)) => + val (m1, op) = plan(m, pipe) + (m1, Op.Reduce(op, fn, None)) + } + } + + def planHashJoinable[K, V](m: Memo, hk: HashJoinable[K, V]): (Memo, Op[(K, V)]) = ??? + + case class State( + id: Long, + memo: MemoryPlanner.Memo, + forced: Map[TypedPipe[_], Future[TypedPipe[_]]] + ) + + val state = new AtomicBox[State](State(0, MemoryPlanner.Memo.empty, Map.empty)) + + /** + * do a batch of writes, possibly optimizing, and return a new unique + * Long. 
+ * + * empty writes are legitimate and should still return a Long + */ + def execute( + conf: Config, + mode: Mode, + writes: List[ToWrite])(implicit cec: ConcurrentExecutionContext): Future[(Long, ExecutionCounters)] = { + + type Action = () => Future[Unit] + + def force[T](p: TypedPipe[T], oldState: State): (State, Action) = { + val (nextM, op) = plan(oldState.memo, p) + val pipePromise = Promise[TypedPipe[Any]]() + val action = () => { + val arrayBufferF = op.result + pipePromise.completeWith(arrayBufferF.map(TypedPipe.from(_))) + + arrayBufferF.map(_ => ()) + } + (oldState.copy(memo = nextM, forced = oldState.forced.updated(p, pipePromise.future)), action) + } + + val (id, acts) = state.update { s => + val (nextState, acts) = writes.foldLeft((s, List.empty[Action])) { + case (noop, ToWrite.Force(pipe)) if noop._1.forced.contains(pipe) => + // we have already forced this pipe + noop + case ((oldState, acts), ToWrite.Force(pipe)) => + val (st, a) = force(pipe, oldState) + (st, a :: acts) + case (old@(oldState, acts), ToWrite.ToIterable(pipe)) => + pipe match { + case TypedPipe.EmptyTypedPipe => old + case TypedPipe.IterablePipe(_) => old + case TypedPipe.SourcePipe(_) => old + case other if oldState.forced.contains(other) => old + case other => + val (st, a) = force(other, oldState) + (st, a :: acts) + } + case ((oldState, acts), ToWrite.SimpleWrite(pipe, sink)) => + val (nextM, op) = plan(oldState.memo, pipe) + val action = () => { + val arrayBufferF = op.result + // write inside map (not foreach) so the action completes only after the sink is set + arrayBufferF.map { mem.writeSink(sink, _) } + } + (oldState.copy(memo = nextM), action :: acts) + } + (nextState.copy(id = nextState.id + 1) , (nextState.id, acts)) + } + // now we run the actions: + Future.traverse(acts) { fn => fn() }.map(_ => (id, ExecutionCounters.empty)) + } + + /** + * This should only be called after a call to execute + */ + def getForced[T]( + conf: Config, + mode: Mode, + initial: TypedPipe[T] + )(implicit cec: ConcurrentExecutionContext): 
Future[TypedPipe[T]] = + state.get.forced.get(initial) match { + case None => Future.failed(new Exception(s"$initial not forced")) + case Some(f) => f.asInstanceOf[Future[TypedPipe[T]]] + } + + /** + * This should only be called after a call to execute + */ + def getIterable[T]( + conf: Config, + mode: Mode, + initial: TypedPipe[T] + )(implicit cec: ConcurrentExecutionContext): Future[Iterable[T]] = initial match { + case TypedPipe.EmptyTypedPipe => Future.successful(Nil) + case TypedPipe.IterablePipe(iter) => Future.successful(iter) + case TypedPipe.SourcePipe(src) => mem.readSource(src) match { + case Some(iter) => Future.successful(iter) + case None => Future.failed(new Exception(s"Source: $src not connected")) + } + case other => getForced(conf, mode, other).flatMap(getIterable(conf, mode, _)) + } +} diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/memory_backend/MemoryTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/memory_backend/MemoryTest.scala new file mode 100644 index 0000000000..df1c049230 --- /dev/null +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/memory_backend/MemoryTest.scala @@ -0,0 +1,53 @@ +package com.twitter.scalding.typed.memory_backend + +import org.scalatest.FunSuite +import com.twitter.scalding.{ TypedPipe, Execution, Config, Local } + +class MemoryTest extends FunSuite { + + private def mapMatch[K, V](ex: Execution[Iterable[(K, V)]]) = { + val mm = MemoryMode.empty + + val mkv = ex.waitFor(Config.empty, mm) + + val lkv = ex.waitFor(Config.empty, Local(true)) + assert(mkv.get.toMap == lkv.get.toMap) + } + + test("basic word count") { + val x = TypedPipe.from(0 until 100) + .groupBy(_ % 2) + .sum + .toIterableExecution + + mapMatch(x) + } + + test("mapGroup works") { + val x = TypedPipe.from(0 until 100) + .groupBy(_ % 2) + .mapGroup { (k, vs) => Iterator.single(vs.foldLeft(k)(_ + _)) } + .toIterableExecution + + mapMatch(x) + } + + // fails now + // test("hashJoin works") { + // val 
input = TypedPipe.from(0 until 100) + // val left = input.map { k => (k, k % 2) } + // val right = input.map { k => (k, k % 3) } + + // mapMatch(left.hashJoin(right).toIterableExecution) + // } + + // fails now + // test("join works") { + // val input = TypedPipe.from(0 until 100) + // val left = input.map { k => (k, k % 2) } + // val right = input.map { k => (k, k % 3) } + + // mapMatch(left.join(right).toIterableExecution) + // } + +} From e6090115bb8011f90279583d09b7dcc441c96292 Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Sat, 3 Jun 2017 15:55:05 -1000 Subject: [PATCH 2/4] add hashJoin support --- .../typed/memory_backend/MemoryBackend.scala | 31 ++++++++++++++++--- .../typed/memory_backend/MemoryTest.scala | 13 ++++---- 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala index 0ff52276b4..c75056de6b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala @@ -244,9 +244,12 @@ class MemoryWriter(mem: MemoryMode) extends Writer { case cp@CrossPipe(_, _) => plan(m, cp.viaHashJoin) - case cv@CrossValue(_, _) => - plan(m, cv.viaHashJoin) - + case CrossValue(left, EmptyValue) => (m, Op.empty) + case CrossValue(left, LiteralValue(v)) => + val (m1, op) = plan(m, left) + (m1, op.flatMap { a => Iterator.single((a, v)) }) + case CrossValue(left, ComputedValue(right)) => + plan(m, CrossPipe(left, right)) case DebugPipe(p) => // There is really little that can be done here but println plan(m, p.map { t => println(t); t }) @@ -393,7 +396,27 @@ class MemoryWriter(mem: MemoryMode) extends Writer { } } - def planHashJoinable[K, V](m: Memo, hk: HashJoinable[K, V]): (Memo, Op[(K, V)]) = ??? 
+ def planHashJoinable[K, V](m: Memo, hk: HashJoinable[K, V]): (Memo, Op[(K, V)]) = hk match { + case IdentityReduce(_, pipe, _, _) => plan(m, pipe) + case UnsortedIdentityReduce(_, pipe, _, _) => plan(m, pipe) + case imr@IteratorMappedReduce(_, _, _, _, _) => + def go[K, U, V](imr: IteratorMappedReduce[K, U, V]) = { + val IteratorMappedReduce(_, pipe, fn, _, _) = imr + val (m1, op) = plan(m, pipe) + (m1, op.mapAll[(K, U), (K, V)] { kvs => + val m = kvs.groupBy(_._1).iterator + val res = ArrayBuffer[(K, V)]() + m.foreach { case (k, kus) => + val us = kus.iterator.map { case (k, u) => u } + fn(k, us).foreach { v => + res += ((k, v)) + } + } + res + }) + } + go(imr) + } case class State( id: Long, diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/memory_backend/MemoryTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/memory_backend/MemoryTest.scala index df1c049230..822f782c61 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/typed/memory_backend/MemoryTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/memory_backend/MemoryTest.scala @@ -32,14 +32,13 @@ class MemoryTest extends FunSuite { mapMatch(x) } - // fails now - // test("hashJoin works") { - // val input = TypedPipe.from(0 until 100) - // val left = input.map { k => (k, k % 2) } - // val right = input.map { k => (k, k % 3) } + test("hashJoin works") { + val input = TypedPipe.from(0 until 100) + val left = input.map { k => (k, k % 2) } + val right = input.map { k => (k, k % 3) } - // mapMatch(left.hashJoin(right).toIterableExecution) - // } + mapMatch(left.hashJoin(right).toIterableExecution) + } // fails now // test("join works") { From dd1f8a4c2084ffdf0c3743a921eaba7d03e27fae Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Sat, 3 Jun 2017 16:03:53 -1000 Subject: [PATCH 3/4] make futures run in parallel --- .../typed/memory_backend/MemoryBackend.scala | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git 
a/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala index c75056de6b..b4dfd44ec9 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala @@ -115,13 +115,12 @@ object MemoryPlanner { } case class Concat[O](left: Op[O], right: Op[O]) extends Op[O] { - def result(implicit cec: ConcurrentExecutionContext) = - for { - l <- left.result - r <- right.result - } yield { - ArrayBuffer.concat(l, r) - } + def result(implicit cec: ConcurrentExecutionContext) = { + // start both futures in parallel + val f1 = left.result + val f2 = right.result + f1.zip(f2).map { case (l, r) => ArrayBuffer.concat(l, r) } + } } case class Map[I, O](input: Op[I], fn: I => TraversableOnce[O]) extends Op[O] { @@ -178,11 +177,12 @@ object MemoryPlanner { opB: Op[B], fn: (IndexedSeq[A], IndexedSeq[B]) => ArrayBuffer[C]) extends Op[C] { - def result(implicit cec: ConcurrentExecutionContext) = - for { - as <- opA.result - bs <- opB.result - } yield fn(as, bs) + def result(implicit cec: ConcurrentExecutionContext) = { + // start both futures in parallel + val f1 = opA.result + val f2 = opB.result + f1.zip(f2).map { case (a, b) => fn(a, b) } + } } } @@ -375,7 +375,18 @@ class MemoryWriter(mem: MemoryMode) extends Writer { } go(hcg) - case cgp@CoGroupedPipe(_) => ??? + case CoGroupedPipe(cg) => + def go[K, V](cg: CoGrouped[K, V]) = { + val inputs = cg.inputs + val joinf = cg.joinFunction + /** + * we need to expand the Op type to deal + * with multi-way or we need to keep + * the decomposed series of joins + */ + ??? 
+ } + go(cg) case ReduceStepPipe(IdentityReduce(_, pipe, _, descriptions)) => plan(m, pipe) From ef8580504d70ec611133609be74bb6bc2e08862f Mon Sep 17 00:00:00 2001 From: Oscar Boykin Date: Thu, 22 Jun 2017 17:36:17 -1000 Subject: [PATCH 4/4] address review comments --- .../typed/memory_backend/MemoryBackend.scala | 112 ++++++++++++++---- 1 file changed, 88 insertions(+), 24 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala index b4dfd44ec9..60d5da3054 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/memory_backend/MemoryBackend.scala @@ -45,6 +45,14 @@ final class MemoryMode private (srcs: Map[TypedSource[_], Iterable[_]], sinks: M def newWriter(): Writer = new MemoryWriter(this) + /** + * note that ??? in scala is the same as not implemented + * + * These methods are not needed for use with the Execution API, and indeed + * don't make sense outside of cascading, but backwards compatibility + * currently requires them on Mode. Ideally we will find another solution + * to this in the future + */ def openForRead(config: Config, tap: Tap[_, _, _]): TupleEntryIterator = ??? def fileExists(filename: String): Boolean = ??? def newFlowConnector(props: Config): FlowConnector = ??? 
@@ -77,11 +85,41 @@ object MemoryPlanner { sealed trait Op[+O] { def result(implicit cec: ConcurrentExecutionContext): Future[ArrayBuffer[_ <: O]] - def flatMap[O1](fn: O => TraversableOnce[O1]): Op[O1] = - Op.Map(this, fn) + def concatMap[O1](fn: O => TraversableOnce[O1]): Op[O1] = + transform { in: IndexedSeq[O] => + val res = ArrayBuffer[O1]() + val it = in.iterator + while(it.hasNext) { + val i = it.next + fn(i).foreach(res += _) + } + res + } + + def map[O1](fn: O => O1): Op[O1] = + transform { in: IndexedSeq[O] => + val res = new ArrayBuffer[O1](in.size) + val it = in.iterator + while(it.hasNext) { + res += fn(it.next) + } + res + } + + def filter(fn: O => Boolean): Op[O] = + transform { in: IndexedSeq[O] => + // we can't increase in size, just assume the worst + val res = new ArrayBuffer[O](in.size) + val it = in.iterator + while(it.hasNext) { + val o = it.next + if (fn(o)) res += o + } + res + } - def mapAll[O1 >: O, O2](fn: IndexedSeq[O1] => ArrayBuffer[O2]): Op[O2] = - Op.MapAll[O1, O2](this, fn) + def transform[O1 >: O, O2](fn: IndexedSeq[O1] => ArrayBuffer[O2]): Op[O2] = + Op.Transform[O1, O2](this, fn) } object Op { def source[I](i: Iterable[I]): Op[I] = Source(_ => Future.successful(i)) @@ -116,7 +154,6 @@ object MemoryPlanner { case class Concat[O](left: Op[O], right: Op[O]) extends Op[O] { def result(implicit cec: ConcurrentExecutionContext) = { - // start both futures in parallel val f1 = left.result val f2 = right.result f1.zip(f2).map { case (l, r) => ArrayBuffer.concat(l, r) } @@ -136,7 +173,15 @@ object MemoryPlanner { } } - case class MapAll[I, O](input: Op[I], fn: IndexedSeq[I] => ArrayBuffer[O]) extends Op[O] { + case class OnComplete[O](of: Op[O], fn: () => Unit) extends Op[O] { + def result(implicit cec: ConcurrentExecutionContext) = { + val res = of.result + res.onComplete(_ => fn()) + res + } + } + + case class Transform[I, O](input: Op[I], fn: IndexedSeq[I] => ArrayBuffer[O]) extends Op[O] { def result(implicit cec: 
ConcurrentExecutionContext) = input.result.map(fn) } @@ -149,9 +194,9 @@ object MemoryPlanner { def result(implicit cec: ConcurrentExecutionContext): Future[ArrayBuffer[(K, V2)]] = input.result.map { kvs => - val m = MMap[K, ArrayList[V1]]() + val valuesByKey = MMap[K, ArrayList[V1]]() def add(kv: (K, V1)): Unit = { - val vs = m.getOrElseUpdate(kv._1, new ArrayList[V1]()) + val vs = valuesByKey.getOrElseUpdate(kv._1, new ArrayList[V1]()) vs.add(kv._2) } kvs.foreach(add) @@ -161,7 +206,7 @@ object MemoryPlanner { * the keys into as many groups as there are CPUs and process that way */ val res = ArrayBuffer[(K, V2)]() - m.foreach { case (k, vs) => + valuesByKey.foreach { case (k, vs) => ord.foreach(Collections.sort[V1](vs, _)) val v2iter = fn(k, vs.iterator.asScala) while(v2iter.hasNext) { @@ -214,6 +259,14 @@ object MemoryPlanner { * to inputs via the MemoryMode */ case class SourceT[T](ident: String) extends TypedSource[T] { + /** + * note that ??? in scala is the same as not implemented + * + * These methods are not needed for use with the Execution API, and indeed + * don't make sense outside of cascading, but backwards compatibility + * currently requires them on TypedSource. Ideally we will find another solution + * to this in the future + */ def converter[U >: T] = ??? def read(implicit flowDef: FlowDef, mode: Mode): Pipe = ??? } @@ -223,6 +276,14 @@ case class SourceT[T](ident: String) extends TypedSource[T] { * to outputs via the MemoryMode */ case class SinkT[T](indent: String) extends TypedSink[T] { + /** + * note that ??? in scala is the same as not implemented + * + * These methods are not needed for use with the Execution API, and indeed + * don't make sense outside of cascading, but backwards compatibility + * currently requires them on TypedSink. Ideally we will find another solution + * to this in the future + */ def setter[U <: T] = ??? def writeFrom(pipe: Pipe)(implicit flowDef: FlowDef, mode: Mode): Pipe = ??? 
} @@ -247,7 +308,7 @@ class MemoryWriter(mem: MemoryMode) extends Writer { case CrossValue(left, EmptyValue) => (m, Op.empty) case CrossValue(left, LiteralValue(v)) => val (m1, op) = plan(m, left) - (m1, op.flatMap { a => Iterator.single((a, v)) }) + (m1, op.concatMap { a => Iterator.single((a, v)) }) case CrossValue(left, ComputedValue(right)) => plan(m, CrossPipe(left, right)) case DebugPipe(p) => @@ -263,7 +324,7 @@ class MemoryWriter(mem: MemoryMode) extends Writer { def go[K, V](node: FilterKeys[K, V]): (Memo, Op[(K, V)]) = { val FilterKeys(pipe, fn) = node val (m1, op) = plan(m, pipe) - (m1, op.flatMap { case (k, v) => if (fn(k)) { (k, v) :: Nil } else Nil }) + (m1, op.concatMap { case (k, v) => if (fn(k)) { (k, v) :: Nil } else Nil }) } go(fk) @@ -271,23 +332,22 @@ class MemoryWriter(mem: MemoryMode) extends Writer { def go[T](f: Filter[T]): (Memo, Op[T]) = { val Filter(p, fn) = f - val (m1, op) = plan(m, f) - (m1, op.flatMap { t => if (fn(t)) t :: Nil else Nil }) + val (m1, op) = plan(m, p) + (m1, op.filter(fn)) } go(f) case f@FlatMapValues(_, _) => def go[K, V, U](node: FlatMapValues[K, V, U]) = { - // don't capture node, which is a TypedPipe, which we avoid serializing val fn = node.fn val (m1, op) = plan(m, node.input) - (m1, op.flatMap { case (k, v) => fn(v).map((k, _)) }) + (m1, op.concatMap { case (k, v) => fn(v).map((k, _)) }) } go(f) case FlatMapped(prev, fn) => val (m1, op) = plan(m, prev) - (m1, op.flatMap(fn)) + (m1, op.concatMap(fn)) case ForceToDisk(pipe) => val (m1, op) = plan(m, pipe) @@ -302,17 +362,16 @@ class MemoryWriter(mem: MemoryMode) extends Writer { case f@MapValues(_, _) => def go[K, V, U](node: MapValues[K, V, U]) = { - // don't capture node, which is a TypedPipe, which we avoid serializing val mvfn = node.fn val (m1, op) = plan(m, node.input) - (m1, op.flatMap { case (k, v) => Iterator.single((k, mvfn(v))) }) + (m1, op.map { case (k, v) => (k, mvfn(v)) }) } go(f) case Mapped(input, fn) => val (m1, op) = plan(m, input) - (m1, op.flatMap { t => fn(t) :: Nil }) + 
(m1, op.map(fn)) case MergedTypedPipe(left, right) => val (m1, op1) = plan(m, left) @@ -323,7 +382,7 @@ class MemoryWriter(mem: MemoryMode) extends Writer { (m, Op.Source({ cec => mem.readSource(src) match { case Some(iter) => Future.successful(iter) - case None => Future.failed(new Exception(s"Source: $src not wired")) + case None => Future.failed(new Exception(s"Source: $src not wired. Please provide an input with MemoryMode.addSource")) } })) @@ -332,7 +391,7 @@ class MemoryWriter(mem: MemoryMode) extends Writer { val SumByLocalKeys(p, sg) = sblk val (m1, op) = plan(m, p) - (m1, op.mapAll[(K, V), (K, V)] { kvs => + (m1, op.transform[(K, V), (K, V)] { kvs => val map = collection.mutable.Map.empty[K, V] val iter = kvs.iterator while(iter.hasNext) { @@ -357,7 +416,9 @@ class MemoryWriter(mem: MemoryMode) extends Writer { case WithDescriptionTypedPipe(pipe, description, dedup) => plan(m, pipe) - case WithOnComplete(pipe, fn) => ??? + case WithOnComplete(pipe, fn) => + val (m1, op) = plan(m, pipe) + (m1, Op.OnComplete(op, fn)) case hcg@HashCoGroup(_, _, _) => def go[K, V1, V2, R](hcg: HashCoGroup[K, V1, V2, R]) = { @@ -383,6 +444,9 @@ class MemoryWriter(mem: MemoryMode) extends Writer { * we need to expand the Op type to deal * with multi-way or we need to keep * the decomposed series of joins + * + * TODO + * implement cogroups on the memory platform */ ??? 
} @@ -414,7 +478,7 @@ class MemoryWriter(mem: MemoryMode) extends Writer { def go[K, U, V](imr: IteratorMappedReduce[K, U, V]) = { val IteratorMappedReduce(_, pipe, fn, _, _) = imr val (m1, op) = plan(m, pipe) - (m1, op.mapAll[(K, U), (K, V)] { kvs => + (m1, op.transform[(K, U), (K, V)] { kvs => val m = kvs.groupBy(_._1).iterator val res = ArrayBuffer[(K, V)]() m.foreach { case (k, kus) => @@ -429,13 +493,13 @@ class MemoryWriter(mem: MemoryMode) extends Writer { go(imr) } - case class State( + private[this] case class State( id: Long, memo: MemoryPlanner.Memo, forced: Map[TypedPipe[_], Future[TypedPipe[_]]] ) - val state = new AtomicBox[State](State(0, MemoryPlanner.Memo.empty, Map.empty)) + private[this] val state = new AtomicBox[State](State(0, MemoryPlanner.Memo.empty, Map.empty)) /** * do a batch of writes, possibly optimizing, and return a new unique