From afc06bb720b41aa365e52b7cd03d781b1ec0f59c Mon Sep 17 00:00:00 2001 From: "P. Oscar Boykin" Date: Mon, 27 Jul 2015 10:58:16 -1000 Subject: [PATCH 1/2] Support nesting Options in TypeDescriptor --- .../impl/CaseClassBasedSetterImpl.scala | 184 ++++++------ .../macros/impl/FieldsProviderImpl.scala | 189 ++++++------ .../macros/impl/TupleConverterImpl.scala | 282 +++++------------- .../macros/impl/TupleFieldSetter.scala | 7 +- .../impl/TypeDescriptorProviderImpl.scala | 69 ++++- .../com/twitter/scalding/TypedPipeTest.scala | 21 +- .../scalding/macros/MacrosUnitTests.scala | 47 ++- 7 files changed, 371 insertions(+), 428 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/CaseClassBasedSetterImpl.scala b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/CaseClassBasedSetterImpl.scala index b376c0fcbb..139a955444 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/CaseClassBasedSetterImpl.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/CaseClassBasedSetterImpl.scala @@ -32,113 +32,97 @@ object CaseClassBasedSetterImpl { fsetter: CaseClassFieldSetter)(implicit T: c.WeakTypeTag[T]): (Int, c.Tree) = { import c.universe._ - val maybeHandlePrimitive: Option[(Int, Tree)] = { - def innerLoop(outerTpe: Type, pTree: Tree): Option[Tree] = { - val typedSetter: scala.util.Try[c.Tree] = fsetter.from(c)(outerTpe, 0, container, pTree) - - (outerTpe, typedSetter) match { - case (_, Success(setter)) => Some(setter) - case (tpe, _) if tpe.erasure =:= typeOf[Option[Any]] => - val cacheName = newTermName(c.fresh(s"optiIndx")) - innerLoop(tpe.asInstanceOf[TypeRefApi].args.head, q"$cacheName").map { subTree => - //pTree is a tree of an Option, so we get and recurse, or write absent - q""" - if($pTree.isDefined) { - val $cacheName = $pTree.get - $subTree - } else { - ${fsetter.absent(c)(0, container)} - } - """ - } - case _ => None - } - } - - // in TupleSetterImpl, the outer-most input val is 
called t, so we pass that in here: - innerLoop(T.tpe, q"t").map { resTree => - (1, resTree) + sealed trait SetterBuilder { + def columns: Int + /** + * This Tree assumes that "val $value = ..." has been set + */ + def setTree(value: Tree, offset: Int): Tree + } + case class PrimitiveSetter(tpe: Type) extends SetterBuilder { + def columns = 1 + def setTree(value: Tree, offset: Int) = fsetter.from(c)(tpe, offset, container, value) match { + case Success(tree) => tree + case Failure(e) => c.abort(c.enclosingPosition, + s"Case class ${T} is supported. Error on $tpe, ${e.getMessage}") } } - - maybeHandlePrimitive.getOrElse { - if (!IsCaseClassImpl.isCaseClassType(c)(T.tpe)) - c.abort(c.enclosingPosition, - s"""|We cannot enforce ${T.tpe} is a case class, either it is not a - |case class or this macro call is possibly enclosed in a class. - |This will mean the macro is operating on a non-resolved type. - |Issue when building Setter.""".stripMargin) - - @annotation.tailrec - def normalized(tpe: Type): Type = { - val norm = tpe.normalize - if (!(norm =:= tpe)) - normalized(norm) - else - tpe + case object DefaultSetter extends SetterBuilder { + def columns = 1 + def setTree(value: Tree, offset: Int) = fsetter.default(c)(offset, container, value) + } + case class OptionSetter(inner: SetterBuilder) extends SetterBuilder { + def columns = inner.columns + def setTree(value: Tree, offset: Int) = { + val someVal = newTermName(c.fresh(s"someVal")) + val someValTree = q"$someVal" + q"""if($value.isDefined) { + val $someVal = $value.get + ${inner.setTree(someValTree, offset)} + } else { + ${fsetter.absent(c)(offset, container)} + }""" } - - /* - * For a given outerType to be written at position idx, that has been stored in val named - * pTree, return the next position to write into, and the Tree to do the writing of the Type - */ - def matchField(outerTpe: Type, idx: Int, pTree: Tree): (Int, Tree) = { - val typedSetter: scala.util.Try[c.Tree] = fsetter.from(c)(outerTpe, idx, 
container, pTree) - (outerTpe, typedSetter) match { - case (_, Success(setter)) => - // use type-specific setter if present - (idx + 1, setter) - case (tpe, _) if tpe.erasure =:= typeOf[Option[Any]] => - val cacheName = newTermName(c.fresh(s"optiIndx")) - // Recurse on the inner type - val (newIdx, subTree) = matchField(tpe.asInstanceOf[TypeRefApi].args.head, idx, q"$cacheName") - // If we are absent, we use these setters to mark all fields null - val nullSetters = (idx until newIdx).map { curIdx => - fsetter.absent(c)(idx, container) - } - - (newIdx, q""" - if($pTree.isDefined) { - val $cacheName = $pTree.get - $subTree - } else { - ..$nullSetters - } - """) - - case (tpe, _) if (tpe.typeSymbol.isClass && tpe.typeSymbol.asClass.isCaseClass) => - expandMethod(normalized(tpe), idx, pTree) - case (tpe, _) if allowUnknownTypes => - // This just puts the value in directly - (idx + 1, fsetter.default(c)(idx, container, pTree)) - case _ => - c.abort(c.enclosingPosition, - s"Case class ${T} is not pure primitives, Option of a primitive nested case classes, when building Setter") + } + case class CaseClassSetter(members: Vector[(Tree => Tree, SetterBuilder)]) extends SetterBuilder { + val columns = members.map(_._2.columns).sum + def setTree(value: Tree, offset: Int) = { + val setters = members.scanLeft((offset, Option.empty[Tree])) { + case ((off, _), (access, sb)) => + val cca = newTermName(c.fresh(s"access")) + val ccaT = q"$cca" + (off + sb.columns, Some(q"val $cca = ${access(value)}; ${sb.setTree(ccaT, off)}")) } + .collect { case (_, Some(tree)) => tree } + q"""..$setters""" } + } - /* - * For a given outerType to be written at position parentIdx, that has been stored in val named - * pTree, return the next position to write into, and the Tree to do the writing of the Type - */ - def expandMethod(outerTpe: Type, parentIdx: Int, pTree: Tree): (Int, Tree) = - outerTpe - .declarations - .collect { case m: MethodSymbol if m.isCaseAccessor => m } - .foldLeft((parentIdx, 
q"")) { - case ((idx, existingTree), accessorMethod) => - val fieldType = normalized(accessorMethod.returnType.asSeenFrom(outerTpe, outerTpe.typeSymbol.asClass)) - - val (newIdx, subTree) = matchField(fieldType, idx, q"""$pTree.$accessorMethod""") - (newIdx, q""" - $existingTree - $subTree""") - } + @annotation.tailrec + def normalized(tpe: Type): Type = { + val norm = tpe.normalize + if (!(norm =:= tpe)) + normalized(norm) + else + tpe + } - // in TupleSetterImpl, the outer-most input val is called t, so we pass that in here: - val (finalIdx, set) = expandMethod(normalized(T.tpe), 0, q"t") - if (finalIdx == 0) c.abort(c.enclosingPosition, "Didn't consume any elements in the tuple, possibly empty case class?") - (finalIdx, set) + def matchField(outerType: Type): SetterBuilder = { + // we do this just to see if the setter matches. + val dummyIdx = 0 + val dummyTree = q"t" + outerType match { + case tpe if fsetter.from(c)(tpe, dummyIdx, container, dummyTree).isSuccess => + PrimitiveSetter(tpe) + case tpe if tpe.erasure =:= typeOf[Option[Any]] => + val innerType = tpe.asInstanceOf[TypeRefApi].args.head + OptionSetter(matchField(innerType)) + case tpe if (tpe.typeSymbol.isClass && tpe.typeSymbol.asClass.isCaseClass) => + CaseClassSetter(expandMethod(normalized(tpe)).map { + case (fn, tpe) => + (fn, matchField(tpe)) + }) + case tpe if allowUnknownTypes => + DefaultSetter + case _ => + c.abort(c.enclosingPosition, + s"Case class ${T.tpe} is not supported at type: $outerType") + } } + def expandMethod(outerTpe: Type): Vector[(Tree => Tree, Type)] = + outerTpe + .declarations + .collect { case m: MethodSymbol if m.isCaseAccessor => m } + .map { accessorMethod => + val fieldType = normalized(accessorMethod.returnType.asSeenFrom(outerTpe, outerTpe.typeSymbol.asClass)) + + ({ pTree: Tree => q"""$pTree.$accessorMethod""" }, fieldType) + } + .toVector + + // in TupleSetterImpl, the outer-most input val is called t, so we pass that in here: + val sb = 
matchField(normalized(T.tpe)) + if (sb.columns == 0) c.abort(c.enclosingPosition, "Didn't consume any elements in the tuple, possibly empty case class?") + (sb.columns, sb.setTree(q"t", 0)) } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/FieldsProviderImpl.scala b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/FieldsProviderImpl.scala index d255bf6ff5..c4e726a67a 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/FieldsProviderImpl.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/FieldsProviderImpl.scala @@ -80,112 +80,97 @@ object FieldsProviderImpl { def toFieldsCommonImpl[T](c: Context, namingScheme: NamingScheme, allowUnknownTypes: Boolean)(implicit T: c.WeakTypeTag[T]): c.Expr[cascading.tuple.Fields] = { import c.universe._ - /** - * This returns the a Tree expressing the Class[_] to use for T if T is primitive or Option of - * primitive - * If we are in an Option, we return classOf[Object] since otherwise cascading will convert - * nulls or empty strings to numeric zeros. 
- * This is only applied - */ - val maybeHandlePrimitive: Option[Tree] = { - def innerLoop(tpe: Type, inOption: Boolean): Option[Tree] = { - val returningType = if (inOption) q"""classOf[java.lang.Object]""" else q"""classOf[${T.tpe}]""" - val simpleRet = Some(returningType) - - tpe match { - case tpe if tpe =:= typeOf[String] => simpleRet - case tpe if tpe =:= typeOf[Boolean] => simpleRet - case tpe if tpe =:= typeOf[Short] => simpleRet - case tpe if tpe =:= typeOf[Int] => simpleRet - case tpe if tpe =:= typeOf[Long] => simpleRet - case tpe if tpe =:= typeOf[Float] => simpleRet - case tpe if tpe =:= typeOf[Double] => simpleRet - case tpe if tpe.erasure =:= typeOf[Option[Any]] && inOption == true => - c.abort(c.enclosingPosition, s"Case class ${T} has nested options, not supported currently.") - case tpe if tpe.erasure =:= typeOf[Option[Any]] => - val innerType = tpe.asInstanceOf[TypeRefApi].args.head - innerLoop(innerType, true) - - case tpe => None - } - } - innerLoop(T.tpe, false) - } - - val flattened: Either[List[(Tree, String)], List[(Tree, Int)]] = - maybeHandlePrimitive - .map { t => Right(List((t, 0))) } - .getOrElse { - if (!IsCaseClassImpl.isCaseClassType(c)(T.tpe)) - c.abort(c.enclosingPosition, - s"""|We cannot enforce ${T.tpe} is a case class, either it is not a case class or this macro call - |is possibly enclosed in a class. - |This will mean the macro is operating on a non-resolved type. 
Issue when building Fields Provider.""".stripMargin) - - /** - * This returns a List of pairs which flatten fieldType into (class, name) pairs - */ - def matchField(fieldType: Type, outerName: Option[String], fieldName: String, isOption: Boolean): List[(Tree, String)] = { - val returningType = if (isOption) q"""classOf[java.lang.Object]""" else q"""classOf[$fieldType]""" - val simpleRet = outerName match { - case Some(outer) => List((returningType, s"$outer$fieldName")) - case None => List((returningType, s"$fieldName")) - } - fieldType match { - case tpe if tpe =:= typeOf[String] => simpleRet - case tpe if tpe =:= typeOf[Boolean] => simpleRet - case tpe if tpe =:= typeOf[Short] => simpleRet - case tpe if tpe =:= typeOf[Int] => simpleRet - case tpe if tpe =:= typeOf[Long] => simpleRet - case tpe if tpe =:= typeOf[Float] => simpleRet - case tpe if tpe =:= typeOf[Double] => simpleRet - case tpe if tpe.erasure =:= typeOf[Option[Any]] && isOption == true => - c.abort(c.enclosingPosition, s"Case class ${T} has nested options, not supported currently.") - case tpe if tpe.erasure =:= typeOf[Option[Any]] => - val innerType = tpe.asInstanceOf[TypeRefApi].args.head - matchField(innerType, outerName, fieldName, true) - case tpe if (tpe.typeSymbol.isClass && tpe.typeSymbol.asClass.isCaseClass) => - val prefix = outerName.map(pre => s"$pre$fieldName.") - expandMethod(tpe, prefix, isOption) - case tpe if allowUnknownTypes => simpleRet - case _ => - c.abort(c.enclosingPosition, s"Case class ${T} is not pure primitives or nested case classes") - } + import TypeDescriptorProviderImpl.{ optionInner, evidentColumn } + + def isNumbered(t: Type): Boolean = + t match { + case tpe if tpe =:= typeOf[Boolean] => true + case tpe if tpe =:= typeOf[Short] => true + case tpe if tpe =:= typeOf[Int] => true + case tpe if tpe =:= typeOf[Long] => true + case tpe if tpe =:= typeOf[Float] => true + case tpe if tpe =:= typeOf[Double] => true + case tpe if tpe =:= typeOf[String] => true + case tpe => 
+ optionInner(c)(tpe) match { + case Some(t) => isNumbered(t) + case None => false } + } - def expandMethod(outerTpe: Type, outerName: Option[String], isOption: Boolean): List[(Tree, String)] = - outerTpe - .declarations - .collect { case m: MethodSymbol if m.isCaseAccessor => m } - .flatMap { accessorMethod => - val fieldName = accessorMethod.name.toTermName.toString - val fieldType = accessorMethod.returnType.asSeenFrom(outerTpe, outerTpe.typeSymbol.asClass) - - matchField(fieldType, outerName, fieldName, isOption) - }.toList - - val prefix = if (namingScheme == NamedNoPrefix) None else Some("") - val expanded = expandMethod(T.tpe, prefix, false) - if (expanded.isEmpty) c.abort(c.enclosingPosition, s"Case class ${T} has no primitive types we were able to extract") - - Left(expanded) + object FieldBuilder { + // This is method on the object to work around this compiler bug: SI-6231 + def toFieldsTree(fb: FieldBuilder, scheme: NamingScheme): Tree = { + val nameTree = scheme match { + case Indexed => + val indices = fb.names.zipWithIndex.map(_._2) + q"""_root_.scala.Array.apply[_root_.java.lang.Comparable[_]](..$indices)""" + case _ => + q"""_root_.scala.Array.apply[_root_.java.lang.Comparable[_]](..${fb.names})""" } - - val typeTrees = flattened.fold({ list => list.map(_._1) }, { list => list.map(_._1) }) - val namesOrIds = if (namingScheme == NamedWithPrefix || namingScheme == NamedNoPrefix) { - flattened match { - case Left(fieldNames) => - q"""_root_.scala.Array.apply[_root_.java.lang.Comparable[_]](..${fieldNames.map(_._2)})""" - case Right(fieldIds) => - q"""_root_.scala.Array.apply[_root_.java.lang.Comparable[_]](..${fieldIds.map(_._2)})""" + q"""new _root_.cascading.tuple.Fields($nameTree, + _root_.scala.Array.apply[_root_.java.lang.reflect.Type](..${fb.columnTypes})) + """ } - } else { - val indices = typeTrees.zipWithIndex.map(_._2) - q"""_root_.scala.Array.apply[_root_.java.lang.Comparable[_]](..$indices)""" } - c.Expr[cascading.tuple.Fields](q""" - new 
_root_.cascading.tuple.Fields($namesOrIds, - _root_.scala.Array.apply[_root_.java.lang.reflect.Type](..$typeTrees)) - """) + sealed trait FieldBuilder { + def columnTypes: Vector[Tree] + def names: Vector[String] + } + case class Primitive(name: String, tpe: Type) extends FieldBuilder { + def columnTypes = Vector(q"""_root_.scala.Predef.classOf[$tpe]""") + def names = Vector(name) + } + case class OptionBuilder(of: FieldBuilder) extends FieldBuilder { + // Options just use Object as the type, due to the way cascading works on number types + def columnTypes = of.columnTypes.map(_ => q"""_root_.scala.Predef.classOf[_root_.java.lang.Object]""") + def names = of.names + } + case class CaseClassBuilder(prefix: String, members: Vector[FieldBuilder]) extends FieldBuilder { + def columnTypes = members.flatMap(_.columnTypes) + def names = for { + member <- members + name <- member.names + } yield if (namingScheme == NamedWithPrefix && prefix.nonEmpty) s"$prefix.$name" else name + } + + /** + * This returns a List of pairs which flatten fieldType into (class, name) pairs + */ + def matchField(fieldType: Type, name: String): FieldBuilder = + fieldType match { + case tpe if tpe =:= typeOf[String] => Primitive(name, tpe) + case tpe if tpe =:= typeOf[Boolean] => Primitive(name, tpe) + case tpe if tpe =:= typeOf[Short] => Primitive(name, tpe) + case tpe if tpe =:= typeOf[Int] => Primitive(name, tpe) + case tpe if tpe =:= typeOf[Long] => Primitive(name, tpe) + case tpe if tpe =:= typeOf[Float] => Primitive(name, tpe) + case tpe if tpe =:= typeOf[Double] => Primitive(name, tpe) + case tpe if tpe.erasure =:= typeOf[Option[Any]] => + val innerType = tpe.asInstanceOf[TypeRefApi].args.head + OptionBuilder(matchField(innerType, name)) + case tpe if (tpe.typeSymbol.isClass && tpe.typeSymbol.asClass.isCaseClass) => + CaseClassBuilder(name, expandMethod(tpe).map { case (t, s) => matchField(t, s) }) + case tpe if allowUnknownTypes => Primitive(name, tpe) + case tpe => + 
c.abort(c.enclosingPosition, s"${T.tpe} is unsupported at $tpe") + } + + def expandMethod(outerTpe: Type): Vector[(Type, String)] = + outerTpe + .declarations + .collect { case m: MethodSymbol if m.isCaseAccessor => m } + .map { accessorMethod => + val fieldName = accessorMethod.name.toTermName.toString + val fieldType = accessorMethod.returnType.asSeenFrom(outerTpe, outerTpe.typeSymbol.asClass) + (fieldType, fieldName) + }.toVector + + val builder = matchField(T.tpe, "") + if (builder.columnTypes.isEmpty) + c.abort(c.enclosingPosition, s"Case class ${T.tpe} has no primitive types we were able to extract") + val scheme = if (isNumbered(T.tpe)) Indexed else namingScheme + val tree = FieldBuilder.toFieldsTree(builder, scheme) + c.Expr[cascading.tuple.Fields](tree) } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleConverterImpl.scala b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleConverterImpl.scala index c20116436d..36db15af80 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleConverterImpl.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleConverterImpl.scala @@ -37,224 +37,94 @@ object TupleConverterImpl { def caseClassTupleConverterCommonImpl[T](c: Context, allowUnknownTypes: Boolean)(implicit T: c.WeakTypeTag[T]): c.Expr[TupleConverter[T]] = { import c.universe._ - val maybeHandlePrimitive: Option[Tree] = { - /** - * This returns the Tree to get a single primitive out of a TupleEntry - */ - def innerLoop(tpe: Type, inOption: Boolean): Option[Tree] = { - def getPrimitive(primitiveGetter: Tree, boxedType: Type, box: Option[Tree]): Some[Tree] = Some( - if (inOption) { - val boxed = box.map { b => q"""$b($primitiveGetter)""" }.getOrElse(primitiveGetter) - q""" - if(t.getObject(0) == null) { - _root_.scala.None - } else { - _root_.scala.Some($boxed) - } - """ - } else { - primitiveGetter - }) - - tpe match { - case tpe if tpe =:= typeOf[String] => - 
getPrimitive(q"""t.getString(0)""", typeOf[java.lang.String], None) - case tpe if tpe =:= typeOf[Boolean] => - getPrimitive(q"""t.getBoolean(0)""", typeOf[java.lang.Boolean], Some(q"_root_.java.lang.Boolean.valueOf")) - case tpe if tpe =:= typeOf[Short] => - getPrimitive(q"""t.getShort(0)""", typeOf[java.lang.Short], Some(q"_root_.java.lang.Short.valueOf")) - case tpe if tpe =:= typeOf[Int] => - getPrimitive(q"""t.getInteger(0)""", typeOf[java.lang.Integer], Some(q"_root_.java.lang.Integer.valueOf")) - case tpe if tpe =:= typeOf[Long] => - getPrimitive(q"""t.getLong(0)""", typeOf[java.lang.Long], Some(q"_root_.java.lang.Long.valueOf")) - case tpe if tpe =:= typeOf[Float] => - getPrimitive(q"""t.getFloat(0)""", typeOf[java.lang.Float], Some(q"_root_.java.lang.Float.valueOf")) - case tpe if tpe =:= typeOf[Double] => - getPrimitive(q"""t.getDouble(0)""", typeOf[java.lang.Double], Some(q"_root_.java.lang.Double.valueOf")) - - case tpe if tpe.erasure =:= typeOf[Option[Any]] && inOption => - c.abort(c.enclosingPosition, s"Nested options do not make sense being mapped onto a tuple fields in cascading.") - - case tpe if tpe.erasure =:= typeOf[Option[Any]] => - val innerType = tpe.asInstanceOf[TypeRefApi].args.head - innerLoop(innerType, true) - - case tpe => None + import TypeDescriptorProviderImpl.{ optionInner, evidentColumn } + + def membersOf(outerTpe: Type): Vector[Type] = + outerTpe + .declarations + .collect { case m: MethodSymbol if m.isCaseAccessor => m } + .map { accessorMethod => + accessorMethod.returnType.asSeenFrom(outerTpe, outerTpe.typeSymbol.asClass) } - } - innerLoop(T.tpe, false).map { resTree => - q""" - new _root_.com.twitter.scalding.TupleConverter[$T] with _root_.com.twitter.bijection.macros.MacroGenerated { - override def apply(t: _root_.cascading.tuple.TupleEntry): $T = { - $resTree - } - override val arity: _root_.scala.Int = 1 - } - """ + .toVector + + sealed trait ConverterBuilder { + def columns: Int + def applyTree(offset: Int): Tree + } + 
case class PrimitiveBuilder(primitiveGetter: Int => Tree) extends ConverterBuilder { + def columns = 1 + def applyTree(offset: Int) = primitiveGetter(offset) + } + case class OptionBuilder(evidentCol: Int, of: ConverterBuilder) extends ConverterBuilder { + def columns = of.columns + def applyTree(offset: Int) = { + val testIdx = offset + evidentCol + q"""if (t.getObject($testIdx) == null) None + else Some(${of.applyTree(offset)})""" } } + case class CaseClassBuilder(tpe: Type, members: Vector[ConverterBuilder]) extends ConverterBuilder { + val columns = members.map(_.columns).sum + def applyTree(offset: Int) = { + val trees = members.scanLeft((offset, Option.empty[Tree])) { + case ((o, _), cb) => + val nextOffset = o + cb.columns + (nextOffset, Some(cb.applyTree(o))) + } + .collect { case (_, Some(tree)) => tree } - val res = maybeHandlePrimitive.getOrElse { - if (!IsCaseClassImpl.isCaseClassType(c)(T.tpe)) - c.abort(c.enclosingPosition, - s"""|We cannot enforce ${T.tpe} is a case class, either it is not a case class or - |this macro call is possibly enclosed in a class. - |This will mean the macro is operating on a non-resolved type. 
- |Issue when building Converter.""") - - // This holds a type and a variable name (toTree) - case class Extractor(tpe: Type, toTree: Tree) - // This holds the code to populate the variable name in an Extractor - case class Builder(toTree: Tree = q"") - - implicit val builderLiftable = new Liftable[Builder] { def apply(b: Builder): Tree = b.toTree } - implicit val extractorLiftable = new Liftable[Extractor] { def apply(b: Extractor): Tree = b.toTree } - - @annotation.tailrec - def normalized(tpe: Type): Type = { - val norm = tpe.normalize - if (!(norm =:= tpe)) normalized(norm) else tpe + q"${tpe.typeSymbol.companionSymbol}(..$trees)" } + } - def matchField(outerTpe: Type, idx: Int, inOption: Boolean): (Int, Extractor, List[Builder]) = { - /** - * This returns a List, to match the return type of matchField, but that List has size 0 or 1 - * PS: would be great to have dependent or refinement types here. + def matchField(outerTpe: Type): ConverterBuilder = + outerTpe match { + /* + * First we handle primitives, which never recurse */ - def getPrimitive(primitiveGetter: Tree, boxedType: Type, box: Option[Tree]): (Int, Extractor, List[Builder]) = - if (inOption) { - val cachedResult = newTermName(c.fresh(s"cacheVal")) - val boxed = box.map { b => q"""$b($primitiveGetter)""" }.getOrElse(primitiveGetter) - - val builder = q""" - val $cachedResult: $boxedType = if(t.getObject($idx) == null) { - null.asInstanceOf[$boxedType] - } else { - $boxed - } - """ - (idx + 1, - Extractor(boxedType, q"$cachedResult"), - List(Builder(builder))) - } else { - (idx + 1, Extractor(outerTpe, primitiveGetter), Nil) + case tpe if tpe =:= typeOf[String] => + PrimitiveBuilder(idx => q"""t.getString($idx)""") + case tpe if tpe =:= typeOf[Boolean] => + PrimitiveBuilder(idx => q"""t.getBoolean($idx)""") + case tpe if tpe =:= typeOf[Short] => + PrimitiveBuilder(idx => q"""t.getShort($idx)""") + case tpe if tpe =:= typeOf[Int] => + PrimitiveBuilder(idx => q"""t.getInteger($idx)""") + case tpe if 
tpe =:= typeOf[Long] => + PrimitiveBuilder(idx => q"""t.getLong($idx)""") + case tpe if tpe =:= typeOf[Float] => + PrimitiveBuilder(idx => q"""t.getFloat($idx)""") + case tpe if tpe =:= typeOf[Double] => + PrimitiveBuilder(idx => q"""t.getDouble($idx)""") + case tpe if tpe.erasure =:= typeOf[Option[Any]] => + val innerType = tpe.asInstanceOf[TypeRefApi].args.head + evidentColumn(c, allowUnknownTypes)(innerType) match { + case None => // there is no evident column, not supported. + c.abort(c.enclosingPosition, s"$tpe has unsupported nesting of Options at: $innerType") + case Some(ev) => // we can recurse here + OptionBuilder(ev, matchField(innerType)) } - - outerTpe match { - /* - * First we handle primitives, which never recurse - */ - case tpe if tpe =:= typeOf[String] => - getPrimitive(q"""t.getString(${idx})""", typeOf[java.lang.String], None) - case tpe if tpe =:= typeOf[Boolean] => - getPrimitive(q"""t.getBoolean(${idx})""", typeOf[java.lang.Boolean], Some(q"_root_.java.lang.Boolean.valueOf")) - case tpe if tpe =:= typeOf[Short] => - getPrimitive(q"""t.getShort(${idx})""", typeOf[java.lang.Short], Some(q"_root_.java.lang.Short.valueOf")) - case tpe if tpe =:= typeOf[Int] => - getPrimitive(q"""t.getInteger(${idx})""", typeOf[java.lang.Integer], Some(q"_root_.java.lang.Integer.valueOf")) - case tpe if tpe =:= typeOf[Long] => - getPrimitive(q"""t.getLong(${idx})""", typeOf[java.lang.Long], Some(q"_root_.java.lang.Long.valueOf")) - case tpe if tpe =:= typeOf[Float] => - getPrimitive(q"""t.getFloat(${idx})""", typeOf[java.lang.Float], Some(q"_root_.java.lang.Float.valueOf")) - case tpe if tpe =:= typeOf[Double] => - getPrimitive(q"""t.getDouble(${idx})""", typeOf[java.lang.Double], Some(q"_root_.java.lang.Double.valueOf")) - case tpe if tpe.erasure =:= typeOf[Option[Any]] && inOption => - c.abort(c.enclosingPosition, s"Nested options do not make sense being mapped onto a tuple fields in cascading.") - - /* - * Options require recursion on the inner type - */ - case 
tpe if tpe.erasure =:= typeOf[Option[Any]] => - val innerType = tpe.asInstanceOf[TypeRefApi].args.head - - val (newIdx, extractor, builders) = matchField(innerType, idx, true) - - val cachedResult = newTermName(c.fresh(s"opti")) - /* - * when have an Option of a primitive, we return the boxed java class - * TODO: so here we handle converting that back into a scala type. - * We could perhaps cast here, since we are unboxing just to box again - */ - val extractorTypeVal: Tree = if (extractor.tpe =:= innerType) - extractor.toTree - else - q"${innerType.typeSymbol.companionSymbol}.unbox($extractor)" - - val build = Builder(q""" - val $cachedResult = if($extractor == null) { - _root_.scala.None: _root_.scala.Option[$innerType] - } else { - _root_.scala.Some($extractorTypeVal) - } - """) - (newIdx, Extractor(tpe, q"""$cachedResult"""), builders :+ build) - - case tpe if (tpe.typeSymbol.isClass && tpe.typeSymbol.asClass.isCaseClass) => - expandCaseClass(tpe, idx, inOption) - case tpe if allowUnknownTypes => - getPrimitive(q"""t.getObject(${idx}).asInstanceOf[$tpe]""", tpe, None) - case tpe => - c.abort(c.enclosingPosition, - s"TT Case class ${T} is not pure primitives, Option of a primitive, nested case classes when looking at type ${tpe}") - } + case tpe if (tpe.typeSymbol.isClass && tpe.typeSymbol.asClass.isCaseClass) => + CaseClassBuilder(tpe, membersOf(tpe).map(matchField)) + case tpe if allowUnknownTypes => + PrimitiveBuilder(idx => q"""t.getObject(${idx}).asInstanceOf[$tpe]""") + case tpe => + c.abort(c.enclosingPosition, + s"${T.tpe} is not pure primitives, Option of a primitive, nested case classes when looking at type ${tpe}") } - def expandCaseClass(outerTpe: Type, parentIdx: Int, inOption: Boolean): (Int, Extractor, List[Builder]) = { - val (idx, extractors, builders) = outerTpe - .declarations - .collect { case m: MethodSymbol if m.isCaseAccessor => m } - .foldLeft((parentIdx, List[Extractor](), List[Builder]())) { - case ((idx, oldExtractors, oldBuilders), 
accessorMethod) => - val fieldType = accessorMethod.returnType.asSeenFrom(outerTpe, outerTpe.typeSymbol.asClass) + val builder = matchField(T.tpe) + if (builder.columns == 0) c.abort(c.enclosingPosition, "Didn't consume any elements in the tuple, possibly empty case class?") - val (newIdx, extractor, builders) = matchField(fieldType, idx, inOption) - (newIdx, oldExtractors :+ extractor, oldBuilders ::: builders) - } - - val cachedResult = newTermName(c.fresh(s"cacheVal")) - val simpleBuilder = q"${outerTpe.typeSymbol.companionSymbol}(..$extractors)" - val builder = if (inOption) { - val tstOpt = extractors.map(e => q"$e == null") - .foldLeft(Option.empty[Tree]) { - // Since nested options are not allowed - // if we are in an option, the current is None if any of the members are None - case (Some(t), nxt) => Some(q"$t || $nxt") - case (None, nxt) => Some(nxt) - } - - tstOpt match { - case Some(tst) => - q""" - val $cachedResult: $outerTpe = if ($tst) { - null - } else { - $simpleBuilder - } - """ - case None => q"val $cachedResult = $simpleBuilder" - } - } else { - q"val $cachedResult = $simpleBuilder" - } - (idx, - Extractor(outerTpe, q"$cachedResult"), - builders :+ Builder(builder)) - } - - val (finalIdx, extractor, builders) = expandCaseClass(T.tpe, 0, false) - if (finalIdx == 0) c.abort(c.enclosingPosition, "Didn't consume any elements in the tuple, possibly empty case class?") - - q""" - new _root_.com.twitter.scalding.TupleConverter[$T] with _root_.com.twitter.bijection.macros.MacroGenerated { - override def apply(t: _root_.cascading.tuple.TupleEntry): $T = { - ..$builders - $extractor - } - override val arity: _root_.scala.Int = $finalIdx - } - """ + val res = q""" + new _root_.com.twitter.scalding.TupleConverter[$T] with _root_.com.twitter.bijection.macros.MacroGenerated { + override def apply(t: _root_.cascading.tuple.TupleEntry): $T = { + ${builder.applyTree(0)} } - + override val arity: _root_.scala.Int = ${builder.columns} + } + """ 
c.Expr[TupleConverter[T]](res) } } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleFieldSetter.scala b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleFieldSetter.scala index 09e091f5f4..6f02e574f2 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleFieldSetter.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleFieldSetter.scala @@ -26,7 +26,12 @@ object TupleFieldSetter extends CaseClassFieldSetter { override def absent(c: Context)(idx: Int, container: c.TermName): c.Tree = { import c.universe._ - q"""$container.set($idx, null)""" + /* A more defensive approach is to set to null, but since + * we always allocate an empty TupleEntry, which is initially null, + * this is unneeded. + * q"""$container.set($idx, null)""" + */ + q"""()""" } override def default(c: Context)(idx: Int, container: c.TermName, fieldValue: c.Tree): c.Tree = { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TypeDescriptorProviderImpl.scala b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TypeDescriptorProviderImpl.scala index 6c31eaab2a..a41e7ef1b3 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TypeDescriptorProviderImpl.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TypeDescriptorProviderImpl.scala @@ -34,12 +34,64 @@ object TypeDescriptorProviderImpl { def caseClassTypeDescriptorWithUnknownImpl[T](c: Context)(implicit T: c.WeakTypeTag[T]): c.Expr[TypeDescriptor[T]] = caseClassTypeDescriptorCommonImpl(c, true)(T) - def caseClassTypeDescriptorCommonImpl[T](c: Context, allowUnknownTypes: Boolean)(implicit T: c.WeakTypeTag[T]): c.Expr[TypeDescriptor[T]] = { + /** + * When flattening a nested structure with Options, the evidentColumn is a column, relative to the + * first 0-offset column, that represents evidence that this T, and hence its set of columns, is + * present or absent. 
This is to handle Option types in text files such as CSV and TSV. + * A type T is evident if its evidentColumn is defined + * + * primitive numbers are evident + * case classes are evident if they have at least one evident member. + * + * Strings are not evident (we can't distinguish Empty from "") + * Option[T] is not evident (we can't tell Some(None) from None). + */ + def evidentColumn(c: Context, allowUnknown: Boolean = false)(tpe: c.universe.Type): Option[Int] = { import c.universe._ - val converter = TupleConverterImpl.caseClassTupleConverterCommonImpl[T](c, allowUnknownTypes) - val setter = TupleSetterImpl.caseClassTupleSetterCommonImpl[T](c, allowUnknownTypes) + def flattenOnce(t: Type): List[Type] = + t.declarations + .collect { case m: MethodSymbol if m.isCaseAccessor => m } + .map(_.returnType.asSeenFrom(t, t.typeSymbol.asClass)) + .toList + def go(t: Type, offset: Int): (Int, Option[Int]) = { + val thisColumn = (offset + 1, Some(offset)) + t match { + case tpe if tpe =:= typeOf[String] => (offset + 1, None) + case tpe if tpe =:= typeOf[Boolean] => thisColumn + case tpe if tpe =:= typeOf[Short] => thisColumn + case tpe if tpe =:= typeOf[Int] => thisColumn + case tpe if tpe =:= typeOf[Long] => thisColumn + case tpe if tpe =:= typeOf[Float] => thisColumn + case tpe if tpe =:= typeOf[Double] => thisColumn + // We recurse on Option and case classes + case tpe if tpe.erasure =:= typeOf[Option[Any]] => + val innerTpe = optionInner(c)(tpe).get + // we have no evidentColumn, but we need to compute the next index + (go(innerTpe, offset)._1, None) + case tpe if (tpe.typeSymbol.isClass && tpe.typeSymbol.asClass.isCaseClass) => + val flattened = flattenOnce(tpe) + .scanLeft((offset, Option.empty[Int])) { case ((off, _), t) => go(t, off) } + + val nextPos = flattened.last._1 + val ev = flattened.collectFirst { case (_, Some(col)) => col } + (nextPos, ev) + case _ if allowUnknown => thisColumn + case t => + c.abort(c.enclosingPosition, s"Case class ${tpe} at $t is not 
pure primitives or nested case classes") + } + } + go(tpe, 0)._2 + } + + def optionInner(c: Context)(opt: c.universe.Type): Option[c.universe.Type] = + if (opt.erasure =:= c.universe.typeOf[Option[Any]]) { + Some(opt.asInstanceOf[c.universe.TypeRefApi].args.head) + } else None + + def isTuple[T](c: Context)(implicit T: c.WeakTypeTag[T]): Boolean = { + import c.universe._ val tupleTypes = List(typeOf[Tuple1[Any]], typeOf[Tuple2[Any, Any]], typeOf[Tuple3[Any, Any, Any]], @@ -62,7 +114,16 @@ object TypeDescriptorProviderImpl { typeOf[Tuple20[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]], typeOf[Tuple21[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]], typeOf[Tuple22[Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any]]) - val namingScheme = if (tupleTypes.exists { _ =:= T.tpe.erasure }) Indexed else NamedWithPrefix + (tupleTypes.exists { _ =:= T.tpe.erasure }) + } + + def caseClassTypeDescriptorCommonImpl[T](c: Context, allowUnknownTypes: Boolean)(implicit T: c.WeakTypeTag[T]): c.Expr[TypeDescriptor[T]] = { + import c.universe._ + + val converter = TupleConverterImpl.caseClassTupleConverterCommonImpl[T](c, allowUnknownTypes) + val setter = TupleSetterImpl.caseClassTupleSetterCommonImpl[T](c, allowUnknownTypes) + + val namingScheme = if (isTuple[T](c)) Indexed else NamedWithPrefix val fields = FieldsProviderImpl.toFieldsCommonImpl[T](c, namingScheme, allowUnknownTypes) diff --git a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala index 6ee3921d98..3c94cbbd90 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/TypedPipeTest.scala @@ -867,7 +867,8 @@ class TypedSortWithTakeTest extends WordSpec with Matchers { class 
TypedLookupJob(args: Args) extends Job(args) { TypedPipe.from(TypedText.tsv[Int]("input0")) .hashLookup(TypedPipe.from(TypedText.tsv[(Int, String)]("input1")).group) - .write(TypedText.tsv[(Int, Option[String])]("output")) + .mapValues { o: Option[String] => o.getOrElse("") } + .write(TypedText.tsv[(Int, String)]("output")) } class TypedLookupJobTest extends WordSpec with Matchers { @@ -880,17 +881,16 @@ class TypedLookupJobTest extends WordSpec with Matchers { JobTest(new TypedLookupJob(_)) .source(TypedText.tsv[Int]("input0"), (-1 to 100)) .source(TypedText.tsv[(Int, String)]("input1"), mk) - .typedSink(TypedText.tsv[(Int, Option[String])]("output")) { outBuf => + .typedSink(TypedText.tsv[(Int, String)]("output")) { outBuf => "correctly TypedPipe.hashLookup" in { val data = mk.groupBy(_._1) - .mapValues(kvs => kvs.map { case (k, v) => (k, Some(v)) }) val correct = (-1 to 100).flatMap { k => - data.get(k).getOrElse(List((k, None))) + data.get(k).getOrElse(List((k, ""))) }.toList.sorted outBuf should have size (correct.size) outBuf.toList.sorted shouldBe correct } - }(implicitly[TypeDescriptor[(Int, Option[String])]].converter) + }(implicitly[TypeDescriptor[(Int, String)]].converter) .run .finish } @@ -899,7 +899,8 @@ class TypedLookupJobTest extends WordSpec with Matchers { class TypedLookupReduceJob(args: Args) extends Job(args) { TypedPipe.from(TypedText.tsv[Int]("input0")) .hashLookup(TypedPipe.from(TypedText.tsv[(Int, String)]("input1")).group.max) - .write(TypedText.tsv[(Int, Option[String])]("output")) + .mapValues { o: Option[String] => o.getOrElse("") } + .write(TypedText.tsv[(Int, String)]("output")) } class TypedLookupReduceJobTest extends WordSpec with Matchers { @@ -912,20 +913,20 @@ class TypedLookupReduceJobTest extends WordSpec with Matchers { JobTest(new TypedLookupReduceJob(_)) .source(TypedText.tsv[Int]("input0"), (-1 to 100)) .source(TypedText.tsv[(Int, String)]("input1"), mk) - .typedSink(TypedText.tsv[(Int, Option[String])]("output")) { outBuf 
=> + .typedSink(TypedText.tsv[(Int, String)]("output")) { outBuf => "correctly TypedPipe.hashLookup" in { val data = mk.groupBy(_._1) .mapValues { kvs => val (k, v) = kvs.maxBy(_._2) - (k, Some(v)) + (k, v) } val correct = (-1 to 100).map { k => - data.get(k).getOrElse((k, None)) + data.get(k).getOrElse((k, "")) }.toList.sorted outBuf should have size (correct.size) outBuf.toList.sorted shouldBe correct } - }(implicitly[TypeDescriptor[(Int, Option[String])]].converter) + }(implicitly[TypeDescriptor[(Int, String)]].converter) .run .finish } diff --git a/scalding-core/src/test/scala/com/twitter/scalding/macros/MacrosUnitTests.scala b/scalding-core/src/test/scala/com/twitter/scalding/macros/MacrosUnitTests.scala index aad48410ce..cf1ab1d05f 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/macros/MacrosUnitTests.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/macros/MacrosUnitTests.scala @@ -16,15 +16,19 @@ package com.twitter.scalding.macros import cascading.tuple.{ Tuple => CTuple, TupleEntry } - -import org.scalatest.{ Matchers, WordSpec } -import scala.language.experimental.{ macros => smacros } +import com.twitter.bijection.macros.{ IsCaseClass, MacroGenerated } import com.twitter.scalding._ import com.twitter.scalding.macros._ import com.twitter.scalding.macros.impl._ import com.twitter.scalding.serialization.Externalizer - -import com.twitter.bijection.macros.{ IsCaseClass, MacroGenerated } +import org.scalacheck.Arbitrary +import org.scalacheck.Gen.choose +import org.scalacheck.Prop +import org.scalacheck.Prop.forAll +import org.scalacheck.Properties +import org.scalatest.{ Matchers, WordSpec } +import scala.language.experimental.{ macros => smacros } +import scala.reflect.runtime.universe._ // We avoid nesting these just to avoid any complications in the serialization test case class SampleClassA(x: Int, y: String) @@ -37,6 +41,25 @@ case class SampleClassG(a: java.util.Date) case class SampleClassFail(a: Option[Option[Int]]) 
+object MacroProperties extends Properties("TypeDescriptor.roundTrip") { + def roundTrip[T: Arbitrary: TypeDescriptor]: Prop = forAll { t: T => + val setter = implicitly[TypeDescriptor[T]].setter + val converter = implicitly[TypeDescriptor[T]].converter + val fields = implicitly[TypeDescriptor[T]].fields + converter(new TupleEntry(fields, setter(t))) == t + } + + def propertyFor[T: TypeTag: Arbitrary: TypeDescriptor]: Unit = { + property(typeTag[T].tpe.toString) = roundTrip[T] + } + + propertyFor[Int] + propertyFor[Option[Int]] + propertyFor[Option[(Int, String, Option[Long])]] + propertyFor[Option[(Option[Boolean], Int, String, Option[Long])]] + propertyFor[(Int, Double, String, Option[(String, Int, Option[Long])])] +} + class MacrosUnitTests extends WordSpec with Matchers { import MacroImplicits._ def isMg[T](t: T): T = { @@ -76,6 +99,16 @@ class MacrosUnitTests extends WordSpec with Matchers { def canExternalize(t: AnyRef) { Externalizer(t).javaWorks shouldBe true } + "MacroGenerated TupleConverter" should { + "Not compile for Option[Option[Int]]" in { + //TODO figure out a way to test this does not compile. 
See: + //https://github.com/milessabin/shapeless/blob/master/core/src/main/scala/shapeless/test/typechecking.scala + //uncommenting fails to compile, but we want to be more sure + //Macros.caseClassTupleConverter[Option[Option[Int]]] + //Macros.caseClassTupleConverter[Option[String]] + } + } + "MacroGenerated TupleSetter" should { "Generate the setter SampleClassA" in { Macros.caseClassTupleSetter[SampleClassA] } @@ -103,6 +136,10 @@ class MacrosUnitTests extends WordSpec with Matchers { "Generate the converter SampleClassE" in { Macros.caseClassTupleConverter[SampleClassE] } "Generate the converter SampleClassF" in { Macros.caseClassTupleConverter[SampleClassF] } "Generate the converter SampleClassG" in { Macros.caseClassTupleConverterWithUnknown[SampleClassG] } + "Generate the converter Option[(Int, String)]" in { Macros.caseClassTupleConverter[Option[(Int, String)]] } + "Generate the converter Option[(Int, Option[(Long, String)])]" in { + Macros.caseClassTupleConverter[Option[(Int, Option[(Long, String)])]] + } "Not generate a convertor for SampleClassFail" in { isMacroTupleConverterAvailable[SampleClassFail] shouldBe false } From 75939689a25eef49ee04d550141486d4db735856 Mon Sep 17 00:00:00 2001 From: "P. 
Oscar Boykin" Date: Mon, 27 Jul 2015 11:33:00 -1000 Subject: [PATCH 2/2] Fix DB macro breakage --- .../twitter/scalding/macros/impl/TupleConverterImpl.scala | 5 ++++- .../scalding/macros/impl/TypeDescriptorProviderImpl.scala | 6 +++++- .../scala/com/twitter/scalding/macros/MacrosUnitTests.scala | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleConverterImpl.scala b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleConverterImpl.scala index 36db15af80..f8ea670d2b 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleConverterImpl.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TupleConverterImpl.scala @@ -83,8 +83,11 @@ object TupleConverterImpl { /* * First we handle primitives, which never recurse */ - case tpe if tpe =:= typeOf[String] => + case tpe if tpe =:= typeOf[String] && allowUnknownTypes => PrimitiveBuilder(idx => q"""t.getString($idx)""") + case tpe if tpe =:= typeOf[String] => + // In this case, null is identical to empty, and we always return non-null + PrimitiveBuilder(idx => q"""{val s = t.getString($idx); if (s == null) "" else s}""") case tpe if tpe =:= typeOf[Boolean] => PrimitiveBuilder(idx => q"""t.getBoolean($idx)""") case tpe if tpe =:= typeOf[Short] => diff --git a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TypeDescriptorProviderImpl.scala b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TypeDescriptorProviderImpl.scala index a41e7ef1b3..a6fdbf47be 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TypeDescriptorProviderImpl.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/macros/impl/TypeDescriptorProviderImpl.scala @@ -58,7 +58,11 @@ object TypeDescriptorProviderImpl { def go(t: Type, offset: Int): (Int, Option[Int]) = { val thisColumn = (offset + 1, Some(offset)) t match { - case tpe if tpe =:= typeOf[String] => 
(offset + 1, None) + case tpe if tpe =:= typeOf[String] => + // if we don't allowUnknown here, we treat null and "" is indistinguishable + // for text formats + if (allowUnknown) thisColumn + else (offset + 1, None) case tpe if tpe =:= typeOf[Boolean] => thisColumn case tpe if tpe =:= typeOf[Short] => thisColumn case tpe if tpe =:= typeOf[Int] => thisColumn diff --git a/scalding-core/src/test/scala/com/twitter/scalding/macros/MacrosUnitTests.scala b/scalding-core/src/test/scala/com/twitter/scalding/macros/MacrosUnitTests.scala index cf1ab1d05f..172289eaa3 100644 --- a/scalding-core/src/test/scala/com/twitter/scalding/macros/MacrosUnitTests.scala +++ b/scalding-core/src/test/scala/com/twitter/scalding/macros/MacrosUnitTests.scala @@ -49,7 +49,7 @@ object MacroProperties extends Properties("TypeDescriptor.roundTrip") { converter(new TupleEntry(fields, setter(t))) == t } - def propertyFor[T: TypeTag: Arbitrary: TypeDescriptor]: Unit = { + def propertyFor[T:TypeTag: Arbitrary: TypeDescriptor]: Unit = { property(typeTag[T].tpe.toString) = roundTrip[T] }