diff --git a/eyeball-im-util/Cargo.toml b/eyeball-im-util/Cargo.toml index 60c32ab..5ef6f2f 100644 --- a/eyeball-im-util/Cargo.toml +++ b/eyeball-im-util/Cargo.toml @@ -12,11 +12,11 @@ keywords.workspace = true all-features = true [dependencies] -arrayvec = "0.7.4" eyeball-im = { version = "0.4.2", path = "../eyeball-im" } futures-core.workspace = true imbl = "2.0.0" pin-project-lite = "0.2.9" +smallvec = { version = "1.11.2", features = ["const_generics", "const_new"] } [dev-dependencies] eyeball = { version = "0.8.6", path = "../eyeball" } diff --git a/eyeball-im-util/src/vector.rs b/eyeball-im-util/src/vector.rs index 04abdee..8940e4a 100644 --- a/eyeball-im-util/src/vector.rs +++ b/eyeball-im-util/src/vector.rs @@ -3,6 +3,7 @@ mod filter; mod limit; mod ops; +mod sort; mod traits; use eyeball_im::VectorDiff; @@ -12,6 +13,7 @@ use self::ops::{VectorDiffContainerFamilyMember, VectorDiffContainerOps}; pub use self::{ filter::{Filter, FilterMap}, limit::{EmptyLimitStream, Limit}, + sort::SortBy, traits::{ BatchedVectorSubscriber, VectorDiffContainer, VectorObserver, VectorObserverExt, VectorSubscriberExt, @@ -40,5 +42,5 @@ type VectorDiffContainerDiff = VectorDiff /// Type alias for extracting the buffer type from a stream of /// [`VectorDiffContainer`]s. -type VectorDiffContainerStreamLimitBuf = +type VectorDiffContainerStreamBuffer = <::Item as VectorDiffContainerOps>>::Buffer; diff --git a/eyeball-im-util/src/vector/limit.rs b/eyeball-im-util/src/vector/limit.rs index c918eb7..87e663c 100644 --- a/eyeball-im-util/src/vector/limit.rs +++ b/eyeball-im-util/src/vector/limit.rs @@ -1,3 +1,4 @@ +use smallvec::SmallVec; use std::{ cmp::{min, Ordering}, mem, @@ -6,10 +7,9 @@ use std::{ }; use super::{ - VectorDiffContainer, VectorDiffContainerOps, VectorDiffContainerStreamElement, - VectorDiffContainerStreamLimitBuf, VectorObserver, + VectorDiffContainer, VectorDiffContainerOps, VectorDiffContainerStreamBuffer, + VectorDiffContainerStreamElement, VectorObserver, }; -use arrayvec::ArrayVec; use eyeball_im::VectorDiff; use futures_core::Stream; use imbl::Vector; @@ -63,7 +63,7 @@ pin_project! { // with a limit of 2 on top: if an item is popped at the front then 10 // is removed, but 12 has to be pushed back as it "enters" the "view". // That second `PushBack` diff is buffered here. - ready_values: VectorDiffContainerStreamLimitBuf, + ready_values: VectorDiffContainerStreamBuffer, } } @@ -207,6 +207,7 @@ where update_buffered_vector(&diff, self.buffered_vector); handle_diff(diff, limit, prev_len, self.buffered_vector) }); + if let Some(diff) = ready { return Poll::Ready(Some(diff)); } @@ -318,15 +319,15 @@ fn handle_diff( limit: usize, prev_len: usize, buffered_vector: &Vector, -) -> ArrayVec, 2> { +) -> SmallVec<[VectorDiff; 2]> { // If the limit is zero, we have nothing to do. if limit == 0 { - return ArrayVec::new(); + return SmallVec::new(); } let is_full = prev_len >= limit; + let mut res = SmallVec::new(); - let mut res = ArrayVec::new(); match diff { VectorDiff::Append { mut values } => { if is_full { diff --git a/eyeball-im-util/src/vector/ops.rs b/eyeball-im-util/src/vector/ops.rs index cfd14d2..f9c51fb 100644 --- a/eyeball-im-util/src/vector/ops.rs +++ b/eyeball-im-util/src/vector/ops.rs @@ -1,5 +1,5 @@ -use arrayvec::ArrayVec; use eyeball_im::VectorDiff; +use smallvec::SmallVec; pub trait VectorDiffContainerOps: Sized { type Family: VectorDiffContainerFamily; @@ -15,7 +15,7 @@ pub trait VectorDiffContainerOps: Sized { fn push_into_buffer( self, buffer: &mut Self::Buffer, - make_diffs: impl FnMut(VectorDiff) -> ArrayVec, 2>, + make_diffs: impl FnMut(VectorDiff) -> SmallVec<[VectorDiff; 2]>, ) -> Option; fn pop_from_buffer(buffer: &mut Self::Buffer) -> Option; @@ -26,7 +26,7 @@ pub type VectorDiffContainerFamilyMember = VectorDiffContainerOps for VectorDiff { type Family = VectorDiffFamily; - type Buffer = Option>; + type Buffer = SmallVec<[VectorDiff; 2]>; fn from_item(vector_diff: VectorDiff) -> Self { vector_diff @@ -42,23 +42,28 @@ impl VectorDiffContainerOps for VectorDiff { fn push_into_buffer( self, buffer: &mut Self::Buffer, - mut make_diffs: impl FnMut(VectorDiff) -> ArrayVec, 2>, + mut make_diffs: impl FnMut(VectorDiff) -> SmallVec<[VectorDiff; 2]>, ) -> Option { - assert!(buffer.is_none(), "buffer must be None when calling push_into_buffer"); + assert!(buffer.is_empty(), "buffer must be empty when calling `push_into_buffer`"); let mut diffs = make_diffs(self); - let last = diffs.pop(); - if let Some(first) = diffs.pop() { - *buffer = last; - Some(first) - } else { - last + match diffs.len() { + 0 => None, + 1 => diffs.pop(), + _ => { + // We want the first element. We can't “pop front” on a `SmallVec`. + // The idea is to reverse the `diffs` and to pop from it. + diffs.reverse(); + *buffer = diffs; + + buffer.pop() + } } } fn pop_from_buffer(buffer: &mut Self::Buffer) -> Option { - buffer.take() + buffer.pop() } } @@ -75,6 +80,7 @@ impl VectorDiffContainerOps for Vec> { f: impl FnMut(VectorDiff) -> Option>, ) -> Option> { let res: Vec<_> = self.into_iter().filter_map(f).collect(); + if res.is_empty() { None } else { @@ -85,9 +91,10 @@ impl VectorDiffContainerOps for Vec> { fn push_into_buffer( self, _buffer: &mut (), - make_diffs: impl FnMut(VectorDiff) -> ArrayVec, 2>, + make_diffs: impl FnMut(VectorDiff) -> SmallVec<[VectorDiff; 2]>, ) -> Option { let res: Vec<_> = self.into_iter().flat_map(make_diffs).collect(); + if res.is_empty() { None } else { diff --git a/eyeball-im-util/src/vector/sort.rs b/eyeball-im-util/src/vector/sort.rs new file mode 100644 index 0000000..ea27275 --- /dev/null +++ b/eyeball-im-util/src/vector/sort.rs @@ -0,0 +1,554 @@ +use std::{ + cmp::Ordering, + ops::Not, + pin::Pin, + task::{self, ready, Poll}, +}; + +use eyeball_im::{Vector, VectorDiff}; +use futures_core::Stream; +use pin_project_lite::pin_project; +use smallvec::SmallVec; + +use super::{ + VectorDiffContainer, VectorDiffContainerOps, VectorDiffContainerStreamBuffer, + VectorDiffContainerStreamElement, +}; + +type UnsortedIndex = usize; + +pin_project! { + /// A [`VectorDiff`] stream adapter that presents a sorted view of the + /// underlying [`ObservableVector`] items. + /// + /// ```rust + /// use eyeball_im::{ObservableVector, VectorDiff}; + /// use eyeball_im_util::vector::VectorObserverExt; + /// use imbl::vector; + /// use std::cmp::Ordering; + /// use stream_assert::{assert_closed, assert_next_eq, assert_pending}; + /// + /// // A comparison function that is used to sort our + /// // `ObservableVector` values. + /// fn cmp(left: &T, right: &T) -> Ordering + /// where + /// T: Ord, + /// { + /// left.cmp(right) + /// } + /// + /// # fn main() { + /// // Our vector. + /// let mut ob = ObservableVector::::new(); + /// let (values, mut sub) = ob.subscribe().sort_by(&cmp); + /// // ^^^^ + /// // | our comparison function + /// + /// assert!(values.is_empty()); + /// assert_pending!(sub); + /// + /// // Append multiple unsorted values. + /// ob.append(vector!['d', 'b', 'e']); + /// // We get a `VectorDiff::Append` with sorted values! + /// assert_next_eq!(sub, VectorDiff::Append { values: vector!['b', 'd', 'e'] }); + /// + /// // Let's recap what we have. `ob` is our `ObservableVector`, + /// // `sub` is the “sorted view”/“sorted stream” of `ob`: + /// // | `ob` | d b e | + /// // | `sub` | b d e | + /// + /// // Append other multiple values. + /// ob.append(vector!['f', 'g', 'a', 'c']); + /// // We get three `VectorDiff`s! + /// assert_next_eq!(sub, VectorDiff::PushFront { value: 'a' }); + /// assert_next_eq!(sub, VectorDiff::Insert { index: 2, value: 'c' }); + /// assert_next_eq!(sub, VectorDiff::Append { values: vector!['f', 'g'] }); + /// + /// // Let's recap what we have: + /// // | `ob` | d b e f g a c | + /// // | `sub` | a b c d e f g | + /// // ^ ^ ^^^ + /// // | | | + /// // | | with `VectorDiff::Append { .. }` + /// // | with `VectorDiff::Insert { index: 2, .. }` + /// // with `VectorDiff::PushFront { .. }` + /// + /// // Technically, `SortBy` emits `VectorDiff`s that mimic a sorted `Vector`. + /// + /// drop(ob); + /// assert_closed!(sub); + /// # } + /// ``` + /// + /// [`ObservableVector`]: eyeball_im::ObservableVector + pub struct SortBy<'a, S, F> + where + S: Stream, + S::Item: VectorDiffContainer, + { + // The main stream to poll items from. + #[pin] + inner_stream: S, + + // The comparison function to sort items. + compare: &'a F, + + // This is the **sorted** buffered vector. + buffered_vector: Vector<(UnsortedIndex, VectorDiffContainerStreamElement)>, + + // This adapter can produce many items per item of the underlying stream. + // + // Thus, if the item type is just `VectorDiff<_>` (non-bached, can't + // just add diffs to a `poll_next` result), we need a buffer to store the + // possible extra items in. + ready_values: VectorDiffContainerStreamBuffer, + } +} + +impl<'a, S, F> SortBy<'a, S, F> +where + S: Stream, + S::Item: VectorDiffContainer, + F: Fn(&VectorDiffContainerStreamElement, &VectorDiffContainerStreamElement) -> Ordering, +{ + /// Create a new `SortBy` with the given (unsorted) initial values, stream + /// of `VectorDiff` updates for those values, and the comparison function. + pub fn new( + initial_values: Vector>, + inner_stream: S, + compare: &'a F, + ) -> (Vector>, Self) { + let mut initial_values = initial_values.into_iter().enumerate().collect::>(); + initial_values.sort_by(|(_, left), (_, right)| compare(left, right)); + + ( + initial_values.iter().map(|(_, value)| value.clone()).collect(), + Self { + inner_stream, + compare, + buffered_vector: initial_values, + ready_values: Default::default(), + }, + ) + } +} + +impl<'a, S, F> Stream for SortBy<'a, S, F> +where + S: Stream, + S::Item: VectorDiffContainer, + F: Fn(&VectorDiffContainerStreamElement, &VectorDiffContainerStreamElement) -> Ordering, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let mut this = self.project(); + + loop { + // First off, if any values are ready, return them. + if let Some(value) = S::Item::pop_from_buffer(this.ready_values) { + return Poll::Ready(Some(value)); + } + + // Poll `VectorDiff`s from the `inner_stream`. + let Some(diffs) = ready!(this.inner_stream.as_mut().poll_next(cx)) else { + return Poll::Ready(None); + }; + + // Consume and apply the diffs if possible. + let ready = diffs.push_into_buffer(this.ready_values, |diff| { + handle_diff_and_update_buffered_vector(diff, this.compare, this.buffered_vector) + }); + + if let Some(diff) = ready { + return Poll::Ready(Some(diff)); + } + + // Else loop and poll the streams again. + } + } +} + +/// Map a `VectorDiff` to potentially `VectorDiff`s. Keep in mind that +/// `buffered_vector` contains the sorted values. +/// +/// When looking for the _position_ of a value (e.g. where to insert a new +/// value?), `Vector::binary_search_by` is used — it is possible because the +/// `Vector` is sorted. When looking for the _unsorted index_ of a value, +/// `Iterator::position` is used. +fn handle_diff_and_update_buffered_vector( + diff: VectorDiff, + compare: &F, + buffered_vector: &mut Vector<(usize, T)>, +) -> SmallVec<[VectorDiff; 2]> +where + T: Clone, + F: Fn(&T, &T) -> Ordering, +{ + let mut result = SmallVec::new(); + + match diff { + VectorDiff::Append { values: new_values } => { + // Sort `new_values`. + let mut new_values = { + // Calculate the `new_values` with their `unsorted_index`. + // The `unsorted_index` is the index of the new value in `new_values` + an + // offset, where the offset is given by `offset`, i.e the actual size of the + // `buffered_vector`. + let offset = buffered_vector.len(); + let mut new_values = new_values + .into_iter() + .enumerate() + .map(|(unsorted_index, value)| (unsorted_index + offset, value)) + .collect::>(); + + // Now, we can sort `new_values`. + new_values.sort_by(|(_, left), (_, right)| compare(left, right)); + + new_values + }; + + // If `buffered_vector` is empty, all `new_values` are appended. + if buffered_vector.is_empty() { + buffered_vector.append(new_values.clone()); + result.push(VectorDiff::Append { + values: new_values.into_iter().map(|(_, value)| value).collect(), + }); + } else { + // Read the first item of `new_values`. We get a reference to it. + // + // Why using `Vector::get`? We _could_ use `new_values.pop_front()` to get + // ownership of `new_value`. But in the slow path, in the `_` branch, we + // would need to generate a `VectorDiff::PushBack`, followed by the + // `VectorDiff::Append` outside this loop, which is 2 diffs. Or, alternatively, + // we would need to `push_front` the `new_value` again, which has a cost too. + // By using a reference, and `pop_front`ing when necessary, we reduce the number + // of diffs. + while let Some((_, new_value)) = new_values.get(0) { + // Fast path. + // + // If `new_value`, i.e. the first item from `new_values`, is greater than or + // equal to the last item from `buffered_vector`, it means + // that all items in `new_values` can be appended. That's because `new_values` + // is already sorted. + if compare( + new_value, + buffered_vector + .last() + .map(|(_, value)| value) + .expect("`buffered_vector` cannot be empty"), + ) + .is_ge() + { + // `new_value` isn't consumed. Let's break the loop and emit a + // `VectorDiff::Append` just hereinafter. + break; + } + // Slow path. + // + // Look for the position where to insert the `new_value`. + else { + // Find the position where to insert `new_value`. + match buffered_vector + .binary_search_by(|(_, value)| compare(value, new_value)) + { + // Somewhere? + Ok(index) | Err(index) if index != buffered_vector.len() => { + // Insert the new value. We get it by using `pop_front` on + // `new_values`. This time the new value is consumed. + let (unsorted_index, new_value) = + new_values.pop_front().expect("`new_values` cannot be empty"); + + buffered_vector.insert(index, (unsorted_index, new_value.clone())); + result.push( + // At the beginning? Let's emit a `VectorDiff::PushFront`. + if index == 0 { + VectorDiff::PushFront { value: new_value } + } + // Somewhere in the middle? Let's emit a `VectorDiff::Insert`. + else { + VectorDiff::Insert { index, value: new_value } + }, + ); + } + // At the end? + _ => { + // `new_value` isn't consumed. Let's break the loop and emit a + // `VectorDiff::Append` just after. + break; + } + } + } + } + + // Some values have not been inserted. Based on our algorithm, it means they + // must be appended. + if new_values.is_empty().not() { + buffered_vector.append(new_values.clone()); + result.push(VectorDiff::Append { + values: new_values.into_iter().map(|(_, value)| value).collect(), + }); + } + } + } + VectorDiff::Clear => { + // Nothing to do but clear. + buffered_vector.clear(); + result.push(VectorDiff::Clear); + } + VectorDiff::PushFront { value: new_value } => { + // The unsorted index is inevitably 0, because we push a new item at the front + // of the vector. + let unsorted_index = 0; + + // Shift all unsorted indices to the right. + buffered_vector.iter_mut().for_each(|(unsorted_index, _)| *unsorted_index += 1); + + // Find where to insert the `new_value`. + match buffered_vector.binary_search_by(|(_, value)| compare(value, &new_value)) { + // At the beginning? Let's emit a `VectorDiff::PushFront`. + Ok(0) | Err(0) => { + buffered_vector.push_front((unsorted_index, new_value.clone())); + result.push(VectorDiff::PushFront { value: new_value }); + } + // Somewhere in the middle? Let's emit a `VectorDiff::Insert`. + Ok(index) | Err(index) if index != buffered_vector.len() => { + buffered_vector.insert(index, (unsorted_index, new_value.clone())); + result.push(VectorDiff::Insert { index, value: new_value }); + } + // At the end? Let's emit a `VectorDiff::PushBack`. + _ => { + buffered_vector.push_back((unsorted_index, new_value.clone())); + result.push(VectorDiff::PushBack { value: new_value }); + } + } + } + VectorDiff::PushBack { value: new_value } => { + let buffered_vector_length = buffered_vector.len(); + + // The unsorted index is inevitably the size of `buffered_vector`, because + // we push a new item at the back of the vector. + let unsorted_index = buffered_vector_length; + + // Find where to insert the `new_value`. + match buffered_vector.binary_search_by(|(_, value)| compare(value, &new_value)) { + // At the beginning? Let's emit a `VectorDiff::PushFront`. + Ok(0) | Err(0) => { + buffered_vector.push_front((unsorted_index, new_value.clone())); + result.push(VectorDiff::PushFront { value: new_value }); + } + // Somewhere in the middle? Let's emit a `VectorDiff::Insert`. + Ok(index) | Err(index) if index != buffered_vector_length => { + buffered_vector.insert(index, (unsorted_index, new_value.clone())); + result.push(VectorDiff::Insert { index, value: new_value }); + } + // At the end? Let's emit a `VectorDiff::PushBack`. + _ => { + buffered_vector.push_back((unsorted_index, new_value.clone())); + result.push(VectorDiff::PushBack { value: new_value }); + } + } + } + VectorDiff::Insert { index: new_unsorted_index, value: new_value } => { + // Shift all unsorted indices after `new_unsorted_index` to the right. + buffered_vector.iter_mut().for_each(|(unsorted_index, _)| { + if *unsorted_index >= new_unsorted_index { + *unsorted_index += 1; + } + }); + + // Find where to insert the `new_value`. + match buffered_vector.binary_search_by(|(_, value)| compare(value, &new_value)) { + // At the beginning? Let's emit a `VectorDiff::PushFront`. + Ok(0) | Err(0) => { + buffered_vector.push_front((new_unsorted_index, new_value.clone())); + result.push(VectorDiff::PushFront { value: new_value }); + } + // Somewhere in the middle? Let's emit a `VectorDiff::Insert`. + Ok(index) | Err(index) if index != buffered_vector.len() => { + buffered_vector.insert(index, (new_unsorted_index, new_value.clone())); + result.push(VectorDiff::Insert { index, value: new_value }); + } + // At the end? Let's emit a `VectorDiff::PushBack`. + _ => { + buffered_vector.push_back((new_unsorted_index, new_value.clone())); + result.push(VectorDiff::PushBack { value: new_value }); + } + } + } + VectorDiff::PopFront => { + let last_index = buffered_vector.len() - 1; + + // Find the position and shift all unsorted indices to the left safely. + // Also, find the value to remove. + let position = buffered_vector + .iter_mut() + .enumerate() + .fold(None, |mut position, (index, (unsorted_index, _))| { + // Position has been found. + if position.is_none() && *unsorted_index == 0 { + position = Some(index); + } + // Otherwise, let's shift all other unsorted indices to the left. + // Value with an `unsorted_index` of 0 will be removed hereinafter. + else { + *unsorted_index -= 1; + } + + position + }) + .expect("`buffered_vector` must have an item with an unsorted index of 0"); + + match position { + // At the beginning? Let's emit a `VectorDiff::PopFront`. + 0 => { + buffered_vector.pop_front(); + result.push(VectorDiff::PopFront); + } + // At the end? Let's emit a `VectorDiff::PopBack`. + index if index == last_index => { + buffered_vector.pop_back(); + result.push(VectorDiff::PopBack); + } + // Somewhere in the middle? Let's emit a `VectorDiff::Remove`. + index => { + buffered_vector.remove(index); + result.push(VectorDiff::Remove { index }); + } + } + } + VectorDiff::PopBack => { + let last_index = buffered_vector.len() - 1; + + // Find the value to remove. + match buffered_vector + .iter() + .position(|(unsorted_index, _)| *unsorted_index == last_index) + .expect( + "`buffered_vector` must have an item with an unsorted index of `last_index`", + ) { + // At the beginning? Let's emit a `VectorDiff::PopFront`. + 0 => { + buffered_vector.pop_front(); + result.push(VectorDiff::PopFront); + } + // At the end? Let's emit a `VectorDiff::PopBack`. + index if index == last_index => { + buffered_vector.pop_back(); + result.push(VectorDiff::PopBack); + } + // Somewhere in the middle? Let's emit a `VectorDiff::Remove`. + index => { + buffered_vector.remove(index); + result.push(VectorDiff::Remove { index }); + } + } + } + VectorDiff::Remove { index: new_unsorted_index } => { + let last_index = buffered_vector.len() - 1; + + // Shift all items with an `unsorted_index` greater than `new_unsorted_index` to + // the left. + // Also, find the value to remove. + let position = buffered_vector + .iter_mut() + .enumerate() + .fold(None, |mut position, (index, (unsorted_index, _))| { + if position.is_none() && *unsorted_index == new_unsorted_index { + position = Some(index); + } + + if *unsorted_index > new_unsorted_index { + *unsorted_index -= 1; + } + + position + }) + .expect("`buffered_vector` must contain an item with an unsorted index of `new_unsorted_index`"); + + match position { + // At the beginning? Let's emit a `VectorDiff::PopFront`. + 0 => { + buffered_vector.pop_front(); + result.push(VectorDiff::PopFront); + } + // At the end? Let's emit a `VectorDiff::PopBack`. + index if index == last_index => { + buffered_vector.pop_back(); + result.push(VectorDiff::PopBack); + } + // Somewhere in the middle? Let's emit a `VectorDiff::Remove`. + index => { + buffered_vector.remove(index); + result.push(VectorDiff::Remove { index }); + } + } + } + VectorDiff::Set { index: new_unsorted_index, value: new_value } => { + // We need to _update_ the value to `new_value`, and to _move_ it (since it is a + // new value, we need to sort it). + // + // Find the `old_index` and the `new_index`, respectively representing the + // _from_ and _to_ positions of the value to move. + let old_index = buffered_vector + .iter() + .position(|(unsorted_index, _)| *unsorted_index == new_unsorted_index) + .expect("`buffered_vector` must contain an item with an unsorted index of `new_unsorted_index`"); + + let new_index = + match buffered_vector.binary_search_by(|(_, value)| compare(value, &new_value)) { + Ok(index) => index, + Err(index) => index, + }; + + match old_index.cmp(&new_index) { + // `old_index` is before `new_index`. + // Remove value at `old_index`, and insert the new value at `new_index - 1`: we need + // to subtract 1 because `old_index` has been removed before `new_insert`, which + // has shifted the indices. + Ordering::Less => { + buffered_vector.remove(old_index); + buffered_vector.insert(new_index - 1, (new_unsorted_index, new_value.clone())); + + result.push(VectorDiff::Remove { index: old_index }); + result.push(VectorDiff::Insert { index: new_index - 1, value: new_value }); + } + // `old_index` is the same as `new_index`. + Ordering::Equal => { + buffered_vector.set(new_index, (new_unsorted_index, new_value.clone())); + result.push(VectorDiff::Set { index: new_index, value: new_value }); + } + // `old_index` is after `new_index`. + // Remove value at `old_index`, and insert the new value at `new_index`. No shifting + // here. + Ordering::Greater => { + buffered_vector.remove(old_index); + buffered_vector.insert(new_index, (new_unsorted_index, new_value.clone())); + + result.push(VectorDiff::Remove { index: old_index }); + result.push(VectorDiff::Insert { index: new_index, value: new_value }); + } + } + } + VectorDiff::Truncate { length: new_length } => { + // Keep values where their `unsorted_index` is lower than the `new_length`. + buffered_vector.retain(|(unsorted_index, _)| *unsorted_index < new_length); + result.push(VectorDiff::Truncate { length: new_length }); + } + VectorDiff::Reset { values: new_values } => { + // Calculate the `new_values` with their `unsorted_index`. + let mut new_values = new_values.into_iter().enumerate().collect::>(); + + // Now, we can sort `new_values`. + new_values.sort_by(|(_, left), (_, right)| compare(left, right)); + + // Finally, update `buffered_vector` and create the `VectorDiff::Reset`. + *buffered_vector = new_values.clone(); + result.push(VectorDiff::Reset { + values: new_values.into_iter().map(|(_, value)| value).collect(), + }); + } + } + + result +} diff --git a/eyeball-im-util/src/vector/traits.rs b/eyeball-im-util/src/vector/traits.rs index f407772..599bc5a 100644 --- a/eyeball-im-util/src/vector/traits.rs +++ b/eyeball-im-util/src/vector/traits.rs @@ -1,5 +1,7 @@ //! Public traits. +use std::cmp::Ordering; + use eyeball_im::{ VectorDiff, VectorSubscriber, VectorSubscriberBatchedStream, VectorSubscriberStream, }; @@ -10,7 +12,7 @@ use super::{ ops::{ VecVectorDiffFamily, VectorDiffContainerFamily, VectorDiffContainerOps, VectorDiffFamily, }, - EmptyLimitStream, Filter, FilterMap, Limit, + EmptyLimitStream, Filter, FilterMap, Limit, SortBy, }; /// Abstraction over stream items that the adapters in this module can deal @@ -157,6 +159,17 @@ where let (items, stream) = self.into_parts(); Limit::dynamic_with_initial_limit(items, stream, initial_limit, limit_stream) } + + /// Sort the observed values with `compare`. + /// + /// See [`SortBy`] for more details. + fn sort_by(self, compare: &F) -> (Vector, SortBy<'_, Self::Stream, F>) + where + F: Fn(&T, &T) -> Ordering, + { + let (items, stream) = self.into_parts(); + SortBy::new(items, stream, compare) + } } impl VectorObserverExt for O diff --git a/eyeball-im-util/tests/it/main.rs b/eyeball-im-util/tests/it/main.rs index 4106309..a69bf91 100644 --- a/eyeball-im-util/tests/it/main.rs +++ b/eyeball-im-util/tests/it/main.rs @@ -1,3 +1,4 @@ mod filter; mod filter_map; mod limit; +mod sort; diff --git a/eyeball-im-util/tests/it/sort.rs b/eyeball-im-util/tests/it/sort.rs new file mode 100644 index 0000000..54642af --- /dev/null +++ b/eyeball-im-util/tests/it/sort.rs @@ -0,0 +1,428 @@ +use eyeball_im::{ObservableVector, VectorDiff}; +use eyeball_im_util::vector::VectorObserverExt; +use imbl::vector; +use std::cmp::Ordering; +use stream_assert::{assert_closed, assert_next_eq, assert_pending}; + +fn cmp(left: &T, right: &T) -> Ordering +where + T: Ord, +{ + left.cmp(right) +} + +#[test] +fn new() { + let ob = ObservableVector::::from(vector!['c', 'a', 'd', 'b']); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert_eq!(values, vector!['a', 'b', 'c', 'd']); + assert_pending!(sub); + + drop(ob); + assert_closed!(sub); +} + +#[test] +fn append() { + let mut ob = ObservableVector::::new(); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert!(values.is_empty()); + assert_pending!(sub); + + // Append on an empty vector. + ob.append(vector!['d', 'a', 'e']); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['a', 'd', 'e'] }); + + // Append on an non-empty vector. + ob.append(vector!['f', 'g', 'b']); + assert_next_eq!(sub, VectorDiff::Insert { index: 1, value: 'b' }); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['f', 'g'] }); + + // Another append. + // This time, it contains a duplicated new item + an insert + new items to be + // appended. + ob.append(vector!['i', 'h', 'c', 'j', 'a']); + assert_next_eq!(sub, VectorDiff::PushFront { value: 'a' }); + assert_next_eq!(sub, VectorDiff::Insert { index: 3, value: 'c' }); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['h', 'i', 'j'] }); + + // Another append. + // This time, with two new items that are a duplication of the last item. + ob.append(vector!['k', 'l', 'j', 'm', 'j']); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['j', 'j', 'k', 'l', 'm'] }); + + // Items in the vector have been appended and are not sorted. + assert_eq!( + *ob, + vector!['d', 'a', 'e', 'f', 'g', 'b', 'i', 'h', 'c', 'j', 'a', 'k', 'l', 'j', 'm', 'j'] + ); + + drop(ob); + assert_closed!(sub); +} + +#[test] +fn clear() { + let mut ob = ObservableVector::::new(); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert!(values.is_empty()); + assert_pending!(sub); + + ob.append(vector!['b', 'a', 'c']); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['a', 'b', 'c'] }); + + assert_eq!(*ob, vector!['b', 'a', 'c']); + + // Let's clear it. + ob.clear(); + + assert_next_eq!(sub, VectorDiff::Clear); + + // Items in the vector has been cleared out. + assert!((*ob).is_empty()); + + drop(ob); + assert_closed!(sub); +} + +#[test] +fn push_front() { + let mut ob = ObservableVector::::new(); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert!(values.is_empty()); + assert_pending!(sub); + + // Push front on an empty vector. + ob.push_front('b'); + assert_next_eq!(sub, VectorDiff::PushFront { value: 'b' }); + + // Push front on non-empty vector. + // The new item should appear at position 0. + ob.push_front('a'); + assert_next_eq!(sub, VectorDiff::PushFront { value: 'a' }); + + // Another push front. + // The new item should appear at last position. + ob.push_front('d'); + assert_next_eq!(sub, VectorDiff::PushBack { value: 'd' }); + + // Another push front. + // The new item should appear in the middle. + ob.push_front('c'); + assert_next_eq!(sub, VectorDiff::Insert { index: 2, value: 'c' }); + + // Items in the vector have been pushed front and are not sorted. + assert_eq!(*ob, vector!['c', 'd', 'a', 'b']); + + drop(ob); + assert_closed!(sub); +} + +#[test] +fn push_back() { + let mut ob = ObservableVector::::new(); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert!(values.is_empty()); + assert_pending!(sub); + + // Push back on an empty vector. + ob.push_back('b'); + assert_next_eq!(sub, VectorDiff::PushFront { value: 'b' }); + + // Push back on non-empty vector. + // The new item should appear at position 0. + ob.push_back('a'); + assert_next_eq!(sub, VectorDiff::PushFront { value: 'a' }); + + // Another push back. + // The new item should appear at last position. + ob.push_back('d'); + assert_next_eq!(sub, VectorDiff::PushBack { value: 'd' }); + + // Another push back. + // The new item should appear in the middle. + ob.push_back('c'); + assert_next_eq!(sub, VectorDiff::Insert { index: 2, value: 'c' }); + + // Items in the vector have been pushed back and are not sorted. + assert_eq!(*ob, vector!['b', 'a', 'd', 'c']); + + drop(ob); + assert_closed!(sub); +} + +#[test] +fn insert() { + let mut ob = ObservableVector::::new(); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert!(values.is_empty()); + assert_pending!(sub); + + // Insert on an empty vector. + ob.insert(0, 'b'); + assert_next_eq!(sub, VectorDiff::PushFront { value: 'b' }); + + // Insert on non-empty vector. + // The new item should appear at position 0. + ob.insert(1, 'a'); + assert_next_eq!(sub, VectorDiff::PushFront { value: 'a' }); + + // Another insert. + // The new item should appear at last position. + ob.insert(1, 'd'); + assert_next_eq!(sub, VectorDiff::PushBack { value: 'd' }); + + // Another insert. + // The new item should appear at last position. + ob.insert(1, 'e'); + assert_next_eq!(sub, VectorDiff::PushBack { value: 'e' }); + + // Another insert. + // The new item should appear in the middle. + ob.insert(3, 'c'); + assert_next_eq!(sub, VectorDiff::Insert { index: 2, value: 'c' }); + + // Items in the vector have been inserted and are not sorted. + assert_eq!(*ob, vector!['b', 'e', 'd', 'c', 'a']); + + drop(ob); + assert_closed!(sub); +} + +#[test] +fn pop_front() { + let mut ob = ObservableVector::::new(); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert!(values.is_empty()); + assert_pending!(sub); + + // Append a bunch of items. + ob.append(vector!['e', 'b', 'a', 'd', 'c']); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['a', 'b', 'c', 'd', 'e'] }); + + // Pop front once. + // `e` is at the last sorted position, so it generates a `VectorDiff::PopBack`. + assert_eq!(ob.pop_front(), Some('e')); + assert_next_eq!(sub, VectorDiff::PopBack); + + // Pop front again. + // `b` is at the second sorted position, so it generates a `VectorDiff::Remove`. + assert_eq!(ob.pop_front(), Some('b')); + assert_next_eq!(sub, VectorDiff::Remove { index: 1 }); + + // Pop front again. + // `a` is at the first sorted position, so it generates a + // `VectorDiff::PopFront`. + assert_eq!(ob.pop_front(), Some('a')); + assert_next_eq!(sub, VectorDiff::PopFront); + + // Pop front again. + // `d` is at the last sorted position, so it generates a `VectorDiff::PopBack`. + assert_eq!(ob.pop_front(), Some('d')); + assert_next_eq!(sub, VectorDiff::PopBack); + + // Pop front again. + // `c` is at the first sorted position, so it generates a + // `VectorDiff::PopFront`. + assert_eq!(ob.pop_front(), Some('c')); + assert_next_eq!(sub, VectorDiff::PopFront); + + assert!(ob.is_empty()); + + drop(ob); + assert_closed!(sub); +} + +#[test] +fn pop_back() { + let mut ob = ObservableVector::::new(); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert!(values.is_empty()); + assert_pending!(sub); + + // Append a bunch of items. + ob.append(vector!['e', 'b', 'a', 'd', 'c', 'f']); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['a', 'b', 'c', 'd', 'e', 'f'] }); + + // Pop back once. + // `f` is at the last sorted position, so it generates a `VectorDiff::PopBack`. + assert_eq!(ob.pop_back(), Some('f')); + assert_next_eq!(sub, VectorDiff::PopBack); + + // Pop back again. + // `c` is at the third sorted position, so it generates a `VectorDiff::Remove`. + assert_eq!(ob.pop_back(), Some('c')); + assert_next_eq!(sub, VectorDiff::Remove { index: 2 }); + + // Pop back again. + // `d` is at the third sorted position, so it generates a `VectorDiff::Remove`. + assert_eq!(ob.pop_back(), Some('d')); + assert_next_eq!(sub, VectorDiff::Remove { index: 2 }); + + // Pop back again. + // `a` is at the first sorted position, so it generates a + // `VectorDiff::PopFront`. + assert_eq!(ob.pop_back(), Some('a')); + assert_next_eq!(sub, VectorDiff::PopFront); + + // Pop back again. + // `b` is at the first sorted position, so it generates a + // `VectorDiff::PopFront`. + assert_eq!(ob.pop_back(), Some('b')); + assert_next_eq!(sub, VectorDiff::PopFront); + + // Pop back again. + // `e` is at the first sorted position, so it generates a + // `VectorDiff::PopFront`. + assert_eq!(ob.pop_back(), Some('e')); + assert_next_eq!(sub, VectorDiff::PopFront); + + assert!(ob.is_empty()); + + drop(ob); + assert_closed!(sub); +} + +#[test] +fn remove() { + let mut ob = ObservableVector::::new(); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert!(values.is_empty()); + assert_pending!(sub); + + // Append a bunch of items. + ob.append(vector!['e', 'b', 'a', 'd', 'c']); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['a', 'b', 'c', 'd', 'e'] }); + + // Remove `a`. + ob.remove(2); + assert_next_eq!(sub, VectorDiff::PopFront); + + // Remove `e`. + ob.remove(0); + assert_next_eq!(sub, VectorDiff::PopBack); + + // Remove `c`. + ob.remove(2); + assert_next_eq!(sub, VectorDiff::Remove { index: 1 }); + + // Items in the vector have been removed and are not sorted. + assert_eq!(*ob, vector!['b', 'd']); + + drop(ob); + assert_closed!(sub); +} + +#[test] +fn set() { + let mut ob = ObservableVector::::new(); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert!(values.is_empty()); + assert_pending!(sub); + + // Append a bunch of items. + ob.append(vector!['d', 'e', 'b', 'g']); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['b', 'd', 'e', 'g'] }); + + // Same value. + ob.set(0, 'd'); + assert_next_eq!(sub, VectorDiff::Set { index: 1, value: 'd' }); + + // Another value, that is sorted at the same index. + ob.set(0, 'c'); + assert_next_eq!(sub, VectorDiff::Set { index: 1, value: 'c' }); + + // Another value, that is moved to the left. + ob.set(0, 'a'); + assert_next_eq!(sub, VectorDiff::Remove { index: 1 }); + assert_next_eq!(sub, VectorDiff::Insert { index: 0, value: 'a' }); + + // Another value, that is moved to the right. + ob.set(0, 'f'); + assert_next_eq!(sub, VectorDiff::Remove { index: 0 }); + assert_next_eq!(sub, VectorDiff::Insert { index: 2, value: 'f' }); + + // Another value, that is moved to the right-most position. + ob.set(0, 'h'); + assert_next_eq!(sub, VectorDiff::Remove { index: 2 }); + assert_next_eq!(sub, VectorDiff::Insert { index: 3, value: 'h' }); + + // Same operation, at another index, just for fun. + ob.set(2, 'f'); + assert_next_eq!(sub, VectorDiff::Remove { index: 0 }); + assert_next_eq!(sub, VectorDiff::Insert { index: 1, value: 'f' }); + + // Items in the vector have been updated and are not sorted. + assert_eq!(*ob, vector!['h', 'e', 'f', 'g']); + + drop(ob); + assert_closed!(sub); +} + +#[test] +fn truncate() { + let mut ob = ObservableVector::::new(); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert!(values.is_empty()); + assert_pending!(sub); + + // Append a bunch of items. + ob.append(vector!['c', 'd', 'a']); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['a', 'c', 'd'] }); + + // Append other items. + ob.append(vector!['b', 'e', 'f']); + assert_next_eq!(sub, VectorDiff::Insert { index: 1, value: 'b' }); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['e', 'f'] }); + + // Truncate. + ob.truncate(2); + assert_next_eq!(sub, VectorDiff::Truncate { length: 2 }); + + // Items in the vector have been truncated and are not sorted. + assert_eq!(*ob, vector!['c', 'd']); + + // Append other items. + ob.append(vector!['b', 'x', 'y']); + assert_next_eq!(sub, VectorDiff::PushFront { value: 'b' }); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['x', 'y'] }); + + drop(ob); + assert_closed!(sub); +} + +#[test] +fn reset() { + let mut ob = ObservableVector::::with_capacity(1); + let (values, mut sub) = ob.subscribe().sort_by(&cmp); + + assert!(values.is_empty()); + assert_pending!(sub); + + // Append a bunch of items. + ob.append(vector!['c', 'd', 'a']); + assert_next_eq!(sub, VectorDiff::Append { values: vector!['a', 'c', 'd'] }); + + // Push back a bunch of items 3 times, so that it overflows the capacity, and we + // get a reset! + ob.push_back('b'); + ob.push_back('f'); + assert_next_eq!(sub, VectorDiff::Reset { values: vector!['a', 'b', 'c', 'd', 'f'] }); + + // Items in the vector have been inserted and are not sorted. + assert_eq!(*ob, vector!['c', 'd', 'a', 'b', 'f']); + + drop(ob); + assert_closed!(sub); +}