Skip to content

Commit

Permalink
Make it work
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru committed Mar 11, 2024
1 parent c6838a5 commit 575bb2f
Showing 1 changed file with 27 additions and 31 deletions.
58 changes: 27 additions & 31 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,31 +219,6 @@ impl<D: Ord, T: Clone + PartialOrder + Ord, R: Semigroup> MergeSorter<D, T, R> {
}
}

/// Maintain the internal chain structure. Ensures that all chains are of geometrically
/// increasing length. The function assumes that chains itself are well-formed, i.e.,
/// they contain elements in increasing order.
fn maintain(&mut self) {
self.account(self.queue.iter().flatten(), -1);

// Step 1: Sort queue by chain length. Depending on how much we extracted,
// the chains might be mis-ordered.
self.queue.sort_by_key(|chain| std::cmp::Reverse(chain.len()));

// Step 2: Merge chains that are within a power of two.
let mut index = self.queue.len().saturating_sub(1);
while index > 0 {
if self.queue[index-1].len() / 2 < self.queue[index].len() {
// Chains at `index-1` and `index` are within a factor of two, merge them.
let list1 = self.queue.remove(index-1);
let list2 = std::mem::take(&mut self.queue[index-1]);
self.queue[index-1] = self.merge_by(list1, list2);
}
index -= 1;
}

self.account(self.queue.iter().flatten(), 1);
}

/// Extract all data that is not in advance of `upper`. Record the lower bound of the remaining
/// data's time in `frontier`.
fn extract(
Expand All @@ -264,7 +239,6 @@ impl<D: Ord, T: Clone + PartialOrder + Ord, R: Semigroup> MergeSorter<D, T, R> {

// Walk all chains, separate ready data from data to keep.
for mut chain in std::mem::take(&mut self.queue).drain(..) {
println!("extract drain chain {:?}", chain.iter().map(|v| v.1.len()));
let mut ship_chain = Vec::default();
let mut keep_chain = Vec::default();
for (block_frontier, mut block) in chain.drain(..) {
Expand Down Expand Up @@ -314,15 +288,18 @@ impl<D: Ord, T: Clone + PartialOrder + Ord, R: Semigroup> MergeSorter<D, T, R> {
ship_chain.push((Antichain::new(), std::mem::replace(&mut ship_buffer, self.empty())));
}
if !keep_buffer.is_empty() {
for t in keep_frontier.iter() {
frontier.insert_ref(t);
}
keep_chain.push((std::mem::take(&mut keep_frontier), std::mem::replace(&mut keep_buffer, self.empty())));
}

// Collect finished chains
if !keep_chain.is_empty() {
if !ship_chain.is_empty() {
// Canonicalize the chain by adjacent blocks that combined fit into a single block.
let mut target: Vec<(Antichain<T>, Vec<_>)> = Vec::with_capacity(chain.len());
for (frontier, mut block) in chain.drain(..) {
let mut target: Vec<(Antichain<T>, Vec<_>)> = Vec::with_capacity(keep_chain.len());
for (frontier, mut block) in keep_chain.drain(..) {
if let Some((last_frontier, last)) = target.last_mut().filter(|(_, last)| {
last.len() + block.len() <= Self::buffer_size()
}) {
Expand All @@ -345,12 +322,31 @@ impl<D: Ord, T: Clone + PartialOrder + Ord, R: Semigroup> MergeSorter<D, T, R> {
}
}

self.account(self.queue.iter().flatten(), 1);

// If we extracted some data, perform maintenance work.
if !ship_chains.is_empty() {
self.maintain();
// Maintain the internal chain structure. Ensures that all chains are of geometrically
// increasing length. The function assumes that chains itself are well-formed, i.e.,
// they contain elements in increasing order.

// Step 1: Sort queue by chain length. Depending on how much we extracted,
// the chains might be mis-ordered.
self.queue.sort_by_key(|chain| std::cmp::Reverse(chain.len()));

// Step 2: Merge chains that are within a power of two.
let mut index = self.queue.len().saturating_sub(1);
while index > 0 {
if self.queue[index-1].len() / 2 < self.queue[index].len() {
// Chains at `index-1` and `index` are within a factor of two, merge them.
let list1 = self.queue.remove(index-1);
let list2 = std::mem::take(&mut self.queue[index-1]);
self.queue[index-1] = self.merge_by(list1, list2);
}
index -= 1;
}
}

self.account(self.queue.iter().flatten(), 1);

// Merge `ship_chains` into a single element. Roll up from the smallest to the largest
// chain.
ship_chains.sort_by_key(|chain| std::cmp::Reverse(chain.len()));
Expand Down

0 comments on commit 575bb2f

Please sign in to comment.