Skip to content

Commit

Permalink
maybe windows fix
Browse files Browse the repository at this point in the history
  • Loading branch information
devinjdangelo committed Aug 15, 2023
1 parent 8863c45 commit 5938dc0
Showing 1 changed file with 9 additions and 17 deletions.
26 changes: 9 additions & 17 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::logical_expr::{
};
use datafusion_common::display::ToStringifiedPlan;
use datafusion_expr::dml::{CopyTo, OutputFileFormat};
use object_store::local::LocalFileSystem;
use url::Url;

use crate::logical_expr::{Limit, Values};
Expand Down Expand Up @@ -565,16 +564,19 @@ impl DefaultPhysicalPlanner {
options: _,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;

// Get object store for specified output_url
let maybe_parsed_url = ListingTableUrl::parse(output_url);
// if user did not pass in a url, we assume it is a local file path
// this requires some special handling as copy can create non
// existing file paths
let is_valid_url = Url::parse(output_url).is_ok();

// if we failed due to non existing local path passed, try again as local url with ObjectStore
// TODO: make this behavior configurable via options (should copy to create path/file as needed?)
// TODO: add additional configurable options for if existing files should be overwritten or
// appended to
let parsed_url = match maybe_parsed_url {
Ok(url) => Ok(url),
Err(DataFusionError::IoError(_e)) => {
let parsed_url = match is_valid_url {
true => ListingTableUrl::parse(output_url),
false => {
let path = std::path::PathBuf::from(output_url);
if !path.exists(){
if *per_thread_output{
Expand All @@ -583,18 +585,8 @@ impl DefaultPhysicalPlanner {
fs::File::create(path)?;
}
}
let clean_path = std::path::PathBuf::from(output_url)
.canonicalize()?
.to_str()
.ok_or(DataFusionError::Plan(format!("Unable to clean path {output_url}")))?
.to_owned();
let local = Arc::new(LocalFileSystem::new());
let local_url = Url::parse("file://").unwrap();
session_state.runtime_env().register_object_store(&local_url, local);
let output_local_url = format!("file://{clean_path}");
ListingTableUrl::parse(output_local_url)
ListingTableUrl::parse(output_url)
}
Err(e) => Err(e)
}?;

let object_store_url = parsed_url.object_store();
Expand Down

0 comments on commit 5938dc0

Please sign in to comment.