Skip to content

Commit

Permalink
Refactor to support a payload for request/response with multiple entr…
Browse files Browse the repository at this point in the history
…ies; bump to 0.10.0
  • Loading branch information
chipsenkbeil committed Aug 10, 2021
1 parent 5a5d7f6 commit 86e4d7f
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 154 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "distant"
description = "Operate on a remote computer through file and process manipulation"
categories = ["command-line-utilities"]
version = "0.9.5"
version = "0.10.0"
authors = ["Chip Senkbeil <chip@senkbeil.org>"]
edition = "2018"
homepage = "https://github.com/chipsenkbeil/distant"
Expand Down
4 changes: 2 additions & 2 deletions src/cli/opt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
cli::subcommand,
core::{
constants::{SESSION_FILE_PATH_STR, SESSION_SOCKET_PATH_STR, TIMEOUT_STR},
data::RequestPayload,
data::RequestData,
},
};
use derive_more::{Display, Error, From, IsVariant};
Expand Down Expand Up @@ -168,7 +168,7 @@ pub struct ActionSubcommand {

/// Operation to send over the wire if not in interactive mode
#[structopt(subcommand)]
pub operation: Option<RequestPayload>,
pub operation: Option<RequestData>,
}

/// Represents options for binding a server to an IP address
Expand Down
63 changes: 41 additions & 22 deletions src/cli/subcommand/action/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
cli::opt::Mode,
core::{
constants::MAX_PIPE_CHUNK_SIZE,
data::{Request, RequestPayload, Response, ResponsePayload},
data::{Request, RequestData, Response, ResponseData},
net::{Client, DataStream},
utils::StringBuf,
},
Expand Down Expand Up @@ -114,13 +114,15 @@ where

// NOTE: We have to stick something in as the first argument as clap/structopt
// expect the binary name as the first item in the iterator
let payload_result = RequestPayload::from_iter_safe(
let result = RequestData::from_iter_safe(
std::iter::once("distant")
.chain(data.trim().split(' ').filter(|s| !s.trim().is_empty())),
);
match payload_result {
Ok(payload) => {
match client.send(Request::new(tenant.as_str(), payload)).await
match result {
Ok(data) => {
match client
.send(Request::new(tenant.as_str(), vec![data]))
.await
{
Ok(res) => match format_response(Mode::Shell, res) {
Ok(out) => out.print(),
Expand All @@ -142,7 +144,8 @@ where
// For non-interactive shell mode, all stdin is treated as a proc's stdin
LoopConfig::Proc { id } => {
debug!("Client sending stdin: {:?}", data);
let req = Request::new(tenant.as_str(), RequestPayload::ProcStdin { id, data });
let req =
Request::new(tenant.as_str(), vec![RequestData::ProcStdin { id, data }]);
let result = client.send(req).await;

if let Err(x) = result {
Expand All @@ -161,7 +164,10 @@ where
"Response stream no longer available",
)
})?;
let done = res.payload.is_proc_done() && config.is_proc();

// NOTE: If the loop is for a proxy process, we should assume that the payload
// is all-or-nothing for the done check
let done = config.is_proc() && res.payload.iter().any(|x| x.is_proc_done());

format_response(config.into(), res)?.print();

Expand Down Expand Up @@ -257,27 +263,40 @@ impl ResponseOut {
}

pub fn format_response(mode: Mode, res: Response) -> io::Result<ResponseOut> {
let payload_cnt = res.payload.len();

Ok(match mode {
Mode::Json => ResponseOut::StdoutLine(format!(
"{}",
serde_json::to_string(&res)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x))?
)),
Mode::Shell => format_shell(res),

// NOTE: For shell, we assume a singular entry in the response's payload
Mode::Shell if payload_cnt != 1 => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Got {} entries in payload data, but shell expects exactly 1",
payload_cnt
),
))
}
Mode::Shell => format_shell(res.payload.into_iter().next().unwrap()),
})
}

