From a7c8c25d9d37f8e32dbcec8cfe1def9e9732acf9 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 18 Jun 2015 08:55:57 -0700 Subject: [PATCH 1/7] Add a OrderedSerialization.viaTransform with no dependencies, and a BijectedOrderedSerialization in scalding core --- .../BijectedOrderedSerialization.scala | 24 +++++++++++++++++++ .../serialization/OrderedSerialization.scala | 24 +++++++++++++++++++ .../SerializationProperties.scala | 12 ++++++++++ 3 files changed, 60 insertions(+) create mode 100644 scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala diff --git a/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala b/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala new file mode 100644 index 0000000000..28a5df6bb3 --- /dev/null +++ b/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala @@ -0,0 +1,24 @@ +/* +Copyright 2014 Twitter, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + */ +package com.twitter.scalding + +import com.twitter.scalding.serialization.OrderedSerialization +import com.twitter.bijection.Bijection + +object BijectedOrderedSerialization { + implicit def fromBijection[T, U](implicit bij: Bijection[T, U], ordSer: OrderedSerialization[U]) = + OrderedSerialization.viaTransform[T, U](bij.apply(_), bij.invert(_)) +} \ No newline at end of file diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala index 708986a870..3dca1dabed 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala @@ -101,6 +101,30 @@ object OrderedSerialization { case NonFatal(e) => CompareFailure(e) } + def viaTransform[T, U]( + packFn: T => U, + unpackFn: U => T)(implicit otherOrdSer: OrderedSerialization[U]): OrderedSerialization[T] = + new OrderedSerialization[T] { + + override def hash(t: T) = otherOrdSer.hash(packFn(t)) + + override def compareBinary(a: java.io.InputStream, b: java.io.InputStream): OrderedSerialization.Result = + otherOrdSer.compareBinary(a, b) + + override def compare(x: T, y: T) = + otherOrdSer.compare(packFn(x), packFn(y)) + + override def read(in: InputStream): Try[T] = + otherOrdSer.read(in).map(unpackFn) + + override def write(out: OutputStream, t: T): Try[Unit] = + otherOrdSer.write(out, packFn(t)) + + override def staticSize: Option[Int] = otherOrdSer.staticSize + + override def dynamicSize(t: T): Option[Int] = otherOrdSer.dynamicSize(packFn(t)) + } + /** * The the serialized comparison matches the unserialized comparison */ diff --git a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala index c382e3c554..6d469a70fa 100644 --- a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala +++ b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala @@ -56,6 +56,14 @@ object SerializationProperties extends Properties("SerializationProperties") { implicit val stringOrdSer: OrderedSerialization[String] = new StringOrderedSerialization + class IntWrapperClass(val x: Int) + + implicit val myIntWrapperOrdSer: OrderedSerialization[IntWrapperClass] = + OrderedSerialization.viaTransform[IntWrapperClass, Int](_.x, new IntWrapperClass(_)) + + implicit val arbIntWrapperClass: Arbitrary[IntWrapperClass] = + Arbitrary(implicitly[Arbitrary[Int]].arbitrary.map(new IntWrapperClass(_))) + implicit def tuple[A: OrderedSerialization, B: OrderedSerialization]: OrderedSerialization[(A, B)] = new OrderedSerialization2[A, B](implicitly, implicitly) @@ -119,6 +127,9 @@ object SerializationProperties extends Properties("SerializationProperties") { property("sequences compare well [(String, String)]") = serializeSequenceCompare[(String, String)] property("sequences equiv well [(String, String)]") = serializeSequenceEquiv[(String, String)] + property("sequences compare well [IntWrapperClass]") = serializeSequenceCompare[IntWrapperClass] + property("sequences equiv well [IntWrapperClass]") = serializeSequenceEquiv[IntWrapperClass] + // Test the independent, non-sequenced, laws as well include(LawTester("Int Ordered", OrderedSerialization.allLaws[Int])) include(LawTester("(Int, Int) Ordered", OrderedSerialization.allLaws[(Int, Int)])) @@ -126,4 +137,5 @@ object SerializationProperties extends Properties("SerializationProperties") { include(LawTester("(String, Int) Ordered", OrderedSerialization.allLaws[(String, Int)])) include(LawTester("(Int, String) Ordered", OrderedSerialization.allLaws[(Int, String)])) include(LawTester("(String, String) Ordered", OrderedSerialization.allLaws[(String, String)])) + include(LawTester("IntWrapperClass Ordered", OrderedSerialization.allLaws[IntWrapperClass])) } From 4ca8c0f341712dd047bd01d1d528758b296b8a3a Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 18 Jun 2015 10:22:04 -0700 Subject: [PATCH 2/7] Use ImplicitBijection, add Try variants and Injection --- .../BijectedOrderedSerialization.scala | 10 +++++--- .../serialization/OrderedSerialization.scala | 24 +++++++++++++++++++ .../SerializationProperties.scala | 13 ++++++++-- 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala b/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala index 28a5df6bb3..5a982f30cc 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala @@ -16,9 +16,13 @@ limitations under the License. package com.twitter.scalding import com.twitter.scalding.serialization.OrderedSerialization -import com.twitter.bijection.Bijection +import com.twitter.bijection.ImplicitBijection object BijectedOrderedSerialization { - implicit def fromBijection[T, U](implicit bij: Bijection[T, U], ordSer: OrderedSerialization[U]) = + implicit def fromBijection[T, U](implicit bij: ImplicitBijection[T, U], ordSer: OrderedSerialization[U]) = OrderedSerialization.viaTransform[T, U](bij.apply(_), bij.invert(_)) -} \ No newline at end of file + + implicit def fromInjection[U, U](implicit bij: Injection[T, U], ordSer: OrderedSerialization[U]) = + OrderedSerialization.viaTryTransform[T, U](bij.apply(_), bij.invert(_)) +} + diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala index 3dca1dabed..f9293733af 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala @@ -125,6 +125,30 @@ object OrderedSerialization { override def dynamicSize(t: T): Option[Int] = otherOrdSer.dynamicSize(packFn(t)) } + def viaTryTransform[T, U]( + packFn: T => U, + unpackFn: U => Try[T])(implicit otherOrdSer: OrderedSerialization[U]): OrderedSerialization[T] = + new OrderedSerialization[T] { + + override def hash(t: T) = otherOrdSer.hash(packFn(t)) + + override def compareBinary(a: java.io.InputStream, b: java.io.InputStream): OrderedSerialization.Result = + otherOrdSer.compareBinary(a, b) + + override def compare(x: T, y: T) = + otherOrdSer.compare(packFn(x), packFn(y)) + + override def read(in: InputStream): Try[T] = + otherOrdSer.read(in).flatMap(unpackFn) + + override def write(out: OutputStream, t: T): Try[Unit] = + otherOrdSer.write(out, packFn(t)) + + override def staticSize: Option[Int] = otherOrdSer.staticSize + + override def dynamicSize(t: T): Option[Int] = otherOrdSer.dynamicSize(packFn(t)) + } + /** * The the serialized comparison matches the unserialized comparison */ diff --git a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala index 6d469a70fa..ca5c4a4cec 100644 --- a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala +++ b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala @@ -24,7 +24,7 @@ import org.scalacheck.Prop._ import JavaStreamEnrichments._ import java.io._ -import scala.util.Try +import scala.util.{ Try, Success } object LawTester { def apply[T: Arbitrary](base: String, laws: Iterable[Law[T]]): Properties = @@ -59,7 +59,13 @@ object SerializationProperties extends Properties("SerializationProperties") { class IntWrapperClass(val x: Int) implicit val myIntWrapperOrdSer: OrderedSerialization[IntWrapperClass] = - OrderedSerialization.viaTransform[IntWrapperClass, Int](_.x, new IntWrapperClass(_)) + OrderedSerialization.viaTransform[IntWrapperClass, Int](_.x, new IntWrapperClass(_) }) + + class IntTryWrapperClass(val x: Int) + + implicit val myIntWrapperOrdSer: OrderedSerialization[IntTryWrapperClass] = + OrderedSerialization.viaTryTransform[IntTryWrapperClass, Int](_.x, { x: Int => Success(new IntTryWrapperClass(x)) }) + implicit val arbIntWrapperClass: Arbitrary[IntWrapperClass] = Arbitrary(implicitly[Arbitrary[Int]].arbitrary.map(new IntWrapperClass(_))) @@ -128,7 +134,9 @@ object SerializationProperties extends Properties("SerializationProperties") { property("sequences equiv well [(String, String)]") = serializeSequenceEquiv[(String, String)] property("sequences compare well [IntWrapperClass]") = serializeSequenceCompare[IntWrapperClass] + property("sequences compare well [IntTryWrapperClass]") = serializeSequenceCompare[IntTryWrapperClass] property("sequences equiv well [IntWrapperClass]") = serializeSequenceEquiv[IntWrapperClass] + property("sequences equiv well [IntTryWrapperClass]") = serializeSequenceEquiv[IntTryWrapperClass] // Test the independent, non-sequenced, laws as well include(LawTester("Int Ordered", OrderedSerialization.allLaws[Int])) @@ -138,4 +146,5 @@ object SerializationProperties extends Properties("SerializationProperties") { include(LawTester("(Int, String) Ordered", OrderedSerialization.allLaws[(Int, String)])) include(LawTester("(String, String) Ordered", OrderedSerialization.allLaws[(String, String)])) include(LawTester("IntWrapperClass Ordered", OrderedSerialization.allLaws[IntWrapperClass])) + include(LawTester("IntTryWrapperClass Ordered", OrderedSerialization.allLaws[IntTryWrapperClass])) } From 298fbabd3c5b9677db23ec119723b9aac1efb545 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 18 Jun 2015 10:26:19 -0700 Subject: [PATCH 3/7] Fix typo --- .../scalding/serialization/SerializationProperties.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala index ca5c4a4cec..552f12875a 100644 --- a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala +++ b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala @@ -59,7 +59,7 @@ object SerializationProperties extends Properties("SerializationProperties") { class IntWrapperClass(val x: Int) implicit val myIntWrapperOrdSer: OrderedSerialization[IntWrapperClass] = - OrderedSerialization.viaTransform[IntWrapperClass, Int](_.x, new IntWrapperClass(_) }) + OrderedSerialization.viaTransform[IntWrapperClass, Int](_.x, new IntWrapperClass(_)) class IntTryWrapperClass(val x: Int) From cd43fba1e35c9f5d57b600ccedc10b0564c5f988 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 18 Jun 2015 10:29:35 -0700 Subject: [PATCH 4/7] clicked push too fast --- .../scalding/serialization/SerializationProperties.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala index 552f12875a..d2811aa741 100644 --- a/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala +++ b/scalding-serialization/src/test/scala/com/twitter/scalding/serialization/SerializationProperties.scala @@ -63,13 +63,15 @@ object SerializationProperties extends Properties("SerializationProperties") { class IntTryWrapperClass(val x: Int) - implicit val myIntWrapperOrdSer: OrderedSerialization[IntTryWrapperClass] = + implicit val myTryIntWrapperOrdSer: OrderedSerialization[IntTryWrapperClass] = OrderedSerialization.viaTryTransform[IntTryWrapperClass, Int](_.x, { x: Int => Success(new IntTryWrapperClass(x)) }) - implicit val arbIntWrapperClass: Arbitrary[IntWrapperClass] = Arbitrary(implicitly[Arbitrary[Int]].arbitrary.map(new IntWrapperClass(_))) + implicit val arbIntTryWrapperClass: Arbitrary[IntTryWrapperClass] = + Arbitrary(implicitly[Arbitrary[Int]].arbitrary.map(new IntTryWrapperClass(_))) + implicit def tuple[A: OrderedSerialization, B: OrderedSerialization]: OrderedSerialization[(A, B)] = new OrderedSerialization2[A, B](implicitly, implicitly) From 8e2e82dd8fbace20aaa1883636a25657b5f2fcd9 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 18 Jun 2015 10:39:16 -0700 Subject: [PATCH 5/7] Need more coffee --- .../com/twitter/scalding/BijectedOrderedSerialization.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala b/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala index 5a982f30cc..59f86f3c10 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala @@ -22,7 +22,7 @@ object BijectedOrderedSerialization { implicit def fromBijection[T, U](implicit bij: ImplicitBijection[T, U], ordSer: OrderedSerialization[U]) = OrderedSerialization.viaTransform[T, U](bij.apply(_), bij.invert(_)) - implicit def fromInjection[U, U](implicit bij: Injection[T, U], ordSer: OrderedSerialization[U]) = + implicit def fromInjection[T, U](implicit bij: Injection[T, U], ordSer: OrderedSerialization[U]) = OrderedSerialization.viaTryTransform[T, U](bij.apply(_), bij.invert(_)) } From aa2b97c4d0e7c9d4c39fc8a6cf5dbc91797b4b93 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 18 Jun 2015 12:36:06 -0700 Subject: [PATCH 6/7] Try cache the transform --- .../serialization/OrderedSerialization.scala | 79 +++++++++---------- 1 file changed, 39 insertions(+), 40 deletions(-) diff --git a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala index f9293733af..f9fe9c2e20 100644 --- a/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala +++ b/scalding-serialization/src/main/scala/com/twitter/scalding/serialization/OrderedSerialization.scala @@ -101,53 +101,52 @@ object OrderedSerialization { case NonFatal(e) => CompareFailure(e) } + private[this] def internalTransformer[T, U, V](packFn: T => U, + unpackFn: U => V, + presentFn: Try[V] => Try[T])(implicit otherOrdSer: OrderedSerialization[U]): OrderedSerialization[T] = + { + new OrderedSerialization[T] { + private[this] var cache: (T, U) = null + private[this] def packCache(t: T): U = { + val readCache = cache + if (null == readCache || readCache._1 != t) { + val u = packFn(t) + cache = (t, u) + u + } else { + readCache._2 + } + } + + override def hash(t: T) = otherOrdSer.hash(packCache(t)) + + override def compareBinary(a: java.io.InputStream, b: java.io.InputStream): OrderedSerialization.Result = + otherOrdSer.compareBinary(a, b) + + override def compare(x: T, y: T) = + otherOrdSer.compare(packFn(x), packFn(y)) + + override def read(in: InputStream): Try[T] = + presentFn(otherOrdSer.read(in).map(unpackFn)) + + override def write(out: OutputStream, t: T): Try[Unit] = + otherOrdSer.write(out, packCache(t)) + + override def staticSize: Option[Int] = otherOrdSer.staticSize + + override def dynamicSize(t: T): Option[Int] = otherOrdSer.dynamicSize(packCache(t)) + } + } + def viaTransform[T, U]( packFn: T => U, unpackFn: U => T)(implicit otherOrdSer: OrderedSerialization[U]): OrderedSerialization[T] = - new OrderedSerialization[T] { - - override def hash(t: T) = otherOrdSer.hash(packFn(t)) - - override def compareBinary(a: java.io.InputStream, b: java.io.InputStream): OrderedSerialization.Result = - otherOrdSer.compareBinary(a, b) - - override def compare(x: T, y: T) = - otherOrdSer.compare(packFn(x), packFn(y)) - - override def read(in: InputStream): Try[T] = - otherOrdSer.read(in).map(unpackFn) - - override def write(out: OutputStream, t: T): Try[Unit] = - otherOrdSer.write(out, packFn(t)) - - override def staticSize: Option[Int] = otherOrdSer.staticSize - - override def dynamicSize(t: T): Option[Int] = otherOrdSer.dynamicSize(packFn(t)) - } + internalTransformer[T, U, T](packFn, unpackFn, identity) def viaTryTransform[T, U]( packFn: T => U, unpackFn: U => Try[T])(implicit otherOrdSer: OrderedSerialization[U]): OrderedSerialization[T] = - new OrderedSerialization[T] { - - override def hash(t: T) = otherOrdSer.hash(packFn(t)) - - override def compareBinary(a: java.io.InputStream, b: java.io.InputStream): OrderedSerialization.Result = - otherOrdSer.compareBinary(a, b) - - override def compare(x: T, y: T) = - otherOrdSer.compare(packFn(x), packFn(y)) - - override def read(in: InputStream): Try[T] = - otherOrdSer.read(in).flatMap(unpackFn) - - override def write(out: OutputStream, t: T): Try[Unit] = - otherOrdSer.write(out, packFn(t)) - - override def staticSize: Option[Int] = otherOrdSer.staticSize - - override def dynamicSize(t: T): Option[Int] = otherOrdSer.dynamicSize(packFn(t)) - } + internalTransformer[T, U, Try[T]](packFn, unpackFn, _.flatMap(identity)) /** * The the serialized comparison matches the unserialized comparison From a56d408505560b705a65efe9448a464b10aac6b3 Mon Sep 17 00:00:00 2001 From: Ian O Connell Date: Thu, 18 Jun 2015 13:19:59 -0700 Subject: [PATCH 7/7] Import injection --- .../com/twitter/scalding/BijectedOrderedSerialization.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala b/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala index 59f86f3c10..f70abce635 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/BijectedOrderedSerialization.scala @@ -16,7 +16,7 @@ limitations under the License. package com.twitter.scalding import com.twitter.scalding.serialization.OrderedSerialization -import com.twitter.bijection.ImplicitBijection +import com.twitter.bijection.{ImplicitBijection, Injection} object BijectedOrderedSerialization { implicit def fromBijection[T, U](implicit bij: ImplicitBijection[T, U], ordSer: OrderedSerialization[U]) =