
Commit c2b717d
Don't write metadata file (#875)
We reintroduced writing the metadata file in #864 to preserve the
divisions of the data across a write/read round trip. We had turned this
behavior off in the past, but without properly documenting the reason.

However, I'm now running into issues with Dask workers dying when writing
large datasets, presumably because of the metadata file, as documented
in these Dask issues:
- dask/dask#6600
- dask/dask#3873
- dask/dask#8901

Also, while I previously ran into issues with divisions not being preserved,
I can't reproduce this locally with a small example. Let's turn off writing
the metadata file again and validate whether the divisions issue resurfaces.
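
For reference, a minimal round-trip sketch along the lines of the small example mentioned above (illustrative only, not part of this commit; the temporary path and the `calculate_divisions` flag are assumptions and depend on the Dask version):

# Minimal sketch (illustrative, not from the repository): write a partitioned
# dataframe without a _metadata file and check whether its divisions survive
# a write/read round trip.
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"a": range(100)}), npartitions=4)

ddf.to_parquet("/tmp/divisions_check", write_metadata_file=False, overwrite=True)
result = dd.read_parquet("/tmp/divisions_check", calculate_divisions=True)

print(ddf.divisions)     # divisions before writing
print(result.divisions)  # divisions recovered from the parquet statistics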
RobbeSneyders authored Feb 24, 2024
1 parent bd2a218 commit c2b717d
Showing 2 changed files with 8 additions and 25 deletions.
src/fondant/component/data_io.py (5 changes: 1 addition & 4 deletions)

@@ -5,7 +5,6 @@
 
 import dask.dataframe as dd
 from dask.diagnostics import ProgressBar
-from dask.distributed import Client
 
 from fondant.core.component_spec import OperationSpec
 from fondant.core.manifest import Manifest
@@ -157,7 +156,6 @@ def __init__(
     def write_dataframe(
         self,
         dataframe: dd.DataFrame,
-        dask_client: t.Optional[Client] = None,
     ) -> None:
         dataframe.index = dataframe.index.rename(DEFAULT_INDEX_NAME)
 
@@ -176,7 +174,7 @@
 
         with ProgressBar():
            logging.info("Writing data...")
-            dd.compute(write_task, scheduler=dask_client)
+            dd.compute(write_task)
 
     @staticmethod
     def validate_dataframe_columns(dataframe: dd.DataFrame, columns: t.List[str]):
@@ -234,7 +232,6 @@ def _create_write_task(
             schema=schema,
             overwrite=False,
             compute=False,
-            write_metadata_file=True,
         )
         logging.info(f"Creating write task for: {location}")
         return write_task
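
For context, a rough sketch of the resulting write path (condensed from the diff above; the standalone function wrapper is mine, while the argument names and calls come from the diff context):

# Condensed sketch of the write path after this change (simplified from
# data_io.py; `location` and `schema` are taken from the diff context).
import logging

import dask.dataframe as dd
from dask.diagnostics import ProgressBar


def write(dataframe: dd.DataFrame, location: str, schema) -> None:
    # Build a lazy write task; the _metadata file is no longer requested.
    write_task = dd.to_parquet(
        dataframe,
        location,
        schema=schema,
        overwrite=False,
        compute=False,
    )
    logging.info(f"Creating write task for: {location}")

    with ProgressBar():
        logging.info("Writing data...")
        # Computed on the default scheduler; the explicit distributed client
        # argument was removed in this commit.
        dd.compute(write_task)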
tests/component/test_data_io.py (28 changes: 7 additions & 21 deletions)

@@ -4,7 +4,6 @@
 import dask.dataframe as dd
 import pyarrow as pa
 import pytest
-from dask.distributed import Client
 from fondant.component.data_io import DaskDataLoader, DaskDataWriter
 from fondant.core.component_spec import ComponentSpec, OperationSpec
 from fondant.core.manifest import Manifest
@@ -21,13 +20,6 @@
 NUMBER_OF_TEST_ROWS = 151
 
 
-@pytest.fixture()
-def dask_client():  # noqa: PT004
-    client = Client()
-    yield
-    client.close()
-
-
 @pytest.fixture()
 def manifest():
     return Manifest.from_file(manifest_path)
@@ -121,7 +113,6 @@ def test_write_dataset(
     dataframe,
     manifest,
     component_spec,
-    dask_client,
 ):
     """Test writing out subsets."""
     # Dictionary specifying the expected subsets to write and their column names
@@ -134,7 +125,7 @@
             operation_spec=OperationSpec(component_spec),
         )
         # write dataframe to temp dir
-        data_writer.write_dataframe(dataframe, dask_client)
+        data_writer.write_dataframe(dataframe)
         # read written data and assert
         dataframe = dd.read_parquet(
             temp_dir
@@ -152,7 +143,6 @@ def test_write_dataset_custom_produces(
     dataframe,
     manifest,
     component_spec_produces,
-    dask_client,
 ):
     """Test writing out subsets."""
     produces = {
@@ -175,7 +165,7 @@
         )
 
         # write dataframe to temp dir
-        data_writer.write_dataframe(dataframe, dask_client)
+        data_writer.write_dataframe(dataframe)
         # # read written data and assert
         dataframe = dd.read_parquet(
             temp_dir
@@ -194,7 +184,6 @@ def test_write_reset_index(
    dataframe,
    manifest,
    component_spec,
-    dask_client,
 ):
     """Test writing out the index and fields that have no dask index and checking
     if the id index was created.
@@ -207,19 +196,18 @@
            manifest=manifest,
            operation_spec=OperationSpec(component_spec),
        )
-        data_writer.write_dataframe(dataframe, dask_client)
+        data_writer.write_dataframe(dataframe)
        dataframe = dd.read_parquet(fn)
        assert dataframe.index.name == "id"
 
 
 @pytest.mark.parametrize("partitions", list(range(1, 5)))
-def test_write_divisions(  # noqa: PLR0913
+def test_write_divisions(
     tmp_path_factory,
     dataframe,
     manifest,
     component_spec,
     partitions,
-    dask_client,
 ):
     """Test writing out index and subsets and asserting they have the divisions of the dataframe."""
     # repartition the dataframe (default is 3 partitions)
@@ -233,7 +221,7 @@
            operation_spec=OperationSpec(component_spec),
        )
 
-        data_writer.write_dataframe(dataframe, dask_client)
+        data_writer.write_dataframe(dataframe)
 
        dataframe = dd.read_parquet(fn)
        assert dataframe.index.name == "id"
@@ -245,7 +233,6 @@ def test_write_fields_invalid(
     dataframe,
     manifest,
     component_spec,
-    dask_client,
 ):
     """Test writing out fields but the dataframe columns are incomplete."""
     with tmp_path_factory.mktemp("temp") as fn:
@@ -262,15 +249,14 @@
             r"but not found in dataframe"
         )
         with pytest.raises(ValueError, match=expected_error_msg):
-            data_writer.write_dataframe(dataframe, dask_client)
+            data_writer.write_dataframe(dataframe)
 
 
 def test_write_fields_invalid_several_fields_missing(
     tmp_path_factory,
     dataframe,
     manifest,
     component_spec,
-    dask_client,
 ):
     """Test writing out fields but the dataframe columns are incomplete."""
     with tmp_path_factory.mktemp("temp") as fn:
@@ -288,4 +274,4 @@ def test_write_fields_invalid_several_fields_missing(
             r"but not found in dataframe"
         )
         with pytest.raises(ValueError, match=expected_error_msg):
-            data_writer.write_dataframe(dataframe, dask_client)
+            data_writer.write_dataframe(dataframe)
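
One consequence worth noting (my reading of Dask's default-scheduler behavior, not something stated in the commit): since `write_dataframe` now calls `dd.compute(write_task)` without a `scheduler=` argument, a caller that still wants distributed execution can instantiate a client beforehand, because an active `Client` registers itself as the default scheduler:

# Hedged sketch: an active distributed Client becomes the default scheduler,
# so the writer should pick it up without an explicit argument.
# `data_writer` and `dataframe` are assumed to be set up as in the tests above.
from dask.distributed import Client

client = Client()
data_writer.write_dataframe(dataframe)
client.close()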
