Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #363 from twitter/feature/standardize_getOrElse_scalding
Browse files Browse the repository at this point in the history

Feature/standardize get or else scalding
  • Loading branch information
johnynek committed Nov 18, 2013
2 parents c752668 + 7662bb5 commit fa4db0b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ class OptionsTest extends Specification {
val summers = dependants.nodes.collect { case s: Summer[_, _, _] => s }

summers.size must be_==(1)

val names = dependants.namesOf(summers.head).map(_.id)
Scalding
.getCommutativity(dependants,
.getCommutativity(names,
opts,
summers.head.asInstanceOf[Summer[Scalding,_,_]]) must be_==(Commutative)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,24 @@ object Scalding {
def emptyFlowProducer[T]: FlowProducer[TypedPipe[T]] =
Reader({ implicit fdm: (FlowDef, Mode) => TypedPipe.empty })

def getCommutativity(deps: Dependants[Scalding],
def getCommutativity(names: List[String],
options: Map[String, Options],
s: Summer[Scalding, _, _]): Commutativity = {

val storeNames = deps.namesOf(s).map(_.id)
val isCommutative = getFirst[MonoidIsCommutative](options, storeNames)
isCommutative.map(_.commutativity) match {
case Some(Commutative) =>
logger.info("Store: {} is commutative", storeNames)
Commutative
case Some(NonCommutative) =>
logger.info("Store: {} is non-commutative (less efficient than commutative)", storeNames)
NonCommutative
case None =>
logger.warn("Store: {} has no commutativity setting. Assuming {}",
storeNames, MonoidIsCommutative.default)
MonoidIsCommutative.default.commutativity
val commutativity = getOrElse(options, names, s, {
logger.warn("Store: {} has no commutativity setting. Assuming {}",
names, MonoidIsCommutative.default)
MonoidIsCommutative.default
}).commutativity

commutativity match {
case Commutative =>
logger.info("Store: {} is commutative", names)
case NonCommutative =>
logger.info("Store: {} is non-commutative (less efficient than commutative)", names)
}

commutativity
}

def intersect(dr1: DateRange, dr2: DateRange): Option[DateRange] =
Expand Down Expand Up @@ -230,38 +230,38 @@ object Scalding {
}


private def getOrElse[T: Manifest](options: Map[String, Options], idOpt: Option[String], default: T): T =
(for {
id <- idOpt.map(List(_, "DEFAULT")).getOrElse(List("DEFAULT"))
private def getOrElse[T <: AnyRef : Manifest](options: Map[String, Options], names: List[String], producer: Producer[Scalding, _], default: => T): T = {
val maybePair = (for {
id <- names :+ "DEFAULT"
innerOpts <- options.get(id)
option <- innerOpts.get[T]
} yield option).headOption.getOrElse(default)

@annotation.tailrec
private def getFirst[T: Manifest](options: Map[String, Options], names: List[String]): Option[T] =
names match {
case Nil => None
case h::tail => options.get(h).flatMap { _.get[T] } match {
case s@Some(res) => s
case None => getFirst[T](options, tail)
}
} yield (id, option)).headOption

maybePair match {
case None =>
logger.debug("Producer ({}): Using default setting {}", producer.getClass.getName, default)
default
case Some((id, opt)) =>
logger.info("Producer ({}) Using {} found via NamedProducer \"{}\"", Array[AnyRef](producer.getClass.getName, opt, id))
opt
}
}


/** Returns a PipeFactory that covers as much of the requested time range as possible;
* the output state gives the actual, non-empty interval that can be produced.
*/
private def buildFlow[T](options: Map[String, Options],
producer: Producer[Scalding, T],
id: Option[String],
fanOuts: Set[Producer[Scalding, _]],
dependants: Dependants[Scalding],
built: Map[Producer[Scalding, _], PipeFactory[_]]): (PipeFactory[T], Map[Producer[Scalding, _], PipeFactory[_]]) = {
val names = dependants.namesOf(producer).map(_.id)

def recurse[U](p: Producer[Scalding, U],
id: Option[String] = id,
built: Map[Producer[Scalding, _], PipeFactory[_]] = built):
(PipeFactory[U], Map[Producer[Scalding, _], PipeFactory[_]]) =
buildFlow(options, p, id, fanOuts, dependants, built)
buildFlow(options, p, fanOuts, dependants, built)

/**
* The scalding Typed-API does not deal with TypedPipes with fanout,
Expand All @@ -287,7 +287,7 @@ object Scalding {
case None =>
val (pf, m) = producer match {
case Source(src) => {
val shards = getOrElse(options, id, FlatMapShards.default).count
val shards = getOrElse(options, names, producer, FlatMapShards.default).count
val srcPf = if (shards <= 1)
src
else
Expand All @@ -302,7 +302,7 @@ object Scalding {
(srcPf, built)
}
case IdentityKeyedProducer(producer) => recurse(producer)
case NamedProducer(producer, newId) => recurse(producer, id = Some(newId))
case NamedProducer(producer, newId) => recurse(producer)
case summer@Summer(producer, store, monoid) =>
/*
* The store may already have materialized values, so we don't need the whole
Expand All @@ -311,8 +311,8 @@ object Scalding {
* the time ranges that it needs.
*/
val (in, m) = recurse(producer)
val commutativity = getCommutativity(dependants, options, summer)
val storeReducers = getOrElse(options, id, Reducers.default).count
val commutativity = getCommutativity(names, options, summer)
val storeReducers = getOrElse(options, names, producer, Reducers.default).count
logger.info("Store {} using {} reducers (-1 means unset)", store, storeReducers)
(store.merge(in, monoid, commutativity, storeReducers), m)
case LeftJoinedProducer(left, service) =>
Expand Down Expand Up @@ -350,7 +350,7 @@ object Scalding {
val s = sAny.asInstanceOf[Summer[Scalding, Any, Any]]
if(downStream.forall(d => Producer.isNoOp(d) || d == s)) {
//only downstream are no-ops and the summer GO!
getCommutativity(dependants, options, s) match {
getCommutativity(names, options, s) match {
case Commutative =>
logger.info("enabling flatMapKeys mapside caching")
s.store.partialMerge(fmp, s.monoid, Commutative)
Expand Down Expand Up @@ -424,7 +424,7 @@ object Scalding {
val fanOutSet =
dep.nodes
.filter(dep.fanOut(_).exists(_ > 1)).toSet
buildFlow(options, prod, None, fanOutSet, dep, Map.empty)._1
buildFlow(options, prod, fanOutSet, dep, Map.empty)._1
}

def plan[T](options: Map[String, Options], prod: TailProducer[Scalding, T]): PipeFactory[T] = {
Expand Down

0 comments on commit fa4db0b

Please sign in to comment.