Defer file creation to write #8539

Merged
merged 5 commits into from
Dec 14, 2023
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/url.rs
@@ -116,6 +116,7 @@ impl ListingTableUrl {
/// Get object store for specified input_url
/// if input_url is actually not a url, we assume it is a local file path
/// if we have a local path, create it if not exists so ListingTableUrl::parse works
#[deprecated(note = "Use parse")]
pub fn parse_create_local_if_not_exists(
s: impl AsRef<str>,
is_directory: bool,
17 changes: 5 additions & 12 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -148,19 +148,18 @@ impl TableProviderFactory for ListingTableFactory {
.unwrap_or(false)
};

let create_local_path = statement_options
.take_bool_option("create_local_path")?
.unwrap_or(false);
let single_file = statement_options
.take_bool_option("single_file")?
.unwrap_or(false);

// Backwards compatibility
// Backwards compatibility (#8547)
if let Some(s) = statement_options.take_str_option("insert_mode") {
if !s.eq_ignore_ascii_case("append_new_files") {
return plan_err!("Unknown or unsupported insert mode {s}. Only append_to_file supported");
return plan_err!("Unknown or unsupported insert mode {s}. Only append_new_files supported");
}
}
statement_options.take_bool_option("create_local_path")?;
Contributor

Can we please leave a comment here explaining why this is being ignored? Maybe even with a reference to a ticket tracking its removal?

I can file such a ticket if it would be helpful

Contributor Author

Created #8547


let file_type = file_format.file_type();

// Use remaining options and session state to build FileTypeWriterOptions
@@ -199,13 +198,7 @@ impl TableProviderFactory for ListingTableFactory {
FileType::AVRO => file_type_writer_options,
};

let table_path = match create_local_path {
true => ListingTableUrl::parse_create_local_if_not_exists(
&cmd.location,
!single_file,
),
false => ListingTableUrl::parse(&cmd.location),
}?;
let table_path = ListingTableUrl::parse(&cmd.location)?;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
10 changes: 8 additions & 2 deletions datafusion/core/src/datasource/stream.rs
@@ -179,15 +179,21 @@ impl StreamConfig {
match &self.encoding {
StreamEncoding::Csv => {
let header = self.header && !self.location.exists();
let file = OpenOptions::new().append(true).open(&self.location)?;
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.location)?;
let writer = arrow::csv::WriterBuilder::new()
.with_header(header)
.build(file);

Ok(Box::new(writer))
}
StreamEncoding::Json => {
let file = OpenOptions::new().append(true).open(&self.location)?;
let file = OpenOptions::new()
.create(true)
.append(true)
.open(&self.location)?;
Ok(Box::new(arrow::json::LineDelimitedWriter::new(file)))
}
}
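
As a standalone illustration of the `create(true).append(true)` pattern in this hunk, the sketch below uses only the Rust standard library. `append_row` and the temporary file name are hypothetical; the header check mirrors the `!self.location.exists()` test above and has to run before the file is opened, because opening with `create(true)` brings the file into existence.

```rust
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::Path;

/// Append a row to `path`, creating the file on first write.
/// The header is written only if the file did not exist beforehand.
fn append_row(path: &Path, header: &str, row: &str) -> std::io::Result<()> {
    // Check before opening: `create(true)` makes the file exist afterwards.
    let write_header = !path.exists();
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    if write_header {
        writeln!(file, "{header}")?;
    }
    writeln!(file, "{row}")
}

fn main() -> std::io::Result<()> {
    // Hypothetical scratch location for the demonstration.
    let path = std::env::temp_dir().join(format!("defer_create_{}.csv", std::process::id()));
    let _ = fs::remove_file(&path);

    append_row(&path, "a,b", "1,2")?; // creates the file and writes the header
    append_row(&path, "a,b", "3,4")?; // appends only the row
    assert_eq!(fs::read_to_string(&path)?, "a,b\n1,2\n3,4\n");

    fs::remove_file(&path)
}
```

Without `create(true)`, opening a missing file in append mode fails with a "not found" error, which is why the earlier code needed the path to be created up front.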
6 changes: 1 addition & 5 deletions datafusion/core/src/physical_planner.rs
@@ -571,11 +571,7 @@ impl DefaultPhysicalPlanner {
copy_options,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;

// TODO: make this behavior configurable via options (should copy to create path/file as needed?)
// TODO: add additional configurable options for if existing files should be overwritten or
// appended to
let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, !*single_file_output)?;
let parsed_url = ListingTableUrl::parse(output_url)?;
let object_store_url = parsed_url.object_store();

let schema: Schema = (**input.schema()).clone().into();
18 changes: 9 additions & 9 deletions datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -57,7 +57,7 @@ CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned(
b varchar,
)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned'
LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/'
Contributor Author

This is necessary because the directories are no longer created up front, so when they do not exist yet the parser falls back to the trailing / to determine whether a directory or a file is desired.

Contributor

Now that inserting (appending) to an individual file is no longer a thing ListingTable needs to concern itself with, I wonder if the concept of a "single file" table could be completely removed. I.e. whether there is a trailing slash or not, the LOCATION is interpreted as the directory where the data files will be written to / read from.

Contributor Author

Yes, this is definitely the eventual goal. Is this something you would be interested in working on?

Contributor

The use case of writing to /foo/bar/1.parquet (a single file) is important, but perhaps that could be triggered by the name ending in .parquet 🤔

I think that would still be achievable with the proposal above, I just wanted to point it out

Contributor Author

Yes, COPY has a SINGLE_FILE_OUTPUT option that gates this, but for CREATE EXTERNAL TABLE single files don't support insert

Contributor Author

Ticket for this: #8548

Contributor

I'm happy to take a stab at removing the single file table option next week, when I also plan to look at adding support for writing to arrow files.
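
The trailing-slash fallback discussed in this thread can be sketched roughly as follows. This is an assumption about the behaviour described in the comments, not a copy of `ListingTableUrl::parse`; `looks_like_directory` is a hypothetical helper.

```rust
use std::path::Path;

/// Hypothetical helper: decide whether `location` names a directory.
/// If the path already exists, ask the file system; otherwise fall back
/// to the trailing slash, as described in the review thread.
fn looks_like_directory(location: &str) -> bool {
    let path = Path::new(location);
    if path.exists() {
        path.is_dir()
    } else {
        location.ends_with('/')
    }
}

fn main() {
    // Neither path exists, so only the trailing slash distinguishes them.
    assert!(looks_like_directory("no_such_dir_for_sketch_8539/"));
    assert!(!looks_like_directory("no_such_dir_for_sketch_8539"));
}
```

Under this reading, a `LOCATION` for a table that does not exist yet needs the trailing `/` to be treated as a directory, which is what the test changes in this file add.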

PARTITIONED BY (b)
OPTIONS(
create_local_path 'true',
@@ -292,7 +292,7 @@ statement ok
CREATE EXTERNAL TABLE
directory_test(a bigint, b bigint)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q0'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q0/'
OPTIONS(
create_local_path 'true',
);
@@ -312,7 +312,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q1'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q1/'
OPTIONS (create_local_path 'true');

query TT
@@ -378,7 +378,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q2'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q2/'
OPTIONS (create_local_path 'true');

query TT
@@ -423,7 +423,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(c1 varchar NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q3'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q3/'
OPTIONS (create_local_path 'true');

# verify that the sort order of the insert query is maintained into the
@@ -462,7 +462,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(id BIGINT, name varchar)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q4'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q4/'
OPTIONS (create_local_path 'true');

query IT
@@ -505,7 +505,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NOT NULL, field2 BIGINT NULL)
STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q5'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q5/'
OPTIONS (create_local_path 'true');

query II
@@ -555,7 +555,7 @@ CREATE EXTERNAL TABLE test_column_defaults(
d text default lower('DEFAULT_TEXT'),
e timestamp default now()
) STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q6'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q6/'
OPTIONS (create_local_path 'true');

# fill in all column values
@@ -608,5 +608,5 @@ CREATE EXTERNAL TABLE test_column_defaults(
a int,
b int default a+1
) STORED AS parquet
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q7'
LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q7/'
OPTIONS (create_local_path 'true');
8 changes: 3 additions & 5 deletions docs/source/user-guide/sql/write_options.md
@@ -78,11 +78,9 @@ The following special options are specific to the `COPY` command.

The following special options are specific to creating an external table.

| Option | Description | Default Value |
| ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------------------------------------- |
| SINGLE_FILE | If true, indicates that this external table is backed by a single file. INSERT INTO queries will append to this file. | false |
| CREATE_LOCAL_PATH | If true, the folder or file backing this table will be created on the local file system if it does not already exist when running INSERT INTO queries. | false |
| INSERT_MODE | Determines if INSERT INTO queries should append to existing files or append new files to an existing directory. Valid values are append_to_file, append_new_files, and error. Note that "error" will block inserting data into this table. | CSV and JSON default to append_to_file. Parquet defaults to append_new_files |
Contributor Author

The only valid INSERT_MODE is append_new_files, so we might as well remove this option from the documentation.

| Option | Description | Default Value |
| ----------- | --------------------------------------------------------------------------------------------------------------------- | ------------- |
| SINGLE_FILE | If true, indicates that this external table is backed by a single file. INSERT INTO queries will append to this file. | false |

### JSON Format Specific Options
