diff --git a/data_juicer/core/__init__.py b/data_juicer/core/__init__.py index 79ead0f8a..6fbb9d5d6 100644 --- a/data_juicer/core/__init__.py +++ b/data_juicer/core/__init__.py @@ -1,13 +1,17 @@ +from .adapter import Adapter from .analyzer import Analyzer from .data import NestedDataset from .executor import Executor from .exporter import Exporter +from .monitor import Monitor from .tracer import Tracer __all__ = [ + 'Adapter', 'Analyzer', 'NestedDataset', 'Executor', 'Exporter', + 'Monitor', 'Tracer', ] diff --git a/data_juicer/core/adapter.py b/data_juicer/core/adapter.py new file mode 100644 index 000000000..aa746a058 --- /dev/null +++ b/data_juicer/core/adapter.py @@ -0,0 +1,148 @@ +from datasets.config import DEFAULT_MAX_BATCH_SIZE + +from data_juicer.core.monitor import Monitor + + +class Adapter: + + MAX_BATCH_SIZE = 10000 + + def __init__(self, cfg: dict): + self.cfg = cfg + self.idle_resources = Monitor.monitor_current_resources() + + @staticmethod + def execute_and_probe(dataset, operators, sample_interval=0.5): + """ + Process the input dataset and probe related information for each OP in + the specified operator list. + + For now, we support the following targets to probe: + "resource": resource utilization for each OP. + "speed": average processing speed for each OP. + + The probe result is a list and each item in the list is the probe + result for each OP. + """ + if operators is None or len(operators) == 0: + return [] + + # resource utilization list + resource_util_list = [] + # probe for each OP + for op in operators: + # set num_proc to 1 for each OP to focus on the influence of batch + # size only. + old_num_proc = op.num_proc + op.num_proc = 1 + + # number of test samples + sample_num = len(dataset) + # run single op and monitor the resource utilization + dataset, resource_util_per_op = Monitor.monitor_func( + op.run, args=(dataset, ), sample_interval=sample_interval) + + # calculate speed + resource_util_per_op[ + 'speed'] = sample_num / resource_util_per_op['time'] + resource_util_list.append(resource_util_per_op) + + # restore to the original num_proc + op.num_proc = old_num_proc + + return resource_util_list + + @staticmethod + def take_batch(dataset, config): + """ + Split the dataset into batches based on configuration and load factor. + + :param dataset: The dataset to be split + :param config: Configuration settings, including batch size + :return: An iterator of batches + """ + # get initial batch size + batch_size = config.get('batch_size', DEFAULT_MAX_BATCH_SIZE) + # should be in [1, 10000] + batch_size = min(max(batch_size, 1), Adapter.MAX_BATCH_SIZE) + + # check if there are enough samples + num_samples = len(dataset) + if batch_size >= num_samples: + return dataset + else: + return dataset.take(batch_size) + + def adapt_workloads(self, dataset, operators): + """ + Manage the scheduling and load balancing for the dataset processing. + + :param dataset: The dataset that needs to be processed + :param operators: Operators in the data recipe + """ + # TODO: set batch size to 1 for all OPs for probing + load_analysis_res, probed_batch_size = self.probe_small_batch( + dataset, operators) + + # calculate batch size for each OP according to the analysis results + bs_per_op = self.batch_size_strategy(load_analysis_res, + base_bs=probed_batch_size) + + return bs_per_op + + def probe_small_batch(self, dataset, operators): + """ + Perform small batch pre-execution to probe available resources, + current load and estimated OP speed, returning load factors and speed + ranks for each OP. + + :param dataset: The dataset to pre-execute small batch on + :param operators: The OP list to be pre-execution and probe + :return: A list of probe results for each OP and the length of data + batch to probe. + """ + # take a small batch + data_batch = self.take_batch(dataset, self.cfg) + # process and monitor the resource utilization + resource_util_list = self.execute_and_probe(data_batch, operators) + # analyze resource utilization + analysis_res = Monitor.analyze_resource_util_list(resource_util_list) + + return analysis_res, len(data_batch) + + def batch_size_strategy(self, load_analysis_res, base_bs=1, util_th=0.9): + """ + Decide the batch size for each op according to their workload analysis + result and expected utilization threshold. We need to guarantee that + the resource utilization won't exceed the threshold. Now we only + consider the buckets effect, which means the max batch size is decided + by the max utilization of all types of resources except GPU util + (decided by num_proc). + """ + batch_size_per_op = [] + + # compute left utils according to the util_th + left_utils = {} + for key in self.idle_resources: + if 'util.' not in key or 'GPU' in key: + continue + left_utils[key] = max(0, util_th - self.idle_resources[key]) + + for item in load_analysis_res: + max_util = 1e-5 + max_key = min(left_utils.items(), key=lambda it: it[1])[0] + analysis_res = item['resource_analysis'] + for key in analysis_res: + if 'util.' not in key or 'GPU' in key: + continue + used_util = max( + 0, analysis_res[key]['max'] - self.idle_resources[key]) + if used_util > max_util: + max_util = used_util + max_key = key + load_factor = left_utils[max_key] / max_util + bs_this_op = min(max(int(base_bs * load_factor), 1), + self.MAX_BATCH_SIZE) + batch_size_per_op.append(bs_this_op) + + return batch_size_per_op diff --git a/data_juicer/core/analyzer.py b/data_juicer/core/analyzer.py index ce1af2d84..93ffea451 100644 --- a/data_juicer/core/analyzer.py +++ b/data_juicer/core/analyzer.py @@ -87,7 +87,7 @@ def run(self, load_data_np=None, skip_export=False): if isinstance(op, Filter): original_process = op.process op.process = None - dataset = dataset.process(op) + dataset = dataset.process(op, work_dir=self.work_dir) op.process = original_process stats_collected = True if not stats_collected: diff --git a/data_juicer/core/data.py b/data_juicer/core/data.py index c8f05a767..bf2ad5fa1 100644 --- a/data_juicer/core/data.py +++ b/data_juicer/core/data.py @@ -2,6 +2,8 @@ import copy import inspect +import json +import os import traceback from abc import ABC, abstractmethod from functools import wraps @@ -12,6 +14,7 @@ from datasets.formatting.formatting import LazyBatch from loguru import logger +from data_juicer.core.monitor import Monitor from data_juicer.ops import UNFORKABLE from data_juicer.utils import cache_utils from data_juicer.utils.compress import (CompressionOff, @@ -164,6 +167,7 @@ def __getitem__(self, key): def process(self, operators, *, + work_dir=None, exporter=None, checkpointer=None, tracer=None): @@ -174,6 +178,9 @@ def process(self, operators = [operators] unforkable_operators = set(UNFORKABLE.modules.keys()) + # resource utilization monitor + resource_util_list = [] + dataset = self try: for op in operators: @@ -184,10 +191,17 @@ def process(self, start = time() # run single op - dataset = op.run(dataset, exporter=exporter, tracer=tracer) + run_args = { + 'dataset': dataset, + 'exporter': exporter, + 'tracer': tracer, + } + dataset, resource_util_per_op = Monitor.monitor_func( + op.run, args=run_args) # record processed ops if checkpointer is not None: checkpointer.record(op._op_cfg) + resource_util_list.append(resource_util_per_op) end = time() logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. ' f'Left {len(dataset)} samples.') @@ -201,6 +215,9 @@ def process(self, 'last op...') dataset.cleanup_cache_files() checkpointer.save_ckpt(dataset) + if work_dir: + with open(os.path.join(work_dir, 'monitor.json'), 'w') as out: + json.dump(resource_util_list, out) return dataset def map(self, *args, **kargs): diff --git a/data_juicer/core/executor.py b/data_juicer/core/executor.py index 87e38dbce..1cf309e42 100644 --- a/data_juicer/core/executor.py +++ b/data_juicer/core/executor.py @@ -160,6 +160,7 @@ def run(self, load_data_np=None): logger.info('Processing data...') tstart = time() dataset = dataset.process(ops, + work_dir=self.work_dir, exporter=self.exporter, checkpointer=self.ckpt_manager, tracer=self.tracer) diff --git a/data_juicer/core/monitor.py b/data_juicer/core/monitor.py new file mode 100644 index 000000000..7d2f7984c --- /dev/null +++ b/data_juicer/core/monitor.py @@ -0,0 +1,215 @@ +import time +from functools import partial +from multiprocessing import get_context + +from data_juicer.utils.resource_utils import (get_cpu_count, + get_cpu_utilization, + query_cuda_info, query_mem_info) + + +def resource_monitor(mdict, interval): + # function to monitor the resource + # interval is the sampling interval + this_states = [] + while True: + this_states.append(Monitor.monitor_current_resources()) + time.sleep(interval) + if mdict['stop']: + break + mdict['resource'] = this_states + + +class Monitor: + """ + Monitor resource utilization and other information during the data + processing. + + Resource utilization dict: (for each func) + '''python + { + 'time': 10, + 'resource': [ + { + 'timestamp': xxx, + 'CPU count': xxx, + 'GPU free mem.': xxx. + ... + }, + { + 'timestamp': xxx, + 'CPU count': xxx, + 'GPU free mem.': xxx, + ... + }, + ] + } + ''' + + Based on the structure above, the resource utilization analysis result will + add several extra fields on the first level: + '''python + { + 'time': 10, + 'resource': [...], + 'resource_analysis': { + 'GPU free mem.': { + 'max': xxx, + 'min': xxx, + 'avg': xxx, + }, + ... + } + } + ''' + Only those fields in DYNAMIC_FIELDS will be analyzed. + """ + + DYNAMIC_FIELDS = { + 'CPU util.', + 'Used mem.', + 'Free mem.', + 'Available mem.', + 'Mem. util.', + 'GPU free mem.', + 'GPU used mem.', + 'GPU util.', + } + + def __init__(self): + pass + + def monitor_all_resources(self): + """ + Detect the resource utilization of all distributed nodes. + """ + # TODO + raise NotImplementedError + + @staticmethod + def monitor_current_resources(): + """ + Detect the resource utilization of the current environment/machine. + All data of "util." is ratios in the range of [0.0, 1.0]. All data of + "mem." is in MB. + """ + resource_dict = dict() + # current time + resource_dict['timestamp'] = time.time() + + # CPU + resource_dict['CPU count'] = get_cpu_count() + resource_dict['CPU util.'] = get_cpu_utilization() / 100.0 + resource_dict['Total mem.'] = query_mem_info('total') + resource_dict['Used mem.'] = query_mem_info('used') + resource_dict['Free mem.'] = query_mem_info('free') + resource_dict['Available mem.'] = query_mem_info('available') + resource_dict['Mem. util.'] = resource_dict[ + 'Used mem.'] / resource_dict['Total mem.'] + + # GPU + resource_dict['GPU total mem.'] = query_cuda_info('memory.total') + resource_dict['GPU free mem.'] = query_cuda_info('memory.free') + resource_dict['GPU used mem.'] = query_cuda_info('memory.used') + resource_dict['GPU util.'] = query_cuda_info('utilization.gpu') + if resource_dict['GPU util.']: + resource_dict['GPU util.'] = [ + x / 100.0 for x in resource_dict['GPU util.'] + ] + + return resource_dict + + @staticmethod + def analyze_resource_util_list(resource_util_list): + """ + Analyze the resource utilization for a given resource util list. + Compute {'max', 'min', 'avg'} of resource metrics for each dict item. + """ + res_list = [] + for item in resource_util_list: + res_list.append(Monitor.analyze_single_resource_util(item)) + return res_list + + @staticmethod + def analyze_single_resource_util(resource_util_dict): + """ + Analyze the resource utilization for a single resource util dict. + Compute {'max', 'min', 'avg'} of each resource metrics. + """ + analysis_res = {} + record_list = {} + for record in resource_util_dict['resource']: + for key in Monitor.DYNAMIC_FIELDS: + if key in record: + if record[key] is None: + continue + elif isinstance(record[key], list): + record_list.setdefault(key, []).extend(record[key]) + else: + record_list.setdefault(key, []).append(record[key]) + + # analyze the max, min, and avg + for key in record_list: + analysis_res[key] = { + 'max': max(record_list[key]), + 'min': min(record_list[key]), + 'avg': sum(record_list[key]) / len(record_list[key]), + } + resource_util_dict['resource_analysis'] = analysis_res + + return resource_util_dict + + @staticmethod + def monitor_func(func, args=None, sample_interval=0.5): + """ + Process the input dataset and probe related information for each OP in + the specified operator list. + + For now, we support the following targets to probe: + "resource": resource utilization for each OP. + "speed": average processing speed for each OP. + + The probe result is a list and each item in the list is the probe + result for each OP. + """ + if args is None: + args = {} + if isinstance(args, dict): + func = partial(func, **args) + elif isinstance(args, list) or isinstance(args, tuple): + func = partial(func, *args) + else: + func = partial(func, args) + + # resource utilization dict + resource_util_dict = {} + + # start monitor + ctx = get_context('fork') + with ctx.Manager() as manager: + mdict = manager.dict() + mdict['stop'] = False + monitor_proc = ctx.Process(target=resource_monitor, + args=( + mdict, + sample_interval, + )) + monitor_proc.start() + # start timer + start = time.time() + + # run single op + ret = func() + + # end timer + end = time.time() + + # stop monitor + mdict['stop'] = True + monitor_proc.join() + + resource_util_dict['resource'] = mdict['resource'] + + # calculate speed + resource_util_dict['time'] = end - start + + return ret, resource_util_dict diff --git a/data_juicer/ops/base_op.py b/data_juicer/ops/base_op.py index d822a4fc7..aec6a8cc7 100644 --- a/data_juicer/ops/base_op.py +++ b/data_juicer/ops/base_op.py @@ -307,7 +307,7 @@ def run(self, dataset, *, exporter=None, tracer=None): num_proc=self.runtime_np(), with_rank=self.use_cuda(), desc=self._name + '_compute_stats') - if self.stats_export_path is not None: + if exporter and self.stats_export_path is not None: exporter.export_compute_stats(dataset, self.stats_export_path) new_dataset = dataset.filter(self.process, num_proc=self.runtime_np(), diff --git a/data_juicer/utils/resource_utils.py b/data_juicer/utils/resource_utils.py new file mode 100644 index 000000000..cb1efd14e --- /dev/null +++ b/data_juicer/utils/resource_utils.py @@ -0,0 +1,52 @@ +import subprocess + +import psutil +from loguru import logger + +NVSMI_REPORT = True + + +def query_cuda_info(query_key): + global NVSMI_REPORT + # get cuda info using "nvidia-smi" command in MB + try: + nvidia_smi_output = subprocess.check_output([ + 'nvidia-smi', f'--query-gpu={query_key}', + '--format=csv,noheader,nounits' + ]).decode('utf-8') + except Exception as e: + if 'non-zero exit status 2' in str(e): + err_msg = f'The specified query_key [{query_key}] might not be ' \ + f'supported by command nvidia-smi. Please check and ' \ + f'retry!' + elif 'No such file or directory' in str(e): + err_msg = 'Command nvidia-smi is not found. There might be no ' \ + 'GPUs on this machine.' + else: + err_msg = str(e) + if NVSMI_REPORT: + logger.warning(err_msg) + NVSMI_REPORT = False + return None + cuda_info_list = [] + for line in nvidia_smi_output.strip().split('\n'): + cuda_info_list.append(int(line)) + return cuda_info_list + + +def get_cpu_count(): + return psutil.cpu_count() + + +def get_cpu_utilization(): + return psutil.cpu_percent() + + +def query_mem_info(query_key): + mem = psutil.virtual_memory() + if query_key not in mem._fields: + logger.warning(f'No such query key [{query_key}] for memory info. ' + f'Should be one of {mem._fields}') + return None + val = round(mem.__getattribute__(query_key) / (2**20), 2) # in MB + return val diff --git a/environments/minimal_requires.txt b/environments/minimal_requires.txt index bd55d2008..79d924989 100644 --- a/environments/minimal_requires.txt +++ b/environments/minimal_requires.txt @@ -1,7 +1,7 @@ fsspec==2023.5.0 pyarrow<=12.0.0 pandas==2.0.3 -datasets==2.18.0 +datasets>=2.19.0 av soundfile librosa>=0.10 diff --git a/tests/core/test_adapter.py b/tests/core/test_adapter.py new file mode 100644 index 000000000..965355b96 --- /dev/null +++ b/tests/core/test_adapter.py @@ -0,0 +1,182 @@ +import os +import unittest +import datasets +from datasets import load_dataset +from loguru import logger +from data_juicer.core import Adapter +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase +from data_juicer.ops.mapper import FixUnicodeMapper +from data_juicer.ops.filter import PerplexityFilter +from data_juicer.ops.deduplicator import DocumentDeduplicator + +class AdapterTest(DataJuicerTestCaseBase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + + cls.test_file = 'text_only_2.3k.jsonl' + download_link = f'http://dail-wlcb.oss-cn-wulanchabu.aliyuncs.com/data_juicer/unittest_data/{cls.test_file}' + os.system(f'wget {download_link}') + + @classmethod + def tearDownClass(cls, hf_model_name=None) -> None: + # remove test dataset + os.system(f'rm -f {cls.test_file}') + + super().tearDownClass(hf_model_name) + + def test_take_batch(self): + ds = load_dataset('json', data_files=self.test_file, split='train') + logger.info(f'Length of test dataset: [{len(ds)}]') + batch_sizes = [100, 1000, 3000] + test_config = {} + + for bs in batch_sizes: + tgt_len = min(bs, len(ds)) + test_config['batch_size'] = bs + res_ds = Adapter.take_batch(ds, test_config) + logger.info(f'Require [{bs}] and got [{len(res_ds)}].') + self.assertEqual(len(res_ds), tgt_len) + + def test_batch_size_strategy(self): + test_analysis_res = [ + { + 'resource_analysis': { + 'CPU util.': { + 'max': 0.5, + }, + 'GPU util.': { + 'max': 0.8, + }, + 'Mem. util.': { + 'max': 0.3, + } + } + }, + { + 'resource_analysis': { + 'CPU util.': { + 'max': 0.2, + }, + 'GPU util.': { + 'max': 1.0, + }, + 'Mem. util.': { + 'max': 0.1, + } + } + }, + ] + + adapter = Adapter({'batch_size': 1}) + adapter.idle_resources = { + 'CPU util.': 0, + 'GPU util.': 0, + 'Mem. util.': 0, + } + + # basic test + tgt_bs_1 = [2, 5] + bs_res = adapter.batch_size_strategy(test_analysis_res, + base_bs=1, + util_th=1.0) + self.assertEqual(bs_res, tgt_bs_1) + + # lower util threshold + tgt_bs_2 = [1, 3] + bs_res = adapter.batch_size_strategy(test_analysis_res, + base_bs=1, + util_th=0.7) + self.assertEqual(bs_res, tgt_bs_2) + + # larger base batch size + adapter.cfg['batch_size'] = 10 + tgt_bs_3 = [18, 45] + bs_res = adapter.batch_size_strategy(test_analysis_res, + base_bs=10, + util_th=0.9) + self.assertEqual(bs_res, tgt_bs_3) + + # out of resource + tgt_bs_4 = [2, 5] + bs_res = adapter.batch_size_strategy(test_analysis_res, + base_bs=10, + util_th=0.1) + self.assertEqual(bs_res, tgt_bs_4) + + # out of resource 2 + adapter.cfg['batch_size'] = 1 + tgt_bs_4 = [1, 1] + bs_res = adapter.batch_size_strategy(test_analysis_res, + base_bs=1, + util_th=0.1) + self.assertEqual(bs_res, tgt_bs_4) + + def test_execute_and_probe(self): + datasets.disable_caching() + # basic test + ds = load_dataset('json', data_files=self.test_file, split='train').take(100) + ops = [ + FixUnicodeMapper(num_proc=1), + PerplexityFilter(num_proc=1), + DocumentDeduplicator(num_proc=1), + ] # use some batched OPs later + + resource_util_list = Adapter.execute_and_probe(ds, ops) + self.assertEqual(len(resource_util_list), len(ops)) + + # finer-grained test + # reinitialize the OPs to avoid warm start. + ops = [ + FixUnicodeMapper(num_proc=1), + PerplexityFilter(num_proc=1), + DocumentDeduplicator(num_proc=1), + ] + resource_util_list2 = Adapter.execute_and_probe(ds, ops, sample_interval=0.2) + logger.info(f'interval=\t0.5\t0.2') + for item1, item2 in zip(resource_util_list, resource_util_list2): + logger.info(f' \t{len(item1["resource"])}\t{len(item2["resource"])}') + self.assertLessEqual(len(item1['resource']), len(item2['resource'])) + + datasets.enable_caching() + + def test_probe_small_batch(self): + datasets.disable_caching() + # basic test + ds = load_dataset('json', data_files=self.test_file, split='train') + ops = [ + FixUnicodeMapper(num_proc=1), + PerplexityFilter(num_proc=1), + DocumentDeduplicator(num_proc=1), + ] # use some batched OPs later + + adapter = Adapter({'batch_size': 100}) + resource_util_analysis_res, probe_bs = adapter.probe_small_batch(ds, ops) + logger.info(f'Probe on batch size is [{probe_bs}].') + self.assertEqual(len(resource_util_analysis_res), len(ops)) + for item in resource_util_analysis_res: + self.assertIn('resource_analysis', item) + + datasets.enable_caching() + + def test_adapt_workloads(self): + datasets.disable_caching() + # basic test + ds = load_dataset('json', data_files=self.test_file, split='train') + ops = [ + FixUnicodeMapper(num_proc=1), + PerplexityFilter(num_proc=1), + DocumentDeduplicator(num_proc=1), + ] # use some batched OPs later + + adapter = Adapter({'batch_size': 100}) + adapted_batch_sizes = adapter.adapt_workloads(ds, ops) + self.assertEqual(len(adapted_batch_sizes), len(ops)) + logger.info(adapted_batch_sizes) + + datasets.enable_caching() + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/core/test_monitor.py b/tests/core/test_monitor.py new file mode 100644 index 000000000..01840348d --- /dev/null +++ b/tests/core/test_monitor.py @@ -0,0 +1,61 @@ +import unittest +import time +from loguru import logger +from data_juicer.core import Monitor +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, SKIPPED_TESTS + +@SKIPPED_TESTS.register_module() +class MonitorTest(DataJuicerTestCaseBase): + + def test_monitor_current_resources(self): + resource_dict = Monitor.monitor_current_resources() + logger.info(resource_dict) + self.assertIn('timestamp', resource_dict) + self.assertIn('CPU count', resource_dict) + self.assertIn('Mem. util.', resource_dict) + + def test_analyze_resource_util_list(self): + resource_samples = [] + for i in range(5): + resource_samples.append(Monitor.monitor_current_resources()) + time.sleep(0.2) + resource_util_list = [{ + 'time': 1, + 'resource': resource_samples, + }] + analysis_res = Monitor.analyze_resource_util_list(resource_util_list) + logger.info(analysis_res) + item = analysis_res[0] + self.assertIn('resource_analysis', item) + resource_analysis = item['resource_analysis'] + cpu_util = resource_analysis['CPU util.'] + self.assertIn('max', cpu_util) + self.assertIn('min', cpu_util) + self.assertIn('avg', cpu_util) + + def _increase_mem_func(self, init_len=100000, multiplier=2, times=10, interval=0.5): + lst = list(range(init_len)) + for i in range(times): + lst = lst * multiplier + time.sleep(interval) + + def test_monitor_func(self): + _, dict1 = Monitor.monitor_func(self._increase_mem_func, + args=(10 ** 3, 2, 4,), + sample_interval=0.3) + analysis1 = Monitor.analyze_single_resource_util(dict1) + logger.info(analysis1['resource_analysis']) + + _, dict2 = Monitor.monitor_func(self._increase_mem_func, + args=(10 ** 7, 2, 4,), + sample_interval=0.3) + analysis2 = Monitor.analyze_single_resource_util(dict2) + logger.info(analysis2['resource_analysis']) + + self.assertLessEqual( + analysis1['resource_analysis']['Mem. util.']['avg'], + analysis2['resource_analysis']['Mem. util.']['avg']) + + +if __name__ == '__main__': + unittest.main()