Feat/dj adapter #429

Merged
merged 11 commits on Sep 20, 2024
4 changes: 4 additions & 0 deletions data_juicer/core/__init__.py
@@ -1,13 +1,17 @@
from .adapter import Adapter
from .analyzer import Analyzer
from .data import NestedDataset
from .executor import Executor
from .exporter import Exporter
from .monitor import Monitor
from .tracer import Tracer

__all__ = [
'Adapter',
'Analyzer',
'NestedDataset',
'Executor',
'Exporter',
'Monitor',
'Tracer',
]
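
With this change, the new `Adapter` is exported from `data_juicer.core` alongside the existing components, so it can be imported directly, e.g.:

```python
from data_juicer.core import Adapter, Monitor
```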
137 changes: 137 additions & 0 deletions data_juicer/core/adapter.py
@@ -0,0 +1,137 @@
from data_juicer.core.monitor import Monitor


class Adapter:

MAX_BATCH_SIZE = 10000

def __init__(self, cfg):
self.cfg = cfg
self.idle_resources = Monitor.monitor_current_resources()

@staticmethod
def execute_and_probe(dataset, operators, sample_interval=0.5):
"""
Process the input dataset and probe related information for each OP in
the specified operator list.

For now, we support the following probe targets:
"resource": resource utilization for each OP.
"speed": average processing speed for each OP.

The probe result is a list with one item per OP.
"""
if operators is None or len(operators) == 0:
return []

# resource utilization list
resource_util_list = []
# probe for each OP
for op in operators:
# number of test samples
sample_num = len(dataset)
# run single op and monitor the resource utilization
dataset, resource_util_per_op = Monitor.monitor_func(
op.run, args=(dataset, ), sample_interval=sample_interval)

# calculate speed
resource_util_per_op[
'speed'] = sample_num / resource_util_per_op['time']
resource_util_list.append(resource_util_per_op)

return resource_util_list

def workloads_adapt(self, dataset, operators):
"""
Manage the scheduling and load balancing for the dataset processing.

:param dataset: The dataset that needs to be processed
:param operators: Operators in the data recipe
"""
# TODO: set batch size to 1 for all OPs for probing
load_analysis_res, probed_batch_size = self.probe_small_batch(
dataset, operators)

# calculate batch size for each OP according to the analysis results
bs_per_op = self.batch_size_strategy(load_analysis_res,
base_bs=probed_batch_size)

return bs_per_op

def batch_size_strategy(self, load_analysis_res, base_bs=1, util_th=0.9):
"""
Decide the batch size for each op according to their workload analysis
result and expected utilization threshold. We need to guarantee that
the resource utilization won't exceed the threshold. Now we only
consider the buckets effect, which means the max batch size is decided
by the max utilization of all types of resources except GPU util
(decided by num_proc).
"""
batch_size_per_op = []

# compute left utils according to the util_th
left_utils = {}
for key in self.idle_resources:
if 'util.' not in key or 'GPU' in key:
continue
left_utils[key] = max(0, util_th - self.idle_resources[key])

for item in load_analysis_res:
max_util = 1e-5
max_key = min(left_utils.items(), key=lambda it: it[1])[0]
analysis_res = item['resource_analysis']
for key in analysis_res:
if 'util.' not in key or 'GPU' in key:
continue
used_util = max(
0, analysis_res[key]['max'] - self.idle_resources[key])
if used_util > max_util:
max_util = used_util
max_key = key
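# scale the base batch size by the headroom left on the bottleneck
# resource; max_util is floored at 1e-5, so the factor stays finite
# even for OPs that added almost no load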
load_factor = left_utils[max_key] / max_util
bs_this_op = min(max(int(base_bs * load_factor), 1),
self.MAX_BATCH_SIZE)
batch_size_per_op.append(bs_this_op)

return batch_size_per_op

def probe_small_batch(self, dataset, operators):
"""
Perform small batch pre-execution to probe available resources,
current load and estimated OP speed, returning load factors and speed
ranks for each OP.

:param dataset: The dataset to pre-execute small batch on
:param operators: The OP list to be pre-execution and probe
:return: A list of probe results for each OP.
"""
# take a small batch
data_batch = self.take_batch(dataset, self.cfg)
# process and monitor the resource utilization
resource_util_list = self.execute_and_probe(data_batch, operators)
# analyze resource utilization
analysis_res = Monitor.analyze_resource_util_list(resource_util_list)

return analysis_res, len(data_batch)

@staticmethod
def take_batch(dataset, config):
"""
Split the dataset into batches based on configuration and load factor.

:param dataset: The dataset to be split
:param config: Configuration settings, including batch size
:return: An iterator of batches
"""
# get initial batch size
batch_size = config.get('batch_size', 1000)
# should be in [1, 10000]
batch_size = min(max(batch_size, 1), Adapter.MAX_BATCH_SIZE)

# check if there are enough samples
num_samples = len(dataset)
if batch_size >= num_samples:
return dataset
else:
return dataset.take(batch_size)
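
As an illustration of the bucket-effect strategy above, here is a minimal sketch. The idle-resource numbers and probe results below are made up for the example and overridden by hand so the arithmetic is reproducible; they are not produced by this PR's code:

```python
from data_juicer.core import Adapter

# constructing the Adapter takes one resource snapshot of the current machine
adapter = Adapter({'batch_size': 1000})
# overwrite the measured idle load with fixed numbers so the arithmetic below
# is reproducible
adapter.idle_resources = {'CPU util.': 0.1, 'Mem. util.': 0.2}

# pretend two OPs were probed on a batch of 100 samples
load_analysis_res = [
    {'resource_analysis': {'CPU util.': {'max': 0.3},
                           'Mem. util.': {'max': 0.25}}},
    {'resource_analysis': {'CPU util.': {'max': 0.8},
                           'Mem. util.': {'max': 0.3}}},
]
print(adapter.batch_size_strategy(load_analysis_res, base_bs=100))
# [400, 114] -- the lightly loaded OP gets the larger batch
```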
214 changes: 214 additions & 0 deletions data_juicer/core/monitor.py
@@ -0,0 +1,214 @@
import time
from functools import partial
from multiprocessing import get_context

from data_juicer.utils.resource_utils import (get_cpu_count,
get_cpu_utilization,
query_cuda_info, query_mem_info)


def resource_monitor(mdict, interval):
# monitor the resource utilization of the current machine in a loop;
# `interval` is the sampling interval in seconds, `mdict` is a managed
# dict used to pass the stop flag in and the samples back out
this_states = []
while True:
this_states.append(Monitor.monitor_current_resources())
time.sleep(interval)
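# the stop flag is checked only after sampling, so at least one
# snapshot is always recorded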
if mdict['stop']:
break
mdict['resource'] = this_states


class Monitor:
"""
Monitor resource utilization and other information during the data
processing.

Resource utilization dict: (for each func)
'''python
{
'time': 10,
'resource': [
{
'timestamp': xxx,
'CPU count': xxx,
'GPU free mem.': xxx,
...
},
{
'timestamp': xxx,
'CPU count': xxx,
'GPU free mem.': xxx,
...
},
]
}
'''

Based on the structure above, the resource utilization analysis result
adds an extra 'resource_analysis' field at the first level:
'''python
{
'time': 10,
'resource': [...],
'resource_analysis': {
'GPU free mem.': {
'max': xxx,
'min': xxx,
'avg': xxx,
},
...
}
}
'''
Only those fields in DYNAMIC_FIELDS will be analyzed.
"""

DYNAMIC_FIELDS = {
'CPU util.',
'Used mem.',
'Free mem.',
'Available mem.',
'Mem. util.',
'GPU free mem.',
'GPU used mem.',
'GPU util.',
}

def __init__(self):
pass

def monitor_all_resources(self):
"""
Detect the resource utilization of all distributed nodes.
"""
# TODO
raise NotImplementedError

@staticmethod
def monitor_current_resources():
"""
Detect the resource utilization of the current environment/machine.
All "util." values are ratios in [0, 1]. All "mem." values are in MB.
"""
resource_dict = dict()
# current time
resource_dict['timestamp'] = time.time()

# CPU
resource_dict['CPU count'] = get_cpu_count()
resource_dict['CPU util.'] = get_cpu_utilization() / 100.0
resource_dict['Total mem.'] = query_mem_info('total')
resource_dict['Used mem.'] = query_mem_info('used')
resource_dict['Free mem.'] = query_mem_info('free')
resource_dict['Available mem.'] = query_mem_info('available')
resource_dict['Mem. util.'] = resource_dict[
'Used mem.'] / resource_dict['Total mem.']

# GPU
resource_dict['GPU total mem.'] = query_cuda_info('memory.total')
resource_dict['GPU free mem.'] = query_cuda_info('memory.free')
resource_dict['GPU used mem.'] = query_cuda_info('memory.used')
resource_dict['GPU util.'] = query_cuda_info('utilization.gpu')
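# GPU metrics are reported per visible GPU as a list, and may be
# empty/None when no CUDA device is available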
if resource_dict['GPU util.']:
resource_dict['GPU util.'] = [
x / 100.0 for x in resource_dict['GPU util.']
]

return resource_dict

@staticmethod
def analyze_resource_util_list(resource_util_list):
"""
Analyze the resource utilization for a given resource util list.
Compute {'max', 'min', 'avg'} of resource metrics for each dict item.
"""
res_list = []
for item in resource_util_list:
res_list.append(Monitor.analyze_single_resource_util(item))
return res_list

@staticmethod
def analyze_single_resource_util(resource_util_dict):
"""
Analyze the resource utilization for a single resource util dict.
Compute {'max', 'min', 'avg'} of each resource metric.
"""
analysis_res = {}
record_list = {}
for record in resource_util_dict['resource']:
for key in Monitor.DYNAMIC_FIELDS:
if key in record:
if record[key] is None:
continue
elif isinstance(record[key], list):
record_list.setdefault(key, []).extend(record[key])
else:
record_list.setdefault(key, []).append(record[key])

# analyze the max, min, and avg
for key in record_list:
analysis_res[key] = {
'max': max(record_list[key]),
'min': min(record_list[key]),
'avg': sum(record_list[key]) / len(record_list[key]),
}
resource_util_dict['resource_analysis'] = analysis_res

return resource_util_dict

@staticmethod
def monitor_func(func, args=None, sample_interval=0.5):
"""
Process the input dataset and probe related information for each OP in
the specified operator list.

For now, we support the following targets to probe:
"resource": resource utilization for each OP.
"speed": average processing speed for each OP.

The probe result is a list and each item in the list is the probe
result for each OP.
"""
if args is None:
args = {}
if isinstance(args, dict):
func = partial(func, **args)
elif isinstance(args, list) or isinstance(args, tuple):
func = partial(func, *args)
else:
func = partial(func, args)

# resource utilization dict
resource_util_dict = {}

# start monitor
ctx = get_context('fork')
with ctx.Manager() as manager:
mdict = manager.dict()
mdict['stop'] = False
monitor_proc = ctx.Process(target=resource_monitor,
args=(
mdict,
sample_interval,
))
monitor_proc.start()
# start timer
start = time.time()

# run single op
ret = func()

# end timer
end = time.time()

# stop monitor
mdict['stop'] = True
monitor_proc.join()

resource_util_dict['resource'] = mdict['resource']

# record the elapsed wall-clock time
resource_util_dict['time'] = end - start

return ret, resource_util_dict
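
For reference, a minimal sketch of exercising the new `Monitor.monitor_func` on its own. `slow_square` is only an illustrative placeholder, and since the monitor uses the `fork` start method, the sketch assumes a Unix-like system with a working Data-Juicer install:

```python
import time

from data_juicer.core.monitor import Monitor


def slow_square(x):
    time.sleep(2)  # long enough for a few 0.5 s resource samples
    return x * x


ret, util = Monitor.monitor_func(slow_square, args=(7, ), sample_interval=0.5)
util = Monitor.analyze_single_resource_util(util)
print(ret)                        # 49
print(util['time'])               # elapsed wall-clock time, ~2 seconds
print(util['resource_analysis'])  # max/min/avg for the sampled DYNAMIC_FIELDS
```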
2 changes: 1 addition & 1 deletion data_juicer/ops/base_op.py
@@ -307,7 +307,7 @@ def run(self, dataset, *, exporter=None, tracer=None):
num_proc=self.runtime_np(),
with_rank=self.use_cuda(),
desc=self._name + '_compute_stats')
if self.stats_export_path is not None:
if exporter and self.stats_export_path is not None:
exporter.export_compute_stats(dataset, self.stats_export_path)
new_dataset = dataset.filter(self.process,
num_proc=self.runtime_np(),