Write to file without including "filename" column #317

Merged 3 commits on Oct 23, 2024
7 changes: 7 additions & 0 deletions nemo_curator/datasets/doc_dataset.py
@@ -35,6 +35,9 @@ def __len__(self):
def persist(self):
return DocumentDataset(self.df.persist())

def head(self, n=5):
return self.df.head(n)

@classmethod
def read_json(
cls,
@@ -95,6 +98,7 @@ def to_json(
self,
output_file_dir,
write_to_filename=False,
keep_filename_column=False,
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
@@ -104,13 +108,15 @@
df=self.df,
output_file_dir=output_file_dir,
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
output_type="jsonl",
)

def to_parquet(
self,
output_file_dir,
write_to_filename=False,
keep_filename_column=False,
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for other parameters.
@@ -120,6 +126,7 @@
df=self.df,
output_file_dir=output_file_dir,
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
output_type="parquet",
)
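
Taken together, the DocumentDataset changes add a head() helper and thread the new keep_filename_column flag through to_json and to_parquet. A minimal usage sketch follows; the input paths are hypothetical, and reading with add_filename=True to populate the "filename" column is an assumption about the reader, not something shown in this diff:

# Sketch only: input paths are hypothetical, and add_filename=True is an
# assumed reader flag for populating the "filename" column.
from nemo_curator.datasets import DocumentDataset

dataset = DocumentDataset.read_json(
    ["data/file0.jsonl", "data/file1.jsonl"],
    add_filename=True,
)
print(dataset.head(2))  # new helper: peek at the first n rows

# Default after this PR: the "filename" column routes each row to its
# output file and is then dropped from the written records.
dataset.to_json("out_jsonl/", write_to_filename=True)

# Opt in to keeping the column in the written files.
dataset.to_parquet("out_parquet/", write_to_filename=True, keep_filename_column=True)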

27 changes: 25 additions & 2 deletions nemo_curator/utils/distributed_utils.py
@@ -468,13 +468,19 @@ def process_all_batches(
)


def single_partition_write_with_filename(df, output_file_dir, output_type="jsonl"):
def single_partition_write_with_filename(
df,
output_file_dir,
keep_filename_column=False,
output_type="jsonl",
):
"""
This function processes a DataFrame and writes it to disk.

Args:
df: A DataFrame.
output_file_dir: The output file path.
keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
output_type="jsonl": The type of output file to write.
Returns:
If the DataFrame is non-empty, return a Series containing a single element, True.
@@ -500,12 +506,18 @@ def single_partition_write_with_filename(df, output_file_dir, output_type="jsonl
filenames = df.filename.unique()
filenames = list(filenames.values_host) if is_cudf_type(df) else list(filenames)
num_files = len(filenames)

for filename in filenames:
out_df = df[df.filename == filename] if num_files > 1 else df
if not keep_filename_column:
out_df = out_df.drop("filename", axis=1)

filename = Path(filename).stem
output_file_path = os.path.join(output_file_dir, filename)

if output_type == "jsonl":
output_file_path = output_file_path + ".jsonl"

if isinstance(df, pd.DataFrame):
out_df.to_json(
output_file_path,
@@ -524,16 +536,24 @@ def single_partition_write_with_filename(df, output_file_dir, output_type="jsonl
lines=True,
force_ascii=False,
)

elif output_type == "parquet":
output_file_path = output_file_path + ".parquet"
out_df.to_parquet(output_file_path)

else:
raise ValueError(f"Unknown output type: {output_type}")

return success_ser
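
Mirroring the tests below, a direct single-partition call behaves like this sketch (illustrative paths; assumes the output directory already exists):

import pandas as pd
from nemo_curator.utils.distributed_utils import single_partition_write_with_filename

# One partition whose rows originated from two source files.
df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file0", "file1", "file1"]})

# Writes out/file0.jsonl (1 record) and out/file1.jsonl (2 records). With
# the default keep_filename_column=False, each record keeps only "a";
# the "filename" column is used for routing and then dropped.
single_partition_write_with_filename(df, output_file_dir="out", output_type="jsonl")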


def write_to_disk(df, output_file_dir, write_to_filename=False, output_type="jsonl"):
def write_to_disk(
df,
output_file_dir,
write_to_filename=False,
keep_filename_column=False,
Review thread on this parameter:

Collaborator (PR author):
Interested in others' opinions here. I think it makes the most sense to default to False here and automatically drop the "filename" column unless specified otherwise. It saves storage space, and the column itself means nothing to the user, who can already see the filename after it is written.

I know a lot of our classifier scripts, etc. use write_to_disk, so those will all now automatically default to keep_filename_column=False. I don't know if we should consider adding it to argparse for more user control on the CLI side, or if folks think we are good without it.

@ryantwolf (Collaborator), Oct 22, 2024:
I agree, defaulting to False is good. I think it was an oversight when initially making this. I wouldn't add it as an option to argparse yet.

Collaborator:
Agreed, this behavior makes a lot more sense.

output_type="jsonl",
):
"""
This function writes a Dask DataFrame to the specified file path.
If write_to_filename is True, then it expects the DataFrame to have a "filename" column.

@@ -543,6 +563,7 @@ def write_to_disk(df, output_file_dir, write_to_filename=False, output_type="jso
df: A Dask DataFrame.
output_file_dir: The output file path.
write_to_filename: Whether to write the filename using the "filename" column.
keep_filename_column: Whether to keep or drop the "filename" column, if it exists.
output_type="jsonl": The type of output file to write.

"""
@@ -563,11 +584,13 @@ def write_to_disk(df, output_file_dir, write_to_filename=False, output_type="jso
output = df.map_partitions(
single_partition_write_with_filename,
output_file_dir,
keep_filename_column=keep_filename_column,
output_type=output_type,
meta=output_meta,
enforce_metadata=False,
)
output = output.compute()

else:
if output_type == "jsonl":
if is_cudf_type(df):
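
At the Dask level, write_to_disk forwards keep_filename_column to each partition via map_partitions. A minimal sketch with a synthetic two-partition DataFrame (assumes the out/ directory exists; partition boundaries are chosen so each source file's rows stay in one partition):

import dask.dataframe as dd
import pandas as pd
from nemo_curator.utils.distributed_utils import write_to_disk

pdf = pd.DataFrame({"a": range(4), "filename": ["f0", "f0", "f1", "f1"]})
ddf = dd.from_pandas(pdf, npartitions=2)  # splits as [f0, f0] and [f1, f1]

# Routes rows to out/f0.jsonl and out/f1.jsonl; the "filename" column is
# dropped from the output unless keep_filename_column=True is passed.
write_to_disk(ddf, output_file_dir="out", write_to_filename=True, output_type="jsonl")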
34 changes: 30 additions & 4 deletions tests/test_io.py
@@ -149,33 +149,59 @@ def test_meta_str(self, jsonl_dataset):


class TestWriteWithFilename:
@pytest.mark.parametrize("keep_filename_column", [True, False])
@pytest.mark.parametrize("file_ext", ["jsonl", "parquet"])
def test_multifile_single_partition(self, tmp_path, file_ext):
def test_multifile_single_partition(
self,
tmp_path,
keep_filename_column,
file_ext,
):
df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file0", "file1", "file1"]})

single_partition_write_with_filename(
df=df, output_file_dir=tmp_path, output_type=file_ext
df=df,
output_file_dir=tmp_path,
keep_filename_column=keep_filename_column,
output_type=file_ext,
)
assert os.path.exists(tmp_path / f"file0.{file_ext}")
assert os.path.exists(tmp_path / f"file1.{file_ext}")

if not keep_filename_column:
df = df.drop("filename", axis=1)

df1 = read_single_partition(
files=[tmp_path / f"file0.{file_ext}"], backend="pandas", filetype=file_ext
)
assert_eq(df1, df.iloc[0:1], check_index=False)

df2 = read_single_partition(
files=[tmp_path / f"file1.{file_ext}"], backend="pandas", filetype=file_ext
)
assert_eq(df2, df.iloc[1:3], check_index=False)

@pytest.mark.parametrize("keep_filename_column", [True, False])
@pytest.mark.parametrize("file_ext", ["jsonl", "parquet"])
def test_singlefile_single_partition(self, tmp_path, file_ext):
def test_singlefile_single_partition(
self,
tmp_path,
keep_filename_column,
file_ext,
):
df = pd.DataFrame({"a": [1, 2, 3], "filename": ["file2", "file2", "file2"]})

single_partition_write_with_filename(
df=df, output_file_dir=tmp_path, output_type=file_ext
df=df,
output_file_dir=tmp_path,
keep_filename_column=keep_filename_column,
output_type=file_ext,
)
assert len(os.listdir(tmp_path)) == 1
assert os.path.exists(tmp_path / f"file2.{file_ext}")

if not keep_filename_column:
df = df.drop("filename", axis=1)
got = read_single_partition(
files=[tmp_path / f"file2.{file_ext}"], backend="pandas", filetype=file_ext
)