diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs
index a08529531..b5305fc88 100644
--- a/src/operators/consolidate.rs
+++ b/src/operators/consolidate.rs
@@ -6,10 +6,16 @@
 //! underlying system can more clearly see that no work must be done in the later case, and we can
 //! drop out of, e.g. iterative computations.
 
+use timely::dataflow::channels::pact::{Exchange, ParallelizationContract};
+use timely::dataflow::channels::pushers::buffer::Session;
+use timely::dataflow::channels::pushers::{Counter, Tee};
+use timely::dataflow::operators::{Capability, Operator};
 use timely::dataflow::Scope;
+use timely::progress::{Antichain, Timestamp};
+use timely::PartialOrder;
 
-use crate::{Collection, ExchangeData, Hashable};
 use crate::difference::Semigroup;
+use crate::{AsCollection, Collection, ExchangeData, Hashable};
 use crate::Data;
 use crate::lattice::Lattice;
 
@@ -56,11 +62,10 @@ where
         Tr::Batcher: Batcher<Item = ((D, ()), G::Timestamp, R), Time = G::Timestamp>,
         Tr::Builder: Builder<Item = ((D, ()), G::Timestamp, R), Time = G::Timestamp, Output = Tr::Batch>,
     {
-        use crate::operators::arrange::arrangement::Arrange;
-        use crate::trace::cursor::MyTrait;
-        self.map(|k| (k, ()))
-            .arrange_named::<Tr>(name)
-            .as_collection(|d, _| d.into_owned())
+        let exchange =
+            Exchange::new(move |update: &((D, ()), G::Timestamp, R)| (update.0).0.hashed().into());
+        consolidate_pact::<Tr::Batcher, _, _, _, _, _>(&self.map(|d| (d, ())), exchange, name)
+            .map(|(d, ())| d)
     }
 
     /// Aggregates the weights of equal records.
@@ -89,8 +94,6 @@
     pub fn consolidate_stream(&self) -> Self {
         use timely::dataflow::channels::pact::Pipeline;
-        use timely::dataflow::operators::Operator;
-        use crate::collection::AsCollection;
 
         self.inner
             .unary(Pipeline, "ConsolidateStream", |_cap, _info| {
@@ -108,156 +111,154 @@
     }
 }
 
-pub mod neu {
-    //! Consolidate without building batches.
-
-    use timely::PartialOrder;
-    use timely::dataflow::Scope;
-    use timely::dataflow::channels::pact::Exchange;
-    use timely::dataflow::channels::pushers::buffer::Session;
-    use timely::dataflow::channels::pushers::{Counter, Tee};
-    use timely::dataflow::operators::{Capability, Operator};
-    use timely::progress::{Antichain, Timestamp};
-
-    use crate::collection::AsCollection;
-    use crate::difference::Semigroup;
-    use crate::lattice::Lattice;
-    use crate::trace::{Batcher, Builder};
-    use crate::{Collection, Data, ExchangeData, Hashable};
-
-    impl<G, D, R> Collection<G, D, R>
-    where
-        G: Scope,
-        G::Timestamp: Data + Lattice,
-        D: ExchangeData + Hashable,
-        R: Semigroup + ExchangeData,
-    {
-        /// Aggregates the weights of equal records into at most one record.
-        ///
-        /// This method uses the type `D`'s `hashed()` method to partition the data. The data are
-        /// accumulated in place, each held back until their timestamp has completed.
-        ///
-        /// # Examples
-        ///
-        /// ```
-        /// use differential_dataflow::input::Input;
-        /// use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
-        ///
-        /// ::timely::example(|scope| {
-        ///
-        ///     let x = scope.new_collection_from(1 .. 10u32).1;
-        ///
-        ///     x.negate()
-        ///      .concat(&x)
-        ///      .consolidate_named_neu::<MergeBatcher<u32, (), u64, isize>>("Consolidate inputs") // <-- ensures cancellation occurs
-        ///      .assert_empty();
-        /// });
-        /// ```
-        pub fn consolidate_named_neu<B>(&self, name: &str) -> Self
-        where
-            B: Batcher<Item = ((D, ()), G::Timestamp, R), Time = G::Timestamp> + 'static,
-        {
-            let exchange = Exchange::new(move |update: &((D, ()), G::Timestamp, R)| (update.0).0.hashed().into());
-            self.map(|k| (k, ())).inner
-                .unary_frontier(exchange, name, |_cap, info| {
-
-                    // Acquire a logger for arrange events.
-                    let logger = {
-                        let scope = self.scope();
-                        let register = scope.log_register();
-                        register.get::<crate::logging::DifferentialEvent>("differential/arrange")
-                    };
-
-                    let mut batcher = B::new(logger, info.global_id);
-                    // Capabilities for the lower envelope of updates in `batcher`.
-                    let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
-                    let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
-
-
-                    move |input, output| {
-                        input.for_each(|cap, data| {
-                            capabilities.insert(cap.retain());
-                            batcher.push_batch(data);
-                        });
-
-                        if prev_frontier.borrow() != input.frontier().frontier() {
-                            if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
-                                let mut upper = Antichain::new(); // re-used allocation for sealing batches.
-
-                                // For each capability not in advance of the input frontier ...
-                                for (index, capability) in capabilities.elements().iter().enumerate() {
-                                    if !input.frontier().less_equal(capability.time()) {
-
-                                        // Assemble the upper bound on times we can commit with this capabilities.
-                                        // We must respect the input frontier, and *subsequent* capabilities, as
-                                        // we are pretending to retire the capability changes one by one.
-                                        upper.clear();
-                                        for time in input.frontier().frontier().iter() {
-                                            upper.insert(time.clone());
-                                        }
-                                        for other_capability in &capabilities.elements()[(index + 1)..] {
-                                            upper.insert(other_capability.time().clone());
-                                        }
-
-                                        // send the batch to downstream consumers, empty or not.
-                                        let session = output.session(&capabilities.elements()[index]);
-                                        // Extract updates not in advance of `upper`.
-                                        let builder = ConsolidateBuilder(session);
-                                        let () = batcher.seal(upper.borrow(), |_, _, _| builder);
-                                    }
+/// Aggregates the weights of equal records into at most one record.
+///
+/// The data are accumulated in place, each held back until their timestamp has completed.
+///
+/// This serves as a low-level building-block for more user-friendly functions.
+///
+/// # Examples
+///
+/// ```
+/// use timely::dataflow::channels::pact::Exchange;
+/// use differential_dataflow::Hashable;
+/// use differential_dataflow::input::Input;
+/// use differential_dataflow::operators::consolidate::consolidate_pact;
+/// use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
+///
+/// ::timely::example(|scope| {
+///
+///     let x = scope.new_collection_from(1 .. 10u32).1.map(|i| (i, ()));
+///
+///     let c = x.negate().concat(&x);
+///     let exchange = Exchange::new(|update: &((u32,()),u64,isize)| (update.0).0.hashed().into());
+///     consolidate_pact::<MergeBatcher<u32, (), u64, isize>, _, _, _, _, _>(&c, exchange, "Consolidate inputs") // <-- ensures cancellation occurs
+///         .assert_empty();
+/// });
+/// ```
+pub fn consolidate_pact<B, P, G, K, V, R>(
+    collection: &Collection<G, (K, V), R>,
+    pact: P,
+    name: &str,
+) -> Collection<G, (K, V), R>
+where
+    G: Scope,
+    K: Data,
+    V: Data,
+    R: Data + Semigroup,
+    B: Batcher<Item = ((K, V), G::Timestamp, R), Time = G::Timestamp> + 'static,
+    P: ParallelizationContract<G::Timestamp, ((K, V), G::Timestamp, R)>,
+{
+    collection
+        .inner
+        .unary_frontier(pact, name, |_cap, info| {
+            // Acquire a logger for arrange events.
+            let logger = {
+                let scope = collection.scope();
+                let register = scope.log_register();
+                register.get::<crate::logging::DifferentialEvent>("differential/arrange")
+            };
+
+            let mut batcher = B::new(logger, info.global_id);
+            // Capabilities for the lower envelope of updates in `batcher`.
+            let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
+            let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
+
+            move |input, output| {
+                input.for_each(|cap, data| {
+                    capabilities.insert(cap.retain());
+                    batcher.push_batch(data);
+                });
+
+                if prev_frontier.borrow() != input.frontier().frontier() {
+                    if capabilities
+                        .elements()
+                        .iter()
+                        .any(|c| !input.frontier().less_equal(c.time()))
+                    {
+                        let mut upper = Antichain::new(); // re-used allocation for sealing batches.
+
+                        // For each capability not in advance of the input frontier ...
+                        for (index, capability) in capabilities.elements().iter().enumerate() {
+                            if !input.frontier().less_equal(capability.time()) {
+                                // Assemble the upper bound on times we can commit with this capability.
+                                // We must respect the input frontier, and *subsequent* capabilities, as
+                                // we are pretending to retire the capability changes one by one.
+                                upper.clear();
+                                for time in input.frontier().frontier().iter() {
+                                    upper.insert(time.clone());
+                                }
-                                }
-
-                                // Having extracted and sent batches between each capability and the input frontier,
-                                // we should downgrade all capabilities to match the batcher's lower update frontier.
-                                // This may involve discarding capabilities, which is fine as any new updates arrive
-                                // in messages with new capabilities.
-
-                                let mut new_capabilities = Antichain::new();
-                                for time in batcher.frontier().iter() {
-                                    if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
-                                        new_capabilities.insert(capability.delayed(time));
-                                    } else {
-                                        panic!("failed to find capability");
-                                    }
+                                for other_capability in &capabilities.elements()[(index + 1)..] {
+                                    upper.insert(other_capability.time().clone());
+                                }
-                                }
-
-                                capabilities = new_capabilities;
+
+                                // Send the batch to downstream consumers, empty or not.
+                                let session = output.session(&capabilities.elements()[index]);
+                                // Extract updates not in advance of `upper`.
+                                let builder = ConsolidateBuilder(session);
+                                let _: () = batcher.seal(upper.borrow(), |_, _, _| builder);
+                            }
+                        }
-                            }
-
-                            prev_frontier.clear();
-                            prev_frontier.extend(input.frontier().frontier().iter().cloned());
+
+                        // Having extracted and sent batches between each capability and the input frontier,
+                        // we should downgrade all capabilities to match the batcher's lower update frontier.
+                        // This may involve discarding capabilities, which is fine as any new updates arrive
+                        // in messages with new capabilities.
+
+                        let mut new_capabilities = Antichain::new();
+                        for time in batcher.frontier().iter() {
+                            if let Some(capability) = capabilities
+                                .elements()
+                                .iter()
+                                .find(|c| c.time().less_equal(time))
+                            {
+                                new_capabilities.insert(capability.delayed(time));
+                            } else {
+                                panic!("failed to find capability");
+                            }
+                        }
+
+                        capabilities = new_capabilities;
+                    }
-                        }
-                    }
-                })
-                .as_collection()
-        }
-    }
-
-    struct ConsolidateBuilder<'a, D: Data, T: Timestamp, R: Data>(Session<'a, T, Vec<(D, T, R)>, Counter<T, Vec<(D, T, R)>, Tee<T, Vec<(D, T, R)>>>>);
+
+                    prev_frontier.clear();
+                    prev_frontier.extend(input.frontier().frontier().iter().cloned());
+                }
+            }
+        })
+        .as_collection()
+}
-
-    impl<'a, D: Data, T: Timestamp, R: Data> Builder for ConsolidateBuilder<'a, D, T, R> {
-        type Item = ((D, ()), T, R);
-        type Time = T;
-        type Output = ();
+
+/// A builder that wraps a session for direct output to a stream.
+struct ConsolidateBuilder<'a, K: Data, V: Data, T: Timestamp, R: Data>(
+    Session<'a, T, Vec<((K, V), T, R)>, Counter<T, Vec<((K, V), T, R)>, Tee<T, Vec<((K, V), T, R)>>>>,
+);
-
-        fn new() -> Self {
-            unimplemented!()
-        }
+
+impl<'a, K: Data, V: Data, T: Timestamp, R: Data> Builder for ConsolidateBuilder<'a, K, V, T, R> {
+    type Item = ((K, V), T, R);
+    type Time = T;
+    type Output = ();
-
-        fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
-            unimplemented!()
-        }
+
+    fn new() -> Self {
+        unimplemented!()
+    }
-
-        fn push(&mut self, element: Self::Item) {
-            self.0.give((element.0.0, element.1, element.2));
-        }
+
+    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
+        unimplemented!()
+    }
-
-        fn copy(&mut self, element: &Self::Item) {
-            self.0.give((element.0.0.clone(), element.1.clone(), element.2.clone()));
-        }
+
+    fn push(&mut self, element: Self::Item) {
+        self.0.give((element.0, element.1, element.2));
+    }
-
-        fn done(self, _lower: Antichain<T>, _upper: Antichain<T>, _since: Antichain<T>) -> Self::Output {
-            ()
-        }
+
+    fn copy(&mut self, element: &Self::Item) {
+        self.0.give(element.clone());
+    }
+
+    fn done(
+        self,
+        _lower: Antichain<T>,
+        _upper: Antichain<T>,
+        _since: Antichain<T>,
+    ) {
     }
 }
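A minimal usage sketch, separate from the patch above and assuming the `consolidate_pact` signature it introduces: because the operator is parameterized over any `ParallelizationContract`, it can also be driven with timely's `Pipeline` pact instead of an `Exchange`, in which case each worker consolidates only the updates it already holds. The `MergeBatcher<u32, (), u64, isize>` parameters mirror the patch's doctest; the operator name "ConsolidateLocal" is illustrative.

```rust
use timely::dataflow::channels::pact::Pipeline;
use differential_dataflow::input::Input;
use differential_dataflow::operators::consolidate::consolidate_pact;
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;

::timely::example(|scope| {
    // Shape the data as (key, value) pairs, as `consolidate_pact` expects.
    let x = scope.new_collection_from(1 .. 10u32).1.map(|i| (i, ()));

    // A collection whose updates should cancel exactly once consolidated.
    let c = x.negate().concat(&x);

    // Pipeline performs no data exchange: consolidation is worker-local.
    // That suffices here because each worker produces matching positive
    // and negative updates for the same records.
    consolidate_pact::<MergeBatcher<u32, (), u64, isize>, _, _, _, _, _>(&c, Pipeline, "ConsolidateLocal")
        .assert_empty();
});
```

The only difference from the `Exchange`-based doctest is the pact argument, which is exactly the flexibility that threading `P: ParallelizationContract` through the operator is meant to provide.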