Feature/text aug mapper #17

Merged
merged 8 commits into from
Sep 13, 2023
29 changes: 25 additions & 4 deletions configs/config_all.yaml
@@ -5,11 +5,12 @@
# global parameters
project_name: 'all' # project name to distinguish your configs
dataset_path: '/path/to/your/dataset' # path to your dataset directory or file with weights (0.0-1.0), 1.0 as default.
# Accepted format: 'weight1(optional) dataset1-path weight2(optional) dataset2-path '
# Accepted format: 'weight1(optional) dataset1-path weight2(optional) dataset2-path
export_path: '/path/to/result/dataset.jsonl' # path to processed result dataset. Supported suffixes include ['jsonl', 'json', 'parquet']
export_shard_size: 0 # Shard size of exported dataset in Byte. By default, it's 0, which means export the whole dataset into only one file. If it's set to a positive number, the exported dataset will be split into several dataset shards, and the max size of each shard won't be larger than the export_shard_size
export_in_parallel: false # Whether to export the result dataset in parallel to a single file. It only works when export_shard_size is 0, and its default number of processes is the same as the argument np.
np: 4 # number of subprocesses to process your dataset
text_keys: 'content' # the key name of field where the sample texts to be processed, e.g., `text`, `instruction`, `output`, ...'
text_keys: 'content' # the key name of field where the sample texts to be processed, e.g., `text`, `instruction`, `output`, ...
# Note: currently, we only support specifying ONE key for each op; for cases requiring multiple keys, users can specify the op multiple times. We will only use the first key of `text_keys` when you set multiple keys.
suffixes: [] # the suffix of files that will be read. For example: '.txt', 'txt' or ['txt', '.pdf', 'docx']
use_cache: true # whether to use the cache management of hugging face datasets. It might take up lots of disk space when using cache
@@ -54,8 +55,28 @@ process:
lang: en # sample in which language
tokenization: false # whether to use model to tokenize documents
substrings: ['http', 'www', '.com', 'href', '//'] # incorrect substrings to remove
- sentence_split_mapper: # split text to sentences and join them with '\n'
lang: 'en' # split text in what language
- sentence_split_mapper: # split text to multiple sentences and join them with '\n'
lang: 'en' # split text in what language
- simple_aug_en_mapper: # simply augment texts in English based on the nlpaug library
sequential: false # whether to combine all augmentation methods into a sequence. If it's True, a sample will be augmented by all opened augmentation methods sequentially. If it's False, each opened augmentation method would generate its augmented samples independently.
aug_num: 1 # number of augmented samples to be generated. If `sequential` is True, there will be aug_num augmented samples generated in total. If it's False, there will be (aug_num * #opened_aug_method) augmented samples generated.
delete_random_word: false # whether to open the augmentation method of deleting random words from the original texts. e.g. "I love LLM" --> "I LLM"
swap_random_word: false # whether to open the augmentation method of swapping random contiguous words in the original texts. e.g. "I love LLM" --> "Love I LLM"
spelling_error_word: false # whether to open the augmentation method of simulating the spelling error for words in the original texts. e.g. "I love LLM" --> "Ai love LLM"
split_random_word: false # whether to open the augmentation method of splitting words randomly with whitespaces in the original texts. e.g. "I love LLM" --> "I love LL M"
keyboard_error_char: false # whether to open the augmentation method of simulating the keyboard error for characters in the original texts. e.g. "I love LLM" --> "I ;ov4 LLM"
ocr_error_char: false # whether to open the augmentation method of simulating the OCR error for characters in the original texts. e.g. "I love LLM" --> "I 10ve LLM"
delete_random_char: false # whether to open the augmentation method of deleting random characters from the original texts. e.g. "I love LLM" --> "I oe LLM"
swap_random_char: false # whether to open the augmentation method of swapping random contiguous characters in the original texts. e.g. "I love LLM" --> "I ovle LLM"
insert_random_char: false # whether to open the augmentation method of inserting random characters into the original texts. e.g. "I love LLM" --> "I ^lKove LLM"
- simple_aug_zh_mapper: # simply augment texts in Chinese based on the nlpaug library
sequential: false # whether to combine all augmentation methods into a sequence. If it's True, a sample will be augmented by all opened augmentation methods sequentially. If it's False, each opened augmentation method would generate its augmented samples independently.
aug_num: 1 # number of augmented samples to be generated. If `sequential` is True, there will be aug_num augmented samples generated in total. If it's False, there will be (aug_num * #opened_aug_method) augmented samples generated.
replace_similar_word: false # whether to open the augmentation method of replacing random words with their similar words in the original texts. e.g. "这里一共有5种不同的数据增强方法" --> "这边一共有5种不同的数据增强方法"
replace_homophone_char: false # whether to open the augmentation method of replacing random characters with their homophones in the original texts. e.g. "这里一共有5种不同的数据增强方法" --> "这里一共有5种不同的濖据增强方法"
delete_random_char: false # whether to open the augmentation method of deleting random characters from the original texts. e.g. "这里一共有5种不同的数据增强方法" --> "这里一共有5种不同的数据增强"
swap_random_char: false # whether to open the augmentation method of swapping random contiguous characters in the original texts. e.g. "这里一共有5种不同的数据增强方法" --> "这里一共有5种不同的数据强增方法"
replace_equivalent_num: false # whether to open the augmentation method of replacing random numbers with their equivalent representations in the original texts. **Notice**: Only for numbers for now. e.g. "这里一共有5种不同的数据增强方法" --> "这里一共有伍种不同的数据增强方法"
- whitespace_normalization_mapper: # normalize different kinds of whitespaces to English whitespace.

# Filter ops
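To make the `sequential` / `aug_num` semantics of the two augmentation ops above concrete, here is a minimal sketch of how the English options could map onto nlpaug augmenters. The augmenter classes and wiring are assumptions for illustration, not the actual simple_aug_en_mapper implementation.

# Minimal sketch, assuming the nlpaug word/char augmenter classes below;
# this is NOT the actual simple_aug_en_mapper implementation.
import nlpaug.augmenter.char as nac
import nlpaug.augmenter.word as naw
import nlpaug.flow as naf

def build_aug_pipeline(sequential=False):
    # one nlpaug augmenter per opened method (subset shown)
    augmenters = [
        naw.RandomWordAug(action='delete'),   # delete_random_word
        naw.RandomWordAug(action='swap'),     # swap_random_word
        naw.SpellingAug(),                    # spelling_error_word
        naw.SplitAug(),                       # split_random_word
        nac.KeyboardAug(),                    # keyboard_error_char
        nac.OcrAug(),                         # ocr_error_char
        nac.RandomCharAug(action='delete'),   # delete_random_char
    ]
    if sequential:
        # one pipeline applying every opened method in turn:
        # aug_num augmented samples in total
        return [naf.Sequential(augmenters)]
    # otherwise each opened method augments independently:
    # aug_num * #opened_aug_method augmented samples in total
    return augmenters

aug_num = 1
augmented = []
for aug in build_aug_pipeline(sequential=False):
    out = aug.augment('I love LLM', n=aug_num)
    augmented.extend(out if isinstance(out, list) else [out])
print(augmented)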
48 changes: 45 additions & 3 deletions data_juicer/config/config.py
@@ -1,7 +1,8 @@
import os
import shutil
import time
from argparse import ArgumentError
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Tuple, Union

from jsonargparse import (ActionConfigFile, ArgumentParser, dict_to_namespace,
namespace_to_dict)
@@ -65,6 +66,13 @@ def init_configs(args=None):
'it\'s set to a positive number, the exported dataset will be split '
'into several sub-dataset shards, and the max size of each shard '
'won\'t be larger than the export_shard_size')
parser.add_argument(
'--export_in_parallel',
type=bool,
default=False,
help='Whether to export the result dataset in parallel to a single '
'file. It only works when export_shard_size is 0, and its default'
'number of processes is the same as the argument np.')
parser.add_argument(
'--np',
type=PositiveInt,
@@ -219,6 +227,13 @@ def init_configs(args=None):
}

cfg = init_setup_from_cfg(cfg)

# copy the config file into the work directory
config_backup(cfg)

# show the final config tables before the process starts
display_config(cfg)

return cfg
except ArgumentError:
logger.error('Config initialization failed')
@@ -241,8 +256,9 @@ def init_setup_from_cfg(cfg):
log_dir = os.path.join(cfg.work_dir, 'log')
if not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
logfile_name = time.strftime('%Y%m%d%H%M%S', time.localtime(
time.time())) + '.txt'
timestamp = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
cfg.timestamp = timestamp
logfile_name = timestamp + '.txt'
setup_logger(save_dir=log_dir, filename=logfile_name)

# whether or not to use cache management
@@ -335,3 +351,29 @@ def sort_op_by_types_and_names(op_name_classes):
ops_sorted_by_types = sorted(mapper_ops) + sorted(filter_ops) + sorted(
deduplicator_ops) + sorted(selector_ops)
return ops_sorted_by_types

def config_backup(cfg):
cfg_path = cfg.config[0].absolute
work_dir = cfg.work_dir
target_path = os.path.join(work_dir, os.path.basename(cfg_path))
logger.info(f'Back up the input config file [{cfg_path}] into the '
f'work_dir [{work_dir}]')
shutil.copyfile(cfg_path, target_path)

def display_config(cfg):
from tabulate import tabulate
import pprint
table_header = ['key', 'values']

# remove ops outside the process list for better displaying
shown_cfg = cfg.clone()
for op in OPERATORS.modules.keys():
_ = shown_cfg.pop(op)

# construct the table as 2 columns
config_table = [(k, pprint.pformat(v, compact=True))
for k, v in shown_cfg.items()]
table = tabulate(config_table, headers=table_header, tablefmt='fancy_grid')

logger.info('Configuration table: ')
print(table)
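As a quick illustration of what display_config logs, the snippet below renders a small stand-in config dict with the same tabulate call; the values are made up for the example.

# Stand-alone illustration of the tabulate call in display_config;
# the config values here are invented for the example.
import pprint
from tabulate import tabulate

cfg_items = {'project_name': 'all', 'np': 4, 'suffixes': ['txt', 'pdf']}
config_table = [(k, pprint.pformat(v, compact=True))
                for k, v in cfg_items.items()]
print(tabulate(config_table, headers=['key', 'values'],
               tablefmt='fancy_grid'))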
1 change: 1 addition & 0 deletions data_juicer/core/analyser.py
@@ -46,6 +46,7 @@ def __init__(self, cfg=None):
logger.info('Preparing exporter...')
self.exporter = Exporter(self.cfg.export_path,
self.cfg.export_shard_size,
self.cfg.export_in_parallel,
self.cfg.np,
export_ds=False,
export_stats=True)
30 changes: 23 additions & 7 deletions data_juicer/core/executor.py
@@ -5,7 +5,7 @@
from data_juicer.config import init_configs
from data_juicer.format.load import load_formatter
from data_juicer.ops import (OPERATORS, Deduplicator, Filter, Mapper, Selector,
load_ops)
load_ops, BatchMapper)
from data_juicer.utils.ckpt_utils import CheckpointManager
from data_juicer.utils.constant import Fields

@@ -60,7 +60,9 @@ def __init__(self, cfg=None):
# prepare exporter and check export path suffix
logger.info('Preparing exporter...')
self.exporter = Exporter(self.cfg.export_path,
self.cfg.export_shard_size, self.cfg.np)
self.cfg.export_shard_size,
self.cfg.export_in_parallel,
self.cfg.np)

# setup tracer
self.open_tracer = self.cfg.open_tracer
@@ -105,11 +107,25 @@ def run(self, load_data_np=None):
prev = dataset # record last dataset
try:
if isinstance(op, Mapper):
tmp = dataset.map(op.process,
num_proc=self.cfg.np,
desc=op_name + '_process')
if self.open_tracer and op_name in self.op_list_to_trace:
self.tracer.trace_mapper(op_name, dataset, tmp)
if isinstance(op, BatchMapper):
tmp = dataset.map(op.process,
num_proc=self.cfg.np,
batched=True,
batch_size=1,
desc=op_name + '_process')
if self.open_tracer and \
op_name in self.op_list_to_trace:
self.tracer.trace_batch_mapper(
op_name,
dataset,
tmp)
else:
tmp = dataset.map(op.process,
num_proc=self.cfg.np,
desc=op_name + '_process')
if self.open_tracer and \
op_name in self.op_list_to_trace:
self.tracer.trace_mapper(op_name, dataset, tmp)
elif isinstance(op, Filter):
if Fields.stats not in dataset.features:
# TODO:
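The batched=True, batch_size=1 branch above is what lets a BatchMapper return more rows than it receives. A minimal, self-contained sketch with Hugging Face datasets (fake_aug is an illustrative function, not a data-juicer op):

# Self-contained sketch of the batched map path; fake_aug stands in for a
# real BatchMapper's process function.
from datasets import Dataset

ds = Dataset.from_dict({'text': ['I love LLM', 'hello world']})

def fake_aug(samples):
    # keep each original text and append one "augmented" copy,
    # so the output batch has more rows than the input batch
    texts = []
    for text in samples['text']:
        texts.append(text)
        texts.append(text.upper())  # stand-in for a real augmentation
    return {'text': texts}

aug_ds = ds.map(fake_aug, batched=True, batch_size=1)
print(len(ds), len(aug_ds))  # 2 -> 4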
18 changes: 14 additions & 4 deletions data_juicer/core/exporter.py
@@ -18,6 +18,7 @@ class Exporter:
def __init__(self,
export_path,
export_shard_size=0,
export_in_parallel=True,
num_proc=1,
export_ds=True,
export_stats=True):
@@ -34,6 +35,7 @@ def __init__(self,
"""
self.export_path = export_path
self.export_shard_size = export_shard_size
self.export_in_parallel = export_in_parallel
self.export_ds = export_ds
self.export_stats = export_stats
self.suffix = self._get_suffix(export_path)
@@ -102,7 +104,10 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
if self.export_shard_size <= 0:
# export the whole dataset into one single file.
logger.info('Export dataset into 1 file...')
export_method(dataset, export_path)
export_method(
dataset,
export_path,
num_proc=self.num_proc if self.export_in_parallel else 1)
else:
# compute the dataset size and number of shards to split
if dataset._indices is not None:
@@ -153,7 +158,11 @@ def _export_impl(self, dataset, export_path, suffix, export_stats=True):
# export stats of datasets into a single file.
ds_stats = dataset.select_columns(Fields.stats)
stats_file = export_path.replace('.' + suffix, '_stats.jsonl')
Exporter.to_jsonl(ds_stats, stats_file)
Exporter.to_jsonl(
ds_stats,
stats_file,
num_proc=self.num_proc if self.export_in_parallel else 1
)

def export(self, dataset):
"""
@@ -166,16 +175,17 @@ def export(self, dataset):
self.export_stats)

@staticmethod
def to_jsonl(dataset, export_path, **kwargs):
def to_jsonl(dataset, export_path, num_proc=1, **kwargs):
"""
Export method for json/jsonl target files.

:param dataset: the dataset to export.
:param export_path: the path to store the exported dataset.
:param num_proc: the number of processes used to export the dataset.
:param kwargs: extra arguments.
:return:
"""
dataset.to_json(export_path, force_ascii=False)
dataset.to_json(export_path, force_ascii=False, num_proc=num_proc)

@staticmethod
def to_parquet(dataset, export_path, **kwargs):
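A hedged sketch of the single-file parallel export path: with export_shard_size at 0 and export_in_parallel enabled, the whole dataset goes to one JSONL file via Dataset.to_json with several writer processes. Paths and process counts below are placeholders, not data-juicer defaults.

# Hedged sketch of the parallel single-file export.
from datasets import Dataset

ds = Dataset.from_dict({'text': ['a', 'b', 'c', 'd']})
num_proc = 4               # would come from cfg.np
export_in_parallel = True  # cfg.export_in_parallel

ds.to_json('result.jsonl',
           force_ascii=False,
           num_proc=num_proc if export_in_parallel else 1)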
39 changes: 39 additions & 0 deletions data_juicer/core/tracer.py
@@ -76,6 +76,45 @@ def trace_mapper(self, op_name: str, previous_ds: Dataset,
lines=True,
force_ascii=False)

def trace_batch_mapper(self, op_name: str, previous_ds: Dataset,
processed_ds: Dataset):
"""
Compare datasets before and after a BatchMapper.

This will mainly show the new samples augmented by the BatchMapper.

:param op_name: the op name of mapper
:param previous_ds: dataset before the mapper process
:param processed_ds: dataset processed by the mapper
:return:
"""
assert previous_ds[0]['text'] == processed_ds[0]['text']
aug_dict = []

# Get the first show_num samples
for i in range(len(processed_ds)):
processed_sample = processed_ds[i]
aug_dict.append(processed_sample)
if i + 1 >= self.show_num:
break

if len(aug_dict) == 0:
logger.warning(f'Datasets before and after op [{op_name}] are '
f'empty. Thus no comparison results would be '
f'generated.')
return
elif len(aug_dict) < self.show_num:
logger.warning(f'There are only {len(aug_dict)} samples -- less '
f'than expected {self.show_num} samples.')

# export the tracer results.
res_name = f'mapper-{op_name}.jsonl'
dif_df = pd.DataFrame(aug_dict)
dif_df.to_json(os.path.join(self.work_dir, res_name),
orient='records',
lines=True,
force_ascii=False)

def trace_filter(self, op_name: str, previous_ds: Dataset,
processed_ds: Dataset):
"""
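For reference, the trace file written by trace_batch_mapper is records-oriented JSONL, one processed sample per line; a tiny stand-alone example (the sample rows are invented):

# Tiny example of the JSONL trace format; rows are invented.
import pandas as pd

aug_samples = [{'text': 'I love LLM'}, {'text': 'I LLM'}]
pd.DataFrame(aug_samples).to_json('mapper-simple_aug_en_mapper.jsonl',
                                  orient='records', lines=True,
                                  force_ascii=False)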
2 changes: 1 addition & 1 deletion data_juicer/ops/__init__.py
@@ -1,3 +1,3 @@
from . import deduplicator, filter, mapper, selector
from .base_op import OPERATORS, Deduplicator, Filter, Mapper, Selector
from .base_op import OPERATORS, Deduplicator, Filter, Mapper, Selector, BatchMapper
from .load import load_ops
26 changes: 25 additions & 1 deletion data_juicer/ops/base_op.py
@@ -20,13 +20,37 @@ def __init__(self, text_key: str = None):

def process(self, sample):
"""
For sample level, sample --> sample or sample --> [samples, ...]
For sample level, sample --> sample

:param sample: sample to process
:return: processed sample
"""
raise NotImplementedError

class BatchMapper(Mapper):

def __init__(self, text_key: str = None):
"""
Base class that conducts text editing.

:param text_key: the key name of field that stores sample texts
to be processed.
"""
if text_key is None:
text_key = 'text'
self.text_key = text_key
from data_juicer.core.data import wrap_func_with_nested_access
self.process = wrap_func_with_nested_access(self.process)

def process(self, samples):
"""
For batch level, samples --> samples

:param samples: samples to process
:return: processed samples
"""
raise NotImplementedError


class Filter:

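To show the samples-in/samples-out contract of the new base class, here is a hedged sketch of a concrete BatchMapper; the class name and augmentation logic are illustrative only and assume the samples carry just the text field.

# Hedged sketch of a concrete BatchMapper subclass (not part of this PR).
from data_juicer.ops import BatchMapper

class RepeatTextMapper(BatchMapper):

    def process(self, samples):
        # samples is column-oriented: {'text': [t1, t2, ...]}
        texts = []
        for text in samples[self.text_key]:
            texts.append(text)             # keep the original sample
            texts.append(text + ' (aug)')  # emit one extra augmented sample
        return {self.text_key: texts}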
3 changes: 2 additions & 1 deletion data_juicer/ops/mapper/__init__.py
@@ -5,4 +5,5 @@
remove_header_mapper, remove_long_words_mapper,
remove_specific_chars_mapper, remove_table_text_mapper,
remove_words_with_incorrect_substrings_mapper,
sentence_split_mapper, whitespace_normalization_mapper)
sentence_split_mapper, simple_aug_en_mapper,
simple_aug_zh_mapper, whitespace_normalization_mapper)