Add partitioned sources for Parquet thrift / scrooge #1590

Merged · Sep 16, 2016 · 10 commits

Changes from 9 commits

@@ -15,17 +15,10 @@
package com.twitter.scalding
package typed

import java.util.Properties
import java.io.{ InputStream, OutputStream }

import cascading.scheme.Scheme
import cascading.scheme.hadoop.TextDelimited
import cascading.scheme.local.{ TextDelimited => LocalTextDelimited }
import cascading.tap.{ Tap, SinkMode }
import cascading.tap.hadoop.{ Hfs, PartitionTap }
import cascading.tap.hadoop.PartitionTap
import cascading.tap.local.{ FileTap, PartitionTap => LocalPartitionTap }
import cascading.tap.partition.Partition
import cascading.tuple.{ Fields, Tuple, TupleEntry }
import cascading.tap.{ SinkMode, Tap }
import cascading.tuple.Fields

/**
* Trait to assist with creating partitioned sources.
5 changes: 5 additions & 0 deletions scalding-parquet-fixtures/src/test/resources/test.thrift
@@ -5,3 +5,8 @@ struct Name {
1: required string first_name,
2: optional string last_name
}

struct Address {
1: string street,
2: required string zip
}
@@ -2,11 +2,13 @@ package com.twitter.scalding.parquet.scrooge

import cascading.scheme.Scheme
import com.twitter.scalding._
import com.twitter.scalding.parquet.thrift.ParquetThriftBase
import com.twitter.scalding.parquet.thrift.ParquetThriftBaseFileSource
import com.twitter.scalding.source.{ DailySuffixSource, HourlySuffixSource }
import com.twitter.scrooge.ThriftStruct

trait ParquetScrooge[T <: ThriftStruct] extends ParquetThriftBase[T] {
import scala.reflect.ClassTag

trait ParquetScrooge[T <: ThriftStruct] extends ParquetThriftBaseFileSource[T] {

override def hdfsScheme = {
// See docs in Parquet346ScroogeScheme
@@ -18,13 +20,13 @@ trait ParquetScrooge[T <: ThriftStruct] extends ParquetThriftBase[T] {

class DailySuffixParquetScrooge[T <: ThriftStruct](
path: String,
dateRange: DateRange)(implicit override val mf: Manifest[T])
dateRange: DateRange)(implicit override val ct: ClassTag[T])
extends DailySuffixSource(path, dateRange) with ParquetScrooge[T]

class HourlySuffixParquetScrooge[T <: ThriftStruct](
path: String,
dateRange: DateRange)(implicit override val mf: Manifest[T])
dateRange: DateRange)(implicit override val ct: ClassTag[T])
extends HourlySuffixSource(path, dateRange) with ParquetScrooge[T]

class FixedPathParquetScrooge[T <: ThriftStruct](paths: String*)(implicit override val mf: Manifest[T])
class FixedPathParquetScrooge[T <: ThriftStruct](paths: String*)(implicit override val ct: ClassTag[T])
extends FixedPathSource(paths: _*) with ParquetScrooge[T]
@@ -0,0 +1,50 @@
package com.twitter.scalding.parquet.scrooge

import _root_.cascading.scheme.Scheme
import com.twitter.scalding._
import com.twitter.scalding.parquet.thrift.ParquetThriftBase
import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
import com.twitter.scrooge.ThriftStruct

import scala.reflect.ClassTag

/**
* Scalding source to read or write partitioned Parquet scrooge data.
*
* For writing it expects a pair of `(P, T)`, where `P` is the data used for partitioning and
* `T` is the scrooge object. `P` must be either a String or a tuple of Strings.
* Below is an example.
* {{{
* val data: TypedPipe[MyScroogeObject] = ???
* data.map { obj =>
* ( (obj.country, obj.city), obj)
* }.write(PartitionedParquetScroogeSource[(String, String), MyScroogeObject](path, "%s/%s"))
* }}}
*
* For reading it produces a pair `(P, T)` where `P` is the partition data, `T` is the corresponding
* scrooge object. Below is an example.
* {{{
* val in: TypedPipe[((String, String), MyScroogeObject)] =
* TypedPipe.from( PartitionedParquetScroogeSource[(String, String), MyScroogeObject](path, "%s/%s") )
* }}}
Contributor

for reads, is it required that P be a String or a TupleN of Strings? Seems like it'd have to be?

Collaborator Author

Can also be a String (if it's just "%s"). If you look at the unit tests, P ends up being a String there.

Collaborator Author

Wonder if we should clarify that in the docs? Could add an example for the %s case since it isn't a tuple?

Contributor

So P must be either a String or a tuple of Strings, right? That'd be good to clarify.

Collaborator Author

Ok, I'll update the docs to indicate this and also include an example
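
For what it's worth, the single-partition-field case being asked about might look like this (a sketch mirroring the scaladoc example above; MyScroogeObject and its country field are illustrative, not code from this PR):

val data: TypedPipe[MyScroogeObject] = ???
// one partition field, so P is a plain String and the template is just "%s"
data.map { obj => (obj.country, obj) }
  .write(PartitionedParquetScroogeSource[String, MyScroogeObject](path, "%s"))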

Collaborator

is it possible, maybe a geek out, to have a macro that checks the format string at compile time?

this would allow it to be really clear that the string matches the tuple type?

Collaborator Author

I can look into this. Not super familiar with Scala macros so this can be a learning opportunity :-). Currently we do check this (at runtime though) - in TemplatePartition

case class TemplatePartition(partitionFields: Fields, template: String) extends Partition {
  assert(
    partitionFields.size == "%s".r.findAllIn(template).length,
    "Number of partition fields %s does not correspond to template (%s)".format(partitionFields, template))
...
}

Collaborator Author

So if you do something like:

val partitionSource = PartitionedParquetScroogeSource[String, Address](path, "%s/%s") // should be just %s

you get an error:
assertion failed: Number of partition fields '1' does not correspond to template (%s/%s)
java.lang.AssertionError: assertion failed: Number of partition fields '1' does not correspond to template (%s/%s)

Collaborator

yeah, it would be nice to just call the code cascading uses to verify it at compile time.

Collaborator Author

Ok, I'll dig into this

*
*/
case class PartitionedParquetScroogeSource[P, T <: ThriftStruct](path: String, template: String)(implicit val ct: ClassTag[T],
val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {

override val fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity)

assert(
fields.size == valueSetter.arity,
"The number of fields needs to be the same as the arity of the value setter")

// Create the underlying scheme and explicitly set the source, sink fields to be only the specified fields
override def hdfsScheme = {
val scroogeScheme = new Parquet346ScroogeScheme[T](this.config)
val scheme = HadoopSchemeInstance(scroogeScheme.asInstanceOf[Scheme[_, _, _, _, _]])
scheme.setSinkFields(fields)
scheme.setSourceFields(fields)
scheme
}
}
@@ -0,0 +1,65 @@
package com.twitter.scalding.parquet.scrooge

import java.io.File

import com.twitter.scalding._
import com.twitter.scalding.parquet.scrooge.thrift_scala.test.Address
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetReader

import org.scalatest.{ Matchers, WordSpec }

object PartitionedParquetScroogeTestSources {
val path = "/a/path"
val partitionSource = PartitionedParquetScroogeSource[String, Address](path, "%s")
}

class PartitionedParquetScroogeWriteJob(args: Args) extends Job(args) {
import PartitionedParquetScroogeTestSources._
val input = Seq(Address("123 Embarcadero", "94111"), Address("123 E 79th St", "10075"), Address("456 W 80th St", "10075"))

TypedPipe.from(input)
.map { case Address(street, zipcode) => (zipcode, Address(street, zipcode)) }
.write(partitionSource)
}

class PartitionedParquetScroogeSourceTests extends WordSpec with Matchers {
import PartitionedParquetScroogeTestSources._

def validate(path: Path, expectedAddresses: Address*) = {
val conf: Configuration = new Configuration
conf.set("parquet.thrift.converter.class", classOf[ScroogeRecordConverter[Address]].getName)
val parquetReader: ParquetReader[Address] =
ParquetReader.builder[Address](new ScroogeReadSupport[Address], path)
.withConf(conf)
.build()

Stream.continually(parquetReader.read).takeWhile(_ != null).toArray shouldBe expectedAddresses
}

"PartitionedParquetScroogeSource" should {
"write out partitioned scrooge objects" in {
var job: Job = null;
def buildJob(args: Args): Job = {
job = new PartitionedParquetScroogeWriteJob(args)
job
}
JobTest(buildJob(_))
.runHadoop
.finish()

val testMode = job.mode.asInstanceOf[HadoopTest]

val directory = new File(testMode.getWritePathFor(partitionSource))

directory.listFiles().map({ _.getName() }).toSet shouldBe Set("94111", "10075")

// check that the partitioning is done correctly by zipcode
validate(new Path(directory.getPath + "/94111/part-00000-00000-m-00000.parquet"),
Address("123 Embarcadero", "94111"))
validate(new Path(directory.getPath + "/10075/part-00000-00001-m-00000.parquet"),
Address("123 E 79th St", "10075"), Address("456 W 80th St", "10075"))
}
}
}
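
Concretely, with the "%s" template and the zip code as the partition value, the write job above should produce a layout along these lines under the job's write path (partition directories named by zip code; the part-file names are whatever the Hadoop/Cascading test run assigns, as asserted above):

<write path>/
  94111/part-00000-00000-m-00000.parquet   // Address("123 Embarcadero", "94111")
  10075/part-00000-00001-m-00000.parquet   // Address("123 E 79th St", "10075"), Address("456 W 80th St", "10075")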
@@ -29,16 +29,19 @@ import com.twitter.scalding.source.{ DailySuffixSource, HourlySuffixSource }
import java.io.Serializable
import org.apache.thrift.{ TBase, TFieldIdEnum }

import scala.reflect.ClassTag

object ParquetThrift extends Serializable {
type ThriftBase = TBase[_ <: TBase[_, _], _ <: TFieldIdEnum]
}

trait ParquetThriftBase[T] extends FileSource with SingleMappable[T] with TypedSink[T] with LocalTapSource with HasFilterPredicate with HasColumnProjection {
trait ParquetThriftBase[T] extends LocalTapSource with HasFilterPredicate with HasColumnProjection {

def mf: Manifest[T]
implicit def ct: ClassTag[T]

def config: ParquetValueScheme.Config[T] = {
val config = new ParquetValueScheme.Config[T].withRecordClass(mf.runtimeClass.asInstanceOf[Class[T]])
def config(implicit ct: ClassTag[T]): ParquetValueScheme.Config[T] = {
Collaborator

why are we accepting ct as a parameter if we already have it on line 40?

Collaborator Author

Shall remove this; I added it on one of my prior passes over the code when I was seeing some compile errors. Not needed anymore.

val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
Collaborator

can't we just do val clazz = ct.runtimeClass.asInstanceOf[Class[T]] here

Collaborator Author

Hmm yeah, that's cleaner.

val config = new ParquetValueScheme.Config[T].withRecordClass(clazz)
val configWithFp = withFilter match {
case Some(fp) => config.withFilterPredicate(fp)
case None => config
@@ -52,11 +55,13 @@ trait ParquetThriftBase[T] extends FileSource with SingleMappable[T] with TypedS

configWithProjection
}
}

trait ParquetThriftBaseFileSource[T] extends FileSource with ParquetThriftBase[T] with SingleMappable[T] with TypedSink[T] {
override def setter[U <: T] = TupleSetter.asSubSetter[T, U](TupleSetter.singleSetter[T])
}

trait ParquetThrift[T <: ParquetThrift.ThriftBase] extends ParquetThriftBase[T] {
trait ParquetThrift[T <: ParquetThrift.ThriftBase] extends ParquetThriftBaseFileSource[T] {

override def hdfsScheme = {
// See docs in Parquet346TBaseScheme
@@ -108,13 +113,13 @@ trait ParquetThrift[T <: ParquetThrift.ThriftBase] extends ParquetThriftBase[T]
*/
class DailySuffixParquetThrift[T <: ParquetThrift.ThriftBase](
path: String,
dateRange: DateRange)(implicit override val mf: Manifest[T])
dateRange: DateRange)(implicit override val ct: ClassTag[T])
extends DailySuffixSource(path, dateRange) with ParquetThrift[T]

class HourlySuffixParquetThrift[T <: ParquetThrift.ThriftBase](
path: String,
dateRange: DateRange)(implicit override val mf: Manifest[T])
dateRange: DateRange)(implicit override val ct: ClassTag[T])
extends HourlySuffixSource(path, dateRange) with ParquetThrift[T]

class FixedPathParquetThrift[T <: ParquetThrift.ThriftBase](paths: String*)(implicit override val mf: Manifest[T])
class FixedPathParquetThrift[T <: ParquetThrift.ThriftBase](paths: String*)(implicit override val ct: ClassTag[T])
extends FixedPathSource(paths: _*) with ParquetThrift[T]
@@ -0,0 +1,49 @@
package com.twitter.scalding.parquet.thrift

import cascading.scheme.Scheme
import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
import com.twitter.scalding.{ FixedPathSource, HadoopSchemeInstance, TupleConverter, TupleSetter }

import scala.reflect.ClassTag

/**
* Scalding source to read or write partitioned Parquet thrift data.
*
* For writing it expects a pair of `(P, T)`, where `P` is the data used for partitioning and
* `T` is the thrift object. `P` must be either a String or a tuple of Strings.
* Below is an example.
* {{{
* val data: TypedPipe[MyThriftObject] = ???
* data.map{ obj =>
* ( (obj.country, obj.city), obj)
* }.write(PartitionedParquetThriftSource[(String, String), MyThriftObject](path, "%s/%s"))
* }}}
*
* For reading it produces a pair `(P, T)` where `P` is the partition data, `T` is the corresponding
* thrift object. Below is an example.
* {{{
* val in: TypedPipe[((String, String), MyThriftObject)] =
* TypedPipe.from( PartitionedParquetThriftSource[(String, String), MyThriftObject](path, "%s/%s") )
* }}}
*
*/
case class PartitionedParquetThriftSource[P, T <: ParquetThrift.ThriftBase](path: String, template: String)(implicit val ct: ClassTag[T],
val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {

override val fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity)

assert(
fields.size == valueSetter.arity,
"The number of fields needs to be the same as the arity of the value setter")

// Create the underlying scheme and explicitly set the source, sink fields to be only the specified fields
override def hdfsScheme = {
// See docs in Parquet346TBaseScheme
val baseScheme = new Parquet346TBaseScheme[T](this.config)
val scheme = HadoopSchemeInstance(baseScheme.asInstanceOf[Scheme[_, _, _, _, _]])
scheme.setSinkFields(fields)
scheme.setSourceFields(fields)
scheme
}
}
@@ -0,0 +1,60 @@
package com.twitter.scalding.parquet.thrift

import java.io.File

import com.twitter.scalding._
import com.twitter.scalding.parquet.thrift_java.test.Address
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.thrift.ThriftParquetReader

import org.scalatest.{ Matchers, WordSpec }

object PartitionedParquetThriftTestSources {
val path = "/a/path"
val partitionSource = PartitionedParquetThriftSource[String, Address](path, "%s")
}

class PartitionedParquetThriftWriteJob(args: Args) extends Job(args) {
import PartitionedParquetThriftTestSources._
val input = Seq(new Address("123 Embarcadero", "94111"), new Address("123 E 79th St", "10075"), new Address("456 W 80th St", "10075"))

TypedPipe.from(input)
.map { address => (address.getZip, address) }
.write(partitionSource)
}

class PartitionedParquetThriftSourceTests extends WordSpec with Matchers {
import PartitionedParquetThriftTestSources._

def validate(path: Path, expectedAddresses: Address*) = {
val parquetReader: ParquetReader[Address] =
ThriftParquetReader.build(path).withThriftClass(classOf[Address]).build()
Contributor

is this the TBase reader? should be able to use the scrooge reader here right?

Collaborator Author

Yeah this is the TBase reader. Can try and hook up the scrooge reader. Do you want to use the scrooge reader in both the partitioned thrift & partitioned scrooge tests?

Contributor

Sorry, I misread; I didn't realize this was the thrift test, not the scrooge test.

Collaborator Author

Ok, so I updated the scrooge test to use the scrooge reader. Left this one as is

Contributor

cool, can you update the docs to explain the requirements on P being a String or tuple of Strings? I guess that might not be true if you provide a different TupleSetter, but at least that part should be clarified too (that P and the TupleSetter need to match, which is not tracked by the type system)
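
To illustrate the contract being described here, a sketch (hypothetical paths and names; it relies on Scalding's default implicit TupleSetter/TupleConverter instances, just as the tests in this PR do):

// P = String: arity 1, so the template needs exactly one "%s"
val byZip = PartitionedParquetThriftSource[String, Address]("/base/path", "%s")

// P = (String, String): arity 2, so the template needs two "%s" segments
val byStateAndZip = PartitionedParquetThriftSource[(String, String), Address]("/base/path", "%s/%s")

// Nothing in the types ties P to the template; a mismatch only surfaces at runtime
// via the arity assertion in TemplatePartition discussed earlier.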

Stream.continually(parquetReader.read).takeWhile(_ != null).toArray shouldBe expectedAddresses
}

"PartitionedParquetThriftSource" should {
"write out partitioned thrift objects" in {
var job: Job = null;
def buildJob(args: Args): Job = {
job = new PartitionedParquetThriftWriteJob(args)
job
}
JobTest(buildJob(_))
.runHadoop
.finish()

val testMode = job.mode.asInstanceOf[HadoopTest]

val directory = new File(testMode.getWritePathFor(partitionSource))

directory.listFiles().map({ _.getName() }).toSet shouldBe Set("94111", "10075")

// check that the partitioning is done correctly by zipcode
validate(new Path(directory.getPath + "/94111/part-00000-00000-m-00000.parquet"),
new Address("123 Embarcadero", "94111"))
validate(new Path(directory.getPath + "/10075/part-00000-00001-m-00000.parquet"),
new Address("123 E 79th St", "10075"), new Address("456 W 80th St", "10075"))
}
}
}