Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

joinp: add --maintain-order option #2338

Merged
merged 4 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,9 @@ strum_macros = { git = "https://github.com/dathere/strum", branch = "bump-phf-to
# BUILD NOTE: Be sure to set QSV_POLARS_REV below to the latest commit short hash or tag
# of polars/py-polars before building qsv. This allows us to show the polars rev/tag in --version.
# if we are using a release version of Rust Polars, leave QSV_POLARS_REV empty
# QSV_POLARS_REV=39550c0
# QSV_POLARS_REV=a6ca94d
# polars = { git = "https://github.com/pola-rs/polars", tag = "py-1.16.0" }
polars = { git = "https://github.com/pola-rs/polars", rev = "39550c0" }
polars = { git = "https://github.com/pola-rs/polars", rev = "a6ca94d" }

[features]
default = ["mimalloc"]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
| [index](/src/cmd/index.rs#L2) | Create an index (📇) for a CSV. This is very quick (even the 15gb, 28m row NYC 311 dataset takes all of 14 seconds to index) & provides constant time indexing/random access into the CSV. With an index, `count`, `sample` & `slice` work instantaneously; random access mode is enabled in `luau`; and multithreading (🏎️) is enabled for the `frequency`, `split`, `stats`, `schema` & `tojsonl` commands. |
| [input](/src/cmd/input.rs#L2) | Read CSV data with special commenting, quoting, trimming, line-skipping & non-UTF8 encoding handling rules. Typically used to "normalize" a CSV for further processing with other qsv commands. |
| [join](/src/cmd/join.rs#L2)<br>👆 | Inner, outer, right, cross, anti & semi joins. Automatically creates a simple, in-memory hash index to make it fast. |
| [joinp](/src/cmd/joinp.rs#L2)✨<br>🚀🐻‍❄️🪄 | Inner, outer, right, cross, anti, semi & asof joins using the [Pola.rs](https://www.pola.rs) engine. Unlike the `join` command, `joinp` can process files larger than RAM, is multithreaded, has join key validation, pre-join filtering, supports [asof joins](https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/api/polars.DataFrame.join_asof.html) (which is [particularly useful for time series data](https://github.com/dathere/qsv/blob/30cc920d0812a854fcbfedc5db81788a0600c92b/tests/test_joinp.rs#L509-L983)) & its output columns can be coalesced. |
| [joinp](/src/cmd/joinp.rs#L2)✨<br>🚀🐻‍❄️🪄 | Inner, outer, right, cross, anti, semi & asof joins using the [Pola.rs](https://www.pola.rs) engine. Unlike the `join` command, `joinp` can process files larger than RAM, is multithreaded, has join key validation, a maintain row order option, pre-join filtering, supports [asof joins](https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/api/polars.DataFrame.join_asof.html) (which is [particularly useful for time series data](https://github.com/dathere/qsv/blob/30cc920d0812a854fcbfedc5db81788a0600c92b/tests/test_joinp.rs#L509-L983)) & its output columns can be coalesced. |
| [json](/src/cmd/json.rs#L2)<br>👆 | Convert JSON to CSV.
| [jsonl](/src/cmd/jsonl.rs#L2)<br>🚀🔣 | Convert newline-delimited JSON ([JSONL](https://jsonlines.org/)/[NDJSON](http://ndjson.org/)) to CSV. See `tojsonl` command to convert CSV to JSONL.
| [lens](/src/cmd/lens.rs#L2)✨ | Interactively view, search & filter a CSV using the [csvlens](https://github.com/YS-L/csvlens#csvlens) engine.
Expand Down
61 changes: 47 additions & 14 deletions src/cmd/joinp.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
static USAGE: &str = r#"
Joins two sets of CSV data on the specified columns using the Pola.rs engine.
Joins two sets of CSV data on the specified columns using the Polars engine.

The default join operation is an 'inner' join. This corresponds to the
intersection of rows on the keys specified.

Unlike the join command, joinp can process files larger than RAM, is multithreaded,
has join key validation, pre-join filtering, supports asof joins & its output columns
can be coalesced (no duplicate columns).

However, joinp doesn't have an --ignore-case option.
has join key validation, a maintain row order option, pre-join filtering, supports
asof joins and its output columns can be coalesced (no duplicate columns).

Returns the shape of the join result (number of rows, number of columns) to stderr.

Expand Down Expand Up @@ -76,7 +74,13 @@ joinp options:
onetoone - join keys are unique in both left & right data sets.
[default: none]

JOIN OPTIONS:
JOIN OPTIONS:
--maintain-order <arg> Which row order to preserve, if any. Valid values are:
none, left, right, left_right, right_left
Do not rely on any observed ordering without explicitly
setting this parameter. Not specifying any order can improve
performance. Supported for inner, left, right and full joins.
[default: none]
--nulls When set, joins will work on empty fields.
Otherwise, empty fields are completely ignored.
--streaming When set, the join will be done in a streaming fashion.
Expand Down Expand Up @@ -228,6 +232,7 @@ struct Args {
flag_filter_left: Option<String>,
flag_filter_right: Option<String>,
flag_validate: Option<String>,
flag_maintain_order: Option<String>,
flag_nulls: bool,
flag_streaming: bool,
flag_try_parsedates: bool,
Expand Down Expand Up @@ -283,6 +288,19 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
s => return fail_incorrectusage_clierror!("Invalid join validation: {s}"),
};

let flag_maintain_order = args
.flag_maintain_order
.unwrap_or_else(|| "none".to_string())
.to_lowercase();
let maintain_order = match flag_maintain_order.as_str() {
"none" => MaintainOrderJoin::None,
"left" => MaintainOrderJoin::Left,
"right" => MaintainOrderJoin::Right,
"left_right" => MaintainOrderJoin::LeftRight,
"right_left" => MaintainOrderJoin::RightLeft,
s => return fail_incorrectusage_clierror!("Invalid maintain order option: {s}"),
};

let join_shape: (usize, usize) = match (
args.flag_left,
args.flag_left_anti,
Expand All @@ -292,27 +310,35 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
args.flag_cross,
args.flag_asof,
) {
// default inner join
(false, false, false, false, false, false, false) => {
join.run(JoinType::Inner, validation, false)
join.run(JoinType::Inner, validation, maintain_order, false)
},
// left join
(true, false, false, false, false, false, false) => {
join.run(JoinType::Left, validation, false)
join.run(JoinType::Left, validation, maintain_order, false)
},
// left anti join
(false, true, false, false, false, false, false) => {
join.run(JoinType::Anti, validation, false)
join.run(JoinType::Anti, validation, maintain_order, false)
},
// left semi join
(false, false, true, false, false, false, false) => {
join.run(JoinType::Semi, validation, false)
join.run(JoinType::Semi, validation, maintain_order, false)
},
// right join
(false, false, false, true, false, false, false) => {
join.run(JoinType::Right, validation, false)
join.run(JoinType::Right, validation, maintain_order, false)
},
// full join
(false, false, false, false, true, false, false) => {
join.run(JoinType::Full, validation, false)
join.run(JoinType::Full, validation, maintain_order, false)
},
// cross join
(false, false, false, false, false, true, false) => {
join.run(JoinType::Cross, validation, false)
join.run(JoinType::Cross, validation, MaintainOrderJoin::None, false)
},
// as of join
(false, false, false, false, false, false, true) => {
// safety: flag_strategy is always is_some() as it has a default value
args.flag_strategy = Some(args.flag_strategy.unwrap().to_lowercase());
Expand Down Expand Up @@ -358,7 +384,12 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
.collect(),
);
}
join.run(JoinType::AsOf(asof_options), validation, true)
join.run(
JoinType::AsOf(asof_options),
validation,
MaintainOrderJoin::None,
true,
)
},
_ => fail_incorrectusage_clierror!("Please pick exactly one join operation."),
}?;
Expand Down Expand Up @@ -394,6 +425,7 @@ impl JoinStruct {
mut self,
jointype: JoinType,
validation: JoinValidation,
maintain_order: MaintainOrderJoin,
asof_join: bool,
) -> CliResult<(usize, usize)> {
let mut left_selcols: Vec<_> = self
Expand Down Expand Up @@ -537,6 +569,7 @@ impl JoinStruct {
.left_on(left_selcols)
.right_on(right_selcols)
.how(jointype)
.maintain_order(maintain_order)
.coalesce(coalesce_flag)
.allow_parallel(true)
.validate(validation)
Expand Down
Loading
Loading