From 9282c0e7efc5bb104b9fcc3f182cc9af25c97aea Mon Sep 17 00:00:00 2001
From: chrispyl
Date: Fri, 15 Nov 2024 11:27:11 +0100
Subject: [PATCH 01/10] added kwargs to create_training_name_pairs

---
 emm/pipeline/pandas_entity_matching.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/emm/pipeline/pandas_entity_matching.py b/emm/pipeline/pandas_entity_matching.py
index 0e3d786..190cbdf 100644
--- a/emm/pipeline/pandas_entity_matching.py
+++ b/emm/pipeline/pandas_entity_matching.py
@@ -312,6 +312,7 @@ def create_training_name_pairs(
         n_train_ids: int = -1,
         random_seed: int = 42,
         drop_duplicate_candidates: bool | None = None,
+        **kwargs,
     ) -> pd.DataFrame:
         """Create name-pairs for training from positive names that match to the ground truth.

@@ -333,6 +334,7 @@ def create_training_name_pairs(
             drop_duplicate_candidates: if True drop any duplicate training candidates and keep just one,
                 if available keep the correct match. Recommended for string-similarity models, eg. with
                 without_rank_features=True. default is False.
+            kwargs: extra keyword arguments meant to be passed to prepare_name_pairs_pd.

         Returns:
             pandas dataframe with name-pair candidates to be used for training.
@@ -383,6 +385,7 @@ def create_training_name_pairs(
             create_negative_sample_fraction=create_negative_sample_fraction,
             positive_set_col=self.parameters.get("positive_set_col", "positive_set"),
             random_seed=random_seed,
+            **kwargs,
         )

     def fit_classifier(

From 35d80026f637bc9e518f30606caa3d70b5006e20 Mon Sep 17 00:00:00 2001
From: chrispyl
Date: Fri, 15 Nov 2024 11:33:33 +0100
Subject: [PATCH 02/10] added branch to test.yml

---
 .github/workflows/test.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index e5247e7..99078d5 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -2,7 +2,7 @@ name: Tests

 on:
   push:
-    branches: [ main ]
+    branches: [ main, update_create_training_name_pairs]
   pull_request:

 jobs:

From 63cd09ea835c0308e90e4c79e37277463e651fd7 Mon Sep 17 00:00:00 2001
From: chrispyl
Date: Fri, 15 Nov 2024 14:47:14 +0100
Subject: [PATCH 03/10] added kwargs to spark version of create_training_name_pairs

---
 emm/pipeline/spark_entity_matching.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/emm/pipeline/spark_entity_matching.py b/emm/pipeline/spark_entity_matching.py
index 9687d4d..2a37392 100644
--- a/emm/pipeline/spark_entity_matching.py
+++ b/emm/pipeline/spark_entity_matching.py
@@ -343,6 +343,7 @@ def create_training_name_pairs(
         n_train_ids=-1,
         random_seed=42,
         drop_duplicate_candidates: bool | None = None,
+        **kwargs,
     ) -> pd.DataFrame:
         """Create name-pairs for training from positive names that match to the ground truth.
@@ -409,6 +410,7 @@ def create_training_name_pairs(
             create_negative_sample_fraction=create_negative_sample_fraction,
             positive_set_col=self.parameters.get("positive_set_col", "positive_set"),
             random_seed=random_seed,
+            **kwargs,
         )

     def fit_classifier(

From 0f8d20a3aad94cb74624d4769fe5aa544649b8aa Mon Sep 17 00:00:00 2001
From: chrispyl
Date: Fri, 15 Nov 2024 14:53:08 +0100
Subject: [PATCH 04/10] added description for kwargs

---
 emm/pipeline/spark_entity_matching.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/emm/pipeline/spark_entity_matching.py b/emm/pipeline/spark_entity_matching.py
index 2a37392..a6ba923 100644
--- a/emm/pipeline/spark_entity_matching.py
+++ b/emm/pipeline/spark_entity_matching.py
@@ -365,6 +365,7 @@ def create_training_name_pairs(
             drop_duplicate_candidates: if True drop any duplicate training candidates and keep just one,
                 if available keep the correct match. Recommended for string-similarity models, eg. with
                 without_rank_features=True. default is False.
+            kwargs: extra keyword arguments meant to be passed to prepare_name_pairs_pd.

         Returns:
             pandas dataframe with name-pair candidates to be used for training.

From 77df841f6af612adc204a9de82805492fe350a70 Mon Sep 17 00:00:00 2001
From: chrispyl
Date: Fri, 15 Nov 2024 15:00:50 +0100
Subject: [PATCH 05/10] ran pre-commit

---
 emm/pipeline/spark_entity_matching.py | 28 ++++++++++++++++++----------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/emm/pipeline/spark_entity_matching.py b/emm/pipeline/spark_entity_matching.py
index a6ba923..4506908 100644
--- a/emm/pipeline/spark_entity_matching.py
+++ b/emm/pipeline/spark_entity_matching.py
@@ -20,14 +20,11 @@
 from __future__ import annotations

 import re
-from typing import TYPE_CHECKING, Any, Callable, Literal, Mapping
+from collections.abc import Callable, Mapping
+from typing import TYPE_CHECKING, Any, Literal

 import numpy as np
 import pandas as pd
-from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
-from pyspark.sql import DataFrame, SparkSession
-from pyspark.sql import functions as F
-
 from emm.aggregation.base_entity_aggregation import BaseEntityAggregation
 from emm.aggregation.spark_entity_aggregation import SparkEntityAggregation
 from emm.data.prepare_name_pairs import prepare_name_pairs
@@ -46,6 +43,9 @@
 from emm.preprocessing.spark_preprocessor import SparkPreprocessor
 from emm.supervised_model.base_supervised_model import train_model
 from emm.supervised_model.spark_supervised_model import SparkSupervisedLayerEstimator
+from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql import functions as F

 if TYPE_CHECKING:
     from pyspark.ml import Pipeline, PipelineModel
@@ -269,7 +269,9 @@ def fit(self, ground_truth_df, copy_ground_truth: bool = False) -> SparkEntityMa
         # We repartition in order to have at least 200, to have a nice parallel computation
         # (assuming memory is not an issue here) and nice parallelism for joins in transform() later on.
         # We usually have less than 200 partitions in case the ground_truth is not that long.
-        ground_truth_df, self.n_ground_truth = auto_repartitioning(ground_truth_df, self.parameters["partition_size"])
+        ground_truth_df, self.n_ground_truth = auto_repartitioning(
+            ground_truth_df, self.parameters["partition_size"]
+        )
         ground_truth_df = check_uid(ground_truth_df, self.parameters["uid_col"])
         ground_truth_df = self._normalize_column_names(ground_truth_df)
         self.model = self.pipeline.fit(ground_truth_df)
@@ -326,7 +328,9 @@ def transform(self, names_df: DataFrame, top_n: int = -1) -> DataFrame:
         cols_to_keep = names_df.columns
         cols_to_drop = [c for c in matched_df.columns if c not in cols_to_keep]
         cols_to_drop = [
-            c for c in cols_to_drop if not re.match(pattern, c) and not c.endswith("_score") and c != "preprocessed"
+            c
+            for c in cols_to_drop
+            if not re.match(pattern, c) and not c.endswith("_score") and c != "preprocessed"
         ]
         matched_df = matched_df.drop(*cols_to_drop)
         logger.debug(f"Dropping columns: {cols_to_drop}")
@@ -365,7 +369,7 @@ def create_training_name_pairs(
             drop_duplicate_candidates: if True drop any duplicate training candidates and keep just one,
                 if available keep the correct match. Recommended for string-similarity models, eg. with
                 without_rank_features=True. default is False.
-            kwargs: extra keyword arguments meant to be passed to prepare_name_pairs_pd. 
+            kwargs: extra keyword arguments meant to be passed to prepare_name_pairs_pd.

         Returns:
             pandas dataframe with name-pair candidates to be used for training.
@@ -494,7 +498,9 @@ def fit_classifier(
         # keep both stages for re-adding later (also in case of do_training=False).
         if self.parameters.get("supervised_on", False):
             self.model.stages.pop(2)
-        aggregation_model = self.model.stages.pop() if self.parameters.get("aggregation_layer", False) else None
+        aggregation_model = (
+            self.model.stages.pop() if self.parameters.get("aggregation_layer", False) else None
+        )
         # remove any existing untrained model 'X', no longer needed.
         if isinstance(self.supervised_models, dict):
             self.supervised_models.pop("X", None)
@@ -583,7 +589,9 @@ def add_supervised_model(
         # reinsert again below with new sklearn model included.
         if self.parameters.get("supervised_on", False):
             self.model.stages.pop(2)
-        aggregation_model = self.model.stages.pop() if self.parameters.get("aggregation_layer", False) else None
+        aggregation_model = (
+            self.model.stages.pop() if self.parameters.get("aggregation_layer", False) else None
+        )

         # add new supervised model to self.supervised_models
         # self.supervised_models contains all trained and untrained sklearn models

From 65d1be89d1918552f5137094fddb9f4e7951539d Mon Sep 17 00:00:00 2001
From: chrispyl
Date: Fri, 15 Nov 2024 15:13:11 +0100
Subject: [PATCH 06/10] ran pre-commit

---
 emm/pipeline/spark_entity_matching.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/emm/pipeline/spark_entity_matching.py b/emm/pipeline/spark_entity_matching.py
index 4506908..8b024ed 100644
--- a/emm/pipeline/spark_entity_matching.py
+++ b/emm/pipeline/spark_entity_matching.py
@@ -20,7 +20,6 @@
 from __future__ import annotations

 import re
-from collections.abc import Callable, Mapping
 from typing import TYPE_CHECKING, Any, Literal

 import numpy as np
@@ -48,6 +47,8 @@ from pyspark.sql import functions as F

 if TYPE_CHECKING:
+    from collections.abc import Callable, Mapping
+
     from pyspark.ml import Pipeline, PipelineModel

From 8444486aa94c8f9c64a0ef5021c363de3fa5d4aa Mon Sep 17 00:00:00 2001
From: chrispyl
Date: Fri, 15 Nov 2024 15:40:02 +0100
Subject: [PATCH 07/10] ran pre-commit

---
 emm/pipeline/spark_entity_matching.py | 24 ++++++++----------------
 1 file changed, 8 insertions(+), 16 deletions(-)

diff --git a/emm/pipeline/spark_entity_matching.py b/emm/pipeline/spark_entity_matching.py
index 8b024ed..607bcbc 100644
--- a/emm/pipeline/spark_entity_matching.py
+++ b/emm/pipeline/spark_entity_matching.py
@@ -24,6 +24,10 @@

 import numpy as np
 import pandas as pd
+from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql import functions as F
+
 from emm.aggregation.base_entity_aggregation import BaseEntityAggregation
 from emm.aggregation.spark_entity_aggregation import SparkEntityAggregation
 from emm.data.prepare_name_pairs import prepare_name_pairs
@@ -42,13 +46,9 @@
 from emm.preprocessing.spark_preprocessor import SparkPreprocessor
 from emm.supervised_model.base_supervised_model import train_model
 from emm.supervised_model.spark_supervised_model import SparkSupervisedLayerEstimator
-from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
-from pyspark.sql import DataFrame, SparkSession
-from pyspark.sql import functions as F

 if TYPE_CHECKING:
     from collections.abc import Callable, Mapping
-
     from pyspark.ml import Pipeline, PipelineModel
@@ -270,9 +270,7 @@ def fit(self, ground_truth_df, copy_ground_truth: bool = False) -> SparkEntityMa
         # We repartition in order to have at least 200, to have a nice parallel computation
         # (assuming memory is not an issue here) and nice parallelism for joins in transform() later on.
         # We usually have less than 200 partitions in case the ground_truth is not that long.
-        ground_truth_df, self.n_ground_truth = auto_repartitioning(
-            ground_truth_df, self.parameters["partition_size"]
-        )
+        ground_truth_df, self.n_ground_truth = auto_repartitioning(ground_truth_df, self.parameters["partition_size"])
         ground_truth_df = check_uid(ground_truth_df, self.parameters["uid_col"])
         ground_truth_df = self._normalize_column_names(ground_truth_df)
         self.model = self.pipeline.fit(ground_truth_df)
@@ -329,9 +327,7 @@ def transform(self, names_df: DataFrame, top_n: int = -1) -> DataFrame:
         cols_to_keep = names_df.columns
         cols_to_drop = [c for c in matched_df.columns if c not in cols_to_keep]
         cols_to_drop = [
-            c
-            for c in cols_to_drop
-            if not re.match(pattern, c) and not c.endswith("_score") and c != "preprocessed"
+            c for c in cols_to_drop if not re.match(pattern, c) and not c.endswith("_score") and c != "preprocessed"
         ]
         matched_df = matched_df.drop(*cols_to_drop)
         logger.debug(f"Dropping columns: {cols_to_drop}")
@@ -499,9 +495,7 @@ def fit_classifier(
         # keep both stages for re-adding later (also in case of do_training=False).
         if self.parameters.get("supervised_on", False):
             self.model.stages.pop(2)
-        aggregation_model = (
-            self.model.stages.pop() if self.parameters.get("aggregation_layer", False) else None
-        )
+        aggregation_model = self.model.stages.pop() if self.parameters.get("aggregation_layer", False) else None
         # remove any existing untrained model 'X', no longer needed.
         if isinstance(self.supervised_models, dict):
             self.supervised_models.pop("X", None)
@@ -590,9 +584,7 @@ def add_supervised_model(
         # reinsert again below with new sklearn model included.
         if self.parameters.get("supervised_on", False):
             self.model.stages.pop(2)
-        aggregation_model = (
-            self.model.stages.pop() if self.parameters.get("aggregation_layer", False) else None
-        )
+        aggregation_model = self.model.stages.pop() if self.parameters.get("aggregation_layer", False) else None

         # add new supervised model to self.supervised_models
         # self.supervised_models contains all trained and untrained sklearn models

From bc825bf1583080cf102fdec76a4a46a3123592d3 Mon Sep 17 00:00:00 2001
From: chrispyl
Date: Fri, 15 Nov 2024 15:40:41 +0100
Subject: [PATCH 08/10] ran pre-commit

---
 emm/pipeline/spark_entity_matching.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/emm/pipeline/spark_entity_matching.py b/emm/pipeline/spark_entity_matching.py
index 607bcbc..30fa9ab 100644
--- a/emm/pipeline/spark_entity_matching.py
+++ b/emm/pipeline/spark_entity_matching.py
@@ -47,7 +47,7 @@
 from emm.supervised_model.base_supervised_model import train_model
 from emm.supervised_model.spark_supervised_model import SparkSupervisedLayerEstimator

-if TYPE_CHECKING: 
+if TYPE_CHECKING:
     from collections.abc import Callable, Mapping
     from pyspark.ml import Pipeline, PipelineModel

From 64e3ce84ce3e95075ea638334ff461c2f7c5f9a4 Mon Sep 17 00:00:00 2001
From: chrispyl
Date: Fri, 15 Nov 2024 15:43:55 +0100
Subject: [PATCH 09/10] ran pre-commit

---
 emm/pipeline/spark_entity_matching.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/emm/pipeline/spark_entity_matching.py b/emm/pipeline/spark_entity_matching.py
index 30fa9ab..0df6565 100644
--- a/emm/pipeline/spark_entity_matching.py
+++ b/emm/pipeline/spark_entity_matching.py
@@ -47,8 +47,9 @@
 from emm.supervised_model.base_supervised_model import train_model
 from emm.supervised_model.spark_supervised_model import SparkSupervisedLayerEstimator

-if TYPE_CHECKING:
+if TYPE_CHECKING:
     from collections.abc import Callable, Mapping
+
     from pyspark.ml import Pipeline, PipelineModel

From 361aa3f68843a5114a7b971663ef5c7412c35506 Mon Sep 17 00:00:00 2001
From: chrispyl
Date: Fri, 15 Nov 2024 16:01:59 +0100
Subject: [PATCH 10/10] removed branch from test triggers

---
 .github/workflows/test.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 99078d5..e5247e7 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -2,7 +2,7 @@ name: Tests

 on:
   push:
-    branches: [ main, update_create_training_name_pairs]
+    branches: [ main ]
   pull_request:

 jobs:
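Net effect of the series: create_training_name_pairs in both the pandas and Spark
pipelines now accepts extra keyword arguments and forwards them to
prepare_name_pairs_pd; the intermediate "ran pre-commit" commits are formatting
churn, and the temporary CI branch trigger is reverted in the last patch. Below is
a minimal usage sketch of the new pass-through (pandas version). The toy
dataframes and the commented-out extra_option keyword are illustrative
placeholders, not part of the patches; check the signature of
prepare_name_pairs_pd for the options it actually accepts.

import pandas as pd

from emm import PandasEntityMatching

# Toy ground truth and positive training names (illustrative data only).
ground_truth = pd.DataFrame({"name": ["Apple Inc.", "Alphabet Inc."], "id": [1, 2]})
train_names = pd.DataFrame({"name": ["apple incorporated", "alphabet"], "id": [1, 2]})

# Name-only matching without the supervised layer keeps the sketch small.
model = PandasEntityMatching({"name_only": True, "supervised_on": False})
model.fit(ground_truth)

# Extra keyword arguments are now passed through to prepare_name_pairs_pd.
train_pairs = model.create_training_name_pairs(
    train_names,
    drop_duplicate_candidates=True,
    # extra_option=...,  # hypothetical placeholder for a prepare_name_pairs_pd kwarg
)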