Skip to content

Commit

Permalink
Paginate sessions list, added filtering (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eugeny authored Jul 3, 2022
1 parent a3a8a60 commit 6830c0c
Show file tree
Hide file tree
Showing 34 changed files with 592 additions and 130 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ poem-openapi = { version = "^1.3.30", features = [
"static-files",
] }
russh = { version = "0.34.0-beta.5", features = ["openssl"] }
russh-keys = { version = "0.22.0-beta.2", features = ["openssl"] }
russh-keys = { version = "0.22.0-beta.3", features = ["openssl"] }
rust-embed = "6.3"
time = "0.3"
tokio = { version = "1.19", features = ["tracing", "signal", "macros"] }
Expand Down
1 change: 1 addition & 0 deletions warpgate-admin/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use poem_openapi::OpenApi;
pub mod known_hosts_detail;
pub mod known_hosts_list;
pub mod logs;
mod pagination;
pub mod recordings_detail;
pub mod sessions_detail;
pub mod sessions_list;
Expand Down
55 changes: 55 additions & 0 deletions warpgate-admin/src/api/pagination.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use poem_openapi::types::{ParseFromJSON, ToJSON};
use poem_openapi::Object;
use sea_orm::{ConnectionTrait, EntityTrait, FromQueryResult, PaginatorTrait, QuerySelect, Select};

#[derive(Object)]
#[oai(inline)]
pub struct PaginatedResponse<T: ParseFromJSON + ToJSON + Send + Sync> {
items: Vec<T>,
offset: u64,
total: u64,
}

pub struct PaginationParams {
pub offset: Option<u64>,
pub limit: Option<u64>,
}

impl<T: ParseFromJSON + ToJSON + Send + Sync> PaginatedResponse<T> {
pub async fn new<E, M, C, P>(
query: Select<E>,
params: PaginationParams,
db: &'_ C,
postprocess: P,
) -> poem::Result<PaginatedResponse<T>>
where
E: EntityTrait<Model = M>,
C: ConnectionTrait,
M: FromQueryResult + Sized + Send + Sync + 'static,
P: FnMut(E::Model) -> T,
{
let offset = params.offset.unwrap_or(0);
let limit = params.limit.unwrap_or(100);

let paginator = query.clone().paginate(db, limit as usize);

let total = paginator
.num_items()
.await
.map_err(poem::error::InternalServerError)? as u64;

let query = query.offset(offset).limit(limit);

let items = query
.all(db)
.await
.map_err(poem::error::InternalServerError)?;

let items = items.into_iter().map(postprocess).collect::<Vec<_>>();
Ok(PaginatedResponse {
items,
offset,
total,
})
}
}
66 changes: 54 additions & 12 deletions warpgate-admin/src/api/sessions_list.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use super::pagination::{PaginatedResponse, PaginationParams};
use futures::{SinkExt, StreamExt};
use poem::session::Session;
use poem::web::websocket::{Message, WebSocket};
use poem::web::Data;
use poem::{handler, IntoResponse};
use poem_openapi::param::Query;
use poem_openapi::payload::Json;
use poem_openapi::{ApiResponse, OpenApi};
use sea_orm::{DatabaseConnection, EntityTrait, QueryOrder};
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter, QueryOrder};
use std::sync::Arc;
use tokio::sync::Mutex;
use warpgate_common::{SessionSnapshot, State};
Expand All @@ -11,7 +17,7 @@ pub struct Api;
#[derive(ApiResponse)]
enum GetSessionsResponse {
#[oai(status = 200)]
Ok(Json<Vec<SessionSnapshot>>),
Ok(Json<PaginatedResponse<SessionSnapshot>>),
}