fn format_shell(res: Response) -> ResponseOut {
match res.payload {
ResponsePayload::Ok => ResponseOut::None,
ResponsePayload::Error { description } => {
fn format_shell(data: ResponseData) -> ResponseOut {
match data {
ResponseData::Ok => ResponseOut::None,
ResponseData::Error { description } => {
ResponseOut::StderrLine(format!("Failed: '{}'.", description))
}
ResponsePayload::Blob { data } => {
ResponseData::Blob { data } => {
ResponseOut::StdoutLine(String::from_utf8_lossy(&data).to_string())
}
ResponsePayload::Text { data } => ResponseOut::StdoutLine(data),
ResponsePayload::DirEntries { entries, .. } => ResponseOut::StdoutLine(format!(
ResponseData::Text { data } => ResponseOut::StdoutLine(data),
ResponseData::DirEntries { entries, .. } => ResponseOut::StdoutLine(format!(
"{}",
entries
.into_iter()
Expand All @@ -301,7 +320,7 @@ fn format_shell(res: Response) -> ResponseOut {
.collect::<Vec<String>>()
.join("\n"),
)),
ResponsePayload::Metadata {
ResponseData::Metadata {
canonicalized_path,
file_type,
len,
Expand Down Expand Up @@ -329,18 +348,18 @@ fn format_shell(res: Response) -> ResponseOut {
accessed.unwrap_or_default(),
modified.unwrap_or_default(),
)),
ResponsePayload::ProcEntries { entries } => ResponseOut::StdoutLine(format!(
ResponseData::ProcEntries { entries } => ResponseOut::StdoutLine(format!(
"{}",
entries
.into_iter()
.map(|entry| format!("{}: {} {}", entry.id, entry.cmd, entry.args.join(" ")))
.collect::<Vec<String>>()
.join("\n"),
)),
ResponsePayload::ProcStart { .. } => ResponseOut::None,
ResponsePayload::ProcStdout { data, .. } => ResponseOut::Stdout(data),
ResponsePayload::ProcStderr { data, .. } => ResponseOut::Stderr(data),
ResponsePayload::ProcDone { id, success, code } => {
ResponseData::ProcStart { .. } => ResponseOut::None,
ResponseData::ProcStdout { data, .. } => ResponseOut::Stdout(data),
ResponseData::ProcStderr { data, .. } => ResponseOut::Stderr(data),
ResponseData::ProcDone { id, success, code } => {
if success {
ResponseOut::None
} else if let Some(code) = code {
Expand All @@ -349,7 +368,7 @@ fn format_shell(res: Response) -> ResponseOut {
ResponseOut::StderrLine(format!("Proc {} failed", id))
}
}
ResponsePayload::SystemInfo {
ResponseData::SystemInfo {
family,
os,
arch,
Expand Down
16 changes: 9 additions & 7 deletions src/cli/subcommand/action/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
cli::opt::{ActionSubcommand, CommonOpt, Mode, SessionInput},
core::{
data::{Request, ResponsePayload},
data::{Request, ResponseData},
net::{Client, DataStream, TransportError},
session::{Session, SessionFile},
utils,
Expand Down Expand Up @@ -94,18 +94,20 @@ where

if let Some(req) = cmd
.operation
.map(|payload| Request::new(tenant.as_str(), payload))
.map(|payload| Request::new(tenant.as_str(), vec![payload]))
{
is_proc_req = req.payload.is_proc_run();
// NOTE: We know that there is a single payload entry, so it's all-or-nothing
is_proc_req = req.payload.iter().any(|x| x.is_proc_run());

debug!("Client sending request: {:?}", req);
let res = client.send_timeout(req, timeout).await?;

// Store the spawned process id for using in sending stdin (if we spawned a proc)
proc_id = match &res.payload {
ResponsePayload::ProcStart { id } => *id,
_ => 0,
};
// NOTE: We can assume that there is a single payload entry in response to our single
// entry in our request
if let Some(ResponseData::ProcStart { id }) = res.payload.first() {
proc_id = *id;
}

inner::format_response(cmd.mode, res)?.print();
}
Expand Down
57 changes: 30 additions & 27 deletions src/cli/subcommand/launch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
cli::opt::{CommonOpt, LaunchSubcommand, Mode, SessionOutput},
core::{
constants::CLIENT_BROADCAST_CHANNEL_CAPACITY,
data::{Request, RequestPayload, Response, ResponsePayload},
data::{Request, RequestData, Response, ResponseData},
net::{Client, Transport, TransportReadHalf, TransportWriteHalf},
session::{Session, SessionFile},
utils,
Expand Down Expand Up @@ -167,8 +167,9 @@ async fn socket_loop(
tokio::spawn(async move {
while let Some(req) = req_rx.recv().await {
debug!(
"Forwarding request of type {} to server",
req.payload.as_ref()
"Forwarding request of type{} {} to server",
if req.payload.len() > 1 { "s" } else { "" },
req.to_payload_type_string()
);
if let Err(x) = client.fire_timeout(req, duration).await {
error!("Client failed to send request: {:?}", x);
Expand Down Expand Up @@ -308,22 +309,22 @@ async fn handle_conn_incoming<T>(
// beginning, we exit the function early via return.
let tenant = tenant.unwrap();

// Perform cleanup if done
for id in state.lock().await.processes.as_slice() {
debug!("Cleaning conn {} :: killing process {}", conn_id, id);
if let Err(x) = req_tx
.send(Request::new(
tenant.clone(),
RequestPayload::ProcKill { id: *id },
))
.await
{
error!(
"<Client @ {}> Failed to send kill signal for process {}: {}",
conn_id, id, x
);
break;
}
// Perform cleanup if done by sending a request to kill each running process
// debug!("Cleaning conn {} :: killing process {}", conn_id, id);
if let Err(x) = req_tx
.send(Request::new(
tenant.clone(),
state
.lock()
.await
.processes
.iter()
.map(|id| RequestData::ProcKill { id: *id })
.collect(),
))
.await
{
error!("<Client @ {}> Failed to send kill signals: {}", conn_id, x);
}
}

Expand All @@ -347,19 +348,21 @@ async fn handle_conn_outgoing<T>(
// Forward along responses that are for our connection
Ok(res) if res.tenant == tenant => {
debug!(
"Conn {} being sent response of type {}",
"Conn {} being sent response of type{} {}",
conn_id,
res.payload.as_ref()
if res.payload.len() > 1 { "s" } else { "" },
res.to_payload_type_string(),
);

// If a new process was started, we want to capture the id and
// associate it with the connection
match &res.payload {
ResponsePayload::ProcStart { id } => {
debug!("Tracking proc {} for conn {}", id, conn_id);
state.lock().await.processes.push(*id);
}
_ => {}
let ids = res.payload.iter().filter_map(|x| match x {
ResponseData::ProcStart { id } => Some(*id),
_ => None,
});
for id in ids {
debug!("Tracking proc {} for conn {}", id, conn_id);
state.lock().await.processes.push(id);
}

if let Err(x) = writer.send(res).await {
Expand Down
Loading

0 comments on commit 86e4d7f

Please sign in to comment.