Skip to content

Commit

Permalink
Merge pull request #221 from quartiq/mqtt-rs-rework
Browse files Browse the repository at this point in the history
follow up changes to mqtt rework
  • Loading branch information
jordens authored Jul 9, 2024
2 parents d6911c8 + 1b90687 commit a7f86fe
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 161 deletions.
14 changes: 7 additions & 7 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* `Keys::is_empty()` -> `Keys::finalize()`
* `traverse_by_key` ensures `Keys::finalize()`
* `NodeIter::count()` -> `NodeIter::exact_size()` to disambiguate from `Iterator::count()`
* [MQTT] path listing are done by publishing an empty payload to an internal node path
with a response topic (not to the `/list` topic anymore)
* [MQTT, Python] The `Miniconf::create` method is no longer used. Instead, an `aiomqtt::Client`
* [miniconf_mqtt] path listing are done by publishing an empty payload to an internal node path
with a response topic (no `/list` topic anymore)
* [py/miniconf-mqtt] The `Miniconf::create` method is no longer used. Instead, an `aiomqtt::Client`
must be passed to Miniconf
* [MQTT, Python] `--list` option removed in favor of `PATH?`
* [py/miniconf-mqtt] `--list` option removed in favor of `PATH?` command

### Added

Expand All @@ -32,9 +32,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* `miniconf_cli`: a menu/command line interface
* `Path`, `JsonPath`/`JsonPathIter`, `Indices`, `KeysIter` wrapper types for more ergonomic/succinct
`Transcode`/`IntoKeys`/`Keys` handling
* [MQTT] support on-demand and partial dump (previously called repubish and executed only once) by posting
the empty payload to an internal node without a response topic: `PATH!`
* [MQTT] support partial listing: `PATH?`
* [miniconf_mqtt] support on-demand and partial tree dump/list by posting
the empty payload to an internal node without/with a response topic
* [py/miniconf-mqtt] support partial list (`PATH?`) and partial on-demand dump (`PATH!`)

### Removed

Expand Down
18 changes: 15 additions & 3 deletions miniconf_mqtt/examples/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std_embedded_time::StandardClock;

#[derive(Clone, Default, Tree, Debug)]
struct Inner {
frame_rate: u32,
a: u32,
}

#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)]
Expand All @@ -26,13 +26,25 @@ struct Settings {
#[tree(depth = 1)]
inner: Inner,
#[tree(depth = 1)]
amplitude: [f32; 2],
values: [f32; 2],
array: [i32; 4],
#[tree(depth = 1)]
opt: Option<i32>,
#[tree(validate=Self::validate_four)]
four: f32,
exit: bool,
}

impl Settings {
fn validate_four(&mut self, new: f32) -> Result<f32, &'static str> {
if new < 4.0 {
Err("Less than four")
} else {
Ok(new)
}
}
}