#[derive(ApiResponse)]
Expand All @@ -26,20 +32,35 @@ impl Api {
async fn api_get_all_sessions(
&self,
db: Data<&Arc<Mutex<DatabaseConnection>>>,
offset: Query<Option<u64>>,
limit: Query<Option<u64>>,
active_only: Query<Option<bool>>,
logged_in_only: Query<Option<bool>>,
) -> poem::Result<GetSessionsResponse> {
use warpgate_db_entities::Session;

let db = db.lock().await;
let sessions = Session::Entity::find()
.order_by_desc(Session::Column::Started)
.all(&*db)
.await
.map_err(poem::error::InternalServerError)?;
let sessions = sessions
.into_iter()
.map(Into::into)
.collect::<Vec<SessionSnapshot>>();
Ok(GetSessionsResponse::Ok(Json(sessions)))
let mut q = Session::Entity::find().order_by_desc(Session::Column::Started);

if active_only.unwrap_or(false) {
q = q.filter(Session::Column::Ended.is_null());
}
if logged_in_only.unwrap_or(false) {
q = q.filter(Session::Column::Username.is_not_null());
}

Ok(GetSessionsResponse::Ok(Json(
PaginatedResponse::new(
q,
PaginationParams {
limit: *limit,
offset: *offset,
},
&*db,
Into::into,
)
.await?,
)))
}

#[oai(
Expand All @@ -50,6 +71,7 @@ impl Api {
async fn api_close_all_sessions(
&self,
state: Data<&Arc<Mutex<State>>>,
session: &Session,
) -> poem::Result<CloseAllSessionsResponse> {
let state = state.lock().await;

Expand All @@ -58,6 +80,26 @@ impl Api {
session.handle.close();
}

session.purge();

Ok(CloseAllSessionsResponse::Ok)
}
}

#[handler]
pub async fn api_get_sessions_changes_stream(
ws: WebSocket,
state: Data<&Arc<Mutex<State>>>,
) -> impl IntoResponse {
let mut receiver = state.lock().await.subscribe();

ws.on_upgrade(|socket| async move {
let (mut sink, _) = socket.split();

while receiver.recv().await.is_ok() {
sink.send(Message::Text("".to_string())).await?;
}

Ok::<(), anyhow::Error>(())
})
}
2 changes: 1 addition & 1 deletion warpgate-admin/src/api/ssh_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Api {
.into_iter()
.map(|k| SSHKey {
kind: k.name().to_owned(),
public_key_base64: k.public_key_base64().replace('\n', "").replace('\r', ""),
public_key_base64: k.public_key_base64(),
})
.collect();
Ok(GetSSHOwnKeysResponse::Ok(Json(keys)))
Expand Down
4 changes: 4 additions & 0 deletions warpgate-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ pub fn admin_api_app(services: &Services) -> impl IntoEndpoint {
"/recordings/:id/tcpdump",
crate::api::recordings_detail::api_get_recording_tcpdump,
)
.at(
"/sessions/changes",
crate::api::sessions_list::api_get_sessions_changes_stream,
)
.data(db)
.data(config_provider)
.data(state)
Expand Down
1 change: 0 additions & 1 deletion warpgate-common/src/config_providers/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ impl ConfigProvider for FileConfigProvider {
let mut base64_bytes = BASE64_MIME.encode(public_key_bytes);
base64_bytes.pop();
base64_bytes.pop();
let base64_bytes = base64_bytes.replace("\r\n", "");

let client_key = format!("{} {}", kind, base64_bytes);
debug!(username = &user.username[..], "Client key: {}", client_key);
Expand Down
2 changes: 1 addition & 1 deletion warpgate-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ pub use config_providers::*;
pub use data::*;
pub use protocols::*;
pub use services::*;
pub use state::{SessionState, State};
pub use state::{SessionState, SessionStateInit, State};
pub use try_macro::*;
pub use types::*;
8 changes: 6 additions & 2 deletions warpgate-common/src/protocols/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ impl WarpgateServerHandle {
use sea_orm::ActiveValue::Set;

{
self.session_state.lock().await.username = Some(username.clone())
let mut state = self.session_state.lock().await;
state.username = Some(username.clone());
state.emit_change()
}

let db = self.db.lock().await;
Expand All @@ -63,7 +65,9 @@ impl WarpgateServerHandle {
pub async fn set_target(&self, target: &Target) -> Result<()> {
use sea_orm::ActiveValue::Set;
{
self.session_state.lock().await.target = Some(target.clone());
let mut state = self.session_state.lock().await;
state.target = Some(target.clone());
state.emit_change()
}

let db = self.db.lock().await;
Expand Down
44 changes: 36 additions & 8 deletions warpgate-common/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Weak};
use tokio::sync::Mutex;
use tokio::sync::{broadcast, Mutex};
use tracing::*;
use uuid::Uuid;
use warpgate_db_entities::Session;
Expand All @@ -13,34 +13,43 @@ pub struct State {
pub sessions: HashMap<SessionId, Arc<Mutex<SessionState>>>,
db: Arc<Mutex<DatabaseConnection>>,
this: Weak<Mutex<Self>>,
change_sender: broadcast::Sender<()>,
}

impl State {
pub fn new(db: &Arc<Mutex<DatabaseConnection>>) -> Arc<Mutex<Self>> {
let sender = broadcast::channel(2).0;
Arc::<Mutex<Self>>::new_cyclic(|me| {
Mutex::new(Self {
sessions: HashMap::new(),
db: db.clone(),
this: me.clone(),
change_sender: sender,
})
})
}

pub async fn register_session(
&mut self,
protocol: &ProtocolName,
session: &Arc<Mutex<SessionState>>,
state: SessionStateInit,
) -> Result<Arc<Mutex<WarpgateServerHandle>>> {
let id = uuid::Uuid::new_v4();
self.sessions.insert(id, session.clone());

let state = Arc::new(Mutex::new(SessionState::new(
state,
self.change_sender.clone(),
)));

self.sessions.insert(id, state.clone());

{
use sea_orm::ActiveValue::Set;

let values = Session::ActiveModel {
id: Set(id),
started: Set(chrono::Utc::now()),
remote_address: Set(session
remote_address: Set(state
.lock()
.await
.remote_address
Expand All @@ -57,23 +66,31 @@ impl State {
.context("Error inserting session")?;
}

let _ = self.change_sender.send(());

match self.this.upgrade() {
Some(this) => Ok(Arc::new(Mutex::new(WarpgateServerHandle::new(
id,
self.db.clone(),
this,
session.clone(),
state,
)))),
None => anyhow::bail!("State is being detroyed"),
}
}

pub fn subscribe(&mut self) -> broadcast::Receiver<()> {
self.change_sender.subscribe()
}

pub async fn remove_session(&mut self, id: SessionId) {
self.sessions.remove(&id);

if let Err(error) = self.mark_session_complete(id).await {
error!(%error, %id, "Could not update session in the DB");
}

let _ = self.change_sender.send(());
}

async fn mark_session_complete(&mut self, id: Uuid) -> Result<()> {
Expand All @@ -95,15 +112,26 @@ pub struct SessionState {
pub username: Option<String>,
pub target: Option<Target>,
pub handle: Box<dyn SessionHandle + Send>,
change_sender: broadcast::Sender<()>,
}

pub struct SessionStateInit {
pub remote_address: Option<SocketAddr>,
pub handle: Box<dyn SessionHandle + Send>,
}

impl SessionState {
pub fn new(remote_address: Option<SocketAddr>, handle: Box<dyn SessionHandle + Send>) -> Self {
fn new(init: SessionStateInit, change_sender: broadcast::Sender<()>) -> Self {
SessionState {
remote_address,
remote_address: init.remote_address,
username: None,
target: None,
handle,
handle: init.handle,
change_sender,
}
}

pub fn emit_change(&self) {
let _ = self.change_sender.send(());
}
}
7 changes: 4 additions & 3 deletions warpgate-protocol-http/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::time::Duration;

use http::StatusCode;
use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
use poem::session::Session;
use poem::web::{Data, Redirect};
use poem::{Endpoint, EndpointExt, FromRequest, IntoResponse, Request, Response};
use warpgate_common::{Services, TargetOptions};
use std::time::Duration;
use warpgate_common::{ProtocolName, Services, TargetOptions};

pub const PROTOCOL_NAME: ProtocolName = "HTTP";
static USERNAME_SESSION_KEY: &str = "username";
static TARGET_SESSION_KEY: &str = "target_name";
pub static SESSION_MAX_AGE: Duration = Duration::from_secs(60 * 30);
pub static COOKIE_MAX_AGE: Duration = Duration::from_secs(60 * 60 * 24);

pub trait SessionExt {
fn has_selected_target(&self) -> bool;
Expand Down
Loading

0 comments on commit 6830c0c

Please sign in to comment.