Skip to content

Commit

Permalink
fix(rust, python): use explicit drop function node (#6769)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Feb 10, 2023
1 parent 1a45830 commit f61fa38
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 116 deletions.
33 changes: 33 additions & 0 deletions polars/polars-lazy/polars-plan/src/logical_plan/functions/drop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use super::*;

/// Remove the columns in `names` from `df`, returning the reduced frame.
///
/// Names that are not present are silently skipped: projection pushdown
/// may already have removed those columns before this node runs.
pub(super) fn drop_impl(mut df: DataFrame, names: &[String]) -> PolarsResult<DataFrame> {
    names.iter().for_each(|name| {
        // Missing name => already dropped upstream; not an error.
        if let Some(idx) = df.find_idx_by_name(name) {
            let _ = df.get_columns_mut().remove(idx);
        }
    });
    Ok(df)
}

/// Compute the output schema after dropping `names` from `input_schema`.
///
/// Names that do not occur in the schema are ignored, matching the
/// behaviour of `drop_impl` on the execution side.
pub(super) fn drop_schema<'a>(
    input_schema: &'a SchemaRef,
    names: &[String],
) -> PolarsResult<Cow<'a, SchemaRef>> {
    // Set lookup keeps this a single O(n) pass over the schema.
    let to_drop = PlHashSet::from_iter(names);

    let new_schema = input_schema
        .iter()
        // `filter` + `map` instead of `flat_map` over an `Option`
        // (the form clippy's `flat_map_option` lint recommends).
        .filter(|(name, _)| !to_drop.contains(name))
        .map(|(name, dtype)| Field::new(name, dtype.clone()))
        .collect::<Schema>();

    Ok(Cow::Owned(Arc::new(new_schema)))
}
32 changes: 25 additions & 7 deletions polars/polars-lazy/polars-plan/src/logical_plan/functions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod drop;
#[cfg(feature = "merge_sorted")]
mod merge_sorted;
mod rename;
Expand Down Expand Up @@ -63,6 +64,9 @@ pub enum FunctionNode {
// A column name gets swapped with an existing column
swapping: bool,
},
Drop {
names: Arc<Vec<String>>,
},
}

impl PartialEq for FunctionNode {
Expand All @@ -84,6 +88,7 @@ impl PartialEq for FunctionNode {
..
},
) => existing_l == existing_r && new_l == new_r,
(Drop { names: l }, Drop { names: r }) => l == r,
_ => false,
}
}
Expand All @@ -97,7 +102,11 @@ impl FunctionNode {
Rechunk | Pipeline { .. } => false,
#[cfg(feature = "merge_sorted")]
MergeSorted { .. } => false,
DropNulls { .. } | FastProjection { .. } | Unnest { .. } | Rename { .. } => true,
DropNulls { .. }
| FastProjection { .. }
| Unnest { .. }
| Rename { .. }
| Drop { .. } => true,
Opaque { streamable, .. } => *streamable,
}
}
Expand Down Expand Up @@ -161,16 +170,20 @@ impl FunctionNode {
#[cfg(feature = "merge_sorted")]
MergeSorted { .. } => Ok(Cow::Borrowed(input_schema)),
Rename { existing, new, .. } => rename::rename_schema(input_schema, existing, new),
Drop { names } => drop::drop_schema(input_schema, names),
}
}

pub(crate) fn allow_predicate_pd(&self) -> bool {
use FunctionNode::*;
match self {
Opaque { predicate_pd, .. } => *predicate_pd,
FastProjection { .. } | DropNulls { .. } | Rechunk | Unnest { .. } | Rename { .. } => {
true
}
FastProjection { .. }
| DropNulls { .. }
| Rechunk
| Unnest { .. }
| Rename { .. }
| Drop { .. } => true,
#[cfg(feature = "merge_sorted")]
MergeSorted { .. } => true,
Pipeline { .. } => unimplemented!(),
Expand All @@ -181,9 +194,12 @@ impl FunctionNode {
use FunctionNode::*;
match self {
Opaque { projection_pd, .. } => *projection_pd,
FastProjection { .. } | DropNulls { .. } | Rechunk | Unnest { .. } | Rename { .. } => {
true
}
FastProjection { .. }
| DropNulls { .. }
| Rechunk
| Unnest { .. }
| Rename { .. }
| Drop { .. } => true,
#[cfg(feature = "merge_sorted")]
MergeSorted { .. } => true,
Pipeline { .. } => unimplemented!(),
Expand Down Expand Up @@ -234,6 +250,7 @@ impl FunctionNode {
}
}
Rename { existing, new, .. } => rename::rename_impl(df, existing, new),
Drop { names } => drop::drop_impl(df, names),
}
}
}
Expand Down Expand Up @@ -278,6 +295,7 @@ impl Display for FunctionNode {
}
}
Rename { .. } => write!(f, "RENAME"),
Drop { .. } => write!(f, "DROP"),
}
}
}
60 changes: 40 additions & 20 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,34 @@ impl LazyFrame {
self.select_local(vec![col("*").reverse()])
}

