From 1b44df61b23980d17a2f278309f8ce9e0197b6ff Mon Sep 17 00:00:00 2001
From: Discord9
Date: Wed, 20 Mar 2024 17:52:11 +0800
Subject: [PATCH 1/3] fix: Arrange get range with batch unaligned

---
 src/flow/src/utils.rs | 273 +++++++++++++++++++++++++++++++++++++++---
 1 file changed, 255 insertions(+), 18 deletions(-)

diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs
index 41114e1f0cb2..8192553e1737 100644
--- a/src/flow/src/utils.rs
+++ b/src/flow/src/utils.rs
@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::collections::{BTreeMap, BTreeSet};
+use std::ops::Bound;
 use std::sync::Arc;

 use itertools::Itertools;
@@ -24,11 +25,12 @@ use crate::expr::error::InternalSnafu;
 use crate::expr::{EvalError, ScalarExpr};
 use crate::repr::{value_to_internal_ts, Diff, DiffRow, Duration, KeyValDiffRow, Row, Timestamp};

+pub type Batch = BTreeMap<Row, SmallVec<[DiffRow; 2]>>;
+pub type Spine = BTreeMap<Timestamp, Batch>;
+
 /// Determine when a key should expire according to its event timestamp in the key;
 /// if a key is expired, any future updates to it should be ignored.
 /// Note that a key is expired by its event timestamp (contained in the key), not by the time it's inserted (system timestamp).
-///
-/// TODO(discord9): find a better way to handle key expiration, like write to disk or something instead of throw away
 #[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Deserialize, Serialize)]
 pub struct KeyExpiryManager {
     /// a map from event timestamp to key, used for expiring keys
@@ -121,13 +123,16 @@ pub struct Arrangement {
     /// And for a consolidated batch (i.e. the batch representing now), there should be only one update for each key, with `diff==1`.
     ///
     /// And since most of the time a key gets updated by a delete followed by an insert, a small vec of size 2 makes sense.
-    spine: BTreeMap<Timestamp, BTreeMap<Row, SmallVec<[DiffRow; 2]>>>,
+    /// TODO: batch size balancing?
+    spine: Spine,
     /// if set to false, will not update the current value of the arrangement; useful for cases like `map -> arrange -> reduce`
     full_arrangement: bool,
     /// flag to mark that this arrangement hasn't been written to, so that it can be cloned and shared
     is_written: bool,
     /// manage the expire state of the arrangement
     expire_state: Option<KeyExpiryManager>,
+    /// the time that the last compaction happened, also known as the current time
+    last_compaction_time: Option<Timestamp>,
 }

 impl Arrangement {
@@ -137,6 +142,7 @@ impl Arrangement {
             full_arrangement: false,
             is_written: false,
             expire_state: None,
+            last_compaction_time: None,
         }
     }

@@ -160,6 +166,7 @@ impl Arrangement {
                     continue;
                 }
             }
+            // the first batch whose key is greater than or equal to ts
             let batch = if let Some((_, batch)) = self.spine.range_mut(ts..).next() {
                 batch
@@ -176,15 +183,85 @@ impl Arrangement {
         Ok(max_late_by)
     }

+    /// find out the time of the next update in the future
+    pub fn get_next_update_time(&self) -> Option<Timestamp> {
+        let next_batch = {
+            let mut iter = self.spine.iter();
+            if self.full_arrangement {
+                iter.next();
+                iter
+            } else {
+                iter
+            }
+        };
+        for (_ts, batch) in next_batch {
+            let min_ts = batch
+                .iter()
+                .flat_map(|(_k, v)| v.iter().map(|(_, ts, _)| *ts))
+                .min();
+            if let Some(min_ts) = min_ts {
+                return Some(min_ts);
+            } else {
+                continue;
+            }
+        }
+        // all batches are empty, so there is no pending update
+        None
+    }
+
+    /// get the last compaction time
+    pub fn get_compaction(&self) -> Option<Timestamp> {
+        self.last_compaction_time
+    }
+
+    /// split the spine off at `now`, and return the part of the spine that's at or before `now` (`now` included)
+    fn split_lte(&mut self, now: &Timestamp) -> Spine {
+        let mut before = self.spine.split_off(&(now + 1));
+        std::mem::swap(&mut before, &mut self.spine);
+
+        // if before's last key == now, then all the keys we need have been found
+        if before
+            .last_key_value()
+            .map(|(k, _v)| *k == *now)
+            .unwrap_or(false)
+        {
+            return before;
+        }
+
+        // we also need to move all keys with timestamp <= now from the first batch in spine into before;
+        // we know that all remaining keys to be split off satisfy last key < key <= now, so we make them into a new batch
+        if let Some(mut first_batch) = self.spine.first_entry() {
+            let mut new_batch: Batch = Default::default();
+            // drop keys whose update list ends up empty
+            first_batch.get_mut().retain(|key, updates| {
+                // move updates with ts <= now into the new batch
+                updates.retain(|(val, ts, diff)| {
+                    if *ts <= *now {
+                        new_batch.entry(key.clone()).or_insert(smallvec![]).push((
+                            val.clone(),
+                            *ts,
+                            *diff,
+                        ));
+                    }
+                    *ts > *now
+                });
+                !updates.is_empty()
+            });

+            before.insert(*now, new_batch);
+        }
+        before
+    }
+
     /// advance time to `now` and consolidate all older (`now` included) updates into the first key
     ///
     /// return the maximum expire time (already expired by how much time) of all updates if any key is already expired
     pub fn set_compaction(&mut self, now: Timestamp) -> Result<Option<Duration>, EvalError> {
         let mut max_late_by: Option<Duration> = None;

-        let mut should_compact = self.spine.split_off(&(now + 1));
-        std::mem::swap(&mut should_compact, &mut self.spine);
+        let should_compact = self.split_lte(&now);
+        self.last_compaction_time = Some(now);
         // if a full arrangement is not needed, we can just discard everything before and including now
         if !self.full_arrangement {
             return Ok(None);
         }
@@ -221,18 +298,62 @@ impl Arrangement {
     }

     /// get the updates of the arrangement from the given range of time
-    pub fn get_updates_in_range<R: RangeBounds<Timestamp>>(
+    pub fn get_updates_in_range<R: RangeBounds<Timestamp> + Clone>(
         &self,
         range: R,
     ) -> Vec<KeyValDiffRow> {
         let mut result = vec![];
-        for (_ts, batch) in self.spine.range(range) {
-            for (key, updates) in batch.clone() {
-                for (val, ts, diff) in updates {
-                    result.push(((key.clone(), val), ts, diff));
+        // three parts:
+        // 1. the starting batch, whose first key >= range.start, which may contain updates that are not in range
+        // 2. the batches whose keys are in range
+        // 3. the last batch, whose first key > range.end, which may still contain updates that are in range
+        let mut is_first = true;
+        for (_ts, batch) in self.spine.range(range.clone()) {
+            if is_first {
+                for (key, updates) in batch {
+                    let iter = updates
+                        .iter()
+                        .filter(|(_val, ts, _diff)| range.contains(ts))
+                        .map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff));
+                    result.extend(iter);
+                }
+                is_first = false;
+            } else {
+                for (key, updates) in batch.clone() {
+                    result.extend(
+                        updates
+                            .iter()
+                            .map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff)),
+                    );
                 }
             }
         }
+
+        // deal with the boundary, including start and end:
+        // for the next batch with upper_bound >= range.end,
+        // we need to search for updates within range
+        let neg_bound = match range.end_bound() {
+            Bound::Included(b) => {
+                // if the boundary is aligned, the last batch in range actually covers the full range,
+                // so there will be no further keys we need in the next batch
+                if self.spine.contains_key(b) {
+                    return result;
+                }
+                Bound::Excluded(*b)
+            }
+            Bound::Excluded(b) => Bound::Included(*b),
+            Bound::Unbounded => return result,
+        };
+        let search_range = (neg_bound, Bound::Unbounded);
+        if let Some(last_batch) = self.spine.range(search_range).next() {
+            for (key, updates) in last_batch.1 {
+                let iter = updates
+                    .iter()
+                    .filter(|(_val, ts, _diff)| range.contains(ts))
+                    .map(|(val, ts, diff)| ((key.clone(), val.clone()), *ts, *diff));
+                result.extend(iter);
+            }
+        };
         result
     }

@@ -260,11 +381,12 @@ impl Arrangement {
     /// get current state of things
     /// useful for querying existing keys (i.e. reduce and join operators need to query existing state)
     pub fn get(&self, now: Timestamp, key: &Row) -> Option<(Row, Timestamp, Diff)> {
-        if self
-            .spine
-            .first_key_value()
-            .map(|(ts, _)| *ts >= now)
-            .unwrap_or(false)
+        if self.full_arrangement
+            && self
+                .spine
+                .first_key_value()
+                .map(|(ts, _)| *ts >= now)
+                .unwrap_or(false)
         {
             self.spine
                 .first_key_value()
@@ -272,10 +394,41 @@ impl Arrangement {
         } else {
             // check keys <= now to know current value
             let mut final_val = None;
-            for (_ts, batch) in self.spine.range(..=now) {
+
+            let with_extra_batch = {
+                let unaligned = self.spine.range(..=now);
+                if unaligned
+                    .clone()
+                    .last()
+                    .map(|(ts, _)| *ts == now)
+                    .unwrap_or(false)
+                {
+                    // this extra chain is here just to keep the iterator types the same
+                    unaligned.chain(None)
+                } else {
+                    // if the last key is not equal to now, then we need to include the next batch,
+                    // because we know last batch key < now < next batch key,
+                    // therefore the next batch may contain updates that we want
+                    unaligned.chain(
+                        self.spine
+                            .range((Bound::Excluded(now), Bound::Unbounded))
+                            .next(),
+                    )
+                }
+            };
+            for (ts, batch) in with_extra_batch {
                 if let Some(new_rows) = batch.get(key).map(|v| v.iter()) {
-                    for new_row in new_rows {
-                        final_val = compact_diff_row(final_val, new_row);
+                    if *ts <= now {
+                        for new_row in new_rows.sorted_by_key(|r| r.1) {
+                            final_val = compact_diff_row(final_val, new_row);
+                        }
+                    } else {
+                        for new_row in new_rows
+                            .filter(|new_row| new_row.1 <= now)
+                            .sorted_by_key(|r| r.1)
+                        {
+                            final_val = compact_diff_row(final_val, new_row);
+                        }
                     }
                 }
             }
@@ -530,4 +683,88 @@ mod test {
             assert_eq!(arr.get(12, &Row::new(vec![1i64.into()])), None);
         }
     }
+
+    /// test that split_lte, given a split point that is not aligned with batch boundaries,
+    /// can correctly retrieve all updates in the range, including updates that are in the batches
+    /// near the boundary of the input range
+    #[test]
+    fn test_split_off() {
+        let mut arr = Arrangement::new();
+        // manually create batches ..=1 and 2..=3
+        arr.spine.insert(1, Default::default());
+        arr.spine.insert(3, Default::default());
+        arr.apply_updates(
+            2,
+            vec![((Row::new(vec![1.into()]), Row::new(vec![2.into()])), 2, 1)],
+        )
+        .unwrap();
+        // the update falls into the range of 2..=3
+        let mut arr1 = arr.clone();
+        {
+            assert_eq!(arr.get_next_update_time(), Some(2));
+            // the split is expected to take batch ..=1 and create a new batch 2..=2 (which contains the update)
+            let split = &arr.split_lte(&2);
+            assert_eq!(split.len(), 2);
+            assert_eq!(split[&2].len(), 1);
+            let _ = &arr.split_lte(&3);
+            assert_eq!(arr.get_next_update_time(), None);
+        }
+        {
+            // taking all updates with timestamp <= 1 will get no updates
+            let split = &arr1.split_lte(&1);
+            assert_eq!(split.len(), 1);
+        }
+    }
+
+    /// test that get_updates_in_range returns the correct result
+    /// when the range is not aligned with batch boundaries
+    #[test]
+    fn test_get_by_range() {
+        let mut arr = Arrangement::new();
+
+        // will form three batches: {2: [2, 1], 4: [4, 3], 6: [6, 5]}
+        // TODO(discord9): manually set batch
+        let updates: Vec<KeyValDiffRow> = vec![
+            ((Row::new(vec![1i64.into()]), Row::empty()), 2, 1),
+            ((Row::new(vec![1i64.into()]), Row::empty()), 1, 1),
+            ((Row::new(vec![2i64.into()]), Row::empty()), 4, 1),
+            ((Row::new(vec![3i64.into()]), Row::empty()), 3, 1),
+            ((Row::new(vec![3i64.into()]), Row::empty()), 6, 1),
+            ((Row::new(vec![1i64.into()]), Row::empty()), 5, 1),
+        ];
+        arr.apply_updates(0, updates).unwrap();
+        assert_eq!(
+            arr.get_updates_in_range(2..=5),
+            vec![
+                ((Row::new(vec![1i64.into()]), Row::empty()), 2, 1),
+                ((Row::new(vec![2i64.into()]), Row::empty()), 4, 1),
+                ((Row::new(vec![3i64.into()]), Row::empty()), 3, 1),
+                ((Row::new(vec![1i64.into()]), Row::empty()), 5, 1),
+            ]
+        );
+    }
+
+    /// test that get at a timestamp unaligned with batch boundaries
+    /// returns the correct result
+    #[test]
+    fn test_get_unaligned() {
+        let mut arr = Arrangement::new();
+
+        // will form three batches: {2: [2, 1], 4: [4, 3], 6: [6, 5]}
+        // TODO(discord9): manually set batch
+        let key = Row::new(vec![1i64.into()]);
+        let updates: Vec<KeyValDiffRow> = vec![
+            ((key.clone(), Row::new(vec![1i64.into()])), 2, 1),
+            ((key.clone(), Row::new(vec![2i64.into()])), 1, 1),
+            ((key.clone(), Row::new(vec![3i64.into()])), 4, 1),
+            ((key.clone(), Row::new(vec![4i64.into()])), 3, 1),
+            ((key.clone(), Row::new(vec![5i64.into()])), 6, 1),
+            ((key.clone(), Row::new(vec![6i64.into()])), 5, 1),
+        ];
+        arr.apply_updates(0, updates).unwrap();
+        // aligned with batch boundary
+        assert_eq!(arr.get(2, &key), Some((Row::new(vec![1i64.into()]), 2, 1)));
+        // unaligned with batch boundary
+        assert_eq!(arr.get(3, &key), Some((Row::new(vec![4i64.into()]), 3, 1)));
+    }
 }

From 42828f55da6d6f7d87ba3e87eccec09827e28821 Mon Sep 17 00:00:00 2001
From: Discord9
Date: Wed, 20 Mar 2024 20:07:21 +0800
Subject: [PATCH 2/3] chore: per review

---
 src/flow/src/utils.rs | 22 ++++++++--------------
 1 file changed, 8 insertions(+), 14 deletions(-)

diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs
index 8192553e1737..9a88667af760 100644
--- a/src/flow/src/utils.rs
+++ b/src/flow/src/utils.rs
@@ -184,17 +184,11 @@ impl Arrangement {
     }

     /// find out the time of the next update in the future
-    pub fn get_next_update_time(&self) -> Option<Timestamp> {
-        let next_batch = {
-            let mut iter = self.spine.iter();
-            if self.full_arrangement {
-                iter.next();
-                iter
-            } else {
-                iter
-            }
-        };
-        for (_ts, batch) in next_batch {
+    /// that is, the next update with `timestamp > now`
+    pub fn get_next_update_time(&self, now: &Timestamp) -> Option<Timestamp> {
+        // iterate over batches that only contain updates with `timestamp > now`, find the first non-empty batch, then take the minimum timestamp in that batch
+        let next_batches = self.spine.range((Bound::Excluded(now), Bound::Unbounded));
+        for (_ts, batch) in next_batches {
             let min_ts = batch
                 .iter()
                 .flat_map(|(_k, v)| v.iter().map(|(_, ts, _)| *ts))
@@ -248,7 +242,7 @@ impl Arrangement {
                 !updates.is_empty()
             });

-            before.insert(*now, new_batch);
+            before.entry(*now).or_default().extend(new_batch);
         }
         before
     }
@@ -701,13 +695,13 @@ mod test {
         // the update falls into the range of 2..=3
         let mut arr1 = arr.clone();
         {
-            assert_eq!(arr.get_next_update_time(), Some(2));
+            assert_eq!(arr.get_next_update_time(&1), Some(2));
             // the split is expected to take batch ..=1 and create a new batch 2..=2 (which contains the update)
             let split = &arr.split_lte(&2);
             assert_eq!(split.len(), 2);
             assert_eq!(split[&2].len(), 1);
             let _ = &arr.split_lte(&3);
-            assert_eq!(arr.get_next_update_time(), None);
+            assert_eq!(arr.get_next_update_time(&1), None);
         }
         {
             // taking all updates with timestamp <= 1 will get no updates

From d47f4e49915e7863855f8ad0a36a24e9c6bb3050 Mon Sep 17 00:00:00 2001
From: Discord9
Date: Wed, 20 Mar 2024 20:37:20 +0800
Subject: [PATCH 3/3] refactor: sort at apply_updates

---
 src/flow/src/utils.rs | 30 +++++++++++++++++++++++++-----
 1 file changed, 25 insertions(+), 5 deletions(-)

diff --git a/src/flow/src/utils.rs b/src/flow/src/utils.rs
index 9a88667af760..01c6539d2dd2 100644
--- a/src/flow/src/utils.rs
+++ b/src/flow/src/utils.rs
@@ -178,6 +178,9 @@ impl Arrangement {
             {
                 let key_updates = batch.entry(key).or_insert(smallvec![]);
                 key_updates.push((val, ts, diff));
+                // a stable sort puts updates in timestamp order
+                // without changing the insertion order of updates within the same tick
+                key_updates.sort_by_key(|r| r.1);
             }
         }
         Ok(max_late_by)
@@ -413,14 +416,11 @@ impl Arrangement {
             for (ts, batch) in with_extra_batch {
                 if let Some(new_rows) = batch.get(key).map(|v| v.iter()) {
                     if *ts <= now {
-                        for new_row in new_rows.sorted_by_key(|r| r.1) {
+                        for new_row in new_rows {
                             final_val = compact_diff_row(final_val, new_row);
                         }
                     } else {
-                        for new_row in new_rows
-                            .filter(|new_row| new_row.1 <= now)
-                            .sorted_by_key(|r| r.1)
-                        {
+                        for new_row in new_rows.filter(|new_row| new_row.1 <= now) {
                             final_val = compact_diff_row(final_val, new_row);
                         }
                     }
@@ -761,4 +761,24 @@ mod test {
         // unaligned with batch boundary
         assert_eq!(arr.get(3, &key), Some((Row::new(vec![4i64.into()]), 3, 1)));
     }
+
+    /// test that out-of-order updates are sorted correctly
+    #[test]
+    fn test_out_of_order_apply_updates() {
+        let mut arr = Arrangement::new();
+
+        let key = Row::new(vec![1i64.into()]);
+        let updates: Vec<KeyValDiffRow> = vec![
+            ((key.clone(), Row::new(vec![5i64.into()])), 6, 1),
+            ((key.clone(), Row::new(vec![2i64.into()])), 2, -1),
+            ((key.clone(), Row::new(vec![1i64.into()])), 2, 1),
+            ((key.clone(), Row::new(vec![2i64.into()])), 1, 1),
+            ((key.clone(), Row::new(vec![3i64.into()])), 4, 1),
+            ((key.clone(), Row::new(vec![4i64.into()])), 3, 1),
+            ((key.clone(), Row::new(vec![6i64.into()])), 5, 1),
+        ];
+        arr.apply_updates(0, updates.clone()).unwrap();
+        let sorted = updates.iter().sorted_by_key(|r| r.1).cloned().collect_vec();
+        assert_eq!(arr.get_updates_in_range(1..7), sorted);
+    }
 }
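
The unaligned-range problem that the first patch works around comes down to how the spine stores updates: batches are keyed by the largest timestamp they hold, so a range query whose end is not itself a batch key never visits the batch that contains the tail of the range. The following standalone sketch uses only std's BTreeMap with toy stand-in types (plain integers instead of the real Row/DiffRow/Spine definitions) to show the naive lookup missing an update and the extra peek past the range end, which is the same idea get_updates_in_range implements with its neg_bound handling.

use std::collections::BTreeMap;
use std::ops::Bound;

// A toy spine: batch upper-bound timestamp -> (ts, diff) updates stored in that batch.
// Stand-in types for illustration only, not the real Arrangement/Spine definitions.
fn main() {
    let mut spine: BTreeMap<i64, Vec<(i64, i64)>> = BTreeMap::new();
    // three batches keyed by their largest timestamp: {2: [2, 1], 4: [4, 3], 6: [6, 5]}
    spine.insert(2, vec![(2, 1), (1, 1)]);
    spine.insert(4, vec![(4, 1), (3, 1)]);
    spine.insert(6, vec![(6, 1), (5, 1)]);

    let wanted = 2i64..=5;

    // naive lookup: only the batches keyed 2 and 4 are visited, so the update at ts=5
    // (stored in the batch keyed 6) is silently missed
    let naive: Vec<i64> = spine
        .range(wanted.clone())
        .flat_map(|(_key, updates)| updates.iter().map(|(ts, _diff)| *ts))
        .filter(|ts| wanted.contains(ts))
        .collect();
    assert_eq!(naive, vec![2, 4, 3]);

    // fix: when the range end is not itself a batch key, also peek at the first batch
    // past the end and keep only its in-range updates
    let mut all = naive.clone();
    if !spine.contains_key(wanted.end()) {
        if let Some((_key, updates)) = spine
            .range((Bound::Excluded(*wanted.end()), Bound::Unbounded))
            .next()
        {
            all.extend(
                updates
                    .iter()
                    .map(|(ts, _diff)| *ts)
                    .filter(|ts| wanted.contains(ts)),
            );
        }
    }
    assert_eq!(all, vec![2, 4, 3, 5]);
}

The contains_key check mirrors the Bound::Included arm in the patch: when the range end lands exactly on a batch key, the in-range batches already cover everything and no extra peek is needed.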
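
The third patch relies on the fact that Rust's Vec::sort_by_key is a stable sort, so sorting key_updates by timestamp cannot swap a retraction and an insertion that arrived in the same tick. A minimal illustration with plain tuples (the string values are made up for the example, not the real Row/DiffRow types):

fn main() {
    // (value, timestamp, diff) triples arriving out of order; the two ts=2 entries are a
    // retraction followed by an insertion issued in the same tick
    let mut key_updates = vec![("b", 3i64, 1i64), ("old", 2, -1), ("new", 2, 1), ("a", 1, 1)];

    // sort_by_key is stable: entries end up ordered by timestamp, while the relative order
    // of the two ts=2 entries (retract before insert) is preserved
    key_updates.sort_by_key(|update| update.1);

    assert_eq!(
        key_updates,
        vec![("a", 1, 1), ("old", 2, -1), ("new", 2, 1), ("b", 3, 1)]
    );
}

Sorting once at apply_updates also lets the get path in patch 3 drop its per-read sorted_by_key calls, since every batch's update list is already in timestamp order.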
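
split_lte builds on BTreeMap::split_off, which keeps keys strictly below the split point in the original map and returns the rest; the swap afterwards flips the two halves. A small sketch of that first half of the logic, again with toy payloads rather than real batches:

use std::collections::BTreeMap;

fn main() {
    // batches keyed by their largest timestamp (string payloads are placeholders)
    let mut spine: BTreeMap<i64, &str> =
        BTreeMap::from([(1, "batch ..=1"), (3, "batch 2..=3"), (6, "batch 4..=6")]);
    let now = 3i64;

    // split_off(&(now + 1)) leaves keys <= now in `spine` and returns keys > now;
    // swapping the two afterwards hands back the past and keeps the future in `spine`
    let mut before = spine.split_off(&(now + 1));
    std::mem::swap(&mut before, &mut spine);

    assert_eq!(before.keys().copied().collect::<Vec<_>>(), vec![1, 3]);
    assert_eq!(spine.keys().copied().collect::<Vec<_>>(), vec![6]);
}

When `now` falls inside a batch (say now = 4 here), split_off alone leaves the ts <= 4 updates stranded in the batch keyed 6; the retain/drain step in split_lte is what moves those updates into a new batch keyed `now`, which is exactly the unaligned case test_split_off exercises.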