diff --git a/nemo_curator/classifiers/aegis.py b/nemo_curator/classifiers/aegis.py index 1306903f..497fde57 100644 --- a/nemo_curator/classifiers/aegis.py +++ b/nemo_curator/classifiers/aegis.py @@ -94,7 +94,7 @@ def forward(self, batch): class AegisHFModel(HFModel): - def __init__(self, config: AegisConfig, max_mem_gb=None): + def __init__(self, config: AegisConfig, max_mem_gb: Optional[int] = None): self.config = config if max_mem_gb is None: max_mem_gb = _get_suggest_memory_for_classifier() @@ -109,7 +109,7 @@ def __init__(self, config: AegisConfig, max_mem_gb=None): seq_len_increment=1024, ) - def load_model(self, device="cuda"): + def load_model(self, device: str = "cuda"): model = AegisModel( self.config.pretrained_model_name_or_path, self.config.peft_model_name_or_path, @@ -171,7 +171,7 @@ def __init__( keep_raw_pred: bool = False, max_chars: int = 6000, device_type: str = "cuda", - max_mem_gb: int = None, + max_mem_gb: Optional[int] = None, ): """ Constructs the classifier @@ -270,7 +270,7 @@ def _postprocess_responses(self, df): df[self.pred_column] = cudf.Series(parsed_response) return df - def _run_classifier(self, dataset: DocumentDataset): + def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset: print("Starting AEGIS classifier inference", flush=True) ddf = dataset.df hidden_meta = ddf._meta.copy() diff --git a/nemo_curator/classifiers/base.py b/nemo_curator/classifiers/base.py index 452bb9bb..38c4ae48 100644 --- a/nemo_curator/classifiers/base.py +++ b/nemo_curator/classifiers/base.py @@ -16,7 +16,7 @@ os.environ["RAPIDS_NO_INITIALIZE"] = "1" from abc import ABC, abstractmethod -from typing import List +from typing import List, Optional import torch import torch.nn as nn @@ -33,15 +33,15 @@ class DistributedDataClassifier(ABC): def __init__( self, - model, - labels, - filter_by, - batch_size, - out_dim, - pred_column, - max_chars, - device_type, - autocast, + model: str, + labels: List[str], + filter_by: Optional[List[str]], + 
batch_size: int, + out_dim: int, + pred_column: str, + max_chars: int, + device_type: str, + autocast: bool, ): self.model = model self.labels = labels @@ -53,7 +53,7 @@ def __init__( self.device_type = device_type self.autocast = autocast - def __call__(self, dataset: DocumentDataset): + def __call__(self, dataset: DocumentDataset) -> DocumentDataset: result_doc_dataset = self._run_classifier(dataset) if self.filter_by is not None: return self._filter_documents(result_doc_dataset) @@ -61,13 +61,13 @@ def __call__(self, dataset: DocumentDataset): return result_doc_dataset @abstractmethod - def _run_classifier(self): + def _run_classifier(self) -> DocumentDataset: pass def _filter_documents( self, dataset: DocumentDataset, - ): + ) -> DocumentDataset: df = dataset.df filter_by = self.filter_by @@ -106,7 +106,7 @@ def forward(self, batch): else: return self._forward(batch) - def set_autocast(self, autocast): + def set_autocast(self, autocast: bool): self.autocast = autocast @@ -117,6 +117,7 @@ def _run_classifier_helper( max_chars: int, batch_size: int, label_col: str, + text_field: str = "text", prob_col: str = None, ) -> "dask_cudf.DataFrame": @@ -124,7 +125,7 @@ def _run_classifier_helper( prob_internal_col = "_prob" # TODO: Make crossfit handle this cleanly pred_internal_col = "labels" - df["sliced_text"] = df["text"].str.slice(0, max_chars) + df["sliced_text"] = df[text_field].str.slice(0, max_chars) columns_to_keep_list = df.columns.to_list() columns_to_keep_list.remove("sliced_text") diff --git a/nemo_curator/classifiers/domain.py b/nemo_curator/classifiers/domain.py index f77bade7..843b35a2 100644 --- a/nemo_curator/classifiers/domain.py +++ b/nemo_curator/classifiers/domain.py @@ -13,6 +13,7 @@ # limitations under the License. 
import os from dataclasses import dataclass +from typing import List, Optional os.environ["RAPIDS_NO_INITIALIZE"] = "1" from crossfit.backend.torch.hf.model import HFModel @@ -31,14 +32,17 @@ @dataclass class DomainModelConfig: - model = "microsoft/deberta-v3-base" - fc_dropout = 0.2 - max_len = 512 + model: str = "microsoft/deberta-v3-base" + fc_dropout: float = 0.2 + max_len: int = 512 class DomainModel(HFModel): def __init__( - self, config: DomainModelConfig, autocast: bool = False, max_mem_gb=None + self, + config: DomainModelConfig, + autocast: bool = False, + max_mem_gb: Optional[int] = None, ): self.config = config self.autocast = autocast @@ -47,7 +51,7 @@ def __init__( super().__init__(self.config.model, max_mem_gb=max_mem_gb) - def load_model(self, device="cuda"): + def load_model(self, device: str = "cuda"): model = HFDeberta.from_pretrained(DOMAIN_IDENTIFIER) model.set_autocast(self.autocast) model = model.to(device) @@ -70,6 +74,7 @@ class DomainClassifier(DistributedDataClassifier): filter_by (list[str], optional): The classes to filter the dataset by. If None, all classes will be included. Defaults to None. batch_size (int): The number of samples per batch for inference. Defaults to 256. + text_field (str): The field in the dataset that should be classified. Defaults to "text". pred_column (str): The column name where predictions will be stored. Defaults to "domain_pred". prob_column (str, optional): The column name where prediction probabilities will be stored. Defaults to None. max_chars (int): The maximum number of characters in each document to consider for classification. Defaults to 2000. 
@@ -82,17 +87,19 @@ class DomainClassifier(DistributedDataClassifier): def __init__( self, - filter_by=None, - batch_size=256, - pred_column="domain_pred", - prob_column=None, - max_chars=2000, - device_type="cuda", - autocast=True, - max_mem_gb=None, + filter_by: Optional[List[str]] = None, + batch_size: int = 256, + text_field: str = "text", + pred_column: str = "domain_pred", + prob_column: Optional[str] = None, + max_chars: int = 2000, + device_type: str = "cuda", + autocast: bool = True, + max_mem_gb: Optional[int] = None, ): config = AutoConfig.from_pretrained(DOMAIN_IDENTIFIER) + self.text_field = text_field self.prob_column = prob_column self.labels = list(config.label2id.keys()) self.labels.sort(key=lambda x: config.label2id[x]) @@ -114,7 +121,7 @@ def __init__( autocast=autocast, ) - def _run_classifier(self, dataset: DocumentDataset): + def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset: print("Starting domain classifier inference", flush=True) df = dataset.df df = _run_classifier_helper( @@ -124,6 +131,7 @@ def _run_classifier(self, dataset: DocumentDataset): max_chars=self.max_chars, batch_size=self.batch_size, label_col=self.pred_column, + text_field=self.text_field, prob_col=self.prob_column, ) return DocumentDataset(df) diff --git a/nemo_curator/classifiers/fineweb_edu.py b/nemo_curator/classifiers/fineweb_edu.py index 9547fc8f..75050b22 100644 --- a/nemo_curator/classifiers/fineweb_edu.py +++ b/nemo_curator/classifiers/fineweb_edu.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import os +from typing import Optional os.environ["RAPIDS_NO_INITIALIZE"] = "1" import torch @@ -29,21 +30,26 @@ class FinewebEduModel(HFModel): - def __init__(self, path_or_name, max_mem_gb=None, autocast=False): + def __init__( + self, + path_or_name: str, + max_mem_gb: Optional[int] = None, + autocast: bool = False, + ): self.path_or_name = path_or_name self.autocast = autocast if max_mem_gb is None: max_mem_gb = _get_suggest_memory_for_classifier() super().__init__(path_or_name=path_or_name, max_mem_gb=max_mem_gb) - def load_model(self, device="cuda"): + def load_model(self, device: str = "cuda"): model = AutoModelForSequenceClassification.from_pretrained(self.path_or_name) model = model.to(device) model = self.configure_forward(model, self.autocast) return model @staticmethod - def configure_forward(model, autocast=True): + def configure_forward(model, autocast: bool = True): original_forward = model.forward def custom_forward(*args, **kwargs): @@ -83,14 +89,14 @@ class FineWebEduClassifier(DistributedDataClassifier): def __init__( self, - batch_size=256, + batch_size: int = 256, text_field: str = "text", - pred_column="fineweb-edu-score", + pred_column: str = "fineweb-edu-score", int_column="fineweb-edu-score-int", - max_chars=-1, - device_type="cuda", - autocast=True, - max_mem_gb=None, + max_chars: int = -1, + device_type: str = "cuda", + autocast: bool = True, + max_mem_gb: Optional[int] = None, ): model = FinewebEduModel( path_or_name=FINEWEB_EDU_IDENTIFIER, @@ -112,7 +118,7 @@ def __init__( out_dim=1, ) - def _run_classifier(self, dataset: DocumentDataset): + def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset: print("Starting Fineweb EDU classifier inference", flush=True) ddf = dataset.df diff --git a/nemo_curator/classifiers/quality.py b/nemo_curator/classifiers/quality.py index c0f2bf77..90db1de5 100644 --- a/nemo_curator/classifiers/quality.py +++ b/nemo_curator/classifiers/quality.py @@ -13,6 +13,7 @@ # limitations under the 
License. import os from dataclasses import dataclass +from typing import List, Optional os.environ["RAPIDS_NO_INITIALIZE"] = "1" from crossfit.backend.torch.hf.model import HFModel @@ -31,14 +32,17 @@ @dataclass class QualityModelConfig: - model = "microsoft/deberta-v3-base" - fc_dropout = 0.2 - max_len = 1024 + model: str = "microsoft/deberta-v3-base" + fc_dropout: float = 0.2 + max_len: int = 1024 class QualityModel(HFModel): def __init__( - self, config: QualityModelConfig, autocast: bool = False, max_mem_gb: int = None + self, + config: QualityModelConfig, + autocast: bool = False, + max_mem_gb: Optional[int] = None, ): self.config = config self.autocast = autocast @@ -46,7 +50,7 @@ def __init__( max_mem_gb = _get_suggest_memory_for_classifier() super().__init__(self.config.model, max_mem_gb=max_mem_gb) - def load_model(self, device="cuda"): + def load_model(self, device: str = "cuda"): model = HFDeberta.from_pretrained(QUALITY_IDENTIFIER) model.set_autocast(self.autocast) model = model.to(device) @@ -68,6 +72,7 @@ class QualityClassifier(DistributedDataClassifier): Attributes: filter_by (list[str], optional): The classes to filter the dataset by. If None, all classes will be included. Defaults to None. batch_size (int): The number of samples per batch for inference. Defaults to 256. + text_field (str): The field in the dataset that should be classified. Defaults to "text". pred_column (str): The column name where predictions will be stored. Defaults to "quality_pred". prob_column (str): The column name where prediction probabilities will be stored. Defaults to "quality_prob". max_chars (int): The maximum number of characters in each document to consider for classification. Defaults to 6000. 
@@ -79,17 +84,19 @@ class QualityClassifier(DistributedDataClassifier): def __init__( self, - filter_by=None, - batch_size=256, - pred_column="quality_pred", - prob_column="quality_prob", - max_chars=6000, - device_type="cuda", - autocast=True, - max_mem_gb=None, + filter_by: Optional[List[str]] = None, + batch_size: int = 256, + text_field: str = "text", + pred_column: str = "quality_pred", + prob_column: str = "quality_prob", + max_chars: int = 6000, + device_type: str = "cuda", + autocast: bool = True, + max_mem_gb: Optional[int] = None, ): config = AutoConfig.from_pretrained(QUALITY_IDENTIFIER) + self.text_field = text_field self.prob_column = prob_column self.labels = list(config.label2id.keys()) self.labels.sort(key=lambda x: config.label2id[x]) @@ -111,7 +118,7 @@ def __init__( autocast=autocast, ) - def _run_classifier(self, dataset: DocumentDataset): + def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset: print("Starting Quality classifier inference", flush=True) df = dataset.df df = _run_classifier_helper( @@ -121,6 +128,7 @@ def _run_classifier(self, dataset: DocumentDataset): max_chars=self.max_chars, batch_size=self.batch_size, label_col=self.pred_column, + text_field=self.text_field, prob_col=self.prob_column, ) return DocumentDataset(df) diff --git a/nemo_curator/scripts/classifiers/aegis_classifier_inference.py b/nemo_curator/scripts/classifiers/aegis_classifier_inference.py index 308dd534..ba8b8391 100644 --- a/nemo_curator/scripts/classifiers/aegis_classifier_inference.py +++ b/nemo_curator/scripts/classifiers/aegis_classifier_inference.py @@ -62,6 +62,7 @@ def main(): aegis_classifier = AegisClassifier( aegis_variant=args.aegis_variant, token=args.token, + text_field=args.input_text_field, max_chars=args.max_chars, batch_size=args.batch_size, max_mem_gb=args.max_mem_gb_classifier, diff --git a/nemo_curator/scripts/classifiers/domain_classifier_inference.py b/nemo_curator/scripts/classifiers/domain_classifier_inference.py index 
882e2c8b..8557ecf9 100644 --- a/nemo_curator/scripts/classifiers/domain_classifier_inference.py +++ b/nemo_curator/scripts/classifiers/domain_classifier_inference.py @@ -59,6 +59,7 @@ def main(): add_filename = True domain_classifier = DomainClassifier( + text_field=args.input_text_field, max_chars=args.max_chars, batch_size=args.batch_size, autocast=args.autocast, diff --git a/nemo_curator/scripts/classifiers/fineweb_edu_classifier_inference.py b/nemo_curator/scripts/classifiers/fineweb_edu_classifier_inference.py index 3e704848..8552215d 100644 --- a/nemo_curator/scripts/classifiers/fineweb_edu_classifier_inference.py +++ b/nemo_curator/scripts/classifiers/fineweb_edu_classifier_inference.py @@ -60,6 +60,7 @@ def main(): add_filename = True fineweb_edu_classifier = FineWebEduClassifier( + text_field=args.input_text_field, batch_size=args.batch_size, autocast=args.autocast, max_chars=args.max_chars, diff --git a/nemo_curator/scripts/classifiers/quality_classifier_inference.py b/nemo_curator/scripts/classifiers/quality_classifier_inference.py index 8fc1717a..2606898f 100644 --- a/nemo_curator/scripts/classifiers/quality_classifier_inference.py +++ b/nemo_curator/scripts/classifiers/quality_classifier_inference.py @@ -60,6 +60,7 @@ def main(): add_filename = True classifier = QualityClassifier( + text_field=args.input_text_field, max_chars=args.max_chars, batch_size=args.batch_size, autocast=args.autocast,