Commit 575a54c
Write to file without including "filename" column (NVIDIA#317)
* keep_filename_column param

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* update pytests

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* run black

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

---------

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Signed-off-by: Ayush Dattagupta <ayushdg95@gmail.com>
sarahyurick authored and ayushdg committed Oct 30, 2024
1 parent 55807d5 commit 575a54c
Showing 3 changed files with 62 additions and 6 deletions.
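
The change threads a new `keep_filename_column` option from `DocumentDataset.to_json` and `to_parquet` down through `write_to_disk` into `single_partition_write_with_filename`. By default, the "filename" column used to route records to per-file outputs is now dropped from the written records; passing `keep_filename_column=True` preserves it. A minimal usage sketch (toy data; paths and column names are hypothetical, and it assumes `DocumentDataset` wraps a Dask DataFrame as in NeMo Curator at the time):

```python
import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets import DocumentDataset

# Toy dataset with a "filename" column that routes rows to output files.
pdf = pd.DataFrame(
    {"text": ["a", "b", "c"], "filename": ["f0.jsonl", "f1.jsonl", "f1.jsonl"]}
)
dataset = DocumentDataset(dd.from_pandas(pdf, npartitions=1))

# New default: f0.jsonl and f1.jsonl are written without the "filename" column.
dataset.to_json("out/", write_to_filename=True)

# Opt in to keeping the column in the written records.
dataset.to_json("out_keep/", write_to_filename=True, keep_filename_column=True)
```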
7 changes: 7 additions & 0 deletions nemo_curator/datasets/doc_dataset.py
@@ -35,6 +35,9 @@ def __len__(self):
     def persist(self):
         return DocumentDataset(self.df.persist())
 
+    def head(self, n=5):
+        return self.df.head(n)
+
     @classmethod
     def read_json(
         cls,
@@ -107,6 +110,7 @@ def to_json(
         self,
         output_file_dir,
         write_to_filename=False,
+        keep_filename_column=False,
     ):
         """
         See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
@@ -116,13 +120,15 @@ def to_json(
             df=self.df,
             output_file_dir=output_file_dir,
             write_to_filename=write_to_filename,
+            keep_filename_column=keep_filename_column,
             output_type="jsonl",
         )
 
     def to_parquet(
         self,
         output_file_dir,
         write_to_filename=False,
+        keep_filename_column=False,
     ):
         """
         See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
@@ -132,6 +138,7 @@ def to_parquet(
             df=self.df,
             output_file_dir=output_file_dir,
             write_to_filename=write_to_filename,
+            keep_filename_column=keep_filename_column,
             output_type="parquet",
         )
 
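The new `head` helper above simply delegates to the underlying Dask DataFrame, and `to_parquet` gains the same flag as `to_json`. Continuing the sketch from the top of the page:

```python
# Peek at the first rows (dask.dataframe computes head eagerly).
print(dataset.head(2))

# Parquet writes take the same keep_filename_column flag.
dataset.to_parquet("out_parquet/", write_to_filename=True, keep_filename_column=True)
```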
27 changes: 25 additions & 2 deletions nemo_curator/utils/distributed_utils.py
@@ -494,13 +494,19 @@ def process_all_batches(
     )
 
 
-def single_partition_write_with_filename(df, output_file_dir, output_type="jsonl"):
+def single_partition_write_with_filename(
+    df,
+    output_file_dir,
+    keep_filename_column=False,
+    output_type="jsonl",
+):
     """
     This function processes a DataFrame and writes it to disk
 
     Args:
         df: A DataFrame.
         output_file_dir: The output file path.
+        keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
         output_type="jsonl": The type of output file to write.
     Returns:
         If the DataFrame is non-empty, return a Series containing a single element, True.
@@ -526,12 +532,18 @@ def single_partition_write_with_filename(df, output_file_dir, output_type="jsonl"):
     filenames = df.filename.unique()
     filenames = list(filenames.values_host) if is_cudf_type(df) else list(filenames)
     num_files = len(filenames)
+
     for filename in filenames:
         out_df = df[df.filename == filename] if num_files > 1 else df
+        if not keep_filename_column:
+            out_df = out_df.drop("filename", axis=1)
+
         filename = Path(filename).stem
         output_file_path = os.path.join(output_file_dir, filename)
+
         if output_type == "jsonl":
             output_file_path = output_file_path + ".jsonl"
+
             if isinstance(df, pd.DataFrame):
                 out_df.to_json(
                     output_file_path,
@@ -550,16 +562,24 @@ def single_partition_write_with_filename(df, output_file_dir, output_type="jsonl"):
                     lines=True,
                     force_ascii=False,
                 )
 
         elif output_type == "parquet":
             output_file_path = output_file_path + ".parquet"
             out_df.to_parquet(output_file_path)
+
         else:
             raise ValueError(f"Unknown output type: {output_type}")
+
     return success_ser
 
 
-def write_to_disk(df, output_file_dir, write_to_filename=False, output_type="jsonl"):
+def write_to_disk(
+    df,
+    output_file_dir,
+    write_to_filename=False,
+    keep_filename_column=False,
+    output_type="jsonl",
+):
     """
     This function writes a Dask DataFrame to the specified file path.
     If write_to_filename is True, then it expects the
@@ -569,6 +589,7 @@ def write_to_disk(df, output_file_dir, write_to_filename=False, output_type="jsonl"):
         df: A Dask DataFrame.
         output_file_dir: The output file path.
         write_to_filename: Whether to write the filename using the "filename" column.
+        keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
         output_type="jsonl": The type of output file to write.
 
     """
@@ -589,11 +610,13 @@ def write_to_disk(df, output_file_dir, write_to_filename=False, output_type="jsonl"):
         output = df.map_partitions(
             single_partition_write_with_filename,
             output_file_dir,
+            keep_filename_column=keep_filename_column,
             output_type=output_type,
             meta=output_meta,
             enforce_metadata=False,
         )
         output = output.compute()
+
     else:
         if output_type == "jsonl":
             if is_cudf_type(df):
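At the lower level, `single_partition_write_with_filename` now drops the "filename" column from each per-file output unless told otherwise. A minimal direct call, mirroring the tests below (assumes the output directory already exists, as pytest's `tmp_path` does):

```python
import pandas as pd

from nemo_curator.utils.distributed_utils import single_partition_write_with_filename

df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file0", "file1", "file1"]})

# Writes file0.jsonl and file1.jsonl; by default each record contains only "a".
single_partition_write_with_filename(df, "out_dir", output_type="jsonl")

# Same files, but the "filename" column is kept in each record.
single_partition_write_with_filename(
    df, "out_keep", keep_filename_column=True, output_type="jsonl"
)
```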
34 changes: 30 additions & 4 deletions tests/test_io.py
@@ -149,33 +149,59 @@ def test_meta_str(self, jsonl_dataset):
 
 
 class TestWriteWithFilename:
+    @pytest.mark.parametrize("keep_filename_column", [True, False])
     @pytest.mark.parametrize("file_ext", ["jsonl", "parquet"])
-    def test_multifile_single_partition(self, tmp_path, file_ext):
+    def test_multifile_single_partition(
+        self,
+        tmp_path,
+        keep_filename_column,
+        file_ext,
+    ):
         df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file0", "file1", "file1"]})
 
         single_partition_write_with_filename(
-            df=df, output_file_dir=tmp_path, output_type=file_ext
+            df=df,
+            output_file_dir=tmp_path,
+            keep_filename_column=keep_filename_column,
+            output_type=file_ext,
         )
         assert os.path.exists(tmp_path / f"file0.{file_ext}")
         assert os.path.exists(tmp_path / f"file1.{file_ext}")
+
+        if not keep_filename_column:
+            df = df.drop("filename", axis=1)
+
         df1 = read_single_partition(
             files=[tmp_path / f"file0.{file_ext}"], backend="pandas", filetype=file_ext
         )
         assert_eq(df1, df.iloc[0:1], check_index=False)
+
         df2 = read_single_partition(
             files=[tmp_path / f"file1.{file_ext}"], backend="pandas", filetype=file_ext
         )
         assert_eq(df2, df.iloc[1:3], check_index=False)
 
+    @pytest.mark.parametrize("keep_filename_column", [True, False])
     @pytest.mark.parametrize("file_ext", ["jsonl", "parquet"])
-    def test_singlefile_single_partition(self, tmp_path, file_ext):
+    def test_singlefile_single_partition(
+        self,
+        tmp_path,
+        keep_filename_column,
+        file_ext,
+    ):
         df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file2", "file2", "file2"]})
 
         single_partition_write_with_filename(
-            df=df, output_file_dir=tmp_path, output_type=file_ext
+            df=df,
+            output_file_dir=tmp_path,
+            keep_filename_column=keep_filename_column,
+            output_type=file_ext,
        )
         assert len(os.listdir(tmp_path)) == 1
         assert os.path.exists(tmp_path / f"file2.{file_ext}")
 
+        if not keep_filename_column:
+            df = df.drop("filename", axis=1)
         got = read_single_partition(
             files=[tmp_path / f"file2.{file_ext}"], backend="pandas", filetype=file_ext
         )
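With the extra parametrization, each test now runs four times (`keep_filename_column` × `file_ext`); locally they can be selected with, e.g., `pytest tests/test_io.py -k TestWriteWithFilename`.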
