Read and writable partitioned sources #969

Merged: 3 commits, Jul 28, 2014

Changes from 1 commit
@@ -0,0 +1,82 @@
// Copyright 2014 Commonwealth Bank of Australia
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.twitter.scalding
package typed

import java.util.Properties
import java.io.{ InputStream, OutputStream }

import cascading.scheme.Scheme
import cascading.scheme.hadoop.TextDelimited
import cascading.scheme.local.{ TextDelimited => LocalTextDelimited }
import cascading.tap.{ Tap, SinkMode }
import cascading.tap.hadoop.{ Hfs, PartitionTap }
import cascading.tap.local.{ FileTap, PartitionTap => LocalPartitionTap }
import cascading.tap.partition.Partition
import cascading.tuple.{ Fields, Tuple, TupleEntry }

/**
* Trait to assist with creating partitioned sources.
*
* Apart from the abstract members below, `hdfsScheme` and `localScheme` also need to be set.
* Note that for both of them the sink fields need to be set to only include the actual fields
* that should be written to file and not the partition fields.
*/
trait PartitionSchemed[P, T] extends SchemedSource with TypedSink[(P, T)] with Mappable[(P, T)] {
def path: String
def template: String
Reviewer (Collaborator): can you add more comments and a link to cascading as to what the format of this template should be?

Author (Contributor): Will do. The template is just a format string such as "%s/%s".

It's actually not a Cascading-specific format. For Cascading's PartitionTap to work you need to provide it with something that implements the Partition interface, which has two key methods:

  • toTuple takes the relative partition path and a tuple entry containing the file content, and modifies the tuple entry to include the partition information.
  • toPartition takes a tuple entry that contains only the partition field values and creates the relative partition path from it.
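A rough sketch of what such a Partition implementation could look like (illustrative only; the name SketchTemplatePartition is made up, this is not the PR's actual TemplatePartition, whose code is not shown in this diff, and the cascading 2.x Partition method set is assumed):

import cascading.tap.partition.Partition
import cascading.tuple.{ Fields, Tuple, TupleEntry }

// Illustrative sketch of a Partition driven by a format-string template such as "%s/%s".
case class SketchTemplatePartition(partitionFields: Fields, template: String) extends Partition {
  // Number of path components produced by the template, e.g. 2 for "%s/%s".
  def getPathDepth: Int = template.count(_ == '/') + 1

  def getPartitionFields: Fields = partitionFields

  // Relative partition path -> partition field values on the tuple entry.
  def toTuple(partition: String, tupleEntry: TupleEntry): Unit = {
    val values = partition.split("/")
    val tuple = Tuple.size(values.length)
    values.zipWithIndex.foreach { case (v, idx) => tuple.set(idx, v) }
    tupleEntry.setTuple(tuple)
  }

  // Partition field values -> relative partition path, built from the template.
  def toPartition(tupleEntry: TupleEntry): String = {
    val values = (0 until partitionFields.size).map(idx => tupleEntry.getObject(idx).toString)
    template.format(values: _*)
  }
}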

def valueSetter: TupleSetter[T]
def valueConverter: TupleConverter[T]
def partitionSetter: TupleSetter[P]
def partitionConverter: TupleConverter[P]
def fields: Fields

// The partition fields, offset by the value arity.
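// e.g. with a value arity of 2 and a partition arity of 1 this is the single string-named field "2".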
def partitionFields =
PartitionUtil.toFields(valueSetter.arity, valueSetter.arity + partitionSetter.arity)

/*
Advertise all the sinkFields, both the value and partition ones. This needs to be done even
though these are not the correct sink fields, otherwise scalding validation falls over. The sink
fields of the scheme itself are then overwritten to only include the actual sink fields.
*/
override def sinkFields: Fields = fields.append(partitionFields)

/**
* Combine both the partition and value converter to extract the data from a flat cascading tuple
* into a pair of `P` and `T`.
*/
override def converter[U >: (P, T)] =
PartitionUtil.converter[P, T, U](valueConverter, partitionConverter)

/** Flatten a pair of `P` and `T` into a cascading tuple.*/
override def setter[U <: (P, T)] =
PartitionUtil.setter[P, T, U](valueSetter, partitionSetter)

/** Creates the taps for local and hdfs mode.*/
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] =
mode match {
case Local(_) => {
val fileTap = new FileTap(localScheme, path, SinkMode.REPLACE)
new LocalPartitionTap(fileTap, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
.asInstanceOf[Tap[_, _, _]]
}
case Hdfs(_, _) => {
val hfs = new Hfs(hdfsScheme, path, SinkMode.REPLACE)
new PartitionTap(hfs, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
.asInstanceOf[Tap[_, _, _]]
}
Reviewer (Collaborator): can you add the TestTapFactory call here so that people can mock this source using JobTest?

Author (Contributor): I have added TestTapFactory and HadoopTest.

}
}
@@ -0,0 +1,69 @@
// Copyright 2014 Commonwealth Bank of Australia
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.twitter.scalding
package typed

import cascading.scheme.Scheme
import cascading.tap.{ Tap, SinkMode }
import cascading.tap.hadoop.{ Hfs, PartitionTap }
import cascading.tap.partition.Partition
import cascading.tuple.{ Fields, Tuple, TupleEntry }

/** Utility functions to assist with creating partitioned sources. */
object PartitionUtil {
// DO NOT USE intFields: scalding / cascading Fields.merge is broken and gets called in the bowels
// of TemplateTap. See scalding/#803.
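// For example, toFields(2, 4) yields the two string-named fields ("2", "3").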
def toFields(start: Int, end: Int): Fields =
Dsl.strFields((start until end).map(_.toString))

/** A tuple converter that splits a cascading tuple into a pair of types.*/
def converter[P, T, U >: (P, T)](valueConverter: TupleConverter[T], partitionConverter: TupleConverter[P]) = {
TupleConverter.asSuperConverter[(P, T), U](new TupleConverter[(P, T)] {
val arity = valueConverter.arity + partitionConverter.arity

def apply(te: TupleEntry): (P, T) = {
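// The incoming flat tuple is laid out as value fields first, then partition fields (see PartitionSchemed.sinkFields).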
val value = Tuple.size(valueConverter.arity)
val partition = Tuple.size(partitionConverter.arity)

(0 until valueConverter.arity).foreach(idx => value.set(idx, te.getObject(idx)))
(0 until partitionConverter.arity)
.foreach(idx => partition.set(idx, te.getObject(idx + valueConverter.arity)))

val valueTE = new TupleEntry(toFields(0, valueConverter.arity), value)
val partitionTE = new TupleEntry(toFields(0, partitionConverter.arity), partition)

(partitionConverter(partitionTE), valueConverter(valueTE))
}
})
}

/** A tuple setter for a pair of types which are flattened into a cascading tuple.*/
def setter[P, T, U <: (P, T)](valueSetter: TupleSetter[T], partitionSetter: TupleSetter[P]): TupleSetter[U] =
TupleSetter.asSubSetter[(P, T), U](new TupleSetter[(P, T)] {
val arity = valueSetter.arity + partitionSetter.arity

def apply(in: (P, T)) = {
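// Flatten using the same layout: value fields first, then the partition fields appended after them.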
val partition = partitionSetter(in._1)
val value = valueSetter(in._2)
val output = Tuple.size(partition.size + value.size)

(0 until value.size).foreach(idx => output.set(idx, value.getObject(idx)))
(0 until partition.size).foreach(idx =>
output.set(idx + value.size, partition.getObject(idx)))

output
}
})
}
@@ -0,0 +1,122 @@
// Copyright 2014 Commonwealth Bank of Australia
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.twitter.scalding
package typed

import java.util.Properties
import java.io.{ InputStream, OutputStream, Serializable }

import cascading.scheme.Scheme
import cascading.scheme.hadoop.TextDelimited
import cascading.scheme.local.{ TextDelimited => LocalTextDelimited }
import cascading.tap.{ Tap, SinkMode }
import cascading.tap.hadoop.{ Hfs, PartitionTap }
import cascading.tap.local.{ FileTap, PartitionTap => LocalPartitionTap }
import cascading.tap.partition.Partition
import cascading.tuple.{ Fields, Tuple, TupleEntry }

/**
* Scalding source to read or write partitioned delimited text.
*
* For writing it expects a pair of `(P, T)`, where `P` is the data used for partitioning and
* `T` is the output to write out. Below is an example.
* {{{
* val data = List(
* (("a", "x"), ("i", 1)),
* (("a", "y"), ("j", 2)),
* (("b", "z"), ("k", 3))
* )
* IterablePipe(data, flowDef, mode)
* .write(PartitionedDelimited[(String, String), (String, Int)](args("out"), "col1=%s/col2=%s"))
* }}}
*
* For reading it produces a pair `(P, T)` where `P` is the partition data and `T` is the data in
* the files. Below is an example.
* {{{
* val in: TypedPipe[((String, String), (String, Int))] = PartitionedDelimited[(String, String), (String, Int)](args("in"), "col1=%s/col2=%s")
* }}}
*/
case class PartitionedDelimitedSource[P, T](
path: String, template: String, separator: String, fields: Fields, skipHeader: Boolean = false,
writeHeader: Boolean = false, quote: String = "\"", strict: Boolean = true, safe: Boolean = true)(implicit mt: Manifest[T], val valueSetter: TupleSetter[T], val valueConverter: TupleConverter[T],
val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P]) extends PartitionSchemed[P, T] with Serializable {
assert(
fields.size == valueSetter.arity,
"The number of fields needs to be the same as the arity of the value setter")

val types: Array[Class[_]] = {
if (classOf[scala.Product].isAssignableFrom(mt.erasure)) {
//Assume this is a Tuple:
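// e.g. for T = (String, Int) this yields Array(classOf[String], classOf[Int]).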
mt.typeArguments.map { _.erasure }.toArray
} else {
//Assume there is only a single item
Array(mt.erasure)
}
}

// Create the underlying scrooge-parquet scheme and explicitly set the sink fields to be only the
Reviewer (Collaborator): looks like these comments are out of sync.

Author (Contributor): Fixed.

// thrift struct see sinkFields for other half of this work around.
override def hdfsScheme = {
val scheme =
HadoopSchemeInstance(new TextDelimited(fields, null, skipHeader, writeHeader, separator, strict, quote, types, safe)
.asInstanceOf[Scheme[_, _, _, _, _]])
scheme.setSinkFields(fields)
scheme
}

// Create the underlying scrooge-parquet scheme and explicitly set the sink fields to be only the
// thrift struct see sinkFields for other half of this work around.
override def localScheme = {
val scheme =
new LocalTextDelimited(fields, skipHeader, writeHeader, separator, strict, quote, types, safe)
.asInstanceOf[Scheme[Properties, InputStream, OutputStream, _, _]]
scheme.setSinkFields(fields)
scheme
}
}

/**
* Trait to assist with creating objects such as [[PartitionedTsv]] to read from separated files.
* Override separator, skipHeader, writeHeader as needed.
*/
trait PartitionedDelimited extends Serializable {
def separator: String

def apply[P: Manifest: TupleConverter: TupleSetter, T: Manifest: TupleConverter: TupleSetter](path: String, template: String): PartitionedDelimitedSource[P, T] =
PartitionedDelimitedSource(path, template, separator, PartitionUtil.toFields(0, implicitly[TupleSetter[T]].arity))

def apply[P: Manifest: TupleConverter: TupleSetter, T: Manifest: TupleConverter: TupleSetter](path: String, template: String, fields: Fields): PartitionedDelimitedSource[P, T] =
PartitionedDelimitedSource(path, template, separator, fields)
}

/** Partitioned typed tab separated source.*/
object PartitionedTsv extends PartitionedDelimited {
val separator = "\t"
}

/** Partitioned typed comma separated source.*/
object PartitionedCsv extends PartitionedDelimited {
val separator = ","
}

/** Partitioned typed pipe separated source.*/
object PartitionedPsv extends PartitionedDelimited {
val separator = "|"
}

/** Partitioned typed `\1` separated source (commonly used by Pig).*/
object PartitionedOsv extends PartitionedDelimited {
val separator = "\1"
}
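Further separator variants can be defined in the same way; for example, a hypothetical semicolon-separated source (not part of this PR) would be:

/** Partitioned typed semicolon separated source (hypothetical example).*/
object PartitionedSsv extends PartitionedDelimited {
  val separator = ";"
}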
@@ -0,0 +1,117 @@
// Copyright 2014 Commonwealth Bank of Australia
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.twitter.scalding
package typed

import java.util.Properties
import java.io.{ InputStream, OutputStream }

import cascading.scheme.Scheme
import cascading.scheme.hadoop.TextLine
import cascading.scheme.local.{ TextLine => LocalTextLine }
import cascading.tap.{ Tap, SinkMode }
import cascading.tap.hadoop.{ Hfs, PartitionTap }
import cascading.tap.local.{ FileTap, PartitionTap => LocalPartitionTap }
import cascading.tap.partition.Partition
import cascading.tuple.{ Fields, Tuple, TupleEntry }

/**
* Scalding source to read or write partitioned text.
*
* For writing it expects a pair of `(P, String)`, where `P` is the data used for partitioning and
* `String` is the output to write out. Below is an example.
* {{{
* val data = List(
* (("a", "x"), "line1"),
* (("a", "y"), "line2"),
* (("b", "z"), "line3")
* )
* IterablePipe(data, flowDef, mode)
* .write(PartitionedTextLine[(String, String)](args("out"), "col1=%s/col2=%s"))
* }}}
*
* For reading it produces a pair `(P, (Long, String))` where `P` is the partition data, `Long`
* is the offset into the file and `String` is a line from the file. Below is an example.
* {{{
* val in: TypedPipe[((String, String), (Long, String))] = PartitionedTextLine[(String, String)](args("in"), "col1=%s/col2=%s")
* }}}
*
* @param path Base path of the partitioned directory
* @param template Template for the partitioned path
* @param encoding Text encoding of the file content
*/
case class PartitionedTextLine[P](
path: String, template: String, encoding: String = TextLine.DEFAULT_CHARSET)(implicit val valueSetter: TupleSetter[String], val valueConverter: TupleConverter[(Long, String)],
val partitionSetter: TupleSetter[P], val partitionConverter: TupleConverter[P]) extends SchemedSource with TypedSink[(P, String)] with Mappable[(P, (Long, String))]
with java.io.Serializable {

// The partition fields, offset by the value arity.
val partitionFields =
PartitionUtil.toFields(valueSetter.arity, valueSetter.arity + partitionSetter.arity)

// Create the underlying TextLine scheme and explicitly set the sink fields to be only the value
// fields; see sinkFields for the other half of this workaround.
override def hdfsScheme = {
val scheme =
HadoopSchemeInstance(new TextLine(TextLine.DEFAULT_SOURCE_FIELDS, encoding)
.asInstanceOf[Scheme[_, _, _, _, _]])
scheme.setSinkFields(PartitionUtil.toFields(0, valueSetter.arity))
scheme
}

// Create the underlying local TextLine scheme and explicitly set the sink fields to be only the
// value fields; see sinkFields for the other half of this workaround.
override def localScheme = {
val scheme =
new LocalTextLine(TextLine.DEFAULT_SOURCE_FIELDS, encoding)
.asInstanceOf[Scheme[Properties, InputStream, OutputStream, _, _]]
scheme.setSinkFields(PartitionUtil.toFields(0, valueSetter.arity))
scheme
}

/*
Advertise all the sinkFields, both the value and partition ones. This needs to be done even
though these are not the correct sink fields, otherwise scalding validation falls over; see
hdfsScheme, where the scheme's sink fields are narrowed back to the value fields to complete this workaround.
*/
override def sinkFields: Fields =
PartitionUtil.toFields(0, valueSetter.arity + partitionSetter.arity)

/** Creates the taps for local and hdfs mode.*/
override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_, _, _] =
mode match {
case Local(_) => {
val fileTap = new FileTap(localScheme, path, SinkMode.REPLACE)
new LocalPartitionTap(fileTap, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
.asInstanceOf[Tap[_, _, _]]
}
case Hdfs(_, _) => {
val hfs = new Hfs(hdfsScheme, path, SinkMode.REPLACE)
new PartitionTap(hfs, new TemplatePartition(partitionFields, template), SinkMode.UPDATE)
.asInstanceOf[Tap[_, _, _]]
}
}

/**
* Combine both the partition and value converter to extract the data from a flat cascading tuple
* into a pair of `P` and `(offset, line)`.
*/
override def converter[U >: (P, (Long, String))] =
PartitionUtil.converter[P, (Long, String), U](valueConverter, partitionConverter)

/** Flatten a pair of `P` and `line` into a cascading tuple.*/
override def setter[U <: (P, String)] =
PartitionUtil.setter[P, String, U](valueSetter, partitionSetter)
}