From 32d19beeeb5b59ec5a2b8a04cacf76a28b1c5659 Mon Sep 17 00:00:00 2001 From: youngday Date: Sat, 20 Jul 2024 01:36:26 +0800 Subject: [PATCH] add mqtt --- README.md | 8 +-- examples/mqtt/mqtt_async_pub.rs | 2 +- examples/mqtt/mqtt_async_sub.rs | 18 +++--- examples/mqtt/mqtt_asyncpubsub.rs | 59 ------------------- examples/plot.rs | 95 ++++++++++++++++++++++++++++++- 5 files changed, 106 insertions(+), 76 deletions(-) delete mode 100644 examples/mqtt/mqtt_asyncpubsub.rs diff --git a/README.md b/README.md index bf693a8..da3a568 100644 --- a/README.md +++ b/README.md @@ -54,13 +54,11 @@ COM_TYPE:u32=0;//0=zeromq 1=ice_oryx2,2=? ### speed 15 fps -### queue +### queue (inspired from plotjuggler) ☑️ zeromq ☑️ ice_oryx2 -mqtt -socket -tcp -http +☑️ mqtt +websocket ## goal diff --git a/examples/mqtt/mqtt_async_pub.rs b/examples/mqtt/mqtt_async_pub.rs index 5162e3d..080b078 100644 --- a/examples/mqtt/mqtt_async_pub.rs +++ b/examples/mqtt/mqtt_async_pub.rs @@ -59,6 +59,6 @@ async fn requests(client: AsyncClient) { info!("pub vec:{:?}",i); i+=1; - time::sleep(Duration::from_secs(1)).await; + time::sleep(Duration::from_secs_f64(0.02)).await; } } diff --git a/examples/mqtt/mqtt_async_sub.rs b/examples/mqtt/mqtt_async_sub.rs index 9e520b3..c8dd739 100644 --- a/examples/mqtt/mqtt_async_sub.rs +++ b/examples/mqtt/mqtt_async_sub.rs @@ -1,7 +1,6 @@ use log::{debug, error, info, trace, warn}; use log4rs; -use rumqttc::tokio_rustls::rustls::internal::msgs::base::Payload; use tmq::publish; use tokio::{task, time}; @@ -37,6 +36,7 @@ async fn main() -> Result<(), Box> { loop { let event = eventloop.poll().await; + //payload unpack ref https://github.com/bytebeamio/rumqtt/issues/617 match &event { Ok(v) => { // info!("Event = {v:?}"); @@ -52,8 +52,6 @@ async fn main() -> Result<(), Box> { let _payload=p.payload.clone(); info!("\ntopic = {0:?},payload = {1:?}",_topic,_payload.as_ref()); - - } Packet::PubAck(_) => {} Packet::PingReq(_) => {} @@ -69,18 +67,18 @@ async fn main() -> Result<(), Box> { Packet::UnsubAck(_) => {} Packet::Disconnect(_) => {} } - } - Outgoing(o) => { + }//pack + Outgoing(_o) => { // info!("Outgoing = {o:?}"); } - } - } + }//event + }//ok Err(e) => { error!("Error = {e:?}"); return Ok(()); - } - } - } + }//err + }//result + }//loop } async fn requests(client: AsyncClient) { diff --git a/examples/mqtt/mqtt_asyncpubsub.rs b/examples/mqtt/mqtt_asyncpubsub.rs deleted file mode 100644 index 3c11e44..0000000 --- a/examples/mqtt/mqtt_asyncpubsub.rs +++ /dev/null @@ -1,59 +0,0 @@ -use log::{debug, error, info, trace, warn}; -use log4rs; -use tokio::{task, time}; - -use std::error::Error; -use std::time::Duration; - -use rumqttc::v5::mqttbytes::QoS; -use rumqttc::v5::{AsyncClient, MqttOptions}; - -#[tokio::main(flavor = "current_thread")] -async fn main() -> Result<(), Box> { - // pretty_env_logger::init(); - // color_backtrace::install(); - - log4rs::init_file("examples/config/log.yaml", Default::default()).unwrap(); - info!("log start:trace,debug,info,warn,error."); - - - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - - let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); - task::spawn(async move { - requests(client).await; - time::sleep(Duration::from_secs(3)).await; - }); - - loop { - let event = eventloop.poll().await; - match &event { - Ok(v) => { - println!("Event = {v:?}"); - } - Err(e) => { - println!("Error = {e:?}"); - return Ok(()); - } - } - } -} - -async fn requests(client: AsyncClient) { - client - .subscribe("hello/world", QoS::AtMostOnce) - .await - .unwrap(); - - for i in 1..=10 { - client - .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i]) - .await - .unwrap(); - - time::sleep(Duration::from_secs(1)).await; - } - - time::sleep(Duration::from_secs(120)).await; -} diff --git a/examples/plot.rs b/examples/plot.rs index a80b7d0..12bef49 100644 --- a/examples/plot.rs +++ b/examples/plot.rs @@ -14,6 +14,7 @@ use std::thread; use std::time::{Duration, Instant}; use tokio::sync::mpsc; use tokio::time::sleep; +use tokio::{task, time}; use realtime_plot::draw_piston_window; use realtime_plot::Settings; @@ -24,12 +25,22 @@ use tmq::{subscribe, Context, Result}; use iceoryx2::{port::subscriber, prelude::*}; use realtime_plot::transmission_data::TransmissionData; +use rumqttc::v5::mqttbytes::v5::Packet; +use rumqttc::v5::mqttbytes::v5::Packet::Publish; +use rumqttc::v5::mqttbytes::v5::Packet::SubAck; +use rumqttc::v5::mqttbytes::QoS; +use rumqttc::v5::{ + AsyncClient, + Event::{Incoming, Outgoing}, + MqttOptions, +}; + const CYCLE_TIME: Duration = Duration::from_millis(10); const FPS: u32 = 15; const LENGTH: u32 = 20; const N_DATA_POINTS: usize = (FPS * LENGTH) as usize; -const COM_TYPE:u32=1;//0=zeromq 1=ice_oryx2,2=? +const COM_TYPE:u32=2;//0=zeromq 1=ice_oryx2,2=mqtt? #[tokio::main] async fn main() { let mut window: PistonWindow = WindowSettings::new("Real Time CPU Usage", [450, 300]) @@ -100,6 +111,77 @@ async fn main() { info!("exit ..."); }); // let result = computation.join().unwrap();//TODO: block and nonblock + } else if COM_TYPE==2 { + let mut mqttoptions = MqttOptions::new("test-2", "localhost", 1884); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + task::spawn(async move { + requests(client).await; + time::sleep(Duration::from_secs(3)).await; + }); + + task::spawn(async move { + loop { + let now = Instant::now(); // 程序起始时间 + info!("mqtt start: {:?}", now); + let event = eventloop.poll().await; + match &event { + Ok(v) => { + // info!("Event = {v:?}"); + match &v { + Incoming(i) => { + // info!("incoming = {i:?}"); + match &i { + Packet::Connect(_, _, _) => {} + Packet::ConnAck(_) => {} + Publish(p) => { + // info!("publish = {p:?}"); + let _topic=p.topic.clone(); + let _payload=p.payload.clone(); + let val=_payload.as_ref()[2].as_f64()*0.01;//[0,1,val] + info!("\ntopic = {0:?},payload = {1:?}",_topic,_payload.as_ref()); + + + let end = now.elapsed().as_millis(); + info!("mqtt end,dur: {:?} ms.", end); + let ret_send = sender.send(val); + info!("ret_send: {:?}", ret_send); + info!("🟢 send val: {:?}", val); + + + } + Packet::PubAck(_) => {} + Packet::PingReq(_) => {} + Packet::PingResp(_) => {} + Packet::Subscribe(_) => {} + SubAck(ack) => { + info!("ack = {ack:?}"); + } + Packet::PubRec(_) => {} + Packet::PubRel(_) => {} + Packet::PubComp(_) => {} + Packet::Unsubscribe(_) => {} + Packet::UnsubAck(_) => {} + Packet::Disconnect(_) => {} + } + }//pack + Outgoing(_o) => { + // info!("Outgoing = {o:?}"); + } + }//event + }//ok + Err(e) => { + error!("Error = {e:?}"); + // return Ok(()); + }//err + }//result + }//loop + }); + + + + }else { } @@ -219,3 +301,14 @@ async fn zmq_sub(socket: &mut subscribe::Subscribe) -> Result { } Ok(value) } + + +async fn requests(client: AsyncClient) { + loop { + client.subscribe("hello", QoS::AtMostOnce).await.unwrap(); + time::sleep(Duration::from_secs(1)).await; + // info!("subscribe:"); + } + + // time::sleep(Duration::from_secs(120)).await; +}