Skip to content

Commit

Permalink
DEV: Update std CSV features to support optional variable type specs (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
NickEdwards7502 committed Sep 19, 2024
1 parent 37f4193 commit dfae3c2
Showing 1 changed file with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import au.csiro.variantspark.data.Feature
import au.csiro.variantspark.data.FeatureBuilder
import au.csiro.variantspark.data.DataBuilder
import au.csiro.variantspark.data.StdFeature
import au.csiro.variantspark.data.DefRepresentationFactory
import org.apache.spark.broadcast.Broadcast

class MapAccumulator
Expand Down Expand Up @@ -80,11 +81,13 @@ class MapAccumulator
* ingestion of traditional CSV files for analysis.
*/
case class CsvStdFeatureSource[V](data: RDD[String],
defaultType: VariableType = ContinuousVariable, csvFormat: CSVFormat = DefaultCSVFormatSpec)
defaultType: VariableType = ContinuousVariable,
optVariableTypes: Option[RDD[String]] = None, csvFormat: CSVFormat = DefaultCSVFormatSpec)
extends FeatureSource {

val variableNames: List[String] = new CSVParser(csvFormat).parseLine(data.first).get.tail
val br_variableNames: Broadcast[List[String]] = data.context.broadcast(variableNames)
val br_types = data.context.broadcast(optVariableTypes.map(parseTypes))

lazy val transposedData: RDD[(String, Array[String])] = {
// expects data in coma separated format of
Expand Down Expand Up @@ -148,12 +151,25 @@ case class CsvStdFeatureSource[V](data: RDD[String],
.drop(1)
}

def parseTypes(typeRDD: RDD[String]): Map[String, VariableType] = {
typeRDD
.mapPartitions { it =>
val csvParser = new CSVParser(csvFormat)
it.map(csvParser.parseLine(_).get).map(l => (l.head, VariableType.fromString(l.last)))
}
.collect()
.toMap
}

def features: RDD[Feature] = featuresAs[Vector]

def featuresAs[T](implicit cr: DataBuilder[T]): RDD[Feature] = {
transposedData.map({
val types = br_types.value
val representationFactory = DefRepresentationFactory
transposedData.map {
case (varId, values) =>
StdFeature.from[T](varId, defaultType, values.toList)
})
val variableType = types.flatMap(_.get(varId)).getOrElse(defaultType)
StdFeature.from[T](varId, variableType, values.toList)
}
}
}

0 comments on commit dfae3c2

Please sign in to comment.