From 914a236c949760a675b87158a34d7d139410d0b4 Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Thu, 22 Feb 2024 16:05:31 -0500
Subject: [PATCH] Remove dependency on rdkafka

Signed-off-by: Moritz Hoffmann
---
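Notes: `examples/capture-test.rs` was the only user of the `rdkafka`
dev-dependency, so the example and the dependency are removed together, and
building the examples no longer requires a local librdkafka installation.
The deleted code remains reachable from the parent commit:

    git show 914a236c949760a675b87158a34d7d139410d0b4^:examples/capture-test.rs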
 Cargo.toml               |   1 -
 examples/capture-test.rs | 312 ---------------------------------------
 2 files changed, 313 deletions(-)
 delete mode 100644 examples/capture-test.rs

diff --git a/Cargo.toml b/Cargo.toml
index 528913e33..dc9ab0c0e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,7 +33,6 @@ members = [
 
 [dev-dependencies]
 bincode = "1.3.1"
-rdkafka = "0.24"
 indexmap = "2.1"
 rand="0.4"
 byteorder="1"
diff --git a/examples/capture-test.rs b/examples/capture-test.rs
deleted file mode 100644
index 66fff934a..000000000
--- a/examples/capture-test.rs
+++ /dev/null
@@ -1,312 +0,0 @@
-use rand::{Rng, SeedableRng, StdRng};
-
-use timely::dataflow::*;
-use timely::dataflow::operators::probe::Handle;
-
-use differential_dataflow::input::Input;
-use differential_dataflow::Collection;
-use differential_dataflow::operators::*;
-use differential_dataflow::lattice::Lattice;
-
-type Node = u32;
-type Edge = (Node, Node);
-
-fn main() {
-
-    let nodes: u32 = std::env::args().nth(1).unwrap().parse().unwrap();
-    let edges: u32 = std::env::args().nth(2).unwrap().parse().unwrap();
-    let batch: u32 = std::env::args().nth(3).unwrap().parse().unwrap();
-    let topic = std::env::args().nth(4).unwrap();
-
-    let write = std::env::args().any(|x| x == "write");
-    let read = std::env::args().any(|x| x == "read");
-
-    // define a new computational scope, in which to run BFS
-    timely::execute_from_args(std::env::args(), move |worker| {
-
-        let timer = ::std::time::Instant::now();
-
-        // define BFS dataflow; return handles to roots and edges inputs
-        let mut probe = Handle::new();
-        let (mut roots, mut graph, _write_token, _read_token) = worker.dataflow(|scope| {
-
-            let (root_input, roots) = scope.new_collection();
-            let (edge_input, graph) = scope.new_collection();
-
-            let result = bfs(&graph, &roots);
-
-            let result =
-            result.map(|(_,l)| l)
-                  .consolidate()
-                  .probe_with(&mut probe);
-
-            let write_token = if write {
-                Some(kafka::create_sink(&result.inner, "localhost:9092", &topic))
-            } else { None };
-
-            let read_token = if read {
-                let (read_token, stream) = kafka::create_source(result.scope(), "localhost:9092", &topic, "group");
-                use differential_dataflow::AsCollection;
-                stream
-                    .as_collection()
-                    .negate()
-                    .concat(&result)
-                    .consolidate()
-                    .inspect(|x| println!("In error: {:?}", x))
-                    .probe_with(&mut probe)
-                    .assert_empty()
-                    ;
-                Some(read_token)
-            } else { None };
-
-            (root_input, edge_input, write_token, read_token)
-        });
-
-        let seed: &[_] = &[1, 2, 3, 4];
-        let mut rng1: StdRng = SeedableRng::from_seed(seed);    // rng for edge additions
-        let mut rng2: StdRng = SeedableRng::from_seed(seed);    // rng for edge deletions
-
-        roots.insert(0);
-        roots.close();
-
-        println!("performing BFS on {} nodes, {} edges:", nodes, edges);
-
-        if worker.index() == 0 {
-            for _ in 0 .. edges {
-                graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
-            }
-        }
-
-        println!("{:?}\tloaded", timer.elapsed());
-
-        graph.advance_to(1);
-        graph.flush();
-        worker.step_while(|| probe.less_than(graph.time()));
-
-        println!("{:?}\tstable", timer.elapsed());
-
-        for round in 0 .. {
-            if write {
-                std::thread::sleep(std::time::Duration::from_millis(100));
-            }
-            for element in 0 .. batch {
-                if worker.index() == 0 {
-                    graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
-                    graph.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
-                }
-                graph.advance_to(2 + round * batch + element);
-            }
-            graph.flush();
-
-            let timer2 = ::std::time::Instant::now();
-            worker.step_while(|| probe.less_than(&graph.time()));
-
-            if worker.index() == 0 {
-                let elapsed = timer2.elapsed();
-                println!("{:?}\t{:?}:\t{}", timer.elapsed(), round, elapsed.as_secs() * 1000000000 + (elapsed.subsec_nanos() as u64));
-            }
-        }
-        println!("finished; elapsed: {:?}", timer.elapsed());
-    }).unwrap();
-}
-
-// returns pairs (n, s) indicating node n can be reached from a root in s steps.
-fn bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
-where G::Timestamp: Lattice+Ord {
-
-    // initialize roots as reaching themselves at distance 0
-    let nodes = roots.map(|x| (x, 0));
-
-    // repeatedly update minimal distances each node can be reached from each root
-    nodes.iterate(|inner| {
-
-        let edges = edges.enter(&inner.scope());
-        let nodes = nodes.enter(&inner.scope());
-
-        inner.join_map(&edges, |_k,l,d| (*d, l+1))
-             .concat(&nodes)
-             .reduce(|_, s, t| t.push((*s[0].0, 1)))
-    })
-}
-
-
-pub mod kafka {
-
-    use serde::{Serialize, Deserialize};
-    use timely::scheduling::SyncActivator;
-    use rdkafka::{ClientContext, config::ClientConfig};
-    use rdkafka::consumer::{BaseConsumer, ConsumerContext};
-    use rdkafka::error::{KafkaError, RDKafkaError};
-    use differential_dataflow::capture::Writer;
-
-    use std::hash::Hash;
-    use timely::progress::Timestamp;
-    use timely::dataflow::{Scope, Stream};
-    use differential_dataflow::ExchangeData;
-    use differential_dataflow::lattice::Lattice;
-
-    /// Creates a Kafka source from supplied configuration information.
-    pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any>, Stream<G, (D, T, R)>)
-    where
-        G: Scope<Timestamp = T>,
-        D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
-        T: ExchangeData + Hash + for<'a> serde::Deserialize<'a> + Timestamp + Lattice,
-        R: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
-    {
-        differential_dataflow::capture::source::build(scope, |activator| {
-            let source = KafkaSource::new(addr, topic, group, activator);
-            differential_dataflow::capture::YieldingIter::new_from(Iter::<D, T, R>::new_from(source), std::time::Duration::from_millis(10))
-        })
-    }
-
-    pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
-    where
-        G: Scope<Timestamp = T>,
-        D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
-        T: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a> + Timestamp + Lattice,
-        R: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
-    {
-        use std::rc::Rc;
-        use std::cell::RefCell;
-        use differential_dataflow::hashable::Hashable;
-
-        let sink = KafkaSink::new(addr, topic);
-        let result = Rc::new(RefCell::new(sink));
-        let sink_hash = (addr.to_string(), topic.to_string()).hashed();
-        differential_dataflow::capture::sink::build(
-            &stream,
-            sink_hash,
-            Rc::downgrade(&result),
-            Rc::downgrade(&result),
-        );
-        Box::new(result)
-
-    }
-
-    pub struct KafkaSource {
-        consumer: BaseConsumer<ActivationConsumerContext>,
-    }
-
-    impl KafkaSource {
-        pub fn new(addr: &str, topic: &str, group: &str, activator: SyncActivator) -> Self {
-            let mut kafka_config = ClientConfig::new();
-            // This is mostly cargo-cult'd in from `source/kafka.rs`.
-            kafka_config.set("bootstrap.servers", &addr.to_string());
-            kafka_config
-                .set("enable.auto.commit", "false")
-                .set("auto.offset.reset", "earliest");
-
-            kafka_config.set("topic.metadata.refresh.interval.ms", "30000"); // 30 seconds
-            kafka_config.set("fetch.message.max.bytes", "134217728");
-            kafka_config.set("group.id", group);
-            kafka_config.set("isolation.level", "read_committed");
-            let activator = ActivationConsumerContext(activator);
-            let consumer = kafka_config.create_with_context::<_, BaseConsumer<_>>(activator).unwrap();
-            use rdkafka::consumer::Consumer;
-            consumer.subscribe(&[topic]).unwrap();
-            Self {
-                consumer,
-            }
-        }
-    }
-
-    pub struct Iter<D, T, R> {
-        pub source: KafkaSource,
-        phantom: std::marker::PhantomData<(D, T, R)>,
-    }
-
-    impl<D, T, R> Iter<D, T, R> {
-        /// Constructs a new iterator from a bytes source.
-        pub fn new_from(source: KafkaSource) -> Self {
-            Self {
-                source,
-                phantom: std::marker::PhantomData,
-            }
-        }
-    }
-
-    impl<D, T, R> Iterator for Iter<D, T, R>
-    where
-        D: for<'a>Deserialize<'a>,
-        T: for<'a>Deserialize<'a>,
-        R: for<'a>Deserialize<'a>,
-    {
-        type Item = differential_dataflow::capture::Message<D, T, R>;
-        fn next(&mut self) -> Option<Self::Item> {
-            use rdkafka::message::Message;
-            self.source
-                .consumer
-                .poll(std::time::Duration::from_millis(0))
-                .and_then(|result| result.ok())
-                .and_then(|message| {
-                    message.payload().and_then(|message| bincode::deserialize::<differential_dataflow::capture::Message<D, T, R>>(message).ok())
-                })
-        }
-    }
-
-    /// An implementation of [`ConsumerContext`] that unparks the wrapped thread
-    /// when the message queue switches from nonempty to empty.
-    struct ActivationConsumerContext(SyncActivator);
-
-    impl ClientContext for ActivationConsumerContext { }
-
-    impl ActivationConsumerContext {
-        fn activate(&self) {
-            self.0.activate().unwrap();
-        }
-    }
-
-    impl ConsumerContext for ActivationConsumerContext {
-        fn message_queue_nonempty_callback(&self) {
-            self.activate();
-        }
-    }
-
-    use std::time::Duration;
-    use rdkafka::producer::DefaultProducerContext;
-    use rdkafka::producer::{BaseRecord, ThreadedProducer};
-
-    pub struct KafkaSink {
-        topic: String,
-        producer: ThreadedProducer<DefaultProducerContext>,
-        buffer: Vec<u8>,
-    }
-
-    impl KafkaSink {
-        pub fn new(addr: &str, topic: &str) -> Self {
-            let mut config = ClientConfig::new();
-            config.set("bootstrap.servers", &addr);
-            config.set("queue.buffering.max.kbytes", &format!("{}", 16 << 20));
-            config.set("queue.buffering.max.messages", &format!("{}", 10_000_000));
-            config.set("queue.buffering.max.ms", &format!("{}", 10));
-            let producer = config
-                .create_with_context::<_, ThreadedProducer<_>>(DefaultProducerContext)
-                .expect("creating kafka producer for kafka sinks failed");
-            Self {
-                producer,
-                topic: topic.to_string(),
-                buffer: Vec::new(),
-            }
-        }
-    }
-
-    impl<T: Serialize> Writer<T> for KafkaSink {
-        fn poll(&mut self, item: &T) -> Option<Duration> {
-            self.buffer.clear();
-            bincode::serialize_into(&mut self.buffer, item).expect("Writing to a `Vec` cannot fail");
-            let record = BaseRecord::<[u8], _>::to(&self.topic).payload(&self.buffer);
-            self.producer.send(record).err().map(|(e, _)| {
-                if let KafkaError::MessageProduction(RDKafkaError::QueueFull) = e {
-                    Duration::from_secs(1)
-                } else {
-                    // TODO(frank): report this error upwards so the user knows the sink is dead.
-                    Duration::from_secs(1)
-                }
-            })
-        }
-        fn done(&self) -> bool {
-            self.producer.in_flight_count() == 0
-        }
-    }
-
-}
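
A note for anyone rebuilding the deleted example without Kafka: the
`differential_dataflow::capture` plumbing used above (`source::build`,
`sink::build`, the `Writer` trait) is transport-agnostic; only `KafkaSource`
and `KafkaSink` touch rdkafka. A minimal in-memory sketch of the sink side,
assuming only the `Writer` trait shape visible in the deleted code, with
`poll` returning an optional retry delay and `done` reporting flush state
(`VecSink` is a hypothetical name, not part of the crate):

    use std::collections::VecDeque;
    use std::time::Duration;
    use serde::Serialize;
    use differential_dataflow::capture::Writer;

    /// Unbounded in-memory sink: each `poll` enqueues one bincode-encoded item.
    pub struct VecSink {
        pub queue: VecDeque<Vec<u8>>,
    }

    impl<T: Serialize> Writer<T> for VecSink {
        fn poll(&mut self, item: &T) -> Option<Duration> {
            self.queue.push_back(bincode::serialize(item).expect("serialization should not fail"));
            // An unbounded queue never pushes back, so no retry delay is requested.
            None
        }
        fn done(&self) -> bool {
            // Nothing is buffered outside `queue`, so all writes are complete.
            true
        }
    }

A matching read side would implement `Iterator` with
`Item = differential_dataflow::capture::Message<D, T, R>` over the same queue,
mirroring `Iter` above.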