/// Check if all `names` are available in `schema` (falling back to
/// `self.schema()` when `schema` is `None`).
///
/// Returns `Some(LazyFrame)` holding a `LogicalPlan` that raises a
/// `SchemaFieldNotFound` error for the first missing name, or `None`
/// when every name is present.
fn check_names(&self, names: &[String], schema: Option<&SchemaRef>) -> Option<Self> {
    let schema = schema
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned(self.schema().unwrap()));

    // `find` short-circuits on the first missing name; the previous
    // manual loop kept scanning all remaining names after a hit.
    let not_found = names.iter().find(|name| schema.get(name).is_none());

    if let Some(name) = not_found {
        // Defer the failure: embed an error node in the plan so it
        // surfaces when the query is executed.
        let lp = self
            .clone()
            .get_plan_builder()
            .add_err(PolarsError::SchemaFieldNotFound(name.to_string().into()))
            .build();
        Some(Self::from_logical_plan(lp, self.opt_state))
    } else {
        None
    }
}

/// Rename columns in the DataFrame.
pub fn rename<I, J, T, S>(self, existing: I, new: J) -> Self
where
Expand All @@ -290,25 +318,11 @@ impl LazyFrame {
}

// a column gets swapped
let schema = &*self.schema().unwrap();
let schema = &self.schema().unwrap();
let swapping = new_vec.iter().any(|name| schema.get(name).is_some());

let mut opt_not_found = None;
existing_vec.iter().for_each(|name| {
let invalid = schema.get(name).is_none();

if invalid && opt_not_found.is_none() {
opt_not_found = Some(name)
}
});

