From 716e1f9dc6d3dcdfb617ce7fdb6d26b569fe0708 Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Mon, 9 Sep 2024 16:28:29 +0800 Subject: [PATCH 01/11] * init adapter --- data_juicer/core/__init__.py | 4 ++ data_juicer/core/adapter.py | 81 +++++++++++++++++++++ data_juicer/core/monitor.py | 105 ++++++++++++++++++++++++++++ data_juicer/ops/base_op.py | 2 +- data_juicer/utils/resource_utils.py | 47 +++++++++++++ 5 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 data_juicer/core/adapter.py create mode 100644 data_juicer/core/monitor.py create mode 100644 data_juicer/utils/resource_utils.py diff --git a/data_juicer/core/__init__.py b/data_juicer/core/__init__.py index 79ead0f8a..6fbb9d5d6 100644 --- a/data_juicer/core/__init__.py +++ b/data_juicer/core/__init__.py @@ -1,13 +1,17 @@ +from .adapter import Adapter from .analyzer import Analyzer from .data import NestedDataset from .executor import Executor from .exporter import Exporter +from .monitor import Monitor from .tracer import Tracer __all__ = [ + 'Adapter', 'Analyzer', 'NestedDataset', 'Executor', 'Exporter', + 'Monitor', 'Tracer', ] diff --git a/data_juicer/core/adapter.py b/data_juicer/core/adapter.py new file mode 100644 index 000000000..ecf49bddc --- /dev/null +++ b/data_juicer/core/adapter.py @@ -0,0 +1,81 @@ +from data_juicer.core.monitor import Monitor + + +class Adapter: + + def __init__(self): + self.current_resources = self.detect_current_resources() + + @staticmethod + def execute_and_probe(dataset, operators, sample_interval=0.5): + """ + Process the input dataset and probe related information for each OP in + the specified operator list. + + For now, we support the following targets to probe: + "resource": resource utilization for each OP. + "speed": average processing speed for each OP. + + The probe result is a list and each item in the list is the probe + result for each OP. + """ + if operators is None or len(operators) == 0: + return [] + + # number of test samples + sample_num = len(dataset) + # resource utilization list + resource_util = [] + # probe for each OP + for op in operators: + + # run single op and monitor the resource utilization + dataset, resource_util_per_op = Monitor.monitor_func( + op.run, args=(dataset, ), sample_interval=sample_interval) + + # calculate speed + resource_util_per_op[ + 'speed'] = sample_num / resource_util_per_op['time'] + resource_util.append(resource_util_per_op) + + return resource_util + + def probe_small_batches(self, dataset, operators): + """ + Perform small batch pre-execution to probe available resources, + current load and estimated OP speed, returning load factors and speed + ranks for each OP. + + :param dataset: The dataset to pre-execute small batches on + :param operators: The OP list to be pre-execution and probe + :return: A list of probe results for each OP. + """ + # 假设这个函数执行一小部分数据以检测负载 + print('Pre-executing small batches to detect system load...') + # 这里可以添加具体的逻辑来预执行小批量 + # 模拟的负载因子(可以根据实际情况计算) + + # 例如,在负载过高的情况下返回较小的值 + load_factor = self.available_resources['load'] # 返回当前负载 + print(f'Detected load factor: {load_factor}') + return load_factor + + @staticmethod + def batch_split(dataset, config, load_factor=None): + """ + Split the dataset into batches based on configuration and load factor. + + :param dataset: The dataset to be split + :param config: Configuration settings, including batch size + :param load_factor: The detected load factor from pre-execution + :return: An iterator of batches + """ + # get initial batch size + adjusted_batch_size = config.get('batch_size', 100) + # adapt according to the load factor + if load_factor: + adjusted_batch_size = int(adjusted_batch_size / load_factor) + # should be in [1, 1000] + adjusted_batch_size = min(max(adjusted_batch_size, 1), 1000) + + return dataset.iter(batch_size=adjusted_batch_size) diff --git a/data_juicer/core/monitor.py b/data_juicer/core/monitor.py new file mode 100644 index 000000000..0c12aeb86 --- /dev/null +++ b/data_juicer/core/monitor.py @@ -0,0 +1,105 @@ +import threading +import time +from functools import partial + +from data_juicer.utils.resource_utils import (get_cpu_count, + get_cpu_utilization, + query_cuda_info, query_mem_info) + + +class Monitor: + + def __init__(self): + pass + + def monitor_all_resources(self): + """ + Detect the resource utilization of all distributed nodes. + """ + # TODO + pass + + @staticmethod + def monitor_current_resources(): + """ + Detect the resource utilization of the current environment/machine. + All data of "util." is in percent. All data of "mem." is in MB. + """ + resource_dict = dict() + # current time + resource_dict['timestamp'] = time.time() + + # CPU + resource_dict['CPU count'] = get_cpu_count() + resource_dict['CPU util.'] = get_cpu_utilization() + resource_dict['Total mem.'] = query_mem_info('total') + resource_dict['Used mem.'] = query_mem_info('used') + resource_dict['Free mem.'] = query_mem_info('free') + resource_dict['Available mem.'] = query_mem_info('available') + + # GPU + resource_dict['GPU total mem.'] = query_cuda_info('memory.total') + resource_dict['GPU free mem.'] = query_cuda_info('memory.free') + resource_dict['GPU used mem.'] = query_cuda_info('memory.used') + resource_dict['GPU util.'] = query_cuda_info('utilization.gpu') + + return resource_dict + + @staticmethod + def monitor_func(func, args=None, sample_interval=0.5): + """ + Process the input dataset and probe related information for each OP in + the specified operator list. + + For now, we support the following targets to probe: + "resource": resource utilization for each OP. + "speed": average processing speed for each OP. + + The probe result is a list and each item in the list is the probe + result for each OP. + """ + if args is None: + args = {} + if isinstance(args, dict): + func = partial(func, **args) + elif isinstance(args, list) or isinstance(args, tuple): + func = partial(func, *args) + else: + func = partial(func, args) + + # resource utilization list + resource_util = {} + + # whether in the monitoring mode + running_flag = False + + def resource_monitor(interval): + # function to monitor the resource + # interval is the sampling interval + this_states = [] + while running_flag: + this_states.append(Monitor.monitor_current_resources()) + time.sleep(interval) + resource_util['resource'] = this_states + + # start monitor + running_flag = True + monitor_thread = threading.Thread(target=resource_monitor, + args=(sample_interval, )) + monitor_thread.start() + # start timer + start = time.time() + + # run single op + ret = func() + + # end timer + end = time.time() + # stop monitor + running_flag = False + monitor_thread.join() + + # calculate speed + resource_util['time'] = end - start + + return ret, resource_util diff --git a/data_juicer/ops/base_op.py b/data_juicer/ops/base_op.py index d822a4fc7..aec6a8cc7 100644 --- a/data_juicer/ops/base_op.py +++ b/data_juicer/ops/base_op.py @@ -307,7 +307,7 @@ def run(self, dataset, *, exporter=None, tracer=None): num_proc=self.runtime_np(), with_rank=self.use_cuda(), desc=self._name + '_compute_stats') - if self.stats_export_path is not None: + if exporter and self.stats_export_path is not None: exporter.export_compute_stats(dataset, self.stats_export_path) new_dataset = dataset.filter(self.process, num_proc=self.runtime_np(), diff --git a/data_juicer/utils/resource_utils.py b/data_juicer/utils/resource_utils.py new file mode 100644 index 000000000..2105c52ae --- /dev/null +++ b/data_juicer/utils/resource_utils.py @@ -0,0 +1,47 @@ +import subprocess + +import psutil +from loguru import logger + + +def query_cuda_info(query_key): + # get cuda info using "nvidia-smi" command in MB + try: + nvidia_smi_output = subprocess.check_output([ + 'nvidia-smi', f'--query-gpu={query_key}', + '--format=csv,noheader,nounits' + ]).decode('utf-8') + except Exception as e: + if 'non-zero exit status 2' in str(e): + err_msg = f'The specified query_key [{query_key}] might not be ' \ + f'supported by command nvidia-smi. Please check and ' \ + f'retry!' + elif 'No such file or directory' in str(e): + err_msg = 'Command nvidia-smi is not found. There might be no ' \ + 'GPUs on this machine.' + else: + err_msg = str(e) + logger.warning(err_msg) + return None + cuda_info_list = [] + for line in nvidia_smi_output.strip().split('\n'): + cuda_info_list.append(int(line)) + return cuda_info_list + + +def get_cpu_count(): + return psutil.cpu_count() + + +def get_cpu_utilization(): + return psutil.cpu_percent() + + +def query_mem_info(query_key): + mem = psutil.virtual_memory() + if query_key not in mem._fields: + logger.warning(f'No such query key [{query_key}] for memory info. ' + f'Should be one of {mem._fields}') + return None + val = round(mem.__getattribute__(query_key) / (2**20), 2) # in MB + return val From 584241b5dc6403b9b3ead68fef28a0eaf82348cb Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Thu, 12 Sep 2024 10:23:10 +0800 Subject: [PATCH 02/11] + add basic logic of workloads adaptation --- data_juicer/core/adapter.py | 42 +++++++++++----- data_juicer/core/monitor.py | 96 +++++++++++++++++++++++++++++++++++-- 2 files changed, 120 insertions(+), 18 deletions(-) diff --git a/data_juicer/core/adapter.py b/data_juicer/core/adapter.py index ecf49bddc..bf91f9a6f 100644 --- a/data_juicer/core/adapter.py +++ b/data_juicer/core/adapter.py @@ -1,10 +1,13 @@ +from loguru import logger + from data_juicer.core.monitor import Monitor class Adapter: - def __init__(self): - self.current_resources = self.detect_current_resources() + def __init__(self, cfg): + self.cfg = cfg + self.current_resources = Monitor.monitor_current_resources() @staticmethod def execute_and_probe(dataset, operators, sample_interval=0.5): @@ -25,7 +28,7 @@ def execute_and_probe(dataset, operators, sample_interval=0.5): # number of test samples sample_num = len(dataset) # resource utilization list - resource_util = [] + resource_util_list = [] # probe for each OP for op in operators: @@ -36,9 +39,20 @@ def execute_and_probe(dataset, operators, sample_interval=0.5): # calculate speed resource_util_per_op[ 'speed'] = sample_num / resource_util_per_op['time'] - resource_util.append(resource_util_per_op) + resource_util_list.append(resource_util_per_op) + + return resource_util_list - return resource_util + def workloads_adapt(self, dataset, operators): + """ + Manage the scheduling and load balancing for the dataset processing. + + :param dataset: The dataset that needs to be processed + :param operators: Operators in the data recipe + """ + load_factor = self.probe_small_batches(dataset, operators) + dataset_batches = self.batch_split(dataset, self.cfg, load_factor) + return dataset_batches def probe_small_batches(self, dataset, operators): """ @@ -50,14 +64,16 @@ def probe_small_batches(self, dataset, operators): :param operators: The OP list to be pre-execution and probe :return: A list of probe results for each OP. """ - # 假设这个函数执行一小部分数据以检测负载 - print('Pre-executing small batches to detect system load...') - # 这里可以添加具体的逻辑来预执行小批量 - # 模拟的负载因子(可以根据实际情况计算) - - # 例如,在负载过高的情况下返回较小的值 - load_factor = self.available_resources['load'] # 返回当前负载 - print(f'Detected load factor: {load_factor}') + # get a small batch of dataset in default batch size + small_batch = self.batch_split(dataset, self.cfg) + resource_util_list = self.execute_and_probe(small_batch, operators) + # analyze resource utilization + analysis_res = Monitor.analyze_resource_util_list(resource_util_list) + + # get a new load_factor according to the analysis result + load_factor = 1.0 * analysis_res + pass + logger.info(f'Adjust load factor to: {load_factor}') return load_factor @staticmethod diff --git a/data_juicer/core/monitor.py b/data_juicer/core/monitor.py index 0c12aeb86..1d3ce481d 100644 --- a/data_juicer/core/monitor.py +++ b/data_juicer/core/monitor.py @@ -8,6 +8,59 @@ class Monitor: + """ + Monitor resource utilization and other information during the data + processing. + + Resource utilization dict: (for each func) + '''python + { + 'time': 10, + 'resource': [ + { + 'timestamp': xxx, + 'CPU count': xxx, + 'GPU free mem.': xxx. + ... + }, + { + 'timestamp': xxx, + 'CPU count': xxx, + 'GPU free mem.': xxx, + ... + }, + ] + } + ''' + + Based on the structure above, the resource utilization analysis result will + add several extra fields on the first level: + '''python + { + 'time': 10, + 'resource': [...], + 'resource_analysis': { + 'GPU free mem.': { + 'max': xxx, + 'min': xxx, + 'avg': xxx, + }, + ... + } + } + ''' + Only those fields in DYNAMIC_FIELDS will be analyzed. + """ + + DYNAMIC_FIELDS = { + 'CPU util.', + 'Used mem.', + 'Free mem.', + 'Available mem.', + 'GPU free mem.', + 'GPU used mem.', + 'GPU util.', + } def __init__(self): pass @@ -45,6 +98,39 @@ def monitor_current_resources(): return resource_dict + def analyze_resource_util_list(self, resource_util_list): + """ + Analyze the resource utilization for a given resource util list. + Compute {'max', 'min', 'avg'} of resource metrics for each dict item. + """ + res_list = [] + for item in resource_util_list: + res_list.append(self.analyze_single_resource_util(item)) + return res_list + + def analyze_single_resource_util(self, resource_util_dict): + """ + Analyze the resource utilization for a single resource util dict. + Compute {'max', 'min', 'avg'} of each resource metrics. + """ + analysis_res = {} + record_list = {} + for record in resource_util_dict['resource']: + for key in self.DYNAMIC_FIELDS: + if key in record: + record_list.setdefault(key, []).append(record[key]) + + # analyze the max, min, and avg + for key in record_list: + analysis_res[key] = { + 'max': max(record_list[key]), + 'min': min(record_list[key]), + 'avg': sum(record_list[key]) / len(record_list), + } + resource_util_dict['resource_analysis'] = analysis_res + + return resource_util_dict + @staticmethod def monitor_func(func, args=None, sample_interval=0.5): """ @@ -67,8 +153,8 @@ def monitor_func(func, args=None, sample_interval=0.5): else: func = partial(func, args) - # resource utilization list - resource_util = {} + # resource utilization dict + resource_util_dict = {} # whether in the monitoring mode running_flag = False @@ -80,7 +166,7 @@ def resource_monitor(interval): while running_flag: this_states.append(Monitor.monitor_current_resources()) time.sleep(interval) - resource_util['resource'] = this_states + resource_util_dict['resource'] = this_states # start monitor running_flag = True @@ -100,6 +186,6 @@ def resource_monitor(interval): monitor_thread.join() # calculate speed - resource_util['time'] = end - start + resource_util_dict['time'] = end - start - return ret, resource_util + return ret, resource_util_dict From de67cccdbbed98351e424f36def81e9fbda70d89 Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Thu, 12 Sep 2024 14:50:28 +0800 Subject: [PATCH 03/11] * update the adaptation logic --- data_juicer/core/adapter.py | 49 ++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/data_juicer/core/adapter.py b/data_juicer/core/adapter.py index bf91f9a6f..7a08578e0 100644 --- a/data_juicer/core/adapter.py +++ b/data_juicer/core/adapter.py @@ -1,3 +1,4 @@ +from datasets import concatenate_datasets from loguru import logger from data_juicer.core.monitor import Monitor @@ -41,7 +42,7 @@ def execute_and_probe(dataset, operators, sample_interval=0.5): 'speed'] = sample_num / resource_util_per_op['time'] resource_util_list.append(resource_util_per_op) - return resource_util_list + return dataset, resource_util_list def workloads_adapt(self, dataset, operators): """ @@ -50,23 +51,31 @@ def workloads_adapt(self, dataset, operators): :param dataset: The dataset that needs to be processed :param operators: Operators in the data recipe """ - load_factor = self.probe_small_batches(dataset, operators) - dataset_batches = self.batch_split(dataset, self.cfg, load_factor) - return dataset_batches - - def probe_small_batches(self, dataset, operators): + res_data_batches = [] + load_factor = 1.0 + left_dataset = dataset + while left_dataset: + data_batch, left_dataset = self.take_batch(left_dataset, self.cfg, + load_factor) + res_batch, load_factor = self.probe_data_batches( + data_batch, operators, load_factor) + res_data_batches.append(res_batch) + return concatenate_datasets(res_data_batches) + + def probe_data_batches(self, data_batch, operators, load_factor=None): """ Perform small batch pre-execution to probe available resources, current load and estimated OP speed, returning load factors and speed ranks for each OP. - :param dataset: The dataset to pre-execute small batches on + :param data_batch: The dataset batch to pre-execute on :param operators: The OP list to be pre-execution and probe + :param load_factor: Load factor for this batch :return: A list of probe results for each OP. """ - # get a small batch of dataset in default batch size - small_batch = self.batch_split(dataset, self.cfg) - resource_util_list = self.execute_and_probe(small_batch, operators) + # process and monitor the resource utilization + res_batch, resource_util_list = self.execute_and_probe( + data_batch, operators) # analyze resource utilization analysis_res = Monitor.analyze_resource_util_list(resource_util_list) @@ -74,10 +83,10 @@ def probe_small_batches(self, dataset, operators): load_factor = 1.0 * analysis_res pass logger.info(f'Adjust load factor to: {load_factor}') - return load_factor + return res_batch, load_factor @staticmethod - def batch_split(dataset, config, load_factor=None): + def take_batch(dataset, config, load_factor=None): """ Split the dataset into batches based on configuration and load factor. @@ -87,11 +96,17 @@ def batch_split(dataset, config, load_factor=None): :return: An iterator of batches """ # get initial batch size - adjusted_batch_size = config.get('batch_size', 100) + batch_size = config.get('batch_size', 100) # adapt according to the load factor if load_factor: - adjusted_batch_size = int(adjusted_batch_size / load_factor) + batch_size = int(batch_size / load_factor) + pass # should be in [1, 1000] - adjusted_batch_size = min(max(adjusted_batch_size, 1), 1000) - - return dataset.iter(batch_size=adjusted_batch_size) + batch_size = min(max(batch_size, 1), 1000) + + # check if there are enough samples + num_samples = len(dataset) + if batch_size >= num_samples: + return dataset, None + else: + return dataset.take(batch_size), dataset.skip(batch_size) From 4a368ed4ff046e27815f1672d56efc3530c30eed Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Thu, 12 Sep 2024 21:03:19 +0800 Subject: [PATCH 04/11] + add unittests for Monitor --- data_juicer/core/monitor.py | 29 ++++++++++++----- tests/core/test_monitor.py | 64 +++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 8 deletions(-) create mode 100644 tests/core/test_monitor.py diff --git a/data_juicer/core/monitor.py b/data_juicer/core/monitor.py index 1d3ce481d..bfea5b91f 100644 --- a/data_juicer/core/monitor.py +++ b/data_juicer/core/monitor.py @@ -70,7 +70,7 @@ def monitor_all_resources(self): Detect the resource utilization of all distributed nodes. """ # TODO - pass + raise NotImplementedError @staticmethod def monitor_current_resources(): @@ -84,31 +84,39 @@ def monitor_current_resources(): # CPU resource_dict['CPU count'] = get_cpu_count() - resource_dict['CPU util.'] = get_cpu_utilization() + resource_dict['CPU util.'] = get_cpu_utilization() / 100.0 resource_dict['Total mem.'] = query_mem_info('total') resource_dict['Used mem.'] = query_mem_info('used') resource_dict['Free mem.'] = query_mem_info('free') resource_dict['Available mem.'] = query_mem_info('available') + resource_dict['Mem. util.'] = resource_dict[ + 'Used mem.'] / resource_dict['Total mem.'] # GPU resource_dict['GPU total mem.'] = query_cuda_info('memory.total') resource_dict['GPU free mem.'] = query_cuda_info('memory.free') resource_dict['GPU used mem.'] = query_cuda_info('memory.used') resource_dict['GPU util.'] = query_cuda_info('utilization.gpu') + if resource_dict['GPU util.']: + resource_dict['GPU util.'] = [ + x / 100.0 for x in resource_dict['GPU util.'] + ] return resource_dict - def analyze_resource_util_list(self, resource_util_list): + @staticmethod + def analyze_resource_util_list(resource_util_list): """ Analyze the resource utilization for a given resource util list. Compute {'max', 'min', 'avg'} of resource metrics for each dict item. """ res_list = [] for item in resource_util_list: - res_list.append(self.analyze_single_resource_util(item)) + res_list.append(Monitor.analyze_single_resource_util(item)) return res_list - def analyze_single_resource_util(self, resource_util_dict): + @staticmethod + def analyze_single_resource_util(resource_util_dict): """ Analyze the resource utilization for a single resource util dict. Compute {'max', 'min', 'avg'} of each resource metrics. @@ -116,16 +124,21 @@ def analyze_single_resource_util(self, resource_util_dict): analysis_res = {} record_list = {} for record in resource_util_dict['resource']: - for key in self.DYNAMIC_FIELDS: + for key in Monitor.DYNAMIC_FIELDS: if key in record: - record_list.setdefault(key, []).append(record[key]) + if record[key] is None: + continue + elif isinstance(record[key], list): + record_list.setdefault(key, []).extend(record[key]) + else: + record_list.setdefault(key, []).append(record[key]) # analyze the max, min, and avg for key in record_list: analysis_res[key] = { 'max': max(record_list[key]), 'min': min(record_list[key]), - 'avg': sum(record_list[key]) / len(record_list), + 'avg': sum(record_list[key]) / len(record_list[key]), } resource_util_dict['resource_analysis'] = analysis_res diff --git a/tests/core/test_monitor.py b/tests/core/test_monitor.py new file mode 100644 index 000000000..e4354c205 --- /dev/null +++ b/tests/core/test_monitor.py @@ -0,0 +1,64 @@ +import unittest +import time +from loguru import logger +from data_juicer.core import Monitor +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase + +class MonitorTest(DataJuicerTestCaseBase): + + def test_monitor_current_resources(self): + resource_dict = Monitor.monitor_current_resources() + logger.info(resource_dict) + self.assertIn('timestamp', resource_dict) + self.assertIn('CPU count', resource_dict) + self.assertIn('Mem. util.', resource_dict) + + def test_analyze_resource_util_list(self): + resource_samples = [] + for i in range(5): + resource_samples.append(Monitor.monitor_current_resources()) + time.sleep(0.2) + resource_util_list = [{ + 'time': 1, + 'resource': resource_samples, + }] + analysis_res = Monitor.analyze_resource_util_list(resource_util_list) + logger.info(analysis_res) + item = analysis_res[0] + self.assertIn('resource_analysis', item) + resource_analysis = item['resource_analysis'] + cpu_util = resource_analysis['CPU util.'] + self.assertIn('max', cpu_util) + self.assertIn('min', cpu_util) + self.assertIn('avg', cpu_util) + + def _increase_mem_func(self, init_len=100000, multiplier=2, times=10, interval=0.5): + lst = list(range(init_len)) + for i in range(times): + lst = lst * multiplier + time.sleep(interval) + + def test_monitor_func(self): + _, dict1 = Monitor.monitor_func(self._increase_mem_func, + args=(10 ** 5, 2, 8,), + sample_interval=0.3) + resource1 = dict1['resource'] + self.assertLessEqual(resource1[1]['Mem. util.'], resource1[-2]['Mem. util.']) + + _, dict2 = Monitor.monitor_func(self._increase_mem_func, + args=(10 ** 6, 2, 5,), + sample_interval=0.2) + resource2 = dict2['resource'][:] + self.assertLessEqual(resource2[1]['Mem. util.'], resource2[-2]['Mem. util.']) + + _, dict3 = Monitor.monitor_func(self._increase_mem_func, + args=(25600000, 2, 4,), + sample_interval=0.3) + resource3 = dict3['resource'][:] + self.assertLessEqual(resource3[1]['Mem. util.'], resource3[-2]['Mem. util.']) + self.assertGreaterEqual(resource3[1]['Mem. util.'], + resource1[-2]['Mem. util.']) + + +if __name__ == '__main__': + unittest.main() From a9e1ec4b297ce89bf413192187c2312212aab24f Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Thu, 12 Sep 2024 21:26:08 +0800 Subject: [PATCH 05/11] + add unittests for Monitor --- data_juicer/core/monitor.py | 1 + tests/core/test_monitor.py | 30 ++++++++++++++++++------------ 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/data_juicer/core/monitor.py b/data_juicer/core/monitor.py index bfea5b91f..9bbbf9cf3 100644 --- a/data_juicer/core/monitor.py +++ b/data_juicer/core/monitor.py @@ -57,6 +57,7 @@ class Monitor: 'Used mem.', 'Free mem.', 'Available mem.', + 'Mem. util.', 'GPU free mem.', 'GPU used mem.', 'GPU util.', diff --git a/tests/core/test_monitor.py b/tests/core/test_monitor.py index e4354c205..ca2cd1469 100644 --- a/tests/core/test_monitor.py +++ b/tests/core/test_monitor.py @@ -40,24 +40,30 @@ def _increase_mem_func(self, init_len=100000, multiplier=2, times=10, interval=0 def test_monitor_func(self): _, dict1 = Monitor.monitor_func(self._increase_mem_func, - args=(10 ** 5, 2, 8,), + args=(10 ** 3, 2, 4,), sample_interval=0.3) - resource1 = dict1['resource'] - self.assertLessEqual(resource1[1]['Mem. util.'], resource1[-2]['Mem. util.']) + analysis1 = Monitor.analyze_single_resource_util(dict1) + logger.info(analysis1['resource_analysis']) _, dict2 = Monitor.monitor_func(self._increase_mem_func, - args=(10 ** 6, 2, 5,), - sample_interval=0.2) - resource2 = dict2['resource'][:] - self.assertLessEqual(resource2[1]['Mem. util.'], resource2[-2]['Mem. util.']) + args=(10 ** 5, 2, 4,), + sample_interval=0.3) + analysis2 = Monitor.analyze_single_resource_util(dict2) + logger.info(analysis2['resource_analysis']) _, dict3 = Monitor.monitor_func(self._increase_mem_func, - args=(25600000, 2, 4,), + args=(10 ** 7, 2, 4,), sample_interval=0.3) - resource3 = dict3['resource'][:] - self.assertLessEqual(resource3[1]['Mem. util.'], resource3[-2]['Mem. util.']) - self.assertGreaterEqual(resource3[1]['Mem. util.'], - resource1[-2]['Mem. util.']) + analysis3 = Monitor.analyze_single_resource_util(dict3) + logger.info(analysis3['resource_analysis']) + + self.assertLessEqual( + analysis1['resource_analysis']['Mem. util.']['avg'], + analysis2['resource_analysis']['Mem. util.']['avg']) + + self.assertLessEqual( + analysis2['resource_analysis']['Mem. util.']['avg'], + analysis3['resource_analysis']['Mem. util.']['avg']) if __name__ == '__main__': From 8ade118fbac764bb0bd3deda705120fbdc2c5d6c Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Fri, 13 Sep 2024 12:05:13 +0800 Subject: [PATCH 06/11] * use multiprocessing to monitor resource utilization --- data_juicer/core/monitor.py | 71 +++++++++++++++++++++---------------- tests/core/test_monitor.py | 15 ++------ 2 files changed, 43 insertions(+), 43 deletions(-) diff --git a/data_juicer/core/monitor.py b/data_juicer/core/monitor.py index 9bbbf9cf3..932ea04f0 100644 --- a/data_juicer/core/monitor.py +++ b/data_juicer/core/monitor.py @@ -1,12 +1,24 @@ -import threading import time from functools import partial +from multiprocessing import get_context from data_juicer.utils.resource_utils import (get_cpu_count, get_cpu_utilization, query_cuda_info, query_mem_info) +def resource_monitor(mdict, interval): + # function to monitor the resource + # interval is the sampling interval + this_states = [] + while True: + this_states.append(Monitor.monitor_current_resources()) + time.sleep(interval) + if mdict['stop']: + break + mdict['resource'] = this_states + + class Monitor: """ Monitor resource utilization and other information during the data @@ -170,36 +182,33 @@ def monitor_func(func, args=None, sample_interval=0.5): # resource utilization dict resource_util_dict = {} - # whether in the monitoring mode - running_flag = False - - def resource_monitor(interval): - # function to monitor the resource - # interval is the sampling interval - this_states = [] - while running_flag: - this_states.append(Monitor.monitor_current_resources()) - time.sleep(interval) - resource_util_dict['resource'] = this_states - # start monitor - running_flag = True - monitor_thread = threading.Thread(target=resource_monitor, - args=(sample_interval, )) - monitor_thread.start() - # start timer - start = time.time() - - # run single op - ret = func() - - # end timer - end = time.time() - # stop monitor - running_flag = False - monitor_thread.join() - - # calculate speed - resource_util_dict['time'] = end - start + ctx = get_context('fork') + with ctx.Manager() as manager: + mdict = manager.dict() + mdict['stop'] = False + monitor_proc = ctx.Process(target=resource_monitor, + args=( + mdict, + sample_interval, + )) + monitor_proc.start() + # start timer + start = time.time() + + # run single op + ret = func() + + # end timer + end = time.time() + + # stop monitor + mdict['stop'] = True + monitor_proc.join() + + resource_util_dict['resource'] = mdict['resource'] + + # calculate speed + resource_util_dict['time'] = end - start return ret, resource_util_dict diff --git a/tests/core/test_monitor.py b/tests/core/test_monitor.py index ca2cd1469..01840348d 100644 --- a/tests/core/test_monitor.py +++ b/tests/core/test_monitor.py @@ -2,8 +2,9 @@ import time from loguru import logger from data_juicer.core import Monitor -from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase, SKIPPED_TESTS +@SKIPPED_TESTS.register_module() class MonitorTest(DataJuicerTestCaseBase): def test_monitor_current_resources(self): @@ -46,25 +47,15 @@ def test_monitor_func(self): logger.info(analysis1['resource_analysis']) _, dict2 = Monitor.monitor_func(self._increase_mem_func, - args=(10 ** 5, 2, 4,), + args=(10 ** 7, 2, 4,), sample_interval=0.3) analysis2 = Monitor.analyze_single_resource_util(dict2) logger.info(analysis2['resource_analysis']) - _, dict3 = Monitor.monitor_func(self._increase_mem_func, - args=(10 ** 7, 2, 4,), - sample_interval=0.3) - analysis3 = Monitor.analyze_single_resource_util(dict3) - logger.info(analysis3['resource_analysis']) - self.assertLessEqual( analysis1['resource_analysis']['Mem. util.']['avg'], analysis2['resource_analysis']['Mem. util.']['avg']) - self.assertLessEqual( - analysis2['resource_analysis']['Mem. util.']['avg'], - analysis3['resource_analysis']['Mem. util.']['avg']) - if __name__ == '__main__': unittest.main() From 3c4320c76e564d9c922731e3277bd5359c3e6080 Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Fri, 13 Sep 2024 19:31:22 +0800 Subject: [PATCH 07/11] + add unittests for adapter --- data_juicer/core/adapter.py | 105 ++++++++++------ data_juicer/utils/resource_utils.py | 7 +- environments/minimal_requires.txt | 2 +- tests/core/test_adapter.py | 182 ++++++++++++++++++++++++++++ 4 files changed, 254 insertions(+), 42 deletions(-) create mode 100644 tests/core/test_adapter.py diff --git a/data_juicer/core/adapter.py b/data_juicer/core/adapter.py index 7a08578e0..77bbf12d9 100644 --- a/data_juicer/core/adapter.py +++ b/data_juicer/core/adapter.py @@ -1,14 +1,13 @@ -from datasets import concatenate_datasets -from loguru import logger - from data_juicer.core.monitor import Monitor class Adapter: + MAX_BATCH_SIZE = 10000 + def __init__(self, cfg): self.cfg = cfg - self.current_resources = Monitor.monitor_current_resources() + self.idle_resources = Monitor.monitor_current_resources() @staticmethod def execute_and_probe(dataset, operators, sample_interval=0.5): @@ -26,13 +25,12 @@ def execute_and_probe(dataset, operators, sample_interval=0.5): if operators is None or len(operators) == 0: return [] - # number of test samples - sample_num = len(dataset) # resource utilization list resource_util_list = [] # probe for each OP for op in operators: - + # number of test samples + sample_num = len(dataset) # run single op and monitor the resource utilization dataset, resource_util_per_op = Monitor.monitor_func( op.run, args=(dataset, ), sample_interval=sample_interval) @@ -42,7 +40,7 @@ def execute_and_probe(dataset, operators, sample_interval=0.5): 'speed'] = sample_num / resource_util_per_op['time'] resource_util_list.append(resource_util_per_op) - return dataset, resource_util_list + return resource_util_list def workloads_adapt(self, dataset, operators): """ @@ -51,62 +49,89 @@ def workloads_adapt(self, dataset, operators): :param dataset: The dataset that needs to be processed :param operators: Operators in the data recipe """ - res_data_batches = [] - load_factor = 1.0 - left_dataset = dataset - while left_dataset: - data_batch, left_dataset = self.take_batch(left_dataset, self.cfg, - load_factor) - res_batch, load_factor = self.probe_data_batches( - data_batch, operators, load_factor) - res_data_batches.append(res_batch) - return concatenate_datasets(res_data_batches) - - def probe_data_batches(self, data_batch, operators, load_factor=None): + # TODO: set batch size to 1 for all OPs for probing + load_analysis_res, probed_batch_size = self.probe_small_batch( + dataset, operators) + + # calculate batch size for each OP according to the analysis results + bs_per_op = self.batch_size_strategy(load_analysis_res, + base_bs=probed_batch_size) + + return bs_per_op + + def batch_size_strategy(self, load_analysis_res, base_bs=1, util_th=0.9): + """ + Decide the batch size for each op according to their workload analysis + result and expected utilization threshold. We need to guarantee that + the resource utilization won't exceed the threshold. Now we only + consider the buckets effect, which means the max batch size is decided + by the max utilization of all types of resources except GPU util + (decided by num_proc). + """ + batch_size_per_op = [] + + # compute left utils according to the util_th + left_utils = {} + for key in self.idle_resources: + if 'util.' not in key or 'GPU' in key: + continue + left_utils[key] = max(0, util_th - self.idle_resources[key]) + + for item in load_analysis_res: + max_util = 1e-5 + max_key = min(left_utils.items(), key=lambda it: it[1])[0] + analysis_res = item['resource_analysis'] + for key in analysis_res: + if 'util.' not in key or 'GPU' in key: + continue + used_util = max( + 0, analysis_res[key]['max'] - self.idle_resources[key]) + if used_util > max_util: + max_util = used_util + max_key = key + load_factor = left_utils[max_key] / max_util + bs_this_op = min(max(int(base_bs * load_factor), 1), + self.MAX_BATCH_SIZE) + batch_size_per_op.append(bs_this_op) + + return batch_size_per_op + + def probe_small_batch(self, dataset, operators): """ Perform small batch pre-execution to probe available resources, current load and estimated OP speed, returning load factors and speed ranks for each OP. - :param data_batch: The dataset batch to pre-execute on + :param dataset: The dataset to pre-execute small batch on :param operators: The OP list to be pre-execution and probe - :param load_factor: Load factor for this batch :return: A list of probe results for each OP. """ + # take a small batch + data_batch = self.take_batch(dataset, self.cfg) # process and monitor the resource utilization - res_batch, resource_util_list = self.execute_and_probe( - data_batch, operators) + resource_util_list = self.execute_and_probe(data_batch, operators) # analyze resource utilization analysis_res = Monitor.analyze_resource_util_list(resource_util_list) - # get a new load_factor according to the analysis result - load_factor = 1.0 * analysis_res - pass - logger.info(f'Adjust load factor to: {load_factor}') - return res_batch, load_factor + return analysis_res, len(data_batch) @staticmethod - def take_batch(dataset, config, load_factor=None): + def take_batch(dataset, config): """ Split the dataset into batches based on configuration and load factor. :param dataset: The dataset to be split :param config: Configuration settings, including batch size - :param load_factor: The detected load factor from pre-execution :return: An iterator of batches """ # get initial batch size - batch_size = config.get('batch_size', 100) - # adapt according to the load factor - if load_factor: - batch_size = int(batch_size / load_factor) - pass - # should be in [1, 1000] - batch_size = min(max(batch_size, 1), 1000) + batch_size = config.get('batch_size', 1000) + # should be in [1, 10000] + batch_size = min(max(batch_size, 1), Adapter.MAX_BATCH_SIZE) # check if there are enough samples num_samples = len(dataset) if batch_size >= num_samples: - return dataset, None + return dataset else: - return dataset.take(batch_size), dataset.skip(batch_size) + return dataset.take(batch_size) diff --git a/data_juicer/utils/resource_utils.py b/data_juicer/utils/resource_utils.py index 2105c52ae..cb1efd14e 100644 --- a/data_juicer/utils/resource_utils.py +++ b/data_juicer/utils/resource_utils.py @@ -3,8 +3,11 @@ import psutil from loguru import logger +NVSMI_REPORT = True + def query_cuda_info(query_key): + global NVSMI_REPORT # get cuda info using "nvidia-smi" command in MB try: nvidia_smi_output = subprocess.check_output([ @@ -21,7 +24,9 @@ def query_cuda_info(query_key): 'GPUs on this machine.' else: err_msg = str(e) - logger.warning(err_msg) + if NVSMI_REPORT: + logger.warning(err_msg) + NVSMI_REPORT = False return None cuda_info_list = [] for line in nvidia_smi_output.strip().split('\n'): diff --git a/environments/minimal_requires.txt b/environments/minimal_requires.txt index bd55d2008..79d924989 100644 --- a/environments/minimal_requires.txt +++ b/environments/minimal_requires.txt @@ -1,7 +1,7 @@ fsspec==2023.5.0 pyarrow<=12.0.0 pandas==2.0.3 -datasets==2.18.0 +datasets>=2.19.0 av soundfile librosa>=0.10 diff --git a/tests/core/test_adapter.py b/tests/core/test_adapter.py new file mode 100644 index 000000000..5bd4cb89b --- /dev/null +++ b/tests/core/test_adapter.py @@ -0,0 +1,182 @@ +import os +import unittest +import datasets +from datasets import load_dataset +from loguru import logger +from data_juicer.core import Adapter +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase +from data_juicer.ops.mapper import FixUnicodeMapper +from data_juicer.ops.filter import PerplexityFilter +from data_juicer.ops.deduplicator import DocumentDeduplicator + +class AdapterTest(DataJuicerTestCaseBase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + + cls.test_file = 'text_only_2.3k.jsonl' + download_link = f'http://dail-wlcb.oss-cn-wulanchabu.aliyuncs.com/data_juicer/unittest_data/{cls.test_file}' + os.system(f'wget {download_link}') + + @classmethod + def tearDownClass(cls, hf_model_name=None) -> None: + # remove test dataset + os.system(f'rm -f {cls.test_file}') + + super().tearDownClass(hf_model_name) + + def test_take_batch(self): + ds = load_dataset('json', data_files=self.test_file, split='train') + logger.info(f'Length of test dataset: [{len(ds)}]') + batch_sizes = [100, 1000, 3000] + test_config = {} + + for bs in batch_sizes: + tgt_len = min(bs, len(ds)) + test_config['batch_size'] = bs + res_ds = Adapter.take_batch(ds, test_config) + logger.info(f'Require [{bs}] and got [{len(res_ds)}].') + self.assertEqual(len(res_ds), tgt_len) + + def test_batch_size_strategy(self): + test_analysis_res = [ + { + 'resource_analysis': { + 'CPU util.': { + 'max': 0.5, + }, + 'GPU util.': { + 'max': 0.8, + }, + 'Mem. util.': { + 'max': 0.3, + } + } + }, + { + 'resource_analysis': { + 'CPU util.': { + 'max': 0.2, + }, + 'GPU util.': { + 'max': 1.0, + }, + 'Mem. util.': { + 'max': 0.1, + } + } + }, + ] + + adapter = Adapter({'batch_size': 1}) + adapter.idle_resources = { + 'CPU util.': 0, + 'GPU util.': 0, + 'Mem. util.': 0, + } + + # basic test + tgt_bs_1 = [2, 5] + bs_res = adapter.batch_size_strategy(test_analysis_res, + base_bs=1, + util_th=1.0) + self.assertEqual(bs_res, tgt_bs_1) + + # lower util threshold + tgt_bs_2 = [1, 3] + bs_res = adapter.batch_size_strategy(test_analysis_res, + base_bs=1, + util_th=0.7) + self.assertEqual(bs_res, tgt_bs_2) + + # larger base batch size + adapter.cfg['batch_size'] = 10 + tgt_bs_3 = [18, 45] + bs_res = adapter.batch_size_strategy(test_analysis_res, + base_bs=10, + util_th=0.9) + self.assertEqual(bs_res, tgt_bs_3) + + # out of resource + tgt_bs_4 = [2, 5] + bs_res = adapter.batch_size_strategy(test_analysis_res, + base_bs=10, + util_th=0.1) + self.assertEqual(bs_res, tgt_bs_4) + + # out of resource 2 + adapter.cfg['batch_size'] = 1 + tgt_bs_4 = [1, 1] + bs_res = adapter.batch_size_strategy(test_analysis_res, + base_bs=1, + util_th=0.1) + self.assertEqual(bs_res, tgt_bs_4) + + def test_execute_and_probe(self): + datasets.disable_caching() + # basic test + ds = load_dataset('json', data_files=self.test_file, split='train').take(100) + ops = [ + FixUnicodeMapper(num_proc=1), + PerplexityFilter(num_proc=1), + DocumentDeduplicator(num_proc=1), + ] # use some batched OPs later + + resource_util_list = Adapter.execute_and_probe(ds, ops) + self.assertEqual(len(resource_util_list), len(ops)) + + # finer-grained test + # reinitialize the OPs to avoid warm start. + ops = [ + FixUnicodeMapper(num_proc=1), + PerplexityFilter(num_proc=1), + DocumentDeduplicator(num_proc=1), + ] + resource_util_list2 = Adapter.execute_and_probe(ds, ops, sample_interval=0.2) + logger.info(f'interval=\t0.5\t0.2') + for item1, item2 in zip(resource_util_list, resource_util_list2): + logger.info(f' \t{len(item1["resource"])}\t{len(item2["resource"])}') + self.assertLessEqual(len(item1['resource']), len(item2['resource'])) + + datasets.enable_caching() + + def test_probe_small_batch(self): + datasets.disable_caching() + # basic test + ds = load_dataset('json', data_files=self.test_file, split='train') + ops = [ + FixUnicodeMapper(num_proc=1), + PerplexityFilter(num_proc=1), + DocumentDeduplicator(num_proc=1), + ] # use some batched OPs later + + adapter = Adapter({'batch_size': 100}) + resource_util_analysis_res, probe_bs = adapter.probe_small_batch(ds, ops) + logger.info(f'Probe on batch size is [{probe_bs}].') + self.assertEqual(len(resource_util_analysis_res), len(ops)) + for item in resource_util_analysis_res: + self.assertIn('resource_analysis', item) + + datasets.enable_caching() + + def test_workloads_adapt(self): + datasets.disable_caching() + # basic test + ds = load_dataset('json', data_files=self.test_file, split='train') + ops = [ + FixUnicodeMapper(num_proc=1), + PerplexityFilter(num_proc=1), + DocumentDeduplicator(num_proc=1), + ] # use some batched OPs later + + adapter = Adapter({'batch_size': 100}) + adapted_batch_sizes = adapter.workloads_adapt(ds, ops) + self.assertEqual(len(adapted_batch_sizes), len(ops)) + logger.info(adapted_batch_sizes) + + datasets.enable_caching() + + +if __name__ == '__main__': + unittest.main() From 5a6c0714cde85f9d8b77f632b39206580beec2c5 Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Sat, 14 Sep 2024 15:42:42 +0800 Subject: [PATCH 08/11] * modification and fix for gece's comments --- data_juicer/core/adapter.py | 86 +++++++++++++++++++------------------ data_juicer/core/monitor.py | 3 +- tests/core/test_adapter.py | 4 +- 3 files changed, 48 insertions(+), 45 deletions(-) diff --git a/data_juicer/core/adapter.py b/data_juicer/core/adapter.py index 77bbf12d9..730969e16 100644 --- a/data_juicer/core/adapter.py +++ b/data_juicer/core/adapter.py @@ -1,3 +1,5 @@ +from datasets.config import DEFAULT_MAX_BATCH_SIZE + from data_juicer.core.monitor import Monitor @@ -5,7 +7,7 @@ class Adapter: MAX_BATCH_SIZE = 10000 - def __init__(self, cfg): + def __init__(self, cfg: dict): self.cfg = cfg self.idle_resources = Monitor.monitor_current_resources() @@ -42,7 +44,28 @@ def execute_and_probe(dataset, operators, sample_interval=0.5): return resource_util_list - def workloads_adapt(self, dataset, operators): + @staticmethod + def take_batch(dataset, config): + """ + Split the dataset into batches based on configuration and load factor. + + :param dataset: The dataset to be split + :param config: Configuration settings, including batch size + :return: An iterator of batches + """ + # get initial batch size + batch_size = config.get('batch_size', DEFAULT_MAX_BATCH_SIZE) + # should be in [1, 10000] + batch_size = min(max(batch_size, 1), Adapter.MAX_BATCH_SIZE) + + # check if there are enough samples + num_samples = len(dataset) + if batch_size >= num_samples: + return dataset + else: + return dataset.take(batch_size) + + def adapt_workloads(self, dataset, operators): """ Manage the scheduling and load balancing for the dataset processing. @@ -59,6 +82,25 @@ def workloads_adapt(self, dataset, operators): return bs_per_op + def probe_small_batch(self, dataset, operators): + """ + Perform small batch pre-execution to probe available resources, + current load and estimated OP speed, returning load factors and speed + ranks for each OP. + + :param dataset: The dataset to pre-execute small batch on + :param operators: The OP list to be pre-execution and probe + :return: A list of probe results for each OP. + """ + # take a small batch + data_batch = self.take_batch(dataset, self.cfg) + # process and monitor the resource utilization + resource_util_list = self.execute_and_probe(data_batch, operators) + # analyze resource utilization + analysis_res = Monitor.analyze_resource_util_list(resource_util_list) + + return analysis_res, len(data_batch) + def batch_size_strategy(self, load_analysis_res, base_bs=1, util_th=0.9): """ Decide the batch size for each op according to their workload analysis @@ -95,43 +137,3 @@ def batch_size_strategy(self, load_analysis_res, base_bs=1, util_th=0.9): batch_size_per_op.append(bs_this_op) return batch_size_per_op - - def probe_small_batch(self, dataset, operators): - """ - Perform small batch pre-execution to probe available resources, - current load and estimated OP speed, returning load factors and speed - ranks for each OP. - - :param dataset: The dataset to pre-execute small batch on - :param operators: The OP list to be pre-execution and probe - :return: A list of probe results for each OP. - """ - # take a small batch - data_batch = self.take_batch(dataset, self.cfg) - # process and monitor the resource utilization - resource_util_list = self.execute_and_probe(data_batch, operators) - # analyze resource utilization - analysis_res = Monitor.analyze_resource_util_list(resource_util_list) - - return analysis_res, len(data_batch) - - @staticmethod - def take_batch(dataset, config): - """ - Split the dataset into batches based on configuration and load factor. - - :param dataset: The dataset to be split - :param config: Configuration settings, including batch size - :return: An iterator of batches - """ - # get initial batch size - batch_size = config.get('batch_size', 1000) - # should be in [1, 10000] - batch_size = min(max(batch_size, 1), Adapter.MAX_BATCH_SIZE) - - # check if there are enough samples - num_samples = len(dataset) - if batch_size >= num_samples: - return dataset - else: - return dataset.take(batch_size) diff --git a/data_juicer/core/monitor.py b/data_juicer/core/monitor.py index 932ea04f0..7d2f7984c 100644 --- a/data_juicer/core/monitor.py +++ b/data_juicer/core/monitor.py @@ -89,7 +89,8 @@ def monitor_all_resources(self): def monitor_current_resources(): """ Detect the resource utilization of the current environment/machine. - All data of "util." is in percent. All data of "mem." is in MB. + All data of "util." is ratios in the range of [0.0, 1.0]. All data of + "mem." is in MB. """ resource_dict = dict() # current time diff --git a/tests/core/test_adapter.py b/tests/core/test_adapter.py index 5bd4cb89b..965355b96 100644 --- a/tests/core/test_adapter.py +++ b/tests/core/test_adapter.py @@ -160,7 +160,7 @@ def test_probe_small_batch(self): datasets.enable_caching() - def test_workloads_adapt(self): + def test_adapt_workloads(self): datasets.disable_caching() # basic test ds = load_dataset('json', data_files=self.test_file, split='train') @@ -171,7 +171,7 @@ def test_workloads_adapt(self): ] # use some batched OPs later adapter = Adapter({'batch_size': 100}) - adapted_batch_sizes = adapter.workloads_adapt(ds, ops) + adapted_batch_sizes = adapter.adapt_workloads(ds, ops) self.assertEqual(len(adapted_batch_sizes), len(ops)) logger.info(adapted_batch_sizes) From f6e017f4fd3f99afb239c72224b2770f5213193d Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Wed, 18 Sep 2024 18:16:31 +0800 Subject: [PATCH 09/11] * support OP fusion based on probed speed of each OP --- data_juicer/core/adapter.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/data_juicer/core/adapter.py b/data_juicer/core/adapter.py index 730969e16..34250cab5 100644 --- a/data_juicer/core/adapter.py +++ b/data_juicer/core/adapter.py @@ -31,6 +31,11 @@ def execute_and_probe(dataset, operators, sample_interval=0.5): resource_util_list = [] # probe for each OP for op in operators: + # set num_proc to 1 for each OP to focus on the influence of batch + # size only. + old_num_proc = op.num_proc + op.num_proc = 1 + # number of test samples sample_num = len(dataset) # run single op and monitor the resource utilization @@ -42,6 +47,9 @@ def execute_and_probe(dataset, operators, sample_interval=0.5): 'speed'] = sample_num / resource_util_per_op['time'] resource_util_list.append(resource_util_per_op) + # restore to the original num_proc + op.num_proc = old_num_proc + return resource_util_list @staticmethod From 902c0598ea4436ed18c02cd3ea2fcd792c1a9604 Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Thu, 19 Sep 2024 20:32:51 +0800 Subject: [PATCH 10/11] + monitor the resource utilization for the normal processing pipeline and store the monitor results into a specific file in work dir --- data_juicer/core/analyzer.py | 2 +- data_juicer/core/data.py | 19 ++++++++++++++++++- data_juicer/core/executor.py | 1 + 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/data_juicer/core/analyzer.py b/data_juicer/core/analyzer.py index ce1af2d84..93ffea451 100644 --- a/data_juicer/core/analyzer.py +++ b/data_juicer/core/analyzer.py @@ -87,7 +87,7 @@ def run(self, load_data_np=None, skip_export=False): if isinstance(op, Filter): original_process = op.process op.process = None - dataset = dataset.process(op) + dataset = dataset.process(op, work_dir=self.work_dir) op.process = original_process stats_collected = True if not stats_collected: diff --git a/data_juicer/core/data.py b/data_juicer/core/data.py index c8f05a767..bf2ad5fa1 100644 --- a/data_juicer/core/data.py +++ b/data_juicer/core/data.py @@ -2,6 +2,8 @@ import copy import inspect +import json +import os import traceback from abc import ABC, abstractmethod from functools import wraps @@ -12,6 +14,7 @@ from datasets.formatting.formatting import LazyBatch from loguru import logger +from data_juicer.core.monitor import Monitor from data_juicer.ops import UNFORKABLE from data_juicer.utils import cache_utils from data_juicer.utils.compress import (CompressionOff, @@ -164,6 +167,7 @@ def __getitem__(self, key): def process(self, operators, *, + work_dir=None, exporter=None, checkpointer=None, tracer=None): @@ -174,6 +178,9 @@ def process(self, operators = [operators] unforkable_operators = set(UNFORKABLE.modules.keys()) + # resource utilization monitor + resource_util_list = [] + dataset = self try: for op in operators: @@ -184,10 +191,17 @@ def process(self, start = time() # run single op - dataset = op.run(dataset, exporter=exporter, tracer=tracer) + run_args = { + 'dataset': dataset, + 'exporter': exporter, + 'tracer': tracer, + } + dataset, resource_util_per_op = Monitor.monitor_func( + op.run, args=run_args) # record processed ops if checkpointer is not None: checkpointer.record(op._op_cfg) + resource_util_list.append(resource_util_per_op) end = time() logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. ' f'Left {len(dataset)} samples.') @@ -201,6 +215,9 @@ def process(self, 'last op...') dataset.cleanup_cache_files() checkpointer.save_ckpt(dataset) + if work_dir: + with open(os.path.join(work_dir, 'monitor.json'), 'w') as out: + json.dump(resource_util_list, out) return dataset def map(self, *args, **kargs): diff --git a/data_juicer/core/executor.py b/data_juicer/core/executor.py index 87e38dbce..1cf309e42 100644 --- a/data_juicer/core/executor.py +++ b/data_juicer/core/executor.py @@ -160,6 +160,7 @@ def run(self, load_data_np=None): logger.info('Processing data...') tstart = time() dataset = dataset.process(ops, + work_dir=self.work_dir, exporter=self.exporter, checkpointer=self.ckpt_manager, tracer=self.tracer) From a052535017f1afea650e4064a15fa6691c4c3bcd Mon Sep 17 00:00:00 2001 From: "lielin.hyl" Date: Fri, 20 Sep 2024 15:26:34 +0800 Subject: [PATCH 11/11] * doc fixed --- data_juicer/core/adapter.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data_juicer/core/adapter.py b/data_juicer/core/adapter.py index 34250cab5..aa746a058 100644 --- a/data_juicer/core/adapter.py +++ b/data_juicer/core/adapter.py @@ -98,7 +98,8 @@ def probe_small_batch(self, dataset, operators): :param dataset: The dataset to pre-execute small batch on :param operators: The OP list to be pre-execution and probe - :return: A list of probe results for each OP. + :return: A list of probe results for each OP and the length of data + batch to probe. """ # take a small batch data_batch = self.take_batch(dataset, self.cfg)