Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add typed version of RichPipe 'using' #1049

Merged
merged 2 commits into from
Sep 22, 2014
Merged

Conversation

aposwolsky
Copy link
Contributor

Add typed version of RichPipe 'using'

@johnynek
Copy link
Collaborator

I wonder if we can do better and have a method like:

def onComplete(fn: Try[Unit] => ()): TypedPipe[T]

Which we can then pass down to cascading. Here you could use a normal lazy val and just call:

lazy val lookup = new RemoteHandle
pipe.map { t => lookup(t) }.onComplete { _ => lookup.release }

Then we get access to all the methods on TypedPipe without copying them. What do you think of this? It is more work to plumb that completion function all the way through to cascading, but I think the API is nicer and the power greater.

@aposwolsky
Copy link
Contributor Author

Cool! I didn't think of that way of doing it. I just started looking at the code so it isn't obvious to me how to hook this in, but I'lll give it a shot!

@johnynek
Copy link
Collaborator

I can give you some coaching. The first step will be to make a new instance of TypedPipe, like:

class WithOnComplete[T](pipe: TypedPipe[T], fn: Try[Unit] => ()) extends TypedPipe[T] {

}

Next, we are going to have to alter how toPipe works to use that function if it exists. There is already some case matching on the subclass of TypedPipe. Look for TypedPipeFactory for instance.

@aposwolsky
Copy link
Contributor Author

So I tried the following approach which didn't work, but I am wondering if it is worth the trouble. This will have me create a new "Each" and the part of "using" that was nice is that it gave you clear control to say that this is the setup and cleanup for that step which we would lose here.

I added the following to Operations.scala

  case class CleanupIdentityFunction(cleanupFn: () => Unit)
    extends BaseOperation[Any](Fields.ALL) with Function[Any] with ScaldingPrepare[Any] {
    val lockedEf = Externalizer(cleanupFn)
    def operate(flowProcess: FlowProcess[_], functionCall: FunctionCall[Any]) {
      functionCall.getOutputCollector.add(functionCall.getArguments)
    }
    override def cleanup(flowProcess: FlowProcess[_], operationCall: OperationCall[Any]) {
      Try.apply(cleanupFn).foreach(_())
    }
  }

For TypedPipe.scala added:

trait TypedPipe[+T] {
  def withOnComplete[U >: T](cleanupFn: () => Unit): TypedPipe[U] = {
    new WithOnComplete[U](this, cleanupFn)
  }
}

class WithOnComplete[T](typedPipe: TypedPipe[T], cleanupFn: () => Unit) extends TypedPipe[T] {
  override def toPipe[U >: T](fieldNames: Fields)(implicit flowDef: FlowDef, mode: Mode, setter: TupleSetter[U]) = {
    val pipe = typedPipe.toPipe[U](fieldNames)(flowDef, mode, setter)
    new Each(pipe, Fields.ALL, CleanupIdentityFunction(cleanupFn), Fields.REPLACE)
  }
  override def cross[U](tiny: TypedPipe[U]): TypedPipe[(T, U)] = typedPipe.cross(tiny)
  override def flatMap[U](f: T => TraversableOnce[U]): TypedPipe[U] = typedPipe.flatMap(f)
  override def toIteratorExecution: Execution[Iterator[T]] = typedPipe.toIteratorExecution
}

And I then tried to create a test which fails

class TypedWithOnCompleteJob(args: Args, cleanupFn: () => Unit) extends Job(args) {
  val p = TypedTsv[String]("input").withOnComplete(cleanupFn): TypedPipe[String]
  p.write(TypedTsv[String]("output"))
}

class TypedPipeWithOnCompleteTest extends Specification {
  import Dsl._
  var cleanupCalled: Boolean = false
  def cleanupFn(): Unit = { cleanupCalled = true }
  noDetailedDiffs()
  "A TypedWithOnCompleteJob" should {
    JobTest(new TypedWithOnCompleteJob(_, cleanupFn))
      .source(TypedTsv[String]("input"), (0 to 100).map { i => Tuple1(i.toString) })
      .sink[String](TypedTsv[String]("output")) { outBuf =>
        "have its onComplete function called" in {
          cleanupCalled must be_==(true)
        }
      }
      .runHadoop
      .finish
  }
}

with error:

could not build flow from assembly: [unable to pack object: cascading.flow.hadoop.HadoopFlowStep] (FlowPlanner.java:577)

@johnynek
Copy link
Collaborator

I guess it is because it could not serialize the cleanUp. You are using the Externalizer, but then ignoring it in the your cleanup method of your Operation.

Can you instead try getting the function out of the externalizer?

Also, to be sure, mark your cleanup function as @transient.

Serialization is one of the major pains of dealing with Hadoop.

@johnynek
Copy link
Collaborator

Also, your toIteratorExecution is not going to work: it is not applying the cleanUp. I think you just want:

.forceToDiskExecution.flatMap(_.toIteratorExecution)

Since after you force it to disk, it will apply the cleanup, and the toIteratorExecution will not be on another WithOnComplete.

@aposwolsky
Copy link
Contributor Author

Thanks for the suggestions but it still errors. I also am pretty sure that my implementation of cross and flatMap are wrong. Those changes didn't fix it. I believe that the implementation of cross and flatMap is wrong as well as cleanUp won't be called... do I add a forceToDiskExecution there as well?

