Add partitioned sources for Parquet thrift / scrooge #1590

Merged
merged 10 commits into from
Sep 16, 2016


piyushnarang
Collaborator

Adding a couple of partitioned sources for Parquet thrift / scrooge similar to PartitionedDelimitedSource.
Usage is similar to the PartitionedDelimitedSource:

val myTypedPipe: TypedPipe[((String, String), MyThriftObject)] = ...
myTypedPipe.write(PartitionedParquetThriftSource[(String, String), MyThriftObject](path, "%s/%s"))
// writes out data to: path/%s/%s, with each %s replaced by the corresponding partition value

Added a couple of unit tests and tested these sources out using a couple of test read / write jobs.
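
As a rough illustration of how a template such as "%s/%s" turns a partition value into an output directory, here is a minimal sketch in plain Scala. The `PartitionPathSketch` object and its `partitionPath` helper are hypothetical names used only for this example; they are not part of Scalding, and the sources in this PR handle the path expansion internally.

```scala
object PartitionPathSketch {
  // Hypothetical helper (not a Scalding API): substitutes each "%s" in the
  // template with one partition value and appends the result to the base path.
  def partitionPath(basePath: String, template: String, partition: Seq[String]): String =
    basePath + "/" + template.format(partition: _*)
}
```

Under these assumptions, `PartitionPathSketch.partitionPath("/data/out", "%s/%s", Seq("US", "NYC"))` yields `/data/out/US/NYC`.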

@isnotinvain
Contributor

Can this be slightly generalized so that any existing source can just "become" a partitioned source? Or maybe a factory that takes a Source and returns a PartitionedSource? Or do we need to make one of these for each kind of source (parquet, scrooge, etc.)?

@piyushnarang
Collaborator Author

Let me see if I can refactor / tweak things to generalize this. It gets a bit more complicated because existing sources tend to pull in a set of traits with a specific type [T]; when you try to include the PartitionSchemed trait, it pulls in an overlapping set of traits but with a different generic type [P, T].

@piyushnarang
Collaborator Author

Poked around on this a bit to see if I could get it working. It seems to be more complicated / involved than expected. I tried setting up a partitioned source that delegates to another source, but some of the methods on the Source class are protected, so Scala's access rules complain about the delegation.
If possible, it would be nice to tackle the generalization in a future iteration.
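
The access problem described above can be reproduced in a small, self-contained sketch. Everything here (`BaseSource`, `ParquetLike`, `PartitionedWrapper`, `schemeName`) is a hypothetical stand-in chosen for illustration and does not mirror Scalding's actual Source class; the point is only that a wrapper cannot call a protected member on another instance typed as the base class.

```scala
object ProtectedDelegationSketch {
  // Hypothetical base class with a protected member.
  abstract class BaseSource {
    protected def schemeName: String
    def describe: String = s"source($schemeName)"
  }

  class ParquetLike extends BaseSource {
    protected def schemeName: String = "parquet"
  }

  // A wrapper that tries to delegate to another source.
  class PartitionedWrapper(underlying: BaseSource) extends BaseSource {
    // Writing `underlying.schemeName` here does not compile: Scala permits
    // protected access only through `this`, `super`, or a value whose static
    // type is a subtype of the accessing class, and BaseSource is not a
    // subtype of PartitionedWrapper. Only the public members are reachable.
    protected def schemeName: String = "partitioned:" + underlying.describe
  }
}
```

A delegating design therefore has to rely on whatever the wrapped class exposes publicly, which is one reason a generic "wrap any source" factory is harder than it first looks.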

* {{{
* val data = MyScroogeObject()
* IterablePipe(data, flowDef, mode)
* .write(PartitionedParquetScroogeSource[(String, String), MyScroogeObject](path, "%s/%s"))
Contributor

This isn't quite right, is it? The partition key needs to be exposed, right?
How about:

val data: TypedPipe[MyScroogeObject] = ???
data.map { obj =>
  ((obj.country, obj.city), obj)
}.write(PartitionedParquetScroogeSource[(String, String), MyScroogeObject](path, "%s/%s"))

Collaborator Author

Yep, missed the map step there. Updated in the partitioned scrooge + thrift src docs.
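
To make the role of the map step concrete, here is a sketch using plain Scala collections instead of a TypedPipe. The `Visit` record and both helper methods are hypothetical and exist only for this example; it shows records being paired with a (country, city) key and then grouped the way they would be split across partition directories.

```scala
object PartitionKeySketch {
  // Hypothetical record type standing in for a Scrooge-generated struct.
  case class Visit(country: String, city: String, userId: Long)

  // Pair each record with its partition key, mirroring the map step above.
  def keyByPartition(visits: Seq[Visit]): Seq[((String, String), Visit)] =
    visits.map(v => ((v.country, v.city), v))

  // Group by key to see which records would share a partition directory.
  def byPartition(visits: Seq[Visit]): Map[(String, String), Seq[Visit]] =
    keyByPartition(visits).groupBy(_._1).map { case (key, pairs) => key -> pairs.map(_._2) }
}
```

Without the key in the first tuple position, the sink has nothing to substitute into the template, which is the gap the review comment points out.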

@piyushnarang
Collaborator Author

@isnotinvain - updated the docs to clarify that P must be a String or a tuple of Strings. If I'm understanding right, some code in PartitionSchemed (and some classes it delegates to, such as TemplatePartition) expects the fields to be Strings.
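
The PR documents this constraint rather than enforcing it in the type system. Purely as an illustration of how such a constraint could be expressed at compile time, here is a sketch using a type class; `StringPartition` and `partitionValues` are hypothetical names and are not part of Scalding.

```scala
object StringPartitionSketch {
  // Hypothetical type class: instances exist only for String and for a pair
  // of Strings, so any other P is rejected at compile time.
  sealed trait StringPartition[P] {
    def values(p: P): List[String]
  }

  object StringPartition {
    implicit val single: StringPartition[String] =
      new StringPartition[String] {
        def values(p: String): List[String] = List(p)
      }

    implicit val pair: StringPartition[(String, String)] =
      new StringPartition[(String, String)] {
        def values(p: (String, String)): List[String] = List(p._1, p._2)
      }
  }

  // Compiles only when an instance for P is available.
  def partitionValues[P](p: P)(implicit ev: StringPartition[P]): List[String] =
    ev.values(p)
}
```

With this sketch, `partitionValues(("US", "NYC"))` compiles, while `partitionValues((1, 2))` would fail to find an implicit instance. Larger tuples would need additional instances.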

*
*/
case class PartitionedParquetScroogeSource[P, T <: ThriftStruct](
path: String, template: String, fields: Fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))(implicit val mf: Manifest[T],
Collaborator

can we remove fields and Manifest[T]? Manifest is deprecated. And we really try to focus on the typed API now.

Collaborator Author

Hmm, it seems both were required by the traits I was mixing in: Manifest[T] because we're mixing in ParquetThriftBase, and Fields because of PartitionSchemed.

Collaborator

Can't we always fix the given Fields to be PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity)? Also, can we fix ParquetThriftBase?

I don't want to confuse users with parameters they should really never change.

Collaborator Author

Yeah can set Fields to that. Can override it in PartitionedParquetScroogeSource and PartitionedParquetThriftSource unless you'd rather have it set in PartitionSchemed and not exposed to PartitionedParquet{Scrooge|Thrift}Source and PartitionedDelimitedSource?

Collaborator

Yeah, any new code should certainly not expose it. How much of the old code to remove or lock down is an open question.
On Fri, Sep 9, 2016 at 14:55 Piyush Narang notifications@github.com wrote:

In
scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
#1590 (comment):

+ * val data: TypedPipe[MyScroogeObject] = ???
+ * data.map { obj =>
+ *   ((obj.country, obj.city), obj)
+ * }.write(PartitionedParquetScroogeSource[(String, String), MyScroogeObject](path, "%s/%s"))
+ * }}}
+ * For reading it produces a pair (P, T) where P is the partition data, T is the corresponding
+ * scrooge object. Below is an example.
+ * {{{
+ * val in: TypedPipe[((String, String), MyScroogeObject)] =
+ *   TypedPipe.from(PartitionedParquetScroogeSource[(String, String), MyScroogeObject](path, "%s/%s"))
+ * }}}
+ */
+case class PartitionedParquetScroogeSource[P, T <: ThriftStruct](
+  path: String, template: String, fields: Fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))(implicit val mf: Manifest[T],


Collaborator

@johnynek johnynek left a comment

one minor comment


- def config: ParquetValueScheme.Config[T] = {
- val config = new ParquetValueScheme.Config[T].withRecordClass(mf.runtimeClass.asInstanceOf[Class[T]])
+ def config(implicit ct: ClassTag[T]): ParquetValueScheme.Config[T] = {
Collaborator

why are we accepting ct as a parameter if we already have it on line 40?

Collaborator Author

Will remove this. I added it on one of my earlier passes over the code when I was seeing some compile errors; it's not needed anymore.

- def config: ParquetValueScheme.Config[T] = {
- val config = new ParquetValueScheme.Config[T].withRecordClass(mf.runtimeClass.asInstanceOf[Class[T]])
+ def config(implicit ct: ClassTag[T]): ParquetValueScheme.Config[T] = {
+ val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
Collaborator

Can't we just do val clazz = ct.runtimeClass.asInstanceOf[Class[T]] here?

Collaborator Author

Hmm yeah, that's cleaner.
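
For readers unfamiliar with the pattern discussed in this thread, here is a minimal sketch of recovering a runtime class from an implicit ClassTag that is already in scope on the class. `RecordConfig` is a hypothetical stand-in, not the actual ParquetValueScheme.Config; only the ClassTag handling mirrors the suggestion above.

```scala
import scala.reflect.ClassTag

object ClassTagSketch {
  // Hypothetical config holder. The ClassTag is taken once, on the
  // constructor, so methods do not need their own implicit parameter.
  class RecordConfig[T](implicit ct: ClassTag[T]) {
    // Use the tag directly instead of re-summoning it with implicitly.
    val recordClass: Class[T] = ct.runtimeClass.asInstanceOf[Class[T]]
  }
}
```

Because the tag is captured at construction, `(new ClassTagSketch.RecordConfig[String]).recordClass` is `classOf[String]` with no extra implicit plumbing on each method.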

@piyushnarang
Collaborator Author

@johnynek - updated based on your comments today. I have been looking a bit into trying out macros, but the best I've got so far is something along the lines of:

  def verifyTemplate(partitionFields: Fields, template: String): Unit = macro verifyTemplateImpl

  def verifyTemplateImpl(c: Context)(partitionFields: c.Expr[Fields], template: c.Expr[String]): c.Expr[Unit] = {
    import c.universe._

    reify {
      assert(
        partitionFields.splice.size() == "%s".r.findAllIn(template.splice).length,
        "Number of partition fields %s does not correspond to template (%s)".format(partitionFields.splice, template.splice))
    }
  }

However, this check still effectively runs at runtime (and is identical to what we're doing today). I don't have a good sense of how to do this at compile time; I tried a few things, but they failed with compile and other miscellaneous errors.
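
For comparison, the equivalent runtime check can be written without any macro machinery. This is a sketch only: `TemplateCheckSketch` is a hypothetical name, and it takes a plain field count instead of a Cascading Fields object so that it stays self-contained.

```scala
object TemplateCheckSketch {
  // Count "%s" placeholders, as the assert inside the macro body does.
  def placeholderCount(template: String): Int =
    "%s".r.findAllIn(template).length

  // Runtime check: throws IllegalArgumentException when the counts differ.
  def verifyTemplate(fieldCount: Int, template: String): Unit =
    require(
      placeholderCount(template) == fieldCount,
      s"Number of partition fields ($fieldCount) does not correspond to template ($template)")
}
```

A compile-time version would instead need to inspect the template as a literal inside the macro implementation, which is the part deferred to a follow-up.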

@johnynek
Collaborator

@piyushnarang here is a simple macro that checks that a string is a valid regex at compile time: https://gist.github.com/xuwei-k/4991805

that might give a hint.

We don't have to do it here, but I like this technique. We can punt on that if you like.

@piyushnarang
Collaborator Author

Thanks @johnynek, that's a pretty useful link. I'd vote for following up with a separate PR for the macro bit. I can write up an issue so that we don't lose track of it and pick it up in the next week or so.

3 participants