Skip to content

Commit

Permalink
break change: change message format, add cmd finish message
Browse files Browse the repository at this point in the history
  • Loading branch information
timzaak committed May 23, 2024
1 parent 6fc0f70 commit 3208395
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 34 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.idea
release
17 changes: 8 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
name = mproxy
ctrl_name = mpublish
rust_path = agent

release-mac-x86_64:
mkdir -p release
cd $(rust_path) && cargo build --release --target=x86_64-apple-darwin
strip $(rust_path)/target/x86_64-apple-darwin/release/$(name)
otool -L $(rust_path)/target/x86_64-apple-darwin/release/$(name)
cp $(rust_path)/target/x86_64-apple-darwin/release/$(name) ./release/
strip $(rust_path)/target/x86_64-apple-darwin/release/$(ctrl_name)
otool -L $(rust_path)/target/x86_64-apple-darwin/release/$(ctrl_name)
cp $(rust_path)/target/x86_64-apple-darwin/release/$(ctrl_name) ./release/
otool -L $(rust_path)/target/x86_64-apple-darwin/release/$(ctrl_name)
cp $(rust_path)/target/x86_64-apple-darwin/release/$(ctrl_name) ./release/

# brew install wget
release-mac-aarch64:
Expand All @@ -21,8 +22,8 @@ release-mac-aarch64:
otool -L $(rust_path)/target/aarch64-apple-darwin/release/$(name)
cp $(rust_path)/target/aarch64-apple-darwin/release/$(name) ./release/
strip $(rust_path)/target/aarch64-apple-darwin/release/$(ctrl_name)
otool -L $(rust_path)/target/aarch64-apple-darwin/release/$(ctrl_name)
cp $(rust_path)/target/aarch64-apple-darwin/release/$(ctrl_name) ./release/
otool -L $(rust_path)/target/aarch64-apple-darwin/release/$(ctrl_name)
cp $(rust_path)/target/aarch64-apple-darwin/release/$(ctrl_name) ./release/

release-linux-aarch64:
sudo apt-get install -y build-essential
Expand All @@ -31,10 +32,8 @@ release-linux-aarch64:
strip $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(name)
cp $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(name) ./release/
cd $(rust_path) && cargo build --release --target=aarch64-unknown-linux-gnu
strip $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(ctrl_name)
cp $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(ctrl_name) ./release/


strip $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(ctrl_name)
cp $(rust_path)/target/aarch64-unknown-linux-gnu/release/$(ctrl_name) ./release/

release-linux:
sudo apt-get install -y build-essential
Expand All @@ -43,4 +42,4 @@ release-linux:
strip $(rust_path)/target/x86_64-unknown-linux-gnu/release/$(name)
cp $(rust_path)/target/x86_64-unknown-linux-gnu/release/$(name) ./release
strip $(rust_path)/target/x86_64-unknown-linux-gnu/release/$(ctrl_name)
cp $(rust_path)/target/x86_64-unknown-linux-gnu/release/$(ctrl_name) ./release
cp $(rust_path)/target/x86_64-unknown-linux-gnu/release/$(ctrl_name) ./release
19 changes: 13 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,33 @@
# Web websocket-mqtt send
# publish topic: cmd/$clientId
{
cmd: "ls",
command: "ls",
requestId: "random_to_track",
t: "Cmd"
#stream: false, # can be empty, default is false. this project now only support false.
}

