Skip to content

Commit

Permalink
Merge pull request #220 from quartiq/mqtt-rs-rework
Browse files Browse the repository at this point in the history
mqtt rs rework
  • Loading branch information
ryan-summers authored Jul 9, 2024
2 parents 52fe3c2 + 7b12863 commit d6911c8
Show file tree
Hide file tree
Showing 14 changed files with 562 additions and 542 deletions.
15 changes: 3 additions & 12 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,7 @@ jobs:
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ matrix.toolchain }}

- run: cargo check --verbose

- run: cargo build --no-default-features
- run: cargo build
- run: cargo build --release
Expand All @@ -74,19 +72,15 @@ jobs:
args:
- ""
- --no-default-features

steps:
- uses: actions/checkout@v4

- name: Start Mosquitto
- name: Start Broker
run: |
sudo apt-get install mosquitto
sudo service mosquitto start
- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{matrix.toolchain}}

- run: cargo test ${{matrix.args }}

embedded:
Expand Down Expand Up @@ -119,12 +113,9 @@ jobs:
- mqtt
steps:
- uses: actions/checkout@v4

- name: Start Mosquitto
- name: Start Broker
run: |
sudo apt-get install mosquitto
sudo service mosquitto start
- uses: dtolnay/rust-toolchain@stable

- run: cargo run --example ${{matrix.example}}
- run: sh py/test.sh
12 changes: 8 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* `Keys::is_empty()` -> `Keys::finalize()`
* `traverse_by_key` ensures `Keys::finalize()`
* `NodeIter::count()` -> `NodeIter::exact_size()` to disambiguate from `Iterator::count()`
* [MQTT] path listing are done by publishing an empty payload to an internal node path
with a response topic (not to the `/list` topic anymore)
* [MQTT, Python] The `Miniconf::create` method is no longer used. Instead, an `aiomqtt::Client`
must be passed to Miniconf
* [MQTT, Python] `--list` option removed in favor of `PATH?`

### Added

Expand All @@ -27,16 +32,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* `miniconf_cli`: a menu/command line interface
* `Path`, `JsonPath`/`JsonPathIter`, `Indices`, `KeysIter` wrapper types for more ergonomic/succinct
`Transcode`/`IntoKeys`/`Keys` handling
* [MQTT] support on-demand and partial dump (previously called repubish and executed only once) by posting
the empty payload to an internal node without a response topic: `PATH!`
* [MQTT] support partial listing: `PATH?`

### Removed

* `digits()` gone in favor of using `usize::checked_ilog10()`
* `rust_version` and `MSRV`: these crates aim to support the latest stable version of rust

### Changed
* [breaking-python] The `Miniconf::create` method is no longer used. Instead, an `aiomqtt::Client`
must be passed to Miniconf

## [0.11.0](https://github.com/quartiq/miniconf/compare/v0.10.1...v0.11.0) - 2024-04-30

### Changed
Expand Down
15 changes: 7 additions & 8 deletions miniconf/src/jsonpath.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::{
fmt::Write,
ops::{Deref, DerefMut},
ops::{ControlFlow::*, Deref, DerefMut},
};

