Adding ability to convert reference FASTA files for nucleotide sequences #79
@@ -0,0 +1,93 @@
/*
 * Copyright (c) 2014. Regents of the University of California
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.berkeley.cs.amplab.adam.cli

import edu.berkeley.cs.amplab.adam.avro.{ADAMNucleotideContig, ADAMRecord}
import edu.berkeley.cs.amplab.adam.converters.FastaConverter
import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.rdd.RDD
import org.kohsuke.args4j.{Option => Args4jOption, Argument}

object Fasta2Adam extends AdamCommandCompanion {
  val commandName: String = "fasta2adam"
  val commandDescription: String = "Converts a text FASTA sequence file into an ADAMNucleotideContig Parquet file which represents assembled sequences."

  def apply(cmdLine: Array[String]): AdamCommand = {
    new Fasta2Adam(Args4j[Fasta2AdamArgs](cmdLine))
  }
}

class Fasta2AdamArgs extends Args4jBase with ParquetArgs with SparkArgs {
  @Argument(required = true, metaVar = "FASTA", usage = "The FASTA file to convert", index = 0)
  var fastaFile: String = null
  @Argument(required = true, metaVar = "ADAM", usage = "Location to write ADAM data", index = 1)
  var outputPath: String = null
  @Args4jOption(required = false, name = "-verbose", usage = "Prints enhanced debugging info, including contents of seq dict.")
  var verbose: Boolean = false
  @Args4jOption(required = false, name = "-reads", usage = "Maps contig IDs to match contig IDs of reads.")
  var reads: String = ""
}

class Fasta2Adam(protected val args: Fasta2AdamArgs) extends AdamSparkCommand[Fasta2AdamArgs] with Logging {
  val companion = Fasta2Adam

  def run(sc: SparkContext, job: Job) {
    log.info("Loading FASTA data from disk.")
    val fastaData: RDD[(LongWritable, Text)] = sc.newAPIHadoopFile(args.fastaFile,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text])
Did you compare this against fi.tkk.ics.hadoop.bam.FastaInputFormat from Hadoop-BAM? Any reason not to use that here?

I had actually been unaware of the FastaInputFormat or the ReferenceFragment from Hadoop-BAM—thank you for pointing me towards it! FWIW, the one significant difference that I can see is that the Hadoop-BAM format doesn't seem to handle any sequence meta-information, e.g., fragment name and fragment description.
    val remapData = fastaData.map(kv => (kv._1.get.toInt, kv._2.toString))

    log.info("Converting FASTA to ADAM.")
    val adamFasta = FastaConverter(remapData)

    if (args.verbose) {
      println("FASTA contains:")
      println(adamFasta.adamGetSequenceDictionary())
    }

    val remapped = if (args.reads != "") {
      val readDict = sc.adamDictionaryLoad[ADAMRecord](args.reads)

      if (args.verbose) {
        println("Remapping with:")
        println(readDict)
      }

      val remapFasta = adamFasta.adamRewriteContigIds(readDict)

      if (args.verbose) {
        println("After remapping, have:")
        println(remapFasta.adamGetSequenceDictionary())
      }

      remapFasta
    } else {
      adamFasta
    }

    log.info("Writing records to disk.")
    remapped.adamSave(args.outputPath, blockSize = args.blockSize, pageSize = args.pageSize,
      compressCodec = args.compressionCodec, disableDictionaryEncoding = args.disableDictionary)
  }
}
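For orientation, here is a rough sketch of how the new command could be driven. The file paths are placeholders, and the no-argument run() entry point is assumed from the existing ADAM CLI commands rather than shown in this diff:

// Hypothetical invocation of the new fasta2adam command (paths are placeholders);
// the shell equivalent would be roughly: fasta2adam hg19.fa hg19.contigs.adam -reads reads.adam -verbose
val command = Fasta2Adam(Array("hg19.fa", "hg19.contigs.adam", "-reads", "reads.adam", "-verbose"))
command.run() // assumed to build the SparkContext and Hadoop Job before calling run(sc, job) above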
@@ -0,0 +1,166 @@
/*
 * Copyright (c) 2014. Regents of the University of California
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.berkeley.cs.amplab.adam.converters

import edu.berkeley.cs.amplab.adam.avro.{ADAMNucleotideContig, Base}
import edu.berkeley.cs.amplab.adam.rdd.AdamContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import scala.math.Ordering._

/**
 * Object for converting an RDD containing FASTA sequence data into ADAM FASTA data.
 */
private[adam] object FastaConverter {

  /**
   * Converts an RDD containing ints and strings into an RDD containing ADAM nucleotide contig assemblies.
   *
   * @note The input dataset is assumed to have come in from a Hadoop TextInputFormat reader. This sets
   * a specific format for the RDD's key-value pairs.
   *
   * @throws AssertionError Thrown if there appear to be multiple sequences in a single file
   * that do not have descriptions.
   * @throws AssertionError Thrown if a sequence does not have any sequence data.
   *
   * @param rdd RDD containing (Int, String) tuples, where the Int is the line number in the file
   * and the String is the line of the file.
   * @return An RDD of ADAM FASTA data.
   */
  def apply(rdd: RDD[(Int, String)]): RDD[ADAMNucleotideContig] = {
    val filtered = rdd.map(kv => (kv._1, kv._2.trim()))
      .filter((kv: (Int, String)) => !kv._2.startsWith(";"))

    val indices: Map[Int, Int] = rdd.filter(kv => kv._2.startsWith(">"))
      .map(kv => kv._1)
      .collect()
      .sortWith(_ < _)
      .zipWithIndex
      .reverse
      .toMap

    val groupedContigs: RDD[(Int, Seq[(Int, String)])] = if (indices.size == 0) {
      filtered.keyBy(kv => -1)
        .groupByKey()
    } else {
      filtered.keyBy((kv: (Int, String)) => indices.find(p => p._1 <= kv._1))
        .filter((kv: (Option[(Int, Int)], (Int, String))) => kv._1.isDefined)
        .map(kv => (kv._1.get._2, kv._2))
        .groupByKey()
    }

    val converter = new FastaConverter

    val convertedData = groupedContigs.map(kv => {
      val (id, lines) = kv

      val descriptionLine = lines.filter(kv => kv._2.startsWith(">"))

      val (name, comment): (Option[String], Option[String]) = if (descriptionLine.size == 0) {
        assert(id == -1, "Cannot have a headerless line in a file with more than one fragment.")
        (None, None)
      } else if (descriptionLine.forall(kv => kv._2.contains(' '))) {
        val description: String = descriptionLine.head._2
        val splitIndex = description.indexOf(' ')
        val split = description.splitAt(splitIndex)

        val contigName: String = split._1.stripPrefix(">").trim
        val contigDescription: String = split._2.trim

        (Some(contigName), Some(contigDescription))
      } else {
        (Some(descriptionLine.head._2.stripPrefix(">").trim), None)
      }

      val seqId = if (id == -1) {
        0
      } else {
        id
      }

      val sequenceLines = lines.filter(kv => !kv._2.startsWith(">"))
      assert(sequenceLines.length != 0, "Sequence " + seqId + " has no sequence data.")

      val sequence = sequenceLines.sortBy(kv => kv._1)
        .map(kv => kv._2.stripSuffix("*"))
        .reduce(_ + _)

      converter.convert(name, seqId, sequence, comment)
    })

    convertedData
  }
}

/**
 * Conversion methods for single FASTA sequences into ADAM FASTA data.
 */
private[converters] class FastaConverter extends Serializable {

  /**
   * Converts a string of nucleotides into a list of Base enums.
   *
   * @see Base
   * @throws IllegalArgumentException Thrown if the sequence contains an illegal character.
   *
   * @param sequence Nucleotide string.
   * @return A list of Base enums.
   */
  def convertSequence(sequence: String): List[Base] = {
    sequence.toList.map((c: Char) => {
      try {
        Base.valueOf(c.toString.toUpperCase)
      } catch {
        case iae: java.lang.IllegalArgumentException => {
          throw new IllegalArgumentException(c + " in FASTA is an invalid base.")
        }
      }
    })
  }

  /**
   * Converts a single FASTA sequence into an ADAM FASTA contig.
   *
   * @throws IllegalArgumentException Thrown if the sequence contains an illegal character.
   *
   * @param name String option for the sequence name.
   * @param id Numerical identifier for the sequence.
   * @param sequence Nucleotide sequence.
   * @param description Optional description of the sequence.
   * @return The converted ADAM FASTA contig.
   */
  def convert(name: Option[String],
              id: Int,
              sequence: String,
              description: Option[String]): ADAMNucleotideContig = {

    // convert sequence to sequence array
    val bases = convertSequence(sequence)

    // make new builder and set up non-optional fields
    val builder = ADAMNucleotideContig.newBuilder()
      .setContigId(id)
      .setSequence(bases)
      .setSequenceLength(sequence.length)

    // map over optional fields
    name.foreach((n: String) => builder.setContigName(n))
    description.foreach(builder.setDescription(_))

    // build and return
    builder.build()
  }
}
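To make the grouping logic concrete, here is a small sketch of what the converter is intended to produce for a two-contig file. It is not part of the diff; it assumes a live SparkContext named sc and glosses over the private[adam] visibility:

// Each tuple is (line number, line text), matching what Fasta2Adam.run above
// builds from the TextInputFormat read of the FASTA file.
val lines = Seq(
  (0, ">chr1 an example fragment"),
  (1, "ACACAC"),
  (2, ">chr2"),
  (3, "GTGTGT"))
val contigs = FastaConverter(sc.parallelize(lines)).collect()
// Intended output:
//   contigId 0: contigName "chr1", description "an example fragment", bases A,C,A,C,A,C
//   contigId 1: contigName "chr2", no description, bases G,T,G,T,G,T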
@@ -0,0 +1,36 @@
/*
 * Copyright (c) 2013. Regents of the University of California
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package edu.berkeley.cs.amplab.adam.projections

import edu.berkeley.cs.amplab.adam.avro.ADAMNucleotideContig

/**
 * This enumeration exists in order to reduce typo errors in the code. It needs to be kept
 * in sync with any changes to ADAMNucleotideContig.
 *
 * This enumeration is necessary because Parquet needs the field string names
 * for predicates and projections.
 */
object ADAMNucleotideContigField extends FieldEnumeration(ADAMNucleotideContig.SCHEMA$) {

  val contigName,
    contigId,
    description,
    sequence,
    sequenceLength,
    url = SchemaValue
}
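This enumeration exists so that Parquet projections over the new record type can be built without hard-coding field-name strings. A hedged sketch of how it might be used, following the pattern of the existing ADAMRecordField-based code (the Projection helper and the load call are assumptions here, not part of this diff):

import edu.berkeley.cs.amplab.adam.projections.ADAMNucleotideContigField._
import edu.berkeley.cs.amplab.adam.projections.Projection

// Hypothetical: read back only contig names and lengths from the Parquet output,
// skipping the (potentially very large) sequence field.
val nameAndLength = Projection(contigName, sequenceLength)
// The projection would then be handed to AdamContext's adamLoad when reading the
// file written by fasta2adam, e.g. roughly:
// sc.adamLoad[ADAMNucleotideContig, UnboundRecordFilter]("hg19.contigs.adam", projection = Some(nameAndLength))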
Sorry, I deleted my comment and somehow it deleted yours as well. I first thought we didn't, then I thought we did, then I double checked and we didn't. Sorry for the confusion.
NP :-)
Can you add the issue number to this line? And then I'll merge!
There is no issue number.
Can you create one? Retrospectively, I mean? It'd be a good place to write a simple description of the problem this particular PR is solving, and it'd set a good example for future committers (including, ahem, myself, as I often forget to write these issues up ahead of time, and I know Matt and Jeff are eager for us to be a bit more disciplined).
If you don't want to, um, that's fine too :-)
I approve of retrospective issue creation so that we'll have an anchor in CHANGES.txt for this particular change.
Alternatively, is pointing to a PR in CHANGES.txt better? I'm used to code review comments getting copied into the body of an issue tracker so that all of the discussion is in one place, so I defer to others who have used GitHub in anger with a large group in the past.
Jeff, the GitHub UI makes me think that each PR is an issue too: https://github.com/bigdatagenomics/adam/issues?state=open
It'd just be nice to have a high-level "why this PR" somewhere, as I confess it took me a couple of minutes to figure out that this was importing FASTA files into an ADAM format, and not exporting FASTA files from reads in ADAMRecords. I admit I haven't been getting much sleep though.
@tdanford you're right. I can navigate here with #79.
@fnothaft, I think the right thing to do here would be to expand your initial comment to provide a bit more information about the change, and then to add a link to this issue in CHANGES.txt.
Cool? Sorry for the friction, we'll get the process ironed out after a few turns of the crank.
No problem—I'm about to push an update that links to the PR. Going forward, I'll create an issue before starting work, as we've discussed.