Skip to content

Commit

Permalink
First draft of pure-scalding memory backend (#1697)
Browse files Browse the repository at this point in the history
* First draft of memory backend

* add hashJoin support

* make futures run in parallel

* address review comments
  • Loading branch information
johnynek authored Jun 23, 2017
1 parent 7615dbf commit 026ba62
Show file tree
Hide file tree
Showing 4 changed files with 663 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ sealed trait Execution[+T] extends java.io.Serializable { self: Product =>
* Seriously: the recommended style is to call this only once in a program.
*/
final def run(conf: Config, mode: Mode)(implicit cec: ConcurrentExecutionContext): Future[T] = {
val writer: Execution.Writer = new AsyncFlowDefRunner
val writer: Execution.Writer = mode.newWriter()
val ec = new EvalCache(writer)
val confWithId = conf.setScaldingExecutionId(UUID.randomUUID.toString)
// get on Trampoline
Expand Down
34 changes: 19 additions & 15 deletions scalding-core/src/main/scala/com/twitter/scalding/Mode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,23 @@ limitations under the License.
*/
package com.twitter.scalding

import cascading.flow.local.{ LocalFlowConnector, LocalFlowProcess }
import cascading.flow.{ FlowProcess, FlowConnector, FlowDef, Flow }
import cascading.property.AppProps
import cascading.tap.Tap
import cascading.tuple.{ Tuple, TupleEntryIterator }
import com.twitter.scalding.typed.cascading_backend.AsyncFlowDefRunner
import java.io.File
import java.util.{ UUID, Properties }

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.hadoop.mapred.JobConf

import cascading.flow.{ FlowProcess, FlowConnector, FlowDef, Flow }
import cascading.flow.local.LocalFlowConnector
import cascading.flow.local.LocalFlowProcess
import cascading.property.AppProps
import cascading.tap.Tap
import cascading.tuple.Tuple
import cascading.tuple.TupleEntryIterator

import org.slf4j.LoggerFactory
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable.Buffer
import scala.collection.mutable.{ Map => MMap }
import scala.collection.mutable.{ Set => MSet }
import scala.collection.mutable.{ Buffer, Map => MMap, Set => MSet }
import scala.util.{ Failure, Success }

import org.slf4j.LoggerFactory

/**
 * Unchecked exception whose detail message is the supplied `message`.
 *
 * @param message text passed straight through to RuntimeException, so it is
 *                what `getMessage` returns
 */
case class ModeException(message: String)
  extends RuntimeException(message)

/**
 * Unchecked exception wrapping a ClassNotFoundException together with a
 * descriptive message.
 *
 * Both values are forwarded to RuntimeException: `message` becomes the detail
 * message returned by `getMessage`, and `origin` becomes the cause returned by
 * `getCause`. Forwarding only `origin` would make `getMessage` return
 * `origin.toString`, so the descriptive text would never show up in logs or
 * stack traces.
 *
 * @param message human-readable description of the failure
 * @param origin  the ClassNotFoundException that triggered this failure
 */
case class ModeLoadException(message: String, origin: ClassNotFoundException)
  extends RuntimeException(message, origin)
Expand Down Expand Up @@ -105,6 +98,11 @@ object Mode {
}

trait Mode extends java.io.Serializable {

/**
 * Create the Execution.Writer for this platform.
 *
 * Execution.run calls this on the Mode it is given and passes the result to
 * its EvalCache, so the Mode implementation chooses which writer is used.
 *
 * @return the Execution.Writer for this Mode
 */
def newWriter(): Execution.Writer
/*
* Using a new FlowProcess, which is only suitable for reading outside
* of a map/reduce job, open a given tap and return the TupleEntryIterator
Expand Down Expand Up @@ -154,6 +152,9 @@ trait HadoopMode extends Mode {
}
}

/**
 * Implements Mode.newWriter by returning a new AsyncFlowDefRunner on every call.
 *
 * Marked `override`, like the other Mode members implemented here, so the
 * compiler reports an error if the signature in Mode ever changes.
 */
override def newWriter(): Execution.Writer =
  new AsyncFlowDefRunner

// TODO unlike newFlowConnector, this does not look at the Job.config
override def openForRead(config: Config, tap: Tap[_, _, _]) = {
val htap = tap.asInstanceOf[Tap[JobConf, _, _]]
Expand Down Expand Up @@ -183,6 +184,9 @@ trait CascadingLocal extends Mode {
/**
 * Build a Cascading LocalFlowConnector from `conf`.
 *
 * The entries of `conf.toMap` are widened to an immutable Map[AnyRef, AnyRef]
 * and exposed to Cascading as a java.util.Map via `asJava`.
 */
override def newFlowConnector(conf: Config) =
new LocalFlowConnector(conf.toMap.toMap[AnyRef, AnyRef].asJava) // linter:ignore

/**
 * Implements Mode.newWriter by returning a new AsyncFlowDefRunner on every call.
 *
 * Marked `override`, matching the neighbouring newFlowConnector and
 * openForRead members, so the compiler reports an error if the signature in
 * Mode ever changes.
 */
override def newWriter(): Execution.Writer =
  new AsyncFlowDefRunner

override def openForRead(config: Config, tap: Tap[_, _, _]) = {
val ltap = tap.asInstanceOf[Tap[Properties, _, _]]
val props = new java.util.Properties
Expand Down
Loading

0 comments on commit 026ba62

Please sign in to comment.