Only keep columns in produces #490

Merged
7 commits merged into main from select-columns-load-component on Oct 11, 2023
Conversation

@PhilippeMoussalli (Contributor)

PR that modifies the load from hub/parquet components so they only return the columns specified in the produces section.
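
In essence, the idea is to trim the loaded dataframe down to what the spec produces. A minimal sketch of that idea, not the actual fondant implementation (names like `spec` are illustrative):

```python
# keep only the columns the component spec declares in `produces`
columns = [
    f"{subset_name}_{field_name}"
    for subset_name, subset in spec.produces.items()
    for field_name in subset.fields
]
dataframe = dataframe[columns]
```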

@RobbeSneyders (Member)

Thanks @PhilippeMoussalli!

I would propose to change this in fondant itself though. We already implemented this for the PandasTransformComponent, but I think it makes sense to automatically trim extra columns in all components?

@PhilippeMoussalli (Contributor, Author)

> Thanks @PhilippeMoussalli!
>
> I would propose to change this in fondant itself though. We already implemented this for the PandasTransformComponent, but I think it makes sense to automatically trim extra columns in all components?

After deep-diving into this, I think we already do this in the data_io.

The issue seems to come from something different. Here is the stack trace of the error without the fix above:

```
stable_diffusion_pipeline-load_from_hub-1  | [2023-10-05 15:34:45,442 | fondant.cli | INFO] Component `LoadFromHubComponent` found in module main
stable_diffusion_pipeline-load_from_hub-1  | [2023-10-05 15:34:45,444 | fondant.executor | INFO] Dask default local mode will be used for further executions.Our current supported options are limited to 'local' and 'default'.
stable_diffusion_pipeline-load_from_hub-1  | [2023-10-05 15:34:45,444 | fondant.executor | INFO] No matching execution for component detected
stable_diffusion_pipeline-load_from_hub-1  | [2023-10-05 15:34:45,444 | root | INFO] Executing component
stable_diffusion_pipeline-load_from_hub-1  | [2023-10-05 15:34:45,444 | main | INFO] Loading dataset from the hub...
stable_diffusion_pipeline-load_from_hub-1  | [2023-10-05 15:34:47,631 | main | INFO] Renaming columnss...
stable_diffusion_pipeline-load_from_hub-1  | [2023-10-05 15:35:29,200 | main | INFO] Index column not specified, setting a globally unique index
stable_diffusion_pipeline-load_from_hub-1  | [2023-10-05 15:35:29,213 | root | INFO] Creating write task for: /new/stable_diffusion_pipeline/stable_diffusion_pipeline-20231005173443/load_from_hub/index
stable_diffusion_pipeline-load_from_hub-1  | [2023-10-05 15:35:29,219 | root | INFO] Creating write task for: /new/stable_diffusion_pipeline/stable_diffusion_pipeline-20231005173443/load_from_hub/images
stable_diffusion_pipeline-load_from_hub-1  | [2023-10-05 15:35:29,219 | root | INFO] Writing data...
stable_diffusion_pipeline-load_from_hub-1  | subset_columns
stable_diffusion_pipeline-load_from_hub-1  | ['images_data']
stable_diffusion_pipeline-load_from_hub-1  | cols
stable_diffusion_pipeline-load_from_hub-1  | Index(['images_data'], dtype='object')
stable_diffusion_pipeline-load_from_hub-1  | {'data': DataType(binary)}
stable_diffusion_pipeline-load_from_hub-1  | dtype
stable_diffusion_pipeline-load_from_hub-1  | data    binary[pyarrow]
stable_diffusion_pipeline-load_from_hub-1  | dtype: object
[                                        ] | 0% Completed | 100.79 ms
stable_diffusion_pipeline-load_from_hub-1  | Traceback (most recent call last):
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/bin/fondant", line 8, in <module>
stable_diffusion_pipeline-load_from_hub-1  |     sys.exit(entrypoint())
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/fondant/cli.py", line 80, in entrypoint
stable_diffusion_pipeline-load_from_hub-1  |     args.func(args)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/fondant/cli.py", line 382, in execute
stable_diffusion_pipeline-load_from_hub-1  |     executor.execute(component)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/fondant/executor.py", line 359, in execute
stable_diffusion_pipeline-load_from_hub-1  |     output_manifest = self._run_execution(component_cls, input_manifest)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/fondant/executor.py", line 341, in _run_execution
stable_diffusion_pipeline-load_from_hub-1  |     self._write_data(dataframe=output_df, manifest=output_manifest)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/fondant/executor.py", line 258, in _write_data
stable_diffusion_pipeline-load_from_hub-1  |     data_writer.write_dataframe(dataframe, self.client)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/fondant/data_io.py", line 201, in write_dataframe
stable_diffusion_pipeline-load_from_hub-1  |     dd.compute(*write_tasks, scheduler=dask_client)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/base.py", line 599, in compute
stable_diffusion_pipeline-load_from_hub-1  |     results = schedule(dsk, keys, **kwargs)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/threaded.py", line 89, in get
stable_diffusion_pipeline-load_from_hub-1  |     results = get_async(
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/local.py", line 511, in get_async
stable_diffusion_pipeline-load_from_hub-1  |     raise_exception(exc, tb)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/local.py", line 319, in reraise
stable_diffusion_pipeline-load_from_hub-1  |     raise exc
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/local.py", line 224, in execute_task
stable_diffusion_pipeline-load_from_hub-1  |     result = _execute_task(task, data)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
stable_diffusion_pipeline-load_from_hub-1  |     return func(*(_execute_task(a, cache) for a in args))
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/optimization.py", line 990, in __call__
stable_diffusion_pipeline-load_from_hub-1  |     return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/core.py", line 149, in get
stable_diffusion_pipeline-load_from_hub-1  |     result = _execute_task(task, cache)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
stable_diffusion_pipeline-load_from_hub-1  |     return func(*(_execute_task(a, cache) for a in args))
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/utils.py", line 73, in apply
stable_diffusion_pipeline-load_from_hub-1  |     return func(*args, **kwargs)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/dataframe/core.py", line 7028, in apply_and_enforce
stable_diffusion_pipeline-load_from_hub-1  |     check_matching_columns(meta, df)
stable_diffusion_pipeline-load_from_hub-1  |   File "/usr/local/lib/python3.8/site-packages/dask/dataframe/utils.py", line 430, in check_matching_columns
stable_diffusion_pipeline-load_from_hub-1  |     raise ValueError(
stable_diffusion_pipeline-load_from_hub-1  | ValueError: The columns in the computed data do not match the columns in the provided metadata
stable_diffusion_pipeline-load_from_hub-1  |   Extra:   ['text']
stable_diffusion_pipeline-load_from_hub-1  |   Missing: []
```

Here, I am running the SD example with only the text data removed from the component spec's produces section. The error does not seem to come from additional existing columns (I added some print statements, visible in the stack trace, showing the schema and columns) but rather from some metadata associated with the loaded dataset that still contains text even after dropping it. For some reason, removing that column early on in load_from_hub seems to solve the issue. Might be related to this.
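
For illustration, the same error can be reproduced in plain Dask whenever the provided meta no longer matches the columns the computed partitions actually carry (a hedged sketch, not fondant code; the data is illustrative):

```python
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"images_data": [b"\x00"], "text": ["caption"]})
ddf = dd.from_pandas(df, npartitions=1)

# meta built from the component spec only knows about images_data,
# but the partitions still carry the text column
meta = pd.DataFrame({"images_data": pd.Series(dtype=object)})
ddf.map_partitions(lambda part: part, meta=meta).compute()
# ValueError: The columns in the computed data do not match the
# columns in the provided metadata
#   Extra:   ['text']
#   Missing: []
```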

@RobbeSneyders (Member)

The error is raised here I think.

Maybe we can just create the meta dataframe from the original dataframe instead of from the component spec. Then I don't think we'll have an issue.

@PhilippeMoussalli (Contributor, Author)

> The error is raised here I think.

Good catch, I tested it out and it does indeed seem to come from there.

> Maybe we can just create the meta dataframe from the original dataframe instead of from the component spec. Then I don't think we'll have an issue.

I think we'd then have to read the dataset twice: once with Dask and once with Hugging Face to get the metadata of the original dataset, since it can't be inferred in Dask without triggering a compute. Does this offer any advantage over just dropping the columns at the beginning? We might even have to compute less that way.

@RobbeSneyders (Member)

I think we can just provide the columns we want to download to read_parquet from the start, instead of trimming them after loading.
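
Something along these lines, as a sketch (the path and column names are illustrative; `dd.read_parquet` does accept a `columns` argument):

```python
import dask.dataframe as dd

# only read the columns the component will actually produce,
# instead of loading everything and trimming afterwards
ddf = dd.read_parquet(
    "path/to/dataset.parquet",  # illustrative path
    columns=["images_data"],
)
```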

@PhilippeMoussalli (Contributor, Author)

> I think we can just provide the columns we want to download to read_parquet from the start, instead of trimming them after loading.

That might not work, since the column names can differ from the spec during loading. We rename based on the mapping dict (an optional argument, since columns might already be in the `{subset}_{field}` format) and only then can we trim based on the component spec.
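
Roughly the flow in question, as a hedged sketch (not the actual component code; names are illustrative): the optional mapping renames the raw columns first, and only afterwards can the spec be used to trim.

```python
# hypothetical rename-then-trim flow
if column_name_mapping is not None:
    dataframe = dataframe.rename(columns=column_name_mapping)

produced = [
    f"{subset_name}_{field_name}"
    for subset_name, subset in spec.produces.items()
    for field_name in subset.fields
]
dataframe = dataframe[produced]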

@RobbeSneyders (Member)

The component spec in combination with the column name mapping should give us all the information we need to specify which columns to load, right?

@PhilippeMoussalli (Contributor, Author)

> The component spec in combination with the column name mapping should give us all the information we need to specify which columns to load, right?

Ok, I see what you mean. I implemented the change:

```python
    for field_name, field in subset.fields.items():
        columns.append(f"{subset_name}_{field_name}")
else:
    columns = list(self.column_name_mapping.keys())
```
@RobbeSneyders (Member)
Does the column_name_mapping need to define every column, or can it be partial?

@PhilippeMoussalli (Contributor, Author)
Indeed, it might be partial. I updated the PR to handle this case.
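
One way to handle a partial mapping, as a hedged sketch (not necessarily the merged implementation): invert the mapping and fall back to the spec name for columns it doesn't cover.

```python
# hypothetical handling of a partial column_name_mapping: for every
# column the spec produces, look up its original (pre-rename) name,
# falling back to the produced name when the mapping doesn't cover it
inverse_mapping = {v: k for k, v in (column_name_mapping or {}).items()}
columns = [
    inverse_mapping.get(name, name)
    for subset_name, subset in spec.produces.items()
    for name in (f"{subset_name}_{field_name}" for field_name in subset.fields)
]
```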

@RobbeSneyders (Member) left a comment

Thanks @PhilippeMoussalli, I merged the KfP v2 changes into this.

@RobbeSneyders merged commit 1c770a7 into main on Oct 11, 2023 (6 checks passed)
@RobbeSneyders deleted the select-columns-load-component branch on October 11, 2023 at 12:41