Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support sft mapdataset #8840

Merged
merged 5 commits into from
Aug 5, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 272 additions & 0 deletions paddlenlp/data/indexed_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import shutil
import struct
import time
from dataclasses import fields
from functools import lru_cache
from itertools import accumulate

Expand Down Expand Up @@ -68,6 +69,19 @@
return None


def make_sft_dataset(path, impl, dataclass, skip_warmup=False):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

要么就只支持mmap的吧,不用判断了

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已经修改,不是mmap直接报错

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

建议:make_sft_dataset(path, dataclass, skip_warmup=False, impl=“mmap”)

if impl != "mmap":
raise ValueError("SFT Indexed Dataset only support mmap memory-mapped method temporarily")

Check warning on line 74 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L73-L74

Added lines #L73 - L74 were not covered by tests

print_rank_0(" > building dataset index ...")
start_time = time.time()
sft_indexed_dataset = SFT_MMapIndexedDataset(path, dataclass, skip_warmup)
print_rank_0(" > finished creating SFT indexed dataset in {:4f} " "seconds".format(time.time() - start_time))
print_rank_0(" number of samples: {}".format(len(sft_indexed_dataset.doc_idx) - 1))

Check warning on line 80 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L76-L80

Added lines #L76 - L80 were not covered by tests

return sft_indexed_dataset

Check warning on line 82 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L82

Added line #L82 was not covered by tests


def dataset_exists(path, impl):
if impl == "mmap":
return MMapIndexedDataset.exists(path)
Expand Down Expand Up @@ -120,6 +134,18 @@
return prefix_path + ".idx"


def sft_index_file_path(prefix_path):
return os.path.join(prefix_path, "index.idx")

Check warning on line 138 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L138

Added line #L138 was not covered by tests


def sft_data_file_path(prefix_path, dataclass):
file_path_list = []
for field in fields(dataclass):
file_path = os.path.join(prefix_path, f"{field.name}.bin")
file_path_list.append(file_path)
return file_path_list

Check warning on line 146 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L142-L146

Added lines #L142 - L146 were not covered by tests


def data_file_path(prefix_path):
return prefix_path + ".bin"

Expand Down Expand Up @@ -548,13 +574,259 @@
return os.path.exists(index_file_path(path)) and os.path.exists(data_file_path(path))


class SFT_MMapIndexedDataset(paddle.io.Dataset):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里class采用驼峰命名,不要下划线。

class Index(object):
_HDR_MAGIC = b"MMIDIDX\x00\x00"

@classmethod
def writer(cls, path, dtype):
class _Writer(object):
def __enter__(self):
self._file = open(path, "wb")
self._file.write(cls._HDR_MAGIC)
self._file.write(struct.pack("<Q", 1))
self._file.write(struct.pack("<B", code(dtype)))

Check warning on line 588 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L583-L588

Added lines #L583 - L588 were not covered by tests

return self

Check warning on line 590 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L590

Added line #L590 was not covered by tests

@staticmethod
def _get_pointers(sizes):
dtype_size = dtype().itemsize
address = 0
pointers = []
for size in sizes:
pointers.append(address)
address += size * dtype_size
return pointers

Check warning on line 600 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L592-L600

Added lines #L592 - L600 were not covered by tests

def write(self, sizes, doc_idx):

Check warning on line 602 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L602

Added line #L602 was not covered by tests

pointers = self._get_pointers(sizes)
self._file.write(struct.pack("<Q", len(sizes)))
self._file.write(struct.pack("<Q", len(doc_idx)))

Check warning on line 606 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L604-L606

Added lines #L604 - L606 were not covered by tests

sizes = np.array(sizes, dtype=np.int32)
self._file.write(sizes.tobytes(order="C"))
del sizes

Check warning on line 610 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L608-L610

Added lines #L608 - L610 were not covered by tests

pointers = np.array(pointers, dtype=np.int64)
self._file.write(pointers.tobytes(order="C"))
del pointers

Check warning on line 614 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L612-L614

Added lines #L612 - L614 were not covered by tests

doc_idx = np.array(doc_idx, dtype=np.int64)
self._file.write(doc_idx.tobytes(order="C"))

Check warning on line 617 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L616-L617

Added lines #L616 - L617 were not covered by tests

def __exit__(self, exc_type, exc_val, exc_tb):
self._file.close()

Check warning on line 620 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L619-L620

Added lines #L619 - L620 were not covered by tests

return _Writer()

Check warning on line 622 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L622

Added line #L622 was not covered by tests

def __init__(self, path, skip_warmup=False):
with open(path, "rb") as stream:
magic_test = stream.read(9)
assert self._HDR_MAGIC == magic_test, (

Check warning on line 627 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L625-L627

Added lines #L625 - L627 were not covered by tests
"Index file doesn't match expected format. "
"Make sure that --dataset-impl is configured properly."
)
version = struct.unpack("<Q", stream.read(8))
assert (1,) == version

