Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #439 from twitter/feature/long_to_timestamp_offline
Browse files Browse the repository at this point in the history
Feature/long to timestamp offline
  • Loading branch information
johnynek committed Feb 7, 2014
2 parents e67e648 + 97a7303 commit dcc7eb1
Show file tree
Hide file tree
Showing 18 changed files with 100 additions and 100 deletions.
3 changes: 2 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ object SummingbirdBuild extends Build {
lazy val summingbirdBatch = module("batch").settings(
libraryDependencies ++= Seq(
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "bijection-core" % bijectionVersion
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "scalding-date" % scaldingVersion
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ object BatchID {
implicit val equiv: Equiv[BatchID] = Equiv.by(_.id)

def apply(long: Long) = new BatchID(long)
def apply(ts: Timestamp) = new BatchID(ts.milliSinceEpoch)

// Enables BatchID(someBatchID.toString) roundtripping
def apply(str: String) = new BatchID(str.split("\\.")(1).toLong)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package com.twitter.summingbird.batch
import com.twitter.algebird.Monoid
import com.twitter.bijection.Bijection
import java.util.Date

import com.twitter.scalding.RichDate

case class Timestamp(milliSinceEpoch: Long) extends Ordered[Timestamp] {
def compare(that: Timestamp) = milliSinceEpoch.compare(that.milliSinceEpoch)
def prev = copy(milliSinceEpoch = milliSinceEpoch - 1)
def next = copy(milliSinceEpoch = milliSinceEpoch + 1)
def toDate = new Date(milliSinceEpoch)
def toRichDate = new RichDate(milliSinceEpoch)
def -(other: Timestamp) = Timestamp(milliSinceEpoch - other.milliSinceEpoch)
def +(other: Timestamp) = Timestamp(milliSinceEpoch + other.milliSinceEpoch)
def incrementMillis(millis: Long) = Timestamp(milliSinceEpoch + millis)
def incrementSeconds(seconds: Long) = Timestamp(milliSinceEpoch + (seconds*1000L))
def incrementMinutes(minutes: Long) = Timestamp(milliSinceEpoch + (minutes*1000*60))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.twitter.summingbird.sink

import cascading.flow.FlowDef

import com.twitter.summingbird.batch.{ BatchID, Batcher }
import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp}
import com.twitter.scalding.{ Mode, TypedPipe }
import com.twitter.summingbird.scalding.ScaldingEnv
import com.twitter.summingbird.scalding.batch.BatchedSink
Expand Down Expand Up @@ -46,7 +46,7 @@ class BatchedSinkFromOffline[T](override val batcher: Batcher, offline: OfflineS
/** Instances may choose to write out materialized streams
* by implementing this. This is what readStream returns.
*/
def writeStream(batchID: BatchID, stream: TypedPipe[(Long, T)])(implicit flowDef: FlowDef, mode: Mode): Unit = {
def writeStream(batchID: BatchID, stream: TypedPipe[(Timestamp, T)])(implicit flowDef: FlowDef, mode: Mode): Unit = {
// strip the time
offline.write(batchID, stream.values)(flowDef, mode)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.twitter.summingbird.scalding

import com.twitter.summingbird.batch.Timestamp
import com.twitter.scalding.TypedPipe

class PipeFactoryOps[+T](pipeFactory: PipeFactory[T]) {
Expand All @@ -28,7 +29,7 @@ class PipeFactoryOps[+T](pipeFactory: PipeFactory[T]) {
def mapElements[U](fn: (T => U)): PipeFactory[U] =
flatMapElements({tup => List(fn(tup))})

def mapPipe[U](fn: (TypedPipe[(Time, T)] => TypedPipe[(Time, U)])): PipeFactory[U] = {
def mapPipe[U](fn: (TypedPipe[(Timestamp, T)] => TypedPipe[(Timestamp, U)])): PipeFactory[U] = {
pipeFactory.map { flowProducer =>
flowProducer.map(fn(_))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,22 @@ object Scalding {
new Scalding(jobName, options, identity, List())
}

implicit val dateRangeInjection: Injection[DateRange, Interval[Time]] = Injection.build {
implicit val dateRangeInjection: Injection[DateRange, Interval[Timestamp]] = Injection.build {
(dr: DateRange) => {
val DateRange(l, u) = dr
Interval.leftClosedRightOpen(l.timestamp, u.timestamp + 1L)
Interval.leftClosedRightOpen(Timestamp(l.timestamp), Timestamp(u.timestamp + 1L))
}
} {
case Intersection(lb, ub) =>
val low = lb match {
case InclusiveLower(l) => l
case ExclusiveLower(l) => l+1L
case ExclusiveLower(l) => l.next
}
val high = ub match {
case InclusiveUpper(u) => u
case ExclusiveUpper(u) => u-1L
case ExclusiveUpper(u) => u.prev
}
Success(DateRange(RichDate(low), RichDate(high)))
Success(DateRange(low.toRichDate, high.toRichDate))
case _ => Failure(new RuntimeException("Unbounded interval!"))
}

Expand Down Expand Up @@ -95,7 +95,7 @@ object Scalding {
}

def intersect(dr1: DateRange, dr2: DateRange): Option[DateRange] =
(dr1.as[Interval[Time]] && (dr2.as[Interval[Time]])).as[Option[DateRange]]
(dr1.as[Interval[Timestamp]] && (dr2.as[Interval[Timestamp]])).as[Option[DateRange]]

/** Given a constructor function, computes the maximum available range
* of time or gives an error.
Expand Down Expand Up @@ -151,19 +151,19 @@ object Scalding {
*/
def pipeFactory[T](factory: (DateRange) => Mappable[T])
(implicit timeOf: TimeExtractor[T]): PipeFactory[T] =
StateWithError[(Interval[Time], Mode), List[FailureReason], FlowToPipe[T]]{
(timeMode: (Interval[Time], Mode)) => {
StateWithError[(Interval[Timestamp], Mode), List[FailureReason], FlowToPipe[T]]{
(timeMode: (Interval[Timestamp], Mode)) => {
val (timeSpan, mode) = timeMode

toDateRange(timeSpan).right.flatMap { dr =>
minify(mode, dr)(factory)
.right.map { newDr =>
val newIntr = newDr.as[Interval[Time]]
val newIntr = newDr.as[Interval[Timestamp]]
val mappable = factory(newDr)
((newIntr, mode), Reader { (fdM: (FlowDef, Mode)) =>
TypedPipe.from(mappable)(fdM._1, fdM._2)
.flatMap { t =>
val time = timeOf(t)
val time = Timestamp(timeOf(t))
if(newIntr(time)) Some((time, t)) else None
}
})
Expand All @@ -174,8 +174,8 @@ object Scalding {

def pipeFactoryExact[T](factory: (DateRange) => Mappable[T])
(implicit timeOf: TimeExtractor[T]): PipeFactory[T] =
StateWithError[(Interval[Time], Mode), List[FailureReason], FlowToPipe[T]]{
(timeMode: (Interval[Time], Mode)) => {
StateWithError[(Interval[Timestamp], Mode), List[FailureReason], FlowToPipe[T]]{
(timeMode: (Interval[Timestamp], Mode)) => {
val (timeSpan, mode) = timeMode

toDateRange(timeSpan).right.map { dr =>
Expand All @@ -184,7 +184,7 @@ object Scalding {
mappable.validateTaps(fdM._2) //This can throw, but that is what this caller wants
TypedPipe.from(mappable)(fdM._1, fdM._2)
.flatMap { t =>
val time = timeOf(t)
val time = Timestamp(timeOf(t))
if(timeSpan(time)) Some((time, t)) else None
}
})
Expand All @@ -196,12 +196,12 @@ object Scalding {
factory: (DateRange) => Mappable[T]): Producer[Scalding, T] =
Producer.source[Scalding, T](pipeFactory(factory))

def toDateRange(timeSpan: Interval[Time]): Try[DateRange] =
def toDateRange(timeSpan: Interval[Timestamp]): Try[DateRange] =
timeSpan.as[Option[DateRange]]
.map { Right(_) }
.getOrElse(Left(List("only finite time ranges are supported by scalding: " + timeSpan.toString)))

def limitTimes[T](range: Interval[Time], in: FlowToPipe[T]): FlowToPipe[T] =
def limitTimes[T](range: Interval[Timestamp], in: FlowToPipe[T]): FlowToPipe[T] =
in.map { pipe => pipe.filter { case (time, _) => range(time) } }

def merge[T](left: FlowToPipe[T], right: FlowToPipe[T]): FlowToPipe[T] =
Expand Down Expand Up @@ -438,8 +438,8 @@ object Scalding {
*/
def toPipe[T](dr: DateRange,
prod: Producer[Scalding, T],
opts: Map[String, Options] = Map.empty)(implicit fd: FlowDef, mode: Mode): Try[(DateRange, TypedPipe[(Long, T)])] = {
val ts = dr.as[Interval[Time]]
opts: Map[String, Options] = Map.empty)(implicit fd: FlowDef, mode: Mode): Try[(DateRange, TypedPipe[(Timestamp, T)])] = {
val ts = dr.as[Interval[Timestamp]]
val pf = planProducer(opts, prod)
toPipe(ts, fd, mode, pf).right.map { case (ts, pipe) =>
(ts.as[Option[DateRange]].get, pipe)
Expand All @@ -451,23 +451,23 @@ object Scalding {
*/
def toPipeExact[T](dr: DateRange,
prod: Producer[Scalding, T],
opts: Map[String, Options] = Map.empty)(implicit fd: FlowDef, mode: Mode): Try[TypedPipe[(Long, T)]] = {
val ts = dr.as[Interval[Time]]
opts: Map[String, Options] = Map.empty)(implicit fd: FlowDef, mode: Mode): Try[TypedPipe[(Timestamp, T)]] = {
val ts = dr.as[Interval[Timestamp]]
val pf = planProducer(opts, prod)
toPipeExact(ts, fd, mode, pf)
}

def toPipe[T](timeSpan: Interval[Time],
def toPipe[T](timeSpan: Interval[Timestamp],
flowDef: FlowDef,
mode: Mode,
pf: PipeFactory[T]): Try[(Interval[Time], TimedPipe[T])] = {
pf: PipeFactory[T]): Try[(Interval[Timestamp], TimedPipe[T])] = {
logger.info("Planning on interval: {}", timeSpan.as[Option[DateRange]])
pf((timeSpan, mode))
.right
.map { case (((ts, m), flowDefMutator)) => (ts, flowDefMutator((flowDef, m))) }
}

def toPipeExact[T](timeSpan: Interval[Time],
def toPipeExact[T](timeSpan: Interval[Timestamp],
flowDef: FlowDef,
mode: Mode,
pf: PipeFactory[T]): Try[TimedPipe[T]] = {
Expand Down Expand Up @@ -562,7 +562,7 @@ class Scalding(
c.set("io.serializations", ioSerializations.map { _.getName }.mkString(","))

// This is a side-effect-free computation that is called by run
def toFlow(timeSpan: Interval[Time], mode: Mode, pf: PipeFactory[_]): Try[(Interval[Time], Flow[_])] = {
def toFlow(timeSpan: Interval[Timestamp], mode: Mode, pf: PipeFactory[_]): Try[(Interval[Timestamp], Flow[_])] = {
val flowDef = new FlowDef
flowDef.setName(jobName)
Scalding.toPipe(timeSpan, flowDef, mode, pf)
Expand Down Expand Up @@ -593,15 +593,12 @@ class Scalding(
case _ =>
}



val prepareState = state.begin
val timeSpan = prepareState.requested.mapNonDecreasing(_.milliSinceEpoch)
toFlow(timeSpan, mode, pf) match {
toFlow(prepareState.requested, mode, pf) match {
case Left(errs) =>
prepareState.fail(FlowPlanException(errs))
case Right((ts,flow)) =>
prepareState.willAccept(ts.mapNonDecreasing(Timestamp(_))) match {
prepareState.willAccept(ts) match {
case Right(runningState) =>
try {
options.get(jobName).foreach { jopt =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,22 @@ import Conversion.asMethod
*/
private class BatchedOperations(batcher: Batcher) {

implicit val timeToBatchInterval = Bijection.build { bint: Interval[Time] =>
bint.mapNonDecreasing { Timestamp(_) } } { bint: Interval[Timestamp] =>
bint.mapNonDecreasing { _.milliSinceEpoch }
}

def coverIt[T](timeSpan: Interval[Time]): Iterable[BatchID] = {
def coverIt[T](timeSpan: Interval[Timestamp]): Iterable[BatchID] = {
val batchInterval = batcher.cover(timeSpan.as[Interval[Timestamp]])
BatchID.toIterable(batchInterval)
}

def batchToTime(bint: Interval[BatchID]): Interval[Time] =
bint.mapNonDecreasing { batcher.earliestTimeOf(_).milliSinceEpoch }
def batchToTimestamp(bint: Interval[BatchID]): Interval[Timestamp] =
bint.mapNonDecreasing { batcher.earliestTimeOf(_) }

def intersect(batches: Interval[BatchID], ts: Interval[Time]): Interval[Time] =
batchToTime(batches) && ts
def intersect(batches: Interval[BatchID], ts: Interval[Timestamp]): Interval[Timestamp] =
batchToTimestamp(batches) && ts

def intersect(batches: Iterable[BatchID], ts: Interval[Time]): Option[Interval[Time]] =
def intersect(batches: Iterable[BatchID], ts: Interval[Timestamp]): Option[Interval[Timestamp]] =
BatchID.toInterval(batches).map { intersect(_, ts) }

def readBatched[T](inBatches: Interval[BatchID], mode: Mode, in: PipeFactory[T]): Try[(Interval[BatchID], FlowToPipe[T])] = {
val inTimes = batchToTime(inBatches)
val inTimes = batchToTimestamp(inBatches)
// Read the delta stream for the needed times
in((inTimes, mode))
.right
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ trait BatchedService[K, V] extends Service[K, V] {
* You are guaranteed that all the service data needed
* to do the join is present.
*/
def lookup[W](incoming: TypedPipe[(Time, (K, W))],
servStream: TypedPipe[(Time, (K, Option[V]))]): TypedPipe[(Time, (K, (W, Option[V])))] = {
def lookup[W](incoming: TypedPipe[(Timestamp, (K, W))],
servStream: TypedPipe[(Timestamp, (K, Option[V]))]): TypedPipe[(Timestamp, (K, (W, Option[V])))] = {

def flatOpt[T](o: Option[Option[T]]): Option[T] = o.flatMap(identity)

Expand All @@ -56,13 +56,13 @@ trait BatchedService[K, V] extends Service[K, V] {
.map { case (t, (k, (w, optoptv))) => (t, (k, (w, flatOpt(optoptv)))) }
}

protected def batchedLookup[W](covers: Interval[Time],
protected def batchedLookup[W](covers: Interval[Timestamp],
getKeys: FlowToPipe[(K, W)],
last: (BatchID, FlowProducer[TypedPipe[(K, V)]]),
streams: Iterable[(BatchID, FlowToPipe[(K, Option[V])])]): FlowToPipe[(K, (W, Option[V]))] =
Reader[FlowInput, KeyValuePipe[K, (W, Option[V])]] { (flowMode: (FlowDef, Mode)) =>
val left = getKeys(flowMode)
val earliestInLast = batcher.earliestTimeOf(last._1).milliSinceEpoch
val earliestInLast = batcher.earliestTimeOf(last._1)
val liftedLast: KeyValuePipe[K, Option[V]] = last._2(flowMode)
.map { case (k, w) => (earliestInLast, (k, Some(w))) }
// TODO (https://github.com/twitter/summingbird/issues/91): we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ trait BatchedSink[T] extends Sink[T] {

// We need to write each of these.
iter.foreach { batch =>
val range = batcher.toInterval(batch).mapNonDecreasing { _.milliSinceEpoch }
val range = batcher.toInterval(batch)
writeStream(batch, inPipe.filter { case (time, _) =>
range(time)
})(flowMode._1, flowMode._2)
Expand Down Expand Up @@ -85,7 +85,7 @@ trait BatchedSink[T] extends Sink[T] {
.takeWhile { _._2.isDefined }
.collect { case (batch, Some(flow)) => (batch, flow) }

def mergeExistingAndBuilt(optBuilt: Option[(Interval[BatchID], FlowToPipe[T])]): Try[((Interval[Time], Mode), FlowToPipe[T])] = {
def mergeExistingAndBuilt(optBuilt: Option[(Interval[BatchID], FlowToPipe[T])]): Try[((Interval[Timestamp], Mode), FlowToPipe[T])] = {
val (aBatches, aFlows) = existing.unzip
val flows = aFlows ++ (optBuilt.map { _._2 })
val batches = aBatches ++ (optBuilt.map { pair => BatchID.toIterable(pair._1) }.getOrElse(Iterable.empty))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,15 @@ trait BatchedStore[K, V] extends scalding.Store[K, V] { self =>
}
}

protected def sumByBatches[K1,V:Semigroup](ins: TypedPipe[(Long, (K1, V))],
protected def sumByBatches[K1,V:Semigroup](ins: TypedPipe[(Timestamp, (K1, V))],
capturedBatcher: Batcher,
commutativity: Commutativity): TypedPipe[((K1, BatchID), (Timestamp, V))] = {
implicit val timeValueSemigroup: Semigroup[(Timestamp, V)] =
IteratorSums.optimizedPairSemigroup[Timestamp, V](1000)

val inits = ins.map { case (t, (k, v)) =>
val ts = Timestamp(t)
val batch = capturedBatcher.batchOf(ts)
((k, batch), (ts, v))
val batch = capturedBatcher.batchOf(t)
((k, batch), (t, v))
}
(commutativity match {
case Commutative => Store.mapsideReduce(inits)
Expand All @@ -117,7 +116,7 @@ trait BatchedStore[K, V] extends scalding.Store[K, V] { self =>
case Commutative => delta.map { flow =>
flow.map { typedP =>
sumByBatches(typedP, capturedBatcher, Commutative)
.map { case ((k, _), (ts, v)) => (ts.milliSinceEpoch, (k, v)) }
.map { case ((k, _), (ts, v)) => (ts, (k, v)) }
}
}
case NonCommutative => delta
Expand Down Expand Up @@ -148,7 +147,7 @@ trait BatchedStore[K, V] extends scalding.Store[K, V] { self =>

val capturedBatcher = batcher //avoid a closure on the whole store

def prepareDeltas(ins: TypedPipe[(Long, (K, V))]): TypedPipe[(K, (BatchID, (Timestamp, V)))] =
def prepareDeltas(ins: TypedPipe[(Timestamp, (K, V))]): TypedPipe[(K, (BatchID, (Timestamp, V)))] =
sumByBatches(ins, capturedBatcher, commutativity)
.map { case ((k, batch), (ts, v)) => (k, (batch, (ts, v))) }

Expand Down Expand Up @@ -195,11 +194,11 @@ trait BatchedStore[K, V] extends scalding.Store[K, V] { self =>

// This builds the format we send to consumer nodes
def toOutputFormat(res: TypedPipe[(K, (BatchID, (Option[Option[(Timestamp, V)]], Option[(Timestamp, V)])))]):
TypedPipe[(Long, (K, (Option[V], V)))] =
TypedPipe[(Timestamp, (K, (Option[V], V)))] =
res.flatMap { case (k, (batchid, (optopt, opt))) =>
opt.map { case (ts, v) =>
val prev = flatOpt(optopt).map(_._2)
(ts.milliSinceEpoch, (k, (prev, v)))
(ts, (k, (prev, v)))
}
}

Expand All @@ -214,6 +213,7 @@ trait BatchedStore[K, V] extends scalding.Store[K, V] { self =>
} yield toOutputFormat(merged)
}


/** instances of this trait MAY NOT change the logic here. This always follows the rule
* that we look for existing data (avoiding reading deltas in that case), then we fall
* back to the last checkpointed output by calling readLast. In that case, we compute the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,17 @@ import cascading.flow.FlowDef
import com.twitter.algebird.monad.{Reader, StateWithError}
import com.twitter.algebird.Interval
import com.twitter.summingbird
import com.twitter.summingbird.batch.Timestamp
import org.apache.hadoop.io.Writable

package object scalding {
/** We represent time as Long Millis */
type Time = Long
/** How we represent the streams in scalding */
type TimedPipe[+T] = TypedPipe[(Time, T)]
type TimedPipe[+T] = TypedPipe[(Timestamp, T)]
type KeyValuePipe[+K, +V] = TimedPipe[(K, V)]
/** The Platform recursively passes this input around to describe a
* step forward: requested input time span, and scalding Mode
*/
type FactoryInput = (Interval[Time], Mode)
type FactoryInput = (Interval[Timestamp], Mode)
/** When it is time to build the final flow,
* this is what scalding needs. It is modified in the Reader[FlowInput, T]
*/
Expand Down
Loading

0 comments on commit dcc7eb1

Please sign in to comment.