Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JsonLine should handle empty lines #966

Merged
merged 3 commits into from
Aug 8, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions scalding-json/src/main/scala/com/twitter/scalding/JsonLine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ import com.fasterxml.jackson.databind.ObjectMapper
*
* TODO: it would be nice to have a way to add read/write transformations to pipes
* that doesn't require extending the sources and overriding methods.
*
 * @param failOnEmptyLines When set to false, it just skips empty lines instead of failing the job. Defaults to true
* for backwards compatibility.
*/
case class JsonLine(p: String, fields: Fields = Fields.ALL,
override val sinkMode: SinkMode = SinkMode.REPLACE,
override val transformInTest: Boolean = false)
override val transformInTest: Boolean = false,
failOnEmptyLines: Boolean = true)
extends FixedPathSource(p) with TextLineScheme {

import Dsl._
Expand All @@ -45,7 +49,7 @@ case class JsonLine(p: String, fields: Fields = Fields.ALL,
t: TupleEntry => mapper.writeValueAsString(TupleConverter.ToMap(t))
}

override def transformForRead(pipe: Pipe) = pipe.mapTo('line -> fields) {
override def transformForRead(pipe: Pipe) = {
@scala.annotation.tailrec
def nestedRetrieval(node: Option[Map[String, AnyRef]], path: List[String]): AnyRef = {
(path, node) match {
Expand All @@ -61,10 +65,12 @@ case class JsonLine(p: String, fields: Fields = Fields.ALL,

val splitFields = (0 until fields.size).map { i: Int => fields.get(i).toString.split('.').toList }

line: String =>
val fs: Map[String, AnyRef] = mapper.readValue(line, mapTypeReference)
val values = splitFields.map { nestedRetrieval(Option(fs), _) }
new cascading.tuple.Tuple(values: _*)
pipe.collectTo[String, Tuple]('line -> fields) {
case line: String if failOnEmptyLines || line.trim.nonEmpty =>
val fs: Map[String, AnyRef] = mapper.readValue(line, mapTypeReference)
val values = splitFields.map { nestedRetrieval(Option(fs), _) }
new cascading.tuple.Tuple(values: _*)
}
}

override def toString = "JsonLine(" + p + ", " + fields.toString + ")"
Expand All @@ -74,7 +80,8 @@ case class JsonLine(p: String, fields: Fields = Fields.ALL,
* TODO: at the next binary incompatible version remove the AbstractFunction2/scala.Serializable jank which
* was added to get mima to not report binary errors
*/
object JsonLine extends scala.runtime.AbstractFunction4[String, Fields, SinkMode, Boolean, JsonLine] with Serializable with scala.Serializable {
object JsonLine extends scala.runtime.AbstractFunction5[String, Fields, SinkMode, Boolean, Boolean, JsonLine]
with Serializable with scala.Serializable {

val mapTypeReference = typeReference[Map[String, AnyRef]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@ limitations under the License.

package com.twitter.scalding.json

import org.specs._
import com.twitter.scalding.{ JsonLine => StandardJsonLine, _ }

import cascading.tuple.Fields
import cascading.flow.FlowException
import cascading.tap.SinkMode
import cascading.tuple.Fields
import com.twitter.scalding.{ JsonLine => StandardJsonLine, _ }
import org.specs._

object JsonLine {
def apply(p: String, fields: Fields = Fields.ALL) = new JsonLine(p, fields)
def apply(p: String, fields: Fields = Fields.ALL, failOnEmptyLines: Boolean = true) =
new JsonLine(p, fields, failOnEmptyLines)
}

class JsonLine(p: String, fields: Fields) extends StandardJsonLine(p, fields, SinkMode.REPLACE,
class JsonLine(p: String, fields: Fields, failOnEmptyLines: Boolean) extends StandardJsonLine(p, fields, SinkMode.REPLACE,
// We want to test the actual transformation here.
transformInTest = true)
transformInTest = true, failOnEmptyLines = failOnEmptyLines)

class JsonLineJob(args: Args) extends Job(args) {
try {
Expand Down Expand Up @@ -57,6 +58,17 @@ class JsonLineInputJob(args: Args) extends Job(args) {
}
}

/**
 * Test fixture job: reads JSON input with `failOnEmptyLines = false` so that
 * blank / whitespace-only lines are silently skipped instead of aborting the flow.
 * Mirrors JsonLineInputJob, differing only in the failOnEmptyLines flag.
 */
class JsonLineInputJobSkipEmptyLines(args: Args) extends Job(args) {
  try {
    JsonLine("input0", ('foo, 'bar), failOnEmptyLines = false).read
      .project('foo, 'bar)
      .write(Tsv("output0"))

  } catch {
    // NOTE(review): deliberately swallows exceptions (prints only) so the spec,
    // not the job constructor, observes failures — same pattern as the sibling jobs.
    case e: Exception => e.printStackTrace
  }
}

class JsonLineNestedInputJob(args: Args) extends Job(args) {
try {
JsonLine("input0", (Symbol("foo.too"), 'bar)).read
Expand All @@ -71,7 +83,7 @@ class JsonLineNestedInputJob(args: Args) extends Job(args) {

class JsonLineTest extends Specification {
noDetailedDiffs()
import Dsl._
import com.twitter.scalding.Dsl._

"A JsonLine sink" should {
JobTest(new JsonLineJob(_))
Expand Down Expand Up @@ -134,5 +146,27 @@ class JsonLineTest extends Specification {
}
.run
.finish

    // Default behavior (failOnEmptyLines = true): feeding blank and
    // whitespace-only lines ("", "   ") to JsonLineInputJob must make the
    // Cascading flow fail, surfacing as a FlowException from .finish.
    "fail on empty lines by default" in {
      JobTest(new JsonLineInputJob(_))
        .source(JsonLine("input0", ('foo, 'bar)), List((0, json), (1, json2), (2, ""), (3, "   ")))
        .sink[(Int, String)](Tsv("output0")) {
          outBuf => outBuf.toList

        }
        .run
        .finish must throwAnException[FlowException]
    }

  // With failOnEmptyLines = false the two blank lines in the input are
  // dropped by collectTo, so only the two real JSON records reach the sink.
  JobTest(new JsonLineInputJobSkipEmptyLines(_))
    .source(JsonLine("input0", ('foo, 'bar)), List((0, json), (1, json2), (2, ""), (3, "   ")))
    .sink[(Int, String)](Tsv("output0")) {
      outBuf =>
        "handle empty lines when `failOnEmptyLines` is set to false" in {
          // assumes exactly json and json2 survive filtering — TODO confirm fixture values
          outBuf.toList.size must be_==(2)
        }
    }
    .run
    .finish
}
}