Skip to content

Commit

Permalink
Improve TypedParquetTuple twitter#1302
Browse files Browse the repository at this point in the history
 *Refactor
 *Add example in README
  • Loading branch information
JiJiTang committed May 29, 2015
1 parent c6980f3 commit 6995da4
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 7 deletions.
27 changes: 26 additions & 1 deletion scalding-parquet/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,29 @@ The implementation is ported from code used by Twitter internally written by Sam
## Use com.twitter.scalding.parquet.thrift for reading apache Thrift (TBase) records
## Use com.twitter.scalding.parquet.scrooge for reading scrooge Thrift (ThriftStruct) records
Located in the scalding-parquet-scrooge module
## Use com.twitter.scalding.parquet.tuple for reading Tuple records
## Use com.twitter.scalding.parquet.tuple for reading Tuple records
## Use com.twitter.scalding.parquet.tuple.TypedParquet for reading or writing case classes:
Use the macros in com.twitter.scalding.parquet.tuple.macros.Macros to generate the parquet read/write support for a case class. Here's an example (imports other than the macros are omitted for brevity):
```scala
import com.twitter.scalding.parquet.tuple.macros.Macros._

case class SampleClass(x: Int, y: String)

// Example job: writes two in-memory SampleClass records to a typed parquet sink.
class WriteToTypedParquetTupleJob(args: Args) extends Job(args) {
  // Destination path, taken from the required "output" argument.
  val outputPath = args.required("output")
  // The write support for SampleClass is resolved implicitly (see the Macros import).
  val sink = TypedParquetSink[SampleClass](outputPath)

  TypedPipe
    .from(List(SampleClass(0, "foo"), SampleClass(1, "bar")))
    .write(sink)
}

// Example job: reads SampleClass records through a parquet filter predicate
// and writes their `x` field out as TSV.
class ReadWithFilterPredicateJob(args: Args) extends Job(args) {
  // Predicate comparing the binary column "y" for equality with the string "foo".
  val fp: FilterPredicate =
    FilterApi.eq(binaryColumn("y"), Binary.fromString("foo"))

  // Both paths come from required job arguments.
  val inputPath = args.required("input")
  val outputPath = args.required("output")

  // Typed parquet source over the input path with the predicate attached.
  val input = TypedParquet[SampleClass](inputPath, fp)

  TypedPipe
    .from(input)
    .map(record => record.x)
    .write(TypedTsv[Int](outputPath))
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@ object TypedParquet {
* @tparam T Tuple type
* @return a typed parquet source.
*/
def apply[T](paths: Seq[String])(implicit readSupport: ParquetReadSupport[T]) =
def apply[T](paths: Seq[String])(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] =
new TypedFixedPathParquetTuple[T](paths, readSupport, null)

def apply[T](path: String)(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] = apply[T](Seq(path))

/**
* Create readable typed parquet source with filter predicate.
*/
def apply[T](paths: Seq[String], fp: Option[FilterPredicate])(implicit readSupport: ParquetReadSupport[T]) =
def apply[T](paths: Seq[String], fp: FilterPredicate)(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] =
new TypedFixedPathParquetTuple[T](paths, readSupport, null) {
override def withFilter = fp
override def withFilter = Some(fp)
}

def apply[T](path: String, fp: FilterPredicate)(implicit readSupport: ParquetReadSupport[T]): TypedParquet[T] =
apply[T](Seq(path), fp)
}

object TypedParquetSink {
Expand All @@ -44,8 +49,10 @@ object TypedParquetSink {
* @tparam T Tuple type
* @return a typed parquet sink.
*/
def apply[T](paths: Seq[String])(implicit writeSupport: ParquetWriteSupport[T]) =
def apply[T](paths: Seq[String])(implicit writeSupport: ParquetWriteSupport[T]): TypedParquet[T] =
new TypedFixedPathParquetTuple[T](paths, null, writeSupport)

def apply[T](path: String)(implicit writeSupport: ParquetWriteSupport[T]): TypedParquet[T] = apply[T](Seq(path))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class WriteToTypedParquetTupleJob(args: Args) extends Job(args) {

val outputPath = args.required("output")

val sink = TypedParquetSink[SampleClassB](Seq(outputPath))
val sink = TypedParquetSink[SampleClassB](outputPath)
TypedPipe.from(values).write(sink)
}

Expand All @@ -79,7 +79,7 @@ class ReadWithFilterPredicateJob(args: Args) extends Job(args) {
val inputPath = args.required("input")
val outputPath = args.required("output")

val input = TypedParquet[SampleClassC](Seq(inputPath), Some(fp))
val input = TypedParquet[SampleClassC](inputPath, fp)

TypedPipe.from(input).map(_.a.bool).write(TypedTsv[Boolean](outputPath))
}
Expand Down

0 comments on commit 6995da4

Please sign in to comment.