Skip to content

Commit

Permalink
[ADAM-1402] Fix INDEL realigner bad binary search.
Browse files Browse the repository at this point in the history
Resolves #1402. Includes fixes to consensus generator and reference scorer.

Improve INDEL realigner performance:

* Exit early when realigning will not yield a better score.
* Eliminate substring call in sweep over reference.
* Change datastructures to be immutable wherever possible.
* Add bound checking and other error checking.
* Rewrite target association code to use array instead of set, and improve load balancing.
* Delete high coverage targets with reduceByKey.

Additionally:
* Improve telemetry/logging to sort out load balance issue.
* Support using reference file in INDEL realignment.
* Log reads with negative alignment sizes.
* Improved test coverage for insertion realignment.
* Fix CIGARs on reads that partially overlap INDEL.
* Soft clip reads that partially align to an insertion.
* Eliminate non-determinism.
* Fixed reference file.
* Serialization fixes and debug.
* Fix bad score.
* Clean up clipping code?
* Unclip clipped reads.
  • Loading branch information
fnothaft authored and heuermh committed Mar 31, 2017
1 parent 65d8c9e commit d12fdfb
Show file tree
Hide file tree
Showing 19 changed files with 1,653 additions and 377 deletions.
28 changes: 22 additions & 6 deletions adam-cli/src/main/scala/org/bdgenomics/adam/cli/Transform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,14 @@ class TransformArgs extends Args4jBase with ADAMSaveAnyArgs with ParquetArgs {
var maxConsensusNumber = 30
@Args4jOption(required = false, name = "-log_odds_threshold", usage = "The log-odds threshold for accepting a realignment. Default value is 5.0.")
var lodThreshold = 5.0
@Args4jOption(required = false, name = "-unclip_reads", usage = "If true, unclips reads during realignment.")
var unclipReads = false
@Args4jOption(required = false, name = "-max_target_size", usage = "The maximum length of a target region to attempt realigning. Default length is 3000.")
var maxTargetSize = 3000
@Args4jOption(required = false, name = "-max_reads_per_target", usage = "The maximum number of reads attached to a target considered for realignment. Default is 20000.")
var maxReadsPerTarget = 20000
@Args4jOption(required = false, name = "-reference", usage = "Path to a reference file to use for indel realignment.")
var reference: String = null
@Args4jOption(required = false, name = "-repartition", usage = "Set the number of partitions to map data to")
var repartition: Int = -1
@Args4jOption(required = false, name = "-coalesce", usage = "Set the number of partitions written to the ADAM output directory")
Expand Down Expand Up @@ -161,7 +167,8 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
* Else, we generate candidate INDELs from the reads and then realign. If
* -realign_indels is not set, we return the input RDD.
*/
private def maybeRealign(rdd: AlignmentRecordRDD,
private def maybeRealign(sc: SparkContext,
rdd: AlignmentRecordRDD,
sl: StorageLevel): AlignmentRecordRDD = {
if (args.locallyRealign) {

Expand All @@ -178,14 +185,23 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
ConsensusGenerator.fromKnownIndels(rdd.rdd.context.loadVariants(file))
})

// optionally load a reference
val optReferenceFile = Option(args.reference).map(f => {
sc.loadReferenceFile(f,
fragmentLength = args.mdTagsFragmentSize)
})

// run realignment
val realignmentRdd = rdd.realignIndels(
consensusGenerator,
isSorted = false,
args.maxIndelSize,
args.maxConsensusNumber,
args.lodThreshold,
args.maxTargetSize
maxIndelSize = args.maxIndelSize,
maxConsensusNumber = args.maxConsensusNumber,
lodThreshold = args.lodThreshold,
maxTargetSize = args.maxTargetSize,
maxReadsPerTarget = args.maxReadsPerTarget,
optReferenceFile = optReferenceFile,
unclipReads = args.unclipReads
)

// unpersist our input, if persisting was requested
Expand Down Expand Up @@ -340,7 +356,7 @@ class Transform(protected val args: TransformArgs) extends BDGSparkCommand[Trans
val maybeDedupedRdd = maybeDedupe(initialRdd)

// once we've deduped our reads, maybe realign them
val maybeRealignedRdd = maybeRealign(maybeDedupedRdd, sl)
val maybeRealignedRdd = maybeRealign(sc, maybeDedupedRdd, sl)

// run BQSR
val maybeRecalibratedRdd = maybeRecalibrate(maybeRealignedRdd, sl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ private[adam] object Consensus extends Serializable {
case CigarOperator.I => Some(new Consensus(sequence.substring(readPos,
readPos + cigarElement.getLength),
ReferenceRegion(start.referenceName,
referencePos,
referencePos + 1)))
referencePos - 1,
referencePos)))
case CigarOperator.D => Some(new Consensus("",
ReferenceRegion(start.referenceName,
referencePos,
Expand Down Expand Up @@ -111,9 +111,14 @@ private[adam] case class Consensus(consensus: String, index: ReferenceRegion) {
"Consensus not contained in reference region: %s vs. %s.".format(
index, rr))

"%s%s%s".format(reference.substring(0, (index.start - rr.start).toInt),
consensus,
reference.substring((index.end - 1 - rr.start).toInt))
if (consensus.isEmpty) {
"%s%s".format(reference.substring(0, (index.start - rr.start).toInt),
reference.substring((index.end - rr.start - 1).toInt))
} else {
"%s%s%s".format(reference.substring(0, (index.start - rr.start + 1).toInt),
consensus,
reference.substring((index.end - rr.start).toInt))
}
}

override def toString: String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ object Timers extends Metrics {
val MapTargets = timer("Map Targets")
val RealignTargetGroup = timer("Realign Target Group")
val GetReferenceFromReads = timer("Get Reference From Reads")
val GetReferenceFromFile = timer("Get Reference From File")
val ComputingOriginalScores = timer("Computing Original Mismatch Scores")
val SweepReadsOverConsensus = timer("Sweeping Reads Over A Single Consensus")
val SweepReadOverReferenceForQuality = timer("Sweep Read Over Reference For Quality")
val FinalizingRealignments = timer("Finalizing Realignments")

// Sort Reads
val SortReads = timer("Sort Reads")
Expand Down
45 changes: 31 additions & 14 deletions adam-core/src/main/scala/org/bdgenomics/adam/models/MdTag.scala
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ object MdTag {

// dirty dancing to recalculate match sets
for (i <- 0 until cigarElement.getLength) {
if (reference(referencePos) == sequence(readPos)) {
if (reference(referencePos) ==
sequence(readPos)) {
if (!inMatch) {
rangeStart = referencePos.toLong
inMatch = true
Expand Down Expand Up @@ -439,10 +440,15 @@ case class MdTag(
* Given a read, returns the reference.
*
* @param read A read for which one desires the reference sequence.
* @param withGaps If true, applies INDEL gaps to the reference. Else, returns
* the raw reference sequence.
* @return A string corresponding to the reference overlapping this read.
*/
def getReference(read: RichAlignmentRecord): String = {
getReference(read.getSequence, read.samtoolsCigar, read.getStart)
def getReference(read: RichAlignmentRecord, withGaps: Boolean = false): String = {
getReferenceSequence(read.getSequence,
read.samtoolsCigar,
read.getStart,
withGaps = withGaps)
}

/**
Expand All @@ -451,9 +457,14 @@ case class MdTag(
* @param readSequence The base sequence of the read.
* @param cigar The cigar for the read.
* @param referenceFrom The starting point of this read alignment vs. the reference.
* @param withGaps If true, applies INDEL gaps to the reference. Else, returns
* the raw reference sequence.
* @return A string corresponding to the reference overlapping this read.
*/
def getReference(readSequence: String, cigar: Cigar, referenceFrom: Long): String = {
private def getReferenceSequence(readSequence: String,
cigar: Cigar,
referenceFrom: Long,
withGaps: Boolean = false): String = {

var referencePos = start
var readPos = 0
Expand All @@ -476,22 +487,28 @@ case class MdTag(
}
}
case CigarOperator.D => {
// if a delete, get from the delete pool
for (i <- 0 until cigarElement.getLength) {
reference += {
deletions.get(referencePos) match {
case Some(base) => base
case _ => throw new IllegalStateException("Could not find deleted base at cigar offset " + i)
if (!withGaps) {
// if a delete, get from the delete pool
for (i <- 0 until cigarElement.getLength) {
reference += {
deletions.get(referencePos) match {
case Some(base) => base
case _ => throw new IllegalStateException("Could not find deleted base at cigar offset " + i)
}
}
referencePos += 1
}

referencePos += 1
} else {
referencePos += cigarElement.getLength
}
}
case _ => {
// ignore inserts
if (cigarElement.getOperator.consumesReadBases) {
readPos += cigarElement.getLength
val insLength = cigarElement.getLength
if (withGaps) {
reference += ("_" * insLength)
}
readPos += insLength
}
if (cigarElement.getOperator.consumesReferenceBases) {
throw new IllegalArgumentException("Cannot handle operator: " + cigarElement.getOperator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,10 @@ case class AlignmentRecordRDD(
* are only finalized if the log-odds threshold is exceeded.
* @param maxTargetSize The maximum width of a single target region for
* realignment.
* @param optReferenceFile An optional reference. If not provided, reference
* will be inferred from MD tags.
* @param unclipReads If true, unclips reads prior to realignment. Else, omits
* clipped bases during realignment.
* @return Returns an RDD of mapped reads which have been realigned.
*/
def realignIndels(
Expand All @@ -671,8 +675,20 @@ case class AlignmentRecordRDD(
maxIndelSize: Int = 500,
maxConsensusNumber: Int = 30,
lodThreshold: Double = 5.0,
maxTargetSize: Int = 3000): AlignmentRecordRDD = RealignIndelsInDriver.time {
replaceRdd(RealignIndels(rdd, consensusModel, isSorted, maxIndelSize, maxConsensusNumber, lodThreshold))
maxTargetSize: Int = 3000,
maxReadsPerTarget: Int = 20000,
optReferenceFile: Option[ReferenceFile] = None,
unclipReads: Boolean = false): AlignmentRecordRDD = RealignIndelsInDriver.time {
replaceRdd(RealignIndels(rdd,
consensusModel = consensusModel,
dataIsSorted = isSorted,
maxIndelSize = maxIndelSize,
maxConsensusNumber = maxConsensusNumber,
lodThreshold = lodThreshold,
maxTargetSize = maxTargetSize,
maxReadsPerTarget = maxReadsPerTarget,
optReferenceFile = optReferenceFile,
unclipReads = unclipReads))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,6 @@ import org.bdgenomics.adam.instrumentation.Timers._
import scala.collection.JavaConversions._
import scala.collection.immutable.TreeSet

private[realignment] object ZippedTargetOrdering extends Ordering[(IndelRealignmentTarget, Int)] {

/**
* Order two indel realignment targets by earlier starting position.
*
* @param a Indel realignment target to compare.
* @param b Indel realignment target to compare.
* @return Comparison done by starting position.
*/
def compare(a: (IndelRealignmentTarget, Int), b: (IndelRealignmentTarget, Int)): Int = {
TargetOrdering.compare(a._1, b._1)
}
}

private[realignment] object TargetOrdering extends Ordering[IndelRealignmentTarget] {

/**
Expand All @@ -65,7 +51,7 @@ private[realignment] object TargetOrdering extends Ordering[IndelRealignmentTarg
}

/**
* Compares a read to an indel realignment target to see if it starts before the start of the indel realignment target.
* Compares a read to an indel realignment target to see if the target is before the read.
*
* @param target Realignment target to compare.
* @param read Read to compare.
Expand Down Expand Up @@ -217,27 +203,22 @@ private[adam] class TargetSetSerializer extends Serializer[TargetSet] {
}
}

private[adam] class ZippedTargetSetSerializer extends Serializer[ZippedTargetSet] {
private[adam] class IndelRealignmentTargetArraySerializer extends Serializer[Array[IndelRealignmentTarget]] {

val irts = new IndelRealignmentTargetSerializer()
private val irts = new IndelRealignmentTargetSerializer

def write(kryo: Kryo, output: Output, obj: ZippedTargetSet) = {
output.writeInt(obj.set.size)
obj.set.foreach(innerObj => {
irts.write(kryo, output, innerObj._1)
output.writeInt(innerObj._2)
})
def write(kryo: Kryo, output: Output, obj: Array[IndelRealignmentTarget]) = {
output.writeInt(obj.length)
obj.foreach(irts.write(kryo, output, _))
}

def read(kryo: Kryo, input: Input, klazz: Class[ZippedTargetSet]): ZippedTargetSet = {
val size = input.readInt()
val array = new Array[(IndelRealignmentTarget, Int)](size)
(0 until size).foreach(i => {
val target = irts.read(kryo, input, classOf[IndelRealignmentTarget])
val idx = input.readInt()
array(i) = (target, idx)
def read(kryo: Kryo, input: Input, klazz: Class[Array[IndelRealignmentTarget]]): Array[IndelRealignmentTarget] = {
val arrSize = input.readInt()
val arr = new Array[IndelRealignmentTarget](arrSize)
(0 until arrSize).foreach(idx => {
arr(idx) = irts.read(kryo, input, classOf[IndelRealignmentTarget])
})
new ZippedTargetSet(TreeSet(array: _*)(ZippedTargetOrdering))
arr
}
}

Expand All @@ -247,9 +228,6 @@ private[realignment] object TargetSet {
}
}

// These two case classes are needed to get around some serialization issues
// this case class is needed to get around some serialization issues (type erasure)
private[adam] case class TargetSet(set: TreeSet[IndelRealignmentTarget]) extends Serializable {
}

private[adam] case class ZippedTargetSet(set: TreeSet[(IndelRealignmentTarget, Int)]) extends Serializable {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to Big Data Genomics (BDG) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The BDG licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.bdgenomics.adam.rdd.read.realignment

import org.apache.spark.Partitioner

private[realignment] case class ModPartitioner(numPartitions: Int) extends Partitioner {

def getPartition(key: Any): Int = key match {
case i: Int => {
i.abs % numPartitions
}
case _ => {
throw new IllegalArgumentException("Key %s is not an Int.".format(key))
}
}
}
Loading

0 comments on commit d12fdfb

Please sign in to comment.