Add partitioned sources for Parquet thrift / scrooge #1590

Merged 10 commits, Sep 16, 2016

Changes from 2 commits
@@ -6,6 +6,8 @@ import com.twitter.scalding.parquet.thrift.ParquetThriftBaseFileSource
import com.twitter.scalding.source.{ DailySuffixSource, HourlySuffixSource }
import com.twitter.scrooge.ThriftStruct

+import scala.reflect.ClassTag

trait ParquetScrooge[T <: ThriftStruct] extends ParquetThriftBaseFileSource[T] {

override def hdfsScheme = {
@@ -18,13 +20,13 @@ trait ParquetScrooge[T <: ThriftStruct] extends ParquetThriftBaseFileSource[T] {

class DailySuffixParquetScrooge[T <: ThriftStruct](
path: String,
-dateRange: DateRange)(implicit override val mf: Manifest[T])
+dateRange: DateRange)(implicit override val ct: ClassTag[T])
extends DailySuffixSource(path, dateRange) with ParquetScrooge[T]

class HourlySuffixParquetScrooge[T <: ThriftStruct](
path: String,
-dateRange: DateRange)(implicit override val mf: Manifest[T])
+dateRange: DateRange)(implicit override val ct: ClassTag[T])
extends HourlySuffixSource(path, dateRange) with ParquetScrooge[T]

-class FixedPathParquetScrooge[T <: ThriftStruct](paths: String*)(implicit override val mf: Manifest[T])
+class FixedPathParquetScrooge[T <: ThriftStruct](paths: String*)(implicit override val ct: ClassTag[T])
extends FixedPathSource(paths: _*) with ParquetScrooge[T]
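The mf: Manifest[T] to ct: ClassTag[T] swap running through this diff follows the standard migration away from the deprecated Manifest: these sources only ever need the erased runtime class, which a ClassTag provides. A minimal standalone sketch (RecordSource is a hypothetical name, not part of this PR):

```scala
import scala.reflect.ClassTag

// Hypothetical illustration: all the source needs from the implicit
// evidence is the erased runtime Class[T], which ClassTag carries.
class RecordSource[T](implicit ct: ClassTag[T]) {
  def recordClass: Class[T] = ct.runtimeClass.asInstanceOf[Class[T]]
}

object Demo extends App {
  val s = new RecordSource[String]
  println(s.recordClass.getName) // prints "java.lang.String"
}
```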
@@ -1,12 +1,13 @@
package com.twitter.scalding.parquet.scrooge

import _root_.cascading.scheme.Scheme
import _root_.cascading.tuple.Fields
import com.twitter.scalding._
import com.twitter.scalding.parquet.thrift.ParquetThriftBase
import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
import com.twitter.scrooge.ThriftStruct

+import scala.reflect.ClassTag

/**
* Scalding source to read or write partitioned Parquet scrooge data.
*
Expand All @@ -28,11 +29,12 @@ import com.twitter.scrooge.ThriftStruct
* }}}
Contributor:
for reads, is it required that P be a String or a TupleN of Strings? Seems like it'd have to be?

Collaborator Author:
It can also be a plain String (if the template is just "%s"). If you look at the unit tests, P ends up being a String there.

Collaborator Author:
Wonder if we should clarify that in the docs? Could add an example for the "%s" case, since it isn't a tuple?

Contributor:
so P must be either: String or tuple of Strings right? That'd be good to clarify.

Collaborator Author:
Ok, I'll update the docs to indicate this and also include an example
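Based on the signatures in this diff, the two shapes of P could look like the following sketch (Address and the paths are hypothetical, not taken from the PR's tests):

```scala
// P as a tuple of Strings: the template carries one "%s" slot
// per partition field.
val byStateAndCity =
  PartitionedParquetScroogeSource[(String, String), Address](
    "/data/addresses", "%s/%s")

// P as a single String: the template is just "%s".
val byState =
  PartitionedParquetScroogeSource[String, Address](
    "/data/addresses", "%s")
```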

Collaborator:
is it possible, maybe a geek out, to have a macro that checks the format string at compile time?

this would allow it to be really clear that the string matches the tuple type?

Collaborator Author:
I can look into this. Not super familiar with Scala macros so this can be a learning opportunity :-). Currently we do check this (at runtime though) - in TemplatePartition

case class TemplatePartition(partitionFields: Fields, template: String) extends Partition {
  assert(
    partitionFields.size == "%s".r.findAllIn(template).length,
    "Number of partition fields %s does not correspond to template (%s)".format(partitionFields, template))
...
}

Collaborator Author:
So if you do something like:

val partitionSource = PartitionedParquetScroogeSource[String, Address](path, "%s/%s") // should be just %s

you get an error:
assertion failed: Number of partition fields '1' does not correspond to template (%s/%s)
java.lang.AssertionError: assertion failed: Number of partition fields '1' does not correspond to template (%s/%s)
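That runtime check can be reproduced in isolation. This standalone sketch mirrors TemplatePartition's assertion using only the Scala standard library (TemplateCheck is a hypothetical name):

```scala
object TemplateCheck {
  // Mirrors TemplatePartition: the number of "%s" slots in the
  // template must equal the number of partition fields.
  def check(partitionArity: Int, template: String): Unit = {
    val slots = "%s".r.findAllIn(template).length
    assert(
      partitionArity == slots,
      s"Number of partition fields '$partitionArity' does not correspond to template ($template)")
  }

  def main(args: Array[String]): Unit = {
    check(2, "%s/%s") // passes: two fields, two slots
    check(1, "%s/%s") // throws java.lang.AssertionError
  }
}
```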

Collaborator:
yeah, it would be nice to just call the code cascading uses to verify it at compile time.

Collaborator Author:
Ok, I'll dig into this

*
*/
-case class PartitionedParquetScroogeSource[P, T <: ThriftStruct](
-path: String, template: String, fields: Fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))(implicit val mf: Manifest[T],
-val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
+case class PartitionedParquetScroogeSource[P, T <: ThriftStruct](path: String, template: String)(implicit val ct: ClassTag[T],
+val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {

+override val fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity)

assert(
fields.size == valueSetter.arity,
"The number of fields needs to be the same as the arity of the value setter")
@@ -29,16 +29,19 @@ import com.twitter.scalding.source.{ DailySuffixSource, HourlySuffixSource }
import java.io.Serializable
import org.apache.thrift.{ TBase, TFieldIdEnum }

+import scala.reflect.ClassTag

object ParquetThrift extends Serializable {
type ThriftBase = TBase[_ <: TBase[_, _], _ <: TFieldIdEnum]
}

trait ParquetThriftBase[T] extends LocalTapSource with HasFilterPredicate with HasColumnProjection {

-def mf: Manifest[T]
+implicit def ct: ClassTag[T]

-def config: ParquetValueScheme.Config[T] = {
-val config = new ParquetValueScheme.Config[T].withRecordClass(mf.runtimeClass.asInstanceOf[Class[T]])
+def config(implicit ct: ClassTag[T]): ParquetValueScheme.Config[T] = {
Collaborator:
why are we accepting ct as a parameter if we already have it on line 40?

Collaborator Author:
I'll remove this; I added it on one of my earlier passes over the code when I was seeing some compile errors. It's not needed anymore.

+val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
Collaborator:
can't we just do val clazz = ct.runtimeClass.asInstanceOf[Class[T]] here

Collaborator Author:
Hmm yeah, that's cleaner.
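Both forms summon the same ClassTag instance, so using ct.runtimeClass directly just avoids a redundant implicit lookup. A small sketch (helper names hypothetical):

```scala
import scala.reflect.ClassTag

// Two equivalent ways to reach the runtime class of T.
def viaImplicitly[T: ClassTag]: Class[_] = implicitly[ClassTag[T]].runtimeClass
def viaParam[T](implicit ct: ClassTag[T]): Class[_] = ct.runtimeClass

object Check extends App {
  // The same ClassTag is resolved either way, so the classes agree.
  println(viaImplicitly[Int] == viaParam[Int]) // prints "true"
}
```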

+val config = new ParquetValueScheme.Config[T].withRecordClass(clazz)
val configWithFp = withFilter match {
case Some(fp) => config.withFilterPredicate(fp)
case None => config
@@ -110,13 +113,13 @@ trait ParquetThrift[T <: ParquetThrift.ThriftBase] extends ParquetThriftBaseFile
*/
class DailySuffixParquetThrift[T <: ParquetThrift.ThriftBase](
path: String,
-dateRange: DateRange)(implicit override val mf: Manifest[T])
+dateRange: DateRange)(implicit override val ct: ClassTag[T])
extends DailySuffixSource(path, dateRange) with ParquetThrift[T]

class HourlySuffixParquetThrift[T <: ParquetThrift.ThriftBase](
path: String,
-dateRange: DateRange)(implicit override val mf: Manifest[T])
+dateRange: DateRange)(implicit override val ct: ClassTag[T])
extends HourlySuffixSource(path, dateRange) with ParquetThrift[T]

-class FixedPathParquetThrift[T <: ParquetThrift.ThriftBase](paths: String*)(implicit override val mf: Manifest[T])
+class FixedPathParquetThrift[T <: ParquetThrift.ThriftBase](paths: String*)(implicit override val ct: ClassTag[T])
extends FixedPathSource(paths: _*) with ParquetThrift[T]
@@ -1,9 +1,10 @@
package com.twitter.scalding.parquet.thrift

import cascading.scheme.Scheme
import cascading.tuple.Fields
-import com.twitter.scalding.{ HadoopSchemeInstance, FixedPathSource, TupleConverter, TupleSetter }
import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
+import com.twitter.scalding.{ FixedPathSource, HadoopSchemeInstance, TupleConverter, TupleSetter }

+import scala.reflect.ClassTag

/**
* Scalding source to read or write partitioned Parquet thrift data.
@@ -26,11 +27,12 @@ import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
* }}}
*
*/
-case class PartitionedParquetThriftSource[P, T <: ParquetThrift.ThriftBase](
-path: String, template: String, fields: Fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))(implicit val mf: Manifest[T],
-val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
+case class PartitionedParquetThriftSource[P, T <: ParquetThrift.ThriftBase](path: String, template: String)(implicit val ct: ClassTag[T],
+val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {

+override val fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity)

assert(
fields.size == valueSetter.arity,
"The number of fields needs to be the same as the arity of the value setter")