Skip to content

Commit

Permalink
Support input/output to/from polars via python (not native rust)
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Paine <3105306+timkpaine@users.noreply.github.com>
  • Loading branch information
timkpaine committed Nov 15, 2024
1 parent 3eb8c70 commit bde5e18
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 1 deletion.
250 changes: 250 additions & 0 deletions rust/perspective-python/perspective/tests/table/test_table_polars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
# ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
# ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
# ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
# ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
# ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
# ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
# ┃ Copyright (c) 2017, the Perspective Authors. ┃
# ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
# ┃ This file is part of the Perspective library, distributed under the terms ┃
# ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
# ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

from datetime import date, datetime
import numpy as np
import polars as pl
from pytest import mark
import perspective as psp

client = psp.Server().new_local_client()
Table = client.table


def arrow_bytes_to_polars(view):
import pyarrow

with pyarrow.ipc.open_stream(pyarrow.BufferReader(view.to_arrow())) as reader:
return pl.from_dataframe(reader.read_pandas())


class TestTablePolars(object):
def test_empty_table(self):
tbl = Table([])
assert tbl.size() == 0
assert tbl.schema() == {}

def test_table_dataframe(self):
d = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
data = pl.DataFrame(d)
tbl = Table(data)
assert tbl.size() == 2
assert tbl.schema() == {"a": "integer", "b": "integer"}
assert tbl.view().to_records() == [
{"a": 1, "b": 2},
{"a": 3, "b": 4},
]

def test_table_dataframe_column_order(self):
d = [{"a": 1, "b": 2, "c": 3, "d": 4}, {"a": 3, "b": 4, "c": 5, "d": 6}]
data = pl.DataFrame(d).select(["b", "c", "a", "d"])
tbl = Table(data)
assert tbl.size() == 2
assert tbl.columns() == ["b", "c", "a", "d"]

def test_table_dataframe_selective_column_order(self):
d = [{"a": 1, "b": 2, "c": 3, "d": 4}, {"a": 3, "b": 4, "c": 5, "d": 6}]
data = pl.DataFrame(d).select(["b", "c", "a"])
tbl = Table(data)
assert tbl.size() == 2
assert tbl.columns() == ["b", "c", "a"]

@mark.skip(reason="Not supported, polars converts to fixed_size_binary")
def test_table_dataframe_does_not_mutate(self):
# make sure we don't mutate the dataframe that a user passes in
data = pl.DataFrame(
{
"a": np.array([None, 1, None, 2], dtype=object),
"b": np.array([1.5, None, 2.5, None], dtype=object),
}
)
assert data["a"].to_list() == [None, 1, None, 2]
assert data["b"].to_list() == [1.5, None, 2.5, None]

tbl = Table(data)
assert tbl.size() == 4
assert tbl.schema() == {"a": "integer", "b": "float"}

assert data["a"].to_list() == [None, 1, None, 2]
assert data["b"].to_list() == [1.5, None, 2.5, None]

def test_table_polars_from_schema_int(self):
data = [None, 1, None, 2, None, 3, 4]
df = pl.DataFrame({"a": data})
table = Table({"a": "integer"})
table.update(df)
assert table.view().to_columns()["a"] == data

def test_table_polars_from_schema_bool(self):
data = [True, False, True, False]
df = pl.DataFrame({"a": data})
table = Table({"a": "boolean"})
table.update(df)
assert table.view().to_columns()["a"] == data

def test_table_polars_from_schema_float(self):
data = [None, 1.5, None, 2.5, None, 3.5, 4.5]
df = pl.DataFrame({"a": data})
table = Table({"a": "float"})
table.update(df)
assert table.view().to_columns()["a"] == data

def test_table_polars_from_schema_float_all_nan(self):
data = [np.nan, np.nan, np.nan, np.nan]
df = pl.DataFrame({"a": data})
table = Table({"a": "float"})
table.update(df)
assert table.view().to_columns()["a"] == [None, None, None, None]

def test_table_polars_from_schema_float_to_int(self):
data = [None, 1.5, None, 2.5, None, 3.5, 4.5]
df = pl.DataFrame({"a": data})
table = Table({"a": "integer"})
table.update(df)
# truncates decimal
assert table.view().to_columns()["a"] == [None, 1, None, 2, None, 3, 4]

def test_table_polars_from_schema_int_to_float(self):
data = [None, 1, None, 2, None, 3, 4]
df = pl.DataFrame({"a": data})
table = Table({"a": "float"})
table.update(df)
assert table.view().to_columns()["a"] == [None, 1.0, None, 2.0, None, 3.0, 4.0]

def test_table_polars_from_schema_date(self, util):
data = [date(2019, 8, 15), None, date(2019, 8, 16)]
df = pl.DataFrame({"a": data})
table = Table({"a": "date"})
table.update(df)
assert table.view().to_columns()["a"] == [
util.to_timestamp(datetime(2019, 8, 15)),
None,
util.to_timestamp(datetime(2019, 8, 16)),
]

