Add a OrderedSerialization.viaTransform with no dependencies, and a B… #1329

Merged
merged 7 commits into from
Jun 18, 2015
Changes from 6 commits
@@ -0,0 +1,28 @@
/*
Copyright 2014 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding

import com.twitter.scalding.serialization.OrderedSerialization
import com.twitter.bijection.ImplicitBijection

object BijectedOrderedSerialization {

Looks good!

Thinking out loud about usage in a Scalding job: the user will, at the very least, need to know that she has a type with no OrdSer, that there is a Bijection from that type to another type for which an OrdSer does exist, and that she has to import that bijection. We are moving slightly away from automagically making this work by importing 'RequiredBinaryComparator', but I'm not sure there is a better way. We should probably improve the error message shown when we encounter a type with no OrdSer, and nudge the user toward this approach.

Collaborator Author

It's only ever going to be best effort in the default case, I think; if you have a complex JVM type, this isn't a good solution either. There is a reasonable argument that new jobs should be forced to rewrite in compatible types for the performance improvements, but it's hard to say in an error message that the user should write an injection. They should probably write an ordered serialization, I suppose. It's a reasonably advanced thing to have to deal with.

implicit def fromBijection[T, U](implicit bij: ImplicitBijection[T, U], ordSer: OrderedSerialization[U]) =
OrderedSerialization.viaTransform[T, U](bij.apply(_), bij.invert(_))

implicit def fromInjection[T, U](implicit bij: Injection[T, U], ordSer: OrderedSerialization[U]) =
Collaborator

Looks like you did not import Injection. Maybe even more coffee? :)

Collaborator Author

In my meager defense, I got sidetracked and didn't drink the extra coffee yet ;)

OrderedSerialization.viaTryTransform[T, U](bij.apply(_), bij.invert(_))
}
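
A minimal, dependency-free sketch of the derivation pattern that `fromBijection` above relies on, and of why the user has to import the conversion before it resolves. `MiniOrdSer`, `MiniBijection`, `UserId`, `UserIdConversions`, and `Demo` are hypothetical stand-ins invented for illustration; they are not the scalding or bijection APIs, and the byte layout is an assumption.

```scala
import java.nio.ByteBuffer
import scala.util.Try

// Hypothetical stand-in for OrderedSerialization (NOT the scalding trait).
trait MiniOrdSer[T] {
  def write(t: T): Array[Byte]
  def read(bytes: Array[Byte]): Try[T]
  def compare(a: T, b: T): Int
}

// Hypothetical stand-in for a Bijection: a total, invertible conversion.
final case class MiniBijection[T, U](to: T => U, from: U => T)

object MiniOrdSer {
  // Base instance: an Int is written as 4 big-endian bytes.
  implicit val intOrdSer: MiniOrdSer[Int] = new MiniOrdSer[Int] {
    def write(i: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(i).array()
    def read(bytes: Array[Byte]): Try[Int] = Try(ByteBuffer.wrap(bytes).getInt())
    def compare(a: Int, b: Int): Int = java.lang.Integer.compare(a, b)
  }

  // Derived instance: resolves only when a MiniBijection[T, U] and a
  // MiniOrdSer[U] are both in implicit scope.
  implicit def fromBijection[T, U](implicit bij: MiniBijection[T, U], ordSer: MiniOrdSer[U]): MiniOrdSer[T] =
    new MiniOrdSer[T] {
      def write(t: T): Array[Byte] = ordSer.write(bij.to(t))
      def read(bytes: Array[Byte]): Try[T] = ordSer.read(bytes).map(bij.from)
      def compare(a: T, b: T): Int = ordSer.compare(bij.to(a), bij.to(b))
    }
}

// A user type with no instance of its own.
final case class UserId(value: Int)

// The conversion lives outside UserId's companion, so it must be imported.
object UserIdConversions {
  implicit val userIdToInt: MiniBijection[UserId, Int] =
    MiniBijection[UserId, Int](_.value, UserId(_))
}

object Demo {
  import UserIdConversions._ // without this import, the next line does not compile

  val userIdOrdSer: MiniOrdSer[UserId] = implicitly[MiniOrdSer[UserId]]
}
```

In this sketch the only thing the user writes is the conversion and the import; the instance for `UserId` is assembled by the compiler from the `Int` instance.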

@@ -101,6 +101,53 @@ object OrderedSerialization {
case NonFatal(e) => CompareFailure(e)
}

private[this] def internalTransformer[T, U, V](packFn: T => U,
unpackFn: U => V,
presentFn: Try[V] => Try[T])(implicit otherOrdSer: OrderedSerialization[U]): OrderedSerialization[T] =
{
new OrderedSerialization[T] {
private[this] var cache: (T, U) = null
private[this] def packCache(t: T): U = {
val readCache = cache
if (null == readCache || readCache._1 != t) {
val u = packFn(t)
cache = (t, u)
u
} else {
readCache._2
}
}

override def hash(t: T) = otherOrdSer.hash(packCache(t))

override def compareBinary(a: java.io.InputStream, b: java.io.InputStream): OrderedSerialization.Result =
otherOrdSer.compareBinary(a, b)

override def compare(x: T, y: T) =
otherOrdSer.compare(packFn(x), packFn(y))

override def read(in: InputStream): Try[T] =
presentFn(otherOrdSer.read(in).map(unpackFn))

override def write(out: OutputStream, t: T): Try[Unit] =
otherOrdSer.write(out, packCache(t))

override def staticSize: Option[Int] = otherOrdSer.staticSize

override def dynamicSize(t: T): Option[Int] = otherOrdSer.dynamicSize(packCache(t))
}
}

def viaTransform[T, U](
Collaborator

What about a variant that takes U => Try[T] for the deserialization side? For instance, when you serialize to strings (JSON, say), deserialization can clearly fail if we are reading arbitrary strings.

Collaborator Author

Sure, can do

Collaborator Author

I think it probably makes sense to just make this the base/normal variant. We only hit the Try when we are deserializing, where we have a Try anyway. I'm not sure another allocation on this path adds much overhead?

Collaborator Author

Actually, avoiding allocations and nerding out in these paths makes sense; I'll just add it and the Injection one in scalding core.

packFn: T => U,
unpackFn: U => T)(implicit otherOrdSer: OrderedSerialization[U]): OrderedSerialization[T] =
internalTransformer[T, U, T](packFn, unpackFn, identity)

def viaTryTransform[T, U](
packFn: T => U,
unpackFn: U => Try[T])(implicit otherOrdSer: OrderedSerialization[U]): OrderedSerialization[T] =
internalTransformer[T, U, Try[T]](packFn, unpackFn, _.flatMap(identity))
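
The two variants above differ only in the `presentFn` they hand to `internalTransformer`. A small standalone sketch of that read path, assuming nothing from scalding: `TryTransformSketch`, `readVia`, `Age`, and `parseAge` are hypothetical names used for illustration only.

```scala
import scala.util.Try

object TryTransformSketch {
  // Same shape as the read path of internalTransformer: map the decoded U
  // through unpackFn, then let presentFn turn Try[V] into Try[T].
  def readVia[T, U, V](decoded: Try[U], unpackFn: U => V, presentFn: Try[V] => Try[T]): Try[T] =
    presentFn(decoded.map(unpackFn))

  // Hypothetical user type, for illustration only.
  final case class Age(years: Int)

  // Total unpack (the viaTransform case): V is T, so presentFn is identity.
  def readTotal(decoded: Try[Int]): Try[Age] =
    readVia[Age, Int, Age](decoded, Age(_), identity)

  // Fallible unpack (the viaTryTransform case): V is Try[T], so presentFn
  // flattens Try[Try[T]] into Try[T].
  def parseAge(s: String): Try[Age] = Try(Age(s.trim.toInt))

  def readFallible(decoded: Try[String]): Try[Age] =
    readVia[Age, String, Try[Age]](decoded, parseAge, _.flatMap(identity))
}
```

In this sketch a decode failure in the underlying serialization and a failure inside `unpackFn` both surface as a single `Failure`, so callers of the fallible variant see the same `Try[T]` result type as callers of the total one.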

/**
* The serialized comparison matches the unserialized comparison
*/
@@ -24,7 +24,7 @@ import org.scalacheck.Prop._

import JavaStreamEnrichments._
import java.io._
import scala.util.Try
import scala.util.{ Try, Success }

object LawTester {
def apply[T: Arbitrary](base: String, laws: Iterable[Law[T]]): Properties =
@@ -56,6 +56,22 @@ object SerializationProperties extends Properties("SerializationProperties") {

implicit val stringOrdSer: OrderedSerialization[String] = new StringOrderedSerialization

class IntWrapperClass(val x: Int)

implicit val myIntWrapperOrdSer: OrderedSerialization[IntWrapperClass] =
OrderedSerialization.viaTransform[IntWrapperClass, Int](_.x, new IntWrapperClass(_))

class IntTryWrapperClass(val x: Int)

implicit val myTryIntWrapperOrdSer: OrderedSerialization[IntTryWrapperClass] =
OrderedSerialization.viaTryTransform[IntTryWrapperClass, Int](_.x, { x: Int => Success(new IntTryWrapperClass(x)) })

implicit val arbIntWrapperClass: Arbitrary[IntWrapperClass] =
Arbitrary(implicitly[Arbitrary[Int]].arbitrary.map(new IntWrapperClass(_)))

implicit val arbIntTryWrapperClass: Arbitrary[IntTryWrapperClass] =
Arbitrary(implicitly[Arbitrary[Int]].arbitrary.map(new IntTryWrapperClass(_)))

implicit def tuple[A: OrderedSerialization, B: OrderedSerialization]: OrderedSerialization[(A, B)] =
new OrderedSerialization2[A, B](implicitly, implicitly)

@@ -119,11 +135,18 @@ object SerializationProperties extends Properties("SerializationProperties") {
property("sequences compare well [(String, String)]") = serializeSequenceCompare[(String, String)]
property("sequences equiv well [(String, String)]") = serializeSequenceEquiv[(String, String)]

property("sequences compare well [IntWrapperClass]") = serializeSequenceCompare[IntWrapperClass]
property("sequences compare well [IntTryWrapperClass]") = serializeSequenceCompare[IntTryWrapperClass]
property("sequences equiv well [IntWrapperClass]") = serializeSequenceEquiv[IntWrapperClass]
property("sequences equiv well [IntTryWrapperClass]") = serializeSequenceEquiv[IntTryWrapperClass]

// Test the independent, non-sequenced, laws as well
include(LawTester("Int Ordered", OrderedSerialization.allLaws[Int]))
include(LawTester("(Int, Int) Ordered", OrderedSerialization.allLaws[(Int, Int)]))
include(LawTester("String Ordered", OrderedSerialization.allLaws[String]))
include(LawTester("(String, Int) Ordered", OrderedSerialization.allLaws[(String, Int)]))
include(LawTester("(Int, String) Ordered", OrderedSerialization.allLaws[(Int, String)]))
include(LawTester("(String, String) Ordered", OrderedSerialization.allLaws[(String, String)]))
include(LawTester("IntWrapperClass Ordered", OrderedSerialization.allLaws[IntWrapperClass]))
include(LawTester("IntTryWrapperClass Ordered", OrderedSerialization.allLaws[IntTryWrapperClass]))
}