diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index f9f39fc86..81988bbf6 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -695,7 +695,7 @@ where } // Extract updates not in advance of `upper`. - let batch = batcher.seal::(upper.clone()); + let batch = batcher.seal::(upper.borrow(), Tr::Builder::with_capacity); writer.insert(batch.clone(), Some(capability.time().clone())); @@ -723,7 +723,7 @@ where } else { // Announce progress updates, even without data. - let _batch = batcher.seal::(input.frontier().frontier().to_owned()); + let _batch = batcher.seal::(input.frontier().frontier(), Tr::Builder::with_capacity); writer.seal(input.frontier().frontier().to_owned()); } diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index d21f08c2c..b67220a87 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -106,3 +106,157 @@ where .as_collection() } } + +pub mod neu { + //! Consolidate without building batches. + + use timely::PartialOrder; + use timely::dataflow::Scope; + use timely::dataflow::channels::pact::Exchange; + use timely::dataflow::channels::pushers::buffer::Session; + use timely::dataflow::channels::pushers::{Counter, Tee}; + use timely::dataflow::operators::{Capability, Operator}; + use timely::progress::{Antichain, Timestamp}; + + use crate::collection::AsCollection; + use crate::difference::Semigroup; + use crate::lattice::Lattice; + use crate::trace::{Batcher, Builder}; + use crate::{Collection, Data, ExchangeData, Hashable}; + + impl Collection + where + G: Scope, + G::Timestamp: Data + Lattice, + D: ExchangeData + Hashable, + R: Semigroup + ExchangeData, + { + /// Aggregates the weights of equal records into at most one record. + /// + /// This method uses the type `D`'s `hashed()` method to partition the data. The data are + /// accumulated in place, each held back until their timestamp has completed. + /// + /// # Examples + /// + /// ``` + /// use differential_dataflow::input::Input; + /// use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; + /// + /// ::timely::example(|scope| { + /// + /// let x = scope.new_collection_from(1 .. 10u32).1; + /// + /// x.negate() + /// .concat(&x) + /// .consolidate_named_neu::>("Consolidate inputs") // <-- ensures cancellation occurs + /// .assert_empty(); + /// }); + /// ``` + pub fn consolidate_named_neu(&self, name: &str) -> Self + where + B: Batcher + 'static, + { + let exchange = Exchange::new(move |update: &((D, ()), G::Timestamp, R)| (update.0).0.hashed().into()); + self.map(|k| (k, ())).inner + .unary_frontier(exchange, name, |_cap, info| { + + // Acquire a logger for arrange events. + let logger = { + let scope = self.scope(); + let register = scope.log_register(); + register.get::("differential/arrange") + }; + + let mut batcher = B::new(logger, info.global_id); + // Capabilities for the lower envelope of updates in `batcher`. + let mut capabilities = Antichain::>::new(); + let mut prev_frontier = Antichain::from_elem(::minimum()); + + + move |input, output| { + input.for_each(|cap, data| { + capabilities.insert(cap.retain()); + batcher.push_batch(data); + }); + + if prev_frontier.borrow() != input.frontier().frontier() { + if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) { + let mut upper = Antichain::new(); // re-used allocation for sealing batches. + + // For each capability not in advance of the input frontier ... + for (index, capability) in capabilities.elements().iter().enumerate() { + if !input.frontier().less_equal(capability.time()) { + + // Assemble the upper bound on times we can commit with this capabilities. + // We must respect the input frontier, and *subsequent* capabilities, as + // we are pretending to retire the capability changes one by one. + upper.clear(); + for time in input.frontier().frontier().iter() { + upper.insert(time.clone()); + } + for other_capability in &capabilities.elements()[(index + 1)..] { + upper.insert(other_capability.time().clone()); + } + + // send the batch to downstream consumers, empty or not. + let session = output.session(&capabilities.elements()[index]); + // Extract updates not in advance of `upper`. + let builder = ConsolidateBuilder(session); + let () = batcher.seal(upper.borrow(), |_, _, _| builder); + } + } + + // Having extracted and sent batches between each capability and the input frontier, + // we should downgrade all capabilities to match the batcher's lower update frontier. + // This may involve discarding capabilities, which is fine as any new updates arrive + // in messages with new capabilities. + + let mut new_capabilities = Antichain::new(); + for time in batcher.frontier().iter() { + if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) { + new_capabilities.insert(capability.delayed(time)); + } else { + panic!("failed to find capability"); + } + } + + capabilities = new_capabilities; + } + + prev_frontier.clear(); + prev_frontier.extend(input.frontier().frontier().iter().cloned()); + } + } + }) + .as_collection() + } + } + + struct ConsolidateBuilder<'a, D: Data, T: Timestamp, R: Data>(Session<'a, T, Vec<(D, T, R)>, Counter>>); + + impl<'a, D: Data, T: Timestamp, R: Data> Builder for ConsolidateBuilder<'a, D, T, R> { + type Item = ((D, ()), T, R); + type Time = T; + type Output = (); + + fn new() -> Self { + unimplemented!() + } + + fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { + unimplemented!() + } + + fn push(&mut self, element: Self::Item) { + self.0.give((element.0.0, element.1, element.2)); + } + + fn copy(&mut self, element: &Self::Item) { + self.0.give((element.0.0.clone(), element.1.clone(), element.2.clone())); + } + + fn done(self, _lower: Antichain, _upper: Antichain, _since: Antichain) -> Self::Output { + () + } + } +} diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 2c0feec3f..cf62c59a6 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -6,6 +6,7 @@ use timely::communication::message::RefOrMut; use timely::logging::WorkerIdentifier; use timely::logging_core::Logger; use timely::progress::{frontier::Antichain, Timestamp}; +use timely::progress::frontier::AntichainRef; use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; @@ -58,7 +59,7 @@ where // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline(never)] - fn seal>(&mut self, upper: Antichain) -> B::Output { + fn seal, C: FnOnce(usize, usize, usize) -> B>(&mut self, upper: AntichainRef, constructor: C) -> B::Output { let mut merged = Vec::new(); self.sorter.finish_into(&mut merged); @@ -87,7 +88,7 @@ where } } } - B::with_capacity(keys, vals, upds) + constructor(keys, vals, upds) }; let mut kept = Vec::new(); @@ -134,8 +135,8 @@ where self.sorter.push(&mut buffer); } - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum())); - self.lower = upper; + let seal = builder.done(self.lower.clone(), upper.to_owned(), Antichain::from_elem(T::minimum())); + self.lower = upper.to_owned(); seal } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index fcd9f5ff8..0f94b0458 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -6,6 +6,7 @@ use timely::container::columnation::{Columnation, TimelyStack}; use timely::logging::WorkerIdentifier; use timely::logging_core::Logger; use timely::progress::{frontier::Antichain, Timestamp}; +use timely::progress::frontier::AntichainRef; use crate::difference::Semigroup; use crate::logging::{BatcherEvent, DifferentialEvent}; @@ -62,7 +63,7 @@ where // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. #[inline] - fn seal>(&mut self, upper: Antichain) -> B::Output { + fn seal, C: FnOnce(usize, usize, usize) -> B>(&mut self, upper: AntichainRef, constructor: C) -> B::Output { let mut merged = Default::default(); self.sorter.finish_into(&mut merged); @@ -91,7 +92,7 @@ where } } } - B::with_capacity(keys, vals, upds) + constructor(keys, vals, upds) }; let mut kept = Vec::new(); @@ -132,8 +133,8 @@ where // Drain buffers (fast reclamation). self.sorter.clear_stash(); - let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum())); - self.lower = upper; + let seal = builder.done(self.lower.clone(), upper.to_owned(), Antichain::from_elem(T::minimum())); + self.lower = upper.to_owned(); seal } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 8531c555c..6d22864db 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -320,7 +320,7 @@ pub trait Batcher { /// Adds an unordered batch of elements to the batcher. fn push_batch(&mut self, batch: RefOrMut>); /// Returns all updates not greater or equal to an element of `upper`. - fn seal>(&mut self, upper: Antichain) -> B::Output; + fn seal, C: FnOnce(usize, usize, usize) -> B>(&mut self, upper: AntichainRef, constructor: C) -> B::Output; /// Returns the lower envelope of contained update times. fn frontier(&mut self) -> timely::progress::frontier::AntichainRef; } diff --git a/tests/trace.rs b/tests/trace.rs index dac7eff22..e524529af 100644 --- a/tests/trace.rs +++ b/tests/trace.rs @@ -2,7 +2,7 @@ use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; use differential_dataflow::trace::implementations::ValSpine; -use differential_dataflow::trace::{Trace, TraceReader, Batcher}; +use differential_dataflow::trace::{Builder, Trace, TraceReader, Batcher}; use differential_dataflow::trace::cursor::Cursor; type IntegerTrace = ValSpine; @@ -22,7 +22,7 @@ fn get_trace() -> ValSpine { ])); let batch_ts = &[1, 2, 3]; - let batches = batch_ts.iter().map(move |i| batcher.seal::(Antichain::from_elem(*i))); + let batches = batch_ts.iter().map(move |i| batcher.seal::(Antichain::from_elem(*i).borrow(), IntegerBuilder::with_capacity)); for b in batches { trace.insert(b); }