def test_table_polars_from_schema_str(self):
data = ["a", None, "b", None, "c"]
df = pl.DataFrame({"a": data})
table = Table({"a": "string"})
table.update(df)
assert table.view().to_columns()["a"] == data

def test_table_polars_none(self):
data = [None, None, None]
df = pl.DataFrame({"a": data})
table = Table(df)
assert table.view().to_columns()["a"] == data

def test_table_polars_symmetric_table(self):
# make sure that updates are symmetric to table creation
df = pl.DataFrame({"a": [1, 2, 3, 4], "b": [1.5, 2.5, 3.5, 4.5]})
t1 = Table(df)
t2 = Table({"a": "integer", "b": "float"})
t2.update(df)
assert t1.view().to_columns() == {
"a": [1, 2, 3, 4],
"b": [1.5, 2.5, 3.5, 4.5],
}

def test_table_polars_symmetric_stacked_updates(self):
# make sure that updates are symmetric to table creation
df = pl.DataFrame({"a": [1, 2, 3, 4], "b": [1.5, 2.5, 3.5, 4.5]})

t1 = Table(df)
t1.update(df)

t2 = Table({"a": "integer", "b": "float"})
t2.update(df)
t2.update(df)

assert t1.view().to_columns() == {
"a": [1, 2, 3, 4, 1, 2, 3, 4],
"b": [1.5, 2.5, 3.5, 4.5, 1.5, 2.5, 3.5, 4.5],
}

@mark.skip(reason="Not supported, polars doesnt like input")
def test_table_polars_transitive(self):
# serialized output -> table -> serialized output
records = {
"a": [1, 2, 3, 4],
"b": [1.5, 2.5, 3.5, 4.5],
"c": [np.nan, np.nan, "abc", np.nan],
"d": [None, True, None, False],
"e": [
float("nan"),
datetime(2019, 7, 11, 12, 30),
float("nan"),
datetime(2019, 7, 11, 12, 30),
],
}

df = pl.DataFrame(records, strict=False)
t1 = Table(df)
out1 = arrow_bytes_to_polars(t1.view(columns=["a", "b", "c", "d", "e"]))
t2 = Table(out1)
assert t1.schema() == t2.schema()
out2 = t2.view().to_columns()
assert t1.view().to_columns() == out2

# dtype=object should have correct inferred types

@mark.skip(reason="Not supported, polars converts to fixed_size_binary")
def test_table_polars_object_to_int(self):
df = pl.DataFrame({"a": np.array([1, 2, None, 2, None, 3, 4], dtype=object)})
table = Table(df)
assert table.schema() == {"a": "integer"}
assert table.view().to_columns()["a"] == [1, 2, None, 2, None, 3, 4]

@mark.skip(reason="Not supported, polars converts to fixed_size_binary")
def test_table_polars_object_to_float(self):
df = pl.DataFrame({"a": np.array([None, 1, None, 2, None, 3, 4], dtype=object)})
table = Table(df)
assert table.schema() == {"a": "integer"}
assert table.view().to_columns()["a"] == [None, 1.0, None, 2.0, None, 3.0, 4.0]

@mark.skip(reason="Not supported, polars converts to fixed_size_binary")
def test_table_polars_object_to_bool(self):
df = pl.DataFrame(
{"a": np.array([True, False, True, False, True, False], dtype=object)}
)
table = Table(df)
assert table.schema() == {"a": "boolean"}
assert table.view().to_columns()["a"] == [True, False, True, False, True, False]


@mark.skip(reason="Not supported, polars converts to fixed_size_binary")
def test_table_polars_object_to_datetime(self):
df = pl.DataFrame(
{
"a": np.array(
[
datetime(2019, 7, 11, 1, 2, 3),
datetime(2019, 7, 12, 1, 2, 3),
None,
],
dtype=object,
)
}
)
table = Table(df)
assert table.schema() == {"a": "datetime"}
assert table.view().to_columns()["a"] == [
datetime(2019, 7, 11, 1, 2, 3),
datetime(2019, 7, 12, 1, 2, 3),
None,
]

def test_table_polars_object_to_str(self):
df = pl.DataFrame({"a": np.array(["abc", "def", None, "ghi"], dtype=object)})
table = Table(df)
assert table.schema() == {"a": "string"}
assert table.view().to_columns()["a"] == ["abc", "def", None, "ghi"]
5 changes: 5 additions & 0 deletions rust/perspective-python/src/client/client_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,11 @@ impl View {
self.0.to_dataframe(window).py_block_on(py)
}

