diff --git a/lib/explorer/backend/data_frame.ex b/lib/explorer/backend/data_frame.ex index c06e291e5..856faff71 100644 --- a/lib/explorer/backend/data_frame.ex +++ b/lib/explorer/backend/data_frame.ex @@ -89,7 +89,8 @@ defmodule Explorer.Backend.DataFrame do @callback from_parquet( entry :: fs_entry(), max_rows :: option(integer()), - columns :: columns_for_io() + columns :: columns_for_io(), + rechunk :: boolean() ) :: io_result(df) @callback to_parquet( df, diff --git a/lib/explorer/data_frame.ex b/lib/explorer/data_frame.ex index 6f5f0c995..2c112ee3e 100644 --- a/lib/explorer/data_frame.ex +++ b/lib/explorer/data_frame.ex @@ -701,6 +701,9 @@ defmodule Explorer.DataFrame do * `:columns` - A list of column names or indexes to keep. If present, only these columns are read into the dataframe. (default: `nil`) + * `:rechunk` - Make sure that all columns are contiguous in memory + by aggregating the chunks into a single array. (default: `false`) + * `:config` - An optional struct, keyword list or map, normally associated with remote file systems. See [IO section](#module-io-operations) for more details. (default: `nil`) @@ -718,7 +721,8 @@ defmodule Explorer.DataFrame do Keyword.validate!(opts, max_rows: nil, columns: nil, - config: nil + config: nil, + rechunk: false ) backend = backend_from_options!(backend_opts) @@ -727,7 +731,8 @@ defmodule Explorer.DataFrame do backend.from_parquet( entry, opts[:max_rows], - to_columns_for_io(opts[:columns]) + to_columns_for_io(opts[:columns]), + opts[:rechunk] ) end end diff --git a/lib/explorer/polars_backend/data_frame.ex b/lib/explorer/polars_backend/data_frame.ex index c9091dcf3..d9278f0e0 100644 --- a/lib/explorer/polars_backend/data_frame.ex +++ b/lib/explorer/polars_backend/data_frame.ex @@ -280,7 +280,7 @@ defmodule Explorer.PolarsBackend.DataFrame do end @impl true - def from_parquet(%S3.Entry{} = entry, max_rows, columns) do + def from_parquet(%S3.Entry{} = entry, max_rows, columns, _rechunk) do # We first read using a lazy dataframe, then we collect. with {:ok, ldf} <- Native.lf_from_parquet_cloud(entry, max_rows, columns), {:ok, df} <- Native.lf_collect(ldf) do @@ -289,13 +289,13 @@ defmodule Explorer.PolarsBackend.DataFrame do end @impl true - def from_parquet(%HTTP.Entry{} = entry, max_rows, columns) do + def from_parquet(%HTTP.Entry{} = entry, max_rows, columns, rechunk) do path = Shared.build_path_for_entry(entry) with :ok <- Explorer.FSS.download(entry, path) do entry = Local.from_path(path) - result = from_parquet(entry, max_rows, columns) + result = from_parquet(entry, max_rows, columns, rechunk) File.rm(path) result @@ -303,7 +303,7 @@ defmodule Explorer.PolarsBackend.DataFrame do end @impl true - def from_parquet(%Local.Entry{} = entry, max_rows, columns) do + def from_parquet(%Local.Entry{} = entry, max_rows, columns, rechunk) do {columns, with_projection} = column_names_or_projection(columns) df = @@ -311,7 +311,8 @@ defmodule Explorer.PolarsBackend.DataFrame do entry.path, max_rows, columns, - with_projection + with_projection, + rechunk ) case df do diff --git a/lib/explorer/polars_backend/lazy_frame.ex b/lib/explorer/polars_backend/lazy_frame.ex index a14d69391..186931272 100644 --- a/lib/explorer/polars_backend/lazy_frame.ex +++ b/lib/explorer/polars_backend/lazy_frame.ex @@ -184,7 +184,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do defp char_byte(<>), do: char @impl true - def from_parquet(%S3.Entry{} = entry, max_rows, columns) do + def from_parquet(%S3.Entry{} = entry, max_rows, columns, _rechunk) do case Native.lf_from_parquet_cloud(entry, max_rows, columns) do {:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)} {:error, error} -> {:error, RuntimeError.exception(error)} @@ -192,7 +192,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do end @impl true - def from_parquet(%Local.Entry{} = entry, max_rows, columns) do + def from_parquet(%Local.Entry{} = entry, max_rows, columns, _rechunk) do case Native.lf_from_parquet(entry.path, max_rows, columns) do {:ok, polars_ldf} -> {:ok, Shared.create_dataframe(polars_ldf)} {:error, error} -> {:error, RuntimeError.exception(error)} diff --git a/lib/explorer/polars_backend/native.ex b/lib/explorer/polars_backend/native.ex index ba9238e1d..a09920a70 100644 --- a/lib/explorer/polars_backend/native.ex +++ b/lib/explorer/polars_backend/native.ex @@ -108,7 +108,8 @@ defmodule Explorer.PolarsBackend.Native do _filename, _stop_after_n_rows, _columns, - _projection + _projection, + _rechunk ), do: err() diff --git a/native/explorer/src/dataframe/io.rs b/native/explorer/src/dataframe/io.rs index 83fe7f55f..eda3921d1 100644 --- a/native/explorer/src/dataframe/io.rs +++ b/native/explorer/src/dataframe/io.rs @@ -181,6 +181,7 @@ pub fn df_from_parquet( stop_after_n_rows: Option, column_names: Option>, projection: Option>, + rechunk: bool, ) -> Result { let file = File::open(filename)?; let buf_reader = BufReader::new(file); @@ -188,7 +189,8 @@ pub fn df_from_parquet( let reader = ParquetReader::new(buf_reader) .with_n_rows(stop_after_n_rows) .with_columns(column_names) - .with_projection(projection); + .with_projection(projection) + .set_rechunk(rechunk); Ok(ExDataFrame::new(reader.finish()?)) }