writing to partitioned table uses the wrong column as partition key #7892

Closed
alamb opened this issue Oct 20, 2023 · 10 comments
Labels
bug Something isn't working

Comments

@alamb
Contributor

alamb commented Oct 20, 2023

Describe the bug

writing to partitioned table uses the wrong column as partition key

To Reproduce

❯ create external table test(partition varchar, trace_id varchar) stored as parquet partitioned by (partition_id) location '/tmp/test'  options (create_local_path 'true');
Arrow error: Schema error: Unable to get field named "partition_id". Valid fields: ["partition", "trace_id"]
❯ create external table test(partition varchar, trace_id varchar) stored as parquet partitioned by (partition) location '/tmp/test'  options (create_local_path 'true');
0 rows in set. Query took 0.001 seconds.

❯ insert into test select * from 'input.parquet';
Object Store error: Generic LocalFileSystem error: Unable to open file /private/tmp/test/partition=00000000000000003088e9e74cf166bd/QZSOEmePoAsQkvzU.parquet#1: Too many open files (os error 24)

It looks like it used the wrong column as the partition key:

alamb@MacBook-Pro-8:~/Software/arrow-datafusion$ ls /tmp/test | wc -l
   19992

alamb@MacBook-Pro-8:~/Software/arrow-datafusion$ ls /tmp/test | head
partition=0000000000000000000102576ce2faea/
partition=00000000000000000004d8eb49424f0d/
partition=000000000000000000051e89839e2bb0/
partition=00000000000000000005406b87ebcb41/
partition=000000000000000000065226d41eba99/
partition=000000000000000000066e556bea9c68/
partition=0000000000000000000688030531c6ff/
partition=000000000000000000068839bb143b45/
partition=00000000000000000006aaa220390696/
partition=00000000000000000006b0ebabd460a3/

Here is the input: input.parquet

Here is how I made it:

❯ copy (select 'a' as "partition", trace_id from traces UNION ALL select 'b' as "partition", trace_id from traces UNION ALL select 'c' as "partition", trace_id from traces) to 'input.parquet';
+----------+
| count    |
+----------+
| 15557151 |
+----------+
1 row in set. Query took 3.639 seconds.

❯ select * from 'input.parquet' limit 1;
+-----------+----------------------------------+
| partition | trace_id                         |
+-----------+----------------------------------+
| b         | 000000000000000028bf4438cad62275 |
+-----------+----------------------------------+
1 row in set. Query took 0.009 seconds.

❯ describe 'input.parquet';
+-------------+-------------------------+-------------+
| column_name | data_type               | is_nullable |
+-------------+-------------------------+-------------+
| partition   | Utf8                    | NO          |
| trace_id    | Dictionary(Int32, Utf8) | YES         |
+-------------+-------------------------+-------------+
2 rows in set. Query took 0.001 seconds.

Expected behavior

I expect three output directories to be created, one for each of the distinct values of partition (a, b and c):

/tmp/test/partition=a/<uuid>.parquet
/tmp/test/partition=b/<uuid>.parquet
/tmp/test/partition=c/<uuid>.parquet

Additional context

Found as part of #7801 (review)

@devinjdangelo did some initial investigation: #7801 (comment)

@devinjdangelo
Contributor

devinjdangelo commented Oct 21, 2023

@alamb I opened a PR (#7896) which, in my testing, works correctly for your example case, but only in pure Rust. In datafusion-cli it still fails. I believe this is because of an additional bug in datafusion-cli that incorrectly casts the column from dictionary-encoded to non-dictionary-encoded.

Using #7896, the following example runs as expected:

use datafusion::{dataframe::DataFrameWriteOptions, prelude::*};
use datafusion_common::DataFusionError;
use object_store::local::LocalFileSystem;
use std::sync::Arc;
use url::Url;

const FILENAME: &str = "/home/dev/arrow-datafusion/input.parquet";

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let _ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    _ctx.runtime_env().register_object_store(&local_url, local);

    let _read_options = ParquetReadOptions::default();

    let _df = _ctx.read_parquet(FILENAME, _read_options.clone()).await?;

    _df.clone().show_limit(10).await?;

    println!("{}", _df.clone().schema());

    _ctx.sql(
        "create external table 
    test(partition string, trace_id varchar)
    stored as parquet 
    partitioned by (partition) 
    location './temptest2/'  
    options (create_local_path 'true');",
    )
    .await?
    .collect()
    .await?;

    _df.clone()
        // This select is needed since the schema equivalence check is sensitive to column ordering
        .select(vec![col("trace_id"), col("partition")])?
        .write_table("test", DataFrameWriteOptions::new())
        .await?;

    _ctx.sql("select count(1) from test").await?.show().await?;

    Ok(())
}

@marvinlanhenke
Contributor

While looking into #8493 and trying to understand how hive-partitioned writes are implemented, I came across the same issue: the wrong column is selected as the partition column. So I ran a basic test.

@alamb @devinjdangelo

This is what I suspect so far:

compute_partition_keys_by_row gets the ArrayRef via column_by_name, which uses the schema and the field_idx to return the ArrayRef (demux.rs).

However, when the table is created in ListingTableFactory, a new schema is returned that excludes the declared partition_cols (listing_table_factory.rs).

A few lines later, when the actual ListingTable is created, the schema is reassembled by appending the partition_cols at the end, which changes the original schema field order (table.rs).

The values in the insert into statement, however, are declared in the original order.
Now we have a mismatch (see the print output below): schema Field[0] is trace_id instead of partition.

I am not sure if I am correct, and why the partition_cols need to be excluded and then added to the end later on.

If this could be handled differently, the partition bug might be resolved.

use datafusion::prelude::*;
use datafusion_common::DataFusionError;

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let _ctx = SessionContext::new();

    _ctx.sql(
        "create external table 
    test(partition string, trace_id varchar)
    stored as parquet 
    partitioned by (partition) 
    location './temptest2/'  
    options (create_local_path 'true', insert_mode 'append_new_files',);",
    )
    .await?
    .collect()
    .await?;

    _ctx.sql(
        "
        insert into test values
        ('b', '0btid1'), ('b', '0btid2'), ('c', '0ctid1'), ('c', '0ctid2')
    ",
    )
    .await?
    .collect()
    .await?;

    Ok(())
}

Print output:

rb: RecordBatch { schema: Schema { fields: [Field { name: "trace_id", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, columns: [StringArray
[
  "b",
  "b",
  "c",
  "c",
], StringArray
[
  "0btid1",
  "0btid2",
  "0ctid1",
  "0ctid2",
]], row_count: 4 }
col: partition, col_array: StringArray
[
  "0btid1",
  "0btid2",
  "0ctid1",
  "0ctid2",
]
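
The mismatch in the output above can be reproduced without DataFusion. The following is a minimal sketch in plain Rust, assuming only what this comment describes: partition columns are removed from the declared column list and appended at the end, while inserted values are bound by position. The names table_column_order and bind_by_position are hypothetical helpers for illustration, not DataFusion functions.

```rust
/// Column order of the table as described in this comment: the declared
/// columns minus the partition columns, followed by the partition columns.
/// (Hypothetical helper, not DataFusion's actual code.)
fn table_column_order(declared: &[&str], partition_cols: &[&str]) -> Vec<String> {
    let mut order: Vec<String> = declared
        .iter()
        .filter(|c| !partition_cols.contains(*c))
        .map(|c| c.to_string())
        .collect();
    order.extend(partition_cols.iter().map(|c| c.to_string()));
    order
}

/// Pair each table column with the inserted value at the same position.
/// No attempt is made to match by column name.
fn bind_by_position(table_cols: &[String], row: &[&str]) -> Vec<(String, String)> {
    table_cols
        .iter()
        .zip(row.iter())
        .map(|(col, value)| (col.clone(), value.to_string()))
        .collect()
}
```

With the table declared as (partition, trace_id) and partitioned by partition, table_column_order yields trace_id first and partition second, so binding the row ('b', '0btid1') by position assigns '0btid1' to partition, which matches the col_array printed above.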

@devinjdangelo
Contributor

Hey @marvinlanhenke, thanks for digging into this.

> I am not sure if I am correct, and why the partition_cols need to be excluded and then added to the end later on.

The reason there are two schemas for hive-style partition tables is that by convention the partition columns are excluded from the underlying files. So, in your test table the underlying parquet files would only have 1 column "trace_id". DataFusion has to know the parquet schema and also the table schema and at execution time "fill in" the partition column based on the file path of the parquet files.

In the Demux code, this comes into play in remove_partition_by_columns. The incoming RecordBatches contain the partition values encoded as Arrow arrays, but we don't actually want to write those to disk (we just encode them in the file path).
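
As a rough illustration of the two schemas, here is a small sketch in plain Rust. It assumes the hive-style layout seen earlier in this issue (directories named column=value) and does not use DataFusion's types; hive_partition_dir and file_columns are hypothetical names, and value escaping is deliberately left out.

```rust
/// Build a hive-style directory such as `/tmp/test/partition=a/` from
/// (column name, value) pairs. Values are used verbatim; a real writer
/// would also need to escape characters that are not path-safe.
fn hive_partition_dir(base: &str, keys: &[(&str, &str)]) -> String {
    let mut path = base.trim_end_matches('/').to_string();
    for (name, value) in keys {
        path.push('/');
        path.push_str(name);
        path.push('=');
        path.push_str(value);
    }
    path.push('/');
    path
}

/// Columns that end up inside the data file: every table column except
/// the partition columns, whose values live only in the directory name.
fn file_columns<'a>(table_cols: &[&'a str], partition_cols: &[&str]) -> Vec<&'a str> {
    table_cols
        .iter()
        .copied()
        .filter(|c| !partition_cols.contains(c))
        .collect()
}
```

For the test table in this thread, the file schema would contain only trace_id, and a row with partition 'a' would be written under a partition=a directory.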

As for the error in your example, DataFusion is moving the partition_by columns to the end of the table schema regardless of how you declare the table. We may want to update the Create External Table syntax to throw an error if you do not place the partition_by columns at the end, to avoid this confusion. See this note in our insert_to_external sqllogic tests: https://github.com/apache/arrow-datafusion/blob/main/datafusion/sqllogictest/test_files/insert_to_external.slt#L178...

In fact in HiveQL syntax, it is an error to declare the partition() columns in the body of the table as well. https://cwiki.apache.org/confluence/display/hive/languagemanual+ddl#LanguageManualDDL-PartitionedTables

I think we could avoid confusion/bugs around schema ordering and partitioning by following this same convention.
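
One way to picture the suggested check is the sketch below, in plain Rust. It is only an assumption about how such a validation could look, not something that exists in DataFusion: check_partition_cols_last is a hypothetical function that accepts a declaration only when the partition columns are the trailing columns, in the same order.

```rust
/// Accept a table declaration only if the partition columns are the last
/// declared columns, in the same order as in the PARTITIONED BY clause.
/// (Hypothetical validation, shown for illustration only.)
fn check_partition_cols_last(declared: &[&str], partition_cols: &[&str]) -> Result<(), String> {
    if partition_cols.len() > declared.len() {
        return Err("more partition columns than declared columns".to_string());
    }
    // The trailing slice of the declaration that should hold the partition columns.
    let tail = &declared[declared.len() - partition_cols.len()..];
    if tail == partition_cols {
        Ok(())
    } else {
        Err(format!(
            "partition columns {:?} must be the last columns of {:?}",
            partition_cols, declared
        ))
    }
}
```

Under this rule, test(trace_id, partition) partitioned by (partition) would be accepted, while the declaration used in this issue, test(partition, trace_id), would be rejected up front instead of being silently reordered.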

@marvinlanhenke
Contributor

@devinjdangelo
thank you so much for the explanation. I wasn't aware of the convention [1] and thus not sure if the schema handling was intended this way. Now, it makes sense, thank you.

> In fact in HiveQL syntax, it is an error to declare the partition() columns in the body of the table as well.
>
> I think we could avoid confusion/bugs around schema ordering and partitioning by following this same convention.

I like this idea. Would have saved me some confusion ;)

[1] https://cwiki.apache.org/confluence/display/hive/languagemanual+ddl#LanguageManualDDL-PartitionedTables

@devinjdangelo
Contributor

devinjdangelo commented Mar 4, 2024

I revisited this on the theory that #9276 fixed it as a side effect. I was wrong and it is still an issue.

❯ create external table test(partition varchar, trace_id varchar) stored as parquet partitioned by (partition) location '/tmp/test/';
0 rows in set. Query took 0.001 seconds.

❯ insert into test select * from 'input.parquet';
# (runs for a very long time and uses wrong column for partitioning)

❯ insert into test select trace_id, partition from 'input.parquet';
+----------+
| count    |
+----------+
| 15557151 |
+----------+
1 row in set. Query took 1.501 seconds.

As shown above, it seems that the order of the columns in the schema affects whether the result is correct. I think we will need to look into the logic which aligns the schema of the table vs. the stream of data which should be written to the table.

Note as well that listing table moves the "partition" column to the end of the schema. So, the second query projects the 'input.parquet' file in the same way which fixes the issue. It would be nice if the Insert planning was smart enough to align the schemas correctly based on the field names and data types automatically...

@devinjdangelo
Contributor

So, we actually do have some logic to realign the schemas in insert_to_plan logical planning, but only when the user manually specifies the column list that they are inserting. When no column list is passed to insert, it is assumed that the schemas are already aligned and no effort is made to align them by name. E.g., this works:

DataFusion CLI v36.0.0
❯ create external table test(partition varchar, trace_id varchar) stored as parquet partitioned by (partition) location '/tmp/test/';
0 rows in set. Query took 0.001 seconds.

❯ insert into test (partition, trace_id) select * from 'input.parquet';
+----------+
| count    |
+----------+
| 15557151 |
+----------+
1 row in set. Query took 1.487 seconds.

Relevant bit of code is here: https://github.com/apache/arrow-datafusion/blob/581fd98270e69221689ddd4c37566ef620a67167/datafusion/sql/src/statement.rs#L1154-L1160

@devinjdangelo
Contributor

@alamb I've narrowed this down to logical planning for insert_to_plan not considering the field names of the input schema vs. the table schema. Rather, it assumes the fields are in the same order. ListingTable moves the partition column to the end of the schema and we aren't realigning based on the name of the column being the same, which leads to the confusing behavior you flagged in this issue.

I believe we should add logic to insert_to_plan which will align fields from the input schema and table schema if their names are the same and data types logically equivalent. E.g. there are two cases to handle:

  1. Input schema field names are a subset of table schema field names. If so, project input schema so that the schemas are aligned by name.
  2. Fall back to the existing logic which assumes the schemas are already aligned if the types are logically equivalent by position.
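
The two cases proposed in this comment could be sketched as follows, in plain Rust. This is only an illustration of the proposal as stated here, with column names standing in for schema fields; align_input is a hypothetical name, and the data type equivalence check mentioned above is omitted.

```rust
/// For each table column, return the index of the input column to read from.
/// Case 1: every input column name exists in the table, so align by name.
/// Case 2: otherwise, assume the input is already aligned by position.
/// A table column with no matching input column maps to `None`.
fn align_input(table_cols: &[&str], input_cols: &[&str]) -> Vec<Option<usize>> {
    let names_are_subset = input_cols.iter().all(|c| table_cols.contains(c));
    table_cols
        .iter()
        .enumerate()
        .map(|(pos, table_col)| {
            if names_are_subset {
                // Case 1: look the table column up by name in the input.
                input_cols.iter().position(|c| c == table_col)
            } else if pos < input_cols.len() {
                // Case 2: positional fallback.
                Some(pos)
            } else {
                None
            }
        })
        .collect()
}
```

For the table order (trace_id, partition) and the parquet order (partition, trace_id), this returns [Some(1), Some(0)], i.e. the projection that the explicit column list achieves in the working example above.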

@alamb
Contributor Author

alamb commented Mar 4, 2024

I don't think SQL aligns on field name, but instead only on position

I think the two cases are

  1. INSERT INTO bar SELECT .... in which case the columns of the select list should be inserted into the table columns by position. If there are more columns in the table than in the select list, the remaining ones should be null padded
  2. INSERT INTO bar(cols) SELECT ... in which case the columns of the select list should be inserted into the list of columns by position. If there are more columns in the table than in the select list, the remaining ones should be null padded

Does that make sense? Maybe we can simply add a ProjectionExec that does the alignment correctly?

Here is an example showing that position is used

postgres=# create table foo(x int, y int);
CREATE TABLE
postgres=# insert into foo values (1, 100);
INSERT 0 1

Now, insert into bar via select * from foo, and position is used:

postgres=# create table bar (y int, x int);
CREATE TABLE
postgres=# insert into bar select * from foo;
INSERT 0 1
postgres=# select * from bar;
 y |  x
---+-----
 1 | 100
(1 row)

Insert too few columns into bar: the rest is padded with null, and there is no alignment by column name:

postgres=# insert into bar select x from foo;
INSERT 0 1
postgres=# select * from bar;
 y |  x
---+-----
 1 | 100
 1 |
(2 rows)
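
The positional behavior shown in the psql session can be modeled with a few lines of plain Rust. This is a simplified sketch, assuming integer columns only and treating NULL as None; insert_by_position is a hypothetical name, and rejecting rows with too many values is an added assumption rather than something shown above.

```rust
/// Bind the selected values to the table columns strictly by position.
/// Missing trailing values become NULL (`None`); column names of the
/// source query play no role.
fn insert_by_position(
    table_cols: &[&str],
    row: &[i64],
) -> Result<Vec<(String, Option<i64>)>, String> {
    if row.len() > table_cols.len() {
        return Err(format!(
            "{} values supplied for {} columns",
            row.len(),
            table_cols.len()
        ));
    }
    Ok(table_cols
        .iter()
        .enumerate()
        .map(|(i, name)| (name.to_string(), row.get(i).copied()))
        .collect())
}
```

Calling it with the columns of bar, ["y", "x"], and the row [1, 100] from foo gives y = 1 and x = 100, and with the single value [1] it gives y = 1 and x = NULL, mirroring the two result rows above.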

@devinjdangelo
Contributor

devinjdangelo commented Mar 5, 2024

> I don't think SQL aligns on field name, but instead only on position
>
> I think the two cases are
>
>   1. INSERT INTO bar SELECT .... in which case the columns of the select list should be inserted into the table columns by position. If there are more columns in the table than in the select list, the remaining ones should be null padded
>   2. INSERT INTO bar(cols) SELECT ... in which case the columns of the select list should be inserted into the list of columns by position. If there are more columns in the table than in the select list, the remaining ones should be null padded

In that case, the existing insert logical planning and partitioned writes are behaving correctly. The reproducer in this issue is correct behavior which is confusing due to CREATE EXTERNAL TABLE reordering the schema by moving the partition column to the end. I believe this sequence of commands shows the problem clearly.

DataFusion CLI v36.0.0
❯ create external table test(partition varchar, trace_id varchar) stored as parquet partitioned by (partition) location '/tmp/test/';
0 rows in set. Query took 0.001 seconds.

❯ insert into test values ('a','x'),('b','y'),('c','z');
+-------+
| count |
+-------+
| 3     |
+-------+
1 row in set. Query took 0.016 seconds.

❯ select * from test;
+----------+-----------+
| trace_id | partition |
+----------+-----------+
| a        | x         |
| c        | z         |
| b        | y         |
+----------+-----------+
3 rows in set. Query took 0.002 seconds.

❯ select * from 'input.parquet' limit 5;
+-----------+----------------------------------+
| partition | trace_id                         |
+-----------+----------------------------------+
| b         | 0000000000000000353b8c80d0183941 |
| b         | 0000000000000000353b8c80d0183941 |
| b         | 0000000000000000353b8c80d0183941 |
| b         | 0000000000000000353b8c80d0183941 |
| b         | 0000000000000000353b8c80d0183941 |
+-----------+----------------------------------+
5 rows in set. Query took 0.013 seconds.

Note that the table schema and the parquet schema are not aligned by position due to ListingTable moving partition columns to the end. HiveQL DDL statements avoid this confusing behavior by having the column declaration moved entirely to the partition by clause:

CREATE TABLE page_view(
     viewTime INT, 
     userid BIGINT,
     page_url STRING, 
     ip STRING)
 PARTITIONED BY(dt STRING, country STRING)
 STORED AS SEQUENCEFILE;

Note that dt and country are ONLY declared in the PARTITIONED BY clause. This way there can be no ambiguity: the partition columns will always be at the end of the schema, order-wise. See https://cwiki.apache.org/confluence/display/hive/languagemanual+ddl#LanguageManualDDL-PartitionedTables

@alamb
Contributor Author

alamb commented Mar 9, 2024

> In that case, the existing insert logical planning and partitioned writes are behaving correctly. The reproducer in this issue is correct behavior which is confusing due to CREATE EXTERNAL TABLE reordering the schema by moving the partition column to the end. I believe this sequence of commands shows the problem clearly.

I agree with your analysis -- thank you @devinjdangelo -- it is clear. I will close this ticket and let's continue the discussion on #9465

@alamb alamb closed this as completed Mar 9, 2024