From 5b2a0fcd7b6bd904b603ac353788240083e35783 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 29 Mar 2022 14:28:20 +0200 Subject: [PATCH 1/7] Adding WIP --- src/lib.rs | 115 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 89 insertions(+), 26 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2cfe107..dd76c25 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,9 @@ use log::info; pub mod response; pub use response::Response; +// Correlation data for command republishing. +const REPUBLISH_CORRELATION_DATA: Property = Property::CorrelationData("REPUBLISH".as_bytes()); + // The maximum topic length of any settings path. const MAX_TOPIC_LENGTH: usize = 128; @@ -58,38 +61,42 @@ impl From> for Error { mod sm { smlang::statemachine! { transitions: { - *Init + Connected = Connected, - Connected + Subscribed = Active, + *Init + Connected / reset = Subscribing, + Subscribing + Subscribed = Idle, + Idle + PendingRepublish = Republishing, + Republishing + RepublishComplete = Active, _ + Reset = Init, } } - - pub struct Context; - - impl StateMachineContext for Context {} } type Handler = fn(&mut Context, &str, &[u8]) -> Result, Error>; +struct HandlerMeta { + handler: Handler, + republished: bool, +} + +pub type Minireq = sm::StateMachine>; + /// MQTT request/response interface. -pub struct Minireq +pub struct MinireqContext where Stack: TcpClientStack, Clock: embedded_time::Clock, { handlers: heapless::LinearMap< String, - Handler, + HandlerMeta, NUM_REQUESTS, >, mqtt: minimq::Minimq, prefix: String, - state: sm::StateMachine, } impl - Minireq + MinireqContext where Stack: TcpClientStack, Clock: embedded_time::Clock, @@ -125,10 +132,29 @@ where handlers: heapless::LinearMap::default(), mqtt, prefix, - state: sm::StateMachine::new(sm::Context), }) } +} +impl sm::StateMachineContext for + MinireqContext +where + Stack: TcpClientStack, + Clock: embedded_time::Clock, +{ + fn reset(&mut self) { + for HandlerMeta { ref mut republished, .. } in self.handlers.values_mut() { + *republished = false; + } + } +} + +impl + Minireq +where + Stack: TcpClientStack, + Clock: embedded_time::Clock, +{ /// Associate a handler to be called when receiving the specified request. /// /// # Args @@ -139,8 +165,8 @@ where topic: &str, handler: Handler, ) -> Result> { - self.handlers - .insert(String::from(topic), handler) + self.context_mut().handlers + .insert(String::from(topic), HandlerMeta { handler, republished: false }) .map(|prev| prev.is_none()) .map_err(|_| Error::RegisterFailed) } @@ -153,12 +179,12 @@ where &[u8], ) -> Result, Error>, { - let Self { + let MinireqContext { handlers, mqtt, prefix, .. - } = self; + } = self.context_mut(); match mqtt.poll(|client, topic, message, properties| { let path = match topic.strip_prefix(prefix.as_str()) { @@ -178,7 +204,7 @@ where // Perform the action let response = match handlers.get(&String::from(path)) { - Some(&handler) => f(handler, path, message).unwrap_or_else(Response::error), + Some(meta) => f(meta.handler, path, message).unwrap_or_else(Response::error), None => Response::custom(-1, "Unregistered request"), }; @@ -225,13 +251,46 @@ where Err(minimq::Error::SessionReset) => { // Note(unwrap): It's always safe to unwrap the reset event. All states must handle // it. - self.state.process_event(sm::Events::Reset).unwrap(); + self.process_event(sm::Events::Reset).unwrap(); Ok(()) } Err(other) => Err(Error::Mqtt(other)), } } + fn _handle_republish(&mut self) { + let MinireqContext { + mqtt, + handlers, + prefix, + .. + } = self.context_mut(); + + for (command_prefix, HandlerMeta { republished, .. }) in handlers.iter_mut().filter(|(_, HandlerMeta { republished, .. })| !republished ) { + + // Note(unwrap): The unwrap cannot fail because of restrictions on the max topic + // length. + let topic: String<{2 * MAX_TOPIC_LENGTH}> = String::from(prefix.as_str()); + prefix.push_str(command_prefix).unwrap(); + + if mqtt.client + .publish( + &topic, + "TODO".as_bytes(), + // TODO: When Minimq supports more QoS levels, this should be increased to + // ensure that the client has received it at least once. + QoS::AtMostOnce, + Retain::Retained, + &[REPUBLISH_CORRELATION_DATA], + ) + .is_err() { + break; + } + + *republished = true; + } + } + /// Poll the request/response interface. /// /// # Args @@ -248,30 +307,34 @@ where &[u8], ) -> Result, Error>, { - if !self.mqtt.client.is_connected() { + if !self.context_mut().mqtt.client.is_connected() { // Note(unwrap): It's always safe to unwrap the reset event. All states must handle it. - self.state.process_event(sm::Events::Reset).unwrap(); + self.process_event(sm::Events::Reset).unwrap(); } - match *self.state.state() { + match *self.state() { sm::States::Init => { - if self.mqtt.client.is_connected() { + if self.context_mut().mqtt.client.is_connected() { // Note(unwrap): It's always safe to process this event in the INIT state. - self.state.process_event(sm::Events::Connected).unwrap(); + self.process_event(sm::Events::Connected).unwrap(); } } - sm::States::Connected => { + sm::States::Subscribing => { // Note(unwrap): We ensure that this storage is always sufficiently large to store // the wildcard post-fix for MQTT. let mut prefix: String<{ MAX_TOPIC_LENGTH + 2 }> = - String::from(self.prefix.as_str()); + String::from(self.context().prefix.as_str()); prefix.push_str("/#").unwrap(); - if self.mqtt.client.subscribe(&prefix, &[]).is_ok() { + if self.context_mut().mqtt.client.subscribe(&prefix, &[]).is_ok() { // Note(unwrap): It is always safe to process a Subscribed event in this state. - self.state.process_event(sm::Events::Subscribed).unwrap(); + self.process_event(sm::Events::Subscribed).unwrap(); } } + sm::States::Republishing => { + self._handle_republish(); + + } sm::States::Active => {} } From 7dba4be13247d21d19c4574e8340552fa4b667ec Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 29 Mar 2022 15:16:44 +0200 Subject: [PATCH 2/7] Adding command republish support --- src/lib.rs | 232 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 146 insertions(+), 86 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index dd76c25..33c5a72 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,7 +24,7 @@ use minimq::{ use serde_json_core::heapless::{String, Vec}; -use log::info; +use log::{debug, info}; pub mod response; pub use response::Response; @@ -61,10 +61,14 @@ impl From> for Error { mod sm { smlang::statemachine! { transitions: { - *Init + Connected / reset = Subscribing, - Subscribing + Subscribed = Idle, - Idle + PendingRepublish = Republishing, - Republishing + RepublishComplete = Active, + *Init + Update [is_connected] / reset = Subscribing, + Subscribing + Update [subscribe] = Republishing, + Republishing + Update [republish] = Active, + + // We can always begin republishing again from the active processing state. + Active + PendingRepublish = Republishing, + + // All states can reset if the MQTT broker connection is lost. _ + Reset = Init, } } @@ -78,25 +82,16 @@ struct HandlerMeta { republished: bool, } -pub type Minireq = sm::StateMachine>; - -/// MQTT request/response interface. -pub struct MinireqContext +pub struct Minireq where Stack: TcpClientStack, Clock: embedded_time::Clock, { - handlers: heapless::LinearMap< - String, - HandlerMeta, - NUM_REQUESTS, - >, - mqtt: minimq::Minimq, - prefix: String, + machine: sm::StateMachine>, } impl - MinireqContext + Minireq where Stack: TcpClientStack, Clock: embedded_time::Clock, @@ -128,25 +123,120 @@ where let mut prefix: String = String::new(); write!(&mut prefix, "{}/command", device_prefix).map_err(|_| Error::PrefixTooLong)?; - Ok(Self { + let context = MinireqContext { handlers: heapless::LinearMap::default(), mqtt, prefix, + }; + + Ok(Self { + machine: sm::StateMachine::new(context), }) } } -impl sm::StateMachineContext for - MinireqContext +/// MQTT request/response interface. +pub struct MinireqContext< + Context, + Stack, + Clock, + const MESSAGE_SIZE: usize, + const NUM_REQUESTS: usize, +> where + Stack: TcpClientStack, + Clock: embedded_time::Clock, +{ + handlers: heapless::LinearMap< + String, + HandlerMeta, + NUM_REQUESTS, + >, + mqtt: minimq::Minimq, + prefix: String, +} + +impl + sm::StateMachineContext for MinireqContext where Stack: TcpClientStack, Clock: embedded_time::Clock, { + /// Reset the republish state of all of the handlers. fn reset(&mut self) { - for HandlerMeta { ref mut republished, .. } in self.handlers.values_mut() { + for HandlerMeta { + ref mut republished, + .. + } in self.handlers.values_mut() + { *republished = false; } } + + /// Guard to handle subscription to the command prefix. + /// + /// # Returns + /// Error if the command prefix has not yet been subscribed to. + fn subscribe(&mut self) -> Result<(), ()> { + // Note(unwrap): We ensure that this storage is always sufficiently large to store + // the wildcard post-fix for MQTT. + let mut prefix: String<{ MAX_TOPIC_LENGTH + 2 }> = String::from(self.prefix.as_str()); + prefix.push_str("/#").unwrap(); + + self.mqtt.client.subscribe(&prefix, &[]).map_err(|_| ()) + } + + /// Guard to check for an MQTT broker connection. + /// + /// # Returns + /// Ok if the MQTT broker is connected, false otherwise. + fn is_connected(&mut self) -> Result<(), ()> { + if self.mqtt.client.is_connected() { + Ok(()) + } else { + Err(()) + } + } + + /// Guard to handle republishing all of the command information. + /// + /// # Returns + /// Ok if all command information has been republished. Error if there are still more to be + /// published. + fn republish(&mut self) -> Result<(), ()> { + let MinireqContext { + mqtt, + handlers, + prefix, + .. + } = self; + + for (command_prefix, HandlerMeta { republished, .. }) in handlers + .iter_mut() + .filter(|(_, HandlerMeta { republished, .. })| !republished) + { + // Note(unwrap): The unwrap cannot fail because of restrictions on the max topic + // length. + let mut topic: String<{ 2 * MAX_TOPIC_LENGTH + 1 }> = String::from(prefix.as_str()); + topic.push_str("/").unwrap(); + topic.push_str(command_prefix).unwrap(); + + mqtt.client + .publish( + &topic, + "TODO".as_bytes(), + // TODO: When Minimq supports more QoS levels, this should be increased to + // ensure that the client has received it at least once. + QoS::AtMostOnce, + Retain::Retained, + &[REPUBLISH_CORRELATION_DATA], + ) + .map_err(|_| ())?; + + *republished = true; + } + + Ok(()) + } } impl @@ -165,10 +255,28 @@ where topic: &str, handler: Handler, ) -> Result> { - self.context_mut().handlers - .insert(String::from(topic), HandlerMeta { handler, republished: false }) + let added = self + .machine + .context_mut() + .handlers + .insert( + String::from(topic), + HandlerMeta { + handler, + republished: false, + }, + ) .map(|prev| prev.is_none()) - .map_err(|_| Error::RegisterFailed) + .map_err(|_| Error::RegisterFailed)?; + + // Force a republish of the newly-registered command after adding it. We ignore failures of + // event processing here since that would imply we are adding the handler before we've even + // gotten to the republish state. + self.machine + .process_event(sm::Events::PendingRepublish) + .ok(); + + Ok(added) } fn _handle_mqtt(&mut self, mut f: F) -> Result<(), Error> @@ -184,9 +292,18 @@ where mqtt, prefix, .. - } = self.context_mut(); + } = self.machine.context_mut(); match mqtt.poll(|client, topic, message, properties| { + // If the incoming message has republish correlation data, ignore it. + if properties + .iter() + .any(|&prop| prop == REPUBLISH_CORRELATION_DATA) + { + debug!("Ignoring republish data"); + return; + } + let path = match topic.strip_prefix(prefix.as_str()) { // For paths, we do not want to include the leading slash. Some(path) => { @@ -251,46 +368,13 @@ where Err(minimq::Error::SessionReset) => { // Note(unwrap): It's always safe to unwrap the reset event. All states must handle // it. - self.process_event(sm::Events::Reset).unwrap(); + self.machine.process_event(sm::Events::Reset).unwrap(); Ok(()) } Err(other) => Err(Error::Mqtt(other)), } } - fn _handle_republish(&mut self) { - let MinireqContext { - mqtt, - handlers, - prefix, - .. - } = self.context_mut(); - - for (command_prefix, HandlerMeta { republished, .. }) in handlers.iter_mut().filter(|(_, HandlerMeta { republished, .. })| !republished ) { - - // Note(unwrap): The unwrap cannot fail because of restrictions on the max topic - // length. - let topic: String<{2 * MAX_TOPIC_LENGTH}> = String::from(prefix.as_str()); - prefix.push_str(command_prefix).unwrap(); - - if mqtt.client - .publish( - &topic, - "TODO".as_bytes(), - // TODO: When Minimq supports more QoS levels, this should be increased to - // ensure that the client has received it at least once. - QoS::AtMostOnce, - Retain::Retained, - &[REPUBLISH_CORRELATION_DATA], - ) - .is_err() { - break; - } - - *republished = true; - } - } - /// Poll the request/response interface. /// /// # Args @@ -307,36 +391,12 @@ where &[u8], ) -> Result, Error>, { - if !self.context_mut().mqtt.client.is_connected() { + if !self.machine.context_mut().mqtt.client.is_connected() { // Note(unwrap): It's always safe to unwrap the reset event. All states must handle it. - self.process_event(sm::Events::Reset).unwrap(); + self.machine.process_event(sm::Events::Reset).unwrap(); } - match *self.state() { - sm::States::Init => { - if self.context_mut().mqtt.client.is_connected() { - // Note(unwrap): It's always safe to process this event in the INIT state. - self.process_event(sm::Events::Connected).unwrap(); - } - } - sm::States::Subscribing => { - // Note(unwrap): We ensure that this storage is always sufficiently large to store - // the wildcard post-fix for MQTT. - let mut prefix: String<{ MAX_TOPIC_LENGTH + 2 }> = - String::from(self.context().prefix.as_str()); - prefix.push_str("/#").unwrap(); - - if self.context_mut().mqtt.client.subscribe(&prefix, &[]).is_ok() { - // Note(unwrap): It is always safe to process a Subscribed event in this state. - self.process_event(sm::Events::Subscribed).unwrap(); - } - } - sm::States::Republishing => { - self._handle_republish(); - - } - sm::States::Active => {} - } + self.machine.process_event(sm::Events::Update).ok(); self._handle_mqtt(f) } From 0ee6d6e541f7586f2a918b5ca94173e6c0cb79d7 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 30 Mar 2022 16:07:49 +0200 Subject: [PATCH 3/7] Reordering --- src/lib.rs | 208 ++++++++++++++++++++++++++--------------------------- 1 file changed, 104 insertions(+), 104 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index aa74b39..86825f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -137,6 +137,7 @@ struct HandlerMeta { republished: bool, } +/// MQTT request/response interface. pub struct Minireq where Stack: TcpClientStack, @@ -190,110 +191,6 @@ where } } -/// MQTT request/response interface. -pub struct MinireqContext< - Context, - Stack, - Clock, - const MESSAGE_SIZE: usize, - const NUM_REQUESTS: usize, -> where - Stack: TcpClientStack, - Clock: embedded_time::Clock, -{ - handlers: heapless::LinearMap< - String, - HandlerMeta, - NUM_REQUESTS, - >, - mqtt: minimq::Minimq, - prefix: String, -} - -impl - sm::StateMachineContext for MinireqContext -where - Stack: TcpClientStack, - Clock: embedded_time::Clock, -{ - /// Reset the republish state of all of the handlers. - fn reset(&mut self) { - for HandlerMeta { - ref mut republished, - .. - } in self.handlers.values_mut() - { - *republished = false; - } - } - - /// Guard to handle subscription to the command prefix. - /// - /// # Returns - /// Error if the command prefix has not yet been subscribed to. - fn subscribe(&mut self) -> Result<(), ()> { - // Note(unwrap): We ensure that this storage is always sufficiently large to store - // the wildcard post-fix for MQTT. - let mut prefix: String<{ MAX_TOPIC_LENGTH + 2 }> = String::from(self.prefix.as_str()); - prefix.push_str("/#").unwrap(); - - self.mqtt.client.subscribe(&prefix, &[]).map_err(|_| ()) - } - - /// Guard to check for an MQTT broker connection. - /// - /// # Returns - /// Ok if the MQTT broker is connected, false otherwise. - fn is_connected(&mut self) -> Result<(), ()> { - if self.mqtt.client.is_connected() { - Ok(()) - } else { - Err(()) - } - } - - /// Guard to handle republishing all of the command information. - /// - /// # Returns - /// Ok if all command information has been republished. Error if there are still more to be - /// published. - fn republish(&mut self) -> Result<(), ()> { - let MinireqContext { - mqtt, - handlers, - prefix, - .. - } = self; - - for (command_prefix, HandlerMeta { republished, .. }) in handlers - .iter_mut() - .filter(|(_, HandlerMeta { republished, .. })| !republished) - { - // Note(unwrap): The unwrap cannot fail because of restrictions on the max topic - // length. - let mut topic: String<{ 2 * MAX_TOPIC_LENGTH + 1 }> = String::from(prefix.as_str()); - topic.push_str("/").unwrap(); - topic.push_str(command_prefix).unwrap(); - - mqtt.client - .publish( - &topic, - "TODO".as_bytes(), - // TODO: When Minimq supports more QoS levels, this should be increased to - // ensure that the client has received it at least once. - QoS::AtMostOnce, - Retain::Retained, - &[REPUBLISH_CORRELATION_DATA], - ) - .map_err(|_| ())?; - - *republished = true; - } - - Ok(()) - } -} - impl Minireq where @@ -463,3 +360,106 @@ where self._handle_mqtt(f) } } + +struct MinireqContext< + Context, + Stack, + Clock, + const MESSAGE_SIZE: usize, + const NUM_REQUESTS: usize, +> where + Stack: TcpClientStack, + Clock: embedded_time::Clock, +{ + handlers: heapless::LinearMap< + String, + HandlerMeta, + NUM_REQUESTS, + >, + mqtt: minimq::Minimq, + prefix: String, +} + +impl + sm::StateMachineContext for MinireqContext +where + Stack: TcpClientStack, + Clock: embedded_time::Clock, +{ + /// Reset the republish state of all of the handlers. + fn reset(&mut self) { + for HandlerMeta { + ref mut republished, + .. + } in self.handlers.values_mut() + { + *republished = false; + } + } + + /// Guard to handle subscription to the command prefix. + /// + /// # Returns + /// Error if the command prefix has not yet been subscribed to. + fn subscribe(&mut self) -> Result<(), ()> { + // Note(unwrap): We ensure that this storage is always sufficiently large to store + // the wildcard post-fix for MQTT. + let mut prefix: String<{ MAX_TOPIC_LENGTH + 2 }> = String::from(self.prefix.as_str()); + prefix.push_str("/#").unwrap(); + + self.mqtt.client.subscribe(&prefix, &[]).map_err(|_| ()) + } + + /// Guard to check for an MQTT broker connection. + /// + /// # Returns + /// Ok if the MQTT broker is connected, false otherwise. + fn is_connected(&mut self) -> Result<(), ()> { + if self.mqtt.client.is_connected() { + Ok(()) + } else { + Err(()) + } + } + + /// Guard to handle republishing all of the command information. + /// + /// # Returns + /// Ok if all command information has been republished. Error if there are still more to be + /// published. + fn republish(&mut self) -> Result<(), ()> { + let MinireqContext { + mqtt, + handlers, + prefix, + .. + } = self; + + for (command_prefix, HandlerMeta { republished, .. }) in handlers + .iter_mut() + .filter(|(_, HandlerMeta { republished, .. })| !republished) + { + // Note(unwrap): The unwrap cannot fail because of restrictions on the max topic + // length. + let mut topic: String<{ 2 * MAX_TOPIC_LENGTH + 1 }> = String::from(prefix.as_str()); + topic.push_str("/").unwrap(); + topic.push_str(command_prefix).unwrap(); + + mqtt.client + .publish( + &topic, + "TODO".as_bytes(), + // TODO: When Minimq supports more QoS levels, this should be increased to + // ensure that the client has received it at least once. + QoS::AtMostOnce, + Retain::Retained, + &[REPUBLISH_CORRELATION_DATA], + ) + .map_err(|_| ())?; + + *republished = true; + } + + Ok(()) + } +} From dff1c7aac18538b0abd04c6293d2d6ffa7420e15 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 30 Mar 2022 16:10:57 +0200 Subject: [PATCH 4/7] Removing TODO --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 86825f8..6bc87be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -448,7 +448,7 @@ where mqtt.client .publish( &topic, - "TODO".as_bytes(), + "".as_bytes(), // TODO: When Minimq supports more QoS levels, this should be increased to // ensure that the client has received it at least once. QoS::AtMostOnce, From 24ec00185f2ec95fdb1721d85ba2748942cbfca9 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 30 Mar 2022 17:04:31 +0200 Subject: [PATCH 5/7] Fixing republish support --- src/lib.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 6bc87be..6933bdf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -361,13 +361,8 @@ where } } -struct MinireqContext< - Context, - Stack, - Clock, - const MESSAGE_SIZE: usize, - const NUM_REQUESTS: usize, -> where +struct MinireqContext +where Stack: TcpClientStack, Clock: embedded_time::Clock, { @@ -448,7 +443,7 @@ where mqtt.client .publish( &topic, - "".as_bytes(), + "{}".as_bytes(), // TODO: When Minimq supports more QoS levels, this should be increased to // ensure that the client has received it at least once. QoS::AtMostOnce, From b63e6c845536b4e3eae2ba1fa0aa17f870c390d0 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 30 Mar 2022 17:05:28 +0200 Subject: [PATCH 6/7] Updating changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index af85399..915b24d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +* All registered command topics are now published upon connection with the broker or when the +command is registered. + ### Fixed * [#3](https://github.com/quartiq/minireq/issues/3) Fixed an issue where large responses would trigger an internal panic * [#7](https://github.com/quartiq/minireq/issues/7) Fixed serialization of responses so they are readable From cce02d772ce42ca81307631fb70cf35ed5d56e95 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 4 Apr 2022 11:20:16 +0200 Subject: [PATCH 7/7] Update src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Robert Jördens --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 6933bdf..8532d4e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -443,6 +443,7 @@ where mqtt.client .publish( &topic, + // Empty payload would correspond to deleting a retained message. "{}".as_bytes(), // TODO: When Minimq supports more QoS levels, this should be increased to // ensure that the client has received it at least once.