diff --git a/CHANGELOG.md b/CHANGELOG.md index af85399..915b24d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +* All registered command topics are now published upon connection with the broker or when the +command is registered. + ### Fixed * [#3](https://github.com/quartiq/minireq/issues/3) Fixed an issue where large responses would trigger an internal panic * [#7](https://github.com/quartiq/minireq/issues/7) Fixed serialization of responses so they are readable diff --git a/src/lib.rs b/src/lib.rs index 82c5225..8532d4e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,11 +79,14 @@ use minimq::{ use serde_json_core::heapless::{String, Vec}; -use log::{info, warn}; +use log::{debug, info, warn}; pub mod response; pub use response::Response; +// Correlation data for command republishing. +const REPUBLISH_CORRELATION_DATA: Property = Property::CorrelationData("REPUBLISH".as_bytes()); + // The maximum topic length of any settings path. const MAX_TOPIC_LENGTH: usize = 128; @@ -113,34 +116,34 @@ impl From> for Error { mod sm { smlang::statemachine! { transitions: { - *Init + Connected = Connected, - Connected + Subscribed = Active, + *Init + Update [is_connected] / reset = Subscribing, + Subscribing + Update [subscribe] = Republishing, + Republishing + Update [republish] = Active, + + // We can always begin republishing again from the active processing state. + Active + PendingRepublish = Republishing, + + // All states can reset if the MQTT broker connection is lost. _ + Reset = Init, } } - - pub struct Context; - - impl StateMachineContext for Context {} } type Handler = fn(&mut Context, &str, &[u8]) -> Result, Error>; +struct HandlerMeta { + handler: Handler, + republished: bool, +} + /// MQTT request/response interface. pub struct Minireq where Stack: TcpClientStack, Clock: embedded_time::Clock, { - handlers: heapless::LinearMap< - String, - Handler, - NUM_REQUESTS, - >, - mqtt: minimq::Minimq, - prefix: String, - state: sm::StateMachine, + machine: sm::StateMachine>, } impl @@ -176,14 +179,24 @@ where let mut prefix: String = String::new(); write!(&mut prefix, "{}/command", device_prefix).map_err(|_| Error::PrefixTooLong)?; - Ok(Self { + let context = MinireqContext { handlers: heapless::LinearMap::default(), mqtt, prefix, - state: sm::StateMachine::new(sm::Context), + }; + + Ok(Self { + machine: sm::StateMachine::new(context), }) } +} +impl + Minireq +where + Stack: TcpClientStack, + Clock: embedded_time::Clock, +{ /// Associate a handler to be called when receiving the specified request. /// /// # Args @@ -194,10 +207,28 @@ where topic: &str, handler: Handler, ) -> Result> { - self.handlers - .insert(String::from(topic), handler) + let added = self + .machine + .context_mut() + .handlers + .insert( + String::from(topic), + HandlerMeta { + handler, + republished: false, + }, + ) .map(|prev| prev.is_none()) - .map_err(|_| Error::RegisterFailed) + .map_err(|_| Error::RegisterFailed)?; + + // Force a republish of the newly-registered command after adding it. We ignore failures of + // event processing here since that would imply we are adding the handler before we've even + // gotten to the republish state. + self.machine + .process_event(sm::Events::PendingRepublish) + .ok(); + + Ok(added) } fn _handle_mqtt(&mut self, mut f: F) -> Result<(), Error> @@ -208,14 +239,23 @@ where &[u8], ) -> Result, Error>, { - let Self { + let MinireqContext { handlers, mqtt, prefix, .. - } = self; + } = self.machine.context_mut(); match mqtt.poll(|client, topic, message, properties| { + // If the incoming message has republish correlation data, ignore it. + if properties + .iter() + .any(|&prop| prop == REPUBLISH_CORRELATION_DATA) + { + debug!("Ignoring republish data"); + return; + } + let path = match topic.strip_prefix(prefix.as_str()) { // For paths, we do not want to include the leading slash. Some(path) => { @@ -233,7 +273,7 @@ where // Perform the action let response = match handlers.get(&String::from(path)) { - Some(&handler) => f(handler, path, message).unwrap_or_else(Response::error), + Some(meta) => f(meta.handler, path, message).unwrap_or_else(Response::error), None => Response::custom(-1, "Unregistered request"), }; @@ -287,7 +327,7 @@ where Err(minimq::Error::SessionReset) => { // Note(unwrap): It's always safe to unwrap the reset event. All states must handle // it. - self.state.process_event(sm::Events::Reset).unwrap(); + self.machine.process_event(sm::Events::Reset).unwrap(); Ok(()) } Err(other) => Err(Error::Mqtt(other)), @@ -310,33 +350,112 @@ where &[u8], ) -> Result, Error>, { - if !self.mqtt.client.is_connected() { + if !self.machine.context_mut().mqtt.client.is_connected() { // Note(unwrap): It's always safe to unwrap the reset event. All states must handle it. - self.state.process_event(sm::Events::Reset).unwrap(); + self.machine.process_event(sm::Events::Reset).unwrap(); } - match *self.state.state() { - sm::States::Init => { - if self.mqtt.client.is_connected() { - // Note(unwrap): It's always safe to process this event in the INIT state. - self.state.process_event(sm::Events::Connected).unwrap(); - } - } - sm::States::Connected => { - // Note(unwrap): We ensure that this storage is always sufficiently large to store - // the wildcard post-fix for MQTT. - let mut prefix: String<{ MAX_TOPIC_LENGTH + 2 }> = - String::from(self.prefix.as_str()); - prefix.push_str("/#").unwrap(); - - if self.mqtt.client.subscribe(&prefix, &[]).is_ok() { - // Note(unwrap): It is always safe to process a Subscribed event in this state. - self.state.process_event(sm::Events::Subscribed).unwrap(); - } - } - sm::States::Active => {} - } + self.machine.process_event(sm::Events::Update).ok(); self._handle_mqtt(f) } } + +struct MinireqContext +where + Stack: TcpClientStack, + Clock: embedded_time::Clock, +{ + handlers: heapless::LinearMap< + String, + HandlerMeta, + NUM_REQUESTS, + >, + mqtt: minimq::Minimq, + prefix: String, +} + +impl + sm::StateMachineContext for MinireqContext +where + Stack: TcpClientStack, + Clock: embedded_time::Clock, +{ + /// Reset the republish state of all of the handlers. + fn reset(&mut self) { + for HandlerMeta { + ref mut republished, + .. + } in self.handlers.values_mut() + { + *republished = false; + } + } + + /// Guard to handle subscription to the command prefix. + /// + /// # Returns + /// Error if the command prefix has not yet been subscribed to. + fn subscribe(&mut self) -> Result<(), ()> { + // Note(unwrap): We ensure that this storage is always sufficiently large to store + // the wildcard post-fix for MQTT. + let mut prefix: String<{ MAX_TOPIC_LENGTH + 2 }> = String::from(self.prefix.as_str()); + prefix.push_str("/#").unwrap(); + + self.mqtt.client.subscribe(&prefix, &[]).map_err(|_| ()) + } + + /// Guard to check for an MQTT broker connection. + /// + /// # Returns + /// Ok if the MQTT broker is connected, false otherwise. + fn is_connected(&mut self) -> Result<(), ()> { + if self.mqtt.client.is_connected() { + Ok(()) + } else { + Err(()) + } + } + + /// Guard to handle republishing all of the command information. + /// + /// # Returns + /// Ok if all command information has been republished. Error if there are still more to be + /// published. + fn republish(&mut self) -> Result<(), ()> { + let MinireqContext { + mqtt, + handlers, + prefix, + .. + } = self; + + for (command_prefix, HandlerMeta { republished, .. }) in handlers + .iter_mut() + .filter(|(_, HandlerMeta { republished, .. })| !republished) + { + // Note(unwrap): The unwrap cannot fail because of restrictions on the max topic + // length. + let mut topic: String<{ 2 * MAX_TOPIC_LENGTH + 1 }> = String::from(prefix.as_str()); + topic.push_str("/").unwrap(); + topic.push_str(command_prefix).unwrap(); + + mqtt.client + .publish( + &topic, + // Empty payload would correspond to deleting a retained message. + "{}".as_bytes(), + // TODO: When Minimq supports more QoS levels, this should be increased to + // ensure that the client has received it at least once. + QoS::AtMostOnce, + Retain::Retained, + &[REPUBLISH_CORRELATION_DATA], + ) + .map_err(|_| ())?; + + *republished = true; + } + + Ok(()) + } +}