Skip to content

Commit

Permalink
Merge pull request #172 from Devolutions/WAYK-2564
Browse files Browse the repository at this point in the history
Wayk 2564 - Add endpoint /sessions to list sessions + Fix tests
  • Loading branch information
fdubois1 authored Aug 2, 2021
2 parents 79d6090 + d15b4b1 commit d87bbf2
Show file tree
Hide file tree
Showing 22 changed files with 361 additions and 234 deletions.
55 changes: 50 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion benchmark/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ edition = "2018"

[dependencies]
structopt = "0.3"
uuid = "0.7.1"
uuid = "0.8"
http = "0.1"
jet-proto = { path = "../jet-proto" }
1 change: 1 addition & 0 deletions devolutions-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,5 @@ embed-resource = "1.3"

[dev-dependencies]
exitcode = "1.1"
tokio-test = "0.4"

73 changes: 23 additions & 50 deletions devolutions-gateway/src/http/controllers/jet.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
use crate::config::Config;
use crate::http::controllers::health::build_health_response;
use crate::http::middlewares::auth::{parse_auth_header, AuthHeaderType};
use crate::jet::association::{Association, AssociationResponse};
use crate::jet::candidate::Candidate;
use crate::jet_client::JetAssociationsMap;
use crate::utils::association::{remove_jet_association, ACCEPT_REQUEST_TIMEOUT};
use jet_proto::token::JetSessionTokenClaims;
use jet_proto::JET_VERSION_V2;

use saphir::controller::Controller;
use saphir::http::{header, Method, StatusCode};
use saphir::macros::controller;
Expand All @@ -10,14 +17,6 @@ use tokio_02::runtime::Handle;
use tokio_compat_02::FutureExt;
use uuid::Uuid;

use crate::config::Config;
use crate::http::controllers::health::build_health_response;
use crate::http::middlewares::auth::{parse_auth_header, AuthHeaderType};
use crate::jet::association::{Association, AssociationResponse};
use crate::jet::candidate::Candidate;
use crate::jet_client::JetAssociationsMap;
use crate::utils::association::{remove_jet_association, ACCEPT_REQUEST_TIMEOUT};

