Skip to content

Commit

Permalink
Fix ipc stream reader column order
Browse files Browse the repository at this point in the history
  • Loading branch information
mickvangelderen committed Mar 6, 2024
1 parent 698a276 commit f6d7bac
Showing 1 changed file with 2 additions and 37 deletions.
39 changes: 2 additions & 37 deletions crates/polars-io/src/ipc/ipc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,20 +166,14 @@ where
self.projection = Some(prj);
}

let sorted_projection = self.projection.clone().map(|mut proj| {
proj.sort_unstable();
proj
});

let schema = if let Some(projection) = &sorted_projection {
let schema = if let Some(projection) = &self.projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};

let include_row_index = self.row_index.is_some();
let ipc_reader =
read::StreamReader::new(&mut self.reader, metadata.clone(), sorted_projection);
read::StreamReader::new(&mut self.reader, metadata.clone(), self.projection);
finish_reader(
ipc_reader,
rechunk,
Expand All @@ -188,35 +182,6 @@ where
&schema,
self.row_index,
)
.map(|df| fix_column_order(df, self.projection, include_row_index))
}
}

fn fix_column_order(
df: DataFrame,
projection: Option<Vec<usize>>,
include_row_index: bool,
) -> DataFrame {
if let Some(proj) = projection {
let offset = usize::from(include_row_index);
let mut args = (0..proj.len()).zip(proj).collect::<Vec<_>>();
// first el of tuple is argument index
// second el is the projection index
args.sort_unstable_by_key(|tpl| tpl.1);
let cols = df.get_columns();

let iter = args.iter().map(|tpl| cols[tpl.0 + offset].clone());
let cols = if include_row_index {
let mut new_cols = vec![df.get_columns()[0].clone()];
new_cols.extend(iter);
new_cols
} else {
iter.collect()
};

unsafe { DataFrame::new_no_checks(cols) }
} else {
df
}
}

Expand Down

0 comments on commit f6d7bac

Please sign in to comment.