Skip to content

Commit

Permalink
DEV: Update FeatureSource dataframe conversion (#237)
Browse files · Browse the repository at this point in the history
REFACTOR: Remove conversion of whole RDD to DataFrame

FEAT: Add function for slicing rows and columns and converting to DF
  • Loading branch information
NickEdwards7502 committed Sep 19, 2024
1 parent 4df6e32 commit b1fe760
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions src/main/scala/au/csiro/variantspark/input/VCFFeatureSource.scala
Original file line number · Diff line number · Diff line change
Expand Up @@ -60,20 +60,19 @@ class VCFFeatureSource(vcfSource: VCFSource, converter: VariantToFeatureConverte
vcfSource.genotypes().map(converterRef.convert)
}

/**
 * Returns a small, truncated slice of the feature matrix as a Spark DataFrame,
 * intended for quick inspection (analogous to `DataFrame.head`).
 *
 * Rows are the first `rowLim` features (variants); columns are a leading
 * "variant_id" string column followed by one ByteType column for each of the
 * first `colLim` sample names. Only the requested slice is collected to the
 * driver, so the full feature RDD is never converted.
 *
 * @param sqlContext the active SQLContext used to build the DataFrame
 * @param rowLim maximum number of feature rows to include (default 10)
 * @param colLim maximum number of sample columns to include (default 10)
 * @return a DataFrame with schema [variant_id: String, sample_1: Byte, ...]
 */
def head(sqlContext: SQLContext, rowLim: Int = 10, colLim: Int = 10): DataFrame = {
  // Plain vals: both are used unconditionally below, so `lazy` would only
  // add synchronization overhead without deferring any work.
  val sampleNamesStructArr: Array[StructField] =
    sampleNames.take(colLim).map(StructField(_, ByteType, nullable = true)).toArray
  val featureDFSchema: StructType =
    StructType(Seq(StructField("variant_id", StringType, nullable = true)) ++ sampleNamesStructArr)

  // take(rowLim) pulls only the requested rows to the driver; each row's
  // byte values are truncated to colLim to match the schema width above.
  val slicedFeatureArray: Array[Row] =
    features.take(rowLim).map { f =>
      Row.fromSeq(f.label +: f.valueAsByteArray.take(colLim).toSeq)
    }
  // Re-distribute the (small) slice so createDataFrame can consume an RDD[Row].
  val slicedFeatureRDD: RDD[Row] = sqlContext.sparkContext.parallelize(slicedFeatureArray)
  sqlContext.createDataFrame(slicedFeatureRDD, featureDFSchema)
}

}
Expand Down

0 comments on commit b1fe760

Please sign in to comment.