From d1bfeb4d51082cf80d89553889b50d1afc176aaf Mon Sep 17 00:00:00 2001
From: Stephan Hoermann
Date: Wed, 23 Jul 2014 17:04:39 +1000
Subject: [PATCH 1/3] Adds readable and writable partitioned versions of
 TextLine and TypedDelimited sources.

---
 .../scalding/typed/PartitionSchemed.scala     |  82 ++++++++++++
 .../scalding/typed/PartitionUtil.scala        |  69 ++++++++++
 .../typed/PartitionedDelimitedSource.scala    | 122 ++++++++++++++++++
 .../scalding/typed/PartitionedTextLine.scala  | 117 +++++++++++++++++
 .../scalding/typed/TemplatePartition.scala    |  61 +++++++++
 5 files changed, 451 insertions(+)
 create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala
 create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionUtil.scala
 create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedDelimitedSource.scala
 create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala
 create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/typed/TemplatePartition.scala

diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala
new file mode 100644
index 0000000000..1ca139ec9c
--- /dev/null
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala
@@ -0,0 +1,82 @@
+// Copyright 2014 Commonwealth Bank of Australia
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.scalding
+package typed
+
+import java.util.Properties
+import java.io.{ InputStream, OutputStream }
+
+import cascading.scheme.Scheme
+import cascading.scheme.hadoop.TextDelimited
+import cascading.scheme.local.{ TextDelimited => LocalTextDelimited }
+import cascading.tap.{ Tap, SinkMode }
+import cascading.tap.hadoop.{ Hfs, PartitionTap }
+import cascading.tap.local.{ FileTap, PartitionTap => LocalPartitionTap }
+import cascading.tap.partition.Partition
+import cascading.tuple.{ Fields, Tuple, TupleEntry }
+
+/**
+ * Trait to assist with creating partitioned sources.
+ *
+ * Apart from the abstract members below, `hdfsScheme` and `localScheme` also need to be set.
+ * Note that for both of them the sink fields need to be set to only include the actual fields
+ * that should be written to file, and not the partition fields.
+ */
+trait PartitionSchemed[P, T] extends SchemedSource with TypedSink[(P, T)] with Mappable[(P, T)] {
+  def path: String
+  def template: String
+  def valueSetter: TupleSetter[T]
+  def valueConverter: TupleConverter[T]
+  def partitionSetter: TupleSetter[P]
+  def partitionConverter: TupleConverter[P]
+  def fields: Fields
+
+  // The partition fields, offset by the value arity.
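+  // For example, with a value arity of 2 and a partition arity of 2 these are the string
+  // fields ("2", "3"), since fields are named by their index in the flat cascading tuple.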
+  def partitionFields =
+    PartitionUtil.toFields(valueSetter.arity, valueSetter.arity + partitionSetter.arity)
+
+  /*
+   Advertise all the sinkFields, both the value and partition ones. This needs to be done, even
+   though they are not the correct sink fields, otherwise scalding validation falls over. The
+   sink fields of the scheme itself are then overwritten to only include the actual sink fields.
+  */
+  override def sinkFields: Fields = fields.append(partitionFields)
+
+  /**
+   * Combine both the partition and value converter to extract the data from a flat cascading tuple
+   * into a pair of `P` and `T`.
+   */
+  override def converter[U >: (P, T)] =
+    PartitionUtil.converter[P, T, U](valueConverter, partitionConverter)
+
+  /** Flatten a pair of `P` and `T` into a cascading tuple.*/
+  override def setter[U <: (P, T)] =
+    PartitionUtil.setter[P, T, U](valueSetter, partitionSetter)
+
+  /** Creates the taps for local and hdfs mode.*/
+  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] =
+    mode match {
+      case Local(_) => {
+        val fileTap = new FileTap(localScheme, path, SinkMode.REPLACE)
+        new LocalPartitionTap(fileTap, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
+          .asInstanceOf[Tap[_, _, _]]
+      }
+      case Hdfs(_, _) => {
+        val hfs = new Hfs(hdfsScheme, path, SinkMode.REPLACE)
+        new PartitionTap(hfs, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
+          .asInstanceOf[Tap[_, _, _]]
+      }
+    }
+}
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionUtil.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionUtil.scala
new file mode 100644
index 0000000000..d93180b618
--- /dev/null
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionUtil.scala
@@ -0,0 +1,69 @@
+// Copyright 2014 Commonwealth Bank of Australia
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.scalding
+package typed
+
+import cascading.scheme.Scheme
+import cascading.tap.{ Tap, SinkMode }
+import cascading.tap.hadoop.{ Hfs, PartitionTap }
+import cascading.tap.partition.Partition
+import cascading.tuple.{ Fields, Tuple, TupleEntry }
+
+/** Utility functions to assist with creating partitioned sources. */
+object PartitionUtil {
+  // DO NOT USE intFields; scalding / cascading Fields.merge is broken and gets called in the
+  // bowels of TemplateTap. See scalding/#803.
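+  // For example, toFields(2, 4) produces the string fields ("2", "3").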
+  def toFields(start: Int, end: Int): Fields =
+    Dsl.strFields((start until end).map(_.toString))
+
+  /** A tuple converter that splits a cascading tuple into a pair of types.*/
+  def converter[P, T, U >: (P, T)](valueConverter: TupleConverter[T], partitionConverter: TupleConverter[P]) = {
+    TupleConverter.asSuperConverter[(P, T), U](new TupleConverter[(P, T)] {
+      val arity = valueConverter.arity + partitionConverter.arity
+
+      def apply(te: TupleEntry): (P, T) = {
+        val value = Tuple.size(valueConverter.arity)
+        val partition = Tuple.size(partitionConverter.arity)
+
+        (0 until valueConverter.arity).foreach(idx => value.set(idx, te.getObject(idx)))
+        (0 until partitionConverter.arity)
+          .foreach(idx => partition.set(idx, te.getObject(idx + valueConverter.arity)))
+
+        val valueTE = new TupleEntry(toFields(0, valueConverter.arity), value)
+        val partitionTE = new TupleEntry(toFields(0, partitionConverter.arity), partition)
+
+        (partitionConverter(partitionTE), valueConverter(valueTE))
+      }
+    })
+  }
+
+  /** A tuple setter for a pair of types which are flattened into a cascading tuple.*/
+  def setter[P, T, U <: (P, T)](valueSetter: TupleSetter[T], partitionSetter: TupleSetter[P]): TupleSetter[U] =
+    TupleSetter.asSubSetter[(P, T), U](new TupleSetter[(P, T)] {
+      val arity = valueSetter.arity + partitionSetter.arity
+
+      def apply(in: (P, T)) = {
+        val partition = partitionSetter(in._1)
+        val value = valueSetter(in._2)
+        val output = Tuple.size(partition.size + value.size)
+
+        (0 until value.size).foreach(idx => output.set(idx, value.getObject(idx)))
+        (0 until partition.size).foreach(idx =>
+          output.set(idx + value.size, partition.getObject(idx)))
+
+        output
+      }
+    })
+}
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedDelimitedSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedDelimitedSource.scala
new file mode 100644
index 0000000000..6c109e984f
--- /dev/null
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedDelimitedSource.scala
@@ -0,0 +1,122 @@
+// Copyright 2014 Commonwealth Bank of Australia
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.scalding
+package typed
+
+import java.util.Properties
+import java.io.{ InputStream, OutputStream, Serializable }
+
+import cascading.scheme.Scheme
+import cascading.scheme.hadoop.TextDelimited
+import cascading.scheme.local.{ TextDelimited => LocalTextDelimited }
+import cascading.tap.{ Tap, SinkMode }
+import cascading.tap.hadoop.{ Hfs, PartitionTap }
+import cascading.tap.local.{ FileTap, PartitionTap => LocalPartitionTap }
+import cascading.tap.partition.Partition
+import cascading.tuple.{ Fields, Tuple, TupleEntry }
+
+/**
+ * Scalding source to read or write partitioned delimited text.
+ *
+ * For writing it expects a pair of `(P, T)`, where `P` is the data used for partitioning and
+ * `T` is the output to write out. Below is an example.
+ * {{{
+ * val data = List(
+ *   (("a", "x"), ("i", 1)),
+ *   (("a", "y"), ("j", 2)),
+ *   (("b", "z"), ("k", 3))
+ * )
+ * IterablePipe(data, flowDef, mode)
+ *   .write(PartitionedCsv[(String, String), (String, Int)](args("out"), "col1=%s/col2=%s"))
+ * }}}
+ *
+ * For reading it produces a pair `(P, T)`, where `P` is the partition data and `T` is the data
+ * in the files. Below is an example.
+ * {{{
+ * val in: TypedPipe[((String, String), (String, Int))] = PartitionedCsv[(String, String), (String, Int)](args("in"), "col1=%s/col2=%s")
+ * }}}
+ */
+case class PartitionedDelimitedSource[P, T](
+  path: String, template: String, separator: String, fields: Fields, skipHeader: Boolean = false,
+  writeHeader: Boolean = false, quote: String = "\"", strict: Boolean = true, safe: Boolean = true)(implicit mt: Manifest[T], val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T],
+    val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P]) extends PartitionSchemed[P, T] with Serializable {
+  assert(
+    fields.size == valueSetter.arity,
+    "The number of fields needs to be the same as the arity of the value setter")
+
+  val types: Array[Class[_]] = {
+    if (classOf[scala.Product].isAssignableFrom(mt.erasure)) {
+      //Assume this is a Tuple:
+      mt.typeArguments.map { _.erasure }.toArray
+    } else {
+      //Assume there is only a single item
+      Array(mt.erasure)
+    }
+  }
+
+  // Create the underlying scrooge-parquet scheme and explicitly set the sink fields to be only the
+  // thrift struct see sinkFields for other half of this work around.
+  override def hdfsScheme = {
+    val scheme =
+      HadoopSchemeInstance(new TextDelimited(fields, null, skipHeader, writeHeader, separator, strict, quote, types, safe)
+        .asInstanceOf[Scheme[_, _, _, _, _]])
+    scheme.setSinkFields(fields)
+    scheme
+  }
+
+  // Create the underlying scrooge-parquet scheme and explicitly set the sink fields to be only the
+  // thrift struct see sinkFields for other half of this work around.
+  override def localScheme = {
+    val scheme =
+      new LocalTextDelimited(fields, skipHeader, writeHeader, separator, strict, quote, types, safe)
+        .asInstanceOf[Scheme[Properties, InputStream, OutputStream, _, _]]
+    scheme.setSinkFields(fields)
+    scheme
+  }
+}
+
+/**
+ * Trait to assist with creating objects such as [[PartitionedTsv]] to read and write
+ * delimiter-separated files. Override separator, skipHeader and writeHeader as needed.
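+ *
+ * For example (a hypothetical variant, not part of this patch), a semicolon-separated source
+ * only needs to supply the separator:
+ * {{{
+ * object PartitionedSsv extends PartitionedDelimited {
+ *   val separator = ";"
+ * }
+ * }}}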
+ */
+trait PartitionedDelimited extends Serializable {
+  def separator: String
+
+  def apply[P: Manifest: TupleConverter: TupleSetter, T: Manifest: TupleConverter: TupleSetter](path: String, template: String): PartitionedDelimitedSource[P, T] =
+    PartitionedDelimitedSource(path, template, separator, PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))
+
+  def apply[P: Manifest: TupleConverter: TupleSetter, T: Manifest: TupleConverter: TupleSetter](path: String, template: String, fields: Fields): PartitionedDelimitedSource[P, T] =
+    PartitionedDelimitedSource(path, template, separator, fields)
+}
+
+/** Partitioned typed tab-separated source.*/
+object PartitionedTsv extends PartitionedDelimited {
+  val separator = "\t"
+}
+
+/** Partitioned typed comma-separated source.*/
+object PartitionedCsv extends PartitionedDelimited {
+  val separator = ","
+}
+
+/** Partitioned typed pipe-separated source.*/
+object PartitionedPsv extends PartitionedDelimited {
+  val separator = "|"
+}
+
+/** Partitioned typed `\1`-separated source (commonly used by Pig).*/
+object PartitionedOsv extends PartitionedDelimited {
+  val separator = "\1"
+}
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala
new file mode 100644
index 0000000000..fff41c83f0
--- /dev/null
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala
@@ -0,0 +1,117 @@
+// Copyright 2014 Commonwealth Bank of Australia
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.scalding
+package typed
+
+import java.util.Properties
+import java.io.{ InputStream, OutputStream }
+
+import cascading.scheme.Scheme
+import cascading.scheme.hadoop.TextLine
+import cascading.scheme.local.{ TextLine => LocalTextLine }
+import cascading.tap.{ Tap, SinkMode }
+import cascading.tap.hadoop.{ Hfs, PartitionTap }
+import cascading.tap.local.{ FileTap, PartitionTap => LocalPartitionTap }
+import cascading.tap.partition.Partition
+import cascading.tuple.{ Fields, Tuple, TupleEntry }
+
+/**
+ * Scalding source to read or write partitioned text.
+ *
+ * For writing it expects a pair of `(P, String)`, where `P` is the data used for partitioning and
+ * `String` is the output to write out. Below is an example.
+ * {{{
+ * val data = List(
+ *   (("a", "x"), "line1"),
+ *   (("a", "y"), "line2"),
+ *   (("b", "z"), "line3")
+ * )
+ * IterablePipe(data, flowDef, mode)
+ *   .write(PartitionedTextLine[(String, String)](args("out"), "col1=%s/col2=%s"))
+ * }}}
+ *
+ * For reading it produces a pair `(P, (Long, String))`, where `P` is the partition data, `Long`
+ * is the offset into the file and `String` is a line from the file. Below is an example.
+ * {{{
+ * val in: TypedPipe[((String, String), (Long, String))] = PartitionedTextLine[(String, String)](args("in"), "col1=%s/col2=%s")
+ * }}}
+ *
+ * @param path Base path of the partitioned directory
+ * @param template Template for the partitioned path
+ * @param encoding Text encoding of the file content
+ */
+case class PartitionedTextLine[P](
+  path: String, template: String, encoding: String = TextLine.DEFAULT_CHARSET)(implicit val valueSetter: TupleSetter[String], val valueConverter: TupleConverter[(Long, String)],
+    val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P]) extends SchemedSource with TypedSink[(P, String)] with Mappable[(P, (Long, String))]
+  with java.io.Serializable {
+
+  // The partition fields, offset by the value arity.
+  val partitionFields =
+    PartitionUtil.toFields(valueSetter.arity, valueSetter.arity + partitionSetter.arity)
+
+  // Create the underlying scrooge-parquet scheme and explicitly set the sink fields to be only the
+  // thrift struct see sinkFields for other half of this work around.
+  override def hdfsScheme = {
+    val scheme =
+      HadoopSchemeInstance(new TextLine(TextLine.DEFAULT_SOURCE_FIELDS, encoding)
+        .asInstanceOf[Scheme[_, _, _, _, _]])
+    scheme.setSinkFields(PartitionUtil.toFields(0, valueSetter.arity))
+    scheme
+  }
+
+  // Create the underlying scrooge-parquet scheme and explicitly set the sink fields to be only the
+  // thrift struct see sinkFields for other half of this work around.
+  override def localScheme = {
+    val scheme =
+      new LocalTextLine(TextLine.DEFAULT_SOURCE_FIELDS, encoding)
+        .asInstanceOf[Scheme[Properties, InputStream, OutputStream, _, _]]
+    scheme.setSinkFields(PartitionUtil.toFields(0, valueSetter.arity))
+    scheme
+  }
+
+  /*
+   Advertise all the sinkFields, both the value and partition ones. This needs to be done, even
+   though they are not the correct sink fields, otherwise scalding validation falls over. See
+   hdfsScheme for the other part of the tweak, which narrows the fields back to the value again.
+  */
+  override def sinkFields: Fields =
+    PartitionUtil.toFields(0, valueSetter.arity + partitionSetter.arity)
+
+  /** Creates the taps for local and hdfs mode.*/
+  override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] =
+    mode match {
+      case Local(_) => {
+        val fileTap = new FileTap(localScheme, path, SinkMode.REPLACE)
+        new LocalPartitionTap(fileTap, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
+          .asInstanceOf[Tap[_, _, _]]
+      }
+      case Hdfs(_, _) => {
+        val hfs = new Hfs(hdfsScheme, path, SinkMode.REPLACE)
+        new PartitionTap(hfs, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
+          .asInstanceOf[Tap[_, _, _]]
+      }
+    }
+
+  /**
+   * Combine both the partition and value converter to extract the data from a flat cascading tuple
+   * into a pair of `P` and `(offset, line)`.
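+   *
+   * For example, assuming a partition type `P` of `(String, String)`, the flat tuple
+   * `(15L, "some text", "a", "b")` is converted to `(("a", "b"), (15L, "some text"))`,
+   * since the value fields come first and the partition fields are offset by the value arity.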
+   */
+  override def converter[U >: (P, (Long, String))] =
+    PartitionUtil.converter[P, (Long, String), U](valueConverter, partitionConverter)
+
+  /** Flatten a pair of `P` and `line` into a cascading tuple.*/
+  override def setter[U <: (P, String)] =
+    PartitionUtil.setter[P, String, U](valueSetter, partitionSetter)
+}
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TemplatePartition.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TemplatePartition.scala
new file mode 100644
index 0000000000..da620f5808
--- /dev/null
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TemplatePartition.scala
@@ -0,0 +1,61 @@
+// Copyright 2014 Commonwealth Bank of Australia
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.scalding
+package typed
+
+import scala.collection.JavaConverters._
+
+import cascading.tap.partition.Partition
+import cascading.tuple.{ Fields, TupleEntry }
+
+/**
+ * Creates a partition using the given template string.
+ *
+ * The template string needs to have a `%s` placeholder for each partition field. For example,
+ * the template `col1=%s/col2=%s` with partition values `a` and `b` creates the partition path
+ * `col1=a/col2=b`.
+ */
+case class TemplatePartition(partitionFields: Fields, template: String) extends Partition {
+  assert(
+    partitionFields.size == "%s".r.findAllIn(template).length,
+    "Number of partition fields %s does not correspond to template (%s)".format(partitionFields, template))
+
+  /** Regex pattern created from the template to extract the partition values from a path.*/
+  lazy val pattern = template.replaceAll("%s", "(.*)").r.pattern
+
+  /** Returns the path depth. In this case, the number of partition fields. */
+  override def getPathDepth(): Int = partitionFields.size
+
+  /** Returns the partition fields. */
+  override def getPartitionFields(): Fields = partitionFields
+
+  /**
+   * Converts the given partition string to field values and populates the supplied tuple entry
+   * with them.
+   */
+  override def toTuple(partition: String, tupleEntry: TupleEntry): Unit = {
+    val m = pattern.matcher(partition)
+    assert(m.matches, "Partition %s does not match the pattern %s".format(partition, pattern))
+    val parts: Array[Object] = (1 to partitionFields.size).map(i => m.group(i)).toArray
+    tupleEntry.setCanonicalValues(parts)
+  }
+
+  /**
+   * Given the specified tuple entry, fill in the template to create the partition path.
+   */
+  override def toPartition(tupleEntry: TupleEntry): String = {
+    val fields = tupleEntry.asIterableOf(classOf[String]).asScala.toList
+    template.format(fields: _*)
+  }
+}
From 542a2bbd70627c2c931697e9e8332813d024fb12 Mon Sep 17 00:00:00 2001
From: Stephan Hoermann
Date: Mon, 28 Jul 2014 17:34:04 +1000
Subject: [PATCH 2/3] Fixes comment.
---
 .../scalding/typed/PartitionedDelimitedSource.scala      | 8 ++++----
 .../com/twitter/scalding/typed/PartitionedTextLine.scala | 8 ++++----
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedDelimitedSource.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedDelimitedSource.scala
index 6c109e984f..f50e0b3d78 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedDelimitedSource.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedDelimitedSource.scala
@@ -66,8 +66,8 @@ case class PartitionedDelimitedSource[P, T](
     }
   }
 
-  // Create the underlying scrooge-parquet scheme and explicitly set the sink fields to be only the
-  // thrift struct see sinkFields for other half of this work around.
+  // Create the underlying scheme and explicitly set the sink fields to be only the specified
+  // fields; see sinkFields in PartitionSchemed for the other half of this workaround.
   override def hdfsScheme = {
     val scheme =
       HadoopSchemeInstance(new TextDelimited(fields, null, skipHeader, writeHeader, separator, strict, quote, types, safe)
@@ -76,8 +76,8 @@ case class PartitionedDelimitedSource[P, T](
     scheme
   }
 
-  // Create the underlying scrooge-parquet scheme and explicitly set the sink fields to be only the
-  // thrift struct see sinkFields for other half of this work around.
+  // Create the underlying scheme and explicitly set the sink fields to be only the specified
+  // fields; see sinkFields in PartitionSchemed for the other half of this workaround.
   override def localScheme = {
     val scheme =
       new LocalTextDelimited(fields, skipHeader, writeHeader, separator, strict, quote, types, safe)
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala
index fff41c83f0..738c007aab 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala
@@ -61,8 +61,8 @@ case class PartitionedTextLine[P](
   val partitionFields =
     PartitionUtil.toFields(valueSetter.arity, valueSetter.arity + partitionSetter.arity)
 
-  // Create the underlying scrooge-parquet scheme and explicitly set the sink fields to be only the
-  // thrift struct see sinkFields for other half of this work around.
+  // Create the underlying scheme and explicitly set the sink fields to be only the specified
+  // fields; see sinkFields in PartitionSchemed for the other half of this workaround.
   override def hdfsScheme = {
     val scheme =
       HadoopSchemeInstance(new TextLine(TextLine.DEFAULT_SOURCE_FIELDS, encoding)
@@ -71,8 +71,8 @@ case class PartitionedTextLine[P](
     scheme
   }
 
-  // Create the underlying scrooge-parquet scheme and explicitly set the sink fields to be only the
-  // thrift struct see sinkFields for other half of this work around.
+  // Create the underlying scheme and explicitly set the sink fields to be only the specified
+  // fields; see sinkFields in PartitionSchemed for the other half of this workaround.
   override def localScheme = {
     val scheme =
       new LocalTextLine(TextLine.DEFAULT_SOURCE_FIELDS, encoding)
From 1bda47beef6740d06d22656845a6b8c8e8fec738 Mon Sep 17 00:00:00 2001
From: Stephan Hoermann
Date: Mon, 28 Jul 2014 17:54:54 +1000
Subject: [PATCH 3/3] Write tests for typed partitioned sources.
---
 .../scalding/typed/PartitionSchemed.scala     |   6 +
 .../scalding/typed/PartitionedTextLine.scala  |  13 ++-
 .../PartitionedDelimitedSourceTest.scala      |  71 ++++++++++++
 .../typed/PartitionedTextLineTest.scala       | 109 ++++++++++++++++++
 4 files changed, 197 insertions(+), 2 deletions(-)
 create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/typed/PartitionedDelimitedSourceTest.scala
 create mode 100644 scalding-core/src/test/scala/com/twitter/scalding/typed/PartitionedTextLineTest.scala

diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala
index 1ca139ec9c..9c9153680a 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionSchemed.scala
@@ -78,5 +78,11 @@ trait PartitionSchemed[P, T] extends SchemedSource with TypedSink[(P, T)] with M
         new PartitionTap(hfs, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
           .asInstanceOf[Tap[_, _, _]]
       }
+      case hdfsTest @ HadoopTest(_, _) => {
+        val hfs = new Hfs(hdfsScheme, hdfsTest.getWritePathFor(this), SinkMode.REPLACE)
+        new PartitionTap(hfs, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
+          .asInstanceOf[Tap[_, _, _]]
+      }
+      case _ => TestTapFactory(this, hdfsScheme).createTap(readOrWrite)
     }
 }
diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala
index 738c007aab..e732c6a901 100644
--- a/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala
+++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/PartitionedTextLine.scala
@@ -53,8 +53,11 @@ case class PartitionedTextLine[P](
-  path: String, template: String, encoding: String = TextLine.DEFAULT_CHARSET)(implicit val valueSetter: TupleSetter[String], val valueConverter: TupleConverter[(Long, String)],
-    val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P]) extends SchemedSource with TypedSink[(P, String)] with Mappable[(P, (Long, String))]
+  path: String, template: String, encoding: String = TextLine.DEFAULT_CHARSET
+)(implicit
+  val valueSetter: TupleSetter[String], val valueConverter: TupleConverter[(Long, String)],
+  val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P]
+) extends SchemedSource with TypedSink[(P, String)] with Mappable[(P, (Long, String))]
   with java.io.Serializable {
 
   // The partition fields, offset by the value arity.
@@ -102,6 +105,12 @@ case class PartitionedTextLine[P](
         new PartitionTap(hfs, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
           .asInstanceOf[Tap[_, _, _]]
       }
+      case hdfsTest @ HadoopTest(_, _) => {
+        val hfs = new Hfs(hdfsScheme, hdfsTest.getWritePathFor(this), SinkMode.REPLACE)
+        new PartitionTap(hfs, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
+          .asInstanceOf[Tap[_, _, _]]
+      }
+      case _ => TestTapFactory(this, hdfsScheme).createTap(readOrWrite)
     }
 
   /**
diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/PartitionedDelimitedSourceTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/PartitionedDelimitedSourceTest.scala
new file mode 100644
index 0000000000..b5bc22050e
--- /dev/null
+++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/PartitionedDelimitedSourceTest.scala
@@ -0,0 +1,71 @@
+// Copyright 2014 Commonwealth Bank of Australia
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.scalding
+package typed
+
+import java.io.File
+
+import scala.io.{ Source => ScalaSource }
+
+import org.specs._
+
+import com.twitter.scalding.TDsl._
+
+object PartitionedDelimitedTestSources {
+  val singlePartition = PartitionedCsv[String, (String, String)]("out", "%s")
+}
+
+class PartitionedDelimitedWriteJob(args: Args) extends Job(args) {
+  import PartitionedDelimitedTestSources._
+  TypedCsv[(String, String, String)]("in")
+    .map { case (v1, v2, v3) => (v1, (v2, v3)) }
+    .write(singlePartition)
+}
+
+class PartitionedDelimitedTest extends Specification {
+  import PartitionedDelimitedTestSources._
+
+  noDetailedDiffs()
+
+  "PartitionedDelimited" should {
+    "write out CSVs" in {
+      val input = Seq(("A", "X", "1"), ("A", "Y", "2"), ("B", "Z", "3"))
+
+      // Save the job so that we can find the temporary directory the data was written to
+      var job: Job = null
+      def buildJob(args: Args): Job = {
+        job = new PartitionedDelimitedWriteJob(args)
+        job
+      }
+
+      JobTest(buildJob(_))
+        .source(TypedCsv[(String, String, String)]("in"), input)
+        .runHadoop
+        .finish
+
+      val testMode = job.mode.asInstanceOf[HadoopTest]
+
+      val directory = new File(testMode.getWritePathFor(singlePartition))
+
+      directory.listFiles().map({ _.getName() }).toSet mustEqual Set("A", "B")
+
+      val aSource = ScalaSource.fromFile(new File(directory, "A/part-00000-00000"))
+      val bSource = ScalaSource.fromFile(new File(directory, "B/part-00000-00001"))
+
+      aSource.getLines.toList mustEqual Seq("X,1", "Y,2")
+      bSource.getLines.toList mustEqual Seq("Z,3")
+    }
+  }
+}
diff --git a/scalding-core/src/test/scala/com/twitter/scalding/typed/PartitionedTextLineTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/typed/PartitionedTextLineTest.scala
new file mode 100644
index 0000000000..f0743e55e0
--- /dev/null
+++ b/scalding-core/src/test/scala/com/twitter/scalding/typed/PartitionedTextLineTest.scala
@@ -0,0 +1,109 @@
+// Copyright 2014 Commonwealth Bank of Australia
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.scalding
+package typed
+
+import java.io.File
+
+import scala.io.{ Source => ScalaSource }
+
+import org.specs._
+
+import com.twitter.scalding.TDsl._
+
+object PartitionedTextLineTestSources {
+  val singlePartition = PartitionedTextLine[String]("out", "%s")
+  val multiplePartition = PartitionedTextLine[(String, String)]("out", "%s/%s")
+}
+
+class PartitionedTextLineSingleWriteJob(args: Args) extends Job(args) {
+  import PartitionedTextLineTestSources._
+  TypedCsv[(String, String)]("in").write(singlePartition)
+}
+
+class PartitionedTextLineMultipleWriteJob(args: Args) extends Job(args) {
+  import PartitionedTextLineTestSources._
+  TypedCsv[(String, String, String)]("in")
+    .map { case (v1, v2, v3) => ((v1, v2), v3) }
+    .write(multiplePartition)
+}
+
+class PartitionedTextLineTest extends Specification {
+  import PartitionedTextLineTestSources._
+
+  noDetailedDiffs()
+
+  "PartitionedTextLine" should {
+    "be able to split output by a single partition" in {
+      val input = Seq(("A", "1"), ("A", "2"), ("B", "3"))
+
+      // Save the job so that we can find the temporary directory the data was written to
+      var job: Job = null
+      def buildJob(args: Args): Job = {
+        job = new PartitionedTextLineSingleWriteJob(args)
+        job
+      }
+
+      JobTest(buildJob(_))
+        .source(TypedCsv[(String, String)]("in"), input)
+        .runHadoop
+        .finish
+
+      val testMode = job.mode.asInstanceOf[HadoopTest]
+
+      val directory = new File(testMode.getWritePathFor(singlePartition))
+
+      directory.listFiles().map({ _.getName() }).toSet mustEqual Set("A", "B")
+
+      val aSource = ScalaSource.fromFile(new File(directory, "A/part-00000-00000"))
+      val bSource = ScalaSource.fromFile(new File(directory, "B/part-00000-00001"))
+
+      aSource.getLines.toList mustEqual Seq("1", "2")
+      bSource.getLines.toList mustEqual Seq("3")
+    }
+    "be able to split output by multiple partitions" in {
+      val input = Seq(("A", "X", "1"), ("A", "Y", "2"), ("B", "Z", "3"))
+
+      // Save the job so that we can find the temporary directory the data was written to
+      var job: Job = null
+      def buildJob(args: Args): Job = {
+        job = new PartitionedTextLineMultipleWriteJob(args)
+        job
+      }
+
+      JobTest(buildJob(_))
+        .source(TypedCsv[(String, String, String)]("in"), input)
+        .runHadoop
+        .finish
+
+      val testMode = job.mode.asInstanceOf[HadoopTest]
+
+      val directory = new File(testMode.getWritePathFor(multiplePartition))
+
+      directory.listFiles.flatMap(d => d.listFiles.map(d.getName + "/" + _.getName)).toSet mustEqual Set("A/X", "A/Y", "B/Z")
+
+      val axSource = ScalaSource.fromFile(new File(directory, "A/X/part-00000-00000"))
+      val aySource = ScalaSource.fromFile(new File(directory, "A/Y/part-00000-00001"))
+      val bzSource = ScalaSource.fromFile(new File(directory, "B/Z/part-00000-00002"))
+
+      axSource.getLines.toList mustEqual Seq("1")
+      aySource.getLines.toList mustEqual Seq("2")
+      bzSource.getLines.toList mustEqual Seq("3")
+    }
+  }
+}