#[pyo3(signature = (**window))]
pub fn to_polars(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<Py<PyAny>> {
self.0.to_polars(window).py_block_on(py)
}

#[doc = crate::inherit_docs!("view/to_arrow.md")]
#[pyo3(signature = (**window))]
pub fn to_arrow(&self, py: Python<'_>, window: Option<Py<PyDict>>) -> PyResult<Py<PyBytes>> {
Expand Down
1 change: 1 addition & 0 deletions rust/perspective-python/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@

pub mod client_sync;
mod pandas;
mod polars;
mod pyarrow;
pub mod python;
125 changes: 125 additions & 0 deletions rust/perspective-python/src/client/polars.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
// ┃ ██████ ██████ ██████ █ █ █ █ █ █▄ ▀███ █ ┃
// ┃ ▄▄▄▄▄█ █▄▄▄▄▄ ▄▄▄▄▄█ ▀▀▀▀▀█▀▀▀▀▀ █ ▀▀▀▀▀█ ████████▌▐███ ███▄ ▀█ █ ▀▀▀▀▀ ┃
// ┃ █▀▀▀▀▀ █▀▀▀▀▀ █▀██▀▀ ▄▄▄▄▄ █ ▄▄▄▄▄█ ▄▄▄▄▄█ ████████▌▐███ █████▄ █ ▄▄▄▄▄ ┃
// ┃ █ ██████ █ ▀█▄ █ ██████ █ ███▌▐███ ███████▄ █ ┃
// ┣━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
// ┃ Copyright (c) 2017, the Perspective Authors. ┃
// ┃ ╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌ ┃
// ┃ This file is part of the Perspective library, distributed under the terms ┃
// ┃ of the [Apache License 2.0](https://www.apache.org/licenses/LICENSE-2.0). ┃
// ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::{PyAny, PyBytes, PyList};

use super::pyarrow;

fn get_polars_df_cls(py: Python<'_>) -> PyResult<Option<Bound<'_, PyAny>>> {
let sys = PyModule::import_bound(py, "sys")?;
if sys.getattr("modules")?.contains("polars")? {
let polars = PyModule::import_bound(py, "polars")?;
Ok(Some(
polars.getattr("DataFrame")?.to_object(py).into_bound(py),
))
} else {
Ok(None)
}
}

fn get_polars_lf_cls(py: Python<'_>) -> PyResult<Option<Bound<'_, PyAny>>> {
let sys = PyModule::import_bound(py, "sys")?;
if sys.getattr("modules")?.contains("polars")? {
let polars = PyModule::import_bound(py, "polars")?;
Ok(Some(
polars.getattr("LazyFrame")?.to_object(py).into_bound(py),
))
} else {
Ok(None)
}
}

pub fn is_polars_df(py: Python, df: &Bound<'_, PyAny>) -> PyResult<bool> {
if let Some(df_class) = get_polars_df_cls(py)? {
df.is_instance(&df_class)
} else {
Ok(false)
}
}

pub fn is_polars_lf(py: Python, df: &Bound<'_, PyAny>) -> PyResult<bool> {
if let Some(df_class) = get_polars_lf_cls(py)? {
df.is_instance(&df_class)
} else {
Ok(false)
}
}

// ipc_bytes = self.to_arrow()
// table = pa.ipc.open_stream(ipc_bytes).read_all()
// x = pd.DataFrame(table.to_pandas())
// print("AAA", x)
// return x

pub fn arrow_to_polars(py: Python<'_>, arrow: &[u8]) -> PyResult<Py<PyAny>> {
let polars = PyModule::import_bound(py, "polars")?;
let bytes = PyBytes::new_bound(py, arrow);
Ok(polars
.getattr("read_ipc_stream")?
.call1((bytes,))?
.call0()?
.as_unbound()
.clone())
}

pub fn polars_to_arrow_bytes<'py>(
py: Python<'py>,
df: &Bound<'py, PyAny>,
) -> PyResult<Bound<'py, PyBytes>> {
let df_class = get_polars_df_cls(py)?
.ok_or_else(|| PyValueError::new_err("Failed to import polars.DataFrame"))?;
let lf_class = get_polars_lf_cls(py)?
.ok_or_else(|| PyValueError::new_err("Failed to import polars.LazyFrame"))?;

if !df.is_instance(&df_class)? && !df.is_instance(&lf_class)? {
return Err(PyValueError::new_err("Input is not a polars.DataFrame or polars.LazyFrame"));
}

let is_lazyframe = df.is_instance(&lf_class)?;

// let kwargs = PyDict::new_bound(py);
// kwargs.set_item("preserve_index", true)?;

let table = if is_lazyframe {
df.call_method0("collect")?.call_method0("to_arrow")?
} else {
df.call_method0("to_arrow")?
};

// rename from __index_level_0__ to index
let old_names: Vec<String> = table.getattr("column_names")?.extract()?;
let mut new_names: Vec<String> = old_names
.into_iter()
.map(|e| {
if e == "__index_level_0__" {
"index".to_string()
} else {
e
}
})
.collect();

let names = PyList::new_bound(py, new_names.clone());
let table = table.call_method1("rename_columns", (names,))?;

// move the index column to be the first column.
if new_names[new_names.len() - 1] == "index" {
new_names.rotate_right(1);
let order = PyList::new_bound(py, new_names);
let table = table.call_method1("select", (order,))?;
pyarrow::to_arrow_bytes(py, &table)
} else {
pyarrow::to_arrow_bytes(py, &table)
}
}
Loading

0 comments on commit bde5e18

Please sign in to comment.