Skip to content

Commit

Permalink
Session recording inside gateway (#147)
Browse files Browse the repository at this point in the history
* -added new claim to the token
-added ffi for the plugin communication
-created new interceptor for capturing session

* removed unused imports

* run cargo fmt and fix clippy warnings

* refactored code

* fixed clippy errors

* refactored code

* refactored code

* fixed fmt error

* fixed build errors

* refactored code

* fixed test

* fixed fmt error

* performed refactoring

* changes naming according to the changes in librecording

* refactored code

Co-authored-by: Anastasiia Romaniuk <romaniuk.anastasiia@apriorit.com>
  • Loading branch information
AnastasiiaRomaniuk and Anastasiia Romaniuk authored Mar 22, 2021
1 parent f4c5405 commit 048df20
Show file tree
Hide file tree
Showing 22 changed files with 876 additions and 94 deletions.
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions devolutions-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ slog-stdlog = "4.0"
ring = "0.16.0"
spsc-bip-buffer = { git = "https://github.com/Devolutions/spsc-bip-buffer.git", branch = "master" }
indexmap = "1.0"
dlopen = "0.1.8"
dlopen_derive = "0.1.4"

[dependencies.saphir]
git = "https://github.com/richerarc/saphir"
Expand Down
79 changes: 79 additions & 0 deletions devolutions-gateway/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::plugin_manager::PLUGIN_MANAGER;
use cfg_if::cfg_if;
use clap::{crate_name, crate_version, App, Arg};
use picky::key::{PrivateKey, PublicKey};
use picky::pem::Pem;
use serde::{Deserialize, Serialize};
use slog_scope::debug;
use std::env;
use std::fs::File;
use std::io::BufReader;
Expand All @@ -28,6 +30,8 @@ const ARG_CAPTURE_PATH: &str = "capture-path";
const ARG_PROTOCOL: &str = "protocol";
const ARG_LOG_FILE: &str = "log-file";
const ARG_SERVICE_MODE: &str = "service";
const ARG_PLUGINS: &str = "plugins";
const ARG_RECORDING_PATH: &str = "recording-path";

const SERVICE_NAME: &str = "devolutions-gateway";
const DISPLAY_NAME: &str = "Devolutions Gateway";
Expand Down Expand Up @@ -91,6 +95,8 @@ pub struct Config {
pub certificate: CertificateConfig,
pub provisioner_public_key: Option<PublicKey>,
pub delegation_private_key: Option<PrivateKey>,
pub plugins: Option<Vec<String>>,
pub recording_path: Option<String>,
}

impl Default for Config {
Expand Down Expand Up @@ -121,6 +127,8 @@ impl Default for Config {
},
provisioner_public_key: None,
delegation_private_key: None,
plugins: None,
recording_path: None,
}
}
}
Expand Down Expand Up @@ -191,6 +199,10 @@ pub struct ConfigFile {
pub provisioner_public_key_file: Option<String>,
#[serde(rename = "DelegationPrivateKeyFile")]
pub delegation_private_key_file: Option<String>,
#[serde(rename = "Plugins")]
pub plugins: Option<Vec<String>>,
#[serde(rename = "RecordingPath")]
pub recording_path: Option<String>,

// unstable options (subject to change)
#[serde(rename = "ApiKey")]
Expand Down Expand Up @@ -473,6 +485,42 @@ impl Config {
.long("service")
.takes_value(false)
.help("Enable service mode"),
)
.arg(
Arg::with_name(ARG_PLUGINS)
.long("plugin")
.value_name("PATH")
.help("A path where the plugin is located including the plugin name and plugin extension.")
.long_help(
"A path where the plugin is located including the plugin name and plugin extension. \
The plugin will be loaded as dynamic library. \
For example, on linux - home/usr/libs/libplugin.so \
on Windows - D:\\libs\\plugin.dll.",
)
.multiple(true)
.use_delimiter(true)
.value_delimiter(";")
.takes_value(true)
.number_of_values(1),
)
.arg(
Arg::with_name(ARG_RECORDING_PATH)
.long("recording-path")
.value_name("PATH")
.help("A path where the recording of the session will be located.")
.long_help(
"A path where the recording will be saved. \
If not set the TEMP directory will be used.",
)
.takes_value(true)
.empty_values(false)
.validator(|v| {
if PathBuf::from(v).is_dir() {
Ok(())
} else {
Err(String::from("The value does not exist or is not a path"))
}
}),
);

let matches = cli_app.get_matches();
Expand Down Expand Up @@ -588,6 +636,28 @@ impl Config {
config.delegation_private_key = Some(private_key);
}

// plugins parsing
let plugins = matches
.values_of(ARG_PLUGINS)
.unwrap_or_else(Default::default)
.map(|plugin| plugin.to_string())
.collect::<Vec<String>>();

if !plugins.is_empty() {
config.plugins = Some(plugins);
}

// early fail if the specified plugin is not exist
if let Some(plugins) = &config.plugins {
let mut manager = PLUGIN_MANAGER.lock().unwrap();
for plugin in plugins {
debug!("Plugin path: {}", plugin);
manager
.load_plugin(plugin.as_str())
.unwrap_or_else(|e| panic!("Failed to load plugin with error {}", e));
}
}

// listeners parsing

let mut listeners = Vec::new();
Expand Down Expand Up @@ -659,6 +729,10 @@ impl Config {
panic!("provisioner public key is missing in unrestricted mode");
}

if let Some(recording_path) = matches.value_of(ARG_RECORDING_PATH) {
config.recording_path = Some(recording_path.to_owned());
}

config
}

