Skip to content

Commit

Permalink
Add dask profile options and add overwrite
Browse files (browse the repository at this point in the history)
Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
  • Loading branch information
VibhuJawa committed Oct 29, 2024
1 parent 3774351 commit c122248
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
10 changes: 6 additions & 4 deletions nemo_curator/modules/fuzzy_dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -1493,7 +1493,7 @@ def _run_connected_components(
assert num_nodes == len(labels_df)
# Ensure all docs in the same group are in the same partition
labels_df = labels_df.shuffle(on=["group"], ignore_index=True)
- labels_df.to_parquet(output_path, write_index=False)
+ labels_df.to_parquet(output_path, write_index=False, overwrite=True)
Comms.destroy()
self._logger.info(
f"Time taken for Connected Components Run = {time.time() - t0}s and output written at {output_path}"
Expand Down Expand Up @@ -1561,7 +1561,7 @@ def _write_dedup_encoded_jaccard_pair(self, encoded_jaccard_pair_path):
transform_divisions=False,
align_dataframes=False,
)
- ddf.to_parquet(output_path, write_index=False)
+ ddf.to_parquet(output_path, write_index=False, overwrite=True)
self._logger.info(
f"Time taken for Dedup Encoding Jaccard Pairs = {time.time() - t0}s and output written at {output_path}"
)
Expand Down Expand Up @@ -1590,7 +1590,9 @@ def _write_dedup_parsed_id(self):
unique_docs["uid"] = np.uint64(1)
unique_docs["uid"] = unique_docs["uid"].cumsum()
unique_docs["uid"] = unique_docs["uid"] - 1
- unique_docs.to_parquet(dedup_parsed_id_path, write_index=False)
+ unique_docs.to_parquet(
+ dedup_parsed_id_path, write_index=False, overwrite=True
+ )
self._logger.info(
f"Time taken for Dedup Parsed Id = {time.time() - t0}s and output written at {dedup_parsed_id_path}"
)
Expand Down Expand Up @@ -1644,7 +1646,7 @@ def _merge_and_write(
ddf = ddf.drop(columns=pair_id)
ddf = ddf.rename(columns={"uid": f"{self.id_column}_{tag}"})
ddf = ddf[[self.left_id, self.right_id, "jaccard"]]
- ddf.to_parquet(output_path, write_index=False)
+ ddf.to_parquet(output_path, write_index=False, overwrite=True)

et = time.time()
self._logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def main(args):
id_column=args.input_json_id_field,
jaccard_threshold=args.jaccard_threshold,
logger=args.log_dir,
+ profile_dir=args.profile_path,
)
components_stage.cc_workflow(output_path=output_path)
print(f"All done in {time.time()-st:.1f} seconds")
Expand Down

0 comments on commit c122248

Please sign in to comment.