Skip to content

Commit

Permalink
joinp: add --use-schemas option; increased default --infer-len
Browse files Browse the repository at this point in the history
…from 1000 to 10,000
  • Loading branch information
jqnatividad committed Oct 27, 2024
1 parent 4999c4c commit 86fe22e
Showing 1 changed file with 87 additions and 20 deletions.
107 changes: 87 additions & 20 deletions src/cmd/joinp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,13 @@ joinp options:
enabled when using asof joins.
--infer-len <arg> The number of rows to scan when inferring the schema of the CSV.
Set to 0 to do a full table scan (warning: very slow).
[default: 1000]
[default: 10000]
--use-schemas Use cached Polars schema JSON files.
If a `.pschema.json` file with the same file stem exists for an input
and is newer than that input, its schema is loaded instead of being inferred
(ignoring --infer-len) and used for the corresponding Polars "table".
Otherwise, the schema is inferred as usual using --infer-len.
To create Polars schema files, use the `sqlp` command with the `--infer-len`
and `--cache-schema` options.
--low-memory Use low memory mode when parsing CSVs. This will use less memory
but will be slower. It will also process the join in streaming mode.
Only use this when you get out of memory errors.
Expand Down Expand Up @@ -186,8 +192,8 @@ Common options:
use std::{
env,
fs::File,
io::{self, Write},
path::Path,
io::{self, BufReader, Read, Write},
path::{Path, PathBuf},
str,
};

Expand Down Expand Up @@ -218,6 +224,7 @@ struct Args {
flag_try_parsedates: bool,
flag_decimal_comma: bool,
flag_infer_len: usize,
flag_use_schemas: bool,
flag_low_memory: bool,
flag_no_optimizations: bool,
flag_ignore_errors: bool,
Expand Down Expand Up @@ -251,6 +258,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
let join = args.new_join(
args.flag_try_parsedates,
args.flag_infer_len,
args.flag_use_schemas,
args.flag_low_memory,
args.flag_ignore_errors,
&tmpdir,
Expand Down Expand Up @@ -525,6 +533,7 @@ impl Args {
&mut self,
try_parsedates: bool,
infer_len: usize,
use_schemas: bool,
low_memory: bool,
ignore_errors: bool,
tmpdir: &tempfile::TempDir,
Expand All @@ -548,24 +557,53 @@ impl Args {
};

// check if the input files exist
let input1_path = Path::new(&self.arg_input1);
let input1_path = PathBuf::from(&self.arg_input1);
if !input1_path.exists() {
return fail_clierror!("Input file {} does not exist.", self.arg_input1);
}
let input2_path = Path::new(&self.arg_input2);
let input2_path = PathBuf::from(&self.arg_input2);
if !input2_path.exists() {
return fail_clierror!("Input file {} does not exist.", self.arg_input2);
}

let mut left_lf = {
// check if the left input file is snappy compressed
// if so, we need to decompress it first
if input1_path.extension().and_then(std::ffi::OsStr::to_str) == Some("sz") {
let decompressed_path =
util::decompress_snappy_file(&input1_path.to_path_buf(), tmpdir)?;
self.arg_input1 = decompressed_path;
}
// check if the left input file is snappy compressed
// if so, we need to decompress it first
if input1_path.extension().and_then(std::ffi::OsStr::to_str) == Some("sz") {
let decompressed_path =
util::decompress_snappy_file(&input1_path.to_path_buf(), tmpdir)?;
self.arg_input1 = decompressed_path;
}

let mut left_lf = if use_schemas {
let mut work_lf = LazyCsvReader::new(&self.arg_input1)
.with_has_header(true)
.with_missing_is_null(self.flag_nulls)
.with_comment_prefix(comment_char.clone())
.with_separator(tsvssv_delim(&self.arg_input1, delim))
.with_try_parse_dates(try_parsedates)
.with_decimal_comma(self.flag_decimal_comma)
.with_low_memory(low_memory)
.with_ignore_errors(ignore_errors);

// use-schemas is enabled, check if a valid pschema.json file exists for this table
let schema_file = input1_path.canonicalize()?.with_extension("pschema.json");
if schema_file.exists()
&& schema_file.metadata()?.modified()? > input1_path.metadata()?.modified()?
{
// We have a valid pschema.json file - it exists and is newer than the table
// load the schema and deserialize it and use it with the lazy frame
let file = File::open(&schema_file)?;
let mut buf_reader = BufReader::new(file);
let mut schema_json = String::with_capacity(100);
buf_reader.read_to_string(&mut schema_json)?;
let schema: Schema = serde_json::from_str(&schema_json)?;
work_lf = work_lf.with_schema(Some(Arc::new(schema)));
} else {
// there is no valid pschema.json file, infer the schema using --infer-len
work_lf = work_lf.with_infer_schema_length(Some(infer_len));
}
work_lf.finish()?
} else {
LazyCsvReader::new(&self.arg_input1)
.with_has_header(true)
.with_missing_is_null(self.flag_nulls)
Expand All @@ -584,14 +622,43 @@ impl Args {
left_lf = left_lf.filter(filter_left_expr);
}

let mut right_lf = {
// check if the right input file is snappy compressed
if input2_path.extension().and_then(std::ffi::OsStr::to_str) == Some("sz") {
let decompressed_path =
util::decompress_snappy_file(&input2_path.to_path_buf(), tmpdir)?;
self.arg_input2 = decompressed_path;
}
// check if the right input file is snappy compressed
if input2_path.extension().and_then(std::ffi::OsStr::to_str) == Some("sz") {
let decompressed_path =
util::decompress_snappy_file(&input2_path.to_path_buf(), tmpdir)?;
self.arg_input2 = decompressed_path;
}

let mut right_lf = if use_schemas {
let mut work_lf = LazyCsvReader::new(&self.arg_input2)
.with_has_header(true)
.with_missing_is_null(self.flag_nulls)
.with_comment_prefix(comment_char)
.with_separator(tsvssv_delim(&self.arg_input2, delim))
.with_try_parse_dates(try_parsedates)
.with_decimal_comma(self.flag_decimal_comma)
.with_low_memory(low_memory)
.with_ignore_errors(ignore_errors);

// use-schemas is enabled, check if a valid pschema.json file exists for this table
let schema_file = input2_path.canonicalize()?.with_extension("pschema.json");
if schema_file.exists()
&& schema_file.metadata()?.modified()? > input2_path.metadata()?.modified()?
{
// We have a valid pschema.json file - it exists and is newer than the table
// load the schema and deserialize it and use it with the lazy frame
let file = File::open(&schema_file)?;
let mut buf_reader = BufReader::new(file);
let mut schema_json = String::with_capacity(100);
buf_reader.read_to_string(&mut schema_json)?;
let schema: Schema = serde_json::from_str(&schema_json)?;
work_lf = work_lf.with_schema(Some(Arc::new(schema)));
} else {
// there is no valid pschema.json file, infer the schema using --infer-len
work_lf = work_lf.with_infer_schema_length(Some(infer_len));
}
work_lf.finish()?
} else {
LazyCsvReader::new(&self.arg_input2)
.with_has_header(true)
.with_missing_is_null(self.flag_nulls)
Expand Down

0 comments on commit 86fe22e

Please sign in to comment.