Skip to content

Commit

Permalink
DEV: Replace to_df functionality with head (#237)
Browse files Browse the repository at this point in the history
REFACTOR: Remove python component of converting Feature RDD to pandas

FEAT: Add RDD slice to DF function
  • Loading branch information
NickEdwards7502 committed Sep 19, 2024
1 parent 12d6137 commit 4df6e32
Showing 1 changed file with 10 additions and 51 deletions.
61 changes: 10 additions & 51 deletions python/varspark/featuresource.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,59 +11,18 @@ def __init__(self, _jvm, _vs_api, _jsql, sql, _jfs):
self._jsql = _jsql
self.sql = sql

@params(sample_list_str=str)
def extract_samples(sample_list_str):
    """Parse a Scala ``List(...)`` string into a list of sample names.

    :param (str) sample_list_str: The string representation of the sample list.
    :return List[str]: Sample names with surrounding whitespace and quotes removed.
    """
    inner = sample_list_str.replace("List(", "").replace(")", "")
    names = []
    for token in inner.split(","):
        names.append(token.strip().strip('"'))
    return names

@params(feature=object)
def unpack_feature_data(feature):
    """Unpack a feature's byte-array payload into a tuple of integers.

    :param feature: The feature object exposing ``data().valueAsByteArray()``.
    :return tuple: One unsigned-byte integer per byte of the payload.
    """
    raw = feature.data().valueAsByteArray()
    # "<n>B" decodes every byte of the payload as an unsigned 8-bit int
    return struct.unpack(f"{len(raw)}B", raw)

@params(features_ref=object)
def collect_feature_data(features_ref):
    """Collect feature data into a label-keyed dictionary.

    :param features_ref: The list of feature objects.
    :return dict: Feature labels mapped to their unpacked byte values.
    """
    feature_map = {}
    for feat in features_ref:
        feature_map[feat.label()] = FeatureSource.unpack_feature_data(feat)
    return feature_map

@params(self=object, row_lim=Nullable(int), col_lim=Nullable(int))
def head(self, row_lim=10, col_lim=10):
    """Convert a slice of the Feature Source RDD to a pandas DataFrame.

    Delegates the slicing to the JVM-side ``head`` implementation, registers
    the resulting Spark DataFrame as a temporary view, and collects it into
    pandas indexed by ``variant_id``.

    :param (int) row_lim: Number of rows (features) to take. Defaults to 10.
    :param (int) col_lim: Number of columns (samples) to take. Defaults to 10.
    :return features (DataFrame): dataframe with values for respective samples (rows)
    """
    jdf = self._jfs.head(self._jsql, row_lim, col_lim)
    # Force evaluation on the JVM side before registering the temp view.
    jdf.count()
    jdf.createOrReplaceTempView("df")
    features = self.sql.table("df").toPandas()
    features.set_index("variant_id", inplace=True)
    return features

0 comments on commit 4df6e32

Please sign in to comment.