Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Remove cached nodes when finished #15310

Merged
merged 2 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions crates/polars-lazy/src/physical_plan/executors/cache.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,28 @@
use std::sync::atomic::Ordering;

use super::*;

pub struct CacheExec {
pub input: Box<dyn Executor>,
pub id: usize,
pub count: usize,
pub count: u32,
}

impl Executor for CacheExec {
fn execute(&mut self, state: &mut ExecutionState) -> PolarsResult<DataFrame> {
// Skip cache and always re-execute
if self.count == 0 {
if state.verbose() {
println!("CACHE IGNORE: cache id: {:x}", self.id);
}
return self.input.execute(state);
}

let cache = state.get_df_cache(self.id);
let cache = state.get_df_cache(self.id, self.count);
let mut cache_hit = true;
let previous = cache.0.fetch_sub(1, Ordering::Relaxed);
debug_assert!(previous >= 0);

let df = cache.get_or_try_init(|| {
let df = cache.1.get_or_try_init(|| {
cache_hit = false;
self.input.execute(state)
})?;

// Decrement count on cache hits
if cache_hit {
self.count -= 1;
// On a cache hit where the shared counter had already reached zero, drop the cached entry.
if cache_hit && previous == 0 {
state.remove_df_cache(self.id);
}

if state.verbose() {
Expand Down
15 changes: 11 additions & 4 deletions crates/polars-lazy/src/physical_plan/state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::borrow::Cow;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU8, Ordering};
use std::sync::{Mutex, RwLock};

use bitflags::bitflags;
Expand Down Expand Up @@ -69,10 +69,12 @@ impl From<u8> for StateFlags {
}
}

type CachedValue = Arc<(AtomicI64, OnceCell<DataFrame>)>;

/// State/cache that is maintained during the execution of the physical plan.
pub struct ExecutionState {
// cached by a `.cache` call and kept in memory for the duration of the plan.
df_cache: Arc<Mutex<PlHashMap<usize, Arc<OnceCell<DataFrame>>>>>,
df_cache: Arc<Mutex<PlHashMap<usize, CachedValue>>>,
// cache file reads until all branches got their file, then we delete it
#[cfg(any(
feature = "ipc",
Expand Down Expand Up @@ -239,14 +241,19 @@ impl ExecutionState {
lock.clone()
}

pub(crate) fn get_df_cache(&self, key: usize) -> Arc<OnceCell<DataFrame>> {
pub(crate) fn get_df_cache(&self, key: usize, cache_hits: u32) -> CachedValue {
let mut guard = self.df_cache.lock().unwrap();
guard
.entry(key)
.or_insert_with(|| Arc::new(OnceCell::new()))
.or_insert_with(|| Arc::new((AtomicI64::new(cache_hits as i64), OnceCell::new())))
.clone()
}

/// Evicts the cached entry stored under `key` from the DataFrame cache.
///
/// # Panics
///
/// Panics if the cache mutex is poisoned or if no entry exists for `key`.
pub(crate) fn remove_df_cache(&self, key: usize) {
    let mut cache_map = self.df_cache.lock().unwrap();
    // The entry must exist; a missing key is treated as a broken invariant.
    let evicted = cache_map.remove(&key).unwrap();
    // Release this map's handle to the entry while the lock is still held.
    drop(evicted);
}

/// Clear the cache used by the Window expressions
pub(crate) fn clear_window_expr_cache(&self) {
{
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub static MAP_LIST_NAME: &str = "map_list";
pub static CSE_REPLACED: &str = "__POLARS_CSER_";
pub const LEN: &str = "len";
pub const LITERAL_NAME: &str = "literal";
pub const UNLIMITED_CACHE: u32 = u32::MAX;

// Cache the often used LITERAL and LEN constants
static LITERAL_NAME_INIT: OnceLock<Arc<str>> = OnceLock::new();
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-plan/src/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::path::PathBuf;

use polars_core::prelude::*;

use crate::constants::UNLIMITED_CACHE;
use crate::prelude::*;

impl Expr {
Expand Down Expand Up @@ -162,7 +163,7 @@ impl LogicalPlan {
id: cache_id,
cache_hits,
} => {
let fmt = if *cache_hits == usize::MAX {
let fmt = if *cache_hits == UNLIMITED_CACHE {
Cow::Borrowed("CACHE")
} else {
Cow::Owned(format!("CACHE: {} times", *cache_hits))
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/logical_plan/alp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub enum ALogicalPlan {
// Unique ID.
id: usize,
/// Number of cache hits for which the cached result must be kept in memory.
cache_hits: usize,
cache_hits: u32,
},
Aggregate {
input: Node,
Expand Down
3 changes: 2 additions & 1 deletion crates/polars-plan/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use polars_io::{
};

use super::builder_functions::*;
use crate::constants::UNLIMITED_CACHE;
use crate::dsl::functions::horizontal::all_horizontal;
use crate::logical_plan::projection::{is_regex_projection, rewrite_projections};
#[cfg(feature = "python")]
Expand Down Expand Up @@ -428,7 +429,7 @@ impl LogicalPlanBuilder {
LogicalPlan::Cache {
input,
id,
cache_hits: usize::MAX,
cache_hits: UNLIMITED_CACHE,
}
.into()
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ pub enum LogicalPlan {
Cache {
input: Box<LogicalPlan>,
id: usize,
cache_hits: usize,
cache_hits: u32,
},
Scan {
paths: Arc<[PathBuf]>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use crate::constants::UNLIMITED_CACHE;

// ensure the file count counters are decremented with the cache counts
pub(crate) fn decrement_file_counters_by_cache_hits(
Expand All @@ -24,7 +25,7 @@ pub(crate) fn decrement_file_counters_by_cache_hits(
cache_hits, input, ..
} => {
// we use UNLIMITED_CACHE (u32::MAX) for an infinite cache.
let new_count = if *cache_hits != usize::MAX {
let new_count = if *cache_hits != UNLIMITED_CACHE {
acc_count + *cache_hits as FileCount
} else {
acc_count
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-plan/src/logical_plan/optimizer/cse/cse_lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ mod identifier_impl {
use identifier_impl::*;

/// Identifier maps to Expr Node and count.
type SubPlanCount = PlHashMap<Identifier, (Node, usize)>;
type SubPlanCount = PlHashMap<Identifier, (Node, u32)>;
/// (post_visit_idx, identifier);
type IdentifierArray = Vec<(usize, Identifier)>;

Expand Down Expand Up @@ -193,7 +193,7 @@ impl Visitor for LpIdentifierVisitor<'_> {
}
}

pub(super) type CacheId2Caches = PlHashMap<usize, (usize, Vec<Node>)>;
pub(super) type CacheId2Caches = PlHashMap<usize, (u32, Vec<Node>)>;

struct CommonSubPlanRewriter<'a> {
sp_count: &'a SubPlanCount,
Expand Down Expand Up @@ -323,7 +323,7 @@ pub(crate) fn elim_cmn_subplans(
///
pub(crate) fn prune_unused_caches(lp_arena: &mut Arena<ALogicalPlan>, cid2c: CacheId2Caches) {
for (count, nodes) in cid2c.values() {
if *count == nodes.len() {
if *count == nodes.len() as u32 {
continue;
}

Expand Down
Loading