[ADAM-1533] Set Theory #1561
Test FAILed. Build result: ABORTED
[...truncated 15 lines...]
 > /home/jenkins/git2/bin/git fetch --tags --progress https://github.com/bigdatagenomics/adam.git +refs/pull/:refs/remotes/origin/pr/ # timeout=15
 > /home/jenkins/git2/bin/git rev-parse origin/pr/1561/merge^{commit} # timeout=10
 > /home/jenkins/git2/bin/git branch -a -v --no-abbrev --contains 1daa69788cb362d5664b22a495ca180ca1b99871 # timeout=10
Checking out Revision 1daa69788cb362d5664b22a495ca180ca1b99871 (origin/pr/1561/merge)
 > /home/jenkins/git2/bin/git config core.sparsecheckout # timeout=10
 > /home/jenkins/git2/bin/git checkout -f 1daa69788cb362d5664b22a495ca180ca1b99871
First time build. Skipping changelog.
Triggering ADAM-prb ? 2.3.0,2.10,1.6.1,centos
Triggering ADAM-prb ? 2.3.0,2.11,2.0.0,centos
Triggering ADAM-prb ? 2.6.0,2.10,2.0.0,centos
Triggering ADAM-prb ? 2.6.0,2.11,1.6.1,centos
Triggering ADAM-prb ? 2.6.0,2.10,1.6.1,centos
Triggering ADAM-prb ? 2.3.0,2.11,1.6.1,centos
Triggering ADAM-prb ? 2.6.0,2.11,2.0.0,centos
Triggering ADAM-prb ? 2.3.0,2.10,2.0.0,centos
ADAM-prb ? 2.3.0,2.10,1.6.1,centos completed with result ABORTED
ADAM-prb ? 2.3.0,2.11,2.0.0,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.10,2.0.0,centos completed with result FAILURE
ADAM-prb ? 2.6.0,2.11,1.6.1,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.10,1.6.1,centos completed with result ABORTED
ADAM-prb ? 2.3.0,2.11,1.6.1,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.11,2.0.0,centos completed with result SUCCESS
ADAM-prb ? 2.3.0,2.10,2.0.0,centos completed with result FAILURE
Notifying endpoint 'HTTP:https://webhooks.gitter.im/e/ac8bb6e9f53357bc8aa8'
Test FAILed.
Test FAILed. Build result: FAILURE
[...truncated 15 lines...]
 > /home/jenkins/git2/bin/git fetch --tags --progress https://github.com/bigdatagenomics/adam.git +refs/pull/:refs/remotes/origin/pr/ # timeout=15
 > /home/jenkins/git2/bin/git rev-parse origin/pr/1561/merge^{commit} # timeout=10
 > /home/jenkins/git2/bin/git branch -a -v --no-abbrev --contains c9ef99a590e6f63b0763b6a25b9df9f8dbf90d0e # timeout=10
Checking out Revision c9ef99a590e6f63b0763b6a25b9df9f8dbf90d0e (origin/pr/1561/merge)
 > /home/jenkins/git2/bin/git config core.sparsecheckout # timeout=10
 > /home/jenkins/git2/bin/git checkout -f c9ef99a590e6f63b0763b6a25b9df9f8dbf90d0e
First time build. Skipping changelog.
Triggering ADAM-prb ? 2.3.0,2.10,1.6.1,centos
Triggering ADAM-prb ? 2.3.0,2.11,2.0.0,centos
Triggering ADAM-prb ? 2.6.0,2.10,2.0.0,centos
Triggering ADAM-prb ? 2.6.0,2.11,1.6.1,centos
Triggering ADAM-prb ? 2.6.0,2.10,1.6.1,centos
Triggering ADAM-prb ? 2.3.0,2.11,1.6.1,centos
Triggering ADAM-prb ? 2.6.0,2.11,2.0.0,centos
Triggering ADAM-prb ? 2.3.0,2.10,2.0.0,centos
ADAM-prb ? 2.3.0,2.10,1.6.1,centos completed with result FAILURE
ADAM-prb ? 2.3.0,2.11,2.0.0,centos completed with result FAILURE
ADAM-prb ? 2.6.0,2.10,2.0.0,centos completed with result FAILURE
ADAM-prb ? 2.6.0,2.11,1.6.1,centos completed with result FAILURE
ADAM-prb ? 2.6.0,2.10,1.6.1,centos completed with result FAILURE
ADAM-prb ? 2.3.0,2.11,1.6.1,centos completed with result FAILURE
ADAM-prb ? 2.6.0,2.11,2.0.0,centos completed with result FAILURE
ADAM-prb ? 2.3.0,2.10,2.0.0,centos completed with result FAILURE
Notifying endpoint 'HTTP:https://webhooks.gitter.im/e/ac8bb6e9f53357bc8aa8'
Test FAILed.
Test FAILed. Build result: FAILURE
[...truncated 15 lines...]
 > /home/jenkins/git2/bin/git fetch --tags --progress https://github.com/bigdatagenomics/adam.git +refs/pull/:refs/remotes/origin/pr/ # timeout=15
 > /home/jenkins/git2/bin/git rev-parse origin/pr/1561/merge^{commit} # timeout=10
 > /home/jenkins/git2/bin/git branch -a -v --no-abbrev --contains 866341412bcb1847e609c4971aeb0eb21ad60026 # timeout=10
Checking out Revision 866341412bcb1847e609c4971aeb0eb21ad60026 (origin/pr/1561/merge)
 > /home/jenkins/git2/bin/git config core.sparsecheckout # timeout=10
 > /home/jenkins/git2/bin/git checkout -f 866341412bcb1847e609c4971aeb0eb21ad60026
First time build. Skipping changelog.
Triggering ADAM-prb ? 2.3.0,2.10,1.6.1,centos
Triggering ADAM-prb ? 2.3.0,2.11,2.0.0,centos
Triggering ADAM-prb ? 2.6.0,2.10,2.0.0,centos
Triggering ADAM-prb ? 2.6.0,2.11,1.6.1,centos
Triggering ADAM-prb ? 2.6.0,2.10,1.6.1,centos
Triggering ADAM-prb ? 2.3.0,2.11,1.6.1,centos
Triggering ADAM-prb ? 2.6.0,2.11,2.0.0,centos
Triggering ADAM-prb ? 2.3.0,2.10,2.0.0,centos
ADAM-prb ? 2.3.0,2.10,1.6.1,centos completed with result FAILURE
ADAM-prb ? 2.3.0,2.11,2.0.0,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.10,2.0.0,centos completed with result FAILURE
ADAM-prb ? 2.6.0,2.11,1.6.1,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.10,1.6.1,centos completed with result FAILURE
ADAM-prb ? 2.3.0,2.11,1.6.1,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.11,2.0.0,centos completed with result SUCCESS
ADAM-prb ? 2.3.0,2.10,2.0.0,centos completed with result FAILURE
Notifying endpoint 'HTTP:https://webhooks.gitter.im/e/ac8bb6e9f53357bc8aa8'
Test FAILed.
I've left a few detailed comments within. I'm hesitant to move forward with this. My main architectural objection is that I think we're inverting an abstraction. Is there any reason we can't build the set theory primitives on top of the join code, instead of building the set theory operators using the guts of the join code? See my comment on closest for a concrete example.
Additionally, I'd like to avoid spreading the partitionMap data structure outside of the GenomicRDD hierarchy. We have to do a full scan of the data to compute the partitionMap, and I believe that we can implement a lighter weight alternative that is cheaper to compute and that is compatible with legacy formats.
 *
 * @return An empty ReferenceRegion.
 */
private[adam] val empty: ReferenceRegion = ReferenceRegion("", 0L, 0L)
-1 on adding this. What's the use case? Also, this violates our width invariant (end - start > 0) and should throw an illegal argument exception.
I agree. This was in here temporarily for some tests.
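The width invariant mentioned above can be sketched as follows. This is a minimal illustration, not ADAM's actual `ReferenceRegion`: the `Region` class and the `isRejected` helper are hypothetical names introduced here, and only the `end - start > 0` check from the review comment is modeled.

```scala
object WidthInvariantSketch {
  // Hypothetical stand-in for ReferenceRegion (assumption: only the
  // width invariant from the review comment is modeled here).
  final case class Region(referenceName: String, start: Long, end: Long) {
    // require throws IllegalArgumentException when the predicate is false.
    require(end - start > 0L,
      s"Region must have positive width, got [$start, $end).")

    def width: Long = end - start
  }

  // Returns true when constructing the region is rejected by the invariant.
  def isRejected(referenceName: String, start: Long, end: Long): Boolean =
    scala.util.Try(Region(referenceName, start, end)).isFailure
}
```

Under this check, an "empty" value such as `Region("", 0L, 0L)` cannot be constructed at all, which is the objection raised in the comment.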
@@ -66,7 +67,6 @@ private[rdd] object GenomicRDD {
 * Replaces file references in a command.
 *
 * @see pipe
 *
Please revert comment spacing changes throughout this file.
I will revert these on a future push. I want to avoid reverting and re-reverting repeatedly.
 * @tparam RT The resulting type of the left after the join.
 * @tparam RU The resulting type of the right after the join.
 */
sealed abstract class Closest[T: ClassTag, U: ClassTag, RT, RU]
Please add inline docs to this class.
Also, I feel like we're inverting an abstraction here. IMO, the correct way to implement this would be to run a self joinAndGroupByLeft and to then:
val (left, rightIter) = kv
(left, rightIter.minBy(_.distance(left)))
Closest doesn't necessarily overlap, and is performed between two RDDs. In your proposed architecture, in order to be truly exhaustive we'd end up joining each record on the left with all records with the same referenceName on the right.
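To make the trade-off in this exchange concrete, here is a small sketch of "closest" built from a group step followed by `minBy`, using plain Scala collections instead of RDDs. Everything here is an assumption for illustration: `Region`, its `distance` convention (0 when overlapping, gap plus one otherwise, `None` across references), and `closest` are hypothetical and are not ADAM's API. Because closest need not overlap, the candidates for each left region are all right regions sharing its `referenceName`, which is the exhaustive pairing the reply describes.

```scala
object ClosestSketch {
  // Hypothetical simplified region over the half-open interval [start, end).
  final case class Region(referenceName: String, start: Long, end: Long) {
    // Assumed convention for this sketch: 0 when the regions overlap,
    // gap + 1 otherwise, None when the references differ.
    def distance(other: Region): Option[Long] =
      if (referenceName != other.referenceName) None
      else if (start < other.end && other.start < end) Some(0L)
      else if (other.start >= end) Some(other.start - end + 1L)
      else Some(start - other.end + 1L)
  }

  // For each left region, find the nearest right region on the same reference.
  def closest(left: Seq[Region],
              right: Seq[Region]): Seq[(Region, Option[Region])] = {
    // "Join" step: every left record sees all right records with its name.
    val rightByName: Map[String, Seq[Region]] = right.groupBy(_.referenceName)
    left.map { l =>
      val candidates = rightByName.getOrElse(l.referenceName, Seq.empty)
      // "Aggregate" step: keep only the candidate at minimum distance.
      val nearest =
        if (candidates.isEmpty) None
        else Some(candidates.minBy(r => l.distance(r).getOrElse(Long.MaxValue)))
      (l, nearest)
    }
  }
}
```

The cost concern is visible in `rightByName`: each left record scans every right record on its reference, which is cheap here but expensive at genome scale.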
protected val optPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]]

/**
 * The condition that should be met in order for the primitive to be
This would be clearer with an example.
Also, if this package is set theoretic, then we should describe it through set theoretic notation, right? I think this method would be "This method evaluates whether a given pair of regions should be members of the set output by this set theoretic operation." Is that correct?
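As one possible example of such a membership predicate, the sketch below decides whether a pair of regions belongs in the output set. The names `Region`, `condition`, and `threshold` are hypothetical; the actual signature in the PR is only partially visible in the diff, so this is an assumption about its shape rather than a copy of it.

```scala
object ConditionSketch {
  // Hypothetical simplified region over the half-open interval [start, end).
  final case class Region(referenceName: String, start: Long, end: Long)

  // A pair (first, second) is a member of the output set when both regions
  // are on the same reference and the gap between them is smaller than
  // `threshold`. With threshold = 0 this reduces to a plain overlap test.
  def condition(first: Region, second: Region, threshold: Long = 0L): Boolean =
    first.referenceName == second.referenceName &&
      first.start < second.end + threshold &&
      second.start < first.end + threshold
}
```

For instance, `[100, 200)` and `[205, 300)` on the same reference are excluded with the default threshold but included with a threshold of 10.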
"SetTheory" doesn't work for me as a class and package name.
Perhaps for package rdd.sets, and SetOperation for the top level abstract class?
+1 on package and class rename
 * @tparam RT The return type for the left side row data.
 * @tparam RU The return type for the right side row data.
 */
private[settheory] abstract class SetTheoryBetweenCollections[T, U, RT, RU] extends SetTheory {
OOC, is there a reason to prefer an abstract class to a trait here? I know that ShuffleRegionJoin is an abstract class, but I was looking through the history and couldn't figure out what was the impetus for the change. CCing in @ryan-williams who made the change in 9ff0fa5.
I went ahead and put this back to a trait. I believe it makes more sense, especially now that we are using the GenomicRDD.
sealed abstract class ShuffleRegionJoin[T: ClassTag, U: ClassTag, RT, RU]
    extends SetTheoryBetweenCollections[T, U, RT, RU] with SetTheoryPrimitive {

  override protected def condition(firstRegion: ReferenceRegion,
Isn't condition abstract in the superclass? If so, we shouldn't be using the override modifier.
Is that proper style? Is it still useful for inheriting docs, as @Override does in Java?
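For reference, Scala accepts both spellings when a member implements an abstract declaration: the `override` modifier is optional in that case and only mandatory when replacing a concrete member. The sketch below uses hypothetical names (`SetOperation`, `Region`, `keep`) to show the two forms side by side. As far as I know, Scaladoc inherits the parent's comment for an undocumented member in either form, so the modifier is a style choice rather than a documentation requirement.

```scala
object OverrideSketch {
  final case class Region(referenceName: String, start: Long, end: Long)

  trait SetOperation {
    /** Decides whether a pair of regions belongs in the output. */
    protected def condition(first: Region, second: Region): Boolean

    def keep(first: Region, second: Region): Boolean = condition(first, second)
  }

  // Implements the abstract member without the modifier: this compiles.
  object WithoutModifier extends SetOperation {
    protected def condition(first: Region, second: Region): Boolean =
      first.referenceName == second.referenceName
  }

  // Same implementation with the modifier: the compiler additionally checks
  // that a matching member really exists in a parent type.
  object WithModifier extends SetOperation {
    override protected def condition(first: Region, second: Region): Boolean =
      first.referenceName == second.referenceName
  }
}
```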
val sortedLeft = leftRdd.sortByKey()
val partitionMap =
  sortedLeft.mapPartitions(getRegionBoundsFromPartition).collect
(sortedLeft, partitionMap.map(_.get))
If the left RDD is not already sorted, this will lead to an execution DAG that does two full sorts of the left RDD.
// convert to an IntervalArray for fast range query
val partitionMapIntervals = IntervalArray(
  adjustedPartitionMapWithIndex,
  adjustedPartitionMapWithIndex.maxBy(_._1.width)._1.width,
Prefer adjustedPartitionMapWithIndex.map(_._1.width).max.
This was an attempted optimization. Rather than iterating over the entire list twice, we do it once with maxBy. There is certainly little effect because the number of partitions will presumably be relatively small. I'll go ahead and change it.
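The two spellings discussed here compute the same value, as the following sketch shows on a small hypothetical partition map (the `Region` class and the sample data are assumptions for illustration). `maxBy` makes one pass and then reads the width again from the winning element, while `map(...).max` builds an intermediate array of widths and then scans it.

```scala
object MaxWidthSketch {
  final case class Region(referenceName: String, start: Long, end: Long) {
    def width: Long = end - start
  }

  // Hypothetical entries: (partition bounds, partition index).
  val partitionMapWithIndex: Array[(Region, Int)] = Array(
    (Region("chr1", 0L, 100L), 0),
    (Region("chr1", 100L, 600L), 1),
    (Region("chr2", 0L, 250L), 2)
  )

  // Original form: single pass, then re-read the width of the winner.
  val viaMaxBy: Long = partitionMapWithIndex.maxBy(_._1.width)._1.width

  // Suggested form: project to widths first, then take the maximum.
  val viaMapMax: Long = partitionMapWithIndex.map(_._1.width).max
}
```

Both forms throw on an empty collection, so neither removes the need to handle an empty partition map.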
    extends SetTheoryBetweenCollections[T, U, RT, RU]
    with SetTheoryPrimitive {

  var currentClosest: ReferenceRegion = ReferenceRegion.empty
Thread safety...
It hasn't been a problem yet, but I am working on a fix for this in the case that the Spark Serialization rules change. I don't like this here either.
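One way to avoid a shared mutable field like `currentClosest` is to carry the running best inside a fold, so the state is local to each call. This is only a sketch of that idea under simplified assumptions; `Region`, `gap`, and `closestTo` are hypothetical names and this is not the fix the author eventually wrote.

```scala
object NoSharedStateSketch {
  final case class Region(referenceName: String, start: Long, end: Long)

  // Gap between two regions on the same reference; 0 when they overlap.
  private def gap(a: Region, b: Region): Long =
    math.max(0L, math.max(a.start, b.start) - math.min(a.end, b.end))

  // The running "closest so far" lives in the fold accumulator, so two
  // threads calling this method never observe each other's state.
  def closestTo(query: Region, candidates: Iterator[Region]): Option[Region] =
    candidates
      .filter(_.referenceName == query.referenceName)
      .foldLeft(Option.empty[Region]) {
        case (None, next) => Some(next)
        case (Some(best), next) =>
          if (gap(query, next) < gap(query, best)) Some(next) else Some(best)
      }
}
```

Returning `Option[Region]` also removes the need for an "empty" sentinel region as the initial value.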
  implicit tTag: ClassTag[T],
  xTag: ClassTag[X],
  txTag: ClassTag[(T, X)]): GenomicRDD[(T, X), Z] = InnerShuffleJoin.time {

  val (leftRddToJoin, rightRddToJoin) =
    prepareForShuffleRegionJoin(genomicRdd, optPartitions)
  val preparedLeft =
If we're moving the preparation logic into the join primitive, this code should move with it.
Test PASSed.
Test PASSed.
Test PASSed.
Hi @devin-petersohn,
This is related to the comments I was making in our meeting on Monday. I don't like this architectural shift, because all of the set theoretic operations we are implementing should be able to be implemented on top of the region join primitive. I believe that all the primitives should map into a join [ -> aggregate ] -> predicate flow. There are several advantages to this:
- This will allow us to support both join strategies (shuffle and broadcast) in most cases.
- This approach requires substantially less code.
- This approach should further isolate bugs.
- This approach would allow us to minimize the openness of the interfaces we build the join primitives from.
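The join [ -> aggregate ] -> predicate flow described above can be sketched on plain collections as three small composable steps. This is an illustration only: `Region`, `join`, `groupByLeft`, and `coveredAtLeast` are hypothetical names, and the nested-loop `join` merely stands in for whichever region join strategy (shuffle or broadcast) would be used in practice.

```scala
object JoinFlowSketch {
  final case class Region(referenceName: String, start: Long, end: Long) {
    def overlaps(other: Region): Boolean =
      referenceName == other.referenceName &&
        start < other.end && other.start < end
  }

  // Step 1, join: naive nested loop standing in for a region join.
  def join(left: Seq[Region], right: Seq[Region]): Seq[(Region, Region)] =
    for (l <- left; r <- right if l.overlaps(r)) yield (l, r)

  // Step 2, optional aggregate: collect the right-side hits per left region.
  def groupByLeft(joined: Seq[(Region, Region)]): Map[Region, Seq[Region]] =
    joined.groupBy(_._1).map { case (l, pairs) => l -> pairs.map(_._2) }

  // Step 3, predicate: keep left regions overlapped by at least `minHits`
  // right regions. Other set operations would swap in a different predicate.
  def coveredAtLeast(left: Seq[Region],
                     right: Seq[Region],
                     minHits: Int): Set[Region] =
    groupByLeft(join(left, right))
      .collect { case (l, hits) if hits.size >= minHits => l }
      .toSet
}
```

Only `join` knows how pairs are found, so swapping the join strategy would leave the aggregate and predicate steps untouched.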
Additionally, I'm not a big fan of making the partition map visible outside of GenomicRDD. Again, there are several reasons for this:
- I think that if the entrypoint to ShuffleRegionJoin clearly assumes the contract that the two RDDs are copartitioned, then we can require that all callers enforce that contract, whether they are starting with a GenomicRDD or a plain ReferenceRegion-keyed RDD. If we have an entrypoint that does the prepwork and sets up the contract, then the contract is a bit less clear if you are calling with a ReferenceRegion-keyed RDD. I'm word vomiting a bit here, so let me know if this makes sense or not.
- Additionally, this change means that we need to open up various protections on the partition map.
- Part of which, I would like to avoid, because I think that we can refactor the partition map in a later PR to simplify the data structure and make it easier to compute.
Let me know your thoughts.
@@ -154,14 +154,16 @@ trait GenomicRDD[T, U <: GenomicRDD[T, U]] extends Logging {
// The (ReferenceRegion, ReferenceRegion) tuple contains the bounds of the
// partition, such that the lowest start is first and the highest end is
// second.
private[rdd] val optPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]]
I'm a strong -1 on opening up protections on the partition map.
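For readers unfamiliar with the structure under discussion, the sketch below shows one way the per-partition bounds described in the diff comment could be computed: for each partition, the region with the lowest start and the region with the highest end, or `None` for an empty partition. It uses plain sequences in place of RDD partitions and assumes each partition holds a single reference; `Region`, `bounds`, and `partitionMap` are hypothetical names, not the code in this PR.

```scala
object PartitionBoundsSketch {
  final case class Region(referenceName: String, start: Long, end: Long)

  // Bounds of one partition: (lowest start, highest end), None when empty.
  // Assumption: all regions in a partition share one reference name.
  def bounds(partition: Seq[Region]): Option[(Region, Region)] =
    if (partition.isEmpty) None
    else Some((partition.minBy(_.start), partition.maxBy(_.end)))

  // One entry per partition, mirroring the Array[Option[(_, _)]] shape.
  def partitionMap(partitions: Seq[Seq[Region]]): Array[Option[(Region, Region)]] =
    partitions.map(bounds).toArray
}
```

Note that `bounds` has to visit every record in every partition, which is the full-scan cost raised earlier in this review.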
@@ -1307,6 +1309,54 @@ abstract class AvroGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroGenomicRDD[
  }
}

private[rdd] case class PartitionMap(private val optPartitionMap: Option[Array[Option[(ReferenceRegion, ReferenceRegion)]]]) {
As you may have guessed, I am also a strong -1 on making a PartitionMap class, esp. if it is not private to GenomicRDD.
Except for unbounded closest, this is true. Unbounded closest is the one of the reasons the
I agree 100% that the majority of the operations should be performed with a join as the first phase, independent of the type of join. However, this will not work, or be efficient, for
The way that it is currently architected, it does not assume this. The goal for pulling the
I am not sure what you mean by opening up protections on the partition map since I have moved from
The Sorry for the wall of text. Feel free to address anything I've said. |
 * Perform an Inner ShuffleRegionJoin. This is publicly accessible to be
 * compatible with legacy code.
 */
object InnerShuffleRegionJoin {
Test FAILed. Build result: FAILURE
[...truncated 15 lines...]
 > /home/jenkins/git2/bin/git fetch --tags --progress https://github.com/bigdatagenomics/adam.git +refs/pull/:refs/remotes/origin/pr/ # timeout=15
 > /home/jenkins/git2/bin/git rev-parse origin/pr/1561/merge^{commit} # timeout=10
 > /home/jenkins/git2/bin/git branch -a -v --no-abbrev --contains f08e10f43371f4280767a3e0c8b22fc4bc6de9f8 # timeout=10
Checking out Revision f08e10f43371f4280767a3e0c8b22fc4bc6de9f8 (origin/pr/1561/merge)
 > /home/jenkins/git2/bin/git config core.sparsecheckout # timeout=10
 > /home/jenkins/git2/bin/git checkout -f f08e10f43371f4280767a3e0c8b22fc4bc6de9f8
First time build. Skipping changelog.
Triggering ADAM-prb ? 2.6.0,2.11,1.6.1,centos
Triggering ADAM-prb ? 2.3.0,2.11,2.0.0,centos
Triggering ADAM-prb ? 2.6.0,2.11,2.0.0,centos
Triggering ADAM-prb ? 2.6.0,2.10,2.0.0,centos
Triggering ADAM-prb ? 2.3.0,2.11,1.6.1,centos
Triggering ADAM-prb ? 2.6.0,2.10,1.6.1,centos
Triggering ADAM-prb ? 2.3.0,2.10,1.6.1,centos
Triggering ADAM-prb ? 2.3.0,2.10,2.0.0,centos
ADAM-prb ? 2.6.0,2.11,1.6.1,centos completed with result SUCCESS
ADAM-prb ? 2.3.0,2.11,2.0.0,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.11,2.0.0,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.10,2.0.0,centos completed with result FAILURE
ADAM-prb ? 2.3.0,2.11,1.6.1,centos completed with result SUCCESS
ADAM-prb ? 2.6.0,2.10,1.6.1,centos completed with result FAILURE
ADAM-prb ? 2.3.0,2.10,1.6.1,centos completed with result FAILURE
ADAM-prb ? 2.3.0,2.10,2.0.0,centos completed with result FAILURE
Notifying endpoint 'HTTP:https://webhooks.gitter.im/e/ac8bb6e9f53357bc8aa8'
Test FAILed.
Closing as won't merge. This work belongs in a downstream app.
WIP. Looking for quick feedback on architecture changes. The idea is to move the prepare code into the individual set theory primitive classes, which will unbloat GenomicRDD a bit.
Most of the primitives can be reduced to post-processing on the ShuffleRegionJoin implementations now that I have generalized joins to allow distances also.
TODO: