[Data] Schema error while writing Parquet files #48102

Closed
bveeramani opened this issue Oct 18, 2024 · 11 comments · Fixed by #48478
@bveeramani (Member) commented Oct 18, 2024

What happened + What you expected to happen

I'm writing Parquet files with the number of rows per file configured, and I'm getting the error below:

Traceback (most recent call last):
  File "/home/ray/default/1.py", line 56, in <module>
    dataset.repartition(64).select_columns(['RESOURCE_ID', 'LINK', 'STATUS', 'OPTED_OUT', 'TITLE', 'DESCRIPTION']).materialize().map(parse_record).write_parquet(
  File "ray/data/dataset.py", line 2780, in write_parquet
    self.write_datasink(
  File "ray/data/dataset.py", line 3614, in write_datasink
    self._write_ds = Dataset(plan, logical_plan).materialize()
  File "ray/data/dataset.py", line 4642, in materialize
    copy._plan.execute()
  File "ray/data/exceptions.py", line 89, in handle_trace
    raise e.with_traceback(None) from SystemException()
ray.exceptions.RayTaskError(ValueError): ray::Write() (pid=9458, ip=10.0.68.5)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "ray/data/_internal/execution/operators/map_transformer.py", line 253, in __call__
    yield from self._block_fn(input, ctx)
  File "ray/data/_internal/planner/plan_write_op.py", line 26, in fn
    write_result = datasink_or_legacy_datasource.write(blocks, ctx)
  File "ray/data/_internal/datasource/parquet_datasink.py", line 85, in write
    call_with_retry(
  File "ray/data/_internal/util.py", line 986, in call_with_retry
    raise e from None
  File "ray/data/_internal/util.py", line 973, in call_with_retry
    return f()
  File "ray/data/_internal/datasource/parquet_datasink.py", line 82, in write_blocks_to_path
    writer.write_table(table)
  File "pyarrow/parquet/core.py", line 1094, in write_table
    raise ValueError(msg)
ValueError: Table schema does not match schema used to create file: 
table:
RESOURCE_ID: string
LINK: string
STATUS: string
OPTED_OUT: string
TITLE: null
DESCRIPTION: null
image_bytes: binary vs. 
file:
RESOURCE_ID: string
LINK: string
STATUS: string
OPTED_OUT: string
TITLE: string
DESCRIPTION: null
image_bytes: binary
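
For context on the mismatch above: when every value in a column is None, PyArrow infers the null type for that column, so two blocks of the same dataset can disagree on a column's type. A minimal illustration of the default inference (an aside, not part of the original report):

import pyarrow as pa

# A column that is entirely None is inferred as the `null` type...
print(pa.table({"TITLE": [None, None]}).schema)     # TITLE: null

# ...while a column with at least one string is inferred as `string`.
print(pa.table({"TITLE": ["hello", None]}).schema)  # TITLE: string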

Versions / Dependencies

2.37

Reproduction script

import ray
from ray.data._internal.logical.optimizers import _PHYSICAL_RULES
from ray.data._internal.logical.rules.operator_fusion import OperatorFusionRule

# Disable operator fusion so each of the two blocks is produced by a
# separate map task and keeps its own inferred schema.
_PHYSICAL_RULES.remove(OperatorFusionRule)


def add_row(row):
    # One block's "data" value is a string and the other's is None, so
    # PyArrow infers `string` for one block and `null` for the other.
    if row["id"] == 0:
        return {"data": "eggs"}
    else:
        return {"data": None}


ray.data.range(2, override_num_blocks=2).map(add_row).write_parquet(
    "data", num_rows_per_file=2
)

Issue Severity

None

@bveeramani added the bug (Something that is supposed to be working; but isn't), P1 (Issue that should be fixed within a few weeks), and data (Ray Data-related issues) labels on Oct 18, 2024
@bveeramani self-assigned this on Oct 18, 2024
@alexeykudinkin (Contributor) commented:
@bveeramani the inferred schema has to be enforced on all blocks; if any block doesn't adhere, we have to fail and defer to the user to provide it explicitly.

There is an obvious caveat for nullability: if the types only differ in nulls, we'd assume that the column is nullable, relaxing the schema.
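
A minimal sketch of that relaxation, assuming PyArrow's unify_schemas, which by default treats a null-typed field as unifiable with any concrete type:

import pyarrow as pa

block_schema = pa.schema([("TITLE", pa.null())])
file_schema = pa.schema([("TITLE", pa.string())])

# The null-typed field is relaxed to the concrete type; the unified field
# stays nullable, so all-None blocks can still be cast to it.
print(pa.unify_schemas([block_schema, file_schema]))  # TITLE: string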

@bveeramani (Member, Author) commented:
> if the types only differ in nulls, we'd assume that the column is nullable, relaxing the schema.

Yup, this issue is referring to the null situation.

@rickyyx (Contributor) commented Oct 31, 2024

> if any block doesn't adhere, we have to fail and defer to the user to provide it explicitly.

Actually, how could a user do this?

@bveeramani (Member, Author) commented:
> Actually, how could a user do this?

@rickyyx I haven't tested this out, but you can specify a schema when you call read_parquet. I'd think you'd be able to specify a schema with nullable types, and the read data should use that schema.
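
An untested sketch of that suggestion; the path and column names below are placeholders, not from this issue:

import pyarrow as pa
import ray

# Explicit schema with nullable string columns, so every block produced
# by the read agrees on the types regardless of where the None values
# fall. The bucket path is hypothetical.
schema = pa.schema([("TITLE", pa.string()), ("DESCRIPTION", pa.string())])
ds = ray.data.read_parquet("s3://my-bucket/data", schema=schema)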

@rickyyx (Contributor) commented Oct 31, 2024

> > Actually, how could a user do this?
>
> @rickyyx I haven't tested this out, but you can specify a schema when you call read_parquet. I'd think you'd be able to specify a schema with nullable types, and the read data should use that schema.

I see, but in the context of this issue, there's no way for the user to provide a schema when writing, right? E.g., if I have a column that's actually float, but some of the data is int, the write_parquet() API would still fail, while it could technically be okay if users could hint that it's actually float.
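
A possible workaround under the current API (a sketch, untested against this issue): cast every block to the intended schema before the write, e.g. via map_batches in PyArrow format. The column name and target schema here are illustrative:

import pyarrow as pa

target = pa.schema([("x", pa.float64())])

def cast_to_target(batch: pa.Table) -> pa.Table:
    # Force every block to float64 so the ParquetWriter never sees an
    # int64 block after a float64 one.
    return batch.cast(target)

# Applied just before the write:
#   ds.map_batches(cast_to_target, batch_format="pyarrow").write_parquet(path)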

@bveeramani (Member, Author) commented:
Yeah, you're right. If you do something like map after reading the data, the types could change, and there wouldn't be a way to explicitly specify the schema.

@rickyyx (Contributor) commented Oct 31, 2024

> Yeah, you're right. If you do something like map after reading the data, the types could change, and there wouldn't be a way to explicitly specify the schema.

I see. Thoughts on extending the write_parquet API to allow users to pass in an explicit schema? This might be better than Ray Data trying to infer the schema in some limited cases (e.g., nullable fields in this issue).

@bveeramani (Member, Author) commented Nov 1, 2024

I think that sounds reasonable, too.

> This might be better than Ray Data trying to infer the schema in some limited cases (e.g., nullable fields in this issue).

What's the primary concern with inferring the schema? I think the UX might be better if you didn't have to manually specify a schema in this scenario.

@rickyyx (Contributor) commented Nov 1, 2024

> What's the primary concern with inferring the schema? I think the UX might be better if you didn't have to manually specify a schema in this scenario.

Yeah, I think for the nullable columns, auto-inferring is reasonable. But the fundamental issue here is that users might have implicit assumptions about the actual types in the schema, which they currently have no way to specify.

I am not sure how much of an actual requirement this is, though, so the current auto-inferring-nullable-columns approach seems reasonable to me.

@rickyyx (Contributor) commented Nov 1, 2024

Another question on the requirement here:

Should we allow auto-appending of null columns? E.g.

block_1: {"a": 1, "b": 2}
block_2: {"a": 2} 

Should the unified schema be allowed to be {"a": int, "b": int}?

I think this is where auto-inferring might not be the best approach, since we don't really know how permissive users want the schema unification to be.
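
For reference, PyArrow's unify_schemas already takes the union of fields across schemas, but making a block conform then means appending the missing column as nulls before casting; a sketch:

import pyarrow as pa

block_1 = pa.table({"a": [1], "b": [2]})
block_2 = pa.table({"a": [2]})

# The unified schema is the union of fields: a: int64, b: int64.
unified = pa.unify_schemas([block_1.schema, block_2.schema])

# Casting alone can't invent the missing column, so it is appended as an
# all-null array first, then the table is cast to the unified schema.
block_2 = block_2.append_column("b", pa.nulls(len(block_2), pa.int64()))
block_2 = block_2.cast(unified)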

@alexeykudinkin (Contributor) commented Nov 2, 2024

@rickyyx @bveeramani let's start with a simpler fix here:

  1. Currently, we're inferring the schema from only the first block.
  2. Instead, we'd unify the schemas from all blocks (which would also normalize for nullability); see the sketch below.
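
A minimal sketch of that approach, assuming PyArrow tables as blocks (illustrative only, not Ray's actual internals):

import pyarrow as pa

def normalize_blocks(blocks: list[pa.Table]) -> list[pa.Table]:
    # Unify the schemas of all blocks up front (null-typed columns relax
    # to the concrete type), then cast each block to the unified schema
    # before anything reaches the ParquetWriter.
    unified = pa.unify_schemas([block.schema for block in blocks])
    return [block.cast(unified) for block in blocks]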

rickyyx added a commit that referenced this issue Nov 14, 2024
…es (#48478)


## Why are these changes needed?

When writing blocks to Parquet, there might be blocks whose fields differ ONLY in nullability. By default, this would be rejected, since some blocks might have a different schema than the ParquetWriter. However, we can allow the write to proceed by tweaking the schema.

This PR goes through all blocks before writing them to Parquet and merges schemas that differ only in the nullability of their fields. It also casts each table to the newly merged schema so that the write can happen.


## Related issue number

Closes #48102

---------

Signed-off-by: rickyx <rickyx@anyscale.com>
JP-sDEV pushed a commit to JP-sDEV/ray that referenced this issue Nov 14, 2024
mohitjain2504 pushed a commit to mohitjain2504/ray that referenced this issue Nov 15, 2024
dentiny pushed a commit to dentiny/ray that referenced this issue Dec 7, 2024