Expand Down Expand Up @@ -749,6 +823,9 @@ impl Config {
.as_ref()
.map(|pem| PrivateKey::from_pem(pem).unwrap());

let plugins = config_file.plugins;
let recording_path = config_file.recording_path;

// unstable options (subject to change)
let api_key = config_file.api_key;
let unrestricted = config_file.unrestricted.unwrap_or(true);
Expand All @@ -770,6 +847,8 @@ impl Config {
},
provisioner_public_key,
delegation_private_key,
plugins,
recording_path,
..Default::default()
})
}
Expand Down
91 changes: 60 additions & 31 deletions devolutions-gateway/src/http/controllers/jet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,31 +64,39 @@ impl JetController {
};

// check the session token is signed by our provider if unrestricted mode is not set
if !self.config.unrestricted {
match validate_session_token(self.config.as_ref(), &req) {
Err(e) => {
slog_scope::error!("Couldn't validate session token: {}", e);

return (StatusCode::UNAUTHORIZED, ());
}
Ok(expected_id) if expected_id != association_id => {
let jet_tp_claim = match validate_session_token(self.config.as_ref(), &req) {
Err(e) => {
slog_scope::error!("Couldn't validate session token: {}", e);
return (StatusCode::UNAUTHORIZED, ());
}
Ok(expected_token) => {
if !self.config.unrestricted
&& (expected_token.den_session_id.is_none()
|| expected_token.den_session_id.unwrap() != association_id)
{
slog_scope::error!(
"Invalid session token: expected {}, got {}",
expected_id,
"Invalid session token: expected {:?}, got {}",
expected_token.den_session_id,
association_id
);
return (StatusCode::FORBIDDEN, ());
} else {
expected_token.jet_tp
}
Ok(_) => { /* alright */ }
}
}
};

slog_scope::debug!("The jet_tp claim is {:?}", jet_tp_claim);

// Controller runs by Saphir via tokio 0.2 runtime, we need to use .compat()
// to run Mutex from tokio 0.3 via Saphir's tokio 0.2 runtime. This code should be upgraded
// when saphir perform transition to tokio 0.3
let mut jet_associations = self.jet_associations.lock().compat().await;

jet_associations.insert(association_id, Association::new(association_id, JET_VERSION_V2));
jet_associations.insert(
association_id,
Association::new(association_id, JET_VERSION_V2, jet_tp_claim),
);
start_remove_association_future(self.jet_associations.clone(), association_id).await;

(StatusCode::OK, ())
Expand All @@ -106,29 +114,38 @@ impl JetController {
};

// check the session token is signed by our provider if unrestricted mode is not set
if !self.config.unrestricted {
match validate_session_token(self.config.as_ref(), &req) {
Err(e) => {
slog_scope::error!("Couldn't validate session token: {}", e);
return (StatusCode::UNAUTHORIZED, None);
}
Ok(expected_id) if expected_id != association_id => {
let jet_tp_claim = match validate_session_token(self.config.as_ref(), &req) {
Err(e) => {
slog_scope::error!("Couldn't validate session token: {}", e);
return (StatusCode::UNAUTHORIZED, None);
}
Ok(expected_token) => {
if !self.config.unrestricted
&& (expected_token.den_session_id.is_none()
|| expected_token.den_session_id.unwrap() != association_id)
{
slog_scope::error!(
"Invalid session token: expected {}, got {}",
expected_id,
"Invalid session token: expected {:?}, got {}",
expected_token.den_session_id,
association_id
);
return (StatusCode::FORBIDDEN, None);
} else {
expected_token.jet_tp
}
Ok(_) => { /* alright */ }
}
}
};

slog_scope::debug!("The jet_tp claim is {:?}", jet_tp_claim);

// create association
let mut jet_associations = self.jet_associations.lock().compat().await;

if !jet_associations.contains_key(&association_id) {
jet_associations.insert(association_id, Association::new(association_id, JET_VERSION_V2));
jet_associations.insert(
association_id,
Association::new(association_id, JET_VERSION_V2, jet_tp_claim),
);
start_remove_association_future(self.jet_associations.clone(), association_id).await;
}
let association = jet_associations
Expand Down Expand Up @@ -170,12 +187,20 @@ pub async fn remove_association(jet_associations: JetAssociationsMap, uuid: Uuid
}
}

fn validate_session_token(config: &Config, req: &Request) -> Result<Uuid, String> {
#[derive(Deserialize)]
struct PartialSessionToken {
den_session_id: Uuid,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "lowercase")]
pub enum JetTpType {
Relay,
Record,
}

#[derive(Deserialize)]
struct PartialSessionToken {
den_session_id: Option<Uuid>,
jet_tp: Option<JetTpType>,
}

fn validate_session_token(config: &Config, req: &Request) -> Result<PartialSessionToken, String> {
let key = config
.provisioner_public_key
.as_ref()
Expand All @@ -193,7 +218,11 @@ fn validate_session_token(config: &Config, req: &Request) -> Result<Uuid, String
use picky::jose::jwt::{JwtSig, JwtValidator};
let jwt = JwtSig::<PartialSessionToken>::decode(&token, key, &JwtValidator::no_check())
.map_err(|e| format!("Invalid session token: {:?}", e))?;
Ok(jwt.claims.den_session_id)

Ok(PartialSessionToken {
den_session_id: jwt.claims.den_session_id,
jet_tp: jwt.claims.jet_tp,
})
}
_ => Err("Invalid authorization type".to_string()),
}
Expand Down
2 changes: 2 additions & 0 deletions devolutions-gateway/src/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ use byteorder::{LittleEndian, ReadBytesExt};
use std::net::SocketAddr;

pub mod pcap;
pub mod pcap_recording;
pub mod rdp;

pub trait PacketInterceptor: Send + Sync {
fn on_new_packet(&mut self, source_addr: Option<SocketAddr>, data: &[u8]);
fn boxed_clone(&self) -> Box<dyn PacketInterceptor>;
}

pub trait MessageReader: Send + Sync {
Expand Down
3 changes: 3 additions & 0 deletions devolutions-gateway/src/interceptor/pcap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,7 @@ impl PacketInterceptor for PcapInterceptor {
}
}
}
fn boxed_clone(&self) -> Box<dyn PacketInterceptor> {
Box::new(self.clone())
}
}
Loading

0 comments on commit 048df20

Please sign in to comment.