Merge pull request #969 from CommBank/partitioned_sources
Readable and writable partitioned sources
johnynek committed Jul 28, 2014
2 parents 2da0540 + 1bda47b commit c4e6b82
Showing 7 changed files with 646 additions and 0 deletions.
@@ -0,0 +1,88 @@
// Copyright 2014 Commonwealth Bank of Australia
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.twitter.scalding
package typed

import java.util.Properties
import java.io.{ InputStream, OutputStream }

import cascading.scheme.Scheme
import cascading.scheme.hadoop.TextDelimited
import cascading.scheme.local.{ TextDelimited => LocalTextDelimited }
import cascading.tap.{ Tap, SinkMode }
import cascading.tap.hadoop.{ Hfs, PartitionTap }
import cascading.tap.local.{ FileTap, PartitionTap => LocalPartitionTap }
import cascading.tap.partition.Partition
import cascading.tuple.{ Fields, Tuple, TupleEntry }

/**
* Trait to assist with creating partitioned sources.
*
* Apart from the abstract members below, `hdfsScheme` and `localScheme` also need to be set.
* Note that for both of them the sink fields need to be set to only include the actual fields
* that should be written to file and not the partition fields.
*/
trait PartitionSchemed[P, T] extends SchemedSource with TypedSink[(P, T)] with Mappable[(P, T)] {
def path: String
def template: String
def valueSetter: TupleSetter[T]
def valueConverter: TupleConverter[T]
def partitionSetter: TupleSetter[P]
def partitionConverter: TupleConverter[P]
def fields: Fields

// The partition fields, offset by the value arity.
def partitionFields =
PartitionUtil.toFields(valueSetter.arity, valueSetter.arity + partitionSetter.arity)

/*
Advertise all the sinkFields, both the value and partition ones. This needs to be done even
though these are not the correct sink fields, otherwise scalding validation falls over. The
sink fields of the scheme itself then need to be overwritten to only include the actual sink
fields.
*/
override def sinkFields: Fields = fields.append(partitionFields)

/**
* Combine both the partition and value converter to extract the data from a flat cascading tuple
* into a pair of `P` and `T`.
*/
override def converter[U >: (P, T)] =
PartitionUtil.converter[P, T, U](valueConverter, partitionConverter)

/** Flatten a pair of `P` and `T` into a cascading tuple.*/
override def setter[U <: (P, T)] =
PartitionUtil.setter[P, T, U](valueSetter, partitionSetter)

/** Creates the taps for local and hdfs mode.*/
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] =
mode match {
case Local(_) => {
val fileTap = new FileTap(localScheme, path, SinkMode.REPLACE)
new LocalPartitionTap(fileTap, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
.asInstanceOf[Tap[_, _, _]]
}
case Hdfs(_, _) => {
val hfs = new Hfs(hdfsScheme, path, SinkMode.REPLACE)
new PartitionTap(hfs, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
.asInstanceOf[Tap[_, _, _]]
}
case hdfsTest @ HadoopTest(_, _) => {
val hfs = new Hfs(hdfsScheme, hdfsTest.getWritePathFor(this), SinkMode.REPLACE)
new PartitionTap(hfs, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
.asInstanceOf[Tap[_, _, _]]
}
case _ => TestTapFactory(this, hdfsScheme).createTap(readOrWrite)
}
}
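
As a rough, hedged illustration of what the PartitionTap above produces on disk: the template string is expanded with the partition values to form one subdirectory per partition under the source's base path, while only the value fields are written into the part files. The helper below is a plain-Scala sketch of that expansion and is not part of this changeset.

// Illustrative sketch only: shows how a template such as "col1=%s/col2=%s" is expected
// to expand into partition subdirectories. partitionPath is a hypothetical helper,
// not part of this commit.
object TemplateSketch {
  def partitionPath(template: String, partitionValues: Any*): String =
    template.format(partitionValues: _*)

  def main(args: Array[String]): Unit = {
    // A record partitioned on ("a", "x") lands under <path>/col1=a/col2=x/.
    println(partitionPath("col1=%s/col2=%s", "a", "x")) // prints col1=a/col2=x
  }
}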
@@ -0,0 +1,69 @@
// Copyright 2014 Commonwealth Bank of Australia
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.twitter.scalding
package typed

import cascading.scheme.Scheme
import cascading.tap.{ Tap, SinkMode }
import cascading.tap.hadoop.{ Hfs, PartitionTap }
import cascading.tap.partition.Partition
import cascading.tuple.{ Fields, Tuple, TupleEntry }

/** Utility functions to assist with creating partitioned sources. */
object PartitionUtil {
// DO NOT USE intFields, scalding / cascading Fields.merge is broken and gets called in bowels of
// TemplateTap. See scalding/#803.
def toFields(start: Int, end: Int): Fields =
Dsl.strFields((start until end).map(_.toString))

/** A tuple converter that splits a cascading tuple into a pair of types.*/
def converter[P, T, U >: (P, T)](valueConverter: TupleConverter[T], partitionConverter: TupleConverter[P]) = {
TupleConverter.asSuperConverter[(P, T), U](new TupleConverter[(P, T)] {
val arity = valueConverter.arity + partitionConverter.arity

def apply(te: TupleEntry): (P, T) = {
val value = Tuple.size(valueConverter.arity)
val partition = Tuple.size(partitionConverter.arity)

(0 until valueConverter.arity).foreach(idx => value.set(idx, te.getObject(idx)))
(0 until partitionConverter.arity)
.foreach(idx => partition.set(idx, te.getObject(idx + valueConverter.arity)))

val valueTE = new TupleEntry(toFields(0, valueConverter.arity), value)
val partitionTE = new TupleEntry(toFields(0, partitionConverter.arity), partition)

(partitionConverter(partitionTE), valueConverter(valueTE))
}
})
}

/** A tuple setter for a pair of types which are flattened into a cascading tuple.*/
def setter[P, T, U <: (P, T)](valueSetter: TupleSetter[T], partitionSetter: TupleSetter[P]): TupleSetter[U] =
TupleSetter.asSubSetter[(P, T), U](new TupleSetter[(P, T)] {
val arity = valueSetter.arity + partitionSetter.arity

def apply(in: (P, T)) = {
val partition = partitionSetter(in._1)
val value = valueSetter(in._2)
val output = Tuple.size(partition.size + value.size)

(0 until value.size).foreach(idx => output.set(idx, value.getObject(idx)))
(0 until partition.size).foreach(idx =>
output.set(idx + value.size, partition.getObject(idx)))

output
}
})
}
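
To make the ordering concrete: the setter writes the value fields at positions 0 until valueArity and appends the partition fields after them, and the converter splits a flat row back apart at the same offset. Below is a minimal plain-Scala sketch of that layout, using ordinary collections rather than the Cascading tuple types; it is illustrative only.

// Plain-Scala sketch of the flat row layout used by PartitionUtil: value fields first,
// partition fields appended. Not part of this commit.
object TupleLayoutSketch {
  // Flatten a (partition, value) pair into one row: value fields first, then partition fields.
  def flatten(partition: Seq[Any], value: Seq[Any]): Vector[Any] =
    (value ++ partition).toVector

  // Split a flat row back into (partition, value), given the value arity.
  def split(row: Vector[Any], valueArity: Int): (Seq[Any], Seq[Any]) = {
    val (value, partition) = row.splitAt(valueArity)
    (partition, value)
  }

  def main(args: Array[String]): Unit = {
    val row = flatten(partition = Seq("a", "x"), value = Seq("i", 1))
    println(row)           // Vector(i, 1, a, x)
    println(split(row, 2)) // (Vector(a, x),Vector(i, 1))
  }
}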
@@ -0,0 +1,122 @@
// Copyright 2014 Commonwealth Bank of Australia
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.twitter.scalding
package typed

import java.util.Properties
import java.io.{ InputStream, OutputStream, Serializable }

import cascading.scheme.Scheme
import cascading.scheme.hadoop.TextDelimited
import cascading.scheme.local.{ TextDelimited => LocalTextDelimited }
import cascading.tap.{ Tap, SinkMode }
import cascading.tap.hadoop.{ Hfs, PartitionTap }
import cascading.tap.local.{ FileTap, PartitionTap => LocalPartitionTap }
import cascading.tap.partition.Partition
import cascading.tuple.{ Fields, Tuple, TupleEntry }

/**
* Scalding source to read or write partitioned delimited text.
*
* For writing it expects a pair of `(P, T)`, where `P` is the data used for partitioning and
* `T` is the output to write out. Below is an example.
* {{{
* val data = List(
* (("a", "x"), ("i", 1)),
* (("a", "y"), ("j", 2)),
* (("b", "z"), ("k", 3))
* )
* IterablePipe(data, flowDef, mode)
* .write(PartitionedDelimited[(String, String), (String, Int)](args("out"), "col1=%s/col2=%s"))
* }}}
*
* For reading it produces a pair `(P, T)` where `P` is the partition data and `T` is the data in
* the files. Below is an example.
* {{{
* val in: TypedPipe[((String, String), (String, Int))] = PartitionedDelimited[(String, String), (String, Int)](args("in"), "col1=%s/col2=%s")
* }}}
*/
case class PartitionedDelimitedSource[P, T](
path: String, template: String, separator: String, fields: Fields, skipHeader: Boolean = false,
writeHeader: Boolean = false, quote: String = "\"", strict: Boolean = true, safe: Boolean = true)(implicit mt: Manifest[T], val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T],
val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P]) extends PartitionSchemed[P, T] with Serializable {
assert(
fields.size == valueSetter.arity,
"The number of fields needs to be the same as the arity of the value setter")

val types: Array[Class[_]] = {
if (classOf[scala.Product].isAssignableFrom(mt.erasure)) {
//Assume this is a Tuple:
mt.typeArguments.map { _.erasure }.toArray
} else {
//Assume there is only a single item
Array(mt.erasure)
}
}

// Create the underlying scheme and explicitly set the sink fields to be only the specified fields;
// see sinkFields in PartitionSchemed for the other half of this workaround.
override def hdfsScheme = {
val scheme =
HadoopSchemeInstance(new TextDelimited(fields, null, skipHeader, writeHeader, separator, strict, quote, types, safe)
.asInstanceOf[Scheme[_, _, _, _, _]])
scheme.setSinkFields(fields)
scheme
}

// Create the underlying scheme and explicitly set the sink fields to be only the specified fields;
// see sinkFields in PartitionSchemed for the other half of this workaround.
override def localScheme = {
val scheme =
new LocalTextDelimited(fields, skipHeader, writeHeader, separator, strict, quote, types, safe)
.asInstanceOf[Scheme[Properties, InputStream, OutputStream, _, _]]
scheme.setSinkFields(fields)
scheme
}
}

/**
* Trait to assist with creating objects such as [[PartitionedTsv]] to read from separated files.
* Override separator, skipHeader, writeHeader as needed.
*/
trait PartitionedDelimited extends Serializable {
def separator: String

def apply[P: Manifest: TupleConverter: TupleSetter, T: Manifest: TupleConverter: TupleSetter](path: String, template: String): PartitionedDelimitedSource[P, T] =
PartitionedDelimitedSource(path, template, separator, PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))

def apply[P: Manifest: TupleConverter: TupleSetter, T: Manifest: TupleConverter: TupleSetter](path: String, template: String, fields: Fields): PartitionedDelimitedSource[P, T] =
PartitionedDelimitedSource(path, template, separator, fields)
}

/** Partitioned typed tab separated source.*/
object PartitionedTsv extends PartitionedDelimited {
val separator = "\t"
}

/** Partitioned typed comma separated source.*/
object PartitionedCsv extends PartitionedDelimited {
val separator = ","
}

/** Partitioned typed pipe separated source.*/
object PartitionedPsv extends PartitionedDelimited {
val separator = "|"
}

/** Partitioned typed `\1` separated source (commonly used by Pig).*/
object PartitionedOsv extends PartitionedDelimited {
val separator = "\1"
}
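
For completeness, a hedged usage sketch of the concrete sources above: a typed pipe of (partition, value) pairs written through PartitionedTsv ends up split into one directory per partition value. The job, input source, and argument names below are assumptions for illustration, not part of this changeset.

// Illustrative only: a hypothetical job writing through PartitionedTsv.
import com.twitter.scalding._
import com.twitter.scalding.typed.{ PartitionedTsv, TypedPipe }

class PartitionedWriteJob(args: Args) extends Job(args) {
  // Read (id, score) rows and partition the output by the first letter of the id.
  TypedPipe.from(TypedTsv[(String, Int)](args("input")))
    .map { case (id, score) => (id.take(1), (id, score)) } // (partition, value)
    .write(PartitionedTsv[String, (String, Int)](args("output"), "prefix=%s"))
}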