# mproxy response
# publish topic: cmd/$client/resp
# success response
{
type: "Ok"
t: "D"
data: "abc.txt/nccn.txt",
requestId: "random_to_track",
reqId: "random_to_track",
pid: 39512, #process id
seq: 1 #some may resp more than one time, so set seq to keep order.
}
# failure response
{
type: "Err",
t: "Err",
message: "response data"
requestId: "random_to_track"
reqId: "random_to_track"
}
# finish response
{
t: "Ok",
reqId: "random_to_track",
pid: 39512, #process id
}
```
### 配置文件
Expand All @@ -67,7 +74,7 @@ cd shell && ./run_mqtt.sh && cd ../
mprocs

# Check MQTT agent if is OK
cd agent && cargo run --bin mpublish -- --config=mpublish.yml ls -ls
cd agent && cargo run --bin mpublish -- --config=mpublish.yml -- ls -ls
```
Web [Figma UI](https://www.figma.com/design/iyL4dms3B8AWGZS14FCRuf/RMQTT-EXEC?node-id=0%3A1&t=rnIL1LSWwQIXfZdf-1)
## 限制
Expand Down
21 changes: 15 additions & 6 deletions agent/src/bin/mpublish.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::env;
use std::io::IsTerminal;
use std::path::PathBuf;
use std::process::exit;
use anyhow::bail;
use clap::Parser;
use rumqttc::v5::{AsyncClient, MqttOptions, Incoming};
Expand Down Expand Up @@ -43,6 +44,7 @@ async fn main() -> anyhow::Result<()> {
let topic = config.get_subscribe_command_topic();
client.subscribe(topic, QoS::ExactlyOnce).await?;

let client_ = client.clone();
tokio::spawn(async move {
loop {
let event = eventloop.poll().await;
Expand All @@ -51,10 +53,17 @@ async fn main() -> anyhow::Result<()> {
match event {
rumqttc::v5::Event::Incoming(Incoming::Publish(data)) => {
let resp: ResponseMessage = serde_json::from_slice(data.payload.as_ref()).unwrap();

match resp {
ResponseMessage::Ok {data, seq, pid, ..} => println!("[{pid}][{seq}] {data}"),
ResponseMessage::Err {message,..} => eprintln!("{message}"),
ResponseMessage::D {data, seq, pid, ..} => println!("[{pid}][{seq}] {data}"),
ResponseMessage::Err {message,..} => {
eprintln!("{message}");
let _ = client_.disconnect().await;
exit(1);
},
ResponseMessage::Ok {..} => {
let _ = client_.disconnect().await;
exit(0);
}
}
}
_ => ()
Expand All @@ -64,7 +73,7 @@ async fn main() -> anyhow::Result<()> {
};
}
});
let command = RequestMessage::Cmd{command: command.to_string(), request_id: "test_request_id".to_string()};
let command = RequestMessage::Cmd{command: command.to_string(), req_id: "test_request_id".to_string()};
let command = serde_json::to_vec(&command).unwrap();
client.publish(config.get_publish_command_topic(), QoS::ExactlyOnce, false, command).await?;
tokio::signal::ctrl_c().await?;
Expand All @@ -85,10 +94,10 @@ mod test{
let config = Cli::parse_from([APP_NAME,"--config=config", "ls", "pwd"]);
println!("{config:?}");

let config = Cli::parse_from([APP_NAME, "-c","config.yaml", "ls", "pwd"]);
let config = Cli::parse_from([APP_NAME, "-c","config.yaml", "ls -ls"]);
println!("{config:?}");

let config = Cli::parse_from([APP_NAME, "-c","config.yaml"]);
let config = Cli::parse_from([APP_NAME, "-c","config.yaml", "--", "ls", "-ls"]);
println!("{config:?}");

}
Expand Down
9 changes: 5 additions & 4 deletions agent/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ impl Handler {

async fn run_command(connection:Arc<Connection>, cmd:RequestMessage, publish_topic:Arc<String>) {
match cmd {
RequestMessage::Cmd { command,request_id } => {
RequestMessage::Cmd { command, req_id: request_id } => {
let command_parsed = shellish_parse::parse(&command,false);
let command_parsed = match command_parsed {
Ok(result) => result,
Err(e) => {
let _ = connection.publish_response(&publish_topic, ResponseMessage::Err {request_id: request_id.clone(), message:format!("{}", e)}).await;
let _ = connection.publish_response(&publish_topic, ResponseMessage::Err {req_id: request_id.clone(), message:format!("{}", e)}).await;
return;
}
};
Expand All @@ -46,13 +46,14 @@ impl Handler {
for line in reader.lines().filter_map(|line| line.ok()) {
//TODO: handle publish error
let _ = connection.publish_response(
&publish_topic, ResponseMessage::Ok {request_id: request_id.clone(),data:line, seq: seq, pid: pid}).await;
&publish_topic, ResponseMessage::D { req_id: request_id.clone(),data:line, seq, pid }).await;
seq +=1;
}
}
let _ = connection.publish_response(&publish_topic, ResponseMessage::Ok { req_id: request_id, pid}).await;
}
Err(e) => {
let _ = connection.publish_response(&publish_topic, ResponseMessage::Err {request_id: request_id.clone(), message:format!("{}", e)}).await;
let _ = connection.publish_response(&publish_topic, ResponseMessage::Err {req_id: request_id.clone(), message:format!("{}", e)}).await;
}
}
}
Expand Down
25 changes: 19 additions & 6 deletions agent/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
#[serde(tag = "t", rename_all_fields="camelCase")]
pub enum RequestMessage {
Cmd {
command: String,
request_id: String,
req_id: String,
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
#[serde(tag = "t", rename_all_fields="camelCase")]
pub enum ResponseMessage {
Ok{ request_id:String, seq:u32, data:String, pid:u32},
Err{ request_id:String, message:String}
}
D { req_id:String, seq:u32, data:String, pid:u32},
Err { req_id:String, message:String },
Ok { req_id: String, pid: u32},
}

#[cfg(test)]
mod test {
use crate::message::ResponseMessage;

#[test]
fn test_one() {
let r = ResponseMessage::Ok {req_id: "1".to_string(), pid:1};
let parse_result = serde_json::to_string(&r);
assert_eq!(parse_result.unwrap(), r#"{"t":"Ok","reqId":"1","pid":1}"#.to_string());
}
}
6 changes: 3 additions & 3 deletions mprocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ procs:
env:
RUST_LOG: debug
shell: cd agent && cargo run --bin mproxy mproxy.yml
web:
shell: cd web && npm run dev
stop: SIGKILL
# web:
# shell: cd web && npm run dev
# stop: SIGKILL

0 comments on commit 3208395

Please sign in to comment.