[ADAM-916] New strategy for writing header.
Resolves #916. Makes several changes that should eliminate the header-attach
issue when writing back to SAM/BAM:

* Writes the SAM/BAM header out as its own file.
* Instead of trying to attach the SAM/BAM header to the output format via a
  singleton object, we pass the path to the SAM/BAM header file through the
  Hadoop configuration.
* The output format reads the header back from HDFS when creating the record
  writer.
* At the end, once we've written the full RDD and the header file, we merge
  everything via Hadoop's FileUtil, as sketched below.
fnothaft committed Jan 12, 2016
1 parent 6cdc017 commit 4d1c406
Showing 4 changed files with 164 additions and 84 deletions.
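At a high level, the new write path works as sketched below. This is a minimal illustration rather than the diff itself: saveWithSeparateHeader and its parameters are hypothetical names, while the configuration key and the FileUtil calls mirror the actual changes in this commit.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, FileUtil, Path }

// Hypothetical driver-side flow; only the conf key and the FileUtil calls
// mirror the diff below.
def saveWithSeparateHeader(conf: Configuration,
                           headPath: Path,
                           tailPath: Path,
                           outPath: Path): Unit = {
  // 1. the header has been written to headPath as its own file; advertise
  //    its location through the Hadoop configuration so that every record
  //    writer can find it
  conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath.toString)

  // 2. the job runs here: each record writer reads the header back from
  //    HDFS before writing any records (see the getRecordWriter overrides
  //    in the diffs below)

  // 3. concatenate header + header-less shards into the final single file
  val fs = FileSystem.get(conf)
  FileUtil.copy(fs, headPath, fs, new Path(tailPath, "head"), false, false, conf)
  FileUtil.copyMerge(fs, tailPath, fs, outPath, true, conf, null)
  fs.delete(headPath, true)
}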
@@ -17,11 +17,17 @@
*/
package org.bdgenomics.adam.rdd.read

import org.seqdoop.hadoop_bam.{ SAMRecordWritable, KeyIgnoringBAMOutputFormat }
import htsjdk.samtools.SAMFileHeader
import hbparquet.hadoop.util.ContextUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
import org.apache.spark.rdd.InstrumentedOutputFormat
import org.apache.hadoop.mapreduce.OutputFormat
import org.bdgenomics.adam.instrumentation.Timers
import org.seqdoop.hadoop_bam.{
KeyIgnoringBAMOutputFormat,
KeyIgnoringBAMRecordWriter,
SAMRecordWritable
}

object ADAMBAMOutputFormat extends Serializable {

@@ -76,11 +82,26 @@ class InstrumentedADAMBAMOutputFormat[K] extends InstrumentedOutputFormat[K, org
class ADAMBAMOutputFormatHeaderLess[K]
extends KeyIgnoringBAMOutputFormat[K] with Serializable {

setSAMHeader(ADAMBAMOutputFormat.getHeader)
setWriteHeader(false)

override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
val conf = ContextUtil.getConfiguration(context)

// where is our header file?
val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))

// read the header file
readSAMHeaderFrom(path, conf)

// now that the header is set, we can create the record writer
new KeyIgnoringBAMRecordWriter[K](getDefaultWorkFile(context, ""),
header,
false,
context)
}
}

class InstrumentedADAMBAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
override def timerName(): String = Timers.WriteBAMRecord.timerName
override def outputFormatClass(): Class[_ <: OutputFormat[K, SAMRecordWritable]] = classOf[ADAMBAMOutputFormatHeaderLess[K]]
}
}
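For context, here is a hedged sketch of how a driver could feed this headerless output format; the configuration key matches the code above, while the RDD and paths are illustrative:

import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.seqdoop.hadoop_bam.SAMRecordWritable

// Hypothetical: records is keyed by alignment start, and the header has
// already been written to headPath as its own file.
def writeHeaderLessBam(records: RDD[(LongWritable, SAMRecordWritable)],
                       headPath: String,
                       tailPath: String): Unit = {
  val conf = records.context.hadoopConfiguration
  // getRecordWriter (above) reads the header back from this path
  conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath)
  records.saveAsNewAPIHadoopFile(
    tailPath,
    classOf[LongWritable],
    classOf[SAMRecordWritable],
    classOf[ADAMBAMOutputFormatHeaderLess[LongWritable]],
    conf)
}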
@@ -17,11 +17,18 @@
*/
package org.bdgenomics.adam.rdd.read

import org.seqdoop.hadoop_bam.{ SAMRecordWritable, KeyIgnoringAnySAMOutputFormat, SAMFormat }
import htsjdk.samtools.SAMFileHeader
import hbparquet.hadoop.util.ContextUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{ OutputFormat, RecordWriter, TaskAttemptContext }
import org.apache.spark.rdd.InstrumentedOutputFormat
import org.bdgenomics.adam.instrumentation.Timers
import org.apache.hadoop.mapreduce.OutputFormat
import org.seqdoop.hadoop_bam.{
KeyIgnoringAnySAMOutputFormat,
KeyIgnoringSAMRecordWriter,
SAMFormat,
SAMRecordWritable
}

object ADAMSAMOutputFormat extends Serializable {

@@ -76,9 +83,23 @@ class InstrumentedADAMSAMOutputFormat[K] extends InstrumentedOutputFormat[K, org
class ADAMSAMOutputFormatHeaderLess[K]
extends KeyIgnoringAnySAMOutputFormat[K](SAMFormat.valueOf("SAM")) with Serializable {

setSAMHeader(ADAMSAMOutputFormat.getHeader)
setWriteHeader(false)

override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, SAMRecordWritable] = {
val conf = ContextUtil.getConfiguration(context)

// where is our header file?
val path = new Path(conf.get("org.bdgenomics.adam.rdd.read.bam_header_path"))

// read the header file
readSAMHeaderFrom(path, conf)

// now that the header is set, we can create the record writer
new KeyIgnoringSAMRecordWriter(getDefaultWorkFile(context, ""),
header,
false,
context)
}
}

class InstrumentedADAMSAMOutputFormatHeaderLess[K] extends InstrumentedOutputFormat[K, org.seqdoop.hadoop_bam.SAMRecordWritable] {
@@ -17,13 +17,14 @@
*/
package org.bdgenomics.adam.rdd.read

import java.io.StringWriter
import htsjdk.samtools.{ SAMFileHeader, SAMTextHeaderCodec, SAMTextWriter, ValidationStringency }
import java.io.{ OutputStream, StringWriter, Writer }
import htsjdk.samtools._
import htsjdk.samtools.util.{ BinaryCodec, BlockCompressedOutputStream }
import org.apache.hadoop.fs.{ Path, FileUtil, FileSystem }
import org.apache.hadoop.io.LongWritable
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.{ PartitionPruningRDD, RDD }
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.algorithms.consensus.{ ConsensusGenerator, ConsensusGeneratorFromReads }
import org.bdgenomics.adam.converters.AlignmentRecordConverter
@@ -130,59 +131,59 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
filePath: String,
asSam: Boolean = true,
asSingleFile: Boolean = false,
isSorted: Boolean = false) = SAMSave.time {
isSorted: Boolean = false): Unit = SAMSave.time {

// convert the records
val (convertRecords: RDD[SAMRecordWritable], header: SAMFileHeader) = rdd.adamConvertToSAM(isSorted)

// add keys to our records
val withKey = convertRecords.keyBy(v => new LongWritable(v.get.getAlignmentStart))

val bcastHeader = rdd.context.broadcast(header)
val mp = rdd.mapPartitionsWithIndex((idx, iter) => {
log.info(s"Setting ${if (asSam) "SAM" else "BAM"} header for partition $idx")
val header = bcastHeader.value
synchronized {
// perform map partition call to ensure that the SAM/BAM header is set on all
// nodes in the cluster; see:
// https://github.com/bigdatagenomics/adam/issues/353,
// https://github.com/bigdatagenomics/adam/issues/676

asSam match {
case true =>
ADAMSAMOutputFormat.clearHeader()
ADAMSAMOutputFormat.addHeader(header)
log.info(s"Set SAM header for partition $idx")
case false =>
ADAMBAMOutputFormat.clearHeader()
ADAMBAMOutputFormat.addHeader(header)
log.info(s"Set BAM header for partition $idx")
}
}
Iterator[Int]()
}).count()

// force value check, ensure that computation happens
if (mp != 0) {
log.error("Had more than 0 elements after map partitions call to set VCF header across cluster.")
}

// attach header to output format
asSam match {
case true =>
ADAMSAMOutputFormat.clearHeader()
ADAMSAMOutputFormat.addHeader(header)
log.info("Set SAM header on driver")
case false =>
ADAMBAMOutputFormat.clearHeader()
ADAMBAMOutputFormat.addHeader(header)
log.info("Set BAM header on driver")
}

// write file to disk
val conf = rdd.context.hadoopConfiguration

if (!asSingleFile) {
val bcastHeader = rdd.context.broadcast(header)
val mp = rdd.mapPartitionsWithIndex((idx, iter) => {
log.info(s"Setting ${if (asSam) "SAM" else "BAM"} header for partition $idx")
val header = bcastHeader.value
synchronized {
// perform map partition call to ensure that the SAM/BAM header is set on all
// nodes in the cluster; see:
// https://github.com/bigdatagenomics/adam/issues/353,
// https://github.com/bigdatagenomics/adam/issues/676

asSam match {
case true =>
ADAMSAMOutputFormat.clearHeader()
ADAMSAMOutputFormat.addHeader(header)
log.info(s"Set SAM header for partition $idx")
case false =>
ADAMBAMOutputFormat.clearHeader()
ADAMBAMOutputFormat.addHeader(header)
log.info(s"Set BAM header for partition $idx")
}
}
Iterator[Int]()
}).count()

// force value check, ensure that computation happens
if (mp != 0) {
log.error("Had more than 0 elements after map partitions call to set VCF header across cluster.")
}

// attach header to output format
asSam match {
case true =>
ADAMSAMOutputFormat.clearHeader()
ADAMSAMOutputFormat.addHeader(header)
log.info("Set SAM header on driver")
case false =>
ADAMBAMOutputFormat.clearHeader()
ADAMBAMOutputFormat.addHeader(header)
log.info("Set BAM header on driver")
}

asSam match {
case true =>
withKey.saveAsNewAPIHadoopFile(
@@ -204,50 +205,79 @@ class AlignmentRecordRDDFunctions(rdd: RDD[AlignmentRecord])
}
} else {
log.info(s"Writing single ${if (asSam) "SAM" else "BAM"} file (not Hadoop-style directory)")
val (outputFormat, headerLessOutputFormat) = asSam match {
case true =>
(
classOf[InstrumentedADAMSAMOutputFormat[LongWritable]],
classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]]
)

case false =>
(
classOf[InstrumentedADAMBAMOutputFormat[LongWritable]],
classOf[InstrumentedADAMBAMOutputFormatHeaderLess[LongWritable]]
)
}
val fs = FileSystem.get(conf)

val headPath = filePath + "_head"
val tailPath = filePath + "_tail"

PartitionPruningRDD.create(withKey, i => { i == 0 }).saveAsNewAPIHadoopFile(
headPath,
classOf[LongWritable],
classOf[SAMRecordWritable],
outputFormat,
conf
)
PartitionPruningRDD.create(withKey, i => { i != 0 }).saveAsNewAPIHadoopFile(
// get an output stream; FSDataOutputStream is already an OutputStream
val os: OutputStream = fs.create(new Path(headPath))

// TIL: SAM and BAM headers are written in completely different ways!
if (asSam) {
// SAM: emit the plain-text header via htsjdk's SAMTextWriter
val stw = new SAMTextWriter(os)
stw.setHeader(header)
stw.close()
} else {
// create htsjdk-specific streams for writing the BAM header; the null
// argument means no file is associated with the compressed stream
val compressedOut: OutputStream = new BlockCompressedOutputStream(os, null)
val binaryCodec = new BinaryCodec(compressedOut)

// write a BAM header - cribbed from Hadoop-BAM
binaryCodec.writeBytes("BAM\001".getBytes())
val sw: Writer = new StringWriter()
new SAMTextHeaderCodec().encode(sw, header)
binaryCodec.writeString(sw.toString, true, false)

// write sequence dictionary
val ssd = header.getSequenceDictionary()
binaryCodec.writeInt(ssd.size())
ssd.getSequences
.toList
.foreach(r => {
binaryCodec.writeString(r.getSequenceName(), true, true)
binaryCodec.writeInt(r.getSequenceLength())
})

// flush and close all the streams
compressedOut.flush()
compressedOut.close()
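// note: closing the BlockCompressedOutputStream above also appends the
// 28-byte BGZF terminator block; BGZF readers decode it as an empty block,
// so it should be harmless when this header file is later concatenated
// with the header-less record shards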
}

// more flushing and closing
os.flush()
os.close()

// set path to header file
conf.set("org.bdgenomics.adam.rdd.read.bam_header_path", headPath.toString)

// set up output format
val headerLessOutputFormat = if (asSam) {
classOf[InstrumentedADAMSAMOutputFormatHeaderLess[LongWritable]]
} else {
classOf[InstrumentedADAMBAMOutputFormatHeaderLess[LongWritable]]
}

// save rdd
withKey.saveAsNewAPIHadoopFile(
tailPath,
classOf[LongWritable],
classOf[SAMRecordWritable],
headerLessOutputFormat,
conf
)

val fs = FileSystem.get(conf)
// copy and merge files; "head" sorts lexicographically before the
// "part-r-*" shards, so copyMerge should concatenate the header first
FileUtil.copy(fs, headPath, fs, tailPath + "/head", false, false, conf)
FileUtil.copyMerge(fs, tailPath, fs, filePath, true, conf, null)

fs.listStatus(headPath)
.find(_.getPath.getName.startsWith("part-r"))
.map(fileStatus => FileUtil.copy(fs, fileStatus.getPath, fs, tailPath + "/head", false, false, conf))
// delete temp files
fs.delete(headPath, true)

FileUtil.copyMerge(fs, tailPath, fs, filePath, true, conf, null)
fs.delete(tailPath, true)

}

}

implicit def str2Path(str: String): Path = new Path(str)
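As a quick sanity check on the merged result, the single file can be read back with htsjdk; this is a hedged sketch, and the output path is illustrative:

import java.io.File
import htsjdk.samtools.{ SamReaderFactory, ValidationStringency }

// Hypothetical: confirm the header survived the head/tail merge.
val reader = SamReaderFactory.makeDefault()
  .validationStringency(ValidationStringency.SILENT)
  .open(new File("/tmp/merged.bam"))
println(reader.getFileHeader.getSequenceDictionary.size())
reader.close()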
@@ -266,14 +266,14 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite {
checkFiles(resourcePath("ordered.sam"), actualSortedPath)
}

sparkTest("write single sam file back") {
def testBQSR(asSam: Boolean, filename: String) {
val inputPath = resourcePath("bqsr1.sam")
val tempFile = Files.createTempDirectory("bqsr1")
val rdd = sc.loadAlignments(inputPath).cache()
rdd.adamSAMSave(tempFile.toAbsolutePath.toString + "/bqsr1.sam",
asSam = true,
rdd.adamSAMSave("%s/%s".format(tempFile.toAbsolutePath.toString, filename),
asSam = asSam,
asSingleFile = true)
val rdd2 = sc.loadAlignments(tempFile.toAbsolutePath.toString + "/bqsr1.sam")
val rdd2 = sc.loadAlignments("%s/%s".format(tempFile.toAbsolutePath.toString, filename))
.cache()

val (fsp1, fsf1) = rdd.adamFlagStat()
@@ -340,4 +340,12 @@ class AlignmentRecordRDDFunctionsSuite extends ADAMFunSuite {
}
})
}

sparkTest("write single sam file back") {
testBQSR(true, "bqsr1.sam")
}

sparkTest("write single bam file back") {
testBQSR(true, "bqsr1.bam")
}
}