Check warning on line 632 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L631-L632

Added lines #L631 - L632 were not covered by tests

(dtype_code,) = struct.unpack("<B", stream.read(1))
self._dtype = dtypes[dtype_code]
self._dtype_size = self._dtype().itemsize

Check warning on line 636 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L634-L636

Added lines #L634 - L636 were not covered by tests

self._len = struct.unpack("<Q", stream.read(8))[0]
self._doc_count = struct.unpack("<Q", stream.read(8))[0]
offset = stream.tell()

Check warning on line 640 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L638-L640

Added lines #L638 - L640 were not covered by tests

if not skip_warmup:
print_rank_0(" warming up index mmap file...")
_warmup_mmap_file(path)

Check warning on line 644 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L642-L644

Added lines #L642 - L644 were not covered by tests
gongel marked this conversation as resolved.
Show resolved Hide resolved

self._buffer_mmap = np.memmap(path, mode="r", order="C")
self._buffer = memoryview(self._buffer_mmap)
print_rank_0(" reading sizes...")
self._sizes = np.frombuffer(self._buffer, dtype=np.int32, count=self._len, offset=offset)
print_rank_0(" reading pointers...")
self._pointers = np.frombuffer(

Check warning on line 651 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L646-L651

Added lines #L646 - L651 were not covered by tests
self._buffer, dtype=np.int64, count=self._len, offset=offset + self._sizes.nbytes
)
print_rank_0(" reading document index...")
self._doc_idx = np.frombuffer(

Check warning on line 655 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L654-L655

Added lines #L654 - L655 were not covered by tests
self._buffer,
dtype=np.int64,
count=self._doc_count,
offset=offset + self._sizes.nbytes + self._pointers.nbytes,
)

def __del__(self):
self._buffer_mmap._mmap.close()
del self._buffer_mmap

Check warning on line 664 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L663-L664

Added lines #L663 - L664 were not covered by tests

@property
def dtype(self):
return self._dtype

Check warning on line 668 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L668

Added line #L668 was not covered by tests

@property
def sizes(self):
return self._sizes

Check warning on line 672 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L672

Added line #L672 was not covered by tests

@property
def doc_idx(self):
return self._doc_idx

Check warning on line 676 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L676

Added line #L676 was not covered by tests

@lru_cache(maxsize=8)
def __getitem__(self, i):
return self._pointers[i], self._sizes[i]

Check warning on line 680 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L680

Added line #L680 was not covered by tests

def __len__(self):
return self._doc_count - 1

Check warning on line 683 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L683

Added line #L683 was not covered by tests

def __init__(self, path, dataclass, skip_warmup=False):
super().__init__()
self._dataclass = dataclass
self._path = None
self._index = None
self._bin_buffer = None

Check warning on line 690 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L686-L690

Added lines #L686 - L690 were not covered by tests

self._do_init(path, skip_warmup)

Check warning on line 692 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L692

Added line #L692 was not covered by tests

def __getstate__(self):
return self._path

Check warning on line 695 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L695

Added line #L695 was not covered by tests

def __setstate__(self, state):
self._do_init(state, skip_warmup=True)

Check warning on line 698 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L698

Added line #L698 was not covered by tests

def _do_init(self, path, skip_warmup):
self._path = path
if not self.exists(path, self._dataclass):
raise ValueError("Missing file, %s" % (path))

Check warning on line 703 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L701-L703

Added lines #L701 - L703 were not covered by tests

self._index = self.Index(sft_index_file_path(self._path), skip_warmup)
if not skip_warmup:
print_rank_0(" warming up data mmap file...")
for data_file in sft_data_file_path(self._path, self._dataclass):
_warmup_mmap_file(data_file)
print_rank_0(" creating numpy buffer of mmap...")

Check warning on line 710 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L705-L710

Added lines #L705 - L710 were not covered by tests

self._bin_buffer_mmap_dict = {}
self._bin_buffer_dict = {}
for data_file in sft_data_file_path(self._path, self._dataclass):
self._bin_buffer_mmap_dict[data_file] = np.memmap(data_file, mode="r", order="C")
self._bin_buffer_dict[data_file] = memoryview(self._bin_buffer_mmap_dict[data_file])
print_rank_0(" creating memory view of numpy buffer...")

Check warning on line 717 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L712-L717

Added lines #L712 - L717 were not covered by tests

def __del__(self):
for key, value in self._bin_buffer_mmap_dict.items():
value._mmap.close()
for key, value in self._bin_buffer_dict.items():
del value
del self._index

Check warning on line 724 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L720-L724

Added lines #L720 - L724 were not covered by tests

def __len__(self):
return len(self._index)

Check warning on line 727 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L727

Added line #L727 was not covered by tests

def __getitem__(self, idx):
def get_index(idx):
doc_idx = self._index.doc_idx
start_sentence, end_sentence = doc_idx[idx], doc_idx[idx + 1]
start_pointers, _ = self._index[start_sentence]
length_list = self._index._sizes[start_sentence:end_sentence]

