From 37f4193857c0fb81fe70fa63364ed6e8f8b23db7 Mon Sep 17 00:00:00 2001 From: NickEdwards7502 Date: Thu, 19 Sep 2024 14:24:11 +1000 Subject: [PATCH] DEV: Update VSContext to support covariates (#237) FEAT: Add functions for importing std and transposed CSVs FEAT: Add function for unioning features and covariates --- .../au/csiro/variantspark/api/VSContext.scala | 74 +++++++++++++++++-- 1 file changed, 66 insertions(+), 8 deletions(-) diff --git a/src/main/scala/au/csiro/variantspark/api/VSContext.scala b/src/main/scala/au/csiro/variantspark/api/VSContext.scala index ef0a186b..39654004 100644 --- a/src/main/scala/au/csiro/variantspark/api/VSContext.scala +++ b/src/main/scala/au/csiro/variantspark/api/VSContext.scala @@ -1,15 +1,23 @@ package au.csiro.variantspark.api -import au.csiro.variantspark.input.{CsvLabelSource, FeatureSource, VCFFeatureSource, VCFSource} +import au.csiro.variantspark.input.{ + CsvLabelSource, + FeatureSource, + VCFFeatureSource, + VCFSource, + CsvFeatureSource, + CsvStdFeatureSource, + UnionedFeatureSource +} import org.apache.hadoop.fs.FileSystem import org.apache.spark.sql.SparkSession import org.apache.spark.sql.SQLContext -import au.csiro.variantspark.input.CsvFeatureSource -import au.csiro.variantspark.input.CsvFeatureSource._ import com.github.tototoshi.csv.CSVFormat import au.csiro.variantspark.input.DefaultCSVFormatSpec import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD +import scala.collection.JavaConverters._ trait SqlContextHolder { def sqlContext: SQLContext @@ -28,7 +36,7 @@ class VSContext(val spark: SparkSession) extends SqlContextHolder { /** Import features from a VCF file * @param inputFile path to file or directory with VCF files to load - * @return FeatureSource loaded from the VCF file or files + * @return FeatureSource loaded from the VCF file */ def importVCF(inputFile: String, sparkPar: Int = 0): FeatureSource = { val vcfSource = @@ -37,12 +45,62 @@ class 
VSContext(val spark: SparkSession) extends SqlContextHolder { } /** Import features from a CSV file - * @param inputFile: path to file or directory with VCF files to load + * @param inputFile: path to CSV file + * @param optVariableTypes: optional type specifications + * @param csvFormat: [[com.github.tototoshi.csv.CSVFormat]] row format + * @return FeatureSource loaded from the CSV file + */ + def importTransposedCSV(inputFile: String, + optVariableTypes: Option[RDD[String]], csvFormat: CSVFormat): FeatureSource = { + CsvFeatureSource(sc.textFile(inputFile), csvFormat = csvFormat, + optVariableTypes = optVariableTypes) + } + def importTransposedCSV(inputFile: String, + variableTypes: java.util.ArrayList[String] = null): FeatureSource = { + val csvFormat: CSVFormat = DefaultCSVFormatSpec + val optVariableTypes: Option[RDD[String]] = Option(variableTypes).map { types => + sc.parallelize(types.asScala.toSeq) + } + importTransposedCSV(inputFile, optVariableTypes, csvFormat) + } + def importTransposedCSV(inputFile: String): FeatureSource = { + val csvFormat: CSVFormat = DefaultCSVFormatSpec + val optVariableTypes: Option[RDD[String]] = None + importTransposedCSV(inputFile, optVariableTypes, csvFormat) + } + + /** Import features from a standard (non-transposed) CSV file + * @param inputFile: path to CSV file + * @param optVariableTypes: optional type specifications * @param csvFormat: [[com.github.tototoshi.csv.CSVFormat]] row format - * @return FeatureSource loaded from the VCF file or files + * @return FeatureSource loaded from the CSV file + */ + def importStdCSV(inputFile: String, + optVariableTypes: Option[RDD[String]], csvFormat: CSVFormat): FeatureSource = { + CsvStdFeatureSource(sc.textFile(inputFile), csvFormat = csvFormat, + optVariableTypes = optVariableTypes) + } + def importStdCSV(inputFile: String, + variableTypes: java.util.ArrayList[String] = null): FeatureSource = { + val csvFormat: CSVFormat = DefaultCSVFormatSpec + val optVariableTypes: Option[RDD[String]] = 
Option(variableTypes).map { types => + sc.parallelize(types.asScala.toSeq) + } + importStdCSV(inputFile, optVariableTypes, csvFormat) + } + def importStdCSV(inputFile: String): FeatureSource = { + val csvFormat: CSVFormat = DefaultCSVFormatSpec + val optVariableTypes: Option[RDD[String]] = None + importStdCSV(inputFile, optVariableTypes, csvFormat) + } + + /** Combine FeatureSource objects (typically a genotype source and a covariate source) + * @param featureSource: FeatureSource object containing genotype information + * @param covariateSource: FeatureSource object containing covariate information */ - def importCSV(inputFile: String, csvFormat: CSVFormat = DefaultCSVFormatSpec): FeatureSource = { - CsvFeatureSource(sc.textFile(inputFile), csvFormat = csvFormat) + def unionFeaturesAndCovariates(featureSource: FeatureSource, + covariateSource: FeatureSource): FeatureSource = { + UnionedFeatureSource(featureSource, covariateSource) } /** Loads a labels form a column in a CSV file