Commit af687be

Merge ee23b1b into 4d8dcfc
heuermh authored Mar 12, 2019
2 parents 4d8dcfc + ee23b1b commit af687be
Showing 56 changed files with 239 additions and 161 deletions.
@@ -17,10 +17,11 @@
  */
 package org.bdgenomics.adam.cli
 
+import grizzled.slf4j.Logging
 import org.apache.spark.SparkContext
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.utils.cli._
-import org.bdgenomics.utils.misc.Logging
 import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
 
 class ADAM2FastaArgs extends Args4jBase {
@@ -52,11 +53,12 @@ class ADAM2Fasta(val args: ADAM2FastaArgs) extends BDGSparkCommand[ADAM2FastaArg
   override val companion = ADAM2Fasta
 
   override def run(sc: SparkContext): Unit = {
+    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
 
-    log.info("Loading ADAM nucleotide contig fragments from disk.")
+    info("Loading ADAM nucleotide contig fragments from disk.")
     val contigFragments = sc.loadContigFragments(args.inputPath)
 
-    log.info("Merging fragments and writing FASTA to disk.")
+    info("Merging fragments and writing FASTA to disk.")
     val contigs = contigFragments.mergeFragments()
 
     val cc = if (args.coalesce > 0) {
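
Throughout this commit the pattern is the same: the old org.bdgenomics.utils.misc.Logging exposed a `log` field (hence `log.info(...)`), while grizzled-slf4j's Logging trait mixes the level methods into the class itself, so call sites become bare `info(...)` / `warn(...)`. A minimal sketch of the new style (not part of the commit; the command class here is hypothetical):

    import grizzled.slf4j.Logging

    // Hypothetical class, for illustration only.
    class GreetingCommand extends Logging {
      def run(name: String): Unit = {
        // grizzled-slf4j takes the message by name, so the string is only
        // built if INFO is enabled on the underlying SLF4J logger.
        info("Greeting %s".format(name))
        if (name.isEmpty) {
          warn("Empty name supplied.")
        }
      }
    }
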
@@ -20,6 +20,7 @@ package org.bdgenomics.adam.cli
 import htsjdk.samtools.ValidationStringency
 import org.apache.spark.SparkContext
 import org.apache.spark.storage.StorageLevel
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.projections.{ AlignmentRecordField, Projection }
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.utils.cli._
@@ -60,6 +61,7 @@ class ADAM2Fastq(val args: ADAM2FastqArgs) extends BDGSparkCommand[ADAM2FastqArg
   override val companion = ADAM2Fastq
 
   override def run(sc: SparkContext): Unit = {
+    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
 
     val projectionOpt =
       if (!args.disableProjection)
@@ -21,7 +21,7 @@ import java.util.logging.Level._
 import javax.inject.Inject
 import com.google.inject.AbstractModule
 import net.codingwell.scalaguice.ScalaModule
-import org.bdgenomics.utils.misc.Logging
+import grizzled.slf4j.Logging
 import org.bdgenomics.adam.util.ParquetLogger
 import org.bdgenomics.utils.cli._
 
@@ -106,7 +106,7 @@ class ADAMMain @Inject() (commandGroups: List[CommandGroup]) extends Logging {
   }
 
   def apply(args: Array[String]) {
-    log.info("ADAM invoked with args: %s".format(argsToString(args)))
+    info("ADAM invoked with args: %s".format(argsToString(args)))
     if (args.length < 1) {
       printCommands()
     } else if (args.contains("--version") || args.contains("-version")) {
@@ -17,10 +17,11 @@
  */
 package org.bdgenomics.adam.cli
 
+import grizzled.slf4j.Logging
 import org.apache.spark.SparkContext
 import org.bdgenomics.adam.rdd.ADAMContext._
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.utils.cli._
-import org.bdgenomics.utils.misc.Logging
 import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
 
 object CountContigKmers extends BDGCommandCompanion {
@@ -47,6 +48,7 @@ class CountContigKmers(protected val args: CountContigKmersArgs) extends BDGSpar
   val companion = CountContigKmers
 
   def run(sc: SparkContext) {
+    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
 
     // read from disk
     val fragments = sc.loadContigFragments(args.inputPath)
@@ -17,11 +17,12 @@
  */
 package org.bdgenomics.adam.cli
 
+import grizzled.slf4j.Logging
 import org.apache.spark.SparkContext
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.projections.{ AlignmentRecordField, Projection }
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.utils.cli._
-import org.bdgenomics.utils.misc.Logging
 import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
 
 object CountReadKmers extends BDGCommandCompanion {
@@ -50,6 +51,7 @@ class CountReadKmers(protected val args: CountReadKmersArgs) extends BDGSparkCom
   val companion = CountReadKmers
 
   def run(sc: SparkContext) {
+    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
 
     // read from disk
     var adamRecords = sc.loadAlignments(
adam-cli/src/main/scala/org/bdgenomics/adam/cli/Fasta2ADAM.scala (11 changes: 7 additions & 4 deletions)
@@ -17,10 +17,11 @@
  */
 package org.bdgenomics.adam.cli
 
+import grizzled.slf4j.Logging
 import org.apache.spark.SparkContext
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.utils.cli._
-import org.bdgenomics.utils.misc.Logging
 import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
 
 object Fasta2ADAM extends BDGCommandCompanion {
@@ -51,14 +52,16 @@ class Fasta2ADAM(protected val args: Fasta2ADAMArgs) extends BDGSparkCommand[Fas
   val companion = Fasta2ADAM
 
   def run(sc: SparkContext) {
-    log.info("Loading FASTA data from disk.")
+    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
+
+    info("Loading FASTA data from disk.")
     val adamFasta = sc.loadFasta(args.fastaFile, maximumLength = args.maximumLength)
 
     if (args.verbose) {
-      log.info("FASTA contains: %s", adamFasta.sequences.toString)
+      info("FASTA contains: %s".format(adamFasta.sequences.toString))
     }
 
-    log.info("Writing records to disk.")
+    info("Writing records to disk.")
     val finalFasta = if (args.partitions > 0) {
       adamFasta.transform(_.repartition(args.partitions))
     } else {
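
Note that the Fasta2ADAM hunk also fixes a call-style mismatch: grizzled-slf4j's info takes one by-name message (with an optional Throwable as a second parameter), not printf-style varargs, so the format argument has to be applied up front via String.format. A tiny illustration (the value is a hypothetical stand-in):

    import grizzled.slf4j.Logging

    object FormatExample extends Logging {
      val sequences = "chr1, chr2" // stand-in for adamFasta.sequences.toString
      // info("FASTA contains: %s", sequences) would not type-check:
      // the two-argument overload expects a Throwable.
      info("FASTA contains: %s".format(sequences))
    }
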
@@ -0,0 +1,40 @@
+/**
+ * Licensed to Big Data Genomics (BDG) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The BDG licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.bdgenomics.adam.cli
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.FileAlreadyExistsException
+
+/**
+ * Utility methods for file systems.
+ */
+private[cli] object FileSystemUtils {
+  private def exists(pathName: String, conf: Configuration): Boolean = {
+    val p = new Path(pathName)
+    val fs = p.getFileSystem(conf)
+    fs.exists(p)
+  }
+
+  // move to BDGSparkCommand in bdg-utils?
+  def checkWriteablePath(pathName: String, conf: Configuration): Unit = {
+    if (exists(pathName, conf)) {
+      throw new FileAlreadyExistsException("Cannot write to path name, %s already exists".format(pathName))
+    }
+  }
+}
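
The helper fails fast: each command now probes its output path on the target filesystem before any Spark work is scheduled, instead of failing at save time. A minimal sketch of the behavior, assuming the default (local) filesystem; the example object and temp file are illustrative only, and the object sits in the org.bdgenomics.adam.cli package because the helper is private[cli]:

    package org.bdgenomics.adam.cli

    import java.nio.file.Files
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.FileAlreadyExistsException

    object CheckWriteablePathExample {
      def main(args: Array[String]): Unit = {
        // No fs.defaultFS override, so paths resolve to the local filesystem.
        val conf = new Configuration()

        // A path that does not exist yet: the check returns silently.
        FileSystemUtils.checkWriteablePath("/tmp/does-not-exist-yet.adam", conf)

        // A path that already exists: the check throws before any Spark job starts.
        val existing = Files.createTempFile("adam-example", ".out")
        try {
          FileSystemUtils.checkWriteablePath(existing.toString, conf)
        } catch {
          case e: FileAlreadyExistsException =>
            println(s"Rejected as expected: ${e.getMessage}")
        }
      }
    }
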
@@ -21,6 +21,7 @@ import java.util
 import org.apache.avro.generic.{ GenericDatumWriter, IndexedRecord }
 import org.apache.avro.io.EncoderFactory
 import org.apache.spark.SparkContext
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.util.ParquetFileTraversable
 import org.bdgenomics.utils.cli._
 import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
@@ -99,6 +100,7 @@ class PrintADAM(protected val args: PrintADAMArgs) extends BDGSparkCommand[Print
 
   def run(sc: SparkContext) {
     val output = Option(args.outputFile)
+    output.foreach(checkWriteablePath(_, sc.hadoopConfiguration))
     args.filesToPrint.foreach(file => {
       displayRaw(sc, file, pretty = args.prettyRaw, output = output)
     })
@@ -18,6 +18,7 @@
 package org.bdgenomics.adam.cli
 
 import org.apache.spark.SparkContext
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.projections.AlignmentRecordField._
 import org.bdgenomics.adam.projections.Projection
 import org.bdgenomics.adam.rdd.ADAMContext._
@@ -66,6 +67,8 @@ class Reads2Coverage(protected val args: Reads2CoverageArgs) extends BDGSparkCom
   val companion: BDGCommandCompanion = Reads2Coverage
 
   def run(sc: SparkContext): Unit = {
+    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
+
     if (args.sortLexicographically) {
       require(args.collapse,
         "-sort_lexicographically can only be provided when collapsing (-collapse).")
@@ -17,12 +17,14 @@
  */
 package org.bdgenomics.adam.cli
 
+import grizzled.slf4j.Logging
 import htsjdk.samtools.ValidationStringency
 import java.time.Instant
 import org.apache.parquet.filter2.dsl.Dsl._
 import org.apache.spark.SparkContext
 import org.apache.spark.storage.StorageLevel
 import org.bdgenomics.adam.algorithms.consensus._
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.instrumentation.Timers._
 import org.bdgenomics.adam.io.FastqRecordReader
 import org.bdgenomics.adam.models.{ ReferenceRegion, SnpTable }
@@ -33,7 +35,6 @@ import org.bdgenomics.adam.rdd.read.{ AlignmentRecordDataset, QualityScoreBin }
 import org.bdgenomics.adam.rich.RichVariant
 import org.bdgenomics.formats.avro.ProcessingStep
 import org.bdgenomics.utils.cli._
-import org.bdgenomics.utils.misc.Logging
 import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
 
 object TransformAlignments extends BDGCommandCompanion {
@@ -171,7 +172,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
    */
   private def maybeRepartition(rdd: AlignmentRecordDataset): AlignmentRecordDataset = {
     if (args.repartition != -1) {
-      log.info("Repartitioning reads to '%d' partitions".format(args.repartition))
+      info("Repartitioning reads to '%d' partitions".format(args.repartition))
       rdd.transform(_.repartition(args.repartition))
     } else {
       rdd
@@ -186,7 +187,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
    */
   private def maybeDedupe(rdd: AlignmentRecordDataset): AlignmentRecordDataset = {
     if (args.markDuplicates) {
-      log.info("Marking duplicates")
+      info("Marking duplicates")
       rdd.markDuplicates()
     } else {
       rdd
@@ -209,7 +210,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
     sl: StorageLevel): AlignmentRecordDataset = {
     if (args.locallyRealign) {
 
-      log.info("Locally realigning indels.")
+      info("Locally realigning indels.")
 
       // has the user asked us to cache the rdd before multi-pass stages?
       if (args.cache) {
@@ -268,7 +269,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
     sl: StorageLevel): AlignmentRecordDataset = {
     if (args.recalibrateBaseQualities) {
 
-      log.info("Recalibrating base qualities")
+      info("Recalibrating base qualities")
 
       // bqsr is a two pass algorithm, so cache the rdd if requested
       val optSl = if (args.cache) {
@@ -307,7 +308,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
    */
   private def maybeCoalesce(rdd: AlignmentRecordDataset): AlignmentRecordDataset = {
     if (args.coalesce != -1) {
-      log.info("Coalescing the number of partitions to '%d'".format(args.coalesce))
+      info("Coalescing the number of partitions to '%d'".format(args.coalesce))
       if (args.coalesce > rdd.rdd.partitions.length || args.forceShuffle) {
         rdd.transform(_.coalesce(args.coalesce, shuffle = true))
       } else {
@@ -337,7 +338,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
       rdd.rdd.persist(sl)
     }
 
-    log.info("Sorting reads")
+    info("Sorting reads")
 
     // are we sorting lexicographically or using legacy SAM sort order?
     val sortedRdd = if (args.sortLexicographically) {
@@ -369,7 +370,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
     rdd: AlignmentRecordDataset,
     stringencyOpt: Option[ValidationStringency]): AlignmentRecordDataset = {
     if (args.mdTagsReferenceFile != null) {
-      log.info(s"Adding MDTags to reads based on reference file ${args.mdTagsReferenceFile}")
+      info(s"Adding MDTags to reads based on reference file ${args.mdTagsReferenceFile}")
       val referenceFile = sc.loadReferenceFile(args.mdTagsReferenceFile,
         maximumLength = args.mdTagsFragmentSize)
       rdd.computeMismatchingPositions(
@@ -432,6 +433,8 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
   }
 
   def run(sc: SparkContext) {
+    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
+
     // throw exception if aligned read predicate or limit projection flags are used improperly
     if (args.useAlignedReadPredicate && forceNonParquet()) {
       throw new IllegalArgumentException(
@@ -581,7 +584,7 @@ class TransformAlignments(protected val args: TransformAlignmentsArgs) extends B
 
     if (args.partitionByStartPos) {
       if (outputRdd.sequences.isEmpty) {
-        log.warn("This dataset is not aligned and therefore will not benefit from being saved as a partitioned dataset")
+        warn("This dataset is not aligned and therefore will not benefit from being saved as a partitioned dataset")
       }
       outputRdd.saveAsPartitionedParquet(args.outputPath, partitionSize = args.partitionedBinSize)
     } else {
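
One pre-existing subtlety the maybeCoalesce hunk keeps intact: RDD.coalesce without a shuffle can only merge partitions, so growing the partition count is silently a no-op unless shuffle = true is forced, which is exactly the condition guarded above. A standalone sketch (local Spark, illustrative numbers):

    import org.apache.spark.{ SparkConf, SparkContext }

    object CoalesceExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[4]").setAppName("coalesce-example"))
        val rdd = sc.parallelize(1 to 1000, numSlices = 8)

        // Shrinking merges partitions in place, no shuffle needed.
        println(rdd.coalesce(2).getNumPartitions) // 2

        // Growing without a shuffle leaves the partition count unchanged...
        println(rdd.coalesce(16).getNumPartitions) // still 8

        // ...so a shuffle must be requested explicitly.
        println(rdd.coalesce(16, shuffle = true).getNumPartitions) // 16

        sc.stop()
      }
    }
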
@@ -18,6 +18,7 @@
 package org.bdgenomics.adam.cli
 
 import org.apache.spark.SparkContext
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.utils.cli._
 import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
@@ -59,6 +60,8 @@ class TransformFeatures(val args: TransformFeaturesArgs)
   val companion = TransformFeatures
 
   def run(sc: SparkContext) {
+    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
+
     sc.loadFeatures(
       args.featuresFile,
       optMinPartitions = Option(args.numPartitions),
@@ -17,14 +17,15 @@
  */
 package org.bdgenomics.adam.cli
 
+import grizzled.slf4j.Logging
 import org.apache.spark.SparkContext
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.io.FastqRecordReader
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.rdd.ADAMSaveAnyArgs
 import org.bdgenomics.adam.rdd.read.QualityScoreBin
 import org.bdgenomics.adam.rdd.fragment.FragmentDataset
 import org.bdgenomics.utils.cli._
-import org.bdgenomics.utils.misc.Logging
 import org.kohsuke.args4j.{ Argument, Option => Args4jOption }
 
 object TransformFragments extends BDGCommandCompanion {
@@ -95,8 +96,10 @@ class TransformFragments(protected val args: TransformFragmentsArgs) extends BDG
   }
 
   def run(sc: SparkContext) {
+    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
+
     if (args.loadAsReads && args.saveAsReads) {
-      log.warn("If loading and saving as reads, consider using TransformAlignments instead.")
+      warn("If loading and saving as reads, consider using TransformAlignments instead.")
     }
     if (args.sortReads) {
       require(args.saveAsReads,
@@ -19,6 +19,7 @@ package org.bdgenomics.adam.cli
 
 import htsjdk.samtools.ValidationStringency
 import org.apache.spark.SparkContext
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.converters.VariantContextConverter
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
@@ -124,6 +125,8 @@ class TransformGenotypes(val args: TransformGenotypesArgs)
   }
 
   def run(sc: SparkContext) {
+    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
+
     require(!(args.sort && args.sortLexicographically),
       "Cannot set both -sort_on_save and -sort_lexicographically_on_save.")
 
@@ -19,6 +19,7 @@ package org.bdgenomics.adam.cli
 
 import htsjdk.samtools.ValidationStringency
 import org.apache.spark.SparkContext
+import org.bdgenomics.adam.cli.FileSystemUtils._
 import org.bdgenomics.adam.rdd.ADAMContext._
 import org.bdgenomics.adam.rdd.{ ADAMSaveAnyArgs, GenomicDataset }
 import org.bdgenomics.utils.cli._
@@ -114,6 +115,8 @@ class TransformVariants(val args: TransformVariantsArgs)
   }
 
   def run(sc: SparkContext) {
+    checkWriteablePath(args.outputPath, sc.hadoopConfiguration)
+
     require(!(args.sort && args.sortLexicographically),
       "Cannot set both -sort_on_save and -sort_lexicographically_on_save.")
 