From 3748aab09d295037b6264f263e92e4764cecadd8 Mon Sep 17 00:00:00 2001 From: Simonas Kazlauskas Date: Thu, 18 Mar 2021 19:41:16 +0000 Subject: [PATCH] Make Topic a newtype (#29) Turns out having anything with any sort of lifetime in stream/sink chains will trigger nasty issues such as https://github.com/rust-lang/rust/issues/79648 which is pretty difficult to figure out how to work around. Making `Topic` a newtype erases the lifetime from the type, making it significantly easier to work with in thsoe contexts. --- examples/publish.rs | 4 ++-- src/lib.rs | 9 +++------ src/publish/mod.rs | 2 +- src/publish/publishers/mock.rs | 3 ++- src/publish/publishers/null.rs | 2 +- src/publish/sink.rs | 18 +++++++++--------- src/tests.rs | 4 ++-- src/topic.rs | 21 +++++++++++++++++++++ 8 files changed, 41 insertions(+), 22 deletions(-) create mode 100644 src/topic.rs diff --git a/examples/publish.rs b/examples/publish.rs index b0b14d5..6cfd15e 100644 --- a/examples/publish.rs +++ b/examples/publish.rs @@ -15,8 +15,8 @@ struct UserCreatedMessage { impl<'a> EncodableMessage for &'a UserCreatedMessage { type Error = hedwig::validators::JsonSchemaValidatorError; type Validator = hedwig::validators::JsonSchemaValidator; - fn topic(&self) -> &'static str { - "user.created" + fn topic(&self) -> hedwig::Topic { + "user.created".into() } fn encode(self, validator: &Self::Validator) -> Result { Ok(validator diff --git a/src/lib.rs b/src/lib.rs index 945efd6..8a67d52 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,7 +58,7 @@ //! impl<'a> hedwig::publish::EncodableMessage for &'a UserCreatedMessage { //! type Error = hedwig::validators::JsonSchemaValidatorError; //! type Validator = hedwig::validators::JsonSchemaValidator; -//! fn topic(&self) -> hedwig::Topic { "user.created" } +//! fn topic(&self) -> hedwig::Topic { "user.created".into() } //! fn encode(self, validator: &Self::Validator) //! -> Result { //! validator.validate( @@ -106,20 +106,17 @@ #![cfg_attr(docsrs, feature(doc_cfg))] use std::{collections::BTreeMap, time::SystemTime}; - +pub use topic::Topic; use uuid::Uuid; #[cfg(feature = "publish")] #[cfg_attr(docsrs, doc(cfg(feature = "publish")))] pub mod publish; - #[cfg(test)] mod tests; +mod topic; pub mod validators; -/// A message queue topic name to which messages can be published -pub type Topic = &'static str; - /// All errors that may be returned when operating top level APIs. #[derive(Debug, thiserror::Error)] #[non_exhaustive] diff --git a/src/publish/mod.rs b/src/publish/mod.rs index f943731..8b68124 100644 --- a/src/publish/mod.rs +++ b/src/publish/mod.rs @@ -185,7 +185,7 @@ where None => None, Some(stream_item) => Some(( stream_item, - this.topic, + *this.topic, this.messages .next() .expect("should be as many messages as publishes"), diff --git a/src/publish/publishers/mock.rs b/src/publish/publishers/mock.rs index 07c386c..09f7bf3 100644 --- a/src/publish/publishers/mock.rs +++ b/src/publish/publishers/mock.rs @@ -45,7 +45,8 @@ impl MockPublisher { /// be published, was indeed published /// /// Panics if the message was not published. - pub fn assert_message_published(&self, topic: Topic, uuid: &Uuid) { + pub fn assert_message_published>(&self, topic: T, uuid: &Uuid) { + let topic = topic.into(); { let lock = self.0.lock().expect("this mutex cannot get poisoned"); for (mt, msg) in &lock[..] { diff --git a/src/publish/publishers/null.rs b/src/publish/publishers/null.rs index 6721c44..00fcf53 100644 --- a/src/publish/publishers/null.rs +++ b/src/publish/publishers/null.rs @@ -20,7 +20,7 @@ impl Publisher for NullPublisher { type MessageError = std::convert::Infallible; type PublishStream = NullPublishStream; - fn publish<'a, I>(&self, _: &'static str, messages: I) -> Self::PublishStream + fn publish<'a, I>(&self, _: crate::Topic, messages: I) -> Self::PublishStream where I: Iterator + ExactSizeIterator, { diff --git a/src/publish/sink.rs b/src/publish/sink.rs index d4da7d0..575ebd4 100644 --- a/src/publish/sink.rs +++ b/src/publish/sink.rs @@ -272,7 +272,7 @@ mod test { type Validator = TestValidator; fn topic(&self) -> Topic { - "test_topic" + "test_topic".into() } fn encode(self, _: &Self::Validator) -> Result { @@ -382,7 +382,7 @@ mod test { assert_eq!(1, sink.sink.poll_close_called); assert_eq!( vec![( - "test_topic", + "test_topic".into(), TestMessage("foo").encode(&TestValidator).unwrap() )], sink.sink.elements @@ -517,14 +517,14 @@ mod test { assert_eq!( Ok(()), sink.as_mut() - .start_send(("test_topic", test_validated_message("foo"))) + .start_send(("test_topic".into(), test_validated_message("foo"))) ); } /// The publisher should start flushing when the batch size has been exceeded #[test] fn batching_batches() { - let topic = "test_topic"; + let topic = "test_topic".into(); let batch_size = 3; let publisher = MockPublisher::new(); let sink = publisher_sink(publisher, batch_size); @@ -565,7 +565,7 @@ mod test { /// The publisher should flush buffered elements when asked to close #[test] fn close_flushes_batch() { - let topic = "test_topic"; + let topic = "test_topic".into(); let batch_size = 3; let publisher = MockPublisher::new(); let sink = publisher_sink(publisher, batch_size); @@ -599,7 +599,7 @@ mod test { /// The publisher should flush buffered elements when asked to flush #[test] fn flush_incomplete_batch() { - let topic = "test_topic"; + let topic = "test_topic".into(); let batch_size = 3; let publisher = MockPublisher::new(); let sink = publisher_sink(publisher, batch_size); @@ -635,7 +635,7 @@ mod test { #[test] #[should_panic] fn panic_at_buffer_full_without_ready_check() { - let topic = "test_topic"; + let topic = "test_topic".into(); let batch_size = 1; let publisher = MockPublisher::new(); let sink = publisher_sink(publisher, batch_size); @@ -656,7 +656,7 @@ mod test { /// Step through flushing a non-full batch and see that yield points are respected #[test] fn partial_flushing_check() { - let topic = "test_topic"; + let topic = "test_topic".into(); let batch_size = 3; let (publisher, command) = ControlledPublisher::new(); let sink = publisher_sink(publisher, batch_size); @@ -724,7 +724,7 @@ mod test { /// A failed message can be re-sent to the sink and eventually succeed #[test] fn flushing_error_retry() { - let topic = "test_topic"; + let topic = "test_topic".into(); let batch_size = 5; let (publisher, command) = ControlledPublisher::new(); let sink = publisher_sink(publisher, batch_size); diff --git a/src/tests.rs b/src/tests.rs index cf62bc7..a42c3ea 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -71,8 +71,8 @@ impl<'a, I: serde::Serialize> EncodableMessage for &'a JsonUserCreatedMessage type Error = validators::JsonSchemaValidatorError; type Validator = validators::JsonSchemaValidator; - fn topic(&self) -> &'static str { - "user.created" + fn topic(&self) -> crate::Topic { + "user.created".into() } fn encode(self, validator: &Self::Validator) -> Result { validator.validate( diff --git a/src/topic.rs b/src/topic.rs new file mode 100644 index 0000000..f9c0f69 --- /dev/null +++ b/src/topic.rs @@ -0,0 +1,21 @@ +/// A message queue topic name to which messages can be published +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)] +pub struct Topic(&'static str); + +impl std::fmt::Display for Topic { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + std::fmt::Display::fmt(self.0, f) + } +} + +impl From<&'static str> for Topic { + fn from(s: &'static str) -> Topic { + Topic(s) + } +} + +impl From for &'static str { + fn from(s: Topic) -> &'static str { + s.0 + } +}