Improve TypedParquetTuple twitter#1302
JiJiTang committed May 24, 2015
1 parent 536bd0c commit 08c7382
Showing 9 changed files with 166 additions and 130 deletions.
3 changes: 2 additions & 1 deletion project/Build.scala
@@ -315,7 +315,8 @@ object ScaldingBuild extends Build {
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.apache.hadoop" % "hadoop-core" % hadoopVersion % "provided",
"org.scala-lang" % "scala-reflect" % scalaVersion,
"com.twitter" %% "bijection-macros" % bijectionVersion
"com.twitter" %% "bijection-macros" % bijectionVersion,
"com.twitter" %% "chill-bijection" % chillVersion
) ++ (if(isScala210x(scalaVersion)) Seq("org.scalamacros" %% "quasiquotes" % quasiquotesVersion) else Seq())
}, addCompilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full))
.dependsOn(scaldingCore, scaldingHadoopTest)
@@ -4,9 +4,7 @@ import _root_.parquet.filter2.predicate.FilterPredicate
import cascading.scheme.Scheme
import com.twitter.scalding._
import com.twitter.scalding.parquet.HasFilterPredicate
import com.twitter.scalding.parquet.tuple.scheme.{ ParquetWriteSupport, ParquetReadSupport, TypedParquetTupleScheme }

import scala.reflect.ClassTag
import com.twitter.scalding.parquet.tuple.scheme.{ ParquetReadSupport, ParquetWriteSupport, TypedParquetTupleScheme }

/**
* Typed parquet tuple
@@ -28,36 +26,28 @@ object TypedParquet {
* val parquetTuple = TypedParquet[SampleClassB, ReadSupport](Seq(outputPath))
*
* @param paths paths of parquet I/O
* @param t Read support type tag
* @tparam T Tuple type
* @tparam R Read support type
* @return a typed parquet source.
*/
def apply[T, R <: ParquetReadSupport[T]](paths: Seq[String])(implicit t: ClassTag[R]) =
new TypedFixedPathParquetTuple[T, R, ParquetWriteSupport[T]](paths, t.runtimeClass.asInstanceOf[Class[R]], null)
def apply[T](paths: Seq[String], readSupport: ParquetReadSupport[T]) =
new TypedFixedPathParquetTuple[T](paths, readSupport, null)

/**
* Create readable typed parquet source with filter predicate.
*/
def apply[T, R <: ParquetReadSupport[T]](paths: Seq[String], fp: Option[FilterPredicate])(implicit t: ClassTag[R]) =
new TypedFixedPathParquetTuple[T, R, ParquetWriteSupport[T]](paths, t.runtimeClass.asInstanceOf[Class[R]], null) {
def apply[T](paths: Seq[String], readSupport: ParquetReadSupport[T], fp: Option[FilterPredicate]) =
new TypedFixedPathParquetTuple[T](paths, readSupport, null) {
override def withFilter = fp
}

/**
* Create a typed parquet source that supports both R/W.
* @param paths paths of parquet I/O
* @param r Read support type tag
* @param w Write support type tag
* @tparam T Tuple type
* @tparam R Read support type
* @return a typed parquet source.
*/
def apply[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]](paths: Seq[String])(implicit r: ClassTag[R],
w: ClassTag[W]) = {
val readSupport = r.runtimeClass.asInstanceOf[Class[R]]
val writeSupport = w.runtimeClass.asInstanceOf[Class[W]]
new TypedFixedPathParquetTuple[T, R, W](paths, readSupport, writeSupport)
def apply[T](paths: Seq[String], readSupport: ParquetReadSupport[T], writeSupport: ParquetWriteSupport[T]) = {
new TypedFixedPathParquetTuple[T](paths, readSupport, writeSupport)
}

}
@@ -80,23 +70,21 @@ object TypedParquetSink {
* val sink = TypedParquetSink[SampleClassB, WriteSupport](Seq(outputPath))
*
* @param paths paths of parquet I/O
* @param t Read support type tag
* @tparam T Tuple type
* @tparam W Write support type
* @return a typed parquet sink.
*/
def apply[T, W <: ParquetWriteSupport[T]](paths: Seq[String])(implicit t: ClassTag[W]) =
new TypedFixedPathParquetTuple[T, ParquetReadSupport[T], W](paths, null, t.runtimeClass.asInstanceOf[Class[W]])
def apply[T](paths: Seq[String], writeSupport: ParquetWriteSupport[T]) =
new TypedFixedPathParquetTuple[T](paths, null, writeSupport)
}

/**
* Typed Parquet tuple source/sink.
*/
trait TypedParquet[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]] extends FileSource with Mappable[T]
trait TypedParquet[T] extends FileSource with Mappable[T]
with TypedSink[T] with HasFilterPredicate {

val readSupport: Class[R]
val writeSupport: Class[W]
val readSupport: ParquetReadSupport[T]
val writeSupport: ParquetWriteSupport[T]

override def converter[U >: T] = TupleConverter.asSuperConverter[T, U](TupleConverter.singleConverter[T])

@@ -108,5 +96,5 @@ trait TypedParquet[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]] e
}
}

class TypedFixedPathParquetTuple[T, R <: ParquetReadSupport[T], W <: ParquetWriteSupport[T]](val paths: Seq[String],
val readSupport: Class[R], val writeSupport: Class[W]) extends FixedPathSource(paths: _*) with TypedParquet[T, R, W]
class TypedFixedPathParquetTuple[T](val paths: Seq[String], val readSupport: ParquetReadSupport[T],
val writeSupport: ParquetWriteSupport[T]) extends FixedPathSource(paths: _*) with TypedParquet[T]
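
For reference, a minimal usage sketch of the new instance-based constructors; SampleClassB, the paths, and the wrapping object are hypothetical and not part of this commit:

import com.twitter.scalding.parquet.tuple.{ TypedParquet, TypedParquetSink }
import com.twitter.scalding.parquet.tuple.macros.Macros

case class SampleClassB(x: Int, y: String)

object TypedParquetUsageSketch {
  // Schema string and support instances come from the macros reworked in this commit.
  val schema: String = Macros.caseClassParquetSchema[SampleClassB]
  val readSupport = Macros.caseClassParquetReadSupport[SampleClassB](schema)
  val writeSupport = Macros.caseClassParquetWriteSupport[SampleClassB](schema)

  // Sources and sinks are now built from support instances instead of support classes.
  val source = TypedParquet[SampleClassB](Seq("docs/in"), readSupport)
  val sink = TypedParquetSink[SampleClassB](Seq("docs/out"), writeSupport)
}
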
@@ -1,9 +1,7 @@
package com.twitter.scalding.parquet.tuple.macros

import com.twitter.scalding.parquet.tuple.macros.impl.{ ParquetSchemaProvider, ParquetTupleConverterProvider, WriteSupportProvider }
import com.twitter.scalding.parquet.tuple.scheme.ParquetTupleConverter
import parquet.io.api.RecordConsumer
import parquet.schema.MessageType
import com.twitter.scalding.parquet.tuple.macros.impl.{ ParquetReadSupportProvider, ParquetSchemaProvider, WriteSupportProvider }
import com.twitter.scalding.parquet.tuple.scheme.{ ParquetReadSupport, ParquetWriteSupport }

import scala.language.experimental.macros

@@ -38,17 +36,14 @@ object Macros {
def caseClassParquetSchema[T]: String = macro ParquetSchemaProvider.toParquetSchemaImpl[T]

/**
* Macro used to generate parquet tuple converter for a given case class.
*
* @tparam T Case class type that contains primitive or collection type fields or nested case class.
* @return Generated parquet converter
* Macro generated case class read support
*/
def caseClassParquetTupleConverter[T]: ParquetTupleConverter[T] = macro ParquetTupleConverterProvider.toParquetTupleConverterImpl[T]
def caseClassParquetReadSupport[T](schema: String): ParquetReadSupport[T] = macro ParquetReadSupportProvider.toParquetReadSupportImpl[T]

/**
* Macro used to generate case class write support to parquet.
* @tparam T User defined case class tuple type.
* @return Generated case class tuple write support function.
*/
def caseClassWriteSupport[T]: (T, RecordConsumer, MessageType) => Unit = macro WriteSupportProvider.toWriteSupportImpl[T]
def caseClassParquetWriteSupport[T](schema: String): ParquetWriteSupport[T] = macro WriteSupportProvider.toWriteSupportImpl[T]
}
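
For reference, a hand-written support pair roughly equivalent to what these macros expand to, given that ParquetReadSupport and ParquetWriteSupport become schema-carrying abstract classes later in this diff; SampleClass, the injected converter, and the empty record body are hypothetical placeholders:

import com.twitter.scalding.parquet.tuple.scheme.{ ParquetReadSupport, ParquetTupleConverter, ParquetWriteSupport }
import parquet.io.api.RecordConsumer
import parquet.schema.MessageType

case class SampleClass(bool: Boolean, long: Long)

class SampleClassReadSupport(schema: String, converter: ParquetTupleConverter[SampleClass])
  extends ParquetReadSupport[SampleClass](schema) {
  // The macro generates the converter from the case class fields; here it is injected.
  override val tupleConverter: ParquetTupleConverter[SampleClass] = converter
}

class SampleClassWriteSupport(schema: String) extends ParquetWriteSupport[SampleClass](schema) {
  override def writeRecord(r: SampleClass, rc: RecordConsumer, schema: MessageType): Unit = {
    rc.startMessage
    // The macro emits one field-writing block per case class field at this point.
    rc.endMessage
  }
}
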
@@ -5,15 +5,15 @@ import com.twitter.scalding.parquet.tuple.scheme._

import scala.reflect.macros.Context

object ParquetTupleConverterProvider {
object ParquetReadSupportProvider {
private[this] sealed trait CollectionType
private[this] case object NOT_A_COLLECTION extends CollectionType
private[this] case object OPTION extends CollectionType
private[this] case object LIST extends CollectionType
private[this] case object SET extends CollectionType
private[this] case object MAP extends CollectionType

def toParquetTupleConverterImpl[T](ctx: Context)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[ParquetTupleConverter[T]] = {
def toParquetReadSupportImpl[T](ctx: Context)(schema: ctx.Expr[String])(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[ParquetReadSupport[T]] = {
import ctx.universe._

if (!IsCaseClassImpl.isCaseClassType(ctx)(T.tpe))
@@ -194,6 +194,11 @@ object ParquetTupleConverterProvider {
val groupConverter = buildGroupConverter(T.tpe, converters, converterGetters, convertersResetCalls,
buildTupleValue(T.tpe, fieldValues))

ctx.Expr[ParquetTupleConverter[T]](groupConverter)
val readSupport = q"""
new _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetReadSupport[$T]($schema) {
override val tupleConverter: _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetTupleConverter[$T] = $groupConverter
}
"""
ctx.Expr[ParquetReadSupport[T]](readSupport)
}
}
@@ -1,14 +1,13 @@
package com.twitter.scalding.parquet.tuple.macros.impl

import com.twitter.bijection.macros.impl.IsCaseClassImpl
import parquet.io.api.RecordConsumer
import parquet.schema.MessageType
import com.twitter.scalding.parquet.tuple.scheme.ParquetWriteSupport

import scala.reflect.macros.Context

object WriteSupportProvider {

def toWriteSupportImpl[T](ctx: Context)(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[(T, RecordConsumer, MessageType) => Unit] = {
def toWriteSupportImpl[T](ctx: Context)(schema: ctx.Expr[String])(implicit T: ctx.WeakTypeTag[T]): ctx.Expr[ParquetWriteSupport[T]] = {
import ctx.universe._

if (!IsCaseClassImpl.isCaseClassType(ctx)(T.tpe))
@@ -39,7 +38,7 @@ object WriteSupportProvider {

fieldType match {
case tpe if tpe =:= typeOf[String] =>
writePrimitiveField(q"rc.addBinary(Binary.fromString($fValue))")
writePrimitiveField(q"rc.addBinary(_root_.parquet.io.api.Binary.fromString($fValue))")
case tpe if tpe =:= typeOf[Boolean] =>
writePrimitiveField(q"rc.addBoolean($fValue)")
case tpe if tpe =:= typeOf[Short] =>
@@ -124,16 +123,16 @@ object WriteSupportProvider {
if (finalIdx == 0)
ctx.abort(ctx.enclosingPosition, "Didn't consume any elements in the tuple, possibly empty case class?")

val writeFunction: Tree = q"""
val writeFunc = (t: $T, rc: _root_.parquet.io.api.RecordConsumer, schema: _root_.parquet.schema.MessageType) => {

var $rootGroupName: _root_.parquet.schema.GroupType = schema
rc.startMessage
$funcBody
rc.endMessage
val writeSupport: Tree = q"""
new _root_.com.twitter.scalding.parquet.tuple.scheme.ParquetWriteSupport[$T]($schema) {
override def writeRecord(t: $T, rc: _root_.parquet.io.api.RecordConsumer, schema: _root_.parquet.schema.MessageType): Unit = {
var $rootGroupName: _root_.parquet.schema.GroupType = schema
rc.startMessage
$funcBody
rc.endMessage
}
}
writeFunc
"""
ctx.Expr[(T, RecordConsumer, MessageType) => Unit](writeFunction)
ctx.Expr[ParquetWriteSupport[T]](writeSupport)
}
}
@@ -3,7 +3,7 @@ package com.twitter.scalding.parquet.tuple.scheme
import parquet.io.api.{ Binary, Converter, GroupConverter, PrimitiveConverter }
import scala.util.Try

trait TupleFieldConverter[+T] extends Converter {
trait TupleFieldConverter[+T] extends Converter with Serializable {
/**
* Current value read from parquet column
*/
@@ -5,18 +5,22 @@ import java.util.{ HashMap => JHashMap, Map => JMap }
import _root_.parquet.filter2.predicate.FilterPredicate
import _root_.parquet.hadoop.api.ReadSupport.ReadContext
import _root_.parquet.hadoop.api.WriteSupport.WriteContext
import _root_.parquet.hadoop.api.{ ReadSupport, WriteSupport }
import _root_.parquet.hadoop.mapred.{ Container, DeprecatedParquetInputFormat, DeprecatedParquetOutputFormat }
import parquet.hadoop.api.{ WriteSupport, ReadSupport }
import _root_.parquet.io.api._
import cascading.flow.FlowProcess
import cascading.scheme.{ Scheme, SinkCall, SourceCall }
import cascading.tap.Tap
import cascading.tuple.Tuple
import com.twitter.bijection.{ GZippedBase64String, Bijection }
import com.twitter.chill.KryoInjection
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader }
import org.apache.hadoop.mapred._
import parquet.hadoop.mapred.{ Container, DeprecatedParquetOutputFormat, DeprecatedParquetInputFormat }
import parquet.hadoop.{ ParquetInputFormat, ParquetOutputFormat }
import parquet.schema._

import scala.util.{ Failure, Success }

/**
* Parquet tuple materializer creates a user defined type record from parquet tuple values
* @param converter root converter
@@ -34,19 +38,18 @@ class ParquetTupleMaterializer[T](val converter: ParquetTupleConverter[T]) exten
* For case class types, we provide a macro to generate the schema and the tuple converter so that user
* can define a ParquetReadSupport like this:
*
* case class SampleClass(bool: Boolean, long: Long, float: Float)
* case class SampleClass(bool: Boolean, long: Long, float: Float)
*
* class SampleClassReadSupport extends ParquetReadSupport[SampleClass] {
* import com.twitter.scalding.parquet.tuple.macros.Macros._
* override val tupleConverter: ParquetTupleConverter = caseClassParquetTupleConverter[SampleClass]
* override val rootSchema: String = caseClassParquetSchema[SampleClass]
* }
* class SampleClassReadSupport extends ParquetReadSupport[SampleClass] {
* import com.twitter.scalding.parquet.tuple.macros.Macros._
* override val tupleConverter: ParquetTupleConverter = caseClassParquetTupleConverter[SampleClass]
* override val rootSchema: String = caseClassParquetSchema[SampleClass]
* }
*
* @tparam T user defined value type
*/
trait ParquetReadSupport[T] extends ReadSupport[T] {
abstract class ParquetReadSupport[T](val rootSchema: String) extends ReadSupport[T] with Serializable {
val tupleConverter: ParquetTupleConverter[T]
val rootSchema: String

lazy val rootType: MessageType = MessageTypeParser.parseMessageType(rootSchema)

Expand All @@ -63,26 +66,24 @@ trait ParquetReadSupport[T] extends ReadSupport[T] {
* User must provide a record schema and a function that writes a user defined case class to the parquet store with
* the record consumer and schema definition.
*
* For case class value types, we provide a macro to generate the write support function so that user
* For case class value types, we provide a macro to generate the write support function so that user
* can define a ParquetWriteSupport like this:
*
* class SampleClassWriteSupport extends TupleWriteSupport[SampleClassB] {
* import com.twitter.scalding.parquet.tuple.macros.Macros._
* class SampleClassWriteSupport extends TupleWriteSupport[SampleClassB] {
* import com.twitter.scalding.parquet.tuple.macros.Macros._
*
* override def writeRecord(r: SampleClassB, rc: RecordConsumer, schema: MessageType):Unit =
* Macros.caseClassWriteSupport[SampleClassB](r, rc, schema)
* override def writeRecord(r: SampleClassB, rc: RecordConsumer, schema: MessageType):Unit =
* Macros.caseClassWriteSupport[SampleClassB](r, rc, schema)
*
* override val rootSchema: String = caseClassParquetSchema[SampleClassB]
* }
* override val rootSchema: String = caseClassParquetSchema[SampleClassB]
* }
*
* @tparam T user defined value type
*/
trait ParquetWriteSupport[T] extends WriteSupport[T] {
abstract class ParquetWriteSupport[T](val rootSchema: String) extends WriteSupport[T] with Serializable {

var recordConsumer: RecordConsumer = null

val rootSchema: String

lazy val rootType: MessageType = MessageTypeParser.parseMessageType(rootSchema)

override def init(configuration: Configuration): WriteContext =
@@ -95,14 +96,56 @@ trait ParquetWriteSupport[T] extends WriteSupport[T] {
def writeRecord(r: T, rc: RecordConsumer, schema: MessageType): Unit
}

object InnerParquetInputFormat {
val READ_SUPPORT_INSTANCE = "parquet.read.support.instance"
}

class ParquetInputFormatFromReadSupportInstance[T] extends ParquetInputFormat[T] {
override def getReadSupport(conf: Configuration): ReadSupport[T] = {
val readSupport = conf.get(InnerParquetInputFormat.READ_SUPPORT_INSTANCE)
require(readSupport != null && !readSupport.isEmpty, "no read support instance is configured")
val deserialize = (GZippedBase64String(_)) andThen Bijection.bytes2GZippedBase64.inverse andThen KryoInjection.invert
val readSupportInstance = deserialize(readSupport)
readSupportInstance match {
case Success(obj) => obj.asInstanceOf[ReadSupport[T]]
case Failure(e) => throw e
}
}
}

private class InnerDeprecatedParquetInputFormat[T] extends DeprecatedParquetInputFormat[T] {
this.realInputFormat = new ParquetInputFormatFromReadSupportInstance[T]
}

object InnerParquetOutputFormat {
val WRITE_SUPPORT_INSTANCE = "parquet.write.support.instance"
}

class ParquetOutputFormatFromWriteSupportInstance[T] extends ParquetOutputFormat[T] {
override def getWriteSupport(conf: Configuration): WriteSupport[T] = {
val writeSupport = conf.get(InnerParquetOutputFormat.WRITE_SUPPORT_INSTANCE)
require(writeSupport != null && !writeSupport.isEmpty, "no write support instance is configured")
val deserialize = (GZippedBase64String(_)) andThen Bijection.bytes2GZippedBase64.inverse andThen KryoInjection.invert
val writeSupportInstance = deserialize(writeSupport)
writeSupportInstance match {
case Success(obj) => obj.asInstanceOf[WriteSupport[T]]
case Failure(e) => throw e
}
}
}

private class InnerDeprecatedParquetOutputFormat[T] extends DeprecatedParquetOutputFormat[T] {
this.realOutputFormat = new ParquetOutputFormatFromWriteSupportInstance[T]
}

/**
* Typed parquet tuple scheme.
* @param readSupport read support class
* @param writeSupport write support class
* @param fp filter predicate
* @tparam T tuple value type
*/
class TypedParquetTupleScheme[T](val readSupport: Class[_], val writeSupport: Class[_],
class TypedParquetTupleScheme[T](val readSupport: ParquetReadSupport[T], val writeSupport: ParquetWriteSupport[T],
val fp: Option[FilterPredicate] = None)
extends Scheme[JobConf, RecordReader[AnyRef, Container[T]], OutputCollector[AnyRef, T], Array[AnyRef], Array[AnyRef]] {

@@ -114,8 +157,9 @@ class TypedParquetTupleScheme[T](val readSupport: Class[_], val writeSupport: Cl

override def sourceConfInit(flowProcess: FlowProcess[JobConf], tap: TapType, jobConf: JobConf): Unit = {
fp.map(ParquetInputFormat.setFilterPredicate(jobConf, _))
jobConf.setInputFormat(classOf[DeprecatedParquetInputFormat[T]])
ParquetInputFormat.setReadSupportClass(jobConf, readSupport)
jobConf.setInputFormat(classOf[InnerDeprecatedParquetInputFormat[T]])
jobConf.set(InnerParquetInputFormat.READ_SUPPORT_INSTANCE,
Bijection.bytes2GZippedBase64(KryoInjection(readSupport)).str)
}

override def source(flowProcess: FlowProcess[JobConf], sc: SourceCallType): Boolean = {
@@ -133,8 +177,9 @@ class TypedParquetTupleScheme[T](val readSupport: Class[_], val writeSupport: Cl
}

override def sinkConfInit(flowProcess: FlowProcess[JobConf], tap: TapType, jobConf: JobConf): Unit = {
jobConf.setOutputFormat(classOf[DeprecatedParquetOutputFormat[T]])
ParquetOutputFormat.setWriteSupportClass(jobConf, writeSupport)
jobConf.setOutputFormat(classOf[InnerDeprecatedParquetOutputFormat[T]])
jobConf.set(InnerParquetOutputFormat.WRITE_SUPPORT_INSTANCE,
Bijection.bytes2GZippedBase64(KryoInjection(writeSupport)).str)
}

override def sink(flowProcess: FlowProcess[JobConf], sinkCall: SinkCallType): Unit = {
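
For reference, a sketch that puts together the two halves of the support-instance shipping shown above (serialization in sourceConfInit/sinkConfInit, deserialization in the input/output format wrappers); the wrapping object and helper names are hypothetical:

import com.twitter.bijection.{ Bijection, GZippedBase64String }
import com.twitter.chill.KryoInjection
import com.twitter.scalding.parquet.tuple.scheme.{ InnerParquetInputFormat, ParquetReadSupport }
import org.apache.hadoop.mapred.JobConf
import parquet.hadoop.api.ReadSupport

object ReadSupportShippingSketch {
  // Client side: Kryo-serialize the instance, gzip it, and Base64-encode it into the JobConf.
  def ship[T](jobConf: JobConf, readSupport: ParquetReadSupport[T]): Unit = {
    val encoded = Bijection.bytes2GZippedBase64(KryoInjection(readSupport)).str
    jobConf.set(InnerParquetInputFormat.READ_SUPPORT_INSTANCE, encoded)
  }

  // Task side: reverse the encoding; KryoInjection.invert returns a Try, so fail loudly on corrupt payloads.
  def restore[T](jobConf: JobConf): ReadSupport[T] = {
    val decode = (GZippedBase64String(_: String)) andThen
      Bijection.bytes2GZippedBase64.inverse andThen KryoInjection.invert
    decode(jobConf.get(InnerParquetInputFormat.READ_SUPPORT_INSTANCE)).get.asInstanceOf[ReadSupport[T]]
  }
}
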
