Rephrase consolidate::neu as consolidate_pact
Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
antiguru committed Dec 13, 2023
1 parent af2f9a9 commit de5e71d
Showing 1 changed file with 146 additions and 145 deletions.
291 changes: 146 additions & 145 deletions src/operators/consolidate.rs
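In brief, the method `consolidate_named_neu::<B>(name)` on `Collection` becomes the free function `consolidate_pact`, which operates on `(key, value)` collections and takes the parallelization contract as an explicit argument. A minimal migration sketch (the `Exchange` pact shown mirrors what `consolidate_named` now constructs internally; `keyed` is a hypothetical `Collection<G, (u32, ()), isize>`):

// Before: a method with a fixed Exchange pact, available only through the `neu` module.
// keyed.consolidate_named_neu::<MergeBatcher<_, _, _, _>>("Consolidate");

// After: a free function taking the pact explicitly.
let exchange = Exchange::new(|update: &((u32, ()), u64, isize)| (update.0).0.hashed().into());
consolidate_pact::<MergeBatcher<_, _, _, _>, _, _, _, _, _>(&keyed, exchange, "Consolidate");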
@@ -6,10 +6,16 @@
//! underlying system can more clearly see that no work must be done in the latter case, and we can
//! drop out of, e.g. iterative computations.
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract};
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::{Capability, Operator};
use timely::dataflow::Scope;
use timely::progress::{Antichain, Timestamp};
use timely::PartialOrder;

use crate::{Collection, ExchangeData, Hashable};
use crate::difference::Semigroup;
use crate::{AsCollection, Collection, ExchangeData, Hashable};

use crate::Data;
use crate::lattice::Lattice;
@@ -56,11 +62,10 @@ where
Tr::Batcher: Batcher<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
{
use crate::operators::arrange::arrangement::Arrange;
use crate::trace::cursor::MyTrait;
self.map(|k| (k, ()))
.arrange_named::<Tr>(name)
.as_collection(|d, _| d.into_owned())
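// Route each update by the hash of its data so that equal records meet at a single worker.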
let exchange =
Exchange::new(move |update: &((D, ()), G::Timestamp, R)| (update.0).0.hashed().into());
consolidate_pact::<Tr::Batcher, _, _, _, _, _>(&self.map(|d| (d, ())), exchange, name)
.map(|(d, ())| d)
}

/// Aggregates the weights of equal records.
@@ -89,8 +94,6 @@ where
pub fn consolidate_stream(&self) -> Self {

use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use crate::collection::AsCollection;

self.inner
.unary(Pipeline, "ConsolidateStream", |_cap, _info| {
@@ -108,156 +111,154 @@
}
}

pub mod neu {
//! Consolidate without building batches.
use timely::PartialOrder;
use timely::dataflow::Scope;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::{Capability, Operator};
use timely::progress::{Antichain, Timestamp};

use crate::collection::AsCollection;
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{Batcher, Builder};
use crate::{Collection, Data, ExchangeData, Hashable};

impl<G, D, R> Collection<G, D, R>
where
G: Scope,
G::Timestamp: Data + Lattice,
D: ExchangeData + Hashable,
R: Semigroup + ExchangeData,
{
/// Aggregates the weights of equal records into at most one record.
///
/// This method uses the type `D`'s `hashed()` method to partition the data. The data are
/// accumulated in place, each held back until their timestamp has completed.
///
/// # Examples
///
/// ```
/// use differential_dataflow::input::Input;
/// use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
///
/// ::timely::example(|scope| {
///
/// let x = scope.new_collection_from(1 .. 10u32).1;
///
/// x.negate()
/// .concat(&x)
/// .consolidate_named_neu::<MergeBatcher<_, _, _, _>>("Consolidate inputs") // <-- ensures cancellation occurs
/// .assert_empty();
/// });
/// ```
pub fn consolidate_named_neu<B>(&self, name: &str) -> Self
where
B: Batcher<Item=((D, ()), G::Timestamp, R), Time=G::Timestamp> + 'static,
{
let exchange = Exchange::new(move |update: &((D, ()), G::Timestamp, R)| (update.0).0.hashed().into());
self.map(|k| (k, ())).inner
.unary_frontier(exchange, name, |_cap, info| {

// Acquire a logger for arrange events.
let logger = {
let scope = self.scope();
let register = scope.log_register();
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
};

let mut batcher = B::new(logger, info.global_id);
// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());


move |input, output| {
input.for_each(|cap, data| {
capabilities.insert(cap.retain());
batcher.push_batch(data);
});

if prev_frontier.borrow() != input.frontier().frontier() {
if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
let mut upper = Antichain::new(); // re-used allocation for sealing batches.

// For each capability not in advance of the input frontier ...
for (index, capability) in capabilities.elements().iter().enumerate() {
if !input.frontier().less_equal(capability.time()) {

// Assemble the upper bound on times we can commit with this capability.
// We must respect the input frontier, and *subsequent* capabilities, as
// we are pretending to retire the capability changes one by one.
upper.clear();
for time in input.frontier().frontier().iter() {
upper.insert(time.clone());
}
for other_capability in &capabilities.elements()[(index + 1)..] {
upper.insert(other_capability.time().clone());
}

// send the batch to downstream consumers, empty or not.
let session = output.session(&capabilities.elements()[index]);
// Extract updates not in advance of `upper`.
let builder = ConsolidateBuilder(session);
let () = batcher.seal(upper.borrow(), |_, _, _| builder);
}
/// Aggregates the weights of equal records into at most one record.
///
/// The data are accumulated in place, each held back until their timestamp has completed.
///
/// This serves as a low-level building-block for more user-friendly functions.
///
/// # Examples
///
/// ```
/// use timely::dataflow::channels::pact::Exchange;
/// use differential_dataflow::Hashable;
/// use differential_dataflow::input::Input;
/// use differential_dataflow::operators::consolidate::consolidate_pact;
/// use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
///
/// ::timely::example(|scope| {
///
/// let x = scope.new_collection_from(1 .. 10u32).1.map(|i| (i, ()));
///
/// let c = x.negate().concat(&x);
/// let exchange = Exchange::new(|update: &((u32,()),u64,isize)| (update.0).0.hashed().into());
/// consolidate_pact::<MergeBatcher<_, _, _, _>, _, _, _, _, _>(&c, exchange, "Consolidate inputs") // <-- ensures cancellation occurs
/// .assert_empty();
/// });
/// ```
pub fn consolidate_pact<B, P, G, K, V, R>(
collection: &Collection<G, (K, V), R>,
pact: P,
name: &str,
) -> Collection<G, (K, V), R>
where
G: Scope,
K: Data,
V: Data,
R: Data + Semigroup,
B: Batcher<Item = ((K, V), G::Timestamp, R), Time = G::Timestamp> + 'static,
P: ParallelizationContract<G::Timestamp, ((K, V), G::Timestamp, R)>,
{
collection
.inner
.unary_frontier(pact, name, |_cap, info| {
// Acquire a logger for arrange events.
let logger = {
let scope = collection.scope();
let register = scope.log_register();
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
};

let mut batcher = B::new(logger, info.global_id);
// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

move |input, output| {
input.for_each(|cap, data| {
capabilities.insert(cap.retain());
batcher.push_batch(data);
});

if prev_frontier.borrow() != input.frontier().frontier() {
if capabilities
.elements()
.iter()
.any(|c| !input.frontier().less_equal(c.time()))
{
let mut upper = Antichain::new(); // re-used allocation for sealing batches.

// For each capability not in advance of the input frontier ...
for (index, capability) in capabilities.elements().iter().enumerate() {
if !input.frontier().less_equal(capability.time()) {
// Assemble the upper bound on times we can commit with this capability.
// We must respect the input frontier, and *subsequent* capabilities, as
// we are pretending to retire the capability changes one by one.
upper.clear();
for time in input.frontier().frontier().iter() {
upper.insert(time.clone());
}

// Having extracted and sent batches between each capability and the input frontier,
// we should downgrade all capabilities to match the batcher's lower update frontier.
// This may involve discarding capabilities, which is fine as any new updates arrive
// in messages with new capabilities.

let mut new_capabilities = Antichain::new();
for time in batcher.frontier().iter() {
if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
new_capabilities.insert(capability.delayed(time));
} else {
panic!("failed to find capability");
}
for other_capability in &capabilities.elements()[(index + 1)..] {
upper.insert(other_capability.time().clone());
}

capabilities = new_capabilities;
// send the batch to downstream consumers, empty or not.
let session = output.session(&capabilities.elements()[index]);
// Extract updates not in advance of `upper`.
let builder = ConsolidateBuilder(session);
let _: () = batcher.seal(upper.borrow(), |_, _, _| builder);
}
}

prev_frontier.clear();
prev_frontier.extend(input.frontier().frontier().iter().cloned());
// Having extracted and sent batches between each capability and the input frontier,
// we should downgrade all capabilities to match the batcher's lower update frontier.
// This may involve discarding capabilities, which is fine as any new updates arrive
// in messages with new capabilities.

let mut new_capabilities = Antichain::new();
for time in batcher.frontier().iter() {
if let Some(capability) = capabilities
.elements()
.iter()
.find(|c| c.time().less_equal(time))
{
new_capabilities.insert(capability.delayed(time));
} else {
panic!("failed to find capability");
}
}

capabilities = new_capabilities;
}
})
.as_collection()
}
}

struct ConsolidateBuilder<'a, D: Data, T: Timestamp, R: Data>(Session<'a, T, Vec<(D, T, R)>, Counter<T, (D, T, R), Tee<T, (D, T, R)>>>);
prev_frontier.clear();
prev_frontier.extend(input.frontier().frontier().iter().cloned());
}
}
})
.as_collection()
}

impl<'a, D: Data, T: Timestamp, R: Data> Builder for ConsolidateBuilder<'a, D, T, R> {
type Item = ((D, ()), T, R);
type Time = T;
type Output = ();
/// A builder that wraps a session for direct output to a stream.
struct ConsolidateBuilder<'a, K: Data, V: Data, T: Timestamp, R: Data>(
Session<'a, T, Vec<((K, V), T, R)>, Counter<T, ((K, V), T, R), Tee<T, ((K, V), T, R)>>>,
);

fn new() -> Self {
unimplemented!()
}
impl<'a, K: Data, V: Data, T: Timestamp, R: Data> Builder for ConsolidateBuilder<'a, K, V, T, R> {
type Item = ((K, V), T, R);
type Time = T;
type Output = ();

fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
unimplemented!()
}
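// Note: `consolidate_pact` hands a ready-made builder to `Batcher::seal` through a
// closure, so the constructors required by the `Builder` trait are never called.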
fn new() -> Self {
unimplemented!()
}

fn push(&mut self, element: Self::Item) {
self.0.give((element.0.0, element.1, element.2));
}
fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
unimplemented!()
}

fn copy(&mut self, element: &Self::Item) {
self.0.give((element.0.0.clone(), element.1.clone(), element.2.clone()));
}
fn push(&mut self, element: Self::Item) {
self.0.give((element.0, element.1, element.2));
}

fn done(self, _lower: Antichain<Self::Time>, _upper: Antichain<Self::Time>, _since: Antichain<Self::Time>) -> Self::Output {
()
}
fn copy(&mut self, element: &Self::Item) {
self.0.give(element.clone());
}

fn done(
self,
_lower: Antichain<Self::Time>,
_upper: Antichain<Self::Time>,
_since: Antichain<Self::Time>,
) { }
}
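Because the contract is now a parameter, a caller that knows its updates are already partitioned by key can skip the exchange entirely. A hedged sketch, not part of this commit, using timely's `Pipeline` pact to consolidate worker-locally:

use timely::dataflow::channels::pact::Pipeline;
use differential_dataflow::input::Input;
use differential_dataflow::operators::consolidate::consolidate_pact;
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;

::timely::example(|scope| {
    // Assume each worker already holds every update for the keys it owns.
    let x = scope.new_collection_from(1 .. 10u32).1.map(|i| (i, ()));
    // `Pipeline` keeps updates on the producing worker; nothing is exchanged.
    consolidate_pact::<MergeBatcher<_, _, _, _>, _, _, _, _, _>(&x, Pipeline, "LocalConsolidate");
});

With multiple workers this only yields globally consolidated output if the input partitioning already matches the keys; otherwise the `Exchange` pact built by `consolidate_named` is the safe default.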
