feat: Support cloud storage in scan_csv
#16674
CodSpeed Performance Report: Merging #16674 will not alter performance.
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #16674 +/- ##
==========================================
- Coverage 81.45% 81.36% -0.10%
==========================================
Files 1413 1421 +8
Lines 186096 187205 +1109
Branches 2776 2777 +1
==========================================
+ Hits 151588 152321 +733
- Misses 33988 34363 +375
- Partials 520 521 +1
crates/polars-io/Cargo.toml
@@ -20,6 +20,7 @@ ahash = { workspace = true }
arrow = { workspace = true }
async-trait = { version = "0.1.59", optional = true }
atoi_simd = { workspace = true, optional = true }
blake3 = "1.5.1"
Can we make this an optional dependency? Only activated when we activate the caching?
I put them behind a new file_cache feature flag (although maybe we could also just use the cloud feature flag if you want, let me know).
*edit: nvm, I think I prefer the new feature flag because it also specifies "dep:blake3", "dep:fs4"
pub(super) type GlobalFileCacheGuardExclusive<'a> = RwLockWriteGuard<'a, GlobalLockData>;

#[derive(Clone)]
struct AccessTracker(Arc<AtomicBool>);
Can we add a comment explaining what this boolean means?
This was a very good point - I've left a comment summarizing its purpose, but how it achieves its purpose would probably take a few paragraphs 😂
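As a rough illustration of what a shared boolean like this can be used for, the sketch below sets the flag on every access and lets a periodic checker read and reset it in one atomic step. This shows the general pattern only and is an assumption, not a description of the logic in this PR; the method names `new`, `register_access`, and `take_accessed` are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical sketch: a cloneable flag recording whether a cache entry
/// was accessed since the last check. All clones share the same boolean.
#[derive(Clone)]
struct AccessTracker(Arc<AtomicBool>);

impl AccessTracker {
    fn new() -> Self {
        AccessTracker(Arc::new(AtomicBool::new(false)))
    }

    /// Called by a reader whenever it uses the cached file.
    fn register_access(&self) {
        self.0.store(true, Ordering::Relaxed);
    }

    /// Called by a periodic checker: returns whether an access happened
    /// since the previous call, and resets the flag in the same atomic step.
    fn take_accessed(&self) -> bool {
        self.0.swap(false, Ordering::Relaxed)
    }
}
```

Using `swap` rather than a separate load and store means an access that lands between the two operations cannot be lost.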
&& verbose
{
    eprintln!("unlocked global file cache lockfile");
}
Before sleeping, I think we can also set a notify: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
As I understand it, this will not schedule the task until we notify it that we have set the locked file. If that works, I think we can shorten the sleep to 200 ms or something like that.
Thanks for the tip - I've added the notify layer for the background task to await on, which should make it more efficient.
For the delay, I'd prefer not to make it too short: the background task gets notified the moment we acquire a lock, so it runs for the entire time we hold the lock (i.e. while we are downloading files).
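The wake-on-notify idea discussed here can be sketched without an async runtime. The example below deliberately swaps `tokio::sync::Notify` for a `Mutex` plus `Condvar` from the standard library so that it is self-contained; it is not the PR's code, and `LockAcquired` and `spawn_background` are hypothetical names. The background thread stays parked, without polling, until another thread reports that a lock was taken.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical stand-in for a notify primitive, built on std only.
struct LockAcquired {
    flag: Mutex<bool>,
    cond: Condvar,
}

impl LockAcquired {
    fn new() -> Arc<Self> {
        Arc::new(LockAcquired {
            flag: Mutex::new(false),
            cond: Condvar::new(),
        })
    }

    /// Called by the thread that has just taken the file lock.
    fn notify(&self) {
        *self.flag.lock().unwrap() = true;
        self.cond.notify_one();
    }

    /// Blocks until `notify` has been called, then resets the flag.
    /// The loop guards against spurious wakeups.
    fn wait(&self) {
        let mut acquired = self.flag.lock().unwrap();
        while !*acquired {
            acquired = self.cond.wait(acquired).unwrap();
        }
        *acquired = false;
    }
}

/// Spawns a background thread that waits for the signal and then runs
/// `on_wake` once. In a real task this is where the delay and the
/// unlock attempt would go.
fn spawn_background<F>(signal: Arc<LockAcquired>, on_wake: F) -> thread::JoinHandle<()>
where
    F: FnOnce() + Send + 'static,
{
    thread::spawn(move || {
        signal.wait();
        on_wake();
    })
}
```

Because the flag is stored under the mutex, a notification sent before the background thread starts waiting is not lost.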
    self.uri_hash.as_bytes(),
    remote_metadata.last_modified,
);
let _ = std::fs::remove_file(data_file_path);
This will block until a new file is downloaded right?
Hmm, not exactly. This is there in case a partial download was left behind by an aborted process. I've also left a comment above it.
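A minimal sketch of that cleanup step, assuming a download is written straight to its final path: any leftover file is removed first, and the error is ignored when nothing is there. `write_fresh` is a hypothetical helper, not a function from this PR.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical helper: clear any leftover file at `path`, then write `bytes`.
fn write_fresh(path: &Path, bytes: &[u8]) -> io::Result<()> {
    // A previous process may have been aborted mid-download and left a
    // truncated file behind. The result is ignored because the file
    // usually does not exist.
    let _ = fs::remove_file(path);

    // `create_new` fails if the file still exists, so stale contents are
    // never silently reused.
    let mut file = fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(path)?;
    file.write_all(bytes)
}
```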
impl EvictionManager {
    pub(super) fn run_in_background(mut self) {
        let verbose = config::verbose();
        let sleep_interval = std::cmp::max(self.limit_since_last_access.as_secs() / 4, 7);
Can we set this higher to 60 seconds or so? Doesn't have to run often I think?
updated to 60 secs
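For reference, here is the interval expression from the diff as a standalone function, reading "updated to 60 secs" as raising the minimum from 7 to 60 seconds. That reading is an assumption (the final code may use a fixed interval instead), and `sleep_interval` as a free function is hypothetical.

```rust
use std::time::Duration;

/// Hypothetical helper: a quarter of the access limit, but never less
/// than 60 seconds between eviction passes.
fn sleep_interval(limit_since_last_access: Duration) -> Duration {
    let secs = std::cmp::max(limit_since_last_access.as_secs() / 4, 60);
    Duration::from_secs(secs)
}
```

With a one-hour limit this gives an eviction pass every 15 minutes; limits of four minutes or less fall back to the 60-second floor.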
Cool stuff @nameexhaustion. I have left some comments/questions. But really fun to review this.
Closes #13115.
Can you explain
Thanks for the triage!
A query will always check to ensure it has the latest file at the start of execution (i.e.
This adds support for cloud storage in scan_csv, using a local cache to avoid duplicate downloads. Cached files are removed if they are not accessed within an hour (configurable with POLARS_FILE_CACHE_TTL).
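The time-to-live rule described above can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: the variable is assumed to hold a whole number of seconds, invalid or missing values are assumed to fall back to one hour, and `parse_ttl`, `ttl_from_env`, and `is_expired` are hypothetical names.

```rust
use std::time::{Duration, SystemTime};

/// One hour, matching the default stated in the PR description.
const DEFAULT_TTL_SECS: u64 = 3600;

/// Parses a raw TTL string (assumed to be seconds); falls back to the
/// default when the value is missing or not a valid integer.
fn parse_ttl(raw: Option<&str>) -> Duration {
    let secs = raw
        .and_then(|s| s.trim().parse::<u64>().ok())
        .unwrap_or(DEFAULT_TTL_SECS);
    Duration::from_secs(secs)
}

/// Reads the TTL from the environment variable named in the description.
fn ttl_from_env() -> Duration {
    parse_ttl(std::env::var("POLARS_FILE_CACHE_TTL").ok().as_deref())
}

/// A cached file is eligible for removal once it has gone unaccessed
/// for longer than the TTL.
fn is_expired(last_access: SystemTime, now: SystemTime, ttl: Duration) -> bool {
    match now.duration_since(last_access) {
        Ok(idle) => idle > ttl,
        // The last access appears to be in the future (clock adjustment):
        // keep the file rather than evict it.
        Err(_) => false,
    }
}
```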