Check warning on line 734 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L730-L734

Added lines #L730 - L734 were not covered by tests

dataclass_fields = fields(self._dataclass)
dataclass_list = []
sequence_offset = start_pointers
scalar_offset = doc_idx[idx] * np.dtype(self._index.dtype).itemsize

Check warning on line 739 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L736-L739

Added lines #L736 - L739 were not covered by tests

for length in length_list:
field_data = {field.name: [] for field in dataclass_fields}
for field in dataclass_fields:
bin_buffer = self._bin_buffer_dict[os.path.join(self._path, f"{field.name}.bin")]
if field.type != int:
data = np.frombuffer(bin_buffer, dtype=self._index.dtype, count=length, offset=sequence_offset)
field_data[field.name] = data.tolist()

Check warning on line 747 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L741-L747

Added lines #L741 - L747 were not covered by tests
else:
data = np.frombuffer(bin_buffer, dtype=self._index.dtype, count=1, offset=scalar_offset)
field_data[field.name] = int(data[0])

Check warning on line 750 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L749-L750

Added lines #L749 - L750 were not covered by tests

dataclass_list.append(self._dataclass(**field_data))

Check warning on line 752 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L752

Added line #L752 was not covered by tests

sequence_offset += length * np.dtype(self._index.dtype).itemsize
scalar_offset += np.dtype(self._index.dtype).itemsize
return dataclass_list

Check warning on line 756 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L754-L756

Added lines #L754 - L756 were not covered by tests

if isinstance(idx, (int, np.integer)):
return get_index(idx)
elif isinstance(idx, slice):
start, stop, step = idx.indices(len(self))
if step != 1:
raise ValueError("Slices into indexed_dataset must be contiguous")
return [get_index(idx) for idx in range(start, stop)]

Check warning on line 764 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L758-L764

Added lines #L758 - L764 were not covered by tests

@property
def sizes(self):
return self._index.sizes

Check warning on line 768 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L768

Added line #L768 was not covered by tests

@property
def doc_idx(self):
return self._index.doc_idx

Check warning on line 772 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L772

Added line #L772 was not covered by tests

def get_doc_idx(self):
return self._index._doc_idx

Check warning on line 775 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L775

Added line #L775 was not covered by tests

def set_doc_idx(self, doc_idx_):
self._index._doc_idx = doc_idx_

Check warning on line 778 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L778

Added line #L778 was not covered by tests

@property
def supports_prefetch(self):
return False

Check warning on line 782 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L782

Added line #L782 was not covered by tests

@staticmethod
def exists(path, dataclass):
file_path_list = sft_data_file_path(path, dataclass)
file_path_list.append(sft_index_file_path(path))
for file_path in file_path_list:
if not os.path.exists(file_path):
return False
return True

Check warning on line 791 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L786-L791

Added lines #L786 - L791 were not covered by tests


def make_builder(out_file, impl, save_dtype, loss_mask_file=None):
if impl == "mmap":
return MMapIndexedDatasetBuilder(out_file, dtype=save_dtype, loss_mask_file=loss_mask_file)
else:
return IndexedDatasetBuilder(out_file, dtype=save_dtype)


class SFT_MMapIndexedDatasetBuilder(object):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个命名同样

def __init__(self, output_file_dict, dtype):
self._data_file_dict = {}
for key, filename in output_file_dict.items():
self._data_file_dict[key] = open(filename, "wb")
self.output_file_dict = output_file_dict
self._dtype = dtype
self._sizes = []
self._doc_idx = [0]

Check warning on line 809 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L803-L809

Added lines #L803 - L809 were not covered by tests

def add_item(self, sequence):
add_sequence_len = False
for key in self._data_file_dict.keys():
tensor = np.array(getattr(sequence, key), dtype=self._dtype)
if tensor.size > 1 and not add_sequence_len:
self._sizes.append(tensor.size)
add_sequence_len = True
self._data_file_dict[key].write(tensor.tobytes(order="C"))

Check warning on line 818 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L812-L818

Added lines #L812 - L818 were not covered by tests

def end_document(self):
self._doc_idx.append(len(self._sizes))

Check warning on line 821 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L821

Added line #L821 was not covered by tests

def finalize(self, index_file):
for key, filename in self._data_file_dict.items():
filename.close()
with SFT_MMapIndexedDataset.Index.writer(index_file, self._dtype) as index:
index.write(self._sizes, self._doc_idx)

Check warning on line 827 in paddlenlp/data/indexed_dataset.py

View check run for this annotation

Codecov / codecov/patch

paddlenlp/data/indexed_dataset.py#L824-L827

Added lines #L824 - L827 were not covered by tests


class MMapIndexedDatasetBuilder(object):
def __init__(self, out_file, dtype, loss_mask_file=None):
self._data_file = open(out_file, "wb")
Expand Down
Loading