diff --git a/data_juicer/core/analyser.py b/data_juicer/core/analyser.py index 641471156..1e5891862 100644 --- a/data_juicer/core/analyser.py +++ b/data_juicer/core/analyser.py @@ -2,12 +2,14 @@ from loguru import logger +from data_juicer import use_cuda from data_juicer.analysis import ColumnWiseAnalysis, OverallAnalysis from data_juicer.config import init_configs from data_juicer.format import load_formatter from data_juicer.ops import Filter, load_ops from data_juicer.utils import cache_utils from data_juicer.utils.constant import Fields +from data_juicer.utils.process_utils import calculate_np from .data import add_same_content_to_new_column from .exporter import Exporter @@ -89,6 +91,13 @@ def run(self, load_data_np=None, skip_export=False): stats_collected = False for op_cfg, op in zip(self.cfg.process, self.ops): op_name = list(op_cfg.keys())[0] + with_rank = use_cuda() and op._accelerator == 'cuda' + if op.num_proc != 0: + op_proc = op.num_proc + logger.info(f'Op [{op_name}] running with sepcified ' + f'number of procs:{op.num_proc}') + else: + op_proc = calculate_np(self.cfg.np, op, op_name) if isinstance(op, Filter): if Fields.stats not in dataset.features: # only add stats when calling filter op @@ -100,7 +109,8 @@ def run(self, load_data_np=None, skip_export=False): num_proc=self.cfg.np, desc='Adding new column for stats') dataset = dataset.map(op.compute_stats, - num_proc=self.cfg.np, + num_proc=op_proc, + with_rank=with_rank, desc=op_name + '_compute_stats') stats_collected = True if not stats_collected: diff --git a/data_juicer/core/executor.py b/data_juicer/core/executor.py index 9ec12a35e..d3dbc3159 100644 --- a/data_juicer/core/executor.py +++ b/data_juicer/core/executor.py @@ -170,10 +170,10 @@ def run(self, load_data_np=None): op_name, op_args = list(op_cfg.items())[0] prev = dataset # record last dataset with_rank = use_cuda() and op._accelerator == 'cuda' - if op.spec_numprocs != 0: - op_proc = op.spec_numprocs + if op.num_proc != 0: + op_proc = op.num_proc logger.info(f'Op [{op_name}] running with sepcified ' - f'number of procs:{op.spec_numprocs}') + f'number of procs:{op.num_proc}') else: op_proc = calculate_np(self.cfg.np, op, op_name) try: diff --git a/data_juicer/ops/base_op.py b/data_juicer/ops/base_op.py index b5b2e79d9..25e03934c 100644 --- a/data_juicer/ops/base_op.py +++ b/data_juicer/ops/base_op.py @@ -34,7 +34,7 @@ def __init__(self, *args, **kwargs): self._accelerator = kwargs.get('accelerator', 'cpu') # parameters to determind the number of procs for this op - self.spec_numprocs = kwargs.get('spec_numprocs', 0) + self.num_proc = kwargs.get('num_proc', 0) self.cpu_required = kwargs.get('cpu_required', 1) self.mem_required = kwargs.get('mem_required', 0) if isinstance(self.mem_required, str): diff --git a/tests/config/test_config_funcs.py b/tests/config/test_config_funcs.py index b8b46867b..de2358677 100644 --- a/tests/config/test_config_funcs.py +++ b/tests/config/test_config_funcs.py @@ -46,7 +46,7 @@ def test_yaml_cfg_file(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': 'cpu', - 'spec_numprocs': 0, + 'num_proc': 0, 'cpu_required': 1, 'mem_required': 0, 'use_actor': False, @@ -63,7 +63,7 @@ def test_yaml_cfg_file(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': 'cpu', - 'spec_numprocs': 0, + 'num_proc': 0, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0, @@ -130,7 +130,7 @@ def test_mixture_cfg(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': 'cpu', - 'spec_numprocs': 0, + 'num_proc': 0, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0, @@ -147,7 +147,7 @@ def test_mixture_cfg(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': 'cpu', - 'spec_numprocs': 0, + 'num_proc': 0, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0, @@ -164,7 +164,7 @@ def test_mixture_cfg(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': 'cpu', - 'spec_numprocs': 0, + 'num_proc': 0, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0, @@ -181,7 +181,7 @@ def test_mixture_cfg(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': 'cpu', - 'spec_numprocs': 0, + 'num_proc': 0, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0, @@ -198,7 +198,7 @@ def test_mixture_cfg(self): 'audio_key': 'audios', 'video_key': 'videos', 'accelerator': 'cpu', - 'spec_numprocs': 0, + 'num_proc': 0, 'stats_export_path': None, 'cpu_required': 1, 'mem_required': 0, @@ -213,7 +213,7 @@ def test_op_params_parsing(self): base_class_params = { 'text_key', 'image_key', 'audio_key', 'video_key', 'accelerator', - 'spec_numprocs', 'cpu_required', 'mem_required', 'use_actor', + 'num_proc', 'cpu_required', 'mem_required', 'use_actor', } parser = ArgumentParser(default_env=True, default_config_files=None)