if let Some(name) = opt_not_found {
let lp = self
.clone()
.get_plan_builder()
.add_err(PolarsError::SchemaFieldNotFound(name.to_string().into()))
.build();
Self::from_logical_plan(lp, self.opt_state)
if let Some(lp) = self.check_names(&existing_vec, Some(schema)) {
lp
} else {
self.map_private(FunctionNode::Rename {
existing: existing_vec.into(),
Expand All @@ -330,12 +344,18 @@ impl LazyFrame {
.into_iter()
.map(|name| name.as_ref().to_string())
.collect();
self.drop_columns_impl(&columns)
self.drop_columns_impl(columns)
}

#[allow(clippy::ptr_arg)]
fn drop_columns_impl(self, columns: &Vec<String>) -> Self {
self.select_local(vec![col("*").exclude(columns)])
fn drop_columns_impl(self, columns: Vec<String>) -> Self {
if let Some(lp) = self.check_names(&columns, None) {
lp
} else {
self.map_private(FunctionNode::Drop {
names: columns.into(),
})
}
}

/// Shift the values by a given period and fill the parts that will be empty due to this operation
Expand Down
26 changes: 0 additions & 26 deletions polars/polars-lazy/src/tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1620,32 +1620,6 @@ fn test_binary_expr() -> PolarsResult<()> {
Ok(())
}

#[test]
fn test_drop_and_select() -> PolarsResult<()> {
    // The schema must stay correct for drop to work: the projection is
    // typically pushed below the drop, and the drop must not believe the
    // projected-away columns still exist.
    // Exercised on both a DataFrame scan and a CSV scan.
    let from_df = fruits_cars()
        .lazy()
        .drop_columns(["A", "B"])
        .select([col("fruits")])
        .collect()?;
    assert_eq!(from_df.get_column_names(), &["fruits"]);

    let from_csv = scan_foods_csv()
        .drop_columns(["calories", "sugar_g"])
        .select([col("category")])
        .collect()?;
    assert_eq!(from_csv.get_column_names(), &["category"]);

    Ok(())
}

#[test]
fn test_single_group_result() -> PolarsResult<()> {
// the argsort should not auto explode
Expand Down
32 changes: 0 additions & 32 deletions py-polars/tests/unit/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,29 +499,6 @@ def test_head_tail_limit() -> None:
assert len(df.tail(-6)) == 4


def test_drop_nulls() -> None:
    # With no subset, every row containing a null is removed.
    df = pl.DataFrame(
        {
            "foo": [1, 2, 3],
            "bar": [6, None, 8],
            "ham": ["a", "b", "c"],
        }
    )
    expected = pl.DataFrame({"foo": [1, 3], "bar": [6, 8], "ham": ["a", "c"]})
    assert_frame_equal(df.drop_nulls(), expected)

    # Restricting to 'foo' (which has no nulls) keeps every row.
    assert_frame_equal(df.drop_nulls("foo"), df)


def test_pipe() -> None:
df = pl.DataFrame({"foo": [1, 2, 3], "bar": [6, None, 8]})

Expand Down Expand Up @@ -649,15 +626,6 @@ def test_extend() -> None:
assert_frame_equal(df1, expected)


def test_drop() -> None:
    # drop removes the named column from the frame.
    df = pl.DataFrame({"a": [2, 1, 3], "b": ["a", "b", "c"], "c": [1, 2, 3]})
    assert df.drop(columns="a").shape == (3, 2)

    # drop_in_place removes the column and returns it as a Series.
    df = pl.DataFrame({"a": [2, 1, 3], "b": ["a", "b", "c"], "c": [1, 2, 3]})
    series = df.drop_in_place("a")
    assert series.name == "a"


def test_file_buffer() -> None:
f = BytesIO()
f.write(b"1,2,3,4,5,6\n7,8,9,10,11,12")
Expand Down
95 changes: 95 additions & 0 deletions py-polars/tests/unit/test_drop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import polars as pl
from polars.testing import assert_frame_equal


def test_drop_explode_6641() -> None:
    # Regression test for #6641: dropping the exploded source columns must
    # leave the struct column built from them intact.
    ldf = pl.DataFrame(
        {
            "chromosome": ["chr1"] * 2,
            "identifier": [["chr1:10426:10429:ACC>A"], ["chr1:10426:10429:ACC>*"]],
            "alternate": [["A"], ["T"]],
            "quality": pl.Series([None, None], dtype=pl.Float32()),
        }
    ).lazy()

    result = (
        ldf.explode(["identifier", "alternate"])
        .with_columns(pl.struct(["identifier", "alternate"]).alias("test"))
        .drop(["identifier", "alternate"])
        .select(pl.concat_list([pl.col("test"), pl.col("test")]))
        .collect()
    )
    expected = {
        "test": [
            [
                {"identifier": "chr1:10426:10429:ACC>A", "alternate": "A"},
                {"identifier": "chr1:10426:10429:ACC>A", "alternate": "A"},
            ],
            [
                {"identifier": "chr1:10426:10429:ACC>*", "alternate": "T"},
                {"identifier": "chr1:10426:10429:ACC>*", "alternate": "T"},
            ],
        ]
    }
    assert result.to_dict(False) == expected


def test_drop_nulls() -> None:
    # Default behaviour: any row containing a null is dropped.
    df = pl.DataFrame(
        {
            "foo": [1, 2, 3],
            "bar": [6, None, 8],
            "ham": ["a", "b", "c"],
        }
    )
    expected = pl.DataFrame({"foo": [1, 3], "bar": [6, 8], "ham": ["a", "c"]})
    assert_frame_equal(df.drop_nulls(), expected)

    # When a subset is given, only nulls in that column count; 'foo' has
    # none, so the frame is unchanged.
    assert_frame_equal(df.drop_nulls("foo"), df)


def test_drop() -> None:
    # Dropping a column reduces the width by one.
    df = pl.DataFrame({"a": [2, 1, 3], "b": ["a", "b", "c"], "c": [1, 2, 3]})
    assert df.drop(columns="a").shape == (3, 2)

    # drop_in_place hands the removed column back as a Series.
    df = pl.DataFrame({"a": [2, 1, 3], "b": ["a", "b", "c"], "c": [1, 2, 3]})
    removed = df.drop_in_place("a")
    assert removed.name == "a"


def test_drop_nulls_lazy() -> None:
    # Expression-level drop_nulls filters the nulls out of a single column.
    df = pl.DataFrame({"nrs": [None, 1, 2, 3, None, 4, 5, None]})
    result = df.select(pl.col("nrs").drop_nulls()).to_dict(as_series=False)
    assert result == {"nrs": [1, 2, 3, 4, 5]}

    # Frame-level drop_nulls through the lazy engine.
    df = pl.DataFrame({"foo": [1, 2, 3], "bar": [6, None, 8], "ham": ["a", "b", "c"]})
    expected = pl.DataFrame({"foo": [1, 3], "bar": [6, 8], "ham": ["a", "c"]})
    assert_frame_equal(df.lazy().drop_nulls().collect(), expected)


def test_drop_columns() -> None:
    # Lazy drop accepts both a list of names and a single name.
    data = {"a": [1], "b": [2], "c": [3]}

    assert pl.DataFrame(data).lazy().drop(["a", "b"]).columns == ["c"]
    assert pl.DataFrame(data).lazy().drop("a").columns == ["b", "c"]


def test_drop_nan_ignore_null_3525() -> None:
    # Regression test for #3525: drop_nans removes NaN values but must
    # keep nulls.
    df = pl.DataFrame({"a": [1.0, float("NaN"), 2.0, None, 3.0, 4.0]})
    result = df.select(pl.col("a").drop_nans()).to_series().to_list()
    assert result == [1.0, 2.0, None, 3.0, 4.0]
Loading

0 comments on commit f61fa38

Please sign in to comment.