diff --git a/data_juicer/ops/base_op.py b/data_juicer/ops/base_op.py index b5b2e79d9..f84295c68 100644 --- a/data_juicer/ops/base_op.py +++ b/data_juicer/ops/base_op.py @@ -2,6 +2,7 @@ import pandas as pd import pyarrow as pa +from loguru import logger from data_juicer.utils.mm_utils import size_to_bytes from data_juicer.utils.registry import Registry @@ -9,6 +10,68 @@ OPERATORS = Registry('Operators') +def convert_list_dict_to_dict_list(samples): + # reconstruct samples from "list of dicts" to "dict of lists" + keys = samples[0].keys() + res_samples = {} + for key in keys: + res_samples[key] = [s[key] for s in samples] + return res_samples + + +def convert_dict_list_to_list_dict(samples): + # reconstruct samples from "dict of lists" to "list of dicts" + reconstructed_samples = [] + keys = list(samples.keys()) + # take any key, since they should be of same length + for i in range(len(samples[keys[0]])): + reconstructed_samples.append({key: samples[key][i] for key in samples}) + return reconstructed_samples + + +def catch_exception_mapper_process(method): + """ + For mapper sample level fault torelerance. + """ + + def wrapper(self, *args, **kwargs): + try: + return method(self, *args, **kwargs) + except Exception as e: + samples = args[0] + logger.error( + f'An error occurred in mapper operation when processing' + f'sample {samples}, {type(e)}: {e}') + return {} + + return wrapper + + +def catch_exception_mapper_process_single(method): + """ + For mapper process_single, + turn it into batch_size = 1, and enable fault torelerance. + """ + + def wrapper(self, *args, **kwargs): + try: + args = list(args) + samples = args[0] + sample = convert_dict_list_to_list_dict(samples)[0] + args[0] = sample + args = tuple(args) + res_sample = method(self, *args, **kwargs) + return convert_list_dict_to_dict_list([res_sample]) + except Exception as e: + samples = args[0] + logger.error( + f'An error occurred in mapper operation when processing' + f'sample {samples}, {type(e)}: {e}') + return {} + + return wrapper + + class OP: def __init__(self, *args, **kwargs): @@ -87,6 +150,7 @@ def ray_batch_mapper_wrapper(samples, fn): return pa.Table.from_pandas(res) +# @mapper_fault_tolerance class Mapper(OP): def __init__(self, *args, **kwargs): @@ -105,7 +169,9 @@ def __init__(self, *args, **kwargs): super(Mapper, self).__init__(*args, **kwargs) # In default, it's a normal OP instead of batched OP - self._batched_op = kwargs.get('batched_op', False) + # self._batched_op = kwargs.get('batched_op', False) + # Aftet the refactor, we want all ops to be batched OP by default + self._batched_op = kwargs.get('batched_op', True) def process(self, sample): """ @@ -134,6 +200,7 @@ def __call__(self, sample): return self.process(sample) +# @filter_fault_tolerance class Filter(OP): def __init__(self, *args, **kwargs): diff --git a/data_juicer/ops/mapper/audio_ffmpeg_wrapped_mapper.py b/data_juicer/ops/mapper/audio_ffmpeg_wrapped_mapper.py index 730098e39..98d79c49f 100644 --- a/data_juicer/ops/mapper/audio_ffmpeg_wrapped_mapper.py +++ b/data_juicer/ops/mapper/audio_ffmpeg_wrapped_mapper.py @@ -4,7 +4,7 @@ from data_juicer.utils.file_utils import transfer_filename from data_juicer.utils.logger_utils import HiddenPrints -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single OP_NAME = 'audio_ffmpeg_wrapped_mapper' @@ -47,8 +47,8 @@ def __init__( self.capture_stderr = capture_stderr self.overwrite_output = overwrite_output - def process(self, sample): - # there is no audio in this sample + @catch_exception_mapper_process_single + def process(self, sample): # there is no audio in this sample if self.audio_key not in sample or not sample[self.audio_key]: return sample diff --git a/data_juicer/ops/mapper/clean_copyright_mapper.py b/data_juicer/ops/mapper/clean_copyright_mapper.py index dabb0cd40..1bb9fa943 100644 --- a/data_juicer/ops/mapper/clean_copyright_mapper.py +++ b/data_juicer/ops/mapper/clean_copyright_mapper.py @@ -4,7 +4,7 @@ import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('clean_copyright_mapper') @@ -23,8 +23,8 @@ def __init__(self, *args, **kwargs): self.pat = re.compile('/\\*[^*]*\\*+(?:[^/*][^*]*\\*+)*/') self.cpat = re.compile('copyright', re.IGNORECASE) + @catch_exception_mapper_process_single def process(self, sample): - r = self.pat.search(sample[self.text_key]) if r: # found one, now see if it contains "copyright", if so strip it diff --git a/data_juicer/ops/mapper/clean_email_mapper.py b/data_juicer/ops/mapper/clean_email_mapper.py index 9708363e5..461c417c9 100644 --- a/data_juicer/ops/mapper/clean_email_mapper.py +++ b/data_juicer/ops/mapper/clean_email_mapper.py @@ -1,6 +1,6 @@ import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('clean_email_mapper') @@ -28,8 +28,8 @@ def __init__(self, pattern: str = None, repl: str = '', *args, **kwargs): self.repl = repl + @catch_exception_mapper_process_single def process(self, sample): - if not re.search(self.pattern, sample[self.text_key], flags=re.DOTALL): return sample diff --git a/data_juicer/ops/mapper/clean_html_mapper.py b/data_juicer/ops/mapper/clean_html_mapper.py index 5c2c30c57..dd437e27d 100644 --- a/data_juicer/ops/mapper/clean_html_mapper.py +++ b/data_juicer/ops/mapper/clean_html_mapper.py @@ -4,7 +4,7 @@ from data_juicer.utils.availability_utils import AvailabilityChecking -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single OP_NAME = 'clean_html_mapper' @@ -25,6 +25,7 @@ def __init__(self, *args, **kwargs): """ super().__init__(*args, **kwargs) + @catch_exception_mapper_process_single def process(self, sample): def _clean_html(raw_html): diff --git a/data_juicer/ops/mapper/clean_ip_mapper.py b/data_juicer/ops/mapper/clean_ip_mapper.py index 607aeb585..6f732ad86 100644 --- a/data_juicer/ops/mapper/clean_ip_mapper.py +++ b/data_juicer/ops/mapper/clean_ip_mapper.py @@ -1,6 +1,6 @@ import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('clean_ip_mapper') @@ -32,8 +32,8 @@ def __init__(self, pattern: str = None, repl: str = '', *args, **kwargs): self.pattern = pattern[2:-1] self.repl = repl + @catch_exception_mapper_process_single def process(self, sample): - if not re.search(self.pattern, sample[self.text_key], flags=re.DOTALL): return sample diff --git a/data_juicer/ops/mapper/clean_links_mapper.py b/data_juicer/ops/mapper/clean_links_mapper.py index bcd90d524..83767f3bb 100644 --- a/data_juicer/ops/mapper/clean_links_mapper.py +++ b/data_juicer/ops/mapper/clean_links_mapper.py @@ -3,7 +3,7 @@ # -------------------------------------------------------- import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('clean_links_mapper') @@ -38,8 +38,8 @@ def __init__(self, pattern: str = None, repl: str = '', *args, **kwargs): self.pattern = pattern[2:-1] self.repl = repl + @catch_exception_mapper_process_single def process(self, sample): - if not re.search(self.pattern, sample[self.text_key], flags=re.DOTALL): return sample diff --git a/data_juicer/ops/mapper/expand_macro_mapper.py b/data_juicer/ops/mapper/expand_macro_mapper.py index 2f5d7fe83..afe2de5d2 100644 --- a/data_juicer/ops/mapper/expand_macro_mapper.py +++ b/data_juicer/ops/mapper/expand_macro_mapper.py @@ -4,7 +4,7 @@ import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('expand_macro_mapper') @@ -55,6 +55,7 @@ def _build_non_arg_macros_dict(self, file_content): macros[macro_name] = macro_val return macros + @catch_exception_mapper_process_single def process(self, sample): non_arg_macros = self._build_non_arg_macros_dict(sample[self.text_key]) diff --git a/data_juicer/ops/mapper/fix_unicode_mapper.py b/data_juicer/ops/mapper/fix_unicode_mapper.py index a7d06da3a..2c257869d 100644 --- a/data_juicer/ops/mapper/fix_unicode_mapper.py +++ b/data_juicer/ops/mapper/fix_unicode_mapper.py @@ -1,6 +1,6 @@ from data_juicer.utils.availability_utils import AvailabilityChecking -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single OP_NAME = 'fix_unicode_mapper' @@ -33,6 +33,7 @@ def __init__(self, normalization: str = None, *args, **kwargs): 'supported. Can only be one of ' '["NFC", "NFKC", "NFD", "NFKD"]') + @catch_exception_mapper_process_single def process(self, sample): sample[self.text_key] = ftfy.fix_text(sample[self.text_key], normalization=self.normalization) diff --git a/data_juicer/ops/mapper/image_blur_mapper.py b/data_juicer/ops/mapper/image_blur_mapper.py index 71bb03f16..9386869e7 100644 --- a/data_juicer/ops/mapper/image_blur_mapper.py +++ b/data_juicer/ops/mapper/image_blur_mapper.py @@ -6,7 +6,7 @@ from data_juicer.utils.file_utils import transfer_filename from data_juicer.utils.mm_utils import load_data_with_context, load_image -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single from ..op_fusion import LOADED_IMAGES OP_NAME = 'image_blur_mapper' @@ -53,6 +53,7 @@ def __init__(self, else: self.blur = ImageFilter.GaussianBlur(radius) + @catch_exception_mapper_process_single def process(self, sample, context=False): # there is no image in this sample if self.image_key not in sample or not sample[self.image_key]: diff --git a/data_juicer/ops/mapper/image_captioning_from_gpt4v_mapper.py b/data_juicer/ops/mapper/image_captioning_from_gpt4v_mapper.py index 8b58f1e3a..979f8d909 100644 --- a/data_juicer/ops/mapper/image_captioning_from_gpt4v_mapper.py +++ b/data_juicer/ops/mapper/image_captioning_from_gpt4v_mapper.py @@ -10,7 +10,7 @@ remove_non_special_tokens, remove_special_tokens) -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process from ..op_fusion import LOADED_IMAGES SYSTEM_PROMPTS = { @@ -244,6 +244,7 @@ def _process_single_sample(self, sample): return [generated_sample] + @catch_exception_mapper_process def process(self, samples): # reconstruct samples from "dict of lists" to "list of dicts" reconstructed_samples = [] diff --git a/data_juicer/ops/mapper/image_captioning_mapper.py b/data_juicer/ops/mapper/image_captioning_mapper.py index 5a678ad07..071119760 100644 --- a/data_juicer/ops/mapper/image_captioning_mapper.py +++ b/data_juicer/ops/mapper/image_captioning_mapper.py @@ -13,7 +13,7 @@ remove_special_tokens) from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process from ..op_fusion import LOADED_IMAGES OP_NAME = 'image_captioning_mapper' @@ -272,6 +272,7 @@ def _reduce_captions_per_image(self, chunk, generated_text_candidates_single_chunk[max_index]) return new_generated_text_per_chunk + @catch_exception_mapper_process def process(self, samples, rank=None): """ Note: diff --git a/data_juicer/ops/mapper/image_diffusion_mapper.py b/data_juicer/ops/mapper/image_diffusion_mapper.py index f7fdab0c3..e446d68df 100644 --- a/data_juicer/ops/mapper/image_diffusion_mapper.py +++ b/data_juicer/ops/mapper/image_diffusion_mapper.py @@ -10,7 +10,7 @@ load_image, remove_special_tokens) from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process from ..op_fusion import LOADED_IMAGES OP_NAME = 'image_diffusion_mapper' @@ -209,6 +209,7 @@ def _process_single_sample(self, ori_sample, rank=None, context=False): return generated_samples + @catch_exception_mapper_process def process(self, samples, rank=None, context=False): """ Note: diff --git a/data_juicer/ops/mapper/image_face_blur_mapper.py b/data_juicer/ops/mapper/image_face_blur_mapper.py index 8ba01b61b..acf7f81d4 100644 --- a/data_juicer/ops/mapper/image_face_blur_mapper.py +++ b/data_juicer/ops/mapper/image_face_blur_mapper.py @@ -6,7 +6,7 @@ from data_juicer.utils.mm_utils import (load_data_with_context, load_image, pil_to_opencv) -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single from ..op_fusion import LOADED_IMAGES OP_NAME = 'image_face_blur_mapper' @@ -66,6 +66,7 @@ def __init__(self, # Initialize face detector self.detector = dlib.get_frontal_face_detector() + @catch_exception_mapper_process_single def process(self, sample, context=False): # there is no image in this sample if self.image_key not in sample or not sample[self.image_key]: diff --git a/data_juicer/ops/mapper/nlpaug_en_mapper.py b/data_juicer/ops/mapper/nlpaug_en_mapper.py index a721cf2b3..c32c1e742 100644 --- a/data_juicer/ops/mapper/nlpaug_en_mapper.py +++ b/data_juicer/ops/mapper/nlpaug_en_mapper.py @@ -4,7 +4,7 @@ from data_juicer.utils.availability_utils import AvailabilityChecking -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process OP_NAME = 'nlpaug_en_mapper' @@ -122,6 +122,7 @@ def __init__(self, else: self.aug = aug_pipeline + @catch_exception_mapper_process def process(self, samples): # no augmentation methods are opened if len(self.aug) == 0: diff --git a/data_juicer/ops/mapper/nlpcda_zh_mapper.py b/data_juicer/ops/mapper/nlpcda_zh_mapper.py index 262d90782..a64860286 100644 --- a/data_juicer/ops/mapper/nlpcda_zh_mapper.py +++ b/data_juicer/ops/mapper/nlpcda_zh_mapper.py @@ -5,7 +5,7 @@ from data_juicer.utils.availability_utils import AvailabilityChecking from data_juicer.utils.logger_utils import HiddenPrints -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process OP_NAME = 'nlpcda_zh_mapper' @@ -128,6 +128,7 @@ def __init__(self, self.aug_pipeline.append( nlpcda.EquivalentChar(create_num=create_num)) + @catch_exception_mapper_process def process(self, samples): # no augmentation methods are opened if len(self.aug_pipeline) == 0: diff --git a/data_juicer/ops/mapper/punctuation_normalization_mapper.py b/data_juicer/ops/mapper/punctuation_normalization_mapper.py index b6640e9eb..475219d6b 100644 --- a/data_juicer/ops/mapper/punctuation_normalization_mapper.py +++ b/data_juicer/ops/mapper/punctuation_normalization_mapper.py @@ -2,7 +2,7 @@ # https://github.com/bigscience-workshop/data-preparation # -------------------------------------------------------- -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('punctuation_normalization_mapper') @@ -55,6 +55,7 @@ def __init__(self, *args, **kwargs): '►': '-', } + @catch_exception_mapper_process_single def process(self, sample): sample[self.text_key] = ''.join([ self.punctuation_unicode.get(c, c) for c in sample[self.text_key] diff --git a/data_juicer/ops/mapper/remove_bibliography_mapper.py b/data_juicer/ops/mapper/remove_bibliography_mapper.py index 2ce852d66..4a32c4ea6 100644 --- a/data_juicer/ops/mapper/remove_bibliography_mapper.py +++ b/data_juicer/ops/mapper/remove_bibliography_mapper.py @@ -4,7 +4,7 @@ import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('remove_bibliography_mapper') @@ -27,6 +27,7 @@ def __init__(self, *args, **kwargs): self.pattern += r'\\bibliography\{.*\}' self.pattern += r').*$' + @catch_exception_mapper_process_single def process(self, sample): sample[self.text_key] = re.sub(pattern=self.pattern, repl=r'', diff --git a/data_juicer/ops/mapper/remove_comments_mapper.py b/data_juicer/ops/mapper/remove_comments_mapper.py index c5f083c14..8986efa53 100644 --- a/data_juicer/ops/mapper/remove_comments_mapper.py +++ b/data_juicer/ops/mapper/remove_comments_mapper.py @@ -6,7 +6,7 @@ import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('remove_comments_mapper') @@ -37,6 +37,7 @@ def __init__(self, self.inline = inline self.multiline = multiline + @catch_exception_mapper_process_single def process(self, sample): # TODO: remove different comments by sample type diff --git a/data_juicer/ops/mapper/remove_header_mapper.py b/data_juicer/ops/mapper/remove_header_mapper.py index 8371d2f99..fda446bdd 100644 --- a/data_juicer/ops/mapper/remove_header_mapper.py +++ b/data_juicer/ops/mapper/remove_header_mapper.py @@ -4,7 +4,7 @@ import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('remove_header_mapper') @@ -34,8 +34,8 @@ def __init__(self, drop_no_head: bool = True, *args, **kwargs): self.drop_no_head = drop_no_head + @catch_exception_mapper_process_single def process(self, sample): - if not re.search(self.pattern, sample[self.text_key], flags=re.DOTALL): if self.drop_no_head: sample[self.text_key] = '' diff --git a/data_juicer/ops/mapper/remove_long_words_mapper.py b/data_juicer/ops/mapper/remove_long_words_mapper.py index 92ac8fe2d..51a00111d 100644 --- a/data_juicer/ops/mapper/remove_long_words_mapper.py +++ b/data_juicer/ops/mapper/remove_long_words_mapper.py @@ -6,7 +6,7 @@ from jsonargparse.typing import PositiveInt -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single from ..common import (SPECIAL_CHARACTERS, merge_on_whitespace_tab_newline, split_on_newline_tab_whitespace, strip) @@ -43,8 +43,8 @@ def should_keep_long_word(self, word): else: return False + @catch_exception_mapper_process_single def process(self, sample): - sentences = split_on_newline_tab_whitespace(sample[self.text_key]) sentences = [[[ word for word in subsentence if self.should_keep_long_word(word) diff --git a/data_juicer/ops/mapper/remove_non_chinese_character_mapper.py b/data_juicer/ops/mapper/remove_non_chinese_character_mapper.py index 3e6cd494d..74111ce86 100644 --- a/data_juicer/ops/mapper/remove_non_chinese_character_mapper.py +++ b/data_juicer/ops/mapper/remove_non_chinese_character_mapper.py @@ -1,6 +1,6 @@ import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('remove_non_chinese_character_mapper') @@ -33,8 +33,8 @@ def __init__(self, else: self.pattern += u']' + @catch_exception_mapper_process_single def process(self, sample): - if not re.search(self.pattern, sample[self.text_key], flags=re.DOTALL): return sample diff --git a/data_juicer/ops/mapper/remove_repeat_sentences_mapper.py b/data_juicer/ops/mapper/remove_repeat_sentences_mapper.py index a1069d24d..388e99d4d 100644 --- a/data_juicer/ops/mapper/remove_repeat_sentences_mapper.py +++ b/data_juicer/ops/mapper/remove_repeat_sentences_mapper.py @@ -1,6 +1,6 @@ import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single def split_sentence(text): @@ -43,8 +43,8 @@ def __init__(self, self.remove_regex = re.compile(r'[^a-zA-Z0-9\u4e00-\u9fa5\n\t ]' ) if ignore_special_character else None + @catch_exception_mapper_process_single def process(self, sample): - lines = [e for e in sample[self.text_key].split('\n')] new_lines = [] hash_set = set([]) diff --git a/data_juicer/ops/mapper/remove_specific_chars_mapper.py b/data_juicer/ops/mapper/remove_specific_chars_mapper.py index 99e15afef..ac2899a25 100644 --- a/data_juicer/ops/mapper/remove_specific_chars_mapper.py +++ b/data_juicer/ops/mapper/remove_specific_chars_mapper.py @@ -2,7 +2,7 @@ import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('remove_specific_chars_mapper') @@ -28,8 +28,8 @@ def __init__(self, else: self.pattern = None + @catch_exception_mapper_process_single def process(self, sample): - if self.pattern is None: return sample diff --git a/data_juicer/ops/mapper/remove_table_text_mapper.py b/data_juicer/ops/mapper/remove_table_text_mapper.py index 4f6dfb233..4e2dffb51 100644 --- a/data_juicer/ops/mapper/remove_table_text_mapper.py +++ b/data_juicer/ops/mapper/remove_table_text_mapper.py @@ -1,7 +1,7 @@ import regex as re from jsonargparse.typing import restricted_number_type -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single from_2_to_20 = restricted_number_type('from_2_to_20', int, [('>=', 2), ('<=', 20)]) @@ -34,8 +34,8 @@ def __init__(self, self.max_col = max_col self.pattern = r'(?<=\n)((\S+?)([ |\t](\S+?)){%d}\n+){2,}' + @catch_exception_mapper_process_single def process(self, sample): - text = sample[self.text_key] for i in range(self.min_col - 1, self.max_col): pattern = re.compile(self.pattern % i) diff --git a/data_juicer/ops/mapper/remove_words_with_incorrect_substrings_mapper.py b/data_juicer/ops/mapper/remove_words_with_incorrect_substrings_mapper.py index 605a75e3b..5c3d7a2d9 100644 --- a/data_juicer/ops/mapper/remove_words_with_incorrect_substrings_mapper.py +++ b/data_juicer/ops/mapper/remove_words_with_incorrect_substrings_mapper.py @@ -3,7 +3,7 @@ from data_juicer.utils.availability_utils import AvailabilityChecking from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single from ..common import (SPECIAL_CHARACTERS, get_words_from_document, merge_on_whitespace_tab_newline, split_on_newline_tab_whitespace, strip) @@ -48,6 +48,7 @@ def should_keep_word_with_incorrect_substrings(self, word, substrings): should_keep = all([(i_substr not in word) for i_substr in substrings]) return should_keep + @catch_exception_mapper_process_single def process(self, sample): if self.tokenization: tokenizer = get_model(self.model_key) diff --git a/data_juicer/ops/mapper/replace_content_mapper.py b/data_juicer/ops/mapper/replace_content_mapper.py index 703405001..ed47acad8 100644 --- a/data_juicer/ops/mapper/replace_content_mapper.py +++ b/data_juicer/ops/mapper/replace_content_mapper.py @@ -1,6 +1,6 @@ import regex as re -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single @OPERATORS.register_module('replace_content_mapper') @@ -26,8 +26,8 @@ def __init__(self, pattern: str = None, repl: str = '', *args, **kwargs): self.pattern = pattern[2:-1] self.repl = repl + @catch_exception_mapper_process_single def process(self, sample): - if self.pattern is None: return sample diff --git a/data_juicer/ops/mapper/sentence_split_mapper.py b/data_juicer/ops/mapper/sentence_split_mapper.py index 522c01300..0e928c71f 100644 --- a/data_juicer/ops/mapper/sentence_split_mapper.py +++ b/data_juicer/ops/mapper/sentence_split_mapper.py @@ -1,7 +1,7 @@ from data_juicer.utils.availability_utils import AvailabilityChecking from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single from ..common import get_sentences_from_document OP_NAME = 'sentence_split_mapper' @@ -26,8 +26,8 @@ def __init__(self, lang: str = 'en', *args, **kwargs): self.lang = lang self.model_key = prepare_model(model_type='nltk', lang=lang) + @catch_exception_mapper_process_single def process(self, sample): - nltk_model = get_model(self.model_key) sample[self.text_key] = get_sentences_from_document( sample[self.text_key], diff --git a/data_juicer/ops/mapper/video_captioning_from_audio_mapper.py b/data_juicer/ops/mapper/video_captioning_from_audio_mapper.py index 38523b4b5..d6d079d23 100644 --- a/data_juicer/ops/mapper/video_captioning_from_audio_mapper.py +++ b/data_juicer/ops/mapper/video_captioning_from_audio_mapper.py @@ -7,7 +7,7 @@ from data_juicer.utils.mm_utils import SpecialTokens, extract_audio_from_video from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process NAME = 'video_captioning_from_audio_mapper' CHECK_PKGS = [ @@ -122,6 +122,7 @@ def _process_single_sample(self, sample, rank=None): captioned_sample[self.video_key] = left_video_keys return [captioned_sample] + @catch_exception_mapper_process def process(self, samples, rank=None): # reconstruct samples from "dict of lists" to "list of dicts" reconstructed_samples = [] diff --git a/data_juicer/ops/mapper/video_captioning_from_frames_mapper.py b/data_juicer/ops/mapper/video_captioning_from_frames_mapper.py index 7ef01a098..3f08e0aeb 100644 --- a/data_juicer/ops/mapper/video_captioning_from_frames_mapper.py +++ b/data_juicer/ops/mapper/video_captioning_from_frames_mapper.py @@ -16,7 +16,7 @@ remove_special_tokens) from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process from ..op_fusion import LOADED_VIDEOS OP_NAME = 'video_captioning_from_frames_mapper' @@ -329,6 +329,7 @@ def _reduce_captions(self, chunk, generated_text_candidates_single_chunk): generated_text_candidates_single_chunk[max_index]) return generated_text_per_chunk + @catch_exception_mapper_process def process(self, samples, rank=None, context=False): """ :param samples: diff --git a/data_juicer/ops/mapper/video_captioning_from_summarizer_mapper.py b/data_juicer/ops/mapper/video_captioning_from_summarizer_mapper.py index d03dc6482..dcd1bc468 100644 --- a/data_juicer/ops/mapper/video_captioning_from_summarizer_mapper.py +++ b/data_juicer/ops/mapper/video_captioning_from_summarizer_mapper.py @@ -8,7 +8,7 @@ from data_juicer.utils.mm_utils import SpecialTokens, remove_special_tokens from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process NAME = 'video_captioning_from_summarizer_mapper' CHECK_PKGS = [ @@ -249,6 +249,7 @@ def _process_single_sample(self, sample, rank=None): captioned_sample[self.text_key] = captioned_texts return [captioned_sample] + @catch_exception_mapper_process def process(self, samples, rank=None): # reconstruct samples from "dict of lists" to "list of dicts" reconstructed_samples = [] diff --git a/data_juicer/ops/mapper/video_captioning_from_video_mapper.py b/data_juicer/ops/mapper/video_captioning_from_video_mapper.py index 9dc5e34e6..17fa57818 100644 --- a/data_juicer/ops/mapper/video_captioning_from_video_mapper.py +++ b/data_juicer/ops/mapper/video_captioning_from_video_mapper.py @@ -16,7 +16,7 @@ remove_special_tokens) from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process from ..op_fusion import LOADED_VIDEOS OP_NAME = 'video_captioning_from_video_mapper' @@ -336,6 +336,7 @@ def _reduce_captions(self, chunk, generated_text_candidates_single_chunk): generated_text_candidates_single_chunk[max_index]) return generated_text_per_chunk + @catch_exception_mapper_process def process(self, samples, rank=None, context=False): """ :param samples: diff --git a/data_juicer/ops/mapper/video_face_blur_mapper.py b/data_juicer/ops/mapper/video_face_blur_mapper.py index a3abb233e..4a467cc3f 100644 --- a/data_juicer/ops/mapper/video_face_blur_mapper.py +++ b/data_juicer/ops/mapper/video_face_blur_mapper.py @@ -5,7 +5,7 @@ from data_juicer.utils.mm_utils import (load_data_with_context, load_video, pil_to_opencv, process_each_frame) -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single from ..op_fusion import LOADED_VIDEOS OP_NAME = 'video_face_blur_mapper' @@ -65,6 +65,7 @@ def __init__(self, # Initialize face detector self.detector = dlib.get_frontal_face_detector() + @catch_exception_mapper_process_single def process(self, sample, context=False): # there is no video in this sample if self.video_key not in sample or not sample[self.video_key]: diff --git a/data_juicer/ops/mapper/video_ffmpeg_wrapped_mapper.py b/data_juicer/ops/mapper/video_ffmpeg_wrapped_mapper.py index 39a04da0a..0ec320c1c 100644 --- a/data_juicer/ops/mapper/video_ffmpeg_wrapped_mapper.py +++ b/data_juicer/ops/mapper/video_ffmpeg_wrapped_mapper.py @@ -4,7 +4,7 @@ from data_juicer.utils.file_utils import transfer_filename from data_juicer.utils.logger_utils import HiddenPrints -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single OP_NAME = 'video_ffmpeg_wrapped_mapper' @@ -47,8 +47,8 @@ def __init__( self.capture_stderr = capture_stderr self.overwrite_output = overwrite_output - def process(self, sample): - # there is no video in this sample + @catch_exception_mapper_process_single + def process(self, sample): # there is no video in this sample if self.video_key not in sample or not sample[self.video_key]: return sample diff --git a/data_juicer/ops/mapper/video_remove_watermark_mapper.py b/data_juicer/ops/mapper/video_remove_watermark_mapper.py index 53e755936..33431e7af 100644 --- a/data_juicer/ops/mapper/video_remove_watermark_mapper.py +++ b/data_juicer/ops/mapper/video_remove_watermark_mapper.py @@ -12,7 +12,7 @@ parse_string_to_roi, process_each_frame) -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single from ..op_fusion import LOADED_VIDEOS OP_NAME = 'video_remove_watermark_mapper' @@ -199,6 +199,7 @@ def _clean_watermark(self, frame, watermark_mask): new_np_frame = cv.inpaint(np_frame, watermark_mask, 3, cv.INPAINT_NS) return av.VideoFrame.from_ndarray(new_np_frame, format='bgr24') + @catch_exception_mapper_process_single def process(self, sample, context=False): # there is no video in this sample if self.video_key not in sample or not sample[self.video_key]: diff --git a/data_juicer/ops/mapper/video_resize_aspect_ratio_mapper.py b/data_juicer/ops/mapper/video_resize_aspect_ratio_mapper.py index fa1de22d6..948db82bd 100644 --- a/data_juicer/ops/mapper/video_resize_aspect_ratio_mapper.py +++ b/data_juicer/ops/mapper/video_resize_aspect_ratio_mapper.py @@ -7,7 +7,7 @@ from data_juicer.utils.logger_utils import HiddenPrints from data_juicer.utils.mm_utils import load_video -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single OP_NAME = 'video_resize_aspect_ratio_mapper' @@ -99,8 +99,9 @@ def __init__( self.max_ratio = Fraction(str(max_ratio).replace(':', '/')) self.strategy = strategy - def process(self, sample): - # there is no video in this sample + # turned into batched processing and with catch + @catch_exception_mapper_process_single + def process(self, sample): # there is no video in this sample if self.video_key not in sample or not sample[self.video_key]: return sample diff --git a/data_juicer/ops/mapper/video_resize_resolution_mapper.py b/data_juicer/ops/mapper/video_resize_resolution_mapper.py index 5d026f8ae..c82cfd7e8 100644 --- a/data_juicer/ops/mapper/video_resize_resolution_mapper.py +++ b/data_juicer/ops/mapper/video_resize_resolution_mapper.py @@ -9,7 +9,7 @@ from data_juicer.utils.logger_utils import HiddenPrints from data_juicer.utils.mm_utils import load_video -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single from ..op_fusion import LOADED_VIDEOS OP_NAME = 'video_resize_resolution_mapper' @@ -83,6 +83,7 @@ def __init__(self, self.force_original_aspect_ratio = force_original_aspect_ratio self.force_divisible_by = force_divisible_by + @catch_exception_mapper_process_single def process(self, sample, context=False): # there is no video in this sample if self.video_key not in sample or not sample[self.video_key]: diff --git a/data_juicer/ops/mapper/video_split_by_duration_mapper.py b/data_juicer/ops/mapper/video_split_by_duration_mapper.py index dbd30b5a4..a57f9d55e 100644 --- a/data_juicer/ops/mapper/video_split_by_duration_mapper.py +++ b/data_juicer/ops/mapper/video_split_by_duration_mapper.py @@ -8,7 +8,7 @@ from data_juicer.utils.mm_utils import (SpecialTokens, cut_video_by_seconds, get_video_duration, load_video) -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process from ..op_fusion import LOADED_VIDEOS @@ -132,6 +132,7 @@ def _process_single_sample(self, sample): split_sample[self.video_key] = split_video_keys return [split_sample] + @catch_exception_mapper_process def process(self, samples): # reconstruct samples from "dict of lists" to "list of dicts" reconstructed_samples = [] diff --git a/data_juicer/ops/mapper/video_split_by_key_frame_mapper.py b/data_juicer/ops/mapper/video_split_by_key_frame_mapper.py index 7f6eb3dca..916e35a21 100644 --- a/data_juicer/ops/mapper/video_split_by_key_frame_mapper.py +++ b/data_juicer/ops/mapper/video_split_by_key_frame_mapper.py @@ -6,7 +6,7 @@ from data_juicer.utils.mm_utils import (SpecialTokens, cut_video_by_seconds, get_key_frame_seconds, load_video) -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process from ..op_fusion import LOADED_VIDEOS @@ -114,6 +114,7 @@ def _process_single_sample(self, sample): split_sample[self.video_key] = split_video_keys return [split_sample] + @catch_exception_mapper_process def process(self, samples): # reconstruct samples from "dict of lists" to "list of dicts" reconstructed_samples = [] diff --git a/data_juicer/ops/mapper/video_split_by_scene_mapper.py b/data_juicer/ops/mapper/video_split_by_scene_mapper.py index 14ce456b6..f89dd02fa 100644 --- a/data_juicer/ops/mapper/video_split_by_scene_mapper.py +++ b/data_juicer/ops/mapper/video_split_by_scene_mapper.py @@ -9,7 +9,7 @@ transfer_filename) from data_juicer.utils.mm_utils import SpecialTokens -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single OP_NAME = 'video_split_by_scene_mapper' @@ -81,6 +81,7 @@ def __init__(self, for key in avaliable_kwargs if key in kwargs } + @catch_exception_mapper_process_single def process(self, sample, context=False): # there is no video in this sample if self.video_key not in sample or not sample[self.video_key]: diff --git a/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py b/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py index 6a9636160..8f6469a8e 100644 --- a/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py +++ b/data_juicer/ops/mapper/video_tagging_from_audio_mapper.py @@ -5,7 +5,7 @@ from data_juicer.utils.mm_utils import extract_audio_from_video from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single OP_NAME = 'video_tagging_from_audio_mapper' @@ -40,6 +40,7 @@ def __init__(self, self._model_sampling_rate = 16000 self._no_audio_label = 'EMPTY' + @catch_exception_mapper_process_single def process(self, sample, rank=None): # check if it's generated already if Fields.video_audio_tags in sample: diff --git a/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py b/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py index 0c69461a3..f188ca70c 100644 --- a/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py +++ b/data_juicer/ops/mapper/video_tagging_from_frames_mapper.py @@ -9,7 +9,7 @@ load_data_with_context, load_video) from data_juicer.utils.model_utils import get_model, prepare_model -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single from ..op_fusion import LOADED_VIDEOS OP_NAME = 'video_tagging_from_frames_mapper' @@ -68,6 +68,7 @@ def __init__(self, from ram import get_transform self.transform = get_transform(image_size=384) + @catch_exception_mapper_process_single def process(self, sample, rank=None, context=False): # check if it's generated already if Fields.video_frame_tags in sample: diff --git a/data_juicer/ops/mapper/whitespace_normalization_mapper.py b/data_juicer/ops/mapper/whitespace_normalization_mapper.py index 6fa44b559..88333bded 100644 --- a/data_juicer/ops/mapper/whitespace_normalization_mapper.py +++ b/data_juicer/ops/mapper/whitespace_normalization_mapper.py @@ -2,7 +2,7 @@ # https://github.com/bigscience-workshop/data-preparation # -------------------------------------------------------- -from ..base_op import OPERATORS, Mapper +from ..base_op import OPERATORS, Mapper, catch_exception_mapper_process_single from ..common.special_characters import VARIOUS_WHITESPACES @@ -25,6 +25,7 @@ def __init__(self, *args, **kwargs): """ super().__init__(*args, **kwargs) + @catch_exception_mapper_process_single def process(self, sample): # remove whitespaces before and after the main content text = sample[self.text_key].strip()