Skip to content

Commit

Permalink
Merge pull request #571 from twitter/hotfix/backport_ser
Browse files Browse the repository at this point in the history
Backport Meatlocker
  • Loading branch information
johnynek committed Sep 3, 2013
2 parents ed9daa8 + f917a3d commit afd91e6
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 71 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Scalding #

### Version 0.8.11 ###
* Use Kryo to serialize all Function objects, rather than the default Java serialization that Cascading uses.

### Version 0.8.8 ###
* Publish 0.8.7 for scala 2.9.3.

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs. Scalding is built on top of [Cascading](http://www.cascading.org/), a Java library that abstracts away low-level Hadoop details. Scalding is comparable to [Pig](http://pig.apache.org/), but offers tight integration with Scala, bringing advantages of Scala to your MapReduce jobs.

Current version: 0.8.8
Current version: 0.8.11

## Word Count

Expand Down Expand Up @@ -58,7 +58,7 @@ Please refer to [FAQ page](https://github.com/twitter/scalding/wiki/Frequently-a
We use [Travis CI](http://travis-ci.org/) to verify the build:
[![Build Status](https://secure.travis-ci.org/twitter/scalding.png)](http://travis-ci.org/twitter/scalding)

The current version is 0.8.8 and is available from maven central: org="com.twitter", artifact="scalding_2.9.2".
The current version is 0.8.11 and is available from maven central: org="com.twitter", artifact="scalding-core_2.9.3".

## Contact

Expand Down
15 changes: 11 additions & 4 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import com.typesafe.tools.mima.plugin.MimaKeys._
import scala.collection.JavaConverters._

object ScaldingBuild extends Build {
// Attaches a mapped binary cross-version to `dep`, so the Scala version suffix used to
// resolve the artifact can differ from the version this build is compiled with.
def withCross(dep: ModuleID) = {
  val remapBinaryVersion: String => String = {
    // TODO: hack because twitter hasn't built things against 2.9.3
    case "2.9.3" => "2.9.2"
    // TODO: hack because sbt is broken
    case v if v.startsWith("2.10") => "2.10"
    // Every other version is passed through untouched.
    case other => other
  }
  dep.cross(CrossVersion.binaryMapped(remapBinaryVersion))
}

val sharedSettings = Project.defaultSettings ++ assemblySettings ++
releaseSettings ++ Seq(
organization := "com.twitter",
Expand Down Expand Up @@ -51,7 +58,7 @@ object ScaldingBuild extends Build {
if (v.trim.endsWith("SNAPSHOT"))
Some("sonatype-snapshots" at nexus + "content/repositories/snapshots")
else
Some("sonatype-releases" at nexus + "service/local/staging/deploy/maven2")
Some("sonatype-releases" at nexus + "service/local/staging/deploy/maven2")
},

// Janino includes a broken signature, and is not needed:
Expand Down Expand Up @@ -148,10 +155,10 @@ object ScaldingBuild extends Build {
"cascading" % "cascading-hadoop" % cascadingVersion,
"cascading.kryo" % "cascading.kryo" % "0.4.6",
"com.twitter" % "maple" % "0.2.7",
"com.twitter" %% "chill" % "0.2.3",
"com.twitter" %% "algebird-core" % "0.1.13",
withCross("com.twitter" %% "chill" % "0.2.3"),
withCross("com.twitter" %% "algebird-core" % "0.1.13"),
"commons-lang" % "commons-lang" % "2.4",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.1.3",
withCross("com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.1.3"),
"org.apache.hadoop" % "hadoop-core" % "0.20.2" % "provided",
"org.slf4j" % "slf4j-api" % "1.6.6",
"org.slf4j" % "slf4j-log4j12" % "1.6.6" % "provided"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import cascading.pipe.Pipe
import scala.annotation.tailrec
import java.util.Comparator

import com.esotericsoftware.kryo.DefaultSerializer

trait LowPriorityFieldConversions {

protected def anyToFieldArg(f: Any): Comparable[_] = f match {
Expand Down Expand Up @@ -69,11 +71,10 @@ trait FieldConversions extends LowPriorityFieldConversions {
// Cascading Fields are either java.lang.String or java.lang.Integer, both are comparable.
def asSet(f : Fields) : Set[Comparable[_]] = asList(f).toSet

// TODO get the comparator also
def getField(f : Fields, idx : Int) : Fields = { new Fields(f.get(idx)) }

def hasInts(f : Fields) = {
f.iterator.find { _.isInstanceOf[java.lang.Integer] }.isDefined
}
// True when at least one entry of `f` is a java.lang.Integer.
def hasInts(f : Fields): Boolean =
  f.iterator.exists {
    case _: java.lang.Integer => true
    case _ => false
  }

/**
* Rather than give the full power of cascading's selectors, we have
Expand Down Expand Up @@ -244,29 +245,26 @@ trait FieldConversions extends LowPriorityFieldConversions {
// val myFields: Fields = ...
// myFields.toFieldList

class RichFields(f : Traversable[Field[_]]) extends Fields(f.toSeq.map(_.id) : _*) {

f.foreach { field: Field[_] => setComparator(field.id, field.ord) }

lazy val toFieldList: List[Field[_]] = f.toList

/**
 * A Fields instance built from a list of typed Field descriptors.
 *
 * The superclass is constructed from the `id` of every element of `toFieldList`.
 *
 * @param toFieldList the Field descriptors this instance was created from
 */
case class RichFields(toFieldList : List[Field[_]]) extends Fields(toFieldList.map { _.id } : _*) {
// At construction time, register each field's Ordering as the comparator for its id.
toFieldList.foreach { field: Field[_] => setComparator(field.id, field.ord) }
}

object RichFields {

def apply(f: Field[_]*) = new RichFields(f)
def apply(f: Traversable[Field[_]]) = new RichFields(f)
def apply(f: Field[_]*) = new RichFields(f.toList)
def apply(f: Traversable[Field[_]]) = new RichFields(f.toList)

}

sealed abstract class Field[T] extends java.io.Serializable {
val id : Comparable[_]
val ord : Ordering[T]
val mf : Option[Manifest[T]]
/**
 * A serializable field descriptor parameterized by the type `T`.
 *
 * Sealed: implementations must be declared in this file.
 */
sealed trait Field[T] extends java.io.Serializable {
// Identifier of the field.
def id : Comparable[_]
// Ordering for values of type T.
def ord : Ordering[T]
// Manifest for T, when one is available.
def mf : Option[Manifest[T]]
}

/**
 * A Field whose id is a java.lang.Integer.
 *
 * The Ordering and the optional Manifest for `T` are taken from implicit scope.
 * Kryo is pointed at serialization.IntFieldSerializer through the DefaultSerializer annotation.
 */
@DefaultSerializer(classOf[serialization.IntFieldSerializer])
case class IntField[T](override val id: java.lang.Integer)(implicit override val ord : Ordering[T], override val mf : Option[Manifest[T]]) extends Field[T]

/**
 * A Field whose id is a String.
 *
 * The Ordering and the optional Manifest for `T` are taken from implicit scope.
 * Kryo is pointed at serialization.StringFieldSerializer through the DefaultSerializer annotation.
 */
@DefaultSerializer(classOf[serialization.StringFieldSerializer])
case class StringField[T](override val id: String)(implicit override val ord : Ordering[T], override val mf : Option[Manifest[T]]) extends Field[T]

object Field {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class GroupBuilder(val groupFields : Fields) extends
assert(middleSetter.arity > 0,
"The middle arity must have definite size, try wrapping in scala.Tuple1 if you need a hack")
// Create the required number of middlefields based on the arity of middleSetter
val middleFields = strFields( Range(0, middleSetter.arity).map{i => getNextMiddlefield} )
val middleFields = strFields( (0 until middleSetter.arity).map{i => getNextMiddlefield} )
val mrmBy = new MRMBy[T,X,U](fromFields, middleFields, toFields,
mapfn, redfn, mapfn2, startConv, middleSetter, middleConv, endSetter)
tryAggregateBy(mrmBy, ev)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class Job(val args : Args) extends TupleConversions
}) ++
Map("cascading.spill.threshold" -> "100000", //Tune these for better performance
"cascading.spillmap.threshold" -> "100000") ++
Map("scalding.version" -> "0.9.0-SNAPSHOT",
Map("scalding.version" -> "0.8.11",
"cascading.app.name" -> name,
"scalding.flow.class.name" -> getClass.getName,
"scalding.job.args" -> args.toString,
Expand Down
Loading

0 comments on commit afd91e6

Please sign in to comment.