[ADAM-1257] Add program record support for alignment/fragment files.
Resolves #1257.
fnothaft committed May 22, 2017
1 parent ff75c70 · commit d3427c7
Showing 11 changed files with 221 additions and 30 deletions.
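The change threads SAM @PG program records through ADAM as Avro ProcessingStep records, so processing lineage survives SAM/BAM to Parquet round trips for both alignments and fragments. For orientation, a minimal sketch of the mapping, using only the builder calls that appear in the diff below; the field values are illustrative, not taken from any real header:

import org.bdgenomics.formats.avro.ProcessingStep

// A SAM header line such as
//   @PG  ID:bwa  PN:bwa  VN:0.7.15  CL:bwa mem ref.fa reads.fq
// becomes one Avro ProcessingStep record (values here are made up).
val step: ProcessingStep = ProcessingStep.newBuilder
  .setId("bwa")                              // @PG ID
  .setProgramName("bwa")                     // @PG PN
  .setVersion("0.7.15")                      // @PG VN
  .setCommandLine("bwa mem ref.fa reads.fq") // @PG CL
  .build                                     // .setPreviousId(...) would record @PG PP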
@@ -18,6 +18,7 @@
package org.bdgenomics.adam.cli

import htsjdk.samtools.ValidationStringency
import java.time.Instant
import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
@@ -29,6 +30,7 @@ import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
import org.bdgenomics.adam.rdd.read.{ AlignmentRecordRDD, QualityScoreBin }
import org.bdgenomics.adam.rich.RichVariant
import org.bdgenomics.formats.avro.ProcessingStep
import org.bdgenomics.utils.cli._
import org.bdgenomics.utils.misc.Logging
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
@@ -38,7 +40,9 @@ object TransformAlignments extends BDGCommandCompanion {
val commandDescription = "Convert SAM/BAM to ADAM format and optionally perform read pre-processing transformations"

def apply(cmdLine: Array[String]) = {
new TransformAlignments(Args4j[TransformAlignmentsArgs](cmdLine))
val args = Args4j[TransformAlignmentsArgs](cmdLine)
args.command = cmdLine.mkString(" ")
new TransformAlignments(args)
}
}

@@ -123,6 +127,10 @@ class TransformAlignmentsArgs extends Args4jBase with ADAMSaveAnyArgs with Parqu
var cache: Boolean = false
@Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching.")
var storageLevel: String = "MEMORY_ONLY"
@Args4jOption(required = false, name = "-disable_pg", usage = "Disable writing a new @PG line.")
var disableProcessingStep = false

var command: String = null
}

class TransformAlignments(protected val args: TransformAlignmentsArgs) extends BDGSparkCommand[TransformAlignmentsArgs] with Logging {
@@ -427,7 +435,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
)
}

val aRdd: AlignmentRecordRDD =
val loadedRdd: AlignmentRecordRDD =
if (args.forceLoadBam) {
sc.loadBam(args.inputPath)
} else if (args.forceLoadFastq) {
@@ -461,9 +469,33 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
stringency = stringency
)
}

val aRdd: AlignmentRecordRDD = if (args.disableProcessingStep) {
loadedRdd
} else {
// add program info
val about = new About()
val version = if (about.isSnapshot()) {
"%s--%s".format(about.version(), about.commit())
} else {
about.version()
}
val epoch = Instant.now.getEpochSecond
val processingStep = ProcessingStep.newBuilder
.setId("ADAM_%d".format(epoch))
.setProgramName("org.bdgenomics.adam.cli.TransformAlignments")
.setVersion(version)
.setCommandLine(args.command)
.build
val newSteps: Seq[ProcessingStep] = (loadedRdd.processingSteps ++ Seq(processingStep))
val rddWithProvenance: AlignmentRecordRDD = loadedRdd.copy(processingSteps = newSteps)
rddWithProvenance
}

val rdd = aRdd.rdd
val sd = aRdd.sequences
val rgd = aRdd.recordGroups
val pgs = aRdd.processingSteps

// Optionally load a second RDD and concatenate it with the first.
// Paired-FASTQ loading is avoided here because that wouldn't make sense
@@ -484,12 +516,12 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
})

// if we have a second rdd that we are merging in, process the merger here
val (mergedRdd, mergedSd, mergedRgd) = concatOpt.fold((rdd, sd, rgd))(t => {
(rdd ++ t.rdd, sd ++ t.sequences, rgd ++ t.recordGroups)
val (mergedRdd, mergedSd, mergedRgd, mergedPgs) = concatOpt.fold((rdd, sd, rgd, pgs))(t => {
(rdd ++ t.rdd, sd ++ t.sequences, rgd ++ t.recordGroups, pgs ++ t.processingSteps)
})

// make a new aligned read rdd, that merges the two RDDs together
val newRdd = AlignmentRecordRDD(mergedRdd, mergedSd, mergedRgd)
val newRdd = AlignmentRecordRDD(mergedRdd, mergedSd, mergedRgd, mergedPgs)

// run our transformation
val outputRdd = this.apply(newRdd)
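The same provenance append is available programmatically. A hedged sketch of what TransformAlignments now does when -disable_pg is not passed, assuming a SparkContext sc, the implicit ADAMContext import, and a hypothetical input path (the command-line string is illustrative):

import java.time.Instant
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.formats.avro.ProcessingStep

// Load alignments; @PG lines from the header surface as processingSteps.
val reads = sc.loadAlignments("reads.bam") // path is hypothetical

// Describe this run and append it to the existing lineage, as the CLI does.
val step = ProcessingStep.newBuilder
  .setId("ADAM_%d".format(Instant.now.getEpochSecond))
  .setProgramName("org.bdgenomics.adam.cli.TransformAlignments")
  .setCommandLine("transformAlignments reads.bam reads.adam") // illustrative
  .build
val withProvenance = reads.copy(processingSteps = reads.processingSteps ++ Seq(step))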
@@ -25,15 +25,15 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
val inputPath = copyResource("unordered.sam")
val actualPath = tmpFile("unordered.sam")
val expectedPath = inputPath
TransformAlignments(Array("-single", inputPath, actualPath)).run(sc)
TransformAlignments(Array("-single", "-disable_pg", inputPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

sparkTest("unordered sam to ordered sam") {
val inputPath = copyResource("unordered.sam")
val actualPath = tmpFile("ordered.sam")
val expectedPath = copyResource("ordered.sam")
TransformAlignments(Array("-single", "-sort_reads", "-sort_lexicographically", inputPath, actualPath)).run(sc)
TransformAlignments(Array("-single", "-disable_pg", "-sort_reads", "-sort_lexicographically", inputPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

@@ -42,8 +42,8 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
val intermediateAdamPath = tmpFile("unordered.adam")
val actualPath = tmpFile("unordered.sam")
val expectedPath = inputPath
TransformAlignments(Array(inputPath, intermediateAdamPath)).run(sc)
TransformAlignments(Array("-single", intermediateAdamPath, actualPath)).run(sc)
TransformAlignments(Array("-disable_pg", inputPath, intermediateAdamPath)).run(sc)
TransformAlignments(Array("-single", "-disable_pg", intermediateAdamPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

@@ -53,7 +53,7 @@ class TransformAlignmentsSuite extends ADAMFunSuite {
val actualPath = tmpFile("ordered.sam")
val expectedPath = copyResource("ordered.sam")
TransformAlignments(Array(inputPath, intermediateAdamPath)).run(sc)
TransformAlignments(Array("-single", "-sort_reads", "-sort_lexicographically", intermediateAdamPath, actualPath)).run(sc)
TransformAlignments(Array("-single", "-disable_pg", "-sort_reads", "-sort_lexicographically", intermediateAdamPath, actualPath)).run(sc)
checkFiles(expectedPath, actualPath)
}

@@ -19,7 +19,7 @@ package org.bdgenomics.adam.models

import htsjdk.samtools.SAMProgramRecord

private[models] object ProgramRecord {
private[adam] object ProgramRecord {

/**
* Builds a program record model from a SAM program record.
@@ -51,7 +51,7 @@ private[models] object ProgramRecord {
* @param previousID An optional ID for the ID of the previous stage that was
* run.
*/
private[models] case class ProgramRecord(
private[adam] case class ProgramRecord(
id: String,
commandLine: Option[String],
name: Option[String],
72 changes: 61 additions & 11 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -18,7 +18,7 @@
package org.bdgenomics.adam.rdd

import java.io.{ File, FileNotFoundException, InputStream }
import htsjdk.samtools.{ SAMFileHeader, ValidationStringency }
import htsjdk.samtools.{ SAMFileHeader, SAMProgramRecord, ValidationStringency }
import htsjdk.samtools.util.Locatable
import htsjdk.variant.vcf.{
VCFHeader,
@@ -70,6 +70,7 @@ import org.bdgenomics.formats.avro.{
Fragment,
Genotype,
NucleotideContigFragment,
ProcessingStep,
RecordGroup => RecordGroupMetadata,
Sample,
Variant
@@ -180,6 +181,33 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
RecordGroupDictionary.fromSAMHeader(samHeader)
}

/**
* @param samHeader The header to extract processing lineage from.
* @return Returns the dictionary converted to an Avro model.
*/
private[rdd] def loadBamPrograms(
samHeader: SAMFileHeader): Seq[ProcessingStep] = {
val pgs = samHeader.getProgramRecords().toSeq
pgs.map(convertSAMProgramRecord(_))
}

/**
* Builds a program description from a htsjdk program record.
*
* @param record Program record to convert.
* @return Returns an Avro formatted program record.
*/
private[rdd] def convertSAMProgramRecord(
record: SAMProgramRecord): ProcessingStep = {
val builder = ProcessingStep.newBuilder
.setId(record.getId)
Option(record.getPreviousProgramGroupId).foreach(builder.setPreviousId(_))
Option(record.getProgramVersion).foreach(builder.setVersion(_))
Option(record.getProgramName).foreach(builder.setProgramName(_))
Option(record.getCommandLine).foreach(builder.setCommandLine(_))
builder.build
}

/**
* @param pathName The path name to load VCF format metadata from.
* Globs/directories are supported.
Expand Down Expand Up @@ -298,6 +326,18 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
.distinct
}

/**
* @param pathName The path name to load Avro processing steps from.
* Globs/directories are supported.
* @return Returns a seq of processing steps.
*/
private[rdd] def loadAvroPrograms(pathName: String): Seq[ProcessingStep] = {
getFsAndFilesWithFilter(pathName, new FileFilter("_processing.avro"))
.map(p => {
loadAvro[ProcessingStep](p.toString, ProcessingStep.SCHEMA$)
}).reduce(_ ++ _)
}

/**
* @param pathName The path name to load Avro sequence dictionaries from.
* Globs/directories are supported.
Expand Down Expand Up @@ -596,7 +636,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
require(filteredFiles.nonEmpty,
"Did not find any files at %s.".format(path))

val (seqDict, readGroups) =
val (seqDict, readGroups, programs) =
filteredFiles
.flatMap(fp => {
try {
@@ -608,7 +648,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
log.info("Loaded header from " + fp)
val sd = loadBamDictionary(samHeader)
val rg = loadBamReadGroups(samHeader)
Some((sd, rg))
val pgs = loadBamPrograms(samHeader)
Some((sd, rg, pgs))
} catch {
case e: Throwable => {
if (validationStringency == ValidationStringency.STRICT) {
@@ -622,7 +663,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
}
}
}).reduce((kv1, kv2) => {
(kv1._1 ++ kv2._1, kv1._2 ++ kv2._2)
(kv1._1 ++ kv2._1, kv1._2 ++ kv2._2, kv1._3 ++ kv2._3)
})

val job = HadoopUtil.newJob(sc)
@@ -648,7 +689,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p._2.get)),
seqDict,
readGroups)
readGroups,
programs)
}

/**
@@ -689,7 +731,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

require(bamFiles.nonEmpty,
"Did not find any files at %s.".format(path))
val (seqDict, readGroups) = bamFiles
val (seqDict, readGroups, programs) = bamFiles
.map(fp => {
// We need to separately read the header, so that we can inject the sequence dictionary
// data into each individual Read (see the argument to samRecordConverter.convert,
@@ -699,10 +741,11 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
log.info("Loaded header from " + fp)
val sd = loadBamDictionary(samHeader)
val rg = loadBamReadGroups(samHeader)
val pgs = loadBamPrograms(samHeader)

(sd, rg)
(sd, rg, pgs)
}).reduce((kv1, kv2) => {
(kv1._1 ++ kv2._1, kv1._2 ++ kv2._2)
(kv1._1 ++ kv2._1, kv1._2 ++ kv2._2, kv1._3 ++ kv2._3)
})

val job = HadoopUtil.newJob(sc)
@@ -717,7 +760,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
val samRecordConverter = new SAMRecordConverter
AlignmentRecordRDD(records.map(p => samRecordConverter.convert(p._2.get)),
seqDict,
readGroups)
readGroups,
programs)
}

/**
@@ -829,7 +873,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
// convert avro to sequence dictionary
val rgd = loadAvroRecordGroupDictionary(pathName)

AlignmentRecordRDD(rdd, sd, rgd)
// load processing step descriptions
val pgs = loadAvroPrograms(pathName)

AlignmentRecordRDD(rdd, sd, rgd, pgs)
}

/**
@@ -1482,10 +1529,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
// convert avro to sequence dictionary
val rgd = loadAvroRecordGroupDictionary(pathName)

// load processing step descriptions
val pgs = loadAvroPrograms(pathName)

// load fragment data from parquet
val rdd = loadParquet[Fragment](pathName, optPredicate, optProjection)

FragmentRDD(rdd, sd, rgd)
FragmentRDD(rdd, sd, rgd, pgs)
}

/**
12 changes: 12 additions & 0 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicRDD.scala
@@ -35,6 +35,7 @@ import org.bdgenomics.adam.models.{
}
import org.bdgenomics.formats.avro.{
Contig,
ProcessingStep,
RecordGroup => RecordGroupMetadata,
Sample
}
@@ -984,6 +985,11 @@ trait MultisampleGenomicRDD[T, U <: MultisampleGenomicRDD[T, U]] extends Genomic
*/
abstract class AvroReadGroupGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroReadGroupGenomicRDD[T, U]] extends AvroGenomicRDD[T, U] {

/**
* The processing steps that have been applied to this GenomicRDD.
*/
val processingSteps: Seq[ProcessingStep]

/**
* A dictionary describing the read groups attached to this GenomicRDD.
*/
@@ -998,6 +1004,12 @@ abstract class AvroReadGroupGenomicRDD[T <% IndexedRecord: Manifest, U <: AvroRe
Contig.SCHEMA$,
contigs)

// save processing metadata
saveAvro("%s/_processing.avro".format(filePath),
rdd.context,
ProcessingStep.SCHEMA$,
processingSteps)

// convert record group to avro and save
val rgMetadata = recordGroups.recordGroups
.map(_.toMetadata)
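With the AvroReadGroupGenomicRDD change above, the lineage is written to a _processing.avro sidecar next to the other Avro metadata on save and restored on load. Continuing the earlier sketch; saveAsParquet and loadAlignments are assumed to behave as in other ADAM releases, and the paths are hypothetical:

// Round trip through Parquet: _processing.avro is written beside the data,
// and the reloaded RDD carries the appended step.
withProvenance.saveAsParquet("reads.adam")
val reloaded = sc.loadAlignments("reads.adam")
assert(reloaded.processingSteps.nonEmpty)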
@@ -105,9 +105,11 @@ object FragmentRDD {
* @param sequences The genomic sequences this data was aligned to, if any.
* @param recordGroups The record groups these Fragments came from.
*/
case class FragmentRDD(rdd: RDD[Fragment],
sequences: SequenceDictionary,
recordGroups: RecordGroupDictionary) extends AvroReadGroupGenomicRDD[Fragment, FragmentRDD] {
case class FragmentRDD(
rdd: RDD[Fragment],
sequences: SequenceDictionary,
recordGroups: RecordGroupDictionary,
@transient processingSteps: Seq[ProcessingStep] = Seq.empty) extends AvroReadGroupGenomicRDD[Fragment, FragmentRDD] {

protected def buildTree(rdd: RDD[(ReferenceRegion, Fragment)])(
implicit tTag: ClassTag[Fragment]): IntervalArray[ReferenceRegion, Fragment] = {
@@ -144,7 +146,7 @@ case class FragmentRDD(rdd: RDD[Fragment],
val newRdd = rdd.flatMap(converter.convertFragment)

// are we aligned?
AlignmentRecordRDD(newRdd, sequences, recordGroups)
AlignmentRecordRDD(newRdd, sequences, recordGroups, processingSteps)
}

/**
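Fragment data gets the same treatment: FragmentRDD now carries processingSteps and passes them along when converting to reads. A rough sketch, assuming loadFragments and FragmentRDD.toReads behave as in other ADAM releases, with a hypothetical Parquet path:

// Fragments loaded from Parquet pick up _processing.avro as well,
// and converting to reads preserves the lineage.
val fragments = sc.loadFragments("fragments.adam")
val asReads = fragments.toReads
assert(asReads.processingSteps == fragments.processingSteps)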