use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -57,17 +57,16 @@ impl<'a> Iterator for JsonPathIter<'a> {
type Item = &'a str;

fn next(&mut self) -> Option<Self::Item> {
// Reappropriation of `Result` as `Either`
for (open, close) in [
(".'", Ok("'")), // "'" inclusive
(".", Err(&['.', '['][..])), // '.' or '[' exclusive
("['", Ok("']")), // "']" inclusive
("[", Ok("]")), // "]" inclusive
(".'", Continue("'")), // "'" inclusive
(".", Break(&['.', '['][..])), // '.' or '[' exclusive
("['", Continue("']")), // "']" inclusive
("[", Continue("]")), // "]" inclusive
] {
if let Some(rest) = self.0.strip_prefix(open) {
let (end, sep) = match close {
Err(close) => (rest.find(close).unwrap_or(rest.len()), 0),
Ok(close) => (rest.find(close)?, close.len()),
Break(close) => (rest.find(close).unwrap_or(rest.len()), 0),
Continue(close) => (rest.find(close)?, close.len()),
};
let (next, rest) = rest.split_at(end);
self.0 = &rest[sep..];
Expand Down
2 changes: 1 addition & 1 deletion miniconf_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ miniconf = { version = "0.11.0", path = "../miniconf", features = [
] }
postcard = "1.0.8"
serde-json-core = "0.5.1"
yafnv = "1.0.0"
yafnv = "2.0.0"

[features]
std = []
Expand Down
4 changes: 2 additions & 2 deletions miniconf_cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ where
(val as _, rest)
}
};
let check: u32 = yafnv::fnv1a(val.iter().copied());
let check: u32 = yafnv::fnv1a(val);
awrite(&mut write, " ".as_bytes()).await?;
let rl = rest.len();
let mut sl = &mut rest[..];
Expand All @@ -234,7 +234,7 @@ where
Err(miniconf::Error::Traversal(Traversal::Absent(_depth))) => "absent".as_bytes(),
ret => &buf[..ret?],
};
if yafnv::fnv1a::<u32, _>(def.iter().copied()) == check {
if yafnv::fnv1a::<u32, _>(def) == check {
awrite(&mut write, " (default)\n".as_bytes()).await?;
} else {
awrite(&mut write, " (default: ".as_bytes()).await?;
Expand Down
2 changes: 1 addition & 1 deletion miniconf_mqtt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ std = []
[dependencies]
miniconf = { version = "0.11", features = ["json-core"], default-features = false, path = "../miniconf" }
minimq = "0.9.0"
smlang = "0.6"
smlang = "0.7"
embedded-io = "0.6"
log = "0.4"
heapless = "0.8"
Expand Down
14 changes: 14 additions & 0 deletions miniconf_mqtt/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# `miniconf` MQTT Client

This package contains a MQTT client exposing a [`miniconf`](https://crates.io/crates/miniconf) interface via MQTT using [`minimq`](https://crates.io/crates/minimq).

## Command types

| Command | Node | Response Topic | Payload |
| --- | --- | --- | --- |
| Get | Leaf | set | empty |
| List | Internal | set | empty |
| Dump | | not set | empty |
| Set | Leaf | | some |
| Error | Internal | | some |

## Notes

* A list command will also list paths that are absent at runtime.
111 changes: 29 additions & 82 deletions miniconf_mqtt/examples/mqtt.rs
Original file line number Diff line number Diff line change
@@ -1,114 +1,61 @@
use heapless::String;
use miniconf::Tree;
use minimq::Publication;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use std_embedded_nal::Stack;
use std_embedded_time::StandardClock;

#[derive(Clone, Default, Tree, Debug)]
struct NestedSettings {
struct Inner {
frame_rate: u32,
}

#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)]
enum Gain {
#[default]
G1,
G10,
G100,
}

#[derive(Clone, Default, Tree, Debug)]
struct Settings {
stream: String<32>,
#[tree(depth = 1)]
inner: NestedSettings,

afe: [Gain; 2],
#[tree(depth = 1)]
inner: Inner,
#[tree(depth = 1)]
amplitude: [f32; 2],

array: [i32; 4],
#[tree(depth = 1)]
opt: Option<i32>,
exit: bool,
}

async fn mqtt_client() {
// Construct a Minimq client to the broker for publishing requests.
let mut buffer = [0u8; 1024];
let mut mqtt: minimq::Minimq<'_, _, _, minimq::broker::NamedBroker<Stack>> =
minimq::Minimq::new(
Stack,
StandardClock::default(),
minimq::ConfigBuilder::new(
minimq::broker::NamedBroker::new("localhost", Stack).unwrap(),
&mut buffer,
)
.client_id("tester")
.unwrap()
.keepalive_interval(60),
);

// Wait for the broker connection
while !mqtt.client().is_connected() {
mqtt.poll(|_client, _topic, _message, _properties| {})
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
}

log::info!("Test client connected");

// Wait momentarily for the other client to connect.
tokio::time::sleep(Duration::from_secs(1)).await;

// Configure settings.
mqtt.client()
.publish(
Publication::new(b"32.4")
.topic("sample/prefix/settings/amplitude/0")
.finish()
.unwrap(),
)
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;

mqtt.client()
.publish(
Publication::new(b"10")
.topic("sample/prefix/settings/inner/frame_rate")
.finish()
.unwrap(),
)
.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;

mqtt.client()
.publish(
Publication::new(b"true")
.topic("sample/prefix/settings/exit")
.finish()
.unwrap(),
)
.unwrap();
}

#[tokio::main]
async fn main() {
env_logger::init();

// Spawn a task to send MQTT messages.
tokio::task::spawn(async move { mqtt_client().await });

let mut buffer = [0u8; 1024];
let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap();

// Construct a settings configuration interface.
let mut client: miniconf_mqtt::MqttClient<'_, _, _, _, minimq::broker::IpBroker, 2> =
miniconf_mqtt::MqttClient::new(
Stack,
"sample/prefix",
StandardClock::default(),
minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60),
)
.unwrap();
let mut client = miniconf_mqtt::MqttClient::new(
Stack,
"dt/sinara/dual-iir/01-02-03-04-05-06",
StandardClock::default(),
minimq::ConfigBuilder::<'_, minimq::broker::IpBroker>::new(localhost.into(), &mut buffer)
.keepalive_interval(60),
)
.unwrap();

let mut settings = Settings::default();
loop {
while !settings.exit {
tokio::time::sleep(Duration::from_millis(10)).await;
if client.update(&mut settings).unwrap() {
println!("Settings updated: {:?}", settings);
}

if settings.exit {
break;
}

tokio::time::sleep(Duration::from_millis(10)).await;
}
println!("Exiting on request");
}
Loading

0 comments on commit d6911c8

Please sign in to comment.