REPL: Add toIterator (and related methods) #929

Merged on Jul 3, 2014 (29 commits)

Conversation

bholt
Contributor

@bholt bholt commented Jul 1, 2014

  • Refactor ad-hoc code from snapshot into TemporarySequenceFile class (possibly separated out into TemporaryFile and TypedSequenceFile)
  • Add toIterator that works at least for snapshots
    • If called on a TypedPipe that is not snapshotted, generate a snapshot and call toIterator
    • Ideally would allow running simple flatMappable operations without creating new snapshots (almost working)
    • toList and dump are trivially implementable from toIterator
  • Add documentation for new functionality, including the changes made in "Snapshot a pipe in the REPL" (#918)
    • Add note about the repl to the README
    • Add a "Repl Walkthrough" Wiki page
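
As a minimal sketch of the "toList and dump are trivially implementable from toIterator" point above: the trait IterablePipe and the helper pipeOf below are hypothetical stand-ins for a snapshotted pipe, not Scalding's TypedPipe API. They only illustrate that toIterator can be the single primitive.

```scala
// Hypothetical stand-in for a snapshotted pipe; not Scalding's TypedPipe.
trait IterablePipe[T] {
  // The only primitive: stream the materialized values.
  def toIterator: Iterator[T]

  // Collect every value into memory.
  def toList: List[T] = toIterator.toList

  // Print every value, one per line.
  def dump(): Unit = toIterator.foreach(println)
}

// Hypothetical helper that backs the pipe with an in-memory sequence.
def pipeOf[T](values: Seq[T]): IterablePipe[T] = new IterablePipe[T] {
  def toIterator: Iterator[T] = values.iterator
}
```

With this shape, pipeOf(Seq(1, 2, 3)).toList collects the values and pipeOf(Seq("a", "b")).dump() prints them, without either method knowing how the data was stored.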

bholt added 2 commits June 30, 2014 16:01
…ng Manifests available

note: toIterator doesn't work for tuples, supposedly can't find deserializer, even though "write"/"save" both can deserialize, run things, and write back out
import cascading.tuple.Fields

class TypedSequenceFile[T](path: String)(
implicit val mf: Manifest[T], tget: TupleGetter[T], tset: TupleSetter[T]) extends SequenceFile(path, 0) with Mappable[T] with TypedSink[T] {
Collaborator

why are any of these needed? You are not using tset. And tget can fall back to the default, I think, without an issue. Where is mf needed?

Contributor

In addition to Oscar's comment: for the ones you do actually need, if you don't reference them explicitly, you can use context bounds here: class TypedSequenceFile[T: Manifest: TupleGetter: TupleSetter]
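
A minimal sketch of the context-bound suggestion, assuming Scala 2 and a hypothetical type class Getter standing in for TupleGetter (none of these names are Scalding's). It shows that the explicit implicit-parameter list and the context-bound form carry the same constraints. The only difference is whether the instances have names in scope.

```scala
// Hypothetical type class standing in for something like TupleGetter.
trait Getter[T] { def describe: String }

object Getter {
  implicit val intGetter: Getter[Int] = new Getter[Int] {
    def describe: String = "Int getter"
  }
}

// Explicit implicit parameters: useful when the body refers to them by name.
class Explicit[T](path: String)(implicit mf: Manifest[T], g: Getter[T]) {
  def info: String = s"$path: ${mf.runtimeClass.getName}, ${g.describe}"
}

// Context bounds: same constraints, instances summoned with implicitly.
class Bounded[T: Manifest: Getter](path: String) {
  def info: String = {
    val mf = implicitly[Manifest[T]]
    val g = implicitly[Getter[T]]
    s"$path: ${mf.runtimeClass.getName}, ${g.describe}"
  }
}
```

If the body never mentions the instances at all, the context-bound form is shorter. If it uses them repeatedly, named implicit parameters may read better.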

@bholt
Contributor Author

bholt commented Jul 1, 2014

@johnynek: I added the Manifests as part of trying to solve an issue I'm having when T is a tuple. Everything with the singleField stuff seems to work fine, but when I try to run toIterator on a snapshot of a TypedSequenceFile[(String,Int)], it errors with:

Caused by: cascading.CascadingException: unable to load deserializer for: scala.Tuple2 from: org.apache.hadoop.io.serializer.SerializationFactory
    at cascading.tuple.hadoop.TupleSerialization.getNewDeserializer(TupleSerialization.java:470)
    at cascading.tuple.hadoop.TupleSerialization$SerializationElementReader.getDeserializerFor(TupleSerialization.java:654)
    at cascading.tuple.hadoop.TupleSerialization$SerializationElementReader.read(TupleSerialization.java:621)
    at cascading.tuple.hadoop.io.HadoopTupleInputStream.readType(HadoopTupleInputStream.java:105)
    at cascading.tuple.hadoop.io.HadoopTupleInputStream.getNextElement(HadoopTupleInputStream.java:52)
    at cascading.tuple.io.TupleInputStream.readTuple(TupleInputStream.java:78)
    at cascading.tuple.hadoop.io.TupleDeserializer.deserialize(TupleDeserializer.java:40)
    at cascading.tuple.hadoop.io.TupleDeserializer.deserialize(TupleDeserializer.java:28)
    at org.apache.hadoop.io.SequenceFile$Reader.deserializeValue(SequenceFile.java:1879)
    at org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:1852)
    at org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:103)
    at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:78)
    at cascading.tap.hadoop.util.MeasuredRecordReader.next(MeasuredRecordReader.java:61)
    at cascading.scheme.hadoop.SequenceFile.source(SequenceFile.java:93)
    at cascading.tuple.TupleEntrySchemeIterator.getNext(TupleEntrySchemeIterator.java:140)
    at cascading.tuple.TupleEntrySchemeIterator.hasNext(TupleEntrySchemeIterator.java:120)
    ... 28 more

I thought maybe it didn't have enough type information somewhere. These sequence files seem to work fine when I use save or write to write out to a typed sink (TypedTsv). I only get this error when running toIterator.

@johnynek
Collaborator

johnynek commented Jul 1, 2014

@bholt I see what is happening here. There is no serialization configured in this:

https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/Mode.scala#L119

I think this can be fixed by adding the following on line 22:

  // make sure to set up the required serialization for scalding:
  Config.default.toMap.foreach { case (k, v) =>
    conf.set(k, v)
  }

}

object TypedSequenceFile {
def apply[T](path: String)(implicit mf: Manifest[T]): TypedSequenceFile[T] =
Contributor

same point as above. This can be apply[T: Manifest]

@bholt
Contributor Author

bholt commented Jul 1, 2014

Thanks @johnynek, that fixed it for HDFS mode. Any idea what the equivalent is in CascadingLocal mode?

@johnynek
Collaborator

johnynek commented Jul 1, 2014

@bholt actually, for Cascading local mode, if you are going in-memory anyway, why not use the MemorySink I implemented:
https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/typed/MemorySink.scala#L33

Then you should not need to serialize anyway.

@bholt
Contributor Author

bholt commented Jul 1, 2014

Cool, I'll give that a try. Should be fine since we're already saying snapshot files should only be valid in the current session.

@johnynek
Collaborator

johnynek commented Jul 1, 2014

@bholt you'll have to pattern match on the Mode to check that it is a CascadingLocal; otherwise, MemorySink won't work.

@bholt
Contributor Author

bholt commented Jul 2, 2014

(please excuse the extra commits from merging repl+execution)


TypedPipe.fromSingleField[T](SequenceFile(tmpSeq))
mode match {
case Local(_) =>
Collaborator

If you match on cl: CascadingLocal instead, you will get both Local and Test modes, in case we want to support Test (which might be nice).
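
A minimal sketch of the difference, using a simplified stand-in hierarchy (these definitions and their fields are hypothetical and far smaller than Scalding's real Mode types). Matching on the shared trait with a type pattern covers every local-style mode at once, whereas case Local(_) would cover only one of them.

```scala
// Simplified, hypothetical stand-ins for the Mode hierarchy.
sealed trait Mode
sealed trait CascadingLocal extends Mode
case class Local(strict: Boolean) extends CascadingLocal
case class Test(name: String) extends CascadingLocal
case class Hdfs(strict: Boolean) extends Mode

// Choose where a snapshot would live, based on the mode.
def storageFor(mode: Mode): String = mode match {
  // Type pattern on the trait: matches Local and Test alike.
  case _: CascadingLocal => "in-memory sink"
  case _: Hdfs           => "temporary sequence file"
}
```

Here storageFor(Local(true)) and storageFor(Test("t")) take the same branch, which is the behavior the comment asks for.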

import cascading.flow.FlowDef
import cascading.tuple.Fields

//import com.twitter.scalding.ReplImplicits._
Collaborator

no commented code, please.

@johnynek
Collaborator

johnynek commented Jul 3, 2014

Brandon, can you merge with develop and we can merge this? Scalding needs to go out, and this looks like an improvement to me.

@bholt
Contributor Author

bholt commented Jul 3, 2014

Was just doing that. Also moved TypedSequenceFile to new directory.

@johnynek
Collaborator

johnynek commented Jul 3, 2014

sweet.

@johnynek
Collaborator

johnynek commented Jul 3, 2014

one last merge needed. Sorry. :) I keep changing that ExecutionContext.

*/
object ReplImplicitContext {
/** Implicit flowDef for this Scalding shell session. */
implicit var fd = ReplImplicits.flowDef
Collaborator

Why aren't these implicit def flowDef: FlowDef = ...?

I like to minimize the vars. Can we just make the ones in ReplImplicits vars?

Contributor Author

Oops, definitely didn't intend to make them "var".
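
A minimal sketch of the suggestion in this thread, with hypothetical stand-ins (FlowDef here is a toy class, and ReplState and ReplContext are invented names, not the PR's objects). The mutable state lives in exactly one place, and the implicit is a def that reads it on every use, so it always reflects the current value.

```scala
// Toy stand-in for cascading.flow.FlowDef.
final class FlowDef(val name: String)

// The single place that holds mutable session state.
object ReplState {
  var flowDef: FlowDef = new FlowDef("initial")
}

// Implicit view of that state: a def, so it is re-read on each use.
object ReplContext {
  implicit def flowDef: FlowDef = ReplState.flowDef
}

// Any method that needs the session's flow definition.
def currentFlowName(implicit fd: FlowDef): String = fd.name
```

After import ReplContext._, a call to currentFlowName picks up whatever ReplState.flowDef currently holds, even if it has been reassigned since the import.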

def toOption(implicit fd: FlowDef, md: Mode): Option[T] = vp match {
case _: EmptyValue => None
case LiteralValue(v) => Some(v)
case ComputedValue(tp) => tp.snapshot.toList match {
Collaborator

tp.snapshot.toIterator.take(2).toList

is a bit safer in case there is some bug. It will still error, but it won't blow up the memory if there are 2 or more items in the list.

Collaborator

Actually, why not tp.toIterator.take(2).toList? Why should we explicitly call snapshot?

Contributor Author

It is an error to have more than one value in a ValuePipe, right? In the match, should I do a sys.error for case _?
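
A minimal sketch of the pattern discussed in this thread, written against a plain Iterator rather than a ValuePipe (the function name singleValue and the error message are assumptions for illustration). Taking at most two elements is enough to tell "none", "exactly one", and "too many" apart without materializing the whole input.

```scala
// Return the single value, None if empty, or fail if there is more than one.
def singleValue[T](it: Iterator[T]): Option[T] =
  it.take(2).toList match {
    case Nil      => None
    case v :: Nil => Some(v)
    // Two elements were read, so the input held more than one value.
    case _        => sys.error("Expected at most one value, found more than one")
  }
```

Because take(2) is lazy, this reads at most two elements even from a very large (or unbounded) iterator, which is the memory-safety point raised above.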

@johnynek
Collaborator

johnynek commented Jul 3, 2014

merge when green.

ianoc added a commit that referenced this pull request Jul 3, 2014
REPL: Add toIterator (and related methods)
@ianoc ianoc merged commit c480067 into twitter:develop Jul 3, 2014
@bholt bholt deleted the repl+toiterator branch July 3, 2014 22:56
@bholt
Contributor Author

bholt commented Jul 4, 2014

Btw, added a tutorial for the new REPL to the wiki: https://github.com/twitter/scalding/wiki/Scalding-REPL

@sriramkrishnan
Collaborator

Love all the new features on the REPL. Doesn't the Wiki page make this obsolete:
https://github.com/twitter/scalding/blob/develop/scalding-repl/README.md

I would rather not have the same info in two places.

@johnynek
Collaborator

johnynek commented Jul 4, 2014

Did a quick edit to fix a minor error with the old one. But yes. +1 to more features, -1 to duplicate docs.

Oscar Boykin :: @posco :: http://twitter.com/posco

@bholt
Contributor Author

bholt commented Jul 4, 2014

The open source wiki seemed like a better place for this documentation, so that it can be more easily updated (not tied to release schedules). Perhaps we should just put a pointer to the wiki page in the README?

@sriramkrishnan
Collaborator

A pointer to the Wiki page sounds good. Having said that, I actually prefer the docs to be closer to the code - and indeed tied to release schedules (since the docs may actually be different for different releases). For instance, the Wiki page is now inconsistent with the 0.9.0 release.

@sriramkrishnan
Collaborator

#941
