Improve TypedParquetTuple #1302 (PR #1303)

Merged · 3 commits · May 30, 2015
3 changes: 2 additions & 1 deletion project/Build.scala
@@ -317,7 +317,8 @@ object ScaldingBuild extends Build {
       "org.slf4j" % "slf4j-api" % slf4jVersion,
       "org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided",
       "org.scala-lang" % "scala-reflect" % scalaVersion,
-      "com.twitter" %% "bijection-macros" % bijectionVersion
+      "com.twitter" %% "bijection-macros" % bijectionVersion,
+      "com.twitter" %% "chill-bijection" % chillVersion
     ) ++ (if(isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % quasiquotesVersion) else Seq())
   }, addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full))
   .dependsOn(scaldingCore, scaldingHadoopTest)
27 changes: 26 additions & 1 deletion scalding-parquet/README.md
@@ -4,4 +4,29 @@ The implementation is ported from code used by Twitter internally written by Sam
 ## Use com.twitter.scalding.parquet.thrift for reading apache Thrift (TBase) records
 ## Use com.twitter.scalding.parquet.scrooge for reading scrooge Thrift (ThriftStruct) records
 Located in the scalding-parquet-scrooge module
-## Use com.twitter.scalding.parquet.tuple for reading Tuple records
+## Use com.twitter.scalding.parquet.tuple for reading Tuple records
+## Use com.twitter.scalding.parquet.tuple.TypedParquet for reading or writing case classes:
+The macros in com.twitter.scalding.parquet.tuple.macros.Macros can generate the Parquet read/write support. Here's an example:
+```scala
+import com.twitter.scalding.parquet.tuple.macros.Macros._
+
+case class SampleClass(x: Int, y: String)
+
+class WriteToTypedParquetTupleJob(args: Args) extends Job(args) {
+  val outputPath = args.required("output")
+  val sink = TypedParquetSink[SampleClass](outputPath)
+
+  TypedPipe.from(List(SampleClass(0, "foo"), SampleClass(1, "bar"))).write(sink)
+}
+
+class ReadWithFilterPredicateJob(args: Args) extends Job(args) {
+  val fp: FilterPredicate = FilterApi.eq(binaryColumn("y"), Binary.fromString("foo"))
+
+  val inputPath = args.required("input")
+  val outputPath = args.required("output")
+
+  val input = TypedParquet[SampleClass](inputPath, fp)
+
+  TypedPipe.from(input).map(_.x).write(TypedTsv[Int](outputPath))
+}
+```
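The README example above leaves its imports implicit. A complete set that should compile against this PR, with the filter classes coming from the parquet filter2 API already used in this patch (exact paths inferred, not shown in the README):

```scala
import com.twitter.scalding._
import com.twitter.scalding.parquet.tuple.{ TypedParquet, TypedParquetSink }
import com.twitter.scalding.parquet.tuple.macros.Macros._ // implicit ParquetReadSupport/ParquetWriteSupport
import parquet.filter2.predicate.FilterApi.binaryColumn
import parquet.filter2.predicate.{ FilterApi, FilterPredicate }
import parquet.io.api.Binary
```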
@@ -4,9 +4,7 @@ import _root_.parquet.filter2.predicate.FilterPredicate
 import cascading.scheme.Scheme
 import com.twitter.scalding._
 import com.twitter.scalding.parquet.HasFilterPredicate
-import com.twitter.scalding.parquet.tuple.scheme.{ ParquetWriteSupport, ParquetReadSupport, TypedParquetTupleScheme }
-
-import scala.reflect.ClassTag
+import com.twitter.scalding.parquet.tuple.scheme.{ ParquetReadSupport, ParquetWriteSupport, TypedParquetTupleScheme }
 
 /**
  * Typed parquet tuple
@@ -16,87 +14,55 @@ object TypedParquet {
   /**
    * Create readable typed parquet source.
    * Here is an example:
    *
-   * case class SampleClassB(string: String, int: Int, double: Option[Double], a: SampleClassA)
-   *
-   * class ReadSupport extends ParquetReadSupport[SampleClassB] {
-   *   import com.twitter.scalding.parquet.tuple.macros.Macros._
-   *   override val tupleConverter: ParquetTupleConverter[SampleClassB] = caseClassParquetTupleConverter[SampleClassB]
-   *   override val rootSchema: String = caseClassParquetSchema[SampleClassB]
-   * }
-   *
-   * val parquetTuple = TypedParquet[SampleClassB, ReadSupport](Seq(outputPath))
+   * import com.twitter.scalding.parquet.tuple.macros.Macros._
+   * val parquetTuple = TypedParquet[SampleClass](Seq(outputPath))
    *
    * @param paths paths of parquet I/O
-   * @param t Read support type tag
    * @tparam T Tuple type
-   * @tparam R Read support type
    * @return a typed parquet source.
    */
-  def apply[T, R <: ParquetReadSupport[T]](paths: Seq[String])(implicit t: ClassTag[R]) =
-    new TypedFixedPathParquetTuple[T, R, ParquetWriteSupport[T]](paths, t.runtimeClass.asInstanceOf[Class[R]], null)
+  def apply[T](paths: Seq[String])(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] =
+    new TypedFixedPathParquetTuple[T](paths, readSupport, null)
+
+  def apply[T](path: String)(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] = apply[T](Seq(path))
 
   /**
    * Create readable typed parquet source with filter predicate.
    */
-  def apply[T, R <: ParquetReadSupport[T]](paths: Seq[String], fp: Option[FilterPredicate])(implicit t: ClassTag[R]) =
-    new TypedFixedPathParquetTuple[T, R, ParquetWriteSupport[T]](paths, t.runtimeClass.asInstanceOf[Class[R]], null) {
-      override def withFilter = fp
+  def apply[T](paths: Seq[String], fp: FilterPredicate)(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] =
+    new TypedFixedPathParquetTuple[T](paths, readSupport, null) {
+      override def withFilter = Some(fp)
     }
 
-  /**
-   * Create typed parquet source supports both R/W.
-   * @param paths paths of parquet I/O
-   * @param r Read support type tag
-   * @param w Write support type tag
-   * @tparam T Tuple type
-   * @tparam R Read support type
-   * @return a typed parquet source.
-   */
-  def apply[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]](paths: Seq[String])(implicit r: ClassTag[R],
-    w: ClassTag[W]) = {
-    val readSupport = r.runtimeClass.asInstanceOf[Class[R]]
-    val writeSupport = w.runtimeClass.asInstanceOf[Class[W]]
-    new TypedFixedPathParquetTuple[T, R, W](paths, readSupport, writeSupport)
-  }
-
+  def apply[T](path: String, fp: FilterPredicate)(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] =
+    apply[T](Seq(path), fp)
 }
 
 object TypedParquetSink {
   /**
    * Create typed parquet sink.
    * Here is an example:
    *
-   * case class SampleClassB(string: String, int: Int, double: Option[Double], a: SampleClassA)
-   *
-   * class WriteSupport extends ParquetWriteSupport[SampleClassB] {
-   *   import com.twitter.scalding.parquet.tuple.macros.Macros._
-   *
-   *   override def writeRecord(r: SampleClassB, rc: RecordConsumer, schema: MessageType): Unit =
-   *     caseClassWriteSupport[SampleClassB](r, rc, schema)
-   *   override val rootSchema: String = caseClassParquetSchema[SampleClassB]
-   * }
-   *
-   * val sink = TypedParquetSink[SampleClassB, WriteSupport](Seq(outputPath))
+   * import com.twitter.scalding.parquet.tuple.macros.Macros._
+   * val sink = TypedParquetSink[SampleClass](Seq(outputPath))
    *
    * @param paths paths of parquet I/O
-   * @param t Read support type tag
    * @tparam T Tuple type
-   * @tparam W Write support type
    * @return a typed parquet source.
    */
-  def apply[T, W <: ParquetWriteSupport[T]](paths: Seq[String])(implicit t: ClassTag[W]) =
-    new TypedFixedPathParquetTuple[T, ParquetReadSupport[T], W](paths, null, t.runtimeClass.asInstanceOf[Class[W]])
+  def apply[T](paths: Seq[String])(implicit writeSupport: ParquetWriteSupport[T]): TypedParquet[T] =
+    new TypedFixedPathParquetTuple[T](paths, null, writeSupport)
+
+  def apply[T](path: String)(implicit writeSupport: ParquetWriteSupport[T]): TypedParquet[T] = apply[T](Seq(path))
 }
 
 /**
  * Typed Parquet tuple source/sink.
  */
-trait TypedParquet[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]] extends FileSource with Mappable[T]
+trait TypedParquet[T] extends FileSource with Mappable[T]
   with TypedSink[T] with HasFilterPredicate {
 
-  val readSupport: Class[R]
-  val writeSupport: Class[W]
+  def readSupport: ParquetReadSupport[T]
+  def writeSupport: ParquetWriteSupport[T]
 
   override def converter[U >: T] = TupleConverter.asSuperConverter[T, U](TupleConverter.singleConverter[T])
 
@@ -108,5 +74,5 @@ trait TypedParquet[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]] e
   }
 }
 
-class TypedFixedPathParquetTuple[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]](val paths: Seq[String],
-  val readSupport: Class[R], val writeSupport: Class[W]) extends FixedPathSource(paths: _*) with TypedParquet[T, R, W]
+class TypedFixedPathParquetTuple[T](val paths: Seq[String], val readSupport: ParquetReadSupport[T],
+  val writeSupport: ParquetWriteSupport[T]) extends FixedPathSource(paths: _*) with TypedParquet[T]
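Taken together, the new overloads let a call site pass one path or several, with or without a pushed-down filter, while the read support resolves implicitly. A minimal sketch of the four entry points (the paths and filter column are placeholder values):

```scala
import com.twitter.scalding.parquet.tuple.TypedParquet
import com.twitter.scalding.parquet.tuple.macros.Macros._
import parquet.filter2.predicate.{ FilterApi, FilterPredicate }

case class SampleClass(x: Int, y: String)

// Read support is materialized implicitly by caseClassParquetReadSupport.
val single = TypedParquet[SampleClass]("/tmp/sample")
val multi = TypedParquet[SampleClass](Seq("/tmp/a", "/tmp/b"))

// The predicate is wrapped into Some(fp) and pushed down via withFilter.
val fp: FilterPredicate = FilterApi.gt(FilterApi.intColumn("x"), java.lang.Integer.valueOf(0))
val filtered = TypedParquet[SampleClass]("/tmp/sample", fp)
val filteredMulti = TypedParquet[SampleClass](Seq("/tmp/a", "/tmp/b"), fp)
```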
@@ -1,9 +1,7 @@
 package com.twitter.scalding.parquet.tuple.macros
 
-import com.twitter.scalding.parquet.tuple.macros.impl.{ ParquetSchemaProvider, ParquetTupleConverterProvider, WriteSupportProvider }
-import com.twitter.scalding.parquet.tuple.scheme.ParquetTupleConverter
-import parquet.io.api.RecordConsumer
-import parquet.schema.MessageType
+import com.twitter.scalding.parquet.tuple.macros.impl.{ ParquetReadSupportProvider, ParquetSchemaProvider, WriteSupportProvider }
+import com.twitter.scalding.parquet.tuple.scheme.{ ParquetReadSupport, ParquetWriteSupport }
 
 import scala.language.experimental.macros
 
@@ -35,20 +33,17 @@ object Macros {
    * @tparam T Case class type that contains primitive fields or collection fields or nested case class.
    * @return Generated case class parquet message type string
    */
-  def caseClassParquetSchema[T]: String = macro ParquetSchemaProvider.toParquetSchemaImpl[T]
+  implicit def caseClassParquetSchema[T]: String = macro ParquetSchemaProvider.toParquetSchemaImpl[T]
 
   /**
-   * Macro used to generate parquet tuple converter for a given case class.
-   *
-   * @tparam T Case class type that contains primitive or collection type fields or nested case class.
-   * @return Generated parquet converter
+   * Macro generated case class read support
    */
-  def caseClassParquetTupleConverter[T]: ParquetTupleConverter[T] = macro ParquetTupleConverterProvider.toParquetTupleConverterImpl[T]
+  implicit def caseClassParquetReadSupport[T]: ParquetReadSupport[T] = macro ParquetReadSupportProvider.toParquetReadSupportImpl[T]
 
   /**
    * Macro used to generate case class write support to parquet.
   * @tparam T User defined case class tuple type.
   * @return Generated case class tuple write support function.
   */
-  def caseClassWriteSupport[T]: (T, RecordConsumer, MessageType) => Unit = macro WriteSupportProvider.toWriteSupportImpl[T]
+  implicit def caseClassParquetWriteSupport[T]: ParquetWriteSupport[T] = macro WriteSupportProvider.toWriteSupportImpl[T]
 }
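All three members are now implicit defs, so a plain `import Macros._` is enough for the TypedParquet and TypedParquetSink overloads above to find their supports; each can also be invoked explicitly. A small sketch (Point is a hypothetical case class; the schema string's exact format comes from ParquetSchemaProvider and is not shown in this diff):

```scala
import com.twitter.scalding.parquet.tuple.macros.Macros
import com.twitter.scalding.parquet.tuple.scheme.{ ParquetReadSupport, ParquetWriteSupport }

case class Point(x: Int, y: Int)

// Explicitly materialize what `import Macros._` would otherwise supply implicitly.
val schema: String = Macros.caseClassParquetSchema[Point]
val rs: ParquetReadSupport[Point] = Macros.caseClassParquetReadSupport[Point]
val ws: ParquetWriteSupport[Point] = Macros.caseClassParquetWriteSupport[Point]
```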
@@ -5,15 +5,15 @@ import com.twitter.scalding.parquet.tuple.scheme._
 
 import scala.reflect.macros.Context
 
-object ParquetTupleConverterProvider {
+object ParquetReadSupportProvider {
   private[this] sealed trait CollectionType
   private[this] case object NOT_A_COLLECTION extends CollectionType
   private[this] case object OPTION extends CollectionType
   private[this] case object LIST extends CollectionType
   private[this] case object SET extends CollectionType
   private[this] case object MAP extends CollectionType
 
-  def toParquetTupleConverterImpl[T](ctx: Context)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[ParquetTupleConverter[T]] = {
+  def toParquetReadSupportImpl[T](ctx: Context)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[ParquetReadSupport[T]] = {
     import ctx.universe._
 
     if (!IsCaseClassImpl.isCaseClassType(ctx)(T.tpe))
@@ -194,6 +194,12 @@ object ParquetTupleConverterProvider {
     val groupConverter = buildGroupConverter(T.tpe, converters, converterGetters, convertersResetCalls,
       buildTupleValue(T.tpe, fieldValues))
 
-    ctx.Expr[ParquetTupleConverter[T]](groupConverter)
+    val schema = ParquetSchemaProvider.toParquetSchemaImpl[T](ctx)
+    val readSupport = q"""
+      new _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetReadSupport[$T]($schema) {
+        override val tupleConverter: _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetTupleConverter[$T] = $groupConverter
+      }
+    """
+    ctx.Expr[ParquetReadSupport[T]](readSupport)
   }
 }
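The upshot of this change: the macro no longer emits a bare converter but a complete ParquetReadSupport, whose constructor receives the generated schema string and whose tupleConverter is the generated group converter, so the hand-written ReadSupport subclasses from the old README are gone. A sketch of implicit materialization at a call site (assuming implicit search triggers the macro, as the TypedParquet overloads rely on):

```scala
import com.twitter.scalding.parquet.tuple.macros.Macros._
import com.twitter.scalding.parquet.tuple.scheme.ParquetReadSupport

case class Pair(a: Int, b: String)

// One value now carries both things the scheme needs: the schema string
// and the tuple converter for Pair.
val support: ParquetReadSupport[Pair] = implicitly[ParquetReadSupport[Pair]]
```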
@@ -1,14 +1,13 @@
 package com.twitter.scalding.parquet.tuple.macros.impl
 
 import com.twitter.bijection.macros.impl.IsCaseClassImpl
-import parquet.io.api.RecordConsumer
-import parquet.schema.MessageType
+import com.twitter.scalding.parquet.tuple.scheme.ParquetWriteSupport
 
 import scala.reflect.macros.Context
 
 object WriteSupportProvider {
 
-  def toWriteSupportImpl[T](ctx: Context)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[(T, RecordConsumer, MessageType) => Unit] = {
+  def toWriteSupportImpl[T](ctx: Context)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[ParquetWriteSupport[T]] = {
     import ctx.universe._
 
     if (!IsCaseClassImpl.isCaseClassType(ctx)(T.tpe))
@@ -39,7 +38,7 @@ object WriteSupportProvider {
 
       fieldType match {
         case tpe if tpe =:= typeOf[String] =>
-          writePrimitiveField(q"rc.addBinary(Binary.fromString($fValue))")
+          writePrimitiveField(q"rc.addBinary(_root_.parquet.io.api.Binary.fromString($fValue))")
         case tpe if tpe =:= typeOf[Boolean] =>
           writePrimitiveField(q"rc.addBoolean($fValue)")
         case tpe if tpe =:= typeOf[Short] =>
@@ -124,16 +123,17 @@ object WriteSupportProvider {
     if (finalIdx == 0)
       ctx.abort(ctx.enclosingPosition, "Didn't consume any elements in the tuple, possibly empty case class?")
 
-    val writeFunction: Tree = q"""
-      val writeFunc = (t: $T, rc: _root_.parquet.io.api.RecordConsumer, schema: _root_.parquet.schema.MessageType) => {
-
-        var $rootGroupName: _root_.parquet.schema.GroupType = schema
-        rc.startMessage
-        $funcBody
-        rc.endMessage
-      }
-      writeFunc
-    """
-    ctx.Expr[(T, RecordConsumer, MessageType) => Unit](writeFunction)
+    val schema = ParquetSchemaProvider.toParquetSchemaImpl[T](ctx)
+    val writeSupport: Tree = q"""
+      new _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetWriteSupport[$T]($schema) {
+        override def writeRecord(t: $T, rc: _root_.parquet.io.api.RecordConsumer, schema: _root_.parquet.schema.MessageType): Unit = {
+          var $rootGroupName: _root_.parquet.schema.GroupType = schema
+          rc.startMessage
+          $funcBody
+          rc.endMessage
+        }
+      }
+    """
+    ctx.Expr[ParquetWriteSupport[T]](writeSupport)
  }
 }
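For intuition, the tree assembled here is what a user previously wrote by hand. A hand-written analogue for a two-field case class might look as follows (a sketch: the field-by-field body is illustrative, while startMessage/endMessage and the ParquetWriteSupport(schema) constructor are taken from the diff above):

```scala
import com.twitter.scalding.parquet.tuple.scheme.ParquetWriteSupport
import parquet.io.api.{ Binary, RecordConsumer }
import parquet.schema.MessageType

case class Pair(a: Int, b: String)

// Hand-written equivalent of the macro-generated write support (sketch).
class PairWriteSupport(schema: String) extends ParquetWriteSupport[Pair](schema) {
  override def writeRecord(t: Pair, rc: RecordConsumer, mt: MessageType): Unit = {
    rc.startMessage()
    rc.startField("a", 0); rc.addInteger(t.a); rc.endField("a", 0)
    rc.startField("b", 1); rc.addBinary(Binary.fromString(t.b)); rc.endField("b", 1)
    rc.endMessage()
  }
}
```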
@@ -3,7 +3,7 @@ package com.twitter.scalding.parquet.tuple.scheme
 import parquet.io.api.{ Binary, Converter, GroupConverter, PrimitiveConverter }
 import scala.util.Try
 
-trait TupleFieldConverter[+T] extends Converter {
+trait TupleFieldConverter[+T] extends Converter with Serializable {
   /**
    * Current value read from parquet column
   */
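A plausible reason for the new Serializable bound (an inference; the PR does not state it): the read and write supports are now passed around as instances rather than Class objects, so the converters they contain must survive Java serialization when the scheme ships with the job. A quick round-trip check one could write:

```scala
import java.io._

// Serialize and deserialize a value the way Hadoop ships a scheme to tasks.
def roundTrip[A <: Serializable](a: A): A = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(a)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)).readObject().asInstanceOf[A]
}
```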