Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Feature/long to timestamp offline #439

Merged
merged 3 commits into from
Feb 7, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ object SummingbirdBuild extends Build {
lazy val summingbirdBatch = module("batch").settings(
libraryDependencies ++= Seq(
"com.twitter" %% "algebird-core" % algebirdVersion,
"com.twitter" %% "bijection-core" % bijectionVersion
"com.twitter" %% "bijection-core" % bijectionVersion,
"com.twitter" %% "scalding-date" % scaldingVersion
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ object BatchID {
implicit val equiv: Equiv[BatchID] = Equiv.by(_.id)

def apply(long: Long) = new BatchID(long)
def apply(ts: Timestamp) = new BatchID(ts.milliSinceEpoch)

// Enables BatchID(someBatchID.toString) roundtripping
def apply(str: String) = new BatchID(str.split("\\.")(1).toLong)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package com.twitter.summingbird.batch
import com.twitter.algebird.Monoid
import com.twitter.bijection.Bijection
import java.util.Date

import com.twitter.scalding.RichDate

case class Timestamp(milliSinceEpoch: Long) extends Ordered[Timestamp] {
def compare(that: Timestamp) = milliSinceEpoch.compare(that.milliSinceEpoch)
def prev = copy(milliSinceEpoch = milliSinceEpoch - 1)
def next = copy(milliSinceEpoch = milliSinceEpoch + 1)
def toDate = new Date(milliSinceEpoch)
def toRichDate = new RichDate(milliSinceEpoch)
def -(other: Timestamp) = Timestamp(milliSinceEpoch - other.milliSinceEpoch)
def +(other: Timestamp) = Timestamp(milliSinceEpoch + other.milliSinceEpoch)
def incrementMillis(millis: Long) = Timestamp(milliSinceEpoch + millis)
def incrementSeconds(seconds: Long) = Timestamp(milliSinceEpoch + (seconds*1000L))
def incrementMinutes(minutes: Long) = Timestamp(milliSinceEpoch + (minutes*1000*60))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.twitter.summingbird.sink

import cascading.flow.FlowDef

import com.twitter.summingbird.batch.{ BatchID, Batcher }
import com.twitter.summingbird.batch.{ BatchID, Batcher, Timestamp}
import com.twitter.scalding.{ Mode, TypedPipe }
import com.twitter.summingbird.scalding.ScaldingEnv
import com.twitter.summingbird.scalding.batch.BatchedSink
Expand Down Expand Up @@ -46,7 +46,7 @@ class BatchedSinkFromOffline[T](override val batcher: Batcher, offline: OfflineS
/** Instances may choose to write out materialized streams
* by implementing this. This is what readStream returns.
*/
def writeStream(batchID: BatchID, stream: TypedPipe[(Long, T)])(implicit flowDef: FlowDef, mode: Mode): Unit = {
def writeStream(batchID: BatchID, stream: TypedPipe[(Timestamp, T)])(implicit flowDef: FlowDef, mode: Mode): Unit = {
// strip the time
offline.write(batchID, stream.values)(flowDef, mode)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.twitter.summingbird.scalding

import com.twitter.summingbird.batch.Timestamp
import com.twitter.scalding.TypedPipe

class PipeFactoryOps[+T](pipeFactory: PipeFactory[T]) {
Expand All @@ -28,7 +29,7 @@ class PipeFactoryOps[+T](pipeFactory: PipeFactory[T]) {
def mapElements[U](fn: (T => U)): PipeFactory[U] =
flatMapElements({tup => List(fn(tup))})

def mapPipe[U](fn: (TypedPipe[(Time, T)] => TypedPipe[(Time, U)])): PipeFactory[U] = {
def mapPipe[U](fn: (TypedPipe[(Timestamp, T)] => TypedPipe[(Timestamp, U)])): PipeFactory[U] = {
pipeFactory.map { flowProducer =>
flowProducer.map(fn(_))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,22 @@ object Scalding {
new Scalding(jobName, options, identity, List())
}

implicit val dateRangeInjection: Injection[DateRange, Interval[Time]] = Injection.build {
implicit val dateRangeInjection: Injection[DateRange, Interval[Timestamp]] = Injection.build {
(dr: DateRange) => {
val DateRange(l, u) = dr
Interval.leftClosedRightOpen(l.timestamp, u.timestamp + 1L)
Interval.leftClosedRightOpen(Timestamp(l.timestamp), Timestamp(u.timestamp + 1L))
}
} {
case Intersection(lb, ub) =>
val low = lb match {
case InclusiveLower(l) => l
case ExclusiveLower(l) => l+1L
case ExclusiveLower(l) => l.next
}
val high = ub match {
case InclusiveUpper(u) => u
case ExclusiveUpper(u) => u-1L
case ExclusiveUpper(u) => u.prev
}
Success(DateRange(RichDate(low), RichDate(high)))
Success(DateRange(low.toRichDate, high.toRichDate))
case _ => Failure(new RuntimeException("Unbounded interval!"))
}

Expand Down Expand Up @@ -95,7 +95,7 @@ object Scalding {
}

def intersect(dr1: DateRange, dr2: DateRange): Option[DateRange] =
(dr1.as[Interval[Time]] && (dr2.as[Interval[Time]])).as[Option[DateRange]]
(dr1.as[Interval[Timestamp]] && (dr2.as[Interval[Timestamp]])).as[Option[DateRange]]

/** Given a constructor function, computes the maximum available range
* of time or gives an error.
Expand Down Expand Up @@ -151,19 +151,19 @@ object Scalding {
*/
def pipeFactory[T](factory: (DateRange) => Mappable[T])
(implicit timeOf: TimeExtractor[T]): PipeFactory[T] =
StateWithError[(Interval[Time], Mode), List[FailureReason], FlowToPipe[T]]{
(timeMode: (Interval[Time], Mode)) => {
StateWithError[(Interval[Timestamp], Mode), List[FailureReason], FlowToPipe[T]]{
(timeMode: (Interval[Timestamp], Mode)) => {
val (timeSpan, mode) = timeMode

toDateRange(timeSpan).right.flatMap { dr =>
minify(mode, dr)(factory)
.right.map { newDr =>
val newIntr = newDr.as[Interval[Time]]
val newIntr = newDr.as[Interval[Timestamp]]
val mappable = factory(newDr)
((newIntr, mode), Reader { (fdM: (FlowDef, Mode)) =>
TypedPipe.from(mappable)(fdM._1, fdM._2)
.flatMap { t =>
val time = timeOf(t)
val time = Timestamp(timeOf(t))
if(newIntr(time)) Some((time, t)) else None
}
})
Expand All @@ -174,8 +174,8 @@ object Scalding {

def pipeFactoryExact[T](factory: (DateRange) => Mappable[T])
(implicit timeOf: TimeExtractor[T]): PipeFactory[T] =
StateWithError[(Interval[Time], Mode), List[FailureReason], FlowToPipe[T]]{
(timeMode: (Interval[Time], Mode)) => {
StateWithError[(Interval[Timestamp], Mode), List[FailureReason], FlowToPipe[T]]{
(timeMode: (Interval[Timestamp], Mode)) => {
val (timeSpan, mode) = timeMode

toDateRange(timeSpan).right.map { dr =>
Expand All @@ -184,7 +184,7 @@ object Scalding {
mappable.validateTaps(fdM._2) //This can throw, but that is what this caller wants
TypedPipe.from(mappable)(fdM._1, fdM._2)
.flatMap { t =>
val time = timeOf(t)
val time = Timestamp(timeOf(t))
if(timeSpan(time)) Some((time, t)) else None
}
})
Expand All @@ -196,12 +196,12 @@ object Scalding {
factory: (DateRange) => Mappable[T]): Producer[Scalding, T] =
Producer.source[Scalding, T](pipeFactory(factory))

def toDateRange(timeSpan: Interval[Time]): Try[DateRange] =
def toDateRange(timeSpan: Interval[Timestamp]): Try[DateRange] =
timeSpan.as[Option[DateRange]]
.map { Right(_) }
.getOrElse(Left(List("only finite time ranges are supported by scalding: " + timeSpan.toString)))

def limitTimes[T](range: Interval[Time], in: FlowToPipe[T]): FlowToPipe[T] =
def limitTimes[T](range: Interval[Timestamp], in: FlowToPipe[T]): FlowToPipe[T] =
in.map { pipe => pipe.filter { case (time, _) => range(time) } }

def merge[T](left: FlowToPipe[T], right: FlowToPipe[T]): FlowToPipe[T] =
Expand Down Expand Up @@ -438,8 +438,8 @@ object Scalding {
*/
def toPipe[T](dr: DateRange,
prod: Producer[Scalding, T],
opts: Map[String, Options] = Map.empty)(implicit fd: FlowDef, mode: Mode): Try[(DateRange, TypedPipe[(Long, T)])] = {
val ts = dr.as[Interval[Time]]
opts: Map[String, Options] = Map.empty)(implicit fd: FlowDef, mode: Mode): Try[(DateRange, TypedPipe[(Timestamp, T)])] = {
val ts = dr.as[Interval[Timestamp]]
val pf = planProducer(opts, prod)
toPipe(ts, fd, mode, pf).right.map { case (ts, pipe) =>
(ts.as[Option[DateRange]].get, pipe)
Expand All @@ -451,23 +451,23 @@ object Scalding {
*/
def toPipeExact[T](dr: DateRange,
prod: Producer[Scalding, T],
opts: Map[String, Options] = Map.empty)(implicit fd: FlowDef, mode: Mode): Try[TypedPipe[(Long, T)]] = {
val ts = dr.as[Interval[Time]]
opts: Map[String, Options] = Map.empty)(implicit fd: FlowDef, mode: Mode): Try[TypedPipe[(Timestamp, T)]] = {
val ts = dr.as[Interval[Timestamp]]
val pf = planProducer(opts, prod)
toPipeExact(ts, fd, mode, pf)
}

def toPipe[T](timeSpan: Interval[Time],
def toPipe[T](timeSpan: Interval[Timestamp],
flowDef: FlowDef,
mode: Mode,
pf: PipeFactory[T]): Try[(Interval[Time], TimedPipe[T])] = {
pf: PipeFactory[T]): Try[(Interval[Timestamp], TimedPipe[T])] = {
logger.info("Planning on interval: {}", timeSpan.as[Option[DateRange]])
pf((timeSpan, mode))
.right
.map { case (((ts, m), flowDefMutator)) => (ts, flowDefMutator((flowDef, m))) }
}

def toPipeExact[T](timeSpan: Interval[Time],
def toPipeExact[T](timeSpan: Interval[Timestamp],
flowDef: FlowDef,
mode: Mode,
pf: PipeFactory[T]): Try[TimedPipe[T]] = {
Expand Down Expand Up @@ -562,7 +562,7 @@ class Scalding(
c.set("io.serializations", ioSerializations.map { _.getName }.mkString(","))

// This is a side-effect-free computation that is called by run
def toFlow(timeSpan: Interval[Time], mode: Mode, pf: PipeFactory[_]): Try[(Interval[Time], Flow[_])] = {
def toFlow(timeSpan: Interval[Timestamp], mode: Mode, pf: PipeFactory[_]): Try[(Interval[Timestamp], Flow[_])] = {
val flowDef = new FlowDef
flowDef.setName(jobName)
Scalding.toPipe(timeSpan, flowDef, mode, pf)
Expand Down Expand Up @@ -593,15 +593,12 @@ class Scalding(
case _ =>
}



val prepareState = state.begin
val timeSpan = prepareState.requested.mapNonDecreasing(_.milliSinceEpoch)
toFlow(timeSpan, mode, pf) match {
toFlow(prepareState.requested, mode, pf) match {
case Left(errs) =>
prepareState.fail(FlowPlanException(errs))
case Right((ts,flow)) =>
prepareState.willAccept(ts.mapNonDecreasing(Timestamp(_))) match {
prepareState.willAccept(ts) match {
case Right(runningState) =>
try {
options.get(jobName).foreach { jopt =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,22 @@ import Conversion.asMethod
*/
private class BatchedOperations(batcher: Batcher) {

implicit val timeToBatchInterval = Bijection.build { bint: Interval[Time] =>
bint.mapNonDecreasing { Timestamp(_) } } { bint: Interval[Timestamp] =>
bint.mapNonDecreasing { _.milliSinceEpoch }
}

def coverIt[T](timeSpan: Interval[Time]): Iterable[BatchID] = {
def coverIt[T](timeSpan: Interval[Timestamp]): Iterable[BatchID] = {
val batchInterval = batcher.cover(timeSpan.as[Interval[Timestamp]])
BatchID.toIterable(batchInterval)
}

def batchToTime(bint: Interval[BatchID]): Interval[Time] =
bint.mapNonDecreasing { batcher.earliestTimeOf(_).milliSinceEpoch }
def batchToTimestamp(bint: Interval[BatchID]): Interval[Timestamp] =
bint.mapNonDecreasing { batcher.earliestTimeOf(_) }

def intersect(batches: Interval[BatchID], ts: Interval[Time]): Interval[Time] =
batchToTime(batches) && ts
def intersect(batches: Interval[BatchID], ts: Interval[Timestamp]): Interval[Timestamp] =
batchToTimestamp(batches) && ts

def intersect(batches: Iterable[BatchID], ts: Interval[Time]): Option[Interval[Time]] =
def intersect(batches: Iterable[BatchID], ts: Interval[Timestamp]): Option[Interval[Timestamp]] =
BatchID.toInterval(batches).map { intersect(_, ts) }

def readBatched[T](inBatches: Interval[BatchID], mode: Mode, in: PipeFactory[T]): Try[(Interval[BatchID], FlowToPipe[T])] = {
val inTimes = batchToTime(inBatches)
val inTimes = batchToTimestamp(inBatches)
// Read the delta stream for the needed times
in((inTimes, mode))
.right
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ trait BatchedService[K, V] extends Service[K, V] {
* You are guaranteed that all the service data needed
* to do the join is present.
*/
def lookup[W](incoming: TypedPipe[(Time, (K, W))],
servStream: TypedPipe[(Time, (K, Option[V]))]): TypedPipe[(Time, (K, (W, Option[V])))] = {
def lookup[W](incoming: TypedPipe[(Timestamp, (K, W))],
servStream: TypedPipe[(Timestamp, (K, Option[V]))]): TypedPipe[(Timestamp, (K, (W, Option[V])))] = {

def flatOpt[T](o: Option[Option[T]]): Option[T] = o.flatMap(identity)

Expand All @@ -56,13 +56,13 @@ trait BatchedService[K, V] extends Service[K, V] {
.map { case (t, (k, (w, optoptv))) => (t, (k, (w, flatOpt(optoptv)))) }
}

protected def batchedLookup[W](covers: Interval[Time],
protected def batchedLookup[W](covers: Interval[Timestamp],
getKeys: FlowToPipe[(K, W)],
last: (BatchID, FlowProducer[TypedPipe[(K, V)]]),
streams: Iterable[(BatchID, FlowToPipe[(K, Option[V])])]): FlowToPipe[(K, (W, Option[V]))] =
Reader[FlowInput, KeyValuePipe[K, (W, Option[V])]] { (flowMode: (FlowDef, Mode)) =>
val left = getKeys(flowMode)
val earliestInLast = batcher.earliestTimeOf(last._1).milliSinceEpoch
val earliestInLast = batcher.earliestTimeOf(last._1)
val liftedLast: KeyValuePipe[K, Option[V]] = last._2(flowMode)
.map { case (k, w) => (earliestInLast, (k, Some(w))) }
// TODO (https://github.com/twitter/summingbird/issues/91): we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ trait BatchedSink[T] extends Sink[T] {

// We need to write each of these.
iter.foreach { batch =>
val range = batcher.toInterval(batch).mapNonDecreasing { _.milliSinceEpoch }
val range = batcher.toInterval(batch)
writeStream(batch, inPipe.filter { case (time, _) =>
range(time)
})(flowMode._1, flowMode._2)
Expand Down Expand Up @@ -85,7 +85,7 @@ trait BatchedSink[T] extends Sink[T] {
.takeWhile { _._2.isDefined }
.collect { case (batch, Some(flow)) => (batch, flow) }

def mergeExistingAndBuilt(optBuilt: Option[(Interval[BatchID], FlowToPipe[T])]): Try[((Interval[Time], Mode), FlowToPipe[T])] = {
def mergeExistingAndBuilt(optBuilt: Option[(Interval[BatchID], FlowToPipe[T])]): Try[((Interval[Timestamp], Mode), FlowToPipe[T])] = {
val (aBatches, aFlows) = existing.unzip
val flows = aFlows ++ (optBuilt.map { _._2 })
val batches = aBatches ++ (optBuilt.map { pair => BatchID.toIterable(pair._1) }.getOrElse(Iterable.empty))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,15 @@ trait BatchedStore[K, V] extends scalding.Store[K, V] { self =>
}
}

protected def sumByBatches[K1,V:Semigroup](ins: TypedPipe[(Long, (K1, V))],
protected def sumByBatches[K1,V:Semigroup](ins: TypedPipe[(Timestamp, (K1, V))],
capturedBatcher: Batcher,
commutativity: Commutativity): TypedPipe[((K1, BatchID), (Timestamp, V))] = {
implicit val timeValueSemigroup: Semigroup[(Timestamp, V)] =
IteratorSums.optimizedPairSemigroup[Timestamp, V](1000)

val inits = ins.map { case (t, (k, v)) =>
val ts = Timestamp(t)
val batch = capturedBatcher.batchOf(ts)
((k, batch), (ts, v))
val batch = capturedBatcher.batchOf(t)
((k, batch), (t, v))
}
(commutativity match {
case Commutative => Store.mapsideReduce(inits)
Expand All @@ -117,7 +116,7 @@ trait BatchedStore[K, V] extends scalding.Store[K, V] { self =>
case Commutative => delta.map { flow =>
flow.map { typedP =>
sumByBatches(typedP, capturedBatcher, Commutative)
.map { case ((k, _), (ts, v)) => (ts.milliSinceEpoch, (k, v)) }
.map { case ((k, _), (ts, v)) => (ts, (k, v)) }
}
}
case NonCommutative => delta
Expand Down Expand Up @@ -148,7 +147,7 @@ trait BatchedStore[K, V] extends scalding.Store[K, V] { self =>

val capturedBatcher = batcher //avoid a closure on the whole store

def prepareDeltas(ins: TypedPipe[(Long, (K, V))]): TypedPipe[(K, (BatchID, (Timestamp, V)))] =
def prepareDeltas(ins: TypedPipe[(Timestamp, (K, V))]): TypedPipe[(K, (BatchID, (Timestamp, V)))] =
sumByBatches(ins, capturedBatcher, commutativity)
.map { case ((k, batch), (ts, v)) => (k, (batch, (ts, v))) }

Expand Down Expand Up @@ -195,11 +194,11 @@ trait BatchedStore[K, V] extends scalding.Store[K, V] { self =>

// This builds the format we send to consumer nodes
def toOutputFormat(res: TypedPipe[(K, (BatchID, (Option[Option[(Timestamp, V)]], Option[(Timestamp, V)])))]):
TypedPipe[(Long, (K, (Option[V], V)))] =
TypedPipe[(Timestamp, (K, (Option[V], V)))] =
res.flatMap { case (k, (batchid, (optopt, opt))) =>
opt.map { case (ts, v) =>
val prev = flatOpt(optopt).map(_._2)
(ts.milliSinceEpoch, (k, (prev, v)))
(ts, (k, (prev, v)))
}
}

Expand All @@ -214,6 +213,7 @@ trait BatchedStore[K, V] extends scalding.Store[K, V] { self =>
} yield toOutputFormat(merged)
}


/** instances of this trait MAY NOT change the logic here. This always follows the rule
* that we look for existing data (avoiding reading deltas in that case), then we fall
* back to the last checkpointed output by calling readLast. In that case, we compute the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,17 @@ import cascading.flow.FlowDef
import com.twitter.algebird.monad.{Reader, StateWithError}
import com.twitter.algebird.Interval
import com.twitter.summingbird
import com.twitter.summingbird.batch.Timestamp
import org.apache.hadoop.io.Writable

package object scalding {
/** We represent time as Long Millis */
type Time = Long
/** How we represent the streams in scalding */
type TimedPipe[+T] = TypedPipe[(Time, T)]
type TimedPipe[+T] = TypedPipe[(Timestamp, T)]
type KeyValuePipe[+K, +V] = TimedPipe[(K, V)]
/** The Platform recursively passes this input around to describe a
* step forward: requested input time span, and scalding Mode
*/
type FactoryInput = (Interval[Time], Mode)
type FactoryInput = (Interval[Timestamp], Mode)
/** When it is time to build and run the final flow,
* this is what scalding needs. It is modified in the Reader[FlowInput, T]
*/
Expand Down
Loading