Skip to content

Commit

Permalink
[ADAM-1257] Add program record support for alignment/fragment files.
Browse files Browse the repository at this point in the history
Resolves #1257.
  • Loading branch information
fnothaft committed Jul 17, 2017
1 parent 406d1e3 commit 257695d
Show file tree
Hide file tree
Showing 17 changed files with 357 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.bdgenomics.adam.cli

import htsjdk.samtools.ValidationStringency
import java.time.Instant
import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
Expand All @@ -29,6 +30,7 @@ import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, QualityScoreBin }
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.formats.avro.ProcessingStep
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
Expand All @@ -38,7 +40,9 @@ object TransformAlignments extends BDGCommandCompanion {
val commandDescription = "Convert SAM/BAM to ADAM format and optionally perform read pre-processing transformations"

def apply(cmdLine: Array[String]) = {
new TransformAlignments(Args4j[TransformAlignmentsArgs](cmdLine))
val args = Args4j[TransformAlignmentsArgs](cmdLine)
args.command = cmdLine.mkString(" ")
new TransformAlignments(args)
}
}

Expand Down Expand Up @@ -125,6 +129,10 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
var cache: Boolean = false
@Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching.")
var storageLevel: String = "MEMORY_ONLY"
@Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
var disableProcessingStep = false

var command: String = null
}

class TransformAlignments(protected val args: TransformAlignmentsArgs) extends BDGSparkCommand[TransformAlignmentsArgs] with Logging {
Expand Down Expand Up @@ -434,7 +442,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
)
}

val aRdd: AlignmentRecordRDD =
val loadedRdd: AlignmentRecordRDD =
if (args.forceLoadBam) {
if (args.regionPredicate != null) {
val loci = ReferenceRegion.fromString(args.regionPredicate)
Expand Down Expand Up @@ -484,9 +492,31 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
loadedReads
}
}

val aRdd: AlignmentRecordRDD = if (args.disableProcessingStep) {
loadedRdd
} else {
// add program info
val about = new About()
val version = if (about.isSnapshot()) {
"%s--%s".format(about.version(), about.commit())
} else {
about.version()
}
val epoch = Instant.now.getEpochSecond
val processingStep = ProcessingStep.newBuilder
.setId("ADAM_%d".format(epoch))
.setProgramName("org.bdgenomics.adam.cli.TransformAlignments")
.setVersion(version)
.setCommandLine(args.command)
.build
loadedRdd.addProcessingStep(processingStep)
}

val rdd = aRdd.rdd
val sd = aRdd.sequences
val rgd = aRdd.recordGroups
val pgs = aRdd.processingSteps

// Optionally load a second RDD and concatenate it with the first.
// Paired-FASTQ loading is avoided here because that wouldn't make sense
Expand All @@ -507,12 +537,12 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
})

// if we have a second rdd that we are merging in, process the merger here
val (mergedRdd, mergedSd, mergedRgd) = concatOpt.fold((rdd, sd, rgd))(t => {
(rdd ++ t.rdd, sd ++ t.sequences, rgd ++ t.recordGroups)
val (mergedRdd, mergedSd, mergedRgd, mergedPgs) = concatOpt.fold((rdd, sd, rgd, pgs))(t => {
(rdd ++ t.rdd, sd ++ t.sequences, rgd ++ t.recordGroups, pgs ++ t.processingSteps)
})

// make a new aligned read rdd, that merges the two RDDs together
val newRdd = AlignmentRecordRDD(mergedRdd, mergedSd, mergedRgd)
val newRdd = AlignmentRecordRDD(mergedRdd, mergedSd, mergedRgd, mergedPgs)

// run our transformation
val outputRdd = this.apply(newRdd)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class MergeShardsSuite extends ADAMFunSuite {
val inputPath = copyResource("unordered.sam")
val actualPath = tmpFile("unordered.sam")
val expectedPath = inputPath
TransformAlignments(Array("-single", "-defer_merging", inputPath, actualPath)).run(sc)
TransformAlignments(Array("-single", "-defer_merging", "-disable_pg",
inputPath, actualPath)).run(sc)
MergeShards(Array(actualPath + "_tail", actualPath,
"-header_path", actualPath + "_head")).run(sc)
checkFiles(expectedPath, actualPath)
Expand All @@ -41,6 +42,7 @@ class MergeShardsSuite extends ADAMFunSuite {
"-sort_lexicographically",
"-defer_merging",
inputPath, actualPath)).run(sc)
println(actualPath)
MergeShards(Array(actualPath + "_tail", actualPath,
"-header_path", actualPath + "_head")).run(sc)
checkFiles(expectedPath, actualPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
val inputPath = copyResource("unordered.sam")
val actualPath = tmpFile("unordered.sam")
val expectedPath = inputPath
TransformAlignments(Array("-single", inputPath, actualPath)).run(sc)
TransformAlignments(Array("-single", "-disable_pg", inputPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

sparkTest("unordered sam to ordered sam") {
val inputPath = copyResource("unordered.sam")
val actualPath = tmpFile("ordered.sam")
val expectedPath = copyResource("ordered.sam")
TransformAlignments(Array("-single", "-sort_reads", "-sort_lexicographically", inputPath, actualPath)).run(sc)
TransformAlignments(Array("-single", "-disable_pg", "-sort_reads", "-sort_lexicographically", inputPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

Expand All @@ -42,8 +42,8 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
val intermediateAdamPath = tmpFile("unordered.adam")
val actualPath = tmpFile("unordered.sam")
val expectedPath = inputPath
TransformAlignments(Array(inputPath, intermediateAdamPath)).run(sc)
TransformAlignments(Array("-single", intermediateAdamPath, actualPath)).run(sc)
TransformAlignments(Array("-disable_pg", inputPath, intermediateAdamPath)).run(sc)
TransformAlignments(Array("-single", "-disable_pg", intermediateAdamPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

Expand All @@ -53,7 +53,7 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
val actualPath = tmpFile("ordered.sam")
val expectedPath = copyResource("ordered.sam")
TransformAlignments(Array(inputPath, intermediateAdamPath)).run(sc)
TransformAlignments(Array("-single", "-sort_reads", "-sort_lexicographically", intermediateAdamPath, actualPath)).run(sc)
TransformAlignments(Array("-single", "-disable_pg", "-sort_reads", "-sort_lexicographically", intermediateAdamPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.bdgenomics.adam.models

import htsjdk.samtools.SAMFileHeader
import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
import scala.collection.JavaConversions._

private[adam] object SAMFileHeaderWritable {
Expand Down Expand Up @@ -51,7 +53,7 @@ private[adam] class SAMFileHeaderWritable private (hdr: SAMFileHeader) extends S
private val sd = SequenceDictionary(hdr.getSequenceDictionary)
private val pgl = {
val pgs = hdr.getProgramRecords
pgs.map(ProgramRecord(_))
pgs.map(ADAMContext.convertSAMProgramRecord)
}
private val comments = {
val cmts = hdr.getComments
Expand All @@ -68,7 +70,7 @@ private[adam] class SAMFileHeaderWritable private (hdr: SAMFileHeader) extends S
// add back optional fields
text.foreach(h.setTextHeader)
h.setSequenceDictionary(sd.toSAMSequenceDictionary)
pgl.foreach(p => h.addProgramRecord(p.toSAMProgramRecord))
pgl.foreach(p => h.addProgramRecord(AlignmentRecordRDD.processingStepToSam(p)))
comments.foreach(h.addComment)
rgs.recordGroups.foreach(rg => h.addReadGroup(rg.toSAMReadGroupRecord))

Expand Down
Loading

0 comments on commit 257695d

Please sign in to comment.