check if methods are implemented using spark or not.
enriquea committed Jan 17, 2024
1 parent 02edcc8 commit 53ede34
Showing 4 changed files with 43 additions and 21 deletions.
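
The common thread in this commit is the `tag` decorator imported from `fsspark.utils.generic`, used to label each feature-selection method as a Spark-based or experimental implementation. Its definition is not part of this diff; a minimal sketch of how such a decorator could work (an assumption, not the actual fsspark code) is:

# Hypothetical sketch of fsspark.utils.generic.tag (its definition is not shown in this diff).
# It attaches a label to the decorated function so callers can check whether a
# feature-selection method is Spark-based or experimental.
def tag(label: str):
    def decorator(func):
        existing = getattr(func, "__tags__", [])
        func.__tags__ = existing + [label]
        return func
    return decorator
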
26 changes: 15 additions & 11 deletions fsspark/fs/ml.py
@@ -19,10 +19,13 @@
)
from pyspark.ml.regression import RandomForestRegressor, FMRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
from pyspark.pandas import DataFrame

from fsspark.fs.core import FSDataFrame
from fsspark.utils.generic import tag


@tag("spark implementation")
def cv_rf_classification(
fsdf: FSDataFrame, binary_classification: bool = True
) -> CrossValidatorModel:
@@ -34,7 +37,6 @@ def cv_rf_classification(
Otherwise, implement a multi-class classification problem.
:return: CrossValidatorModel
TODO: Consider here if make sense to return the full CV Model.
"""
features_col = "features"
sdf = fsdf.get_sdf_vector(output_column_vector=features_col)
@@ -69,6 +71,7 @@ def cv_rf_classification(
return cv_model


@tag("spark implementation")
def cv_svc_classification(
fsdf: FSDataFrame,
) -> CrossValidatorModel:
@@ -79,7 +82,6 @@ def cv_svc_classification(
:param fsdf: FSDataFrame
:return: CrossValidatorModel
TODO: Consider here if make sense to return the full CV Model.
"""

features_col = "features"
@@ -108,6 +110,7 @@ def cv_svc_classification(
return cv_model


@tag("spark implementation")
def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
"""
Cross-validation with Random Forest regressor as estimator.
@@ -116,7 +119,6 @@ def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
:param fsdf: FSDataFrame
:return: CrossValidatorModel
TODO: Consider here if make sense to return the full CV Model.
"""

features_col = "features"
@@ -148,6 +150,7 @@ def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
return cv_model


@tag("spark implementation")
def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
"""
Cross-validation with Factorization Machines as estimator.
@@ -156,7 +159,6 @@ def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
:param fsdf: FSDataFrame
:return: CrossValidatorModel
TODO: Do it make sense here to return the full CV Model??
"""

features_col = "features"
@@ -184,12 +186,14 @@ def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:

def get_accuracy(model: CrossValidatorModel) -> float:
"""
Get accuracy from a trained CrossValidatorModel (best model).
# TODO: This function should be able to parse all available models.
Currently only support RandomForestClassificationModel.
:param model: Trained CrossValidatorModel
:return: Training accuracy
:return: accuracy
"""

best_model = model.bestModel
if isinstance(best_model, RandomForestClassificationModel):
acc = best_model.summary.accuracy
@@ -200,7 +204,7 @@ def get_accuracy(model: CrossValidatorModel) -> float:
return acc


def get_predictions(model: CrossValidatorModel) -> pyspark.sql.DataFrame:
def get_predictions(model: CrossValidatorModel) -> pyspark.pandas.DataFrame:
"""
# TODO: This function should be able to parse all available models.
Currently only support RandomForestClassificationModel.
@@ -219,11 +223,11 @@ def get_predictions(model: CrossValidatorModel) -> pyspark.sql.DataFrame:
)
else:
pred = None
return pred
return pred.pandas_api()


def get_feature_scores(model: CrossValidatorModel,
indexed_features: pyspark.pandas.series.Series = None) -> pd.DataFrame:
indexed_features: pyspark.pandas.series.Series = None) -> pyspark.pandas.DataFrame:
"""
Extract features scores (e.g. importance or coefficients) from a trained CrossValidatorModel.
@@ -234,7 +238,7 @@ def get_feature_scores(model: CrossValidatorModel,
:param indexed_features: If provided, report features names rather than features indices.
Usually, the output from `training_data.get_features_indexed()`.
:return: Pandas on DataFrame with feature importance
:return: Pandas DataFrame with feature importance
"""

df_features = (None if indexed_features is None
@@ -279,5 +283,5 @@ def get_feature_scores(model: CrossValidatorModel,
return df.sort_values(by="coefficients", ascending=False)

else:
df = None # this should follow with parsing options for the different models.
return df
# TODO: here we should support other models.
pass
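
For illustration only (not part of this commit), the helpers in fsspark/fs/ml.py could be chained roughly as below; how the FSDataFrame `fsdf` is built is an assumption, since its constructor is not shown in this diff.

# Hypothetical usage sketch; the FSDataFrame `fsdf` is assumed to already exist.
from fsspark.fs.ml import cv_rf_classification, get_accuracy, get_feature_scores

cv_model = cv_rf_classification(fsdf, binary_classification=True)  # Spark CrossValidatorModel
print(get_accuracy(cv_model))            # accuracy of the best RandomForest model
scores = get_feature_scores(cv_model)    # feature scores table, sorted by importance
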
10 changes: 9 additions & 1 deletion fsspark/fs/multivariate.py
@@ -8,25 +8,31 @@

from fsspark.fs.core import FSDataFrame
from fsspark.fs.utils import find_maximal_independent_set
from fsspark.utils.generic import tag

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger("FSSPARK:MULTIVARIATE")
logger.setLevel(logging.INFO)


@tag("experimental")
def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
features_col: str = 'features',
method: str = "pearson") -> np.ndarray:
"""
Compute features Matrix Correlation.
TODO: Warning: Computed matrix correlation will collected into the drive with this implementation.
:param sdf: Spark DataFrame
:param features_col: Name of the feature column vector name.
:param method: One of `pearson` (default) or `spearman`.
:return: Numpy array.
"""

logger.warning("Warning: The computed correlation matrix will be collected into the driver with this implementation.\n"
"This may cause memory issues. Use it preferably with small datasets.")
logger.info(f"Computing correlation matrix using {method} method.")

mcorr = (Correlation
.corr(sdf, features_col, method)
.collect()[0][0]
@@ -35,6 +41,7 @@ def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
return mcorr


@tag("experimental")
def multivariate_correlation_selector(fsdf: FSDataFrame,
strict: bool = True,
corr_threshold: float = 0.75,
@@ -84,6 +91,7 @@ def multivariate_correlation_selector(fsdf: FSDataFrame,
return selected_features


@tag("spark implementation")
def multivariate_variance_selector(fsdf: FSDataFrame,
variance_threshold: float = 0.0) -> List[str]:
"""
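
As a usage sketch (not part of this commit), the multivariate selectors above return lists of selected feature names from an existing FSDataFrame; the call signatures follow the parameters visible in this diff.

# Hypothetical usage sketch; `fsdf` is an existing FSDataFrame.
from fsspark.fs.multivariate import (multivariate_correlation_selector,
                                     multivariate_variance_selector)

corr_selected = multivariate_correlation_selector(fsdf, corr_threshold=0.75)  # drop highly correlated features
var_selected = multivariate_variance_selector(fsdf, variance_threshold=0.0)   # keep features with variance above threshold
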
15 changes: 10 additions & 5 deletions fsspark/fs/univariate.py
@@ -5,15 +5,17 @@
from pyspark.ml.feature import UnivariateFeatureSelector

from fsspark.fs.core import FSDataFrame
from fsspark.utils.generic import tag

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger("FSSPARK:UNIVARIATE")
logger.setLevel(logging.INFO)


@tag("spark implementation")
def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]:
"""
Compute the correlation coefficient between every column (features) in the input DataFrame and a defined target.
Compute the correlation coefficient between every column (features) in the input DataFrame and the label (class).
:param fsdf: Input FSDataFrame
@@ -32,7 +34,7 @@ def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]:
def univariate_correlation_selector(fsdf: FSDataFrame,
corr_threshold: float = 0.3) -> List[str]:
"""
Select features based on its correlation with a target label, if corr value is less than a specified threshold.
Select features based on its correlation with a label (class), if corr value is less than a specified threshold.
Expected both features and label to be of type numeric.
:param fsdf: FSDataFrame
@@ -46,6 +48,7 @@ def univariate_correlation_selector(fsdf: FSDataFrame,
return selected_features


@tag("spark implementation")
def univariate_selector(fsdf: FSDataFrame,
label_type: str = 'categorical',
**kwargs) -> List[str]:
@@ -63,7 +66,7 @@ def univariate_selector(fsdf: FSDataFrame,
"""

vector_col_name = 'features'
vsdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name)
sdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name)
label = fsdf.get_label_col_name()

# set selector
@@ -85,13 +88,14 @@ def univariate_selector(fsdf: FSDataFrame,
.setLabelCol(label)
)

model = selector.fit(vsdf)
model = selector.fit(sdf)
selected_features_indices = model.selectedFeatures
selected_features = fsdf.get_features_by_index(selected_features_indices)

return selected_features


@tag("spark implementation")
def univariate_filter(fsdf: FSDataFrame,
univariate_method: str = 'u_corr',
**kwargs) -> FSDataFrame:
@@ -100,7 +104,8 @@ def univariate_filter(fsdf: FSDataFrame,
:param fsdf: Input FSDataFrame
:param univariate_method: Univariate selector method.
Possible values are 'u_corr', 'anova' or 'f_regression'.
Possible values are 'u_corr', 'anova' (categorical label)
or 'f_regression' (continuous label).
:return: Filtered FSDataFrame
"""
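
For reference (again, not part of this commit), univariate_filter is the entry point that dispatches to the selectors above; a minimal usage sketch, assuming an existing FSDataFrame:

# Hypothetical usage sketch; `fsdf` is an existing FSDataFrame with a categorical label.
from fsspark.fs.univariate import univariate_filter

filtered_fsdf = univariate_filter(fsdf, univariate_method='anova')  # returns a filtered FSDataFrame
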
13 changes: 9 additions & 4 deletions fsspark/fs/utils.py
@@ -7,12 +7,14 @@
from pyspark.ml.feature import Imputer

from fsspark.fs.core import FSDataFrame
from fsspark.utils.generic import tag

logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
logger = logging.getLogger("FSSPARK:UTILS")
logger.setLevel(logging.INFO)


@tag("spark implementation")
def compute_missingness_rate(fsdf: FSDataFrame) -> Dict[str, float]:
"""
Compute per features missingness rate.
@@ -28,7 +30,7 @@ def compute_missingness_rate(fsdf: FSDataFrame) -> Dict[str, float]:
missing_rates = sdf.select(
[
(
f.sum(f.when(f.isnan(sdf[c]) | f.isnull(sdf[c]), 1).otherwise(0)) / n_instances
f.sum(f.when(f.isnan(sdf[c]) | f.isnull(sdf[c]), 1).otherwise(0)) / n_instances
).alias(c)
for c in features
]
@@ -38,7 +40,7 @@ def remove_features_by_missingness_rate(


def remove_features_by_missingness_rate(
fsdf: FSDataFrame, threshold: float = 0.15
fsdf: FSDataFrame, threshold: float = 0.15
) -> FSDataFrame:
"""
Remove features from FSDataFrame with missingness rate higher or equal than a specified threshold.
@@ -57,6 +59,7 @@ def remove_features_by_missingness_rate(
return fsdf_filtered


@tag("spark implementation")
def impute_missing(fsdf: FSDataFrame, strategy: str = "mean") -> FSDataFrame:
"""
Impute missing values using the mean, median or mode.
@@ -93,14 +96,16 @@ def impute_missing(fsdf: FSDataFrame, strategy: str = "mean") -> FSDataFrame:
)


@tag("experimental")
def find_maximal_independent_set(pairs: Tuple[int], keep: bool = True) -> Set[int]:
"""
Given a set of indices pairs, returns a random maximal independent set.
:param pairs:
:param pairs: Set of indices pairs.
:param keep: If true (default), return the maximal independent set.
Otherwise, return the remaining indices after removing the maximal independent set.
:return:
:return: Set of indices (maximal independent set or remaining indices).
"""
logger.warning("This method is experimental and has not been extensively tested...")

Expand Down
