diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bd650a5..fd1f4457 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,11 +17,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `Keys::is_empty()` -> `Keys::finalize()` * `traverse_by_key` ensures `Keys::finalize()` * `NodeIter::count()` -> `NodeIter::exact_size()` to disambiguate from `Iterator::count()` -* [MQTT] path listing are done by publishing an empty payload to an internal node path - with a response topic (not to the `/list` topic anymore) -* [MQTT, Python] The `Miniconf::create` method is no longer used. Instead, an `aiomqtt::Client` +* [miniconf_mqtt] path listing are done by publishing an empty payload to an internal node path + with a response topic (no `/list` topic anymore) +* [py/miniconf-mqtt] The `Miniconf::create` method is no longer used. Instead, an `aiomqtt::Client` must be passed to Miniconf -* [MQTT, Python] `--list` option removed in favor of `PATH?` +* [py/miniconf-mqtt] `--list` option removed in favor of `PATH?` command ### Added @@ -32,9 +32,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `miniconf_cli`: a menu/command line interface * `Path`, `JsonPath`/`JsonPathIter`, `Indices`, `KeysIter` wrapper types for more ergonomic/succinct `Transcode`/`IntoKeys`/`Keys` handling -* [MQTT] support on-demand and partial dump (previously called repubish and executed only once) by posting - the empty payload to an internal node without a response topic: `PATH!` -* [MQTT] support partial listing: `PATH?` +* [miniconf_mqtt] support on-demand and partial tree dump/list by posting + the empty payload to an internal node without/with a response topic +* [py/miniconf-mqtt] support partial list (`PATH?`) and partial on-demand dump (`PATH!`) ### Removed diff --git a/miniconf_mqtt/examples/mqtt.rs b/miniconf_mqtt/examples/mqtt.rs index 435d402a..b4890f2f 100644 --- a/miniconf_mqtt/examples/mqtt.rs +++ b/miniconf_mqtt/examples/mqtt.rs @@ -7,7 +7,7 @@ use std_embedded_time::StandardClock; #[derive(Clone, Default, Tree, Debug)] struct Inner { - frame_rate: u32, + a: u32, } #[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)] @@ -26,13 +26,25 @@ struct Settings { #[tree(depth = 1)] inner: Inner, #[tree(depth = 1)] - amplitude: [f32; 2], + values: [f32; 2], array: [i32; 4], #[tree(depth = 1)] opt: Option, + #[tree(validate=Self::validate_four)] + four: f32, exit: bool, } +impl Settings { + fn validate_four(&mut self, new: f32) -> Result { + if new < 4.0 { + Err("Less than four") + } else { + Ok(new) + } + } +} + #[tokio::main] async fn main() { env_logger::init(); @@ -45,7 +57,7 @@ async fn main() { Stack, "dt/sinara/dual-iir/01-02-03-04-05-06", StandardClock::default(), - minimq::ConfigBuilder::<'_, minimq::broker::IpBroker>::new(localhost.into(), &mut buffer) + minimq::ConfigBuilder::::new(localhost.into(), &mut buffer) .keepalive_interval(60), ) .unwrap(); diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index 8a2ea077..7eececd6 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -503,6 +503,7 @@ where let Some(path) = topic .strip_prefix(prefix.as_str()) .and_then(|p| p.strip_prefix("/settings")) + .map(Path::<_, SEPARATOR>::from) else { info!("Unexpected topic: {topic}"); return Changed::Unchanged; @@ -512,7 +513,7 @@ where // Get, Dump, or List // Try a Get assuming a leaf node if let Err(err) = client.publish( - DeferredPublication::new(|buf| settings.get_json(path, buf)) + DeferredPublication::new(|buf| settings.get_json_by_key(&path, buf)) .topic(topic) .reply(properties) .properties(&[ResponseCode::Ok.into()]) @@ -524,24 +525,18 @@ where minimq::PubError::Serialization(miniconf::Error::Traversal( Traversal::TooShort(_depth), )) => { - // Internal node: try Dump or List + // Internal node: Dump or List (state.state() == &sm::States::Single) .then_some(()) .ok_or("Pending multipart response") .and_then(|()| Multipart::::try_from(properties)) - .and_then(|m| { - m.root(&Path::<_, '/'>::from(path)).map_err(|err| { - debug!("List/Pub root: {err}"); - "Root not found" - }) - }) .map_or_else( |err| { Self::respond(err, ResponseCode::Error, properties, client) .ok(); }, |m| { - *pending = m; + *pending = m.root(&path).unwrap(); // Note(unwrap) checked that it's TooShort but valid leaf state.process_event(sm::Events::Multipart).unwrap(); // Response comes through iter_list/iter_dump }, @@ -559,7 +554,7 @@ where } else { // Set settings - .set_json(path, payload) + .set_json_by_key(&path, payload) .map_err(|err| Self::respond(err, ResponseCode::Error, properties, client).ok()) .map(|_depth| Self::respond("OK", ResponseCode::Ok, properties, client).ok()) .is_ok() diff --git a/miniconf_mqtt/tests/republish.rs b/miniconf_mqtt/tests/republish.rs index 3e79f3d3..42fd9dfd 100644 --- a/miniconf_mqtt/tests/republish.rs +++ b/miniconf_mqtt/tests/republish.rs @@ -1,5 +1,5 @@ use miniconf::Tree; -use minimq::{self, types::TopicFilter}; +use minimq; use std_embedded_nal::Stack; use std_embedded_time::StandardClock; @@ -19,10 +19,10 @@ async fn verify_settings() { // Construct a Minimq client to the broker for publishing requests. let mut buffer = [0u8; 1024]; let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap(); - let mut mqtt: minimq::Minimq<'_, _, _, minimq::broker::IpBroker> = minimq::Minimq::new( + let mut mqtt = minimq::Minimq::new( Stack, StandardClock::default(), - minimq::ConfigBuilder::new(localhost.into(), &mut buffer) + minimq::ConfigBuilder::::new(localhost.into(), &mut buffer) .client_id("tester") .unwrap() .keepalive_interval(60), @@ -37,20 +37,25 @@ async fn verify_settings() { // Subscribe to the settings topic. mqtt.client() - .subscribe(&[TopicFilter::new("republish/device/settings/#")], &[]) + .subscribe( + &[minimq::types::TopicFilter::new( + "republish/device/settings/#", + )], + &[], + ) .unwrap(); // Make sure the device republished all available settings. let mut received_settings = std::collections::HashMap::from([ - ("republish/device/settings/data".to_string(), 0), - ("republish/device/settings/more/inner".to_string(), 0), + ("republish/device/settings/data", 0), + ("republish/device/settings/more/inner", 0), ]); for _ in 0..300 { // 3 seconds mqtt.poll(|_, topic, value, _properties| { - log::info!("{}: {:?}", &topic, value); - let element = received_settings.get_mut(&topic.to_string()).unwrap(); + log::info!("{}: {:?}", topic, value); + let element = received_settings.get_mut(topic).unwrap(); *element += 1; }) .unwrap(); @@ -70,27 +75,24 @@ async fn verify_settings() { async fn main() { env_logger::init(); - // Spawn a task to send MQTT messages. - let task = tokio::task::spawn(async move { verify_settings().await }); + let task = tokio::task::spawn(verify_settings()); let mut buffer = [0u8; 1024]; let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap(); - // Construct a settings configuration interface. - let mut interface: miniconf_mqtt::MqttClient<'_, _, _, _, minimq::broker::IpBroker, 2> = - miniconf_mqtt::MqttClient::new( - Stack, - "republish/device", - StandardClock::default(), - minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60), - ) - .unwrap(); + let mut interface = miniconf_mqtt::MqttClient::new( + Stack, + "republish/device", + StandardClock::default(), + minimq::ConfigBuilder::::new(localhost.into(), &mut buffer) + .keepalive_interval(60), + ) + .unwrap(); let mut settings = Settings::default(); - // Poll the client for 5 seconds. This should be enough time for the miniconf client to publish - // all settings values. for _ in 0..300 { + // 3 s > REPUBLISH_TIMEOUT_SECONDS // The interface should never indicate a settings update during the republish process. assert!(!interface.update(&mut settings).unwrap()); tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; diff --git a/miniconf_mqtt/tests/validation_failure.rs b/miniconf_mqtt/tests/validation_failure.rs deleted file mode 100644 index 35b77166..00000000 --- a/miniconf_mqtt/tests/validation_failure.rs +++ /dev/null @@ -1,112 +0,0 @@ -use miniconf::{Deserialize, Tree}; -use std_embedded_nal::Stack; -use std_embedded_time::StandardClock; - -const RESPONSE_TOPIC: &str = "validation_failure/device/response"; - -#[derive(Clone, Debug, Default, Tree)] -struct Settings { - #[tree(validate=Self::validate)] - error: bool, - exit: bool, -} -impl Settings { - fn validate(&mut self, new: bool) -> Result { - if new { - self.exit = true; - Err("Should exit") - } else { - self.error = new; - Ok(new) - } - } -} - -#[derive(Deserialize)] -struct Response { - code: u8, - _message: heapless::String<256>, -} - -async fn client_task() { - // Construct a Minimq client to the broker for publishing requests. - let mut buffer = [0u8; 1024]; - let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap(); - let mut mqtt: minimq::Minimq<'_, _, _, minimq::broker::IpBroker> = minimq::Minimq::new( - Stack, - StandardClock::default(), - minimq::ConfigBuilder::new(localhost.into(), &mut buffer), - ); - - // Wait for the broker connection - while !mqtt.client().is_connected() { - mqtt.poll(|_client, _topic, _message, _properties| {}) - .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - } - - let topic_filter = minimq::types::TopicFilter::new(RESPONSE_TOPIC); - mqtt.client().subscribe(&[topic_filter], &[]).unwrap(); - - // Wait the other device to connect. - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - - // Configure the error variable to trigger an internal validation failure. - let properties = [minimq::Property::ResponseTopic(minimq::types::Utf8String( - RESPONSE_TOPIC, - ))]; - - log::info!("Publishing error setting"); - mqtt.client() - .publish( - minimq::Publication::new(b"true") - .topic("validation_failure/device/settings/error") - .properties(&properties) - .finish() - .unwrap(), - ) - .unwrap(); - - // Wait until we get a response to the request. - while mqtt - .poll(|_client, _topic, message, _properties| { - let data: Response = serde_json_core::from_slice(message).unwrap().0; - assert!(data.code != 0); - }) - .unwrap() - .is_none() - { - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - } -} - -#[tokio::test] -async fn main() { - env_logger::init(); - - // Spawn a task to send MQTT messages. - tokio::task::spawn(async move { client_task().await }); - - // Construct a settings configuration interface. - let mut buffer = [0u8; 1024]; - let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap(); - let mut interface: miniconf_mqtt::MqttClient<'_, _, _, _, minimq::broker::IpBroker, 1> = - miniconf_mqtt::MqttClient::new( - Stack, - "validation_failure/device", - StandardClock::default(), - minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60), - ) - .unwrap(); - - let mut settings = Settings::default(); - - while !settings.exit { - interface.update(&mut settings).unwrap(); - - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - } - - // Check that the error setting did not stick. - assert!(!settings.error); -} diff --git a/py/miniconf-mqtt/miniconf/__main__.py b/py/miniconf-mqtt/miniconf/__main__.py index f34a893e..bb695983 100644 --- a/py/miniconf-mqtt/miniconf/__main__.py +++ b/py/miniconf-mqtt/miniconf/__main__.py @@ -30,7 +30,7 @@ def main(): epilog="""Examples: %(prog)s dt/sinara/dual-iir/01-02-03-04-05-06 '/stream="192.0.2.16:9293"' %(prog)s -d dt/sinara/dual-iir/+ '/afe/0' # GET -%(prog)s -d dt/sinara/dual-iir/+ '/afe/0="G1"' # SET +%(prog)s -d dt/sinara/dual-iir/+ '/afe/0="G10"' # SET %(prog)s -d dt/sinara/dual-iir/+ '/afe/0=' # CLEAR %(prog)s -d dt/sinara/dual-iir/+ '/afe?' '?' # LIST-GET %(prog)s -d dt/sinara/dual-iir/+ '/afe!' # DUMP diff --git a/py/test.sh b/py/test.sh index 473a38e0..2de71a70 100644 --- a/py/test.sh +++ b/py/test.sh @@ -2,6 +2,7 @@ set -e set -x +trap 'jobs -p | xargs -r kill' EXIT python -m venv .venv . .venv/bin/activate @@ -11,12 +12,17 @@ cargo build -p miniconf_mqtt --example mqtt cargo run -p miniconf_mqtt --example mqtt & sleep 3 # > REPUBLISH_TIMEOUT_SECONDS +MC="python -m miniconf -b localhost -d dt/sinara/dual-iir/+" + python -m miniconf -b localhost dt/sinara/dual-iir/01-02-03-04-05-06 '/stream="192.0.2.16:9293"' -python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0' # GET -python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0="G1"' # SET -python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0=' # CLEAR -python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe?' '?' # LIST-GET -python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe!' # DUMP -sleep 1 # dump is asynchronous +# GET SET CLEAR LIST DUMP +$MC '/afe/0' '/afe/0="G10"' '/afe/0=' '/afe?' '?' '/afe!' +sleep 1 # DUMP is asynchronous + +$MC '/four=5' +set +e +$MC '/four=2' +test $? -ne 1 && exit 1 +set -e -python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/exit=true' +$MC '/exit=true'