From e5dcc2dafdbecddd312719341190e4acea2ebd2a Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Tue, 30 Aug 2016 10:46:18 -0700
Subject: [PATCH 01/10] Add partitioned parquet scrooge sources

---
 .../parquet/scrooge/ParquetScrooge.scala      |  4 +-
 .../scrooge/PartitionParquetScrooge.scala     | 46 ++++++++++++++
 .../PartitionParquetScroogeTests.scala        | 62 +++++++++++++++++++
 .../parquet/thrift/ParquetThrift.scala        |  6 +-
 .../thrift/PartitionParquetThrift.scala       |  8 +++
 .../thrift/PartitionParquetThriftTests.scala  |  8 +++
 6 files changed, 130 insertions(+), 4 deletions(-)
 create mode 100644 scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScrooge.scala
 create mode 100644 scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScroogeTests.scala
 create mode 100644 scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThrift.scala
 create mode 100644 scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThriftTests.scala

diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala
index a899541d41..c798662298 100644
--- a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala
+++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala
@@ -2,11 +2,11 @@ package com.twitter.scalding.parquet.scrooge

 import cascading.scheme.Scheme
 import com.twitter.scalding._
-import com.twitter.scalding.parquet.thrift.ParquetThriftBase
+import com.twitter.scalding.parquet.thrift.ParquetThriftBaseFileSource
 import com.twitter.scalding.source.{ DailySuffixSource, HourlySuffixSource }
 import com.twitter.scrooge.ThriftStruct

-trait ParquetScrooge[T <: ThriftStruct] extends ParquetThriftBase[T] {
+trait ParquetScrooge[T <: ThriftStruct] extends ParquetThriftBaseFileSource[T] {

   override def hdfsScheme = {
     // See docs in Parquet346ScroogeScheme
diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScrooge.scala b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScrooge.scala
new file mode 100644
index 0000000000..5551ea1328
--- /dev/null
+++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScrooge.scala
@@ -0,0 +1,46 @@
+package com.twitter.scalding.parquet.scrooge
+
+import _root_.cascading.scheme.Scheme
+import _root_.cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.parquet.thrift.ParquetThriftBase
+import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
+import com.twitter.scrooge.ThriftStruct
+
+/**
+ * Scalding source to read or write partitioned Parquet scrooge data.
+ *
+ * For writing it expects a pair of `(P, T)`, where `P` is the data used for partitioning and
+ * `T` is the scrooge object. Below is an example.
+ * {{{
+ * val data = MyScroogeObject()
+ * IterablePipe(data, flowDef, mode)
+ * .write(PartitionParquetScrooge[(String, String), MyScroogeObject](path, "%s/%s"))
+ * }}}
+ *
+ * For reading it produces a pair `(P, T)` where `P` is the partition data, `T` is the corresponding
+ * scrooge object. Below is an example.
+ * {{{
+ * val in: TypedPipe[((String, String), MyScroogeObject)] =
+ * TypedPipe.from( PartitionParquetScrooge[(String, String), MyScroogeObject](path, "%s/%s") )
+ * }}}
+ *
+ */
+case class PartitionParquetScrooge[P, T <: ThriftStruct](
+  path: String, template: String, fields: Fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))(implicit val mf: Manifest[T],
+    val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
+  extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {
+
+  assert(
+    fields.size == valueSetter.arity,
+    "The number of fields needs to be the same as the arity of the value setter")
+
+  // Create the underlying scheme and explicitly set the source, sink fields to be only the specified fields
+  override def hdfsScheme = {
+    val scroogeScheme = new Parquet346ScroogeScheme[T](this.config)
+    val scheme = HadoopSchemeInstance(scroogeScheme.asInstanceOf[Scheme[_, _, _, _, _]])
+    scheme.setSinkFields(fields)
+    scheme.setSourceFields(fields)
+    scheme
+  }
+}
diff --git a/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScroogeTests.scala b/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScroogeTests.scala
new file mode 100644
index 0000000000..152f900a08
--- /dev/null
+++ b/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScroogeTests.scala
@@ -0,0 +1,62 @@
+package com.twitter.scalding.parquet.scrooge
+
+import java.io.File
+
+import com.twitter.scalding._
+import com.twitter.scalding.parquet.scrooge.thrift_java.test.{Address => TAddress}
+import com.twitter.scalding.parquet.scrooge.thrift_scala.test.Address
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.thrift.ThriftParquetReader
+
+import org.scalatest.{Matchers, WordSpec}
+
+
+object PartitionedParquetScroogeTestSources {
+  val path = "/a/path"
+  val partitionSource = PartitionParquetScrooge[String, Address](path, "%s")
+}
+
+class PartitionedParquetScroogeWriteJob(args: Args) extends Job(args) {
+  import PartitionedParquetScroogeTestSources._
+  val input = Seq( Address("123 Embarcadero", "94111"), Address("123 E 79th St", "10075"), Address("456 W 80th St", "10075") )
+
+  TypedPipe.from(input)
+    .map { case Address(street, zipcode) => (zipcode, Address(street, zipcode)) }
+    .write(partitionSource)
+}
+
+class PartitionParquetScroogeTests extends WordSpec with Matchers {
+  import PartitionedParquetScroogeTestSources._
+
+  def validate(path: Path, expectedAddresses: TAddress*) = {
+    val parquetReader: ParquetReader[TAddress] =
+      ThriftParquetReader.build(path).withThriftClass(classOf[TAddress]).build()
+    Stream.continually(parquetReader.read).takeWhile(_ != null).toArray shouldBe expectedAddresses
+  }
+
+  "PartitionParquetScrooge" should {
+    "write out partitioned scrooge objects" in {
+      var job: Job = null;
+      def buildJob(args: Args): Job = {
+        job = new PartitionedParquetScroogeWriteJob(args)
+        job
+      }
+      JobTest(buildJob(_))
+        .runHadoop
+        .finish()
+
+      val testMode = job.mode.asInstanceOf[HadoopTest]
+
+      val directory = new File(testMode.getWritePathFor(partitionSource))
+
+      directory.listFiles().map({ _.getName() }).toSet shouldBe Set("94111", "10075")
+
+      // check that the partitioning is done correctly by zipcode
+      validate(new Path(directory.getPath + "/94111/part-00000-00000-m-00000.parquet"),
+        new TAddress("123 Embarcadero", "94111"))
+      validate(new Path(directory.getPath + "/10075/part-00000-00001-m-00000.parquet"),
+        new TAddress("123 E 79th St", "10075"), new TAddress("456 W 80th St", "10075"))
+    }
+  }
+}
diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala
index 1fa0ebc88c..e72d6896a6 100644
--- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala
+++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala
@@ -33,7 +33,7 @@ object ParquetThrift extends Serializable {
   type ThriftBase = TBase[_ <: TBase[_, _], _ <: TFieldIdEnum]
 }

-trait ParquetThriftBase[T] extends FileSource with SingleMappable[T] with TypedSink[T] with LocalTapSource with HasFilterPredicate with HasColumnProjection {
+trait ParquetThriftBase[T] extends LocalTapSource with HasFilterPredicate with HasColumnProjection {

   def mf: Manifest[T]

@@ -52,11 +52,13 @@ trait ParquetThriftBase[T] extends FileSource with SingleMappable[T] with TypedS
     configWithProjection
   }

+}

+trait ParquetThriftBaseFileSource[T] extends FileSource with ParquetThriftBase[T] with SingleMappable[T] with TypedSink[T] {
   override def setter[U <: T] = TupleSetter.asSubSetter[T, U](TupleSetter.singleSetter[T])
 }

-trait ParquetThrift[T <: ParquetThrift.ThriftBase] extends ParquetThriftBase[T] {
+trait ParquetThrift[T <: ParquetThrift.ThriftBase] extends ParquetThriftBaseFileSource[T] {

   override def hdfsScheme = {
     // See docs in Parquet346TBaseScheme
diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThrift.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThrift.scala
new file mode 100644
index 0000000000..3b42dacde3
--- /dev/null
+++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThrift.scala
@@ -0,0 +1,8 @@
+package com.twitter.scalding.parquet.thrift
+
+/**
+ * Created by pnarang on 8/30/16.
+ */
+class PartitionParquetThrift {
+
+}
diff --git a/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThriftTests.scala b/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThriftTests.scala
new file mode 100644
index 0000000000..f1d33dc511
--- /dev/null
+++ b/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThriftTests.scala
@@ -0,0 +1,8 @@
+package com.twitter.scalding.parquet.thrift
+
+/**
+ * Created by pnarang on 8/30/16.
+ */
+class PartitionParquetThriftTests {
+
+}

From 057eb566cf54f88ea2685f83207e89edc93808c7 Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Tue, 30 Aug 2016 10:56:36 -0700
Subject: [PATCH 02/10] Add partitioned parquet thrift source

---
 .../src/test/resources/test.thrift           |  5 ++
 .../thrift/PartitionParquetThrift.scala      | 43 ++++++++++++-
 .../thrift/PartitionParquetThriftTests.scala | 60 +++++++++++++++++--
 3 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/scalding-parquet-fixtures/src/test/resources/test.thrift b/scalding-parquet-fixtures/src/test/resources/test.thrift
index f30e354571..e96749a05d 100644
--- a/scalding-parquet-fixtures/src/test/resources/test.thrift
+++ b/scalding-parquet-fixtures/src/test/resources/test.thrift
@@ -5,3 +5,8 @@ struct Name {
   1: required string first_name,
   2: optional string last_name
 }
+
+struct Address {
+  1: string street,
+  2: required string zip
+}
diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThrift.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThrift.scala
index 3b42dacde3..33325102f0 100644
--- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThrift.scala
+++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThrift.scala
@@ -1,8 +1,45 @@
 package com.twitter.scalding.parquet.thrift

+import cascading.scheme.Scheme
+import cascading.tuple.Fields
+import com.twitter.scalding.{ HadoopSchemeInstance, FixedPathSource, TupleConverter, TupleSetter }
+import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
+
 /**
- * Created by pnarang on 8/30/16.
- */
-class PartitionParquetThrift {
+ * Scalding source to read or write partitioned Parquet thrift data.
+ *
+ * For writing it expects a pair of `(P, T)`, where `P` is the data used for partitioning and
+ * `T` is the thrift object. Below is an example.
+ * {{{
+ * val data = MyThriftObject()
+ * IterablePipe(data, flowDef, mode)
+ * .write(PartitionParquetThrift[(String, String), MyThriftObject](path, "%s/%s"))
+ * }}}
+ *
+ * For reading it produces a pair `(P, T)` where `P` is the partition data, `T` is the corresponding
+ * thrift object. Below is an example.
+ * {{{
+ * val in: TypedPipe[((String, String), MyThriftObject)] =
+ * TypedPipe.from( PartitionParquetThrift[(String, String), MyThriftObject](path, "%s/%s") )
+ * }}}
+ *
+ */
+case class PartitionParquetThrift[P, T <: ParquetThrift.ThriftBase](
+  path: String, template: String, fields: Fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))(implicit val mf: Manifest[T],
+    val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
+  extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {
+
+  assert(
+    fields.size == valueSetter.arity,
+    "The number of fields needs to be the same as the arity of the value setter")

+  // Create the underlying scheme and explicitly set the source, sink fields to be only the specified fields
+  override def hdfsScheme = {
+    // See docs in Parquet346TBaseScheme
+    val baseScheme = new Parquet346TBaseScheme[T](this.config)
+    val scheme = HadoopSchemeInstance(baseScheme.asInstanceOf[Scheme[_, _, _, _, _]])
+    scheme.setSinkFields(fields)
+    scheme.setSourceFields(fields)
+    scheme
+  }
 }
diff --git a/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThriftTests.scala b/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThriftTests.scala
index f1d33dc511..dafcf73950 100644
--- a/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThriftTests.scala
+++ b/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThriftTests.scala
@@ -1,8 +1,60 @@
 package com.twitter.scalding.parquet.thrift

-/**
- * Created by pnarang on 8/30/16.
- */
-class PartitionParquetThriftTests {
+import java.io.File

+import com.twitter.scalding._
+import com.twitter.scalding.parquet.thrift_java.test.Address
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetReader
+import org.apache.parquet.thrift.ThriftParquetReader
+
+import org.scalatest.{ Matchers, WordSpec }
+
+object PartitionedParquetThriftTestSources {
+  val path = "/a/path"
+  val partitionSource = PartitionParquetThrift[String, Address](path, "%s")
+}
+
+class PartitionedParquetThriftWriteJob(args: Args) extends Job(args) {
+  import PartitionedParquetThriftTestSources._
+  val input = Seq(new Address("123 Embarcadero", "94111"), new Address("123 E 79th St", "10075"), new Address("456 W 80th St", "10075"))
+
+  TypedPipe.from(input)
+    .map { address => (address.getZip, address) }
+    .write(partitionSource)
 }
+
+class PartitionParquetThriftTests extends WordSpec with Matchers {
+  import PartitionedParquetThriftTestSources._
+
+  def validate(path: Path, expectedAddresses: Address*) = {
+    val parquetReader: ParquetReader[Address] =
+      ThriftParquetReader.build(path).withThriftClass(classOf[Address]).build()
+    Stream.continually(parquetReader.read).takeWhile(_ != null).toArray shouldBe expectedAddresses
+  }
+
+  "PartitionParquetThrift" should {
+    "write out partitioned thrift objects" in {
+      var job: Job = null;
+      def buildJob(args: Args): Job = {
+        job = new PartitionedParquetThriftWriteJob(args)
+        job
+      }
+      JobTest(buildJob(_))
+        .runHadoop
+        .finish()
+
+      val testMode = job.mode.asInstanceOf[HadoopTest]
+
+      val directory = new File(testMode.getWritePathFor(partitionSource))
+
+      directory.listFiles().map({ _.getName() }).toSet shouldBe Set("94111", "10075")
+
+      // check that the partitioning is done correctly by zipcode
+      validate(new Path(directory.getPath + "/94111/part-00000-00000-m-00000.parquet"),
+        new Address("123 Embarcadero", "94111"))
+      validate(new Path(directory.getPath + "/10075/part-00000-00001-m-00000.parquet"),
+        new Address("123 E 79th St", "10075"), new Address("456 W 80th St", "10075"))
+    }
+  }
+}
\ No newline at end of file

From 8863cd8be00ea0932f75eebda95b54db557b5672 Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Tue, 30 Aug 2016 13:19:28 -0700
Subject: [PATCH 03/10] Rename sources

---
 ...tScrooge.scala => PartitionedParquetScroogeSource.scala} | 6 +++---
 ...sts.scala => PartitionedParquetScroogeSourceTests.scala} | 6 +++---
 ...uetThrift.scala => PartitionedParquetThriftSource.scala} | 6 +++---
 ...ests.scala => PartitionedParquetThriftSourceTests.scala} | 6 +++---
 4 files changed, 12 insertions(+), 12 deletions(-)
 rename scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/{PartitionParquetScrooge.scala => PartitionedParquetScroogeSource.scala} (87%)
 rename scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/{PartitionParquetScroogeTests.scala => PartitionedParquetScroogeSourceTests.scala} (91%)
 rename scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/{PartitionParquetThrift.scala => PartitionedParquetThriftSource.scala} (86%)
 rename scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/{PartitionParquetThriftTests.scala => PartitionedParquetThriftSourceTests.scala} (90%)

diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScrooge.scala b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
similarity index 87%
rename from scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScrooge.scala
rename to scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
index 5551ea1328..e323162c5a 100644
--- a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScrooge.scala
+++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
@@ -15,18 +15,18 @@ import com.twitter.scrooge.ThriftStruct
  * {{{
  * val data = MyScroogeObject()
  * IterablePipe(data, flowDef, mode)
- * .write(PartitionParquetScrooge[(String, String), MyScroogeObject](path, "%s/%s"))
+ * .write(PartitionedParquetScroogeSource[(String, String), MyScroogeObject](path, "%s/%s"))
  * }}}
  *
  * For reading it produces a pair `(P, T)` where `P` is the partition data, `T` is the corresponding
  * scrooge object. Below is an example.
  * {{{
  * val in: TypedPipe[((String, String), MyScroogeObject)] =
- * TypedPipe.from( PartitionParquetScrooge[(String, String), MyScroogeObject](path, "%s/%s") )
+ * TypedPipe.from( PartitionedParquetScroogeSource[(String, String), MyScroogeObject](path, "%s/%s") )
  * }}}
  *
  */
-case class PartitionParquetScrooge[P, T <: ThriftStruct](
+case class PartitionedParquetScroogeSource[P, T <: ThriftStruct](
   path: String, template: String, fields: Fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))(implicit val mf: Manifest[T],
     val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
   extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {
diff --git a/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScroogeTests.scala b/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSourceTests.scala
similarity index 91%
rename from scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScroogeTests.scala
rename to scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSourceTests.scala
index 152f900a08..994aebd05a 100644
--- a/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionParquetScroogeTests.scala
+++ b/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSourceTests.scala
@@ -14,7 +14,7 @@

 object PartitionedParquetScroogeTestSources {
   val path = "/a/path"
-  val partitionSource = PartitionParquetScrooge[String, Address](path, "%s")
+  val partitionSource = PartitionedParquetScroogeSource[String, Address](path, "%s")
 }

 class PartitionedParquetScroogeWriteJob(args: Args) extends Job(args) {
@@ -26,7 +26,7 @@ class PartitionedParquetScroogeWriteJob(args: Args) extends Job(args) {
     .write(partitionSource)
 }

-class PartitionParquetScroogeTests extends WordSpec with Matchers {
+class PartitionedParquetScroogeSourceTests extends WordSpec with Matchers {
   import PartitionedParquetScroogeTestSources._

   def validate(path: Path, expectedAddresses: TAddress*) = {
@@ -35,7 +35,7 @@ class PartitionParquetScroogeTests extends WordSpec with Matchers {
     Stream.continually(parquetReader.read).takeWhile(_ != null).toArray shouldBe expectedAddresses
   }

-  "PartitionParquetScrooge" should {
+  "PartitionedParquetScroogeSource" should {
     "write out partitioned scrooge objects" in {
       var job: Job = null;
       def buildJob(args: Args): Job = {
diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThrift.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
similarity index 86%
rename from scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThrift.scala
rename to scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
index 33325102f0..5012fbbc41 100644
--- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThrift.scala
+++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
@@ -13,18 +13,18 @@ import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
  * {{{
  * val data = MyThriftObject()
  * IterablePipe(data, flowDef, mode)
- * .write(PartitionParquetThrift[(String, String), MyThriftObject](path, "%s/%s"))
+ * .write(PartitionedParquetThriftSource[(String, String), MyThriftObject](path, "%s/%s"))
  * }}}
  *
  * For reading it produces a pair `(P, T)` where `P` is the partition data, `T` is the corresponding
  * thrift object. Below is an example.
  * {{{
  * val in: TypedPipe[((String, String), MyThriftObject)] =
- * TypedPipe.from( PartitionParquetThrift[(String, String), MyThriftObject](path, "%s/%s") )
+ * TypedPipe.from( PartitionedParquetThriftSource[(String, String), MyThriftObject](path, "%s/%s") )
  * }}}
  *
  */
-case class PartitionParquetThrift[P, T <: ParquetThrift.ThriftBase](
+case class PartitionedParquetThriftSource[P, T <: ParquetThrift.ThriftBase](
   path: String, template: String, fields: Fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))(implicit val mf: Manifest[T],
     val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
   extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {
diff --git a/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThriftTests.scala b/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSourceTests.scala
similarity index 90%
rename from scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThriftTests.scala
rename to scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSourceTests.scala
index dafcf73950..e34da43d4e 100644
--- a/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionParquetThriftTests.scala
+++ b/scalding-parquet/src/test/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSourceTests.scala
@@ -12,7 +12,7 @@

 object PartitionedParquetThriftTestSources {
   val path = "/a/path"
-  val partitionSource = PartitionParquetThrift[String, Address](path, "%s")
+  val partitionSource = PartitionedParquetThriftSource[String, Address](path, "%s")
 }

 class PartitionedParquetThriftWriteJob(args: Args) extends Job(args) {
@@ -24,7 +24,7 @@ class PartitionedParquetThriftWriteJob(args: Args) extends Job(args) {
     .write(partitionSource)
 }

-class PartitionParquetThriftTests extends WordSpec with Matchers {
+class PartitionedParquetThriftSourceTests extends WordSpec with Matchers {
   import PartitionedParquetThriftTestSources._

   def validate(path: Path, expectedAddresses: Address*) = {
@@ -33,7 +33,7 @@ class PartitionParquetThriftTests extends WordSpec with Matchers {
     Stream.continually(parquetReader.read).takeWhile(_ != null).toArray shouldBe expectedAddresses
   }

-  "PartitionParquetThrift" should {
+  "PartitionedParquetThriftSource" should {
     "write out partitioned thrift objects" in {
       var job: Job = null;
       def buildJob(args: Args): Job = {

From 4140d57151f78ec86b5dc494e8bc7d22a20b8410 Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Tue, 30 Aug 2016 13:28:57 -0700
Subject: [PATCH 04/10] Make reformatter happy

---
 .../scrooge/PartitionedParquetScroogeSourceTests.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSourceTests.scala b/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSourceTests.scala
index 994aebd05a..be88c0ec79 100644
--- a/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSourceTests.scala
+++ b/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSourceTests.scala
@@ -3,14 +3,13 @@ package com.twitter.scalding.parquet.scrooge
 import java.io.File

 import com.twitter.scalding._
-import com.twitter.scalding.parquet.scrooge.thrift_java.test.{Address => TAddress}
+import com.twitter.scalding.parquet.scrooge.thrift_java.test.{ Address => TAddress }
 import com.twitter.scalding.parquet.scrooge.thrift_scala.test.Address
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetReader
 import org.apache.parquet.thrift.ThriftParquetReader

-import org.scalatest.{Matchers, WordSpec}
-
+import org.scalatest.{ Matchers, WordSpec }

 object PartitionedParquetScroogeTestSources {
   val path = "/a/path"
@@ -19,7 +18,7 @@ object PartitionedParquetScroogeTestSources {

 class PartitionedParquetScroogeWriteJob(args: Args) extends Job(args) {
   import PartitionedParquetScroogeTestSources._
-  val input = Seq( Address("123 Embarcadero", "94111"), Address("123 E 79th St", "10075"), Address("456 W 80th St", "10075") )
+  val input = Seq(Address("123 Embarcadero", "94111"), Address("123 E 79th St", "10075"), Address("456 W 80th St", "10075"))

   TypedPipe.from(input)
     .map { case Address(street, zipcode) => (zipcode, Address(street, zipcode)) }

From a2d346894ea8a586f3853f45a402a60451ac447c Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Thu, 1 Sep 2016 11:04:25 -0700
Subject: [PATCH 05/10] Update usage documentation

---
 .../parquet/scrooge/PartitionedParquetScroogeSource.scala | 7 ++++---
 .../parquet/thrift/PartitionedParquetThriftSource.scala   | 7 ++++---
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
index e323162c5a..478755d01c 100644
--- a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
+++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
@@ -13,9 +13,10 @@ import com.twitter.scrooge.ThriftStruct
  * For writing it expects a pair of `(P, T)`, where `P` is the data used for partitioning and
  * `T` is the scrooge object. Below is an example.
  * {{{
- * val data = MyScroogeObject()
- * IterablePipe(data, flowDef, mode)
- * .write(PartitionedParquetScroogeSource[(String, String), MyScroogeObject](path, "%s/%s"))
+ * val data: TypedPipe[MyScroogeObject] = ???
+ * data.map { obj =>
+ * ( (obj.country, obj.city), obj)
+ * }.write(PartitionedParquetScroogeSource[(String, String), MyScroogeObject](path, "%s/%s"))
  * }}}
  *
  * For reading it produces a pair `(P, T)` where `P` is the partition data, `T` is the corresponding
diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
index 5012fbbc41..e94bc5a275 100644
--- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
+++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
@@ -11,9 +11,10 @@ import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
  * For writing it expects a pair of `(P, T)`, where `P` is the data used for partitioning and
  * `T` is the thrift object. Below is an example.
  * {{{
- * val data = MyThriftObject()
- * IterablePipe(data, flowDef, mode)
- * .write(PartitionedParquetThriftSource[(String, String), MyThriftObject](path, "%s/%s"))
+ * val data: TypedPipe[MyThriftObject] = ???
+ * data.map{ obj =>
+ * ( (obj.country, obj.city), obj)
+ * }.write(PartitionedParquetThriftSource[(String, String), MyThriftObject](path, "%s/%s"))
  * }}}
  *
  * For reading it produces a pair `(P, T)` where `P` is the partition data, `T` is the corresponding

From 5488d10c8668b273a1b700f06c8a85101996bf08 Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Thu, 1 Sep 2016 13:42:27 -0700
Subject: [PATCH 06/10] Use scrooge reader for partition parquet scrooge test

---
 .../PartitionedParquetScroogeSourceTests.scala | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSourceTests.scala b/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSourceTests.scala
index be88c0ec79..368a44eb92 100644
--- a/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSourceTests.scala
+++ b/scalding-parquet-scrooge/src/test/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSourceTests.scala
@@ -3,11 +3,10 @@ package com.twitter.scalding.parquet.scrooge
 import java.io.File

 import com.twitter.scalding._
-import com.twitter.scalding.parquet.scrooge.thrift_java.test.{ Address => TAddress }
 import com.twitter.scalding.parquet.scrooge.thrift_scala.test.Address
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.ParquetReader
-import org.apache.parquet.thrift.ThriftParquetReader

 import org.scalatest.{ Matchers, WordSpec }

@@ -28,9 +27,14 @@ class PartitionedParquetScroogeWriteJob(args: Args) extends Job(args) {
 class PartitionedParquetScroogeSourceTests extends WordSpec with Matchers {
   import PartitionedParquetScroogeTestSources._

-  def validate(path: Path, expectedAddresses: TAddress*) = {
-    val parquetReader: ParquetReader[TAddress] =
-      ThriftParquetReader.build(path).withThriftClass(classOf[TAddress]).build()
+  def validate(path: Path, expectedAddresses: Address*) = {
+    val conf: Configuration = new Configuration
+    conf.set("parquet.thrift.converter.class", classOf[ScroogeRecordConverter[Address]].getName)
+    val parquetReader: ParquetReader[Address] =
+      ParquetReader.builder[Address](new ScroogeReadSupport[Address], path)
+        .withConf(conf)
+        .build()
+
     Stream.continually(parquetReader.read).takeWhile(_ != null).toArray shouldBe expectedAddresses
   }

@@ -53,9 +57,9 @@ class PartitionedParquetScroogeSourceTests extends WordSpec with Matchers {

       // check that the partitioning is done correctly by zipcode
       validate(new Path(directory.getPath + "/94111/part-00000-00000-m-00000.parquet"),
-        new TAddress("123 Embarcadero", "94111"))
+        Address("123 Embarcadero", "94111"))
       validate(new Path(directory.getPath + "/10075/part-00000-00001-m-00000.parquet"),
-        new TAddress("123 E 79th St", "10075"), new TAddress("456 W 80th St", "10075"))
+        Address("123 E 79th St", "10075"), Address("456 W 80th St", "10075"))
     }
   }
 }

From 363fed0f3820cb03a8faf8cf54a127b1f1f399f8 Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Thu, 1 Sep 2016 15:08:57 -0700
Subject: [PATCH 07/10] Tweak docs + remove unused imports

---
 .../twitter/scalding/typed/PartitionSchemed.scala | 13 +++----------
 .../scrooge/PartitionedParquetScroogeSource.scala |  3 ++-
 .../thrift/PartitionedParquetThriftSource.scala   |  3 ++-
 3 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala
index e2865ec256..1f57ccbc9e 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala
@@ -15,17 +15,10 @@ package com.twitter.scalding
 package typed

-import java.util.Properties
-import java.io.{ InputStream, OutputStream }
-
-import cascading.scheme.Scheme
-import cascading.scheme.hadoop.TextDelimited
-import cascading.scheme.local.{ TextDelimited => LocalTextDelimited }
-import cascading.tap.{ Tap, SinkMode }
-import cascading.tap.hadoop.{ Hfs, PartitionTap }
+import cascading.tap.hadoop.PartitionTap
 import cascading.tap.local.{ FileTap, PartitionTap => LocalPartitionTap }
-import cascading.tap.partition.Partition
-import cascading.tuple.{ Fields, Tuple, TupleEntry }
+import cascading.tap.{ SinkMode, Tap }
+import cascading.tuple.Fields

 /**
  * Trait to assist with creating partitioned sources.
diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
index 478755d01c..fde0022b1d 100644
--- a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
+++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
@@ -11,7 +11,8 @@ import com.twitter.scrooge.ThriftStruct
  * Scalding source to read or write partitioned Parquet scrooge data.
  *
  * For writing it expects a pair of `(P, T)`, where `P` is the data used for partitioning and
- * `T` is the scrooge object. Below is an example.
+ * `T` is the scrooge object. `P` must be either a String or a tuple of Strings.
+ * Below is an example.
  * {{{
  * val data: TypedPipe[MyScroogeObject] = ???
  * data.map { obj =>
  * ( (obj.country, obj.city), obj)
  * }.write(PartitionedParquetScroogeSource[(String, String), MyScroogeObject](path, "%s/%s"))
diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
index e94bc5a275..33cb556abb 100644
--- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
+++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
@@ -9,7 +9,8 @@ import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
  * Scalding source to read or write partitioned Parquet thrift data.
  *
  * For writing it expects a pair of `(P, T)`, where `P` is the data used for partitioning and
- * `T` is the thrift object. Below is an example.
+ * `T` is the thrift object. `P` must be either a String or a tuple of Strings.
+ * Below is an example.
  * {{{
  * val data: TypedPipe[MyThriftObject] = ???
  * data.map{ obj =>

From b02c28111b93c1cc5cc50c68c29c090ebca48117 Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Wed, 14 Sep 2016 16:40:45 -0700
Subject: [PATCH 08/10] Remove fields param from partitioned parquet ctors

---
 .../scrooge/PartitionedParquetScroogeSource.scala |  8 ++++----
 .../thrift/PartitionedParquetThriftSource.scala   | 10 +++++-----
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
index fde0022b1d..0da9e0ee33 100644
--- a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
+++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
@@ -1,7 +1,6 @@
 package com.twitter.scalding.parquet.scrooge

 import _root_.cascading.scheme.Scheme
-import _root_.cascading.tuple.Fields
 import com.twitter.scalding._
 import com.twitter.scalding.parquet.thrift.ParquetThriftBase
 import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
@@ -28,11 +27,12 @@ import com.twitter.scrooge.ThriftStruct
  * }}}
  *
  */
-case class PartitionedParquetScroogeSource[P, T <: ThriftStruct](
-  path: String, template: String, fields: Fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))(implicit val mf: Manifest[T],
-    val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
+case class PartitionedParquetScroogeSource[P, T <: ThriftStruct](path: String, template: String)(implicit val mf: Manifest[T],
+  val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
   extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {

+  override val fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity)
+
   assert(
     fields.size == valueSetter.arity,
     "The number of fields needs to be the same as the arity of the value setter")
diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
index 33cb556abb..5607a4fe11 100644
--- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
+++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
@@ -1,9 +1,8 @@
 package com.twitter.scalding.parquet.thrift

 import cascading.scheme.Scheme
-import cascading.tuple.Fields
-import com.twitter.scalding.{ HadoopSchemeInstance, FixedPathSource, TupleConverter, TupleSetter }
 import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
+import com.twitter.scalding.{ FixedPathSource, HadoopSchemeInstance, TupleConverter, TupleSetter }

 /**
  * Scalding source to read or write partitioned Parquet thrift data.
@@ -26,11 +25,12 @@ import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
  * }}}
  *
  */
-case class PartitionedParquetThriftSource[P, T <: ParquetThrift.ThriftBase](
-  path: String, template: String, fields: Fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))(implicit val mf: Manifest[T],
-    val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
+case class PartitionedParquetThriftSource[P, T <: ParquetThrift.ThriftBase](path: String, template: String)(implicit val mf: Manifest[T],
+  val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
   extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {

+  override val fields = PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity)
+
   assert(
     fields.size == valueSetter.arity,
     "The number of fields needs to be the same as the arity of the value setter")

From 09a8b759eadef1c7b11fd3d6e32a7a2570d58c7b Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Thu, 15 Sep 2016 11:39:37 -0700
Subject: [PATCH 09/10] Replace Manifests with ClassTags

---
 .../scalding/parquet/scrooge/ParquetScrooge.scala |  8 +++++---
 .../scrooge/PartitionedParquetScroogeSource.scala |  4 +++-
 .../scalding/parquet/thrift/ParquetThrift.scala   | 15 +++++++++------
 .../thrift/PartitionedParquetThriftSource.scala   |  4 +++-
 4 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala
index c798662298..f5c1256e13 100644
--- a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala
+++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetScrooge.scala
@@ -6,6 +6,8 @@ import com.twitter.scalding.parquet.thrift.ParquetThriftBaseFileSource
 import com.twitter.scalding.source.{ DailySuffixSource, HourlySuffixSource }
 import com.twitter.scrooge.ThriftStruct

+import scala.reflect.ClassTag
+
 trait ParquetScrooge[T <: ThriftStruct] extends ParquetThriftBaseFileSource[T] {

   override def hdfsScheme = {
@@ -18,13 +20,13 @@ trait ParquetScrooge[T <: ThriftStruct] extends ParquetThriftBaseFileSource[T] {

 class DailySuffixParquetScrooge[T <: ThriftStruct](
   path: String,
-  dateRange: DateRange)(implicit override val mf: Manifest[T])
+  dateRange: DateRange)(implicit override val ct: ClassTag[T])
   extends DailySuffixSource(path, dateRange) with ParquetScrooge[T]

 class HourlySuffixParquetScrooge[T <: ThriftStruct](
   path: String,
-  dateRange: DateRange)(implicit override val mf: Manifest[T])
+  dateRange: DateRange)(implicit override val ct: ClassTag[T])
   extends HourlySuffixSource(path, dateRange) with ParquetScrooge[T]

-class FixedPathParquetScrooge[T <: ThriftStruct](paths: String*)(implicit override val mf: Manifest[T])
+class FixedPathParquetScrooge[T <: ThriftStruct](paths: String*)(implicit override val ct: ClassTag[T])
   extends FixedPathSource(paths: _*) with ParquetScrooge[T]
diff --git a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
index 0da9e0ee33..4f958a61a0 100644
--- a/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
+++ b/scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/PartitionedParquetScroogeSource.scala
@@ -6,6 +6,8 @@ import com.twitter.scalding.parquet.thrift.ParquetThriftBase
 import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
 import com.twitter.scrooge.ThriftStruct

+import scala.reflect.ClassTag
+
 /**
  * Scalding source to read or write partitioned Parquet scrooge data.
  *
@@ -27,7 +29,7 @@ import com.twitter.scrooge.ThriftStruct
  * }}}
  *
  */
-case class PartitionedParquetScroogeSource[P, T <: ThriftStruct](path: String, template: String)(implicit val mf: Manifest[T],
+case class PartitionedParquetScroogeSource[P, T <: ThriftStruct](path: String, template: String)(implicit val ct: ClassTag[T],
   val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
   extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {
diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala
index e72d6896a6..f4437feee9 100644
--- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala
+++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala
@@ -29,16 +29,19 @@ import com.twitter.scalding.source.{ DailySuffixSource, HourlySuffixSource }
 import java.io.Serializable
 import org.apache.thrift.{ TBase, TFieldIdEnum }

+import scala.reflect.ClassTag
+
 object ParquetThrift extends Serializable {
   type ThriftBase = TBase[_ <: TBase[_, _], _ <: TFieldIdEnum]
 }

 trait ParquetThriftBase[T] extends LocalTapSource with HasFilterPredicate with HasColumnProjection {

-  def mf: Manifest[T]
+  implicit def ct: ClassTag[T]

-  def config: ParquetValueScheme.Config[T] = {
-    val config = new ParquetValueScheme.Config[T].withRecordClass(mf.runtimeClass.asInstanceOf[Class[T]])
+  def config(implicit ct: ClassTag[T]): ParquetValueScheme.Config[T] = {
+    val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
+    val config = new ParquetValueScheme.Config[T].withRecordClass(clazz)
     val configWithFp = withFilter match {
       case Some(fp) => config.withFilterPredicate(fp)
       case None => config
@@ -110,13 +113,13 @@ trait ParquetThrift[T <: ParquetThrift.ThriftBase] extends ParquetThriftBaseFile
  */
 class DailySuffixParquetThrift[T <: ParquetThrift.ThriftBase](
   path: String,
-  dateRange: DateRange)(implicit override val mf: Manifest[T])
+  dateRange: DateRange)(implicit override val ct: ClassTag[T])
   extends DailySuffixSource(path, dateRange) with ParquetThrift[T]

 class HourlySuffixParquetThrift[T <: ParquetThrift.ThriftBase](
   path: String,
-  dateRange: DateRange)(implicit override val mf: Manifest[T])
+  dateRange: DateRange)(implicit override val ct: ClassTag[T])
   extends HourlySuffixSource(path, dateRange) with ParquetThrift[T]

-class FixedPathParquetThrift[T <: ParquetThrift.ThriftBase](paths: String*)(implicit override val mf: Manifest[T])
+class FixedPathParquetThrift[T <: ParquetThrift.ThriftBase](paths: String*)(implicit override val ct: ClassTag[T])
   extends FixedPathSource(paths: _*) with ParquetThrift[T]
diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
index 5607a4fe11..b24ca3cc3f 100644
--- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
+++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/PartitionedParquetThriftSource.scala
@@ -4,6 +4,8 @@ import cascading.scheme.Scheme
 import com.twitter.scalding.typed.{ PartitionSchemed, PartitionUtil }
 import com.twitter.scalding.{ FixedPathSource, HadoopSchemeInstance, TupleConverter, TupleSetter }

+import scala.reflect.ClassTag
+
 /**
  * Scalding source to read or write partitioned Parquet thrift data.
  *
@@ -25,7 +27,7 @@ import com.twitter.scalding.{ FixedPathSource, HadoopSchemeInstance, TupleConver
  * }}}
  *
  */
-case class PartitionedParquetThriftSource[P, T <: ParquetThrift.ThriftBase](path: String, template: String)(implicit val mf: Manifest[T],
+case class PartitionedParquetThriftSource[P, T <: ParquetThrift.ThriftBase](path: String, template: String)(implicit val ct: ClassTag[T],
   val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T], val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P])
   extends FixedPathSource(path) with ParquetThriftBase[T] with PartitionSchemed[P, T] with Serializable {

From 82d13870b03a91b5ff19a7626380944ace118fc9 Mon Sep 17 00:00:00 2001
From: Piyush Narang
Date: Thu, 15 Sep 2016 13:50:34 -0700
Subject: [PATCH 10/10] Clean up implicit classtags

---
 .../com/twitter/scalding/parquet/thrift/ParquetThrift.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala
index f4437feee9..077b9db1f1 100644
--- a/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala
+++ b/scalding-parquet/src/main/scala/com/twitter/scalding/parquet/thrift/ParquetThrift.scala
@@ -39,8 +39,8 @@ trait ParquetThriftBase[T] extends LocalTapSource with HasFilterPredicate with H
   implicit def ct: ClassTag[T]

-  def config(implicit ct: ClassTag[T]): ParquetValueScheme.Config[T] = {
-    val clazz = implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]
+  def config: ParquetValueScheme.Config[T] = {
+    val clazz = ct.runtimeClass.asInstanceOf[Class[T]]
     val config = new ParquetValueScheme.Config[T].withRecordClass(clazz)
     val configWithFp = withFilter match {
       case Some(fp) => config.withFilterPredicate(fp)
       case None => config
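
As an orientation for readers, here is a minimal end-to-end sketch of the API the series converges on, modeled on the docstrings and tests in the patches above. The `Address` struct and its `street`/`zip` fields come from test.thrift, and `/a/path` with the single `%s` template mirrors the test sources; the job names and the `/a/output` sink are hypothetical, illustrative names only, not part of the patches.

import com.twitter.scalding._
import com.twitter.scalding.parquet.scrooge.PartitionedParquetScroogeSource
import com.twitter.scalding.parquet.scrooge.thrift_scala.test.Address

// Write side: each (zip, address) pair is routed to <path>/<zip>/part-*.parquet,
// the "%s" in the template being filled by the String partition value.
class PartitionedAddressWriteJob(args: Args) extends Job(args) {
  val addresses = Seq(Address("123 Embarcadero", "94111"), Address("123 E 79th St", "10075"))

  TypedPipe.from(addresses)
    .map { address => (address.zip, address) }
    .write(PartitionedParquetScroogeSource[String, Address]("/a/path", "%s"))
}

// Read side: the same source read back yields (partition, value) pairs.
class PartitionedAddressReadJob(args: Args) extends Job(args) {
  TypedPipe.from(PartitionedParquetScroogeSource[String, Address]("/a/path", "%s"))
    .map { case (zip, address) => zip + "\t" + address.street }
    .write(TypedTsv[String]("/a/output")) // hypothetical output path
}

After patch 09 the only implicit the caller has to satisfy for `T` is a `ClassTag`, which the compiler materializes automatically, so no extra imports beyond the source class are needed at the call site.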