Commit
Use empty sequence dictionary when loading features, fixes bigdatagen…
heuermh committed Jul 6, 2017
1 parent 6f76e9f commit 52258fd
Showing 7 changed files with 24 additions and 123 deletions.
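Taken together, these diffs drop the optStorageLevel plumbing: loadFeatures and the per-format loaders no longer accept a storage level, and a FeatureRDD built without an explicit dictionary now carries an empty SequenceDictionary instead of one rebuilt by an aggregate over the data. A minimal before/after sketch of a call site, assuming a SparkContext named sc and an illustrative input path:

import org.bdgenomics.adam.rdd.ADAMContext._

// before this commit: an optional storage level controlled caching while the
// sequence dictionary was rebuilt from the features themselves
// val features = sc.loadFeatures("features.bed", optStorageLevel = Some(StorageLevel.MEMORY_ONLY))

// after this commit: no storage level parameter; the returned FeatureRDD has an
// empty sequence dictionary unless optSequenceDictionary is supplied
val features = sc.loadFeatures("features.bed")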
@@ -18,7 +18,6 @@
package org.bdgenomics.adam.cli

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.utils.cli._
import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
@@ -52,25 +51,16 @@ class TransformFeaturesArgs extends Args4jBase with ParquetSaveArgs {
@Args4jOption(required = false, name = "-disable_fast_concat",
usage = "Disables the parallel file concatenation engine.")
var disableFastConcat: Boolean = false

@Args4jOption(required = false, name = "-cache", usage = "Cache before building the sequence dictionary. Recommended for formats other than IntervalList and Parquet.")
var cache: Boolean = false

@Args4jOption(required = false, name = "-storage_level", usage = "Set the storage level to use for caching. Defaults to MEMORY_ONLY.")
var storageLevel: String = "MEMORY_ONLY"
}

class TransformFeatures(val args: TransformFeaturesArgs)
extends BDGSparkCommand[TransformFeaturesArgs] {

val companion = TransformFeatures
val storageLevel = StorageLevel.fromString(args.storageLevel)
val optStorageLevel = if (args.cache) Some(storageLevel) else None

def run(sc: SparkContext) {
sc.loadFeatures(
args.featuresFile,
optStorageLevel = optStorageLevel,
optMinPartitions = Option(args.numPartitions),
optProjection = None
).save(args.outputPath, args.single, args.disableFastConcat)
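On the command line this also removes the -cache and -storage_level options from the feature transformation command; a hedged sketch of an invocation after this change (the transformFeatures command name and the file paths are assumptions based on the class name, not taken from this diff):

adam-submit transformFeatures features.gff3 features.adam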
@@ -137,7 +137,4 @@ object Timers extends Metrics {
val FullOuterShuffleJoin = timer("Full outer shuffle region join")
val ShuffleJoinAndGroupByLeft = timer("Shuffle join followed by group-by on left")
val RightOuterShuffleJoinAndGroupByLeft = timer("Right outer shuffle join followed by group-by on left")

// org.bdgenomics.adam.rdd.feature.FeatureRDD
val BuildSequenceDictionary = timer("Build SequenceDictionary for Features")
}
32 changes: 4 additions & 28 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
@@ -42,7 +42,6 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
@@ -1272,8 +1271,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* Globs/directories are supported, although file extension must be present
* for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to use. For
* textual formats, if this is None, fall back to the Spark default
* parallelism. Defaults to None.
@@ -1288,15 +1285,13 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadCoverage(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
optPredicate: Option[FilterPredicate] = None,
optProjection: Option[Schema] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): CoverageRDD = LoadCoverage.time {

loadFeatures(pathName,
optSequenceDictionary = optSequenceDictionary,
optStorageLevel = optStorageLevel,
optMinPartitions = optMinPartitions,
optPredicate = optPredicate,
optProjection = optProjection,
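loadCoverage forwards these arguments to loadFeatures and converts the result to a CoverageRDD, so its call sites lose the storage level in the same way; a minimal usage sketch with an illustrative path:

val coverage = sc.loadCoverage("features.bed")
// or, with a caller-supplied dictionary:
// val coverage = sc.loadCoverage("features.bed", optSequenceDictionary = Some(sd))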
@@ -1327,8 +1322,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load features in GFF3 format from.
* Globs/directories are supported.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism. Defaults to None.
* @param stringency The validation stringency to use when validating GFF3 format.
@@ -1338,7 +1331,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadGff3(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadGff3.time {

@@ -1347,7 +1339,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _))
}
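The fold on optSequenceDictionary selects between the two FeatureRDD constructors: None yields the new empty-dictionary apply, Some(sd) the dictionary-carrying one. The same pattern recurs in loadGtf, loadBed, and loadNarrowPeak below. An equivalent, more explicit spelling (sketch only):

optSequenceDictionary match {
  case None     => FeatureRDD(records)     // empty SequenceDictionary
  case Some(sd) => FeatureRDD(records, sd) // caller-supplied dictionary
}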

/**
@@ -1356,8 +1348,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load features in GTF/GFF2 format from.
* Globs/directories are supported.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism. Defaults to None.
* @param stringency The validation stringency to use when validating GTF/GFF2 format.
@@ -1367,7 +1357,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadGtf(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadGtf.time {

@@ -1376,7 +1365,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _))
}

/**
@@ -1385,8 +1374,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load features in BED6/12 format from.
* Globs/directories are supported.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism. Defaults to None.
* @param stringency The validation stringency to use when validating BED6/12 format.
@@ -1396,7 +1383,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadBed(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadBed.time {

@@ -1405,7 +1391,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _))
}

/**
@@ -1414,8 +1400,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* @param pathName The path name to load features in NarrowPeak format from.
* Globs/directories are supported.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to load. If
* not set, falls back to the configured Spark default parallelism. Defaults to None.
* @param stringency The validation stringency to use when validating NarrowPeak format.
@@ -1425,7 +1409,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadNarrowPeak(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
stringency: ValidationStringency = ValidationStringency.STRICT): FeatureRDD = LoadNarrowPeak.time {

@@ -1434,7 +1417,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
if (Metrics.isRecording) records.instrument() else records

optSequenceDictionary
.fold(FeatureRDD(records, optStorageLevel = optStorageLevel))(FeatureRDD(records, _))
.fold(FeatureRDD(records))(FeatureRDD(records, _))
}

/**
@@ -1561,8 +1544,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* Globs/directories are supported, although file extension must be present
* for BED6/12, GFF3, GTF/GFF2, NarrowPeak, or IntervalList formats.
* @param optSequenceDictionary Optional sequence dictionary. Defaults to None.
* @param optStorageLevel Optional storage level to use for cache before building
* the sequence dictionary, if one is not provided. Defaults to StorageLevel.MEMORY_ONLY.
* @param optMinPartitions An optional minimum number of partitions to use. For
* textual formats, if this is None, fall back to the Spark default
* parallelism. Defaults to None.
@@ -1577,7 +1558,6 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
def loadFeatures(
pathName: String,
optSequenceDictionary: Option[SequenceDictionary] = None,
optStorageLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
optMinPartitions: Option[Int] = None,
optPredicate: Option[FilterPredicate] = None,
optProjection: Option[Schema] = None,
@@ -1588,28 +1568,24 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
log.info(s"Loading $pathName as BED and converting to Features.")
loadBed(pathName,
optSequenceDictionary = optSequenceDictionary,
optStorageLevel = optStorageLevel,
optMinPartitions = optMinPartitions,
stringency = stringency)
} else if (isGff3Ext(trimmedPathName)) {
log.info(s"Loading $pathName as GFF3 and converting to Features.")
loadGff3(pathName,
optSequenceDictionary = optSequenceDictionary,
optStorageLevel = optStorageLevel,
optMinPartitions = optMinPartitions,
stringency = stringency)
} else if (isGtfExt(trimmedPathName)) {
log.info(s"Loading $pathName as GTF/GFF2 and converting to Features.")
loadGtf(pathName,
optSequenceDictionary = optSequenceDictionary,
optStorageLevel = optStorageLevel,
optMinPartitions = optMinPartitions,
stringency = stringency)
} else if (isNarrowPeakExt(trimmedPathName)) {
log.info(s"Loading $pathName as NarrowPeak and converting to Features.")
loadNarrowPeak(pathName,
optSequenceDictionary = optSequenceDictionary,
optStorageLevel = optStorageLevel,
optMinPartitions = optMinPartitions,
stringency = stringency)
} else if (isIntervalListExt(trimmedPathName)) {
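Because loadFeatures no longer rebuilds a dictionary from the data, callers that need reference sequence lengths downstream (for region joins and the like) should now supply one explicitly. A hedged sketch, with a made-up contig name and length:

import org.bdgenomics.adam.models.{ SequenceDictionary, SequenceRecord }

val sd = new SequenceDictionary(Vector(SequenceRecord("chr1", 248956422L)))
val features = sc.loadFeatures("features.gff3", optSequenceDictionary = Some(sd))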
@@ -21,7 +21,6 @@ import com.google.common.collect.ComparisonChain
import java.util.Comparator
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rdd.{
@@ -109,43 +108,26 @@ private object FeatureOrdering extends FeatureOrdering[Feature] {}
object FeatureRDD {

/**
* Builds a FeatureRDD without SequenceDictionary information by running an
* aggregate to rebuild the SequenceDictionary.
* Builds a FeatureRDD with an empty sequence dictionary and without a partition map.
*
* @param rdd The underlying Feature RDD to build from.
* @param optStorageLevel Optional storage level to use for cache before
* building the SequenceDictionary.
* @return Returns a new FeatureRDD.
*/
def apply(
rdd: RDD[Feature],
optStorageLevel: Option[StorageLevel]): FeatureRDD = BuildSequenceDictionary.time {

// optionally cache the rdd, since we're making multiple passes
optStorageLevel.foreach(rdd.persist(_))

// create sequence records with length max(start, end) + 1L
val sequenceRecords = rdd
.keyBy(_.getContigName)
.map(kv => (kv._1, max(kv._2.getStart, kv._2.getEnd) + 1L))
.reduceByKey(max(_, _))
.map(kv => SequenceRecord(kv._1, kv._2))

val sd = new SequenceDictionary(sequenceRecords.collect.toVector)

FeatureRDD(rdd, sd)
def apply(rdd: RDD[Feature]): FeatureRDD = {
FeatureRDD(rdd, SequenceDictionary.empty, optPartitionMap = None)
}

/**
* Builds a FeatureRDD without a partitionMap.
* Builds a FeatureRDD given a sequence dictionary and without a partition map.
*
* @param rdd The underlying Feature RDD.
* @param sd The Sequence Dictionary for the Feature RDD.
* @param rdd The underlying Feature RDD to build from.
* @param sd The sequence dictionary for this FeatureRDD.
* @return Returns a new FeatureRDD.
*/
def apply(rdd: RDD[Feature], sd: SequenceDictionary): FeatureRDD = {
FeatureRDD(rdd, sd, None)
}
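A small sketch of the two constructors side by side, assuming an existing RDD[Feature] named features and a SequenceDictionary named sd:

val withEmptyDict = FeatureRDD(features)     // sequences == SequenceDictionary.empty
val withDict      = FeatureRDD(features, sd) // caller-supplied dictionary, no partition map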

/**
* @param feature Feature to convert to GTF format.
* @return Returns this feature as a GTF line.
@@ -253,7 +235,8 @@ object FeatureRDD {
* A GenomicRDD that wraps Feature data.
*
* @param rdd An RDD of genomic Features.
* @param sequences The reference genome this data is aligned to.
* @param sequences The reference genome these data are aligned to.
* @param optPartitionMap Optional partition map.
*/
case class FeatureRDD(rdd: RDD[Feature],
sequences: SequenceDictionary,
@@ -25,7 +25,6 @@ import org.apache.parquet.filter2.dsl.Dsl._
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.models._
import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.util.PhredUtils._
@@ -143,12 +142,6 @@ class ADAMContextSuite extends ADAMFunSuite {
assert(features.count === 4)
}

sparkTest("Can read a .bed file without cache") {
val path = testFile("gencode.v7.annotation.trunc10.bed")
val features: RDD[Feature] = sc.loadFeatures(path, optStorageLevel = Some(StorageLevel.NONE)).rdd
assert(features.count === 10)
}

sparkTest("Can read a .narrowPeak file") {
val path = testFile("wgEncodeOpenChromDnaseGm19238Pk.trunc10.narrowPeak")
val annot: RDD[Feature] = sc.loadFeatures(path).rdd
@@ -50,7 +50,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val coverageRDD: CoverageRDD = featureRDD.toCoverage

val outputFile = tmpLocation(".bed")
@@ -73,7 +73,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(15).setEnd(20).setScore(2.0).build()
val f3 = Feature.newBuilder().setContigName("chr2").setStart(15).setEnd(20).setScore(2.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val coverageRDD: CoverageRDD = featureRDD.toCoverage

val outputFile = tmpLocation(".adam")
@@ -90,7 +90,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(5).setEnd(7).setScore(3.0).build()
val f3 = Feature.newBuilder().setContigName("chr1").setStart(7).setEnd(20).setScore(4.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val coverageRDD: CoverageRDD = featureRDD.toCoverage
val coverage = coverageRDD.coverage(bpPerBin = 4)

@@ -102,7 +102,7 @@ class CoverageRDDSuite extends ADAMFunSuite {
val f2 = Feature.newBuilder().setContigName("chr1").setStart(5).setEnd(7).setScore(3.0).build()
val f3 = Feature.newBuilder().setContigName("chr1").setStart(7).setEnd(20).setScore(4.0).build()

val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)), optStorageLevel = None)
val featureRDD: FeatureRDD = FeatureRDD(sc.parallelize(Seq(f1, f2, f3)))
val coverageRDD: CoverageRDD = featureRDD.toCoverage

val coverage = coverageRDD