From 2145ca8b20b4bcefad66bb779f8f316da4245273 Mon Sep 17 00:00:00 2001
From: mkucijan
Date: Sun, 21 Jan 2024 19:03:09 +0100
Subject: [PATCH] perf(python): allow python threads in read_ functions
 (#13886)

---
 py-polars/src/dataframe.rs | 267 ++++++++++++++++++++-----------------
 1 file changed, 147 insertions(+), 120 deletions(-)

diff --git a/py-polars/src/dataframe.rs b/py-polars/src/dataframe.rs
index 39f0aef1f9d3..6f55c4233e75 100644
--- a/py-polars/src/dataframe.rs
+++ b/py-polars/src/dataframe.rs
@@ -180,6 +180,7 @@ impl PyDataFrame {
         row_index, sample_size, eol_char, raise_if_empty, truncate_ragged_lines, schema)
     )]
     pub fn read_csv(
+        py: Python,
         py_f: &PyAny,
         infer_schema_length: Option<usize>,
         chunk_size: usize,
@@ -233,37 +234,39 @@ impl PyDataFrame {
         });
 
         let mmap_bytes_r = get_mmap_bytes_reader(py_f)?;
-        let df = CsvReader::new(mmap_bytes_r)
-            .infer_schema(infer_schema_length)
-            .has_header(has_header)
-            .with_n_rows(n_rows)
-            .with_separator(separator.as_bytes()[0])
-            .with_skip_rows(skip_rows)
-            .with_ignore_errors(ignore_errors)
-            .with_projection(projection)
-            .with_rechunk(rechunk)
-            .with_chunk_size(chunk_size)
-            .with_encoding(encoding.0)
-            .with_columns(columns)
-            .with_n_threads(n_threads)
-            .with_path(path)
-            .with_dtypes(overwrite_dtype.map(Arc::new))
-            .with_dtypes_slice(overwrite_dtype_slice.as_deref())
-            .with_schema(schema.map(|schema| Arc::new(schema.0)))
-            .low_memory(low_memory)
-            .with_null_values(null_values)
-            .with_missing_is_null(!missing_utf8_is_empty_string)
-            .with_comment_prefix(comment_prefix)
-            .with_try_parse_dates(try_parse_dates)
-            .with_quote_char(quote_char)
-            .with_end_of_line_char(eol_char)
-            .with_skip_rows_after_header(skip_rows_after_header)
-            .with_row_index(row_index)
-            .sample_size(sample_size)
-            .raise_if_empty(raise_if_empty)
-            .truncate_ragged_lines(truncate_ragged_lines)
-            .finish()
-            .map_err(PyPolarsErr::from)?;
+        let df = py.allow_threads(move || {
+            CsvReader::new(mmap_bytes_r)
+                .infer_schema(infer_schema_length)
+                .has_header(has_header)
+                .with_n_rows(n_rows)
+                .with_separator(separator.as_bytes()[0])
+                .with_skip_rows(skip_rows)
+                .with_ignore_errors(ignore_errors)
+                .with_projection(projection)
+                .with_rechunk(rechunk)
+                .with_chunk_size(chunk_size)
+                .with_encoding(encoding.0)
+                .with_columns(columns)
+                .with_n_threads(n_threads)
+                .with_path(path)
+                .with_dtypes(overwrite_dtype.map(Arc::new))
+                .with_dtypes_slice(overwrite_dtype_slice.as_deref())
+                .with_schema(schema.map(|schema| Arc::new(schema.0)))
+                .low_memory(low_memory)
+                .with_null_values(null_values)
+                .with_missing_is_null(!missing_utf8_is_empty_string)
+                .with_comment_prefix(comment_prefix)
+                .with_try_parse_dates(try_parse_dates)
+                .with_quote_char(quote_char)
+                .with_end_of_line_char(eol_char)
+                .with_skip_rows_after_header(skip_rows_after_header)
+                .with_row_index(row_index)
+                .sample_size(sample_size)
+                .raise_if_empty(raise_if_empty)
+                .truncate_ragged_lines(truncate_ragged_lines)
+                .finish()
+                .map_err(PyPolarsErr::from)
+        })?;
         Ok(df.into())
     }
 
@@ -271,6 +274,7 @@ impl PyDataFrame {
     #[cfg(feature = "parquet")]
     #[pyo3(signature = (py_f, columns, projection, n_rows, parallel, row_index, low_memory, use_statistics, rechunk))]
     pub fn read_parquet(
+        py: Python,
         py_f: PyObject,
         columns: Option<Vec<String>>,
         projection: Option<Vec<usize>>,
@@ -287,26 +291,30 @@ impl PyDataFrame {
         let result = match get_either_file(py_f, false)? {
             Py(f) => {
                 let buf = f.as_buffer();
-                ParquetReader::new(buf)
+                py.allow_threads(move || {
+                    ParquetReader::new(buf)
+                        .with_projection(projection)
+                        .with_columns(columns)
+                        .read_parallel(parallel.0)
+                        .with_n_rows(n_rows)
+                        .with_row_index(row_index)
+                        .set_low_memory(low_memory)
+                        .use_statistics(use_statistics)
+                        .set_rechunk(rechunk)
+                        .finish()
+                })
+            },
+            Rust(f) => py.allow_threads(move || {
+                ParquetReader::new(f.into_inner())
                     .with_projection(projection)
                     .with_columns(columns)
                     .read_parallel(parallel.0)
                     .with_n_rows(n_rows)
                     .with_row_index(row_index)
-                    .set_low_memory(low_memory)
                     .use_statistics(use_statistics)
                     .set_rechunk(rechunk)
                     .finish()
-            },
-            Rust(f) => ParquetReader::new(f.into_inner())
-                .with_projection(projection)
-                .with_columns(columns)
-                .read_parallel(parallel.0)
-                .with_n_rows(n_rows)
-                .with_row_index(row_index)
-                .use_statistics(use_statistics)
-                .set_rechunk(rechunk)
-                .finish(),
+            }),
         };
         let df = result.map_err(PyPolarsErr::from)?;
         Ok(PyDataFrame::new(df))
@@ -316,6 +324,7 @@ impl PyDataFrame {
     #[cfg(feature = "ipc")]
     #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, memory_map))]
     pub fn read_ipc(
+        py: Python,
         py_f: &PyAny,
         columns: Option<Vec<String>>,
         projection: Option<Vec<usize>>,
@@ -325,14 +334,16 @@ impl PyDataFrame {
     ) -> PyResult<Self> {
         let row_index = row_index.map(|(name, offset)| RowIndex { name, offset });
         let mmap_bytes_r = get_mmap_bytes_reader(py_f)?;
-        let df = IpcReader::new(mmap_bytes_r)
-            .with_projection(projection)
-            .with_columns(columns)
-            .with_n_rows(n_rows)
-            .with_row_index(row_index)
-            .memory_mapped(memory_map)
-            .finish()
-            .map_err(PyPolarsErr::from)?;
+        let df = py.allow_threads(move || {
+            IpcReader::new(mmap_bytes_r)
+                .with_projection(projection)
+                .with_columns(columns)
+                .with_n_rows(n_rows)
+                .with_row_index(row_index)
+                .memory_mapped(memory_map)
+                .finish()
+                .map_err(PyPolarsErr::from)
+        })?;
         Ok(PyDataFrame::new(df))
     }
 
@@ -340,6 +351,7 @@ impl PyDataFrame {
     #[cfg(feature = "ipc_streaming")]
    #[pyo3(signature = (py_f, columns, projection, n_rows, row_index, rechunk))]
     pub fn read_ipc_stream(
+        py: Python,
         py_f: &PyAny,
         columns: Option<Vec<String>>,
         projection: Option<Vec<usize>>,
@@ -349,14 +361,16 @@ impl PyDataFrame {
     ) -> PyResult<Self> {
         let row_index = row_index.map(|(name, offset)| RowIndex { name, offset });
         let mmap_bytes_r = get_mmap_bytes_reader(py_f)?;
-        let df = IpcStreamReader::new(mmap_bytes_r)
-            .with_projection(projection)
-            .with_columns(columns)
-            .with_n_rows(n_rows)
-            .with_row_index(row_index)
-            .set_rechunk(rechunk)
-            .finish()
-            .map_err(PyPolarsErr::from)?;
+        let df = py.allow_threads(move || {
+            IpcStreamReader::new(mmap_bytes_r)
+                .with_projection(projection)
+                .with_columns(columns)
+                .with_n_rows(n_rows)
+                .with_row_index(row_index)
+                .set_rechunk(rechunk)
+                .finish()
+                .map_err(PyPolarsErr::from)
+        })?;
         Ok(PyDataFrame::new(df))
     }
 
@@ -364,6 +378,7 @@ impl PyDataFrame {
     #[cfg(feature = "avro")]
     #[pyo3(signature = (py_f, columns, projection, n_rows))]
     pub fn read_avro(
+        py: Python,
         py_f: PyObject,
         columns: Option<Vec<String>>,
         projection: Option<Vec<usize>>,
@@ -372,12 +387,14 @@ impl PyDataFrame {
         use polars::io::avro::AvroReader;
 
         let file = get_file_like(py_f, false)?;
-        let df = AvroReader::new(file)
-            .with_projection(projection)
-            .with_columns(columns)
-            .with_n_rows(n_rows)
-            .finish()
-            .map_err(PyPolarsErr::from)?;
+        let df = py.allow_threads(move || {
+            AvroReader::new(file)
+                .with_projection(projection)
+                .with_columns(columns)
+                .with_n_rows(n_rows)
+                .finish()
+                .map_err(PyPolarsErr::from)
+        })?;
         Ok(PyDataFrame::new(df))
     }
 
@@ -414,6 +431,7 @@ impl PyDataFrame {
     #[staticmethod]
     #[cfg(feature = "json")]
     pub fn read_json(
+        py: Python,
         py_f: &PyAny,
         infer_schema_length: Option<usize>,
         schema: Option<Wrap<Schema>>,
@@ -421,43 +439,46 @@ impl PyDataFrame {
     ) -> PyResult<Self> {
         // memmap the file first.
         let mmap_bytes_r = get_mmap_bytes_reader(py_f)?;
-        let mmap_read: ReaderBytes = (&mmap_bytes_r).into();
-        let bytes = mmap_read.deref();
-
-        // Happy path is our column oriented json as that is most performant,
-        // on failure we try the arrow json reader instead, which is row-oriented.
-        match serde_json::from_slice::<DataFrame>(bytes) {
-            Ok(df) => Ok(df.into()),
-            Err(e) => {
-                let msg = format!("{e}");
-                if msg.contains("successful parse invalid data") {
-                    let e = PyPolarsErr::from(PolarsError::ComputeError(msg.into()));
-                    Err(PyErr::from(e))
-                } else {
-                    let mut builder = JsonReader::new(mmap_bytes_r)
-                        .with_json_format(JsonFormat::Json)
-                        .infer_schema_len(infer_schema_length);
-                    if let Some(schema) = schema {
-                        builder = builder.with_schema(Arc::new(schema.0));
+        py.allow_threads(move || {
+            let mmap_read: ReaderBytes = (&mmap_bytes_r).into();
+            let bytes = mmap_read.deref();
+            // Happy path is our column oriented json as that is most performant,
+            // on failure we try the arrow json reader instead, which is row-oriented.
+            match serde_json::from_slice::<DataFrame>(bytes) {
+                Ok(df) => Ok(df.into()),
+                Err(e) => {
+                    let msg = format!("{e}");
+                    if msg.contains("successful parse invalid data") {
+                        let e = PyPolarsErr::from(PolarsError::ComputeError(msg.into()));
+                        Err(PyErr::from(e))
+                    } else {
+                        let mut builder = JsonReader::new(mmap_bytes_r)
+                            .with_json_format(JsonFormat::Json)
+                            .infer_schema_len(infer_schema_length);
+
+                        if let Some(schema) = schema {
+                            builder = builder.with_schema(Arc::new(schema.0));
+                        }
+
+                        if let Some(schema) = schema_overrides.as_ref() {
+                            builder = builder.with_schema_overwrite(&schema.0);
+                        }
+
+                        let out = builder
+                            .finish()
+                            .map_err(|e| PyPolarsErr::Other(format!("{e}")))?;
+                        Ok(out.into())
                     }
-
-                    if let Some(schema) = schema_overrides.as_ref() {
-                        builder = builder.with_schema_overwrite(&schema.0);
-                    }
-
-                    let out = builder
-                        .finish()
-                        .map_err(|e| PyPolarsErr::Other(format!("{e}")))?;
-                    Ok(out.into())
-                }
-            },
-        }
+                },
+            }
+        })
     }
 
     #[staticmethod]
     #[cfg(feature = "json")]
     pub fn read_ndjson(
+        py: Python,
         py_f: &PyAny,
         ignore_errors: bool,
         schema: Option<Wrap<Schema>>,
@@ -477,8 +498,8 @@ impl PyDataFrame {
             builder = builder.with_schema_overwrite(&schema.0);
         }
 
-        let out = builder
-            .finish()
+        let out = py
+            .allow_threads(move || builder.finish())
             .map_err(|e| PyPolarsErr::Other(format!("{e}")))?;
         Ok(out.into())
     }
@@ -522,17 +543,21 @@ impl PyDataFrame {
     // somehow from_rows did not work
     #[staticmethod]
     pub fn read_rows(
+        py: Python,
         rows: Vec<Wrap<Row>>,
         infer_schema_length: Option<usize>,
         schema: Option<Wrap<Schema>>,
     ) -> PyResult<Self> {
         // SAFETY: Wrap is transparent.
         let rows = unsafe { std::mem::transmute::<Vec<Wrap<Row>>, Vec<Row>>(rows) };
-        Self::finish_from_rows(rows, infer_schema_length, schema.map(|wrap| wrap.0), None)
+        py.allow_threads(move || {
+            Self::finish_from_rows(rows, infer_schema_length, schema.map(|wrap| wrap.0), None)
+        })
     }
 
     #[staticmethod]
     pub fn read_dicts(
+        py: Python,
         dicts: &PyAny,
         infer_schema_length: Option<usize>,
         schema: Option<Wrap<Schema>>,
@@ -544,32 +569,34 @@ impl PyDataFrame {
             schema_columns.extend(s.0.iter_names().map(|n| n.to_string()))
         }
         let (rows, names) = dicts_to_rows(dicts, infer_schema_length, schema_columns)?;
-        let mut schema_overrides_by_idx: Vec<(usize, DataType)> = Vec::new();
-        if let Some(overrides) = schema_overrides {
-            for (idx, name) in names.iter().enumerate() {
-                if let Some(dtype) = overrides.0.get(name) {
-                    schema_overrides_by_idx.push((idx, dtype.clone()));
+        py.allow_threads(move || {
+            let mut schema_overrides_by_idx: Vec<(usize, DataType)> = Vec::new();
+            if let Some(overrides) = schema_overrides {
+                for (idx, name) in names.iter().enumerate() {
+                    if let Some(dtype) = overrides.0.get(name) {
+                        schema_overrides_by_idx.push((idx, dtype.clone()));
+                    }
                 }
             }
-        }
-        let mut pydf = Self::finish_from_rows(
-            rows,
-            infer_schema_length,
-            schema.map(|wrap| wrap.0),
-            Some(schema_overrides_by_idx),
-        )?;
-        unsafe {
-            for (s, name) in pydf.df.get_columns_mut().iter_mut().zip(&names) {
-                s.rename(name);
+            let mut pydf = Self::finish_from_rows(
+                rows,
+                infer_schema_length,
+                schema.map(|wrap| wrap.0),
+                Some(schema_overrides_by_idx),
+            )?;
+            unsafe {
+                for (s, name) in pydf.df.get_columns_mut().iter_mut().zip(&names) {
+                    s.rename(name);
+                }
+            }
+            let length = names.len();
+            if names.into_iter().collect::<PlHashSet<_>>().len() != length {
+                let err = PolarsError::Duplicate("duplicate column names found".into());
+                Err(PyPolarsErr::Polars(err))?;
             }
-        }
-        let length = names.len();
-        if names.into_iter().collect::<PlHashSet<_>>().len() != length {
-            let err = PolarsError::Duplicate("duplicate column names found".into());
-            Err(PyPolarsErr::Polars(err))?;
-        }
-        Ok(pydf)
+            Ok(pydf)
+        })
    }
 
     #[staticmethod]
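
Note: every reader above follows the same pyo3 pattern. `Python::allow_threads` releases the GIL around the blocking parse, so other Python threads can run while Rust does the I/O and decoding. The closure captures everything it needs by `move` and must not touch Python objects, which is why each reader is fully configured from plain Rust values before the call. A minimal self-contained sketch of the pattern follows; the function name and its body are illustrative only and not taken from this patch:

    use pyo3::prelude::*;

    /// Hypothetical example of a long-running, pure-Rust computation
    /// exposed to Python.
    #[pyfunction]
    fn sum_squares(py: Python<'_>, n: u64) -> u64 {
        // allow_threads releases the GIL for the duration of the closure,
        // letting other Python threads make progress. The closure must not
        // access Python objects; pyo3's bounds on allow_threads reject such
        // captures at compile time.
        py.allow_threads(move || (0..n).map(|i| i * i).sum())
    }

Without `allow_threads`, a call such as `pl.read_csv` would hold the GIL for the entire parse and stall every other Python thread in the process; with it, only the brief setup and the conversion of the result back to a Python object run under the GIL.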