feat(rust, python): read ipc file from network #14861
Conversation
Force-pushed from 66301f9 to 47bc620
Force-pushed from ed741ae to 63813bc
Codecov Report
Attention: Patch coverage is

Additional details and impacted files:

@@            Coverage Diff            @@
##             main   #14861     +/-   ##
=========================================
+ Coverage   80.99%   81.00%   +0.01%
=========================================
  Files        1333     1334       +1
  Lines      173175   173267      +92
  Branches     2458     2458
=========================================
+ Hits       140260   140354      +94
+ Misses      32448    32446       -2
  Partials      467      467

☔ View full report in Codecov by Sentry.
Force-pushed from d4e4b18 to 65a446c
@ritchie46 is it correct that we read the schema during the construction of the logical plan, and only later, during execution of the physical plan, read the entire file? If that is the case, I think I should provide one function that reads just the schema and one that reads the necessary data. These functions will take different options. For example, reading the schema wouldn't require projection names, while reading the data can take the projection indices computed from the schema + the requested columns. There are probably other differences. Some options will probably be shared (some of the cloud-related options?).
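The proposed split could be sketched roughly like this. All type and function names here are hypothetical stand-ins, not the actual polars API; the bodies only illustrate the shape of the two entry points and the shared cloud options.

```rust
// Hypothetical sketch of the schema-read / data-read split proposed above.
// `CloudOptions`, `IpcSchemaOptions`, and `IpcDataOptions` are illustrative.

#[derive(Clone, Debug, Default)]
struct CloudOptions {
    region: Option<String>, // placeholder for shared cloud config
}

#[derive(Debug, Default)]
struct IpcSchemaOptions {
    cloud: CloudOptions, // shared with data reads
}

#[derive(Debug, Default)]
struct IpcDataOptions {
    cloud: CloudOptions,
    // Projection as indices, resolved from the schema + requested columns.
    projection: Option<Vec<usize>>,
}

// Called during logical-plan construction: reads only the schema.
fn read_ipc_schema(_opts: &IpcSchemaOptions) -> Vec<String> {
    vec!["a".into(), "b".into(), "c".into()] // stand-in schema
}

// Called during physical-plan execution: reads the data, projected by index.
fn read_ipc_data(opts: &IpcDataOptions, schema: &[String]) -> Vec<String> {
    match &opts.projection {
        Some(idx) => idx.iter().map(|&i| schema[i].clone()).collect(),
        None => schema.to_vec(),
    }
}

fn main() {
    let schema = read_ipc_schema(&IpcSchemaOptions::default());
    let opts = IpcDataOptions {
        projection: Some(vec![2, 0]),
        ..Default::default()
    };
    println!("{:?}", read_ipc_data(&opts, &schema)); // prints ["c", "a"]
}
```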
Force-pushed from 65a446c to 36f18a3
Force-pushed from 36f18a3 to 4174ca1
#[cfg(feature = "cloud")]
{
    // TODO: This will block the current thread until the data has been
This is fine, as the default engine cannot work on batches anyway. Our goal here will be downloading parts of the file (e.g. record batches / chunks) concurrently.
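A minimal sketch of that goal, using plain threads as a stand-in for the async object-store client; `fetch_range` is hypothetical and just fabricates bytes in place of a real ranged download.

```rust
use std::thread;

// Hypothetical stand-in for a ranged GET against remote storage.
fn fetch_range(start: usize, len: usize) -> Vec<u8> {
    (start..start + len).map(|i| (i % 256) as u8).collect()
}

// Download each byte range on its own thread; joining the handles in order
// preserves chunk order regardless of which download finishes first.
fn download_chunks_concurrently(ranges: Vec<(usize, usize)>) -> Vec<Vec<u8>> {
    let handles: Vec<_> = ranges
        .into_iter()
        .map(|(start, len)| thread::spawn(move || fetch_range(start, len)))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let chunks = download_chunks_concurrently(vec![(0, 4), (4, 4), (8, 4)]);
    println!("downloaded {} chunks", chunks.len()); // prints downloaded 3 chunks
}
```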
fn read_ipc<R: MmapBytesReader>(reader: R, options: IpcReadOptions) -> PolarsResult<DataFrame> {
    let mut reader = IpcReader::new(reader);

    if let Some(columns) = options.columns {
Here we should only accept column indices. This is not user-facing code, so the extra option only complicates it here.
}
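The index-only variant suggested above could look like this: callers resolve column names to indices against the schema once, and the internal reader only ever sees indices. Names here are illustrative, not the actual polars helpers.

```rust
// Resolve requested column names to schema indices up front, so the internal
// reader can accept only `Vec<usize>`. Returns None if any name is missing.
fn resolve_projection(schema: &[&str], names: &[&str]) -> Option<Vec<usize>> {
    names
        .iter()
        .map(|n| schema.iter().position(|s| s == n))
        .collect()
}

fn main() {
    let schema = ["a", "b", "c"];
    println!("{:?}", resolve_projection(&schema, &["c", "a"])); // prints Some([2, 0])
}
```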
async fn read_bytes<O: ObjectStore>(store: &O, path: &ObjectPath) -> PolarsResult<Bytes> {
    // TODO: Is `to_compute_err` appropriate? It is used in the Parquet
Compute error is fine for now.
async fn read_bytes<O: ObjectStore>(store: &O, path: &ObjectPath) -> PolarsResult<Bytes> {
    // TODO: Is `to_compute_err` appropriate? It is used in the Parquet
    // reader as well but I am not sure it is what we want.
    let get_result = store.get(path).await.map_err(to_compute_err)?;
Here we should split up the work. We should be able to download concurrently, and the first chunks should be downloaded first. We should have a batched interface that can already yield DataFrames for the chunks that have been downloaded.
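One way to sketch such a batched, order-preserving interface is with one channel per chunk: downloads run concurrently, and the consumer blocks only until the *next* chunk in order is ready, so early chunks can be decoded while later ones are still in flight. `fetch_chunk` is a hypothetical stand-in for a ranged GET; a real implementation would decode each chunk into a DataFrame.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for a ranged GET against the object store.
fn fetch_chunk(i: usize) -> Vec<u8> {
    vec![i as u8; 4]
}

// Spawn all downloads up front; return an iterator that yields chunks in
// order as soon as each one arrives on its channel.
fn batched_download(n_chunks: usize) -> impl Iterator<Item = Vec<u8>> {
    let receivers: Vec<_> = (0..n_chunks)
        .map(|i| {
            let (tx, rx) = mpsc::channel();
            thread::spawn(move || {
                let _ = tx.send(fetch_chunk(i));
            });
            rx
        })
        .collect();
    // Iterating the receivers in order preserves chunk order; each `recv`
    // blocks only until that particular chunk has finished downloading.
    receivers.into_iter().map(|rx| rx.recv().unwrap())
}

fn main() {
    for (i, chunk) in batched_download(3).enumerate() {
        assert_eq!(chunk, vec![i as u8; 4]);
    }
    println!("all chunks consumed in order");
}
```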
Force-pushed from 4174ca1 to c1a1a37
Closing this in favor of #14984
Draft to discuss ongoing work from #14839