Performance: Add "read strings as binary" option for parquet #12788

Closed
alamb opened this issue Oct 7, 2024 · 15 comments · Fixed by #12816

@alamb
Contributor

alamb commented Oct 7, 2024

TLDR: I would like to add a new binary_as_string option for parquet

CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION 'hits_partitioned'
OPTIONS ('binary_as_string' 'true');

Is your feature request related to a problem or challenge?

The Real Problem

The primary problem is that the ClickBench queries slow down when we enable StringView by default, but only for the hits_partitioned version of the dataset.

One of the reasons is that reading a column as a BinaryViewArray and then casting to Utf8ViewArray is significantly slower than reading the data from parquet as a Utf8ViewArray (due to optimizations in the parquet decoder).

This is not a problem for the non-view types (BinaryArray --> StringArray), because reading a column as a BinaryArray and then casting to Utf8 is about the same speed as reading it as Utf8 directly.

The core issue is that for hits_partitioned the "String" columns in the schema are marked as binary (not Utf8) and thus the slower conversion path is used.

The background: hits_partitioned has "string" columns marked as "Binary"

ClickBench has 2 versions of the parquet dataset (see docs here)

  1. hits.parquet (a single 14G parquet file)
  2. athena_partitioned/hits_{0..99}.parquet

However, the SCHEMA is different between these two files

hits.parquet has Strings:

(venv) andrewlamb@Andrews-MacBook-Pro-2:~/Downloads$ parquet-schema hits.parquet
Metadata for file: hits.parquet

version: 1
num of rows: 99997497
created by: parquet-cpp version 1.5.1-SNAPSHOT
message schema {
  REQUIRED INT64 WatchID;
  REQUIRED INT32 JavaEnable (INTEGER(16,true));
  REQUIRED BYTE_ARRAY Title (STRING);    <---  Annotated with "(String)" logical type

DataFusion recognizes this as Utf8

> describe 'hits.parquet';
+-----------------------+-----------+-------------+
| column_name           | data_type | is_nullable |
+-----------------------+-----------+-------------+
| WatchID               | Int64     | NO          |
| JavaEnable            | Int16     | NO          |
| Title                 | Utf8      | NO          | <-- StringArray!
...

hits_partitioned has the string columns as Binary:

$ parquet-schema hits_partitioned/hits_22.parquet
Metadata for file: hits_partitioned/hits_22.parquet

version: 1
num of rows: 1000000
created by: parquet-cpp version 1.5.1-SNAPSHOT
message schema {
  OPTIONAL INT64 WatchID;
  OPTIONAL INT32 JavaEnable (INTEGER(16,true));
  OPTIONAL BYTE_ARRAY Title;   <---- this is NOT annotated with String logical type

Which DataFusion correctly interprets as Binary:

DataFusion CLI v42.0.0
> describe 'hits_partitioned';
+-----------------------+-----------+-------------+
| column_name           | data_type | is_nullable |
+-----------------------+-----------+-------------+
| WatchID               | Int64     | YES         |
| JavaEnable            | Int16     | YES         |
| Title                 | Binary    | YES         |  <-- BinaryArray
...

Describe the solution you'd like

I would like a way to treat binary columns in the hits_partitioned dataset as Strings.

This is the right thing to do for the hits_partitioned dataset, but I am not sure it is the right thing to do for all files, so I think we need some flag

Describe alternatives you've considered

I propose adding a binary_as_string option to the parquet reader like

CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION 'hits_partitioned'
OPTIONS ('binary_as_string' 'true');

In fact, it seems DuckDB does exactly this; note the (binary_as_string=True) item here:

https://duckdb.org/docs/data/parquet/overview.html#parameters

| Name | Description | Type | Default |
|------|-------------|------|---------|
| binary_as_string | Parquet files generated by legacy writers do not correctly set the UTF8 flag for strings, causing string columns to be loaded as BLOB instead. Set this to true to load binary columns as strings. | BOOL | false |

And it is set in clickbench scripts:

https://github.com/ClickHouse/ClickBench/blob/a6615de8e6eae45f8c1b047df0073fe32f43499f/duckdb-parquet/create.sql#L6

Additional context

Trino also explicitly specifies the types of files in the hits_partitioned dataset
https://github.com/ClickHouse/ClickBench/blob/a6615de8e6eae45f8c1b047df0073fe32f43499f/trino/create_partitioned.sql#L1-L107

We could argue that adding something just for benchmarking goes against the spirit of the test; however, I think we can make a reasonable argument on correctness grounds too.

For example, if you run this query today

SELECT "SearchEngineID", "SearchPhrase", COUNT(*) AS c FROM 'hits_partitioned' WHERE "SearchPhrase" <> '' GROUP BY "SearchEngineID", "SearchPhrase" ORDER BY c DESC LIMIT 10;

You get this arguably nonsensical result (the search phrase is shown as binary):

(venv) andrewlamb@Andrews-MacBook-Pro-2:~/Downloads$ datafusion-cli -f q.sql
DataFusion CLI v42.0.0
+----------------+--------------------------------------------------------------------------------------------------+-------+
| SearchEngineID | SearchPhrase                                                                                     | c     |
+----------------+--------------------------------------------------------------------------------------------------+-------+
| 2              | d0bad0b0d180d0b5d0bbd0bad0b8                                                                     | 46258 |
| 2              | d0bcd0b0d0bdd0b3d18320d0b220d0b7d0b0d180d0b0d0b1d0b5d0b920d0b3d180d0b0d0bcd0b0                   | 18871 |
| 2              | d181d0bcd0bed182d180d0b5d182d18c20d0bed0bdd0bbd0b0d0b9d0bd                                       | 16905 |
| 3              | d0b0d0bbd0b1d0b0d182d180d183d182d0b4d0b8d0bd                                                     | 16748 |
| 2              | d181d0bcd0bed182d180d0b5d182d18c20d0bed0bdd0bbd0b0d0b9d0bd20d0b1d0b5d181d0bfd0bbd0b0d182d0bdd0be | 14909 |
| 2              | d0b0d0bbd0b1d0b0d182d180d183d182d0b4d0b8d0bd                                                     | 13716 |
| 2              | d18dd0bad0b7d0bed0b8d0b4d0bdd18bd0b5                                                             | 13414 |
| 2              | d181d0bcd0bed182d180d0b5d182d18c                                                                 | 13108 |
| 3              | d0bad0b0d180d0b5d0bbd0bad0b8                                                                     | 12815 |
| 2              | d0b4d180d183d0b6d0bad0b520d0bfd0bed0bcd0b5d189d0b5d0bdd0b8d0b5                                   | 11946 |
+----------------+--------------------------------------------------------------------------------------------------+-------+
10 row(s) fetched.
Elapsed 0.561 seconds.

However, if you run the same query with the proper schema (strings), you see a real search phrase:

(venv) andrewlamb@Andrews-MacBook-Pro-2:~/Downloads$ datafusion-cli -f q.sql
DataFusion CLI v42.0.0
+----------------+---------------------------+-------+
| SearchEngineID | SearchPhrase              | c     |
+----------------+---------------------------+-------+
| 2              | карелки                   | 46258 |
| 2              | мангу в зарабей грама     | 18871 |
| 2              | смотреть онлайн           | 16905 |
| 3              | албатрутдин               | 16748 |
| 2              | смотреть онлайн бесплатно | 14909 |
| 2              | албатрутдин               | 13716 |
| 2              | экзоидные                 | 13414 |
| 2              | смотреть                  | 13108 |
| 3              | карелки                   | 12815 |
| 2              | дружке помещение          | 11946 |
+----------------+---------------------------+-------+
10 row(s) fetched.
Elapsed 0.569 seconds.
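
To make the link between the two outputs concrete: the hex values in the first result look like the UTF-8 bytes of the phrases in the second. Below is a minimal std-only sketch (not DataFusion code; `decode_hex` and `hex_to_utf8` are hypothetical helper names) that decodes one of the hex cells back into text.

```rust
/// Decode a hex string into raw bytes.
/// Returns None if the length is odd or a non-hex character is present.
fn decode_hex(hex: &str) -> Option<Vec<u8>> {
    if hex.len() % 2 != 0 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    (0..hex.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&hex[i..i + 2], 16).ok())
        .collect()
}

/// Interpret a hex-rendered binary cell as UTF-8 text.
/// Returns None if the hex is malformed or the bytes are not valid UTF-8.
fn hex_to_utf8(hex: &str) -> Option<String> {
    String::from_utf8(decode_hex(hex)?).ok()
}
```

For example, `hex_to_utf8("d0bad0b0d180d0b5d0bbd0bad0b8")` yields the first phrase of the second table, which is what reading the column as a string gives directly.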
@alamb alamb added the enhancement New feature or request label Oct 7, 2024
@alamb alamb added the help wanted Extra attention is needed label Oct 7, 2024
@alamb alamb changed the title from Add "read strings as binary" option for parquet to Performance: Add "read strings as binary" option for parquet Oct 7, 2024
@goldmedal
Contributor

I want to try this issue, but I'm unsure if it's part of #12777. Is @jayzhan211 currently working on it?

@alamb
Contributor Author

alamb commented Oct 7, 2024

> I want to try this issue, but I'm unsure if it's part of #12777. Is @jayzhan211 currently working on it?

Thank you @goldmedal

I think #12777 is a version of this issue, but it changes the schema for all files, not just based on a config option

@goldmedal
Contributor

Thanks, @alamb
I see. I think I can help by doing some research on this version (maybe do a POC? 🤔 ).

@alamb
Contributor Author

alamb commented Oct 7, 2024

I think a POC would be wonderful

@jayzhan211
Contributor

> I want to try this issue, but I'm unsure if it's part of #12777. Is @jayzhan211 currently working on it?

You can take it if you would like to

@goldmedal
Contributor

take

@goldmedal
Contributor

@alamb @jayzhan211
I drafted PR #12816 as a simple POC. With this PR, the option can be used like this:

    let ctx = SessionContext::new();
    ctx.sql(
        r#"
    CREATE EXTERNAL TABLE hits
    STORED AS PARQUET
    LOCATION 'benchmarks/data/hits_partitioned'
    OPTIONS ('binary_as_string' 'true')
    "#,
    ).await?.show().await?;
    ctx.sql("describe hits").await?.show().await?;
    ctx.sql(r#"select "Title" from hits limit 1"#).await?.show().await?;

The result is

+-----------------------+-----------+-------------+
| column_name           | data_type | is_nullable |
+-----------------------+-----------+-------------+
| WatchID               | Int64     | YES         |
| JavaEnable            | Int16     | YES         |
| Title                 | Utf8      | YES         |
| GoodEvent             | Int16     | YES         |
...

+--------------------------+
| arrow_typeof(hits.Title) |
+--------------------------+
| Utf8                     |
+--------------------------+

If you want to run some experiments, this PR is easy to use.
It seems I need to fix some CI failures, but the basic functionality works, I think.
I'll add tests and documentation soon.

Related Issue

By the way, I found an issue with casting Binary to StringView when I tried to use binary_as_string together with schema_force_view_types.

    let ctx = SessionContext::new();
    ctx.sql(
        r#"
    CREATE EXTERNAL TABLE hits
    STORED AS PARQUET
    LOCATION '/Users/jax/git/datafusion/benchmarks/data/hits_partitioned'
    OPTIONS ('binary_as_string' 'true', 'schema_force_view_types' 'true')
    "#,
    ).await?.show().await?;
    ctx.sql("describe hits").await?.show().await?;
    ctx.sql(r#"select "Title" from hits limit 1"#).await?.show().await?;
----
Error: Error during planning: Cannot cast file schema field Title of type Binary to table schema field of type Utf8View

It can be reproduced by

> select arrow_cast(arrow_cast('abc', 'Binary'), 'Utf8View');
This feature is not implemented: Unsupported CAST from Binary to Utf8View
> select arrow_cast(arrow_cast('abc', 'Binary'), 'Utf8');
+-----------------------------------------------------------------+
| arrow_cast(arrow_cast(Utf8("abc"),Utf8("Binary")),Utf8("Utf8")) |
+-----------------------------------------------------------------+
| abc                                                             |
+-----------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.071 seconds.

I guess this is an issue in arrow-cast at
https://github.com/apache/arrow-rs/blob/1be268db2237b8850161f96849353eac00cb2615/arrow-cast/src/cast/mod.rs#L209-L210

Maybe we should file an issue on the arrow-rs repo. 🤔

BinaryView works well.

> select arrow_cast(arrow_cast('abc', 'BinaryView'), 'Utf8');
+---------------------------------------------------------------------+
| arrow_cast(arrow_cast(Utf8("abc"),Utf8("BinaryView")),Utf8("Utf8")) |
+---------------------------------------------------------------------+
| abc                                                                 |
+---------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.034 seconds.

> select arrow_cast(arrow_cast('abc', 'BinaryView'), 'Utf8View');
+-------------------------------------------------------------------------+
| arrow_cast(arrow_cast(Utf8("abc"),Utf8("BinaryView")),Utf8("Utf8View")) |
+-------------------------------------------------------------------------+
| abc                                                                     |
+-------------------------------------------------------------------------+
1 row(s) fetched. 
Elapsed 0.007 seconds.

@alamb
Contributor Author

alamb commented Oct 8, 2024

Thank you @goldmedal -- I am checking it out now.

@jayzhan211
Contributor

Yes, we need to support binary -> utf8view in arrow cast

@alamb
Contributor Author

alamb commented Oct 9, 2024

> Yes, we need to support binary -> utf8view in arrow cast

Filed apache/arrow-rs#6531

@alamb
Contributor Author

alamb commented Oct 9, 2024

What I hope / plan to do is to combine this PR with the code in #12792 and #12809 from @Rachelint and finally get a benchmark run that shows stringview speeding up ClickBench for hits_partitioned.

Stay tuned.

@alamb
Contributor Author

alamb commented Oct 9, 2024

> Yes, we need to support binary -> utf8view in arrow cast

Casting from binary --> utf8view via cast will work, but won't be much (if any) faster than what is done today

BTW, thinking more about this, I do think we need to support the cast, but in this PR we should effectively change the file schema (not just the table schema) when we set up the parquet reader (specifically with ArrowReaderOptions::with_schema)

Here is the code that does so for Utf8 --> Utf8View

    if let Some(merged) = coerce_file_schema_to_view_type(&table_schema, &schema) {
        schema = Arc::new(merged);
    }

I think we need to also allow the switch when the table schema is Utf8View and the file schema is Binary/BinaryView
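
The rule described above can be sketched per column as follows. This is only a toy model under loud assumptions: `ColType` is a hypothetical stand-in enum (not arrow's `DataType`), and `coerce_file_type` is a hypothetical name, not the real `coerce_file_schema_to_view_type`. It only shows which (table type, file type) pairs would trigger the switch.

```rust
/// Hypothetical stand-in for the few Arrow types discussed in this thread.
/// This is NOT arrow's `DataType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColType {
    Binary,
    BinaryView,
    Utf8,
    Utf8View,
    Int64,
}

/// Decide which type to request from the reader for a single column.
/// Returns Some(type) when the file type should be switched to the table
/// type, and None when the file type should be left as it is.
fn coerce_file_type(table: ColType, file: ColType) -> Option<ColType> {
    match (table, file) {
        // Switch already described above: Utf8 in the file, Utf8View in the table.
        (ColType::Utf8View, ColType::Utf8) => Some(ColType::Utf8View),
        // Suggested addition: also switch when the file column is binary.
        (ColType::Utf8View, ColType::Binary)
        | (ColType::Utf8View, ColType::BinaryView) => Some(ColType::Utf8View),
        // Everything else is read with the type recorded in the file.
        _ => None,
    }
}
```

In the real reader the result would feed the schema passed to ArrowReaderOptions::with_schema; the sketch leaves that part out.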

@goldmedal
Contributor

goldmedal commented Oct 9, 2024

> BTW thinking more about this, I do think we need to support the cast, but in this PR we should effectively change the file schema (not just the table schema) when we setup the parquet reader (specifically with ArrowReaderOptions::with_schema)

This idea sounds great. If we can apply the new schema when reading the file, we save one cast and simply read the data as strings.

I tried to follow the implementation of StringView and apply the new schema using with_schema, but I got a casting error.

Parquet error: Arrow: incompatible arrow schema, the following fields could not be cast:

I can reproduce this error on the arrow-rs side by adding a test case in parquet/src/arrow/arrow_reader/mod.rs

    #[test]
    fn test_cast_binary_utf8() {
        let original_fields = Fields::from(vec![
            Field::new("binary_to_utf8", ArrowDataType::Binary, false),
        ]);
        let file = write_parquet_from_iter(vec![(
            "binary_to_utf8",
            Arc::new(BinaryArray::from(vec![b"one".as_ref(), b"two".as_ref()])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![
            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
        ]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 2);
        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("downcast to string")
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two")]
        );
    }

The output is

reader builder with schema: ArrowError("incompatible arrow schema, the following fields could not be cast: [binary_to_utf8]")

I tried to fix it by adding another match arm at
https://github.com/apache/arrow-rs/blob/5508978a3c5c4eb65ef6410e097887a8adaba38a/parquet/src/arrow/schema/primitive.rs#L40

        (DataType::Binary, DataType::Utf8) => hint,

It works, but I'm not sure whether this approach makes sense 🤔

BTW, this approach might not be safe if the data isn't valid UTF-8, for example

Arc::new(BinaryArray::from(vec![b"\xDE\x00\xFF".as_ref()])) as ArrayRef,

In that case the cast fails with

InvalidArgumentError("Invalid UTF8 sequence at string index 0 (0..3): invalid utf-8 sequence of 1 bytes from index 0")

Maybe we also need to make this behavior optional on the arrow-rs side? 🤔
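
For reference, the same failure can be seen with the Rust standard library alone. This is a minimal sketch of the validation that any binary-to-string read has to pass, not the arrow-rs code path; `binary_as_str` is a hypothetical helper name.

```rust
use std::str::Utf8Error;

/// Borrow the bytes as &str only if they are valid UTF-8.
/// On failure the error reports where the first invalid sequence starts
/// (`valid_up_to`) and how long it is (`error_len`).
fn binary_as_str(bytes: &[u8]) -> Result<&str, Utf8Error> {
    std::str::from_utf8(bytes)
}
```

With b"\xDE\x00\xFF", `valid_up_to()` is 0 and `error_len()` is `Some(1)`, which lines up with the "invalid utf-8 sequence of 1 bytes from index 0" part of the error above. Valid input such as b"one" is returned as a borrowed string without copying.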

@goldmedal
Contributor

> I tried to follow the implementation of StringView to apply the new schema using with_schema but I got casting error.
>
> Parquet error: Arrow: incompatible arrow schema, the following fields could not be cast:

Submitted apache/arrow-rs#6539 for it.

@alamb
Contributor Author

alamb commented Oct 10, 2024

> Submitted apache/arrow-rs#6539 for it.

Thank you @goldmedal -- that looks great.
