
Commit

Merge 1c2d022 into 8e8007e
Henry Davidge authored Jul 17, 2018
2 parents 8e8007e + 1c2d022 commit bf7ed82
Showing 2 changed files with 23 additions and 2 deletions.
FragmentRDD.scala
@@ -21,6 +21,7 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{ Dataset, SQLContext }
+import org.apache.spark.sql.functions._
 import org.bdgenomics.adam.converters.AlignmentRecordConverter
 import org.bdgenomics.adam.instrumentation.Timers._
 import org.bdgenomics.adam.models.{
@@ -38,11 +39,12 @@ import org.bdgenomics.adam.rdd.{
 import org.bdgenomics.adam.rdd.read.{
   AlignmentRecordRDD,
   BinQualities,
+  DatasetBoundAlignmentRecordRDD,
   MarkDuplicates,
   QualityScoreBin
 }
 import org.bdgenomics.adam.serialization.AvroSerializer
-import org.bdgenomics.adam.sql.{ Fragment => FragmentProduct }
+import org.bdgenomics.adam.sql.{ Fragment => FragmentProduct, AlignmentRecord => AlignmentRecordProduct }
 import org.bdgenomics.formats.avro._
 import org.bdgenomics.utils.interval.array.{
   IntervalArray,
@@ -202,7 +204,7 @@ case class DatasetBoundFragmentRDD private[rdd] (
     pageSize: Int = 1 * 1024 * 1024,
     compressCodec: CompressionCodecName = CompressionCodecName.GZIP,
     disableDictionaryEncoding: Boolean = false) {
-    log.info("Saving directly as Parquet from SQL. Options other than compression codec are ignored.")
+    log.warn("Saving directly as Parquet from SQL. Options other than compression codec are ignored.")
     dataset.toDF()
       .write
       .format("parquet")
@@ -230,6 +232,13 @@ case class DatasetBoundFragmentRDD private[rdd] (
     newProcessingSteps: Seq[ProcessingStep]): FragmentRDD = {
     copy(processingSteps = newProcessingSteps)
   }
+
+  override def toReads(): AlignmentRecordRDD = {
+    import dataset.sparkSession.implicits._
+    val df = dataset.select(explode(col("alignments")).as("rec")).select("rec.*")
+    DatasetBoundAlignmentRecordRDD(df.as[AlignmentRecordProduct], sequences,
+      recordGroups, processingSteps)
+  }
 }

 case class RDDBoundFragmentRDD private[rdd] (
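The new toReads() override stays in the Dataset world: Spark SQL's explode emits one row per element of the nested alignments array, and selecting "rec.*" promotes the struct's fields to top-level columns so the result can be re-bound as a Dataset of per-read products. Below is a minimal standalone sketch of that flattening technique; Frag and Read are hypothetical stand-ins for ADAM's generated Fragment and AlignmentRecord products, not classes from this repository.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    // Hypothetical stand-ins for the nested Fragment -> alignments schema.
    case class Read(readName: String, sequence: String)
    case class Frag(name: String, alignments: Seq[Read])

    object ExplodeSketch extends App {
      val spark = SparkSession.builder()
        .master("local[*]")
        .appName("explode-sketch")
        .getOrCreate()
      import spark.implicits._

      val fragments = Seq(
        Frag("f1", Seq(Read("f1/1", "ACGT"), Read("f1/2", "TTAG"))),
        Frag("f2", Seq(Read("f2/1", "GGCC")))).toDS()

      // explode produces one row per array element; "rec.*" then lifts the
      // struct's fields to top-level columns, matching the Read case class.
      val reads = fragments
        .select(explode(col("alignments")).as("rec"))
        .select("rec.*")
        .as[Read]

      reads.show() // three rows: f1/1, f1/2, f2/1

      spark.stop()
    }

Because the flattening is expressed as a DataFrame query, Catalyst performs it without deserializing each fragment into an RDD first, which is the point of giving DatasetBoundFragmentRDD its own override rather than inheriting the RDD-based conversion.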
FragmentRDDSuite.scala
@@ -610,4 +610,16 @@ class FragmentRDDSuite extends ADAMFunSuite {
       assert(fragment.getAlignments.size() == 2)
     })
   }
+
+  sparkTest("dataset and rdd conversion to reads are equivalent") {
+    val fragments = sc.loadFragments(testFile("small.sam"))
+    val fragmentRdd = RDDBoundFragmentRDD(fragments.rdd, fragments.sequences,
+      fragments.recordGroups, fragments.processingSteps, None)
+    val fragmentDataset = DatasetBoundFragmentRDD(fragments.dataset, fragments.sequences,
+      fragments.recordGroups, fragments.processingSteps)
+    val convertedRdd = fragmentRdd.toReads()
+    val convertedDataset = fragmentDataset.toReads()
+
+    assert(convertedRdd.rdd.collect().toSet == convertedDataset.rdd.collect().toSet)
+  }
 }
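The test wraps the same loaded fragments in both an RDD-bound and a Dataset-bound FragmentRDD and asserts the two toReads() paths produce the same set of reads. Outside the test harness, usage would look roughly like the sketch below; it assumes the ADAMContext implicits (here imported as org.bdgenomics.adam.rdd.ADAMContext._, as is conventional in ADAM) provide loadFragments on a SparkContext, and the file path is illustrative.

    import org.apache.spark.SparkContext
    import org.bdgenomics.adam.rdd.ADAMContext._

    def fragmentsToReads(sc: SparkContext, path: String): Long = {
      // loadFragments may hand back a Dataset-bound or RDD-bound FragmentRDD
      // depending on the input; with this commit, either one supports toReads()
      // and the Dataset-bound variant no longer falls back to the RDD path.
      val fragments = sc.loadFragments(path)
      val reads = fragments.toReads()
      reads.rdd.count()
    }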
