Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Commit

Permalink
Merge pull request #501 from tsdeng/feature/mutateFlowObj
Browse files Browse the repository at this point in the history
expose flow object to mutate function
  • Loading branch information
ianoc committed May 12, 2014
2 parents 1b8f9e7 + dd7fa35 commit 8d16fad
Showing 1 changed file with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.twitter.summingbird.scalding

import cascading.flow.Flow
import com.twitter.algebird.{ Monoid, Semigroup, Monad }
import com.twitter.algebird.{
Universe,
Expand Down Expand Up @@ -630,7 +631,12 @@ class Scalding(

def run(state: WaitingState[Interval[Timestamp]],
mode: Mode,
pf: PipeFactory[Any]): WaitingState[Interval[Timestamp]] = {
pf: PipeFactory[Any]): WaitingState[Interval[Timestamp]] = run(state, mode, pf, (f: Flow[_]) => Unit)

def run(state: WaitingState[Interval[Timestamp]],
mode: Mode,
pf: PipeFactory[Any],
mutate: Flow[_] => Unit): WaitingState[Interval[Timestamp]] = {

mode match {
case Hdfs(_, conf) =>
Expand All @@ -647,6 +653,7 @@ class Scalding(
case Left(errs) =>
prepareState.fail(FlowPlanException(errs))
case Right((ts,flow)) =>
mutate(flow)
prepareState.willAccept(ts) match {
case Right(runningState) =>
try {
Expand Down

0 comments on commit 8d16fad

Please sign in to comment.