#[tokio::main]
async fn main() {
env_logger::init();
Expand All @@ -45,7 +57,7 @@ async fn main() {
Stack,
"dt/sinara/dual-iir/01-02-03-04-05-06",
StandardClock::default(),
minimq::ConfigBuilder::<'_, minimq::broker::IpBroker>::new(localhost.into(), &mut buffer)
minimq::ConfigBuilder::<minimq::broker::IpBroker>::new(localhost.into(), &mut buffer)
.keepalive_interval(60),
)
.unwrap();
Expand Down
15 changes: 5 additions & 10 deletions miniconf_mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ where
let Some(path) = topic
.strip_prefix(prefix.as_str())
.and_then(|p| p.strip_prefix("/settings"))
.map(Path::<_, SEPARATOR>::from)
else {
info!("Unexpected topic: {topic}");
return Changed::Unchanged;
Expand All @@ -512,7 +513,7 @@ where
// Get, Dump, or List
// Try a Get assuming a leaf node
if let Err(err) = client.publish(
DeferredPublication::new(|buf| settings.get_json(path, buf))
DeferredPublication::new(|buf| settings.get_json_by_key(&path, buf))
.topic(topic)
.reply(properties)
.properties(&[ResponseCode::Ok.into()])
Expand All @@ -524,24 +525,18 @@ where
minimq::PubError::Serialization(miniconf::Error::Traversal(
Traversal::TooShort(_depth),
)) => {
// Internal node: try Dump or List
// Internal node: Dump or List
(state.state() == &sm::States::Single)
.then_some(())
.ok_or("Pending multipart response")
.and_then(|()| Multipart::<Settings, Y>::try_from(properties))
.and_then(|m| {
m.root(&Path::<_, '/'>::from(path)).map_err(|err| {
debug!("List/Pub root: {err}");
"Root not found"
})
})
.map_or_else(
|err| {
Self::respond(err, ResponseCode::Error, properties, client)
.ok();
},
|m| {
*pending = m;
*pending = m.root(&path).unwrap(); // Note(unwrap) checked that it's TooShort but valid leaf
state.process_event(sm::Events::Multipart).unwrap();
// Response comes through iter_list/iter_dump
},
Expand All @@ -559,7 +554,7 @@ where
} else {
// Set
settings
.set_json(path, payload)
.set_json_by_key(&path, payload)
.map_err(|err| Self::respond(err, ResponseCode::Error, properties, client).ok())
.map(|_depth| Self::respond("OK", ResponseCode::Ok, properties, client).ok())
.is_ok()
Expand Down
44 changes: 23 additions & 21 deletions miniconf_mqtt/tests/republish.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use miniconf::Tree;
use minimq::{self, types::TopicFilter};
use minimq;
use std_embedded_nal::Stack;
use std_embedded_time::StandardClock;

Expand All @@ -19,10 +19,10 @@ async fn verify_settings() {
// Construct a Minimq client to the broker for publishing requests.
let mut buffer = [0u8; 1024];
let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap();
let mut mqtt: minimq::Minimq<'_, _, _, minimq::broker::IpBroker> = minimq::Minimq::new(
let mut mqtt = minimq::Minimq::new(
Stack,
StandardClock::default(),
minimq::ConfigBuilder::new(localhost.into(), &mut buffer)
minimq::ConfigBuilder::<minimq::broker::IpBroker>::new(localhost.into(), &mut buffer)
.client_id("tester")
.unwrap()
.keepalive_interval(60),
Expand All @@ -37,20 +37,25 @@ async fn verify_settings() {

// Subscribe to the settings topic.
mqtt.client()
.subscribe(&[TopicFilter::new("republish/device/settings/#")], &[])
.subscribe(
&[minimq::types::TopicFilter::new(
"republish/device/settings/#",
)],
&[],
)
.unwrap();

// Make sure the device republished all available settings.
let mut received_settings = std::collections::HashMap::from([
("republish/device/settings/data".to_string(), 0),
("republish/device/settings/more/inner".to_string(), 0),
("republish/device/settings/data", 0),
("republish/device/settings/more/inner", 0),
]);

for _ in 0..300 {
// 3 seconds
mqtt.poll(|_, topic, value, _properties| {
log::info!("{}: {:?}", &topic, value);
let element = received_settings.get_mut(&topic.to_string()).unwrap();
log::info!("{}: {:?}", topic, value);
let element = received_settings.get_mut(topic).unwrap();
*element += 1;
})
.unwrap();
Expand All @@ -70,27 +75,24 @@ async fn verify_settings() {
async fn main() {
env_logger::init();

// Spawn a task to send MQTT messages.
let task = tokio::task::spawn(async move { verify_settings().await });
let task = tokio::task::spawn(verify_settings());

let mut buffer = [0u8; 1024];
let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap();

// Construct a settings configuration interface.
let mut interface: miniconf_mqtt::MqttClient<'_, _, _, _, minimq::broker::IpBroker, 2> =
miniconf_mqtt::MqttClient::new(
Stack,
"republish/device",
StandardClock::default(),
minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60),
)
.unwrap();
let mut interface = miniconf_mqtt::MqttClient::new(
Stack,
"republish/device",
StandardClock::default(),
minimq::ConfigBuilder::<minimq::broker::IpBroker>::new(localhost.into(), &mut buffer)
.keepalive_interval(60),
)
.unwrap();

let mut settings = Settings::default();

// Poll the client for 5 seconds. This should be enough time for the miniconf client to publish
// all settings values.
for _ in 0..300 {
// 3 s > REPUBLISH_TIMEOUT_SECONDS
// The interface should never indicate a settings update during the republish process.
assert!(!interface.update(&mut settings).unwrap());
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
Expand Down
112 changes: 0 additions & 112 deletions miniconf_mqtt/tests/validation_failure.rs

This file was deleted.

2 changes: 1 addition & 1 deletion py/miniconf-mqtt/miniconf/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def main():
epilog="""Examples:
%(prog)s dt/sinara/dual-iir/01-02-03-04-05-06 '/stream="192.0.2.16:9293"'
%(prog)s -d dt/sinara/dual-iir/+ '/afe/0' # GET
%(prog)s -d dt/sinara/dual-iir/+ '/afe/0="G1"' # SET
%(prog)s -d dt/sinara/dual-iir/+ '/afe/0="G10"' # SET
%(prog)s -d dt/sinara/dual-iir/+ '/afe/0=' # CLEAR
%(prog)s -d dt/sinara/dual-iir/+ '/afe?' '?' # LIST-GET
%(prog)s -d dt/sinara/dual-iir/+ '/afe!' # DUMP
Expand Down
20 changes: 13 additions & 7 deletions py/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

set -e
set -x
trap 'jobs -p | xargs -r kill' EXIT

python -m venv .venv
. .venv/bin/activate
Expand All @@ -11,12 +12,17 @@ cargo build -p miniconf_mqtt --example mqtt
cargo run -p miniconf_mqtt --example mqtt &
sleep 3 # > REPUBLISH_TIMEOUT_SECONDS

MC="python -m miniconf -b localhost -d dt/sinara/dual-iir/+"

python -m miniconf -b localhost dt/sinara/dual-iir/01-02-03-04-05-06 '/stream="192.0.2.16:9293"'
python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0' # GET
python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0="G1"' # SET
python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0=' # CLEAR
python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe?' '?' # LIST-GET
python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe!' # DUMP
sleep 1 # dump is asynchronous
# GET SET CLEAR LIST DUMP
$MC '/afe/0' '/afe/0="G10"' '/afe/0=' '/afe?' '?' '/afe!'
sleep 1 # DUMP is asynchronous

$MC '/four=5'
set +e
$MC '/four=2'
test $? -ne 1 && exit 1
set -e

python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/exit=true'
$MC '/exit=true'

0 comments on commit a7f86fe

Please sign in to comment.