diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala
index d2bcf0cc49..03353657b4 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDD.scala
@@ -21,6 +21,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{ Dataset, SQLContext }
+import org.apache.spark.sql.functions._
 import org.bdgenomics.adam.converters.AlignmentRecordConverter
 import org.bdgenomics.adam.instrumentation.Timers._
 import org.bdgenomics.adam.models.{
@@ -38,11 +39,12 @@ import org.bdgenomics.adam.rdd.{
 import org.bdgenomics.adam.rdd.read.{
   AlignmentRecordRDD,
   BinQualities,
+  DatasetBoundAlignmentRecordRDD,
   MarkDuplicates,
   QualityScoreBin
 }
 import org.bdgenomics.adam.serialization.AvroSerializer
-import org.bdgenomics.adam.sql.{ Fragment => FragmentProduct }
+import org.bdgenomics.adam.sql.{ Fragment => FragmentProduct, AlignmentRecord => AlignmentRecordProduct }
 import org.bdgenomics.formats.avro._
 import org.bdgenomics.utils.interval.array.{
   IntervalArray,
@@ -202,7 +204,7 @@ case class DatasetBoundFragmentRDD private[rdd] (
     pageSize: Int = 1 * 1024 * 1024,
     compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
     disableDictionaryEncoding: Boolean = false) {
-    log.info("Saving directly as Parquet from SQL. Options other than compression codec are ignored.")
+    log.warn("Saving directly as Parquet from SQL. Options other than compression codec are ignored.")
     dataset.toDF()
       .write
       .format("parquet")
@@ -230,6 +232,13 @@ case class DatasetBoundFragmentRDD private[rdd] (
     newProcessingSteps: Seq[ProcessingStep]): FragmentRDD = {
     copy(processingSteps = newProcessingSteps)
   }
+
+  override def toReads(): AlignmentRecordRDD = {
+    import dataset.sparkSession.implicits._
+    val df = dataset.select(explode(col("alignments")).as("rec")).select("rec.*")
+    DatasetBoundAlignmentRecordRDD(df.as[AlignmentRecordProduct], sequences,
+      recordGroups, processingSteps)
+  }
 }
 
 case class RDDBoundFragmentRDD private[rdd] (
diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDDSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDDSuite.scala
index 91f0522130..df0e166814 100644
--- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDDSuite.scala
+++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/fragment/FragmentRDDSuite.scala
@@ -610,4 +610,16 @@ class FragmentRDDSuite extends ADAMFunSuite {
       assert(fragment.getAlignments.size() == 2)
     })
   }
+
+  sparkTest("dataset and rdd conversion to reads are equivalent") {
+    val fragments = sc.loadFragments(testFile("small.sam"))
+    val fragmentRdd = RDDBoundFragmentRDD(fragments.rdd, fragments.sequences,
+      fragments.recordGroups, fragments.processingSteps, None)
+    val fragmentDataset = DatasetBoundFragmentRDD(fragments.dataset, fragments.sequences,
+      fragments.recordGroups, fragments.processingSteps)
+    val convertedRdd = fragmentRdd.toReads()
+    val convertedDataset = fragmentDataset.toReads()
+
+    assert(convertedRdd.rdd.collect().toSet == convertedDataset.rdd.collect().toSet)
+  }
 }
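
Note: the new DatasetBoundFragmentRDD.toReads() override avoids a round trip through the RDD API by exploding the nested alignments array directly in Spark SQL. Below is a minimal, self-contained sketch of that explode-and-flatten pattern; the Fragment/Alignment case classes and the ExplodeSketch object are simplified stand-ins for the ADAM sql products, not the real types, and small.sam semantics are not reproduced here.

// Sketch of the explode-and-flatten pattern used by toReads() above.
// Only standard Spark SQL APIs are used; the data model is illustrative.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ExplodeSketch {
  case class Alignment(readName: String, start: Long)
  case class Fragment(name: String, alignments: Seq[Alignment])

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explode-sketch")
      .getOrCreate()
    import spark.implicits._

    // One row per fragment, each carrying a nested array of alignments.
    val fragments = Seq(
      Fragment("f1", Seq(Alignment("f1", 10L), Alignment("f1", 110L))),
      Fragment("f2", Seq(Alignment("f2", 42L)))).toDS()

    // explode() yields one row per element of the alignments array;
    // select("rec.*") then promotes the struct fields to top-level columns,
    // which is what lets the result be re-typed as a Dataset of reads.
    val reads = fragments
      .select(explode(col("alignments")).as("rec"))
      .select("rec.*")
      .as[Alignment]

    reads.show()
    spark.stop()
  }
}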