Skip to content

Commit

Permalink
Refactor codebase into cli and core modules, add unix socket support,…
Browse files Browse the repository at this point in the history
… bump to 0.5.0
  • Loading branch information
chipsenkbeil committed Aug 4, 2021
1 parent bb7829e commit d477547
Show file tree
Hide file tree
Showing 22 changed files with 853 additions and 420 deletions.
33 changes: 1 addition & 32 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "distant"
description = "Operate on a remote computer through file and process manipulation"
categories = ["command-line-utilities"]
version = "0.4.0"
version = "0.5.0"
authors = ["Chip Senkbeil <chip@senkbeil.org>"]
edition = "2018"
homepage = "https://github.com/chipsenkbeil/distant"
Expand All @@ -18,7 +18,6 @@ codegen-units = 1
[dependencies]
bytes = "1.0.1"
derive_more = { version = "0.99.16", default-features = false, features = ["display", "from", "error", "is_variant"] }
directories = "3.0.2"
futures = "0.3.16"
hex = "0.4.3"
k256 = { version = "0.9.6", features = ["ecdh"] }
Expand Down
2 changes: 2 additions & 0 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod opt;
mod subcommand;
119 changes: 79 additions & 40 deletions src/opt.rs → src/cli/opt.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use crate::{data::RequestPayload, subcommand};
use crate::{
cli::subcommand,
core::{
constants::{SESSION_FILE_PATH_STR, SESSION_SOCKET_PATH_STR},
data::RequestPayload,
},
};
use derive_more::{Display, Error, From, IsVariant};
use lazy_static::lazy_static;
use std::{
Expand Down Expand Up @@ -48,6 +54,20 @@ pub struct CommonOpt {
pub log_file: Option<PathBuf>,
}

/// Contains options related sessions
#[derive(Debug, StructOpt)]
pub struct SessionOpt {
/// Represents the location of the file containing session information,
/// only useful when the session is set to "file"
#[structopt(long, default_value = &SESSION_FILE_PATH_STR)]
pub session_file: PathBuf,

/// Represents the location of the session's socket to communicate across,
/// only useful when the session is set to "socket"
#[structopt(long, default_value = &SESSION_SOCKET_PATH_STR)]
pub session_socket: PathBuf,
}

#[derive(Debug, StructOpt)]
pub enum Subcommand {
/// Performs some action on a remote machine
Expand All @@ -58,9 +78,6 @@ pub enum Subcommand {

/// Begins listening for incoming requests
Listen(ListenSubcommand),

/// Performs some task related to the current session
Session(SessionSubcommand),
}

impl Subcommand {
Expand All @@ -70,41 +87,12 @@ impl Subcommand {
Self::Action(cmd) => subcommand::action::run(cmd, opt)?,
Self::Launch(cmd) => subcommand::launch::run(cmd, opt)?,
Self::Listen(cmd) => subcommand::listen::run(cmd, opt)?,
Self::Session(cmd) => subcommand::session::run(cmd, opt)?,
}

Ok(())
}
}

/// Represents subcommand to operate on a session
#[derive(Debug, StructOpt)]
pub enum SessionSubcommand {
/// Clears the current session
Clear,

/// Reports whether or not a session exists
Exists,

/// Prints out information about the available sessions
Info {
/// Represents the format that results should be returned
///
/// Currently, there are two possible formats:
///
/// 1. "json": printing out JSON for external program usage
/// 2. "shell": printing out human-readable results for interactive shell usage
#[structopt(
short,
long,
case_insensitive = true,
default_value = Mode::Shell.into(),
possible_values = Mode::VARIANTS
)]
mode: Mode,
},
}

/// Represents the communication medium used for the send command
#[derive(
Copy,
Expand Down Expand Up @@ -150,11 +138,15 @@ pub struct ActionSubcommand {
/// Represents the medium for retrieving a session for use in performing the action
#[structopt(
long,
default_value = SessionInput::File.into(),
default_value = SessionInput::default().into(),
possible_values = SessionInput::VARIANTS
)]
pub session: SessionInput,

/// Contains additional information related to sessions
#[structopt(flatten)]
pub session_data: SessionOpt,

/// If specified, commands to send are sent over stdin and responses are received
/// over stdout (and stderr if mode is shell)
#[structopt(short, long)]
Expand Down Expand Up @@ -244,6 +236,25 @@ pub enum SessionOutput {
/// Session is stored and retrieved over anonymous pipes (stdout/stdin)
/// in form of `DISTANT DATA <host> <port> <auth key>`
Pipe,

/// Special scenario where the session is not shared but is instead kept within the
/// launch program, where the program now listens on a unix socket for input
#[cfg(unix)]
Socket,
}

impl Default for SessionOutput {
/// For unix-based systems, output defaults to a socket
#[cfg(unix)]
fn default() -> Self {
Self::Socket
}

/// For non-unix-based systems, output defaults to a file
#[cfg(not(unix))]
fn default() -> Self {
Self::File
}
}