@johnynek
Copy link
Collaborator

Ahh, no. You should have:

  override def cross[U](tiny: TypedPipe[U]): TypedPipe[(T, U)] =
    new WithOnComplete(typedPipe.cross(tiny), onComplete)
  override def flatMap[U](f: T => TraversableOnce[U]): TypedPipe[U] =
    new WithOnComplete(typedPipe.flatMap(f), onComplete)

@aposwolsky
Copy link
Contributor Author

Ah yes that makes sense. I was afraid of delaying the cleanup but it can
be done after the cross or flatMap. Unfortunately the
CleanupIdentityFunction still causes serialization errors:

  class CleanupIdentityFunction(@transient cleanupFn: () => Unit)
    extends BaseOperation[Any](Fields.ALL) with Function[Any] with
ScaldingPrepare[Any] {
    val lockedEf = Externalizer(cleanupFn)
    def operate(flowProcess: FlowProcess[_], functionCall:
FunctionCall[Any]) {
      functionCall.getOutputCollector.add(functionCall.getArguments)
    }
    override def cleanup(flowProcess: FlowProcess[_], operationCall:
OperationCall[Any]) {
      Try.apply(lockedEf.get).foreach(_())
    }
  }

On Mon, Sep 15, 2014 at 2:27 PM, P. Oscar Boykin notifications@github.com
wrote:

Ahh, no. You should have:

override def cross[U](tiny: TypedPipe[U]): TypedPipe[(T, U)] =
new WithOnComplete(typedPipe.cross(tiny), onComplete)
override def flatMap[U](f: T => TraversableOnce[U]): TypedPipe[U] =
new WithOnComplete(typedPipe.flatMap(f), onComplete)

Reply to this email directly or view it on GitHub
#1049 (comment).

@johnynek
Copy link
Collaborator

Can you update the pull request and post a gist of the full stack trace?

On Mon, Sep 15, 2014 at 12:09 PM, Adam Poswolsky notifications@github.com
wrote:

Ah yes that makes sense. I was afraid of delaying the cleanup but it can
be done after the cross or flatMap. Unfortunately the
CleanupIdentityFunction still causes serialization errors:

class CleanupIdentityFunction(@transient cleanupFn: () => Unit)
extends BaseOperation[Any](Fields.ALL) with Function[Any] with
ScaldingPrepare[Any] {
val lockedEf = Externalizer(cleanupFn)
def operate(flowProcess: FlowProcess[_], functionCall:
FunctionCall[Any]) {
functionCall.getOutputCollector.add(functionCall.getArguments)
}
override def cleanup(flowProcess: FlowProcess[_], operationCall:
OperationCall[Any]) {
Try.apply(lockedEf.get).foreach(_())
}
}

On Mon, Sep 15, 2014 at 2:27 PM, P. Oscar Boykin <notifications@github.com

wrote:

Ahh, no. You should have:

override def cross[U](tiny: TypedPipe[U]): TypedPipe[(T, U)] =
new WithOnComplete(typedPipe.cross(tiny), onComplete)
override def flatMap[U](f: T => TraversableOnce[U]): TypedPipe[U] =
new WithOnComplete(typedPipe.flatMap(f), onComplete)

Reply to this email directly or view it on GitHub
#1049 (comment).


Reply to this email directly or view it on GitHub
#1049 (comment).

Oscar Boykin :: @posco :: http://twitter.com/posco

@aposwolsky
Copy link
Contributor Author

class TypedPipeWithOnCompleteTest extends Specification {
import Dsl._
var cleanupCalled: Boolean = false
def cleanupFn(): Unit = { cleanupCalled = true }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function is going to require the Specification to be serialized, and that class has a ton of stuff that can be serialized.

Instead do this:

class CleanUp {
  var called: Boolean = false
  def cleanUp(): Unit = { called = true; () }
}
// in the Specification:
class ... extends Specification {
  val cc = new CleanUp
  // pass this to the job.
  new Job(args, cc)
  // in the job, call cc.cleanUp
}

Now, use that

@aposwolsky
Copy link
Contributor Author

Thanks! You ended up pretty much rewriting every line :) Was still having serialization issues so changed it to use a counter instead which did the trick..

@@ -0,0 +1 @@
/Users/adamp/.ivy2/cache/com.twitter/algebird-core_2.10/jars/algebird-core_2.10-0.7.0.jar
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these should not be here.

@aposwolsky
Copy link
Contributor Author

Cleaned it all up and merged into one commit.

@johnynek
Copy link
Collaborator

This looks all pretty great. One last request: can you add a few more tests that show at least a groupBy with a onComplete on the mapside and on the reduceside?

@aposwolsky
Copy link
Contributor Author

Added a more robust test. I tried to do withReducers(2) so that I can have
different counts for both but it looks like that this doesn't work in the
tests...

On Tue, Sep 16, 2014 at 6:19 PM, P. Oscar Boykin notifications@github.com
wrote:

This looks all pretty great. One last request: can you add a few more
tests that show at least a groupBy with a onComplete on the mapside and on
the reduceside?

Reply to this email directly or view it on GitHub
#1049 (comment).

@johnynek
Copy link
Collaborator

This is great.

johnynek added a commit that referenced this pull request Sep 22, 2014
Add typed version of RichPipe 'using'
@johnynek johnynek merged commit 34015dc into twitter:develop Sep 22, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants