From 53ede34c920a77023ec3952f2fd525b06a686c8f Mon Sep 17 00:00:00 2001
From: enriquea
Date: Wed, 17 Jan 2024 22:45:17 +0100
Subject: [PATCH] check if methods are implemented using spark or not.

---
 fsspark/fs/ml.py           | 26 +++++++++++++++-----------
 fsspark/fs/multivariate.py | 10 +++++++++-
 fsspark/fs/univariate.py   | 15 ++++++++++-----
 fsspark/fs/utils.py        | 13 +++++++++----
 4 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/fsspark/fs/ml.py b/fsspark/fs/ml.py
index 8f861eb..9f372de 100644
--- a/fsspark/fs/ml.py
+++ b/fsspark/fs/ml.py
@@ -19,10 +19,13 @@
 )
 from pyspark.ml.regression import RandomForestRegressor, FMRegressor
 from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
+from pyspark.pandas import DataFrame
 
 from fsspark.fs.core import FSDataFrame
+from fsspark.utils.generic import tag
 
 
+@tag("spark implementation")
 def cv_rf_classification(
     fsdf: FSDataFrame, binary_classification: bool = True
 ) -> CrossValidatorModel:
@@ -34,7 +37,6 @@ def cv_rf_classification(
                                   Otherwise, implement a multi-class classification problem.
 
     :return: CrossValidatorModel
-    TODO: Consider here if make sense to return the full CV Model.
     """
     features_col = "features"
     sdf = fsdf.get_sdf_vector(output_column_vector=features_col)
@@ -69,6 +71,7 @@ def cv_rf_classification(
     return cv_model
 
 
+@tag("spark implementation")
 def cv_svc_classification(
     fsdf: FSDataFrame,
 ) -> CrossValidatorModel:
@@ -79,7 +82,6 @@ def cv_svc_classification(
 
     :param fsdf: FSDataFrame
     :return: CrossValidatorModel
-    TODO: Consider here if make sense to return the full CV Model.
     """
 
     features_col = "features"
@@ -108,6 +110,7 @@ def cv_svc_classification(
     return cv_model
 
 
+@tag("spark implementation")
 def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
     """
     Cross-validation with Random Forest regressor as estimator.
@@ -116,7 +119,6 @@ def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
 
     :param fsdf: FSDataFrame
     :return: CrossValidatorModel
-    TODO: Consider here if make sense to return the full CV Model.
     """
 
     features_col = "features"
@@ -148,6 +150,7 @@ def cv_rf_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
     return cv_model
 
 
+@tag("spark implementation")
 def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
     """
     Cross-validation with Factorization Machines as estimator.
@@ -156,7 +159,6 @@ def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
 
     :param fsdf: FSDataFrame
     :return: CrossValidatorModel
-    TODO: Do it make sense here to return the full CV Model??
     """
 
     features_col = "features"
@@ -184,12 +186,14 @@ def cv_fm_regression(fsdf: FSDataFrame) -> CrossValidatorModel:
 
 def get_accuracy(model: CrossValidatorModel) -> float:
     """
+    Get accuracy from a trained CrossValidatorModel (best model).
     # TODO: This function should be able to parse all available models.
     Currently only support RandomForestClassificationModel.
 
     :param model: Trained CrossValidatorModel
-    :return: Training accuracy
+    :return: accuracy
     """
+
     best_model = model.bestModel
     if isinstance(best_model, RandomForestClassificationModel):
         acc = best_model.summary.accuracy
@@ -200,7 +204,7 @@ def get_accuracy(model: CrossValidatorModel) -> float:
     return acc
 
 
-def get_predictions(model: CrossValidatorModel) -> pyspark.sql.DataFrame:
+def get_predictions(model: CrossValidatorModel) -> pyspark.pandas.DataFrame:
     """
     # TODO: This function should be able to parse all available models.
     Currently only support RandomForestClassificationModel.
@@ -219,11 +223,11 @@ def get_predictions(model: CrossValidatorModel) -> pyspark.sql.DataFrame:
         )
     else:
         pred = None
-    return pred
+    return pred.pandas_api()
 
 
 def get_feature_scores(model: CrossValidatorModel,
-                       indexed_features: pyspark.pandas.series.Series = None) -> pd.DataFrame:
+                       indexed_features: pyspark.pandas.series.Series = None) -> pyspark.pandas.DataFrame:
     """
     Extract features scores (e.g. importance or coefficients) from a trained CrossValidatorModel.
 
@@ -234,7 +238,7 @@ def get_feature_scores(model: CrossValidatorModel,
     :param indexed_features: If provided, report features names rather than features indices.
                              Usually, the output from `training_data.get_features_indexed()`.
 
-    :return: Pandas on DataFrame with feature importance
+    :return: pandas-on-Spark DataFrame with feature importance
     """
 
     df_features = (None if indexed_features is None
@@ -279,5 +283,5 @@ def get_feature_scores(model: CrossValidatorModel,
 
         return df.sort_values(by="coefficients", ascending=False)
     else:
-        df = None  # this should follow with parsing options for the different models.
-        return df
+        # TODO: add support for other models here.
+        pass
diff --git a/fsspark/fs/multivariate.py b/fsspark/fs/multivariate.py
index 79b9cf6..c82f67b 100644
--- a/fsspark/fs/multivariate.py
+++ b/fsspark/fs/multivariate.py
@@ -8,18 +8,19 @@
 
 from fsspark.fs.core import FSDataFrame
 from fsspark.fs.utils import find_maximal_independent_set
+from fsspark.utils.generic import tag
 
 logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
 logger = logging.getLogger("FSSPARK:MULTIVARIATE")
 logger.setLevel(logging.INFO)
 
 
+@tag("experimental")
 def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
                                 features_col: str = 'features',
                                 method: str = "pearson") -> np.ndarray:
     """
     Compute features Matrix Correlation.
-    TODO: Warning: Computed matrix correlation will collected into the drive with this implementation.
 
     :param sdf: Spark DataFrame
     :param features_col: Name of the feature column vector name.
@@ -27,6 +28,11 @@ def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
 
     :return: Numpy array.
     """
+
+    logger.warning("Warning: Computed matrix correlation will be collected into the driver with this implementation.\n"
+                   "This may cause memory issues. Use it preferably with small datasets.")
+    logger.info(f"Computing correlation matrix using {method} method.")
+
     mcorr = (Correlation
              .corr(sdf, features_col, method)
              .collect()[0][0]
@@ -35,6 +41,7 @@ def _compute_correlation_matrix(sdf: pyspark.sql.DataFrame,
     return mcorr
 
 
+@tag("experimental")
 def multivariate_correlation_selector(fsdf: FSDataFrame,
                                       strict: bool = True,
                                       corr_threshold: float = 0.75,
@@ -84,6 +91,7 @@ def multivariate_correlation_selector(fsdf: FSDataFrame,
     return selected_features
 
 
+@tag("spark implementation")
 def multivariate_variance_selector(fsdf: FSDataFrame,
                                    variance_threshold: float = 0.0) -> List[str]:
     """
diff --git a/fsspark/fs/univariate.py b/fsspark/fs/univariate.py
index e921208..56a21df 100644
--- a/fsspark/fs/univariate.py
+++ b/fsspark/fs/univariate.py
@@ -5,15 +5,17 @@
 from pyspark.ml.feature import UnivariateFeatureSelector
 
 from fsspark.fs.core import FSDataFrame
+from fsspark.utils.generic import tag
 
 logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
 logger = logging.getLogger("FSSPARK:UNIVARIATE")
 logger.setLevel(logging.INFO)
 
 
+@tag("spark implementation")
 def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]:
     """
-    Compute the correlation coefficient between every column (features) in the input DataFrame and a defined target.
+    Compute the correlation coefficient between every column (features) in the input DataFrame and the label (class).
 
     :param fsdf: Input FSDataFrame
 
@@ -32,7 +34,7 @@ def compute_univariate_corr(fsdf: FSDataFrame) -> Dict[str, float]:
 def univariate_correlation_selector(fsdf: FSDataFrame,
                                     corr_threshold: float = 0.3) -> List[str]:
     """
-    Select features based on its correlation with a target label, if corr value is less than a specified threshold.
+    Select features based on their correlation with the label (class), if the correlation value is less than a specified threshold.
     Expected both features and label to be of type numeric.
 
     :param fsdf: FSDataFrame
@@ -46,6 +48,7 @@ def univariate_correlation_selector(fsdf: FSDataFrame,
     return selected_features
 
 
+@tag("spark implementation")
 def univariate_selector(fsdf: FSDataFrame,
                         label_type: str = 'categorical',
                         **kwargs) -> List[str]:
@@ -63,7 +66,7 @@ def univariate_selector(fsdf: FSDataFrame,
     """
 
     vector_col_name = 'features'
-    vsdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name)
+    sdf = fsdf.get_sdf_vector(output_column_vector=vector_col_name)
     label = fsdf.get_label_col_name()
 
     # set selector
@@ -85,13 +88,14 @@ def univariate_selector(fsdf: FSDataFrame,
              .setLabelCol(label)
              )
 
-    model = selector.fit(vsdf)
+    model = selector.fit(sdf)
     selected_features_indices = model.selectedFeatures
     selected_features = fsdf.get_features_by_index(selected_features_indices)
 
     return selected_features
 
 
+@tag("spark implementation")
 def univariate_filter(fsdf: FSDataFrame,
                       univariate_method: str = 'u_corr',
                       **kwargs) -> FSDataFrame:
@@ -100,7 +104,8 @@ def univariate_filter(fsdf: FSDataFrame,
 
     :param fsdf: Input FSDataFrame
     :param univariate_method: Univariate selector method.
-                              Possible values are 'u_corr', 'anova' or 'f_regression'.
+                              Possible values are 'u_corr', 'anova' (categorical label)
+                              or 'f_regression' (continuous label).
 
     :return: Filtered FSDataFrame
     """
diff --git a/fsspark/fs/utils.py b/fsspark/fs/utils.py
index 9f5adf5..31d4c39 100644
--- a/fsspark/fs/utils.py
+++ b/fsspark/fs/utils.py
@@ -7,12 +7,14 @@
 from pyspark.ml.feature import Imputer
 
 from fsspark.fs.core import FSDataFrame
+from fsspark.utils.generic import tag
 
 logging.basicConfig(format="%(levelname)s (%(name)s %(lineno)s): %(message)s")
 logger = logging.getLogger("FSSPARK:UTILS")
 logger.setLevel(logging.INFO)
 
 
+@tag("spark implementation")
 def compute_missingness_rate(fsdf: FSDataFrame) -> Dict[str, float]:
     """
     Compute per features missingness rate.
@@ -28,7 +30,7 @@ def compute_missingness_rate(fsdf: FSDataFrame) -> Dict[str, float]:
     missing_rates = sdf.select(
         [
             (
-                f.sum(f.when(f.isnan(sdf[c]) | f.isnull(sdf[c]), 1).otherwise(0)) / n_instances
+                f.sum(f.when(f.isnan(sdf[c]) | f.isnull(sdf[c]), 1).otherwise(0)) / n_instances
             ).alias(c)
             for c in features
         ]
@@ -38,7 +40,7 @@ def compute_missingness_rate(fsdf: FSDataFrame) -> Dict[str, float]:
 
 
 def remove_features_by_missingness_rate(
-    fsdf: FSDataFrame, threshold: float = 0.15
+    fsdf: FSDataFrame, threshold: float = 0.15
 ) -> FSDataFrame:
     """
     Remove features from FSDataFrame with missingness rate higher or equal than a specified threshold.
@@ -57,6 +59,7 @@ def remove_features_by_missingness_rate(
     return fsdf_filtered
 
 
+@tag("spark implementation")
 def impute_missing(fsdf: FSDataFrame, strategy: str = "mean") -> FSDataFrame:
     """
     Impute missing values using the mean, median or mode.
@@ -93,14 +96,16 @@ def impute_missing(fsdf: FSDataFrame, strategy: str = "mean") -> FSDataFrame:
     )
 
 
+@tag("experimental")
 def find_maximal_independent_set(pairs: Tuple[int], keep: bool = True) -> Set[int]:
     """
     Given a set of indices pairs, returns a random maximal independent set.
 
-    :param pairs:
+    :param pairs: Set of index pairs.
     :param keep: If true (default), return the maximal independent set.
                  Otherwise, return the remaining indices after removing the maximal independent set.
-    :return:
+
+    :return: Set of indices (maximal independent set or remaining indices).
     """
 
     logger.warning("This method is experimental and have been not extensively tested...")
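
Note on the `tag` decorator: it is imported from `fsspark.utils.generic` in every touched module, but its implementation is not part of this patch. A minimal sketch of what such a decorator could look like, assuming it only attaches a descriptive label to the decorated function (the attribute name `__tag__` and the behaviour shown here are assumptions for illustration, not the actual fsspark implementation):

    def tag(label: str):
        """Attach a descriptive label (e.g. 'spark implementation', 'experimental') to a function."""
        def decorator(func):
            # assumed attribute name; intended only for documentation/introspection,
            # the decorated function itself is returned unchanged
            func.__tag__ = label
            return func
        return decorator

With a decorator along these lines, the `@tag("spark implementation")` and `@tag("experimental")` annotations added throughout this patch label each feature-selection method without changing its runtime behaviour.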