pub struct JetController {
config: Arc<Config>,
jet_associations: JetAssociationsMap,
Expand Down Expand Up @@ -64,38 +63,33 @@ impl JetController {
};

// check the session token is signed by our provider if unrestricted mode is not set
let jet_tp_claim = match validate_session_token(self.config.as_ref(), &req) {
let session_token = match validate_session_token(self.config.as_ref(), &req) {
Err(e) => {
slog_scope::error!("Couldn't validate session token: {}", e);
return (StatusCode::UNAUTHORIZED, ());
}
Ok(expected_token) => {
if !self.config.unrestricted
&& (expected_token.den_session_id.is_none()
|| expected_token.den_session_id.unwrap() != association_id)
{
if !self.config.unrestricted && (expected_token.jet_aid != association_id) {
slog_scope::error!(
"Invalid session token: expected {:?}, got {}",
expected_token.den_session_id,
"Invalid session token: expected {}, got {}",
expected_token.jet_aid.to_string(),
association_id
);
return (StatusCode::FORBIDDEN, ());
} else {
expected_token.jet_tp
expected_token
}
}
};

slog_scope::debug!("The jet_tp claim is {:?}", jet_tp_claim);

// Controller runs by Saphir via tokio 0.2 runtime, we need to use .compat()
// to run Mutex from tokio 0.3 via Saphir's tokio 0.2 runtime. This code should be upgraded
// when saphir perform transition to tokio 0.3
let mut jet_associations = self.jet_associations.lock().compat().await;

jet_associations.insert(
association_id,
Association::new(association_id, JET_VERSION_V2, jet_tp_claim),
Association::new(association_id, JET_VERSION_V2, session_token),
);
start_remove_association_future(self.jet_associations.clone(), association_id).await;

Expand All @@ -114,35 +108,30 @@ impl JetController {
};

// check the session token is signed by our provider if unrestricted mode is not set
let jet_tp_claim = match validate_session_token(self.config.as_ref(), &req) {
let session_token = match validate_session_token(self.config.as_ref(), &req) {
Err(e) => {
slog_scope::error!("Couldn't validate session token: {}", e);
return (StatusCode::UNAUTHORIZED, None);
}
Ok(expected_token) => {
if !self.config.unrestricted
&& (expected_token.den_session_id.is_none()
|| expected_token.den_session_id.unwrap() != association_id)
{
if !self.config.unrestricted && (expected_token.jet_aid != association_id) {
slog_scope::error!(
"Invalid session token: expected {:?}, got {}",
expected_token.den_session_id,
"Invalid session token: expected {}, got {}",
expected_token.jet_aid.to_string(),
association_id
);
return (StatusCode::FORBIDDEN, None);
} else {
expected_token.jet_tp
expected_token
}
}
};

slog_scope::debug!("The jet_tp claim is {:?}", jet_tp_claim);

// create association
let mut jet_associations = self.jet_associations.lock().compat().await;

if let std::collections::hash_map::Entry::Vacant(e) = jet_associations.entry(association_id) {
e.insert(Association::new(association_id, JET_VERSION_V2, jet_tp_claim));
e.insert(Association::new(association_id, JET_VERSION_V2, session_token));
start_remove_association_future(self.jet_associations.clone(), association_id).await;
}

Expand Down Expand Up @@ -185,20 +174,7 @@ pub async fn remove_association(jet_associations: JetAssociationsMap, uuid: Uuid
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "lowercase")]
pub enum JetTpType {
Relay,
Record,
}

#[derive(Deserialize)]
struct PartialSessionToken {
den_session_id: Option<Uuid>,
jet_tp: Option<JetTpType>,
}

fn validate_session_token(config: &Config, req: &Request) -> Result<PartialSessionToken, String> {
fn validate_session_token(config: &Config, req: &Request) -> Result<JetSessionTokenClaims, String> {
let key = config
.provisioner_public_key
.as_ref()
Expand All @@ -214,13 +190,10 @@ fn validate_session_token(config: &Config, req: &Request) -> Result<PartialSessi
match parse_auth_header(auth_str) {
Some((AuthHeaderType::Bearer, token)) => {
use picky::jose::jwt::{JwtSig, JwtValidator};
let jwt = JwtSig::<PartialSessionToken>::decode(&token, key, &JwtValidator::no_check())
let jwt = JwtSig::<JetSessionTokenClaims>::decode(&token, key, &JwtValidator::no_check())
.map_err(|e| format!("Invalid session token: {:?}", e))?;

Ok(PartialSessionToken {
den_session_id: jwt.claims.den_session_id,
jet_tp: jwt.claims.jet_tp,
})
Ok(jwt.claims)
}
_ => Err("Invalid authorization type".to_string()),
}
Expand Down
24 changes: 14 additions & 10 deletions devolutions-gateway/src/http/controllers/sessions.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
use crate::SESSION_IN_PROGRESS_COUNT;
use crate::http::HttpErrorStatus;
use crate::{GatewaySessionInfo, SESSIONS_IN_PROGRESS};
use saphir::controller::Controller;
use saphir::http::{Method, StatusCode};
use saphir::macros::controller;
use std::sync::atomic::Ordering;
use saphir::prelude::Json;

pub struct SessionsController;

impl Default for SessionsController {
fn default() -> Self {
Self
}
}

#[controller(name = "sessions")]
impl SessionsController {
#[get("/count")]
async fn get_count(&self) -> (StatusCode, String) {
let sessions_count = SESSION_IN_PROGRESS_COUNT.load(Ordering::Relaxed).to_string();
(StatusCode::OK, sessions_count)
let sessions = SESSIONS_IN_PROGRESS.read().await;
(StatusCode::OK, sessions.len().to_string())
}

#[get("/")]
async fn get_sessions(&self) -> Result<Json<Vec<GatewaySessionInfo>>, HttpErrorStatus> {
let sessions = SESSIONS_IN_PROGRESS.read().await;

let sessions_in_progress: Vec<GatewaySessionInfo> = sessions.values().map(|x| x.clone()).collect();

Ok(Json(sessions_in_progress))
}
}
2 changes: 1 addition & 1 deletion devolutions-gateway/src/http/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn configure_http_server(config: Arc<Config>, jet_associations: JetAssociati
info!("Loading HTTP controllers");
let health = HealthController::new(config.clone());
let jet = JetController::new(config.clone(), jet_associations.clone());
let session = SessionsController::default();
let session = SessionsController;

let registry_name = config
.sogar_registry_config
Expand Down
16 changes: 10 additions & 6 deletions devolutions-gateway/src/jet/association.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::http::controllers::jet::JetTpType;
use crate::jet::candidate::{Candidate, CandidateResponse, CandidateState};
use crate::jet::TransportType;
use chrono::serde::ts_seconds;
use chrono::{DateTime, Utc};
use indexmap::IndexMap;
use jet_proto::token::JetSessionTokenClaims;
use serde_json::Value;
use uuid::Uuid;

Expand All @@ -12,17 +12,17 @@ pub struct Association {
version: u8,
creation_timestamp: DateTime<Utc>,
candidates: IndexMap<Uuid, Candidate>,
jet_tp: Option<JetTpType>,
session_token: JetSessionTokenClaims,
}

impl Association {
pub fn new(id: Uuid, version: u8, jet_tp: Option<JetTpType>) -> Self {
pub fn new(id: Uuid, version: u8, session_token: JetSessionTokenClaims) -> Self {
Association {
id,
version,
creation_timestamp: Utc::now(),
candidates: IndexMap::new(),
jet_tp,
session_token,
}
}

Expand Down Expand Up @@ -92,8 +92,12 @@ impl Association {
.any(|(_, candidate)| candidate.state() == CandidateState::Connected)
}

pub fn get_jet_tp_claim(&self) -> Option<JetTpType> {
self.jet_tp.clone()
pub fn jet_session_token_claims(&self) -> &JetSessionTokenClaims {
&self.session_token
}

pub fn record_session(&self) -> bool {
self.session_token.jet_rec
}
}

Expand Down
Loading

0 comments on commit d87bbf2

Please sign in to comment.