/// Represents the means by which to consume a session when performing an action
Expand Down Expand Up @@ -274,6 +285,25 @@ pub enum SessionInput {
/// Session is stored and retrieved over anonymous pipes (stdout/stdin)
/// in form of `DISTANT DATA <host> <port> <auth key>`
Pipe,

/// Session isn't directly available but instead there is a process listening
/// on a unix socket that will forward requests and responses
#[cfg(unix)]
Socket,
}

impl Default for SessionInput {
/// For unix-based systems, input defaults to a socket
#[cfg(unix)]
fn default() -> Self {
Self::Socket
}

/// For non-unix-based systems, input defaults to a file
#[cfg(not(unix))]
fn default() -> Self {
Self::File
}
}

/// Represents subcommand to launch a remote server
Expand All @@ -282,11 +312,20 @@ pub struct LaunchSubcommand {
/// Represents the medium for sharing the session upon launching on a remote machine
#[structopt(
long,
default_value = SessionOutput::File.into(),
default_value = SessionOutput::default().into(),
possible_values = SessionOutput::VARIANTS
)]
pub session: SessionOutput,

/// Contains additional information related to sessions
#[structopt(flatten)]
pub session_data: SessionOpt,

/// Runs in background via daemon-mode (does nothing on windows); only applies
/// when session is socket
#[structopt(short, long)]
pub daemon: bool,

/// Represents the format that results should be returned when session is "keep",
/// causing the launcher to enter an interactive loop to handle input and output
/// itself rather than enabling other clients to connect
Expand All @@ -299,13 +338,13 @@ pub struct LaunchSubcommand {
)]
pub mode: Mode,

/// Path to remote program to execute via ssh
#[structopt(short, long, default_value = "distant")]
pub remote_program: String,
/// Path to distant program to execute via ssh
#[structopt(long, default_value = "distant")]
pub distant: String,

/// Path to ssh program to execute
#[structopt(short, long, default_value = "ssh")]
pub ssh_program: String,
#[structopt(long, default_value = "ssh")]
pub ssh: String,

/// Control the IP address that the server binds to.
///
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::{
data::{Request, RequestPayload, Response, ResponsePayload},
net::Client,
opt::Mode,
cli::opt::Mode,
core::{
data::{Request, RequestPayload, Response, ResponsePayload},
net::{Client, DataStream},
},
};
use derive_more::IsVariant;
use log::*;
use std::marker::Unpin;
use structopt::StructOpt;
use tokio::{
io,
io::{self, AsyncRead, AsyncWrite},
sync::{
mpsc,
oneshot::{self, error::TryRecvError},
Expand All @@ -34,8 +37,15 @@ impl From<LoopConfig> for Mode {
/// Starts a new action loop that processes requests and receives responses
///
/// id represents the id of a remote process
pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Result<()> {
let mut stream = client.to_response_stream();
pub async fn interactive_loop<T>(
mut client: Client<T>,
tenant: String,
config: LoopConfig,
) -> io::Result<()>
where
T: AsyncRead + AsyncWrite + DataStream + Unpin + 'static,
{
let mut stream = client.to_response_broadcast_stream();

// Create a channel that can report when we should stop the loop based on a received request
let (tx_stop, mut rx_stop) = oneshot::channel::<()>();
Expand All @@ -55,7 +65,7 @@ pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Res

// For json mode, all stdin is treated as individual requests
LoopConfig::Json => {
trace!("Client sending request: {:?}", line);
debug!("Client sending request: {:?}", line);
let result = serde_json::from_str(&line)
.map_err(|x| io::Error::new(io::ErrorKind::InvalidData, x));
match result {
Expand All @@ -80,7 +90,7 @@ pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Res
continue;
}

trace!("Client sending command: {:?}", line);
debug!("Client sending command: {:?}", line);

// NOTE: We have to stick something in as the first argument as clap/structopt
// expect the binary name as the first item in the iterator
Expand All @@ -89,15 +99,17 @@ pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Res
.chain(line.trim().split(' ').filter(|s| !s.trim().is_empty())),
);
match payload_result {
Ok(payload) => match client.send(Request::from(payload)).await {
Ok(res) => match format_response(Mode::Shell, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
Err(x) => {
error!("Failed to send request: {}", x)
Ok(payload) => {
match client.send(Request::new(tenant.as_str(), payload)).await {
Ok(res) => match format_response(Mode::Shell, res) {
Ok(out) => out.print(),
Err(x) => error!("Failed to format response: {}", x),
},
Err(x) => {
error!("Failed to send request: {}", x)
}
}
},
}
Err(x) => {
error!("Failed to parse command: {}", x);
}
Expand All @@ -106,11 +118,14 @@ pub async fn interactive_loop(mut client: Client, config: LoopConfig) -> io::Res

// For non-interactive shell mode, all stdin is treated as a proc's stdin
LoopConfig::Proc { id } => {
trace!("Client sending stdin: {:?}", line);
let req = Request::from(RequestPayload::ProcStdin {
id,
data: line.into_bytes(),
});
debug!("Client sending stdin: {:?}", line);
let req = Request::new(
tenant.as_str(),
RequestPayload::ProcStdin {
id,
data: line.into_bytes(),
},
);
let result = client.send(req).await;

if let Err(x) = result {
Expand Down
Loading

0 comments on commit d477547

Please sign in to comment.