Skip to content

Commit

Permalink
[ADAM-916] New strategy for writing header.
Browse files Browse the repository at this point in the history
Resolves #916. Makes several modifications that should eliminate the header
attach issue when writing back to SAM/BAM:

* Writes the SAM/BAM header as a single file.
* Instead of trying to attach the SAM/BAM header to the output format via a
  singleton object, we pass the path to the SAM/BAM header file via the Hadoop
  configuration.
* The output format reads the header from HDFS when creating the record writer.
* At the end, once we've written the full RDD and the header file, we merge
  them all via Hadoop's FileUtil.
  • Loading branch information
fnothaft committed Jan 14, 2016
1 parent 4415b04 commit 386f759
Show file tree
Hide file tree
Showing 4 changed files with 303 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@
*/
package org.bdgenomics.adam.rdd.read

import org.seqdoop.hadoop_bam.{ SAMRecordWritable, KeyIgnoringBAMOutputFormat }
import htsjdk.samtools.SAMFileHeader
import hbparquet.hadoop.util.ContextUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
import org.apache.spark.rdd.InstrumentedOutputFormat
import org.apache.hadoop.mapreduce.OutputFormat
import org.bdgenomics.adam.instrumentation.Timers
import org.seqdoop.hadoop_bam.{
KeyIgnoringBAMOutputFormat,
KeyIgnoringBAMRecordWriter,
SAMRecordWritable
}

object ADAMBAMOutputFormat extends Serializable {

Expand Down Expand Up @@ -76,11 +82,26 @@ class InstrumentedADAMBAMOutputFormat[K] extends InstrumentedOutputFormat[K, org
/**
 * BAM output format whose record writers are created with header writing
 * turned off.
 *
 * The SAM/BAM header is not attached when this class is constructed. It is
 * loaded in getRecordWriter from the file whose path is stored in the Hadoop
 * configuration under "org.bdgenomics.adam.rdd.read.bam_header_path".
 *
 * @tparam K The key type of the records handed to the record writer.
 */
class ADAMBAMOutputFormatHeaderLess[K]
    extends KeyIgnoringBAMOutputFormat[K] with Serializable {

  // disable header writing on the underlying output format
  setWriteHeader(false)

  /**
   * Reads the header from the path given in the Hadoop configuration and
   * builds a record writer on the default work file for this task attempt.
   *
   * @param context The task attempt context supplying the configuration and
   *   the work file location.
   * @return A record writer constructed with the loaded header and with
   *   header writing disabled.
   * @throws IllegalArgumentException if the header path key is missing from
   *   the configuration or is empty.
   */
  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
    val conf = ContextUtil.getConfiguration(context)

    // where is our header file? conf.get returns null for an unset key, so
    // wrap it and fail with a message that names the missing key
    val headerPathKey = "org.bdgenomics.adam.rdd.read.bam_header_path"
    val headerPath = Option(conf.get(headerPathKey))
      .filter(_.nonEmpty)
      .map(new Path(_))
      .getOrElse(throw new IllegalArgumentException(
        s"Cannot create record writer: Hadoop configuration key '$headerPathKey' is not set."))

    // read the header file; this populates the inherited `header` field
    readSAMHeaderFrom(headerPath, conf)

    // now that we have the header set, we need to make a record writer
    new KeyIgnoringBAMRecordWriter[K](getDefaultWorkFile(context, ""),
      header,
      false, // do not write the header into this output file
      context)
  }
}

/**
 * Instrumented wrapper that delegates to ADAMBAMOutputFormatHeaderLess and
 * reports under the Timers.WriteBAMRecord timer.
 *
 * @tparam K The key type of the wrapped output format.
 */
class InstrumentedADAMBAMOutputFormatHeaderLess[K]
    extends InstrumentedOutputFormat[K, SAMRecordWritable] {

  /** @return The name of the timer used for this output format. */
  override def timerName(): String = {
    Timers.WriteBAMRecord.timerName
  }

  /** @return The class of the output format that is being instrumented. */
  override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = {
    classOf[ADAMBAMOutputFormatHeaderLess[K]]
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@
*/
package org.bdgenomics.adam.rdd.read

import org.seqdoop.hadoop_bam.{ SAMRecordWritable, KeyIgnoringAnySAMOutputFormat, SAMFormat }
import htsjdk.samtools.SAMFileHeader
import hbparquet.hadoop.util.ContextUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
import org.apache.spark.rdd.InstrumentedOutputFormat
import org.bdgenomics.adam.instrumentation.Timers
import org.apache.hadoop.mapreduce.OutputFormat
import org.seqdoop.hadoop_bam.{
KeyIgnoringAnySAMOutputFormat,
KeyIgnoringSAMRecordWriter,
SAMFormat,
SAMRecordWritable
}

object ADAMSAMOutputFormat extends Serializable {

Expand Down Expand Up @@ -76,9 +83,23 @@ class InstrumentedADAMSAMOutputFormat[K] extends InstrumentedOutputFormat[K, org
/**
 * SAM output format whose record writers are created with header writing
 * turned off.
 *
 * The SAM header is not attached when this class is constructed. It is
 * loaded in getRecordWriter from the file whose path is stored in the Hadoop
 * configuration under "org.bdgenomics.adam.rdd.read.bam_header_path" (the
 * same key is used for both SAM and BAM output).
 *
 * @tparam K The key type of the records handed to the record writer.
 */
class ADAMSAMOutputFormatHeaderLess[K]
    extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {

  // disable header writing on the underlying output format
  setWriteHeader(false)

  /**
   * Reads the header from the path given in the Hadoop configuration and
   * builds a record writer on the default work file for this task attempt.
   *
   * @param context The task attempt context supplying the configuration and
   *   the work file location.
   * @return A record writer constructed with the loaded header and with
   *   header writing disabled.
   * @throws IllegalArgumentException if the header path key is missing from
   *   the configuration or is empty.
   */
  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
    val conf = ContextUtil.getConfiguration(context)

    // where is our header file? conf.get returns null for an unset key, so
    // wrap it and fail with a message that names the missing key
    val headerPathKey = "org.bdgenomics.adam.rdd.read.bam_header_path"
    val headerPath = Option(conf.get(headerPathKey))
      .filter(_.nonEmpty)
      .map(new Path(_))
      .getOrElse(throw new IllegalArgumentException(
        s"Cannot create record writer: Hadoop configuration key '$headerPathKey' is not set."))

    // read the header file; this populates the inherited `header` field
    readSAMHeaderFrom(headerPath, conf)

    // now that we have the header set, we need to make a record writer
    new KeyIgnoringSAMRecordWriter[K](getDefaultWorkFile(context, ""),
      header,
      false, // do not write the header into this output file
      context)
  }
}

class InstrumentedADAMSAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
Expand Down
Loading

0 comments on commit 386f759

Please sign in to comment.