From a80282ebd71992ee7ee32e90e2943e836c9985ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20CORTIER?= Date: Fri, 19 Aug 2022 13:36:22 -0400 Subject: [PATCH] feat(dgw): subscriber API --- README.md | 5 + devolutions-gateway/doc/api.yaml | 27 +++- devolutions-gateway/doc/subscriber-api.yaml | 78 +++++++++++ devolutions-gateway/src/config.rs | 29 ++++- devolutions-gateway/src/generic_client.rs | 16 ++- .../src/http/controllers/config.rs | 8 +- .../src/http/controllers/jrl.rs | 2 +- .../src/http/controllers/sessions.rs | 3 +- devolutions-gateway/src/jet_client.rs | 42 ++++-- .../src/jet_rendezvous_tcp_proxy.rs | 4 + devolutions-gateway/src/lib.rs | 51 ++++++-- devolutions-gateway/src/listener.rs | 42 ++++-- devolutions-gateway/src/main.rs | 10 +- devolutions-gateway/src/openapi.rs | 95 +++++++++++++- devolutions-gateway/src/proxy.rs | 72 +++++++---- devolutions-gateway/src/rdp.rs | 18 ++- devolutions-gateway/src/registry/mod.rs | 1 + devolutions-gateway/src/routing_client.rs | 41 ------ devolutions-gateway/src/service.rs | 24 +++- devolutions-gateway/src/subscriber.rs | 121 ++++++++++++++++++ devolutions-gateway/src/websocket_client.rs | 52 +++++--- devolutions-gateway/tests/config.rs | 2 + tools/generate-openapi/generate.sh | 3 +- tools/generate-openapi/src/main.rs | 18 ++- tools/subscriber-dummy/Cargo.toml | 15 +++ tools/subscriber-dummy/src/main.rs | 22 ++++ 26 files changed, 648 insertions(+), 153 deletions(-) create mode 100644 devolutions-gateway/doc/subscriber-api.yaml delete mode 100644 devolutions-gateway/src/routing_client.rs create mode 100644 devolutions-gateway/src/subscriber.rs create mode 100644 tools/subscriber-dummy/Cargo.toml create mode 100644 tools/subscriber-dummy/src/main.rs diff --git a/README.md b/README.md index 6d121a84f..c9d989d87 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,11 @@ Currently stable options are: Host segment may be abridged with `*`. +- `Subscriber`: subscriber configuration: + + * `Url`: HTTP URL where notification messages are to be sent, + * `Token`: bearer token to use when making HTTP requests. + ## Sample Usage ### RDP routing diff --git a/devolutions-gateway/doc/api.yaml b/devolutions-gateway/doc/api.yaml index e8d590629..cad1a92bb 100644 --- a/devolutions-gateway/doc/api.yaml +++ b/devolutions-gateway/doc/api.yaml @@ -4,7 +4,7 @@ info: description: Protocol-aware fine-grained relay server contact: name: Devolutions Inc. - email: '' + email: infos@devolutions.net license: name: MIT/Apache-2.0 version: 2022.2.2 @@ -241,6 +241,8 @@ components: format: uuid SubProvisionerPublicKey: $ref: '#/components/schemas/SubProvisionerKey' + Subscriber: + $ref: '#/components/schemas/Subscriber' ConnectionMode: type: string enum: @@ -267,7 +269,7 @@ components: jti: type: string format: uuid - description: Unique ID for current  JRL + description: Unique ID for current JRL ListenerUrls: type: object required: @@ -323,4 +325,25 @@ components: type: string Value: type: string + Subscriber: + type: object + required: + - Url + - Token + properties: + Token: + type: string + Url: + type: string + securitySchemes: + jrl_token: + type: http + scheme: bearer + bearerFormat: JWT + description: Contains the JRL to apply if newer + scope_token: + type: http + scheme: bearer + bearerFormat: JWT + description: Token allowing a single HTTP request for a specific scope diff --git a/devolutions-gateway/doc/subscriber-api.yaml b/devolutions-gateway/doc/subscriber-api.yaml new file mode 100644 index 000000000..27433ab52 --- /dev/null +++ b/devolutions-gateway/doc/subscriber-api.yaml @@ -0,0 +1,78 @@ +openapi: 3.0.3 +info: + title: devolutions-gateway-subscriber + description: API a service must implement in order to receive Devolutions Gateway + notifications + contact: + name: Devolutions Inc. + email: infos@devolutions.net + license: + name: MIT/Apache-2.0 + version: 2022.2.2 +paths: + /: + post: + tags: + - crate::subscriber + summary: Process a message originating from a Devolutions Gateway instance + description: | + Process a message originating from a Devolutions Gateway instance + operationId: PostMessage + requestBody: + description: Message + content: + application/json: + schema: + $ref: '#/components/schemas/SubscriberMessage' + required: true + responses: + '200': + description: Message received and processed successfuly + '400': + description: Bad message + '401': + description: Invalid or missing authorization token + '403': + description: Insufficient permissions + deprecated: false + security: + - subscriber_token: [] +components: + schemas: + SubscriberMessage: + type: object + required: + - kind + properties: + kind: + $ref: '#/components/schemas/SubscriberMessageKind' + session: + $ref: '#/components/schemas/SubscriberSessionInfo' + session_list: + type: array + items: + $ref: '#/components/schemas/SubscriberSessionInfo' + SubscriberMessageKind: + type: string + enum: + - session.started + - session.ended + - session.list + SubscriberSessionInfo: + type: object + required: + - association_id + - start_timestamp + properties: + association_id: + type: string + format: uuid + start_timestamp: + type: string + format: date-time + securitySchemes: + subscriber_token: + type: http + scheme: bearer + description: Token allowing to push messages + diff --git a/devolutions-gateway/src/config.rs b/devolutions-gateway/src/config.rs index 31a294494..4313dda98 100644 --- a/devolutions-gateway/src/config.rs +++ b/devolutions-gateway/src/config.rs @@ -13,6 +13,7 @@ use std::fs::File; use std::io::BufReader; use std::sync::Arc; use tap::prelude::*; +use tokio::sync::Notify; use tokio_rustls::rustls; use url::Url; use uuid::Uuid; @@ -80,8 +81,9 @@ impl Tls { #[derive(Debug, Clone)] pub struct Conf { pub id: Option, - pub listeners: Vec, pub hostname: String, + pub listeners: Vec, + pub subscriber: Option, pub capture_path: Option, pub log_file: Utf8PathBuf, pub log_directive: Option, @@ -176,8 +178,9 @@ impl Conf { Ok(Conf { id: conf_file.id, - listeners, hostname, + listeners, + subscriber: conf_file.subscriber.clone(), capture_path: conf_file.capture_path.clone(), log_file, log_directive: conf_file.log_directive.clone(), @@ -203,6 +206,7 @@ pub struct ConfHandle { struct ConfHandleInner { conf: parking_lot::RwLock>, conf_file: parking_lot::RwLock>, + changed: Notify, } impl ConfHandle { @@ -228,6 +232,7 @@ impl ConfHandle { inner: Arc::new(ConfHandleInner { conf: parking_lot::RwLock::new(Arc::new(conf)), conf_file: parking_lot::RwLock::new(Arc::new(conf_file)), + changed: Notify::new(), }), }) } @@ -242,6 +247,11 @@ impl ConfHandle { self.inner.conf_file.read().clone() } + /// Waits for configuration to be changed + pub async fn change_notified(&self) { + self.inner.changed.notified().await; + } + /// Atomically saves and replaces current configuration with a new one #[instrument(skip(self))] pub fn save_new_conf_file(&self, conf_file: dto::ConfFile) -> anyhow::Result<()> { @@ -249,6 +259,7 @@ impl ConfHandle { save_config(&conf_file).context("Failed to save configuration")?; *self.inner.conf.write() = Arc::new(conf); *self.inner.conf_file.write() = Arc::new(conf_file); + self.inner.changed.notify_waiters(); trace!("success"); Ok(()) } @@ -503,6 +514,10 @@ pub mod dto { /// Listeners to launch at startup pub listeners: Vec, + /// Subscriber API + #[serde(skip_serializing_if = "Option::is_none")] + pub subscriber: Option, + /// (Unstable) Folder and prefix for log files #[serde(skip_serializing_if = "Option::is_none")] pub log_file: Option, @@ -561,6 +576,7 @@ pub mod dto { external_url: "wss://*:7171".try_into().unwrap(), }, ], + subscriber: None, log_file: None, jrl_file: None, log_directive: None, @@ -735,4 +751,13 @@ pub mod dto { pub internal_url: String, pub external_url: String, } + + #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] + #[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] + #[serde(rename_all = "PascalCase")] + pub struct Subscriber { + #[cfg_attr(feature = "openapi", schema(value_type = String))] + pub url: Url, + pub token: String, + } } diff --git a/devolutions-gateway/src/generic_client.rs b/devolutions-gateway/src/generic_client.rs index 1802fe99c..ca227fe25 100644 --- a/devolutions-gateway/src/generic_client.rs +++ b/devolutions-gateway/src/generic_client.rs @@ -2,6 +2,7 @@ use crate::config::Conf; use crate::jet_client::JetAssociationsMap; use crate::preconnection_pdu::{extract_association_claims, read_preconnection_pdu}; use crate::rdp::RdpClient; +use crate::subscriber::SubscriberSender; use crate::token::{ApplicationProtocol, ConnectionMode, CurrentJrl, Protocol, TokenCache}; use crate::{utils, ConnectionModeDetails, GatewaySessionInfo, Proxy}; use anyhow::Context; @@ -14,37 +15,40 @@ use typed_builder::TypedBuilder; #[derive(TypedBuilder)] pub struct GenericClient { - config: Arc, + conf: Arc, associations: Arc, token_cache: Arc, jrl: Arc, client_addr: SocketAddr, client_stream: TcpStream, + subscriber_tx: SubscriberSender, } impl GenericClient { pub async fn serve(self) -> anyhow::Result<()> { let Self { - config, + conf, associations, token_cache, jrl, client_addr, mut client_stream, + subscriber_tx, } = self; let (pdu, mut leftover_bytes) = read_preconnection_pdu(&mut client_stream).await?; let source_ip = client_addr.ip(); - let association_claims = extract_association_claims(&pdu, source_ip, &config, &token_cache, &jrl)?; + let association_claims = extract_association_claims(&pdu, source_ip, &conf, &token_cache, &jrl)?; match association_claims.jet_ap { // We currently special case this because it may be the "RDP-TLS" protocol ApplicationProtocol::Known(Protocol::Rdp) => { RdpClient { - config, + conf, associations, token_cache, jrl, + subscriber_tx, } .serve_with_association_claims_and_leftover_bytes( client_addr, @@ -72,6 +76,7 @@ impl GenericClient { .associations(associations) .association_id(association_id) .client_transport(AnyStream::from(client_stream)) + .subscriber_tx(subscriber_tx) .build() .start(&leftover_bytes) .await @@ -105,10 +110,11 @@ impl GenericClient { .with_filtering_policy(filtering_policy); Proxy::init() - .config(config) + .conf(conf) .session_info(info) .addrs(client_addr, server_transport.addr) .transports(client_stream, server_transport) + .subscriber(subscriber_tx) .select_dissector_and_forward() .await .context("Encountered a failure during plain tcp traffic proxying") diff --git a/devolutions-gateway/src/http/controllers/config.rs b/devolutions-gateway/src/http/controllers/config.rs index d40595dfe..5ab182bf1 100644 --- a/devolutions-gateway/src/http/controllers/config.rs +++ b/devolutions-gateway/src/http/controllers/config.rs @@ -1,4 +1,4 @@ -use crate::config::dto::{DataEncoding, PubKeyFormat}; +use crate::config::dto::{DataEncoding, PubKeyFormat, Subscriber}; use crate::config::ConfHandle; use crate::http::guards::access::{AccessGuard, TokenType}; use crate::http::HttpErrorStatus; @@ -29,8 +29,6 @@ impl ConfigController { } } -const KEY_ALLOWLIST: &[&str] = &["Id", "SubProvisionerPublicKey"]; - #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] #[derive(Serialize, Deserialize)] #[serde(rename_all = "PascalCase")] @@ -39,6 +37,8 @@ pub struct ConfigPatch { pub id: Option, #[serde(skip_serializing_if = "Option::is_none")] pub sub_provisioner_public_key: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub subscriber: Option, } #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] @@ -51,6 +51,8 @@ pub struct SubProvisionerKey { pub encoding: Option, } +const KEY_ALLOWLIST: &[&str] = &["Id", "SubProvisionerPublicKey", "Subscriber"]; + /// Modifies configuration #[cfg_attr(feature = "openapi", utoipa::path( patch, diff --git a/devolutions-gateway/src/http/controllers/jrl.rs b/devolutions-gateway/src/http/controllers/jrl.rs index cde7e5665..c726af2e2 100644 --- a/devolutions-gateway/src/http/controllers/jrl.rs +++ b/devolutions-gateway/src/http/controllers/jrl.rs @@ -121,7 +121,7 @@ async fn get_jrl_info(revocation_list: &CurrentJrl) -> Json { #[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] #[derive(Serialize)] pub struct JrlInfo { - /// Unique ID for current  JRL + /// Unique ID for current JRL pub jti: Uuid, /// JWT "Issued At" claim of JRL pub iat: i64, diff --git a/devolutions-gateway/src/http/controllers/sessions.rs b/devolutions-gateway/src/http/controllers/sessions.rs index 106316e1f..dacfefe1c 100644 --- a/devolutions-gateway/src/http/controllers/sessions.rs +++ b/devolutions-gateway/src/http/controllers/sessions.rs @@ -31,8 +31,7 @@ impl SessionsController { security(("scope_token" = ["gateway.sessions.read"])), ))] pub(crate) async fn get_sessions() -> Json> { - let sessions = SESSIONS_IN_PROGRESS.read().await; - let sessions_in_progress: Vec = sessions.values().cloned().collect(); + let sessions_in_progress: Vec = SESSIONS_IN_PROGRESS.read().values().cloned().collect(); Json(sessions_in_progress) } diff --git a/devolutions-gateway/src/jet_client.rs b/devolutions-gateway/src/jet_client.rs index ddecc8537..a6da6f579 100644 --- a/devolutions-gateway/src/jet_client.rs +++ b/devolutions-gateway/src/jet_client.rs @@ -5,6 +5,7 @@ use crate::jet::association::Association; use crate::jet::candidate::CandidateState; use crate::jet::TransportType; use crate::registry::Registry; +use crate::subscriber::SubscriberSender; use crate::token::JetAssociationTokenClaims; use crate::utils::association::{remove_jet_association, ACCEPT_REQUEST_TIMEOUT}; use crate::utils::create_tls_connector; @@ -34,19 +35,21 @@ pub type JetAssociationsMap = Mutex>; #[derive(TypedBuilder)] pub struct JetClient { - config: Arc, + conf: Arc, associations: Arc, addr: SocketAddr, transport: TcpStream, + subscriber_tx: SubscriberSender, } impl JetClient { pub async fn serve(self) -> anyhow::Result<()> { let Self { - config, + conf, associations, addr, mut transport, + subscriber_tx, } = self; let msg = read_jet_message(&mut transport).await?; @@ -54,7 +57,7 @@ impl JetClient { match msg { JetMessage::JetTestReq(jet_test_req) => handle_test_jet_msg(transport, jet_test_req).await, JetMessage::JetAcceptReq(jet_accept_req) => { - HandleAcceptJetMsg::new(config, addr, transport, jet_accept_req, associations.clone()) + HandleAcceptJetMsg::new(conf, addr, transport, jet_accept_req, associations.clone()) .accept() .await } @@ -64,7 +67,7 @@ impl JetClient { let association_id = response.association_id; let candidate_id = response.candidate_id; - let proxy_result = handle_build_proxy(&associations, config, response).await; + let proxy_result = handle_build_proxy(&associations, conf, subscriber_tx, response).await; remove_jet_association(&associations, association_id, Some(candidate_id)); @@ -82,6 +85,7 @@ async fn handle_build_tls_proxy( client_inspector: PluginRecordingInspector, server_inspector: PluginRecordingInspector, tls_acceptor: &TlsAcceptor, + subscriber_tx: SubscriberSender, ) -> anyhow::Result<()> { let client_stream = tls_acceptor.accept(response.client_transport).await?; let mut client_transport = Interceptor::new(client_stream); @@ -102,13 +106,15 @@ async fn handle_build_tls_proxy( Proxy::init() .session_info(info) .transports(client_transport, server_transport) + .subscriber(subscriber_tx) .forward() .await } async fn handle_build_proxy( associations: &JetAssociationsMap, - config: Arc, + conf: Arc, + subscriber_tx: SubscriberSender, response: HandleConnectJetMsgResponse, ) -> anyhow::Result<()> { let mut recording_inspector: Option<(PluginRecordingInspector, PluginRecordingInspector)> = None; @@ -117,12 +123,12 @@ async fn handle_build_proxy( let mut file_pattern = None; if let Some(association) = associations.lock().get(&association_id) { - match (association.record_session(), config.plugins.is_some()) { + match (association.record_session(), conf.plugins.is_some()) { (true, true) => { let init_result = PluginRecordingInspector::init( association_id, response.candidate_id, - config.recording_path.as_ref().map(|path| path.as_str()), + conf.recording_path.as_ref().map(|path| path.as_str()), )?; recording_dir = init_result.recording_dir; file_pattern = Some(init_result.filename_pattern); @@ -134,16 +140,23 @@ async fn handle_build_proxy( } if let Some((client_inspector, server_inspector)) = recording_inspector { - let tls_acceptor = config + let tls_acceptor = conf .tls .as_ref() .map(|conf| &conf.acceptor) .context("TLS configuration is missing")?; - let proxy_result = handle_build_tls_proxy(response, client_inspector, server_inspector, tls_acceptor).await; + let proxy_result = handle_build_tls_proxy( + response, + client_inspector, + server_inspector, + tls_acceptor, + subscriber_tx, + ) + .await; if let (Some(dir), Some(pattern)) = (recording_dir, file_pattern) { - let registry = Registry::new(config); + let registry = Registry::new(conf); registry.manage_files(association_id.to_string(), pattern, &dir).await; }; @@ -160,6 +173,7 @@ async fn handle_build_proxy( Proxy::init() .session_info(info) .transports(response.server_transport, response.client_transport) + .subscriber(subscriber_tx) .forward() .await } @@ -214,7 +228,7 @@ async fn read_jet_message(transport: &mut TcpStream) -> anyhow::Result, + conf: Arc, transport: Option<(SocketAddr, TcpStream)>, request_msg: JetAcceptReq, associations: Arc, @@ -223,14 +237,14 @@ struct HandleAcceptJetMsg { impl HandleAcceptJetMsg { fn new( - config: Arc, + conf: Arc, addr: SocketAddr, transport: TcpStream, msg: JetAcceptReq, associations: Arc, ) -> Self { HandleAcceptJetMsg { - config, + conf, transport: Some((addr, transport)), request_msg: msg, associations, @@ -270,7 +284,7 @@ impl HandleAcceptJetMsg { status_code, version: self.request_msg.version, association: association_id, - instance: self.config.hostname.clone(), + instance: self.conf.hostname.clone(), timeout: ACCEPT_REQUEST_TIMEOUT.as_secs() as u32, }); let mut response_msg_buffer = Vec::with_capacity(512); diff --git a/devolutions-gateway/src/jet_rendezvous_tcp_proxy.rs b/devolutions-gateway/src/jet_rendezvous_tcp_proxy.rs index 79ccfc462..91762aa55 100644 --- a/devolutions-gateway/src/jet_rendezvous_tcp_proxy.rs +++ b/devolutions-gateway/src/jet_rendezvous_tcp_proxy.rs @@ -2,6 +2,7 @@ use crate::http::controllers::association::start_remove_association_future; use crate::jet::candidate::CandidateState; use crate::jet_client::JetAssociationsMap; use crate::proxy::Proxy; +use crate::subscriber::SubscriberSender; use crate::{ConnectionModeDetails, GatewaySessionInfo}; use anyhow::Context; use std::sync::Arc; @@ -15,6 +16,7 @@ pub struct JetRendezvousTcpProxy { associations: Arc, client_transport: AnyStream, association_id: Uuid, + subscriber_tx: SubscriberSender, } impl JetRendezvousTcpProxy { @@ -23,6 +25,7 @@ impl JetRendezvousTcpProxy { associations, mut client_transport, association_id, + subscriber_tx, } = self; let (mut server_transport, server_leftover, info) = { @@ -70,6 +73,7 @@ impl JetRendezvousTcpProxy { let proxy_result = Proxy::init() .session_info(info) .transports(client_transport, server_transport) + .subscriber(subscriber_tx) .forward() .await .context("An error occurred while running JetRendezvousTcpProxy"); diff --git a/devolutions-gateway/src/lib.rs b/devolutions-gateway/src/lib.rs index 9508db858..3d5a89e76 100644 --- a/devolutions-gateway/src/lib.rs +++ b/devolutions-gateway/src/lib.rs @@ -5,6 +5,9 @@ extern crate serde_derive; #[macro_use] extern crate tracing; +#[cfg(feature = "openapi")] +pub mod openapi; + pub mod config; pub mod generic_client; pub mod http; @@ -14,31 +17,27 @@ pub mod jet_client; pub mod jet_rendezvous_tcp_proxy; pub mod listener; pub mod log; -#[cfg(feature = "openapi")] -pub mod openapi; pub mod plugin_manager; pub mod preconnection_pdu; pub mod proxy; pub mod rdp; pub mod registry; -pub mod routing_client; pub mod service; +pub mod subscriber; pub mod token; pub mod transport; pub mod utils; pub mod websocket_client; -pub use proxy::Proxy; - use chrono::{DateTime, Utc}; use lazy_static::lazy_static; +use parking_lot::RwLock; +use proxy::Proxy; use std::collections::HashMap; use token::ApplicationProtocol; -use tokio::sync::RwLock; use utils::TargetAddr; use uuid::Uuid; -// TODO: investigate if parking_lot::RwLock should be used instead lazy_static! { pub static ref SESSIONS_IN_PROGRESS: RwLock> = RwLock::new(HashMap::new()); } @@ -89,13 +88,41 @@ impl GatewaySessionInfo { } } -pub async fn add_session_in_progress(session: GatewaySessionInfo) { - let mut sessions = SESSIONS_IN_PROGRESS.write().await; - sessions.insert(session.association_id, session); +#[instrument] +pub fn add_session_in_progress(tx: &subscriber::SubscriberSender, session: GatewaySessionInfo) { + let association_id = session.association_id; + let start_timestamp = session.start_timestamp; + + SESSIONS_IN_PROGRESS.write().insert(association_id, session); + + let message = subscriber::SubscriberMessage::SessionStarted { + session: subscriber::SubscriberSessionInfo { + association_id, + start_timestamp, + }, + }; + + if let Err(error) = tx.try_send(message) { + warn!(%error, "Failed to send subscriber message"); + } } -pub async fn remove_session_in_progress(id: Uuid) { - SESSIONS_IN_PROGRESS.write().await.remove(&id); +#[instrument] +pub fn remove_session_in_progress(tx: &subscriber::SubscriberSender, id: Uuid) { + let terminated_session = SESSIONS_IN_PROGRESS.write().remove(&id); + + if let Some(session) = terminated_session { + let message = subscriber::SubscriberMessage::SessionEnded { + session: subscriber::SubscriberSessionInfo { + association_id: id, + start_timestamp: session.start_timestamp, + }, + }; + + if let Err(error) = tx.try_send(message) { + warn!(%error, "Failed to send subscriber message"); + } + } } pub mod tls_sanity { diff --git a/devolutions-gateway/src/listener.rs b/devolutions-gateway/src/listener.rs index 9ad1e4e17..047ba10b7 100644 --- a/devolutions-gateway/src/listener.rs +++ b/devolutions-gateway/src/listener.rs @@ -1,6 +1,7 @@ use crate::config::{Conf, ConfHandle}; use crate::generic_client::GenericClient; use crate::jet_client::{JetAssociationsMap, JetClient}; +use crate::subscriber::SubscriberSender; use crate::token::{CurrentJrl, TokenCache}; use crate::utils::url_to_socket_addr; use crate::websocket_client::WebsocketService; @@ -39,6 +40,7 @@ pub struct GatewayListener { token_cache: Arc, jrl: Arc, conf_handle: ConfHandle, + subscriber_tx: SubscriberSender, } impl GatewayListener { @@ -48,6 +50,7 @@ impl GatewayListener { associations: Arc, token_cache: Arc, jrl: Arc, + subscriber_tx: SubscriberSender, ) -> anyhow::Result { let url = url.to_internal_url(); @@ -80,6 +83,7 @@ impl GatewayListener { associations, token_cache, jrl, + subscriber_tx, }) } @@ -106,9 +110,10 @@ impl GatewayListener { let associations = self.associations.clone(); let token_cache = self.token_cache.clone(); let jrl = self.jrl.clone(); + let subscriber_tx = self.subscriber_tx.clone(); let fut = async move { - if let Err(e) = $handler(conf, associations, token_cache, jrl, stream, peer_addr).await { + if let Err(e) = $handler(conf, associations, token_cache, jrl, subscriber_tx, stream, peer_addr).await { error!(concat!(stringify!($handler), " failure: {:#}"), e); } } @@ -142,20 +147,21 @@ impl GatewayListener { let associations = self.associations.clone(); let token_cache = self.token_cache.clone(); let jrl = self.jrl.clone(); + let subscriber_tx = self.subscriber_tx.clone(); match self.kind() { ListenerKind::Tcp => { - handle_tcp_client(conf, associations, token_cache, jrl, conn, peer_addr) + handle_tcp_client(conf, associations, token_cache, jrl, subscriber_tx, conn, peer_addr) .instrument(info_span!("tcp", client = %peer_addr)) .await? } ListenerKind::Ws => { - handle_ws_client(conf, associations, token_cache, jrl, conn, peer_addr) + handle_ws_client(conf, associations, token_cache, jrl, subscriber_tx, conn, peer_addr) .instrument(info_span!("ws", client = %peer_addr)) .await? } ListenerKind::Wss => { - handle_wss_client(conf, associations, token_cache, jrl, conn, peer_addr) + handle_wss_client(conf, associations, token_cache, jrl, subscriber_tx, conn, peer_addr) .instrument(info_span!("wss", client = %peer_addr)) .await? } @@ -170,6 +176,7 @@ async fn handle_tcp_client( associations: Arc, token_cache: Arc, jrl: Arc, + subscriber_tx: SubscriberSender, stream: TcpStream, peer_addr: SocketAddr, ) -> anyhow::Result<()> { @@ -185,10 +192,11 @@ async fn handle_tcp_client( match &peeked[..n_read] { [b'J', b'E', b'T', b'\0'] => { JetClient::builder() - .config(conf) + .conf(conf) .associations(associations) .addr(peer_addr) .transport(stream) + .subscriber_tx(subscriber_tx) .build() .serve() .instrument(info_span!("jet-client")) @@ -197,12 +205,13 @@ async fn handle_tcp_client( [b'J', b'M', b'U', b'X'] => anyhow::bail!("JMUX TCP listener not yet implemented"), _ => { GenericClient::builder() - .config(conf) + .conf(conf) .associations(associations) .client_addr(peer_addr) .client_stream(stream) .token_cache(token_cache) .jrl(jrl) + .subscriber_tx(subscriber_tx) .build() .serve() .instrument(info_span!("generic-client")) @@ -218,6 +227,7 @@ async fn handle_ws_client( associations: Arc, token_cache: Arc, jrl: Arc, + subscriber_tx: SubscriberSender, conn: TcpStream, peer_addr: SocketAddr, ) -> anyhow::Result<()> { @@ -226,7 +236,7 @@ async fn handle_ws_client( // Annonate using the type alias from `transport` just for sanity let conn: transport::TcpStream = conn; - process_ws_stream(conn, peer_addr, conf, associations, token_cache, jrl).await?; + process_ws_stream(conn, peer_addr, conf, associations, token_cache, jrl, subscriber_tx).await?; Ok(()) } @@ -236,6 +246,7 @@ async fn handle_wss_client( associations: Arc, token_cache: Arc, jrl: Arc, + subscriber_tx: SubscriberSender, stream: TcpStream, peer_addr: SocketAddr, ) -> anyhow::Result<()> { @@ -251,7 +262,16 @@ async fn handle_wss_client( .context("TLS handshake failed")? .pipe(tokio_rustls::TlsStream::Server); - process_ws_stream(tls_stream, peer_addr, conf, associations, token_cache, jrl).await?; + process_ws_stream( + tls_stream, + peer_addr, + conf, + associations, + token_cache, + jrl, + subscriber_tx, + ) + .await?; Ok(()) } @@ -259,19 +279,21 @@ async fn handle_wss_client( async fn process_ws_stream( io: I, remote_addr: SocketAddr, - config: Arc, + conf: Arc, associations: Arc, token_cache: Arc, jrl: Arc, + subscriber_tx: SubscriberSender, ) -> anyhow::Result<()> where I: AsyncWrite + AsyncRead + Unpin + Send + Sync + 'static, { let websocket_service = WebsocketService { associations, - config, + conf, token_cache, jrl, + subscriber_tx, }; let service = service_fn(move |req| { diff --git a/devolutions-gateway/src/main.rs b/devolutions-gateway/src/main.rs index ddda9efc1..899889f5b 100644 --- a/devolutions-gateway/src/main.rs +++ b/devolutions-gateway/src/main.rs @@ -5,6 +5,7 @@ use cfg_if::cfg_if; use devolutions_gateway::config::ConfHandle; use devolutions_gateway::service::{GatewayService, DESCRIPTION, DISPLAY_NAME, SERVICE_NAME}; use std::sync::mpsc; +use tap::prelude::*; use tracing::info; enum CliAction { @@ -84,7 +85,9 @@ fn main() -> anyhow::Result<()> { } else { let mut service = GatewayService::load(config).context("Service loading failed")?; - service.start(); + service + .start() + .tap_err(|error| tracing::error!(?error, "Failed to start"))?; // Waiting for some stop signal (CTRL-C…) let rt = tokio::runtime::Builder::new_current_thread() @@ -119,7 +122,10 @@ fn gateway_service_main( info!("{} service started", service.get_service_name()); info!("args: {:?}", args); - service.start(); + service + .start() + .tap_err(|error| tracing::error!(?error, "Failed to start")) + .expect("start service"); loop { if let Ok(control_code) = rx.recv() { diff --git a/devolutions-gateway/src/openapi.rs b/devolutions-gateway/src/openapi.rs index ae17fc9da..8e001df29 100644 --- a/devolutions-gateway/src/openapi.rs +++ b/devolutions-gateway/src/openapi.rs @@ -1,5 +1,6 @@ use chrono::{DateTime, Utc}; -use utoipa::OpenApi; +use utoipa::openapi::security::{HttpAuthScheme, HttpBuilder, SecurityScheme}; +use utoipa::{Modify, OpenApi}; use uuid::Uuid; #[derive(OpenApi)] @@ -20,18 +21,20 @@ use uuid::Uuid; crate::listener::ListenerUrls, crate::config::dto::DataEncoding, crate::config::dto::PubKeyFormat, + crate::config::dto::Subscriber, crate::http::controllers::diagnostics::ConfigDiagnostic, crate::http::controllers::diagnostics::ClockDiagnostic, crate::http::controllers::config::SubProvisionerKey, crate::http::controllers::config::ConfigPatch, crate::http::controllers::jrl::JrlInfo, - )) + )), + modifiers(&SecurityAddon), )] pub struct ApiDoc; #[allow(dead_code)] #[derive(utoipa::ToSchema)] -pub struct SessionInfo { +struct SessionInfo { association_id: Uuid, application_protocol: String, recording_policy: bool, @@ -41,9 +44,93 @@ pub struct SessionInfo { destination_host: Option, } +#[allow(unused)] #[derive(Serialize, utoipa::ToSchema)] #[serde(rename_all = "kebab-case")] -pub enum ConnectionMode { +enum ConnectionMode { Rdv, Fwd, } + +struct SecurityAddon; + +impl Modify for SecurityAddon { + fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) { + // we can unwrap safely since there already is components registered. + let components = openapi.components.as_mut().unwrap(); + + components.add_security_scheme( + "scope_token", + SecurityScheme::Http( + HttpBuilder::new() + .scheme(HttpAuthScheme::Bearer) + .bearer_format("JWT") + .description(Some( + "Token allowing a single HTTP request for a specific scope".to_owned(), + )) + .build(), + ), + ); + + components.add_security_scheme( + "jrl_token", + SecurityScheme::Http( + HttpBuilder::new() + .scheme(HttpAuthScheme::Bearer) + .bearer_format("JWT") + .description(Some("Contains the JRL to apply if newer".to_owned())) + .build(), + ), + ); + } +} + +#[derive(OpenApi)] +#[openapi( + paths(crate::subscriber::post_subscriber_message), + components(schemas(SubscriberMessage, SubscriberSessionInfo, SubscriberMessageKind)), + modifiers(&SubscriberSecurityAddon), +)] +pub struct SubscriberApiDoc; + +#[derive(utoipa::ToSchema, Serialize)] +struct SubscriberSessionInfo { + association_id: Uuid, + start_timestamp: DateTime, +} + +#[allow(unused)] +#[derive(utoipa::ToSchema, Serialize)] +enum SubscriberMessageKind { + #[serde(rename = "session.started")] + SessionStarted, + #[serde(rename = "session.ended")] + SessionEnded, + #[serde(rename = "session.list")] + SessionList, +} + +#[derive(utoipa::ToSchema, Serialize)] +#[serde(tag = "kind")] +struct SubscriberMessage { + kind: SubscriberMessageKind, + session: Option, + session_list: Option>, +} + +struct SubscriberSecurityAddon; + +impl Modify for SubscriberSecurityAddon { + fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) { + // we can unwrap safely since there already is components registered. + openapi.components.as_mut().unwrap().add_security_scheme( + "subscriber_token", + SecurityScheme::Http( + HttpBuilder::new() + .scheme(HttpAuthScheme::Bearer) + .description(Some("Token allowing to push messages".to_owned())) + .build(), + ), + ); + } +} diff --git a/devolutions-gateway/src/proxy.rs b/devolutions-gateway/src/proxy.rs index 7cd790d47..645bb7159 100644 --- a/devolutions-gateway/src/proxy.rs +++ b/devolutions-gateway/src/proxy.rs @@ -1,8 +1,9 @@ use crate::config::Conf; use crate::interceptor::pcap::PcapInspector; use crate::interceptor::{Dissector, DummyDissector, Interceptor, WaykDissector}; +use crate::subscriber::SubscriberSender; use crate::token::{ApplicationProtocol, Protocol}; -use crate::{add_session_in_progress, remove_session_in_progress, GatewaySessionInfo}; +use crate::GatewaySessionInfo; use camino::Utf8PathBuf; use std::net::SocketAddr; use std::sync::Arc; @@ -10,7 +11,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; pub struct IsMissing; -pub struct HasConfig(Arc); +pub struct HasConf(Arc); pub struct HasSessionInfo(GatewaySessionInfo); @@ -24,69 +25,91 @@ pub struct HasAddresses { b: SocketAddr, } -pub struct Proxy { - pub config: CONF, +pub struct HasSubscriber { + tx: SubscriberSender, +} + +pub struct Proxy { + pub conf: CONF, pub session_info: INFO, pub transports: TRANSPORT, pub addrs: ADDR, + pub subscriber: SUBSCRIBER, } -impl Proxy { +impl Proxy { pub fn init() -> Self { Self { - config: IsMissing, + conf: IsMissing, session_info: IsMissing, transports: IsMissing, addrs: IsMissing, + subscriber: IsMissing, } } } -impl Proxy { - pub fn config(self, config: Arc) -> Proxy { +impl Proxy { + pub fn conf(self, conf: Arc) -> Proxy { Proxy { - config: HasConfig(config), + conf: HasConf(conf), session_info: self.session_info, transports: self.transports, addrs: self.addrs, + subscriber: self.subscriber, } } } -impl Proxy { - pub fn session_info(self, info: GatewaySessionInfo) -> Proxy { +impl Proxy { + pub fn session_info(self, info: GatewaySessionInfo) -> Proxy { Proxy { - config: self.config, + conf: self.conf, session_info: HasSessionInfo(info), transports: self.transports, addrs: self.addrs, + subscriber: self.subscriber, } } } -impl Proxy { - pub fn transports(self, a: A, b: B) -> Proxy, ADDR> { +impl Proxy { + pub fn transports(self, a: A, b: B) -> Proxy, ADDR, SUBSCRIBER> { Proxy { - config: self.config, + conf: self.conf, session_info: self.session_info, transports: HasTransports { a, b }, addrs: self.addrs, + subscriber: self.subscriber, } } } -impl Proxy { - pub fn addrs(self, a: SocketAddr, b: SocketAddr) -> Proxy { +impl Proxy { + pub fn addrs(self, a: SocketAddr, b: SocketAddr) -> Proxy { Proxy { - config: self.config, + conf: self.conf, session_info: self.session_info, transports: self.transports, addrs: HasAddresses { a, b }, + subscriber: self.subscriber, + } + } +} + +impl Proxy { + pub fn subscriber(self, tx: SubscriberSender) -> Proxy { + Proxy { + conf: self.conf, + session_info: self.session_info, + transports: self.transports, + addrs: self.addrs, + subscriber: HasSubscriber { tx }, } } } -impl Proxy, HasAddresses> +impl Proxy, HasAddresses, HasSubscriber> where A: AsyncWrite + AsyncRead + Unpin, B: AsyncWrite + AsyncRead + Unpin, @@ -122,7 +145,7 @@ where where D: Dissector + Send + 'static, { - if let Some(capture_path) = self.config.0.capture_path.as_ref() { + if let Some(capture_path) = self.conf.0.capture_path.as_ref() { let filename = format!( "{}({})-to-{}({})-at-{}.pcap", self.addrs.a.ip(), @@ -144,10 +167,11 @@ where b.inspectors.push(Box::new(server_inspector)); Proxy { - config: self.config, + conf: self.conf, session_info: self.session_info, transports: HasTransports { a, b }, addrs: self.addrs, + subscriber: self.subscriber, } .forward() .await @@ -157,7 +181,7 @@ where } } -impl Proxy, ADDR> +impl Proxy, ADDR, HasSubscriber> where A: AsyncWrite + AsyncRead + Unpin, B: AsyncWrite + AsyncRead + Unpin, @@ -165,9 +189,9 @@ where pub async fn forward(self) -> anyhow::Result<()> { let session_id = self.session_info.0.id(); - add_session_in_progress(self.session_info.0).await; + crate::add_session_in_progress(&self.subscriber.tx, self.session_info.0); let res = transport::forward_bidirectional(self.transports.a, self.transports.b).await; - remove_session_in_progress(session_id).await; + crate::remove_session_in_progress(&self.subscriber.tx, session_id); res.map(|_| ()) } diff --git a/devolutions-gateway/src/rdp.rs b/devolutions-gateway/src/rdp.rs index 1ff70166d..bae8462dd 100644 --- a/devolutions-gateway/src/rdp.rs +++ b/devolutions-gateway/src/rdp.rs @@ -10,6 +10,7 @@ use self::sequence_future::create_downgrade_dvc_capabilities_future; use crate::config::Conf; use crate::jet_client::JetAssociationsMap; use crate::preconnection_pdu::{extract_association_claims, read_preconnection_pdu}; +use crate::subscriber::SubscriberSender; use crate::token::{ApplicationProtocol, ConnectionMode, CurrentJrl, JetAssociationTokenClaims, Protocol, TokenCache}; use crate::transport::x224::NegotiationWithClientTransport; use crate::utils::{self, TargetAddr}; @@ -59,10 +60,11 @@ impl credssp::CredentialsProxy for RdpIdentity { } pub struct RdpClient { - pub config: Arc, + pub conf: Arc, pub associations: Arc, pub token_cache: Arc, pub jrl: Arc, + pub subscriber_tx: SubscriberSender, } impl RdpClient { @@ -70,8 +72,7 @@ impl RdpClient { let (pdu, leftover_bytes) = read_preconnection_pdu(&mut client_stream).await?; let client_addr = client_stream.peer_addr()?; let source_ip = client_addr.ip(); - let association_claims = - extract_association_claims(&pdu, source_ip, &self.config, &self.token_cache, &self.jrl)?; + let association_claims = extract_association_claims(&pdu, source_ip, &self.conf, &self.token_cache, &self.jrl)?; self.serve_with_association_claims_and_leftover_bytes( client_addr, client_stream, @@ -89,7 +90,10 @@ impl RdpClient { mut leftover_bytes: BytesMut, ) -> anyhow::Result<()> { let Self { - config, associations, .. + conf, + associations, + subscriber_tx, + .. } = self; if association_claims.jet_rec { @@ -121,10 +125,11 @@ impl RdpClient { .with_filtering_policy(association_claims.jet_flt); Proxy::init() - .config(config) + .conf(conf) .session_info(info) .addrs(client_addr, server_transport.addr) .transports(client_stream, server_transport) + .subscriber(subscriber_tx) .select_dissector_and_forward() .await .context("plain tcp traffic proxying failed") @@ -134,7 +139,7 @@ impl RdpClient { info!("Starting RDP-TLS redirection"); anyhow::bail!("RDP-TLS is temporary disabled"); - let tls_conf = config.tls.clone().context("TLS configuration is missing")?; + let tls_conf = conf.tls.clone().context("TLS configuration is missing")?; // We can't use FramedRead directly here, because we still have to use // the leftover bytes. As an alternative, the decoder could be modified to use the @@ -234,6 +239,7 @@ impl RdpClient { .associations(associations) .client_transport(AnyStream::from(client_stream)) .association_id(association_id) + .subscriber_tx(subscriber_tx) .build() .start(&leftover_bytes) .await diff --git a/devolutions-gateway/src/registry/mod.rs b/devolutions-gateway/src/registry/mod.rs index f67e5ad15..6c66323e7 100644 --- a/devolutions-gateway/src/registry/mod.rs +++ b/devolutions-gateway/src/registry/mod.rs @@ -203,6 +203,7 @@ IBaZdgBhPfHxF8KfTHvSzcUzWZojuR+ynaFL9AJK+8RiXnB4CJwIDAQAB id: None, listeners: Vec::new(), hostname: "hostname".to_owned(), + subscriber: None, capture_path: None, log_file: String::new().into(), log_directive: None, diff --git a/devolutions-gateway/src/routing_client.rs b/devolutions-gateway/src/routing_client.rs deleted file mode 100644 index 9ccd6e35e..000000000 --- a/devolutions-gateway/src/routing_client.rs +++ /dev/null @@ -1,41 +0,0 @@ -use crate::config::Conf; -use crate::proxy::Proxy; -use crate::token::ApplicationProtocol; -use crate::utils::TargetAddr; -use crate::{ConnectionModeDetails, GatewaySessionInfo}; -use std::net::SocketAddr; -use std::sync::Arc; -use tokio::io::{AsyncRead, AsyncWrite}; -use url::Url; - -pub struct Client { - routing_url: Url, - config: Arc, -} - -impl Client { - pub fn new(routing_url: Url, config: Arc) -> Self { - Client { routing_url, config } - } - - pub async fn serve(self, client_addr: SocketAddr, client_transport: T) -> anyhow::Result<()> - where - T: AsyncRead + AsyncWrite + Unpin, - { - let server_transport = crate::utils::tcp_transport_connect_with_url(&self.routing_url).await?; - - let destination_host = TargetAddr::try_from(&self.routing_url)?; - - Proxy::init() - .config(self.config) - .session_info(GatewaySessionInfo::new( - uuid::Uuid::new_v4(), - ApplicationProtocol::unknown(), - ConnectionModeDetails::Fwd { destination_host }, - )) - .addrs(client_addr, server_transport.addr) - .transports(client_transport, server_transport) - .select_dissector_and_forward() - .await - } -} diff --git a/devolutions-gateway/src/service.rs b/devolutions-gateway/src/service.rs index 716bea658..35e7fa6b0 100644 --- a/devolutions-gateway/src/service.rs +++ b/devolutions-gateway/src/service.rs @@ -2,7 +2,8 @@ use crate::config::{Conf, ConfHandle}; use crate::http::http_server::configure_http_server; use crate::jet_client::JetAssociationsMap; use crate::listener::GatewayListener; -use crate::log::{self, log_deleter_task, LoggerGuard}; +use crate::log::{self, LoggerGuard}; +use crate::subscriber::subscriber_channel; use crate::token::{CurrentJrl, JrlTokenClaims}; use anyhow::Context; use parking_lot::Mutex; @@ -10,7 +11,7 @@ use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; -use tap::Pipe as _; +use tap::prelude::*; use tokio::runtime::{self, Runtime}; pub const SERVICE_NAME: &str = "devolutions-gateway"; @@ -71,7 +72,7 @@ impl GatewayService { DESCRIPTION } - pub fn start(&mut self) { + pub fn start(&mut self) -> anyhow::Result<()> { let runtime = runtime::Builder::new_multi_thread() .enable_all() .build() @@ -80,7 +81,7 @@ impl GatewayService { let config = self.conf_handle.clone(); // create_futures needs to be run in the runtime in order to bind the sockets. - let futures = runtime.block_on(async { create_futures(config).expect("failed to initiate gateway") }); + let futures = runtime.block_on(async { create_futures(config) })?; let join_all = futures::future::join_all(futures); @@ -93,6 +94,8 @@ impl GatewayService { }); self.state = GatewayState::Running { runtime }; + + Ok(()) } pub fn stop(&mut self) { @@ -133,6 +136,8 @@ fn create_futures(conf_handle: ConfHandle) -> anyhow::Result { let mut futures: VecOfFuturesType = Vec::with_capacity(conf.listeners.len()); + let (subscriber_tx, subscriber_rx) = subscriber_channel(); + let listeners = conf .listeners .iter() @@ -143,6 +148,7 @@ fn create_futures(conf_handle: ConfHandle) -> anyhow::Result { associations.clone(), token_cache.clone(), jrl.clone(), + subscriber_tx.clone(), ) .with_context(|| format!("Failed to initialize {}", listener.internal_url)) }) @@ -160,9 +166,17 @@ fn create_futures(conf_handle: ConfHandle) -> anyhow::Result { { let log_path = conf.log_file.clone(); - futures.push(Box::pin(async move { log_deleter_task(&log_path).await })); + futures.push(Box::pin(async move { crate::log::log_deleter_task(&log_path).await })); } + futures.push(Box::pin(async move { + crate::subscriber::subscriber_polling_task(subscriber_tx).await + })); + + futures.push(Box::pin(async move { + crate::subscriber::subscriber_task(conf_handle.clone(), subscriber_rx).await + })); + Ok(futures) } diff --git a/devolutions-gateway/src/subscriber.rs b/devolutions-gateway/src/subscriber.rs new file mode 100644 index 000000000..3538c1fb2 --- /dev/null +++ b/devolutions-gateway/src/subscriber.rs @@ -0,0 +1,121 @@ +use crate::config::dto::Subscriber; +use crate::config::ConfHandle; +use crate::SESSIONS_IN_PROGRESS; +use anyhow::Context as _; +use chrono::{DateTime, Utc}; +use std::time::Duration; +use tokio::sync::mpsc; +use tokio::time::sleep; +use uuid::Uuid; + +pub type SubscriberSender = mpsc::Sender; +pub type SubscriberReceiver = mpsc::Receiver; + +pub fn subscriber_channel() -> (SubscriberSender, SubscriberReceiver) { + mpsc::channel(64) +} + +#[derive(Debug, Serialize)] +pub struct SubscriberSessionInfo { + pub association_id: Uuid, + pub start_timestamp: DateTime, +} + +#[derive(Debug, Serialize)] +#[serde(tag = "kind")] +pub enum SubscriberMessage { + #[serde(rename = "session.started")] + SessionStarted { session: SubscriberSessionInfo }, + #[serde(rename = "session.ended")] + SessionEnded { session: SubscriberSessionInfo }, + #[serde(rename = "session.list")] + SessionList { session_list: Vec }, +} + +pub async fn send_message(subscriber: &Subscriber, message: &SubscriberMessage) -> anyhow::Result<()> { + let client = reqwest::Client::new(); + + client + .post(subscriber.url.clone()) + .header("Authorization", format!("Bearer {}", subscriber.token)) + .json(message) + .send() + .await + .context("Failed to post message at the subscriber URL")? + .error_for_status() + .context("Subscriber responded with an error code")?; + + Ok(()) +} + +#[instrument(skip(tx))] +pub async fn subscriber_polling_task(tx: SubscriberSender) -> anyhow::Result<()> { + const TASK_INTERVAL: Duration = Duration::from_secs(60 * 20); // once per 20 minutes + + debug!("Task started"); + + loop { + trace!("Send session list message"); + + let session_list: Vec<_> = SESSIONS_IN_PROGRESS + .read() + .values() + .map(|session| SubscriberSessionInfo { + association_id: session.association_id, + start_timestamp: session.start_timestamp, + }) + .collect(); + + let message = SubscriberMessage::SessionList { session_list }; + + tx.send(message) + .await + .map_err(|e| anyhow::anyhow!("Subscriber Task ended: {e}"))?; + + sleep(TASK_INTERVAL).await; + } +} + +#[instrument(skip(conf_handle, rx))] +pub async fn subscriber_task(conf_handle: ConfHandle, mut rx: SubscriberReceiver) -> anyhow::Result<()> { + debug!("Task started"); + + let mut conf = conf_handle.get_conf(); + + loop { + tokio::select! { + _ = conf_handle.change_notified() => { + conf = conf_handle.get_conf(); + } + msg = rx.recv() => { + let msg = msg.context("All senders are dead")?; + if let Some(subscriber) = conf.subscriber.as_ref() { + debug!(?msg, %subscriber.url, "Send message"); + if let Err(error) = send_message(subscriber, &msg).await { + warn!(error = format!("{error:#}"), "Couldn't send message to the subscriber"); + } + } else { + trace!(?msg, "Subscriber is not configured, ignore message"); + } + } + } + } +} + +/// Process a message originating from a Devolutions Gateway instance +#[cfg(feature = "openapi")] +#[allow(unused)] +#[utoipa::path( + post, + operation_id = "PostMessage", + path = "/", + request_body(content = SubscriberMessage, description = "Message", content_type = "application/json"), + responses( + (status = 200, description = "Message received and processed successfuly"), + (status = 400, description = "Bad message"), + (status = 401, description = "Invalid or missing authorization token"), + (status = 403, description = "Insufficient permissions"), + ), + security(("subscriber_token" = [])), +)] +fn post_subscriber_message() {} diff --git a/devolutions-gateway/src/websocket_client.rs b/devolutions-gateway/src/websocket_client.rs index e662f3b5a..93ecabc60 100644 --- a/devolutions-gateway/src/websocket_client.rs +++ b/devolutions-gateway/src/websocket_client.rs @@ -2,6 +2,7 @@ use crate::config::Conf; use crate::jet::candidate::CandidateState; use crate::jet::TransportType; use crate::jet_client::JetAssociationsMap; +use crate::subscriber::SubscriberSender; use crate::token::{CurrentJrl, TokenCache}; use crate::utils::association::remove_jet_association; use crate::{ConnectionModeDetails, GatewaySessionInfo, Proxy}; @@ -22,7 +23,8 @@ pub struct WebsocketService { pub associations: Arc, pub token_cache: Arc, pub jrl: Arc, - pub config: Arc, + pub conf: Arc, + pub subscriber_tx: SubscriberSender, } impl WebsocketService { @@ -36,18 +38,31 @@ impl WebsocketService { .map_err(|err| io::Error::new(ErrorKind::Other, format!("Handle JET accept error - {:?}", err))) } else if req.method() == Method::GET && req_uri.starts_with("/jet/connect") { info!("{} {}", req.method(), req_uri); - handle_jet_connect(req, client_addr, self.associations.clone(), self.config.clone()) - .await - .map_err(|err| io::Error::new(ErrorKind::Other, format!("Handle JET connect error - {:?}", err))) + handle_jet_connect( + req, + client_addr, + self.associations.clone(), + self.conf.clone(), + self.subscriber_tx.clone(), + ) + .await + .map_err(|err| io::Error::new(ErrorKind::Other, format!("Handle JET connect error - {:?}", err))) } else if req.method() == Method::GET && req_uri.starts_with("/jet/test") { info!("{} {}", req.method(), req_uri); handle_jet_test(req, &self.associations) .map_err(|err| io::Error::new(ErrorKind::Other, format!("Handle JET test error - {:?}", err))) } else if req.method() == Method::GET && (req_uri.starts_with("/jmux") || req_uri.starts_with("/jet/jmux")) { info!("{} {}", req.method(), req_uri); - handle_jmux(req, client_addr, &self.config, &self.token_cache, &self.jrl) - .await - .map_err(|err| io::Error::new(ErrorKind::Other, format!("Handle JMUX error - {:#}", err))) + handle_jmux( + req, + client_addr, + self.conf.clone(), + &self.token_cache, + &self.jrl, + self.subscriber_tx.clone(), + ) + .await + .map_err(|err| io::Error::new(ErrorKind::Other, format!("Handle JMUX error - {:#}", err))) } else { saphir::server::inject_raw_with_peer_addr(req, Some(client_addr)) .await @@ -179,8 +194,9 @@ async fn handle_jet_connect( client_addr: SocketAddr, associations: Arc, config: Arc, + subscriber_tx: SubscriberSender, ) -> Result, saphir::error::InternalError> { - match handle_jet_connect_impl(req, client_addr, associations, config).await { + match handle_jet_connect_impl(req, client_addr, associations, config, subscriber_tx).await { Ok(res) => Ok(res), Err(()) => { let mut res = Response::new(Body::empty()); @@ -194,7 +210,8 @@ async fn handle_jet_connect_impl( mut req: Request, client_addr: SocketAddr, associations: Arc, - config: Arc, + conf: Arc, + subscriber_tx: SubscriberSender, ) -> Result, ()> { use crate::interceptor::plugin_recording::PluginRecordingInspector; use crate::interceptor::Interceptor; @@ -291,12 +308,12 @@ async fn handle_jet_connect_impl( let association_id = candidate.association_id(); let candidate_id = candidate.id(); - match (association.record_session(), config.plugins.is_some()) { + match (association.record_session(), conf.plugins.is_some()) { (true, true) => { let init_result = PluginRecordingInspector::init( association_id, candidate_id, - config.recording_path.as_ref().map(|path| path.as_str()), + conf.recording_path.as_ref().map(|path| path.as_str()), ) .map_err(|e| error!("Couldn't initialize PluginRecordingInspector: {}", e))?; @@ -339,11 +356,12 @@ async fn handle_jet_connect_impl( let proxy_result = Proxy::init() .session_info(info) .transports(client_transport, server_transport) + .subscriber(subscriber_tx) .forward() .await; if let (Some(dir), Some(pattern)) = (recording_dir, file_pattern) { - let registry = crate::registry::Registry::new(config); + let registry = crate::registry::Registry::new(conf); registry .manage_files(association_id.to_string(), pattern, dir.as_path()) .await; @@ -366,6 +384,7 @@ async fn handle_jet_connect_impl( Proxy::init() .session_info(info) .transports(client_transport, server_transport) + .subscriber(subscriber_tx) .forward() .await }; @@ -465,9 +484,10 @@ fn process_req(req: &Request) -> Response { async fn handle_jmux( mut req: Request, client_addr: SocketAddr, - config: &Conf, + conf: Arc, token_cache: &TokenCache, jrl: &CurrentJrl, + subscriber_tx: SubscriberSender, ) -> io::Result> { use crate::http::middlewares::auth::{parse_auth_header, AuthHeaderType}; use crate::token::AccessTokenClaims; @@ -490,7 +510,7 @@ async fn handle_jmux( return Err(io::Error::new(io::ErrorKind::Other, "missing authorization")); }; - let claims = match crate::http::middlewares::auth::authenticate(client_addr, token, config, token_cache, jrl) { + let claims = match crate::http::middlewares::auth::authenticate(client_addr, token, &conf, token_cache, jrl) { Ok(AccessTokenClaims::Jmux(claims)) => claims, Ok(_) => { return Err(io::Error::new(io::ErrorKind::Other, "token not allowed")); @@ -557,7 +577,7 @@ async fn handle_jmux( }, ); - crate::add_session_in_progress(info).await; + crate::add_session_in_progress(&subscriber_tx, info); JmuxProxy::new(reader, writer) .with_config(config) @@ -566,7 +586,7 @@ async fn handle_jmux( .await .map_err(|e| error!("JMUX proxy error: {}", e))?; - crate::remove_session_in_progress(session_id).await; + crate::remove_session_in_progress(&subscriber_tx, session_id); Ok::<(), ()>(()) }); diff --git a/devolutions-gateway/tests/config.rs b/devolutions-gateway/tests/config.rs index 949244883..f16972e9a 100644 --- a/devolutions-gateway/tests/config.rs +++ b/devolutions-gateway/tests/config.rs @@ -69,6 +69,7 @@ fn sample_1() -> Sample { external_url: "wss://*:443".try_into().unwrap(), }, ], + subscriber: None, log_file: None, jrl_file: None, log_directive: Some("info,devolutions_gateway=trace,devolutions_gateway::log=debug".to_owned()), @@ -102,6 +103,7 @@ fn sample_2() -> Sample { tls_certificate_file: Some("/path/to/tls-certificate.pem".into()), tls_private_key_file: Some("/path/to/tls-private.key".into()), listeners: vec![], + subscriber: None, log_file: Some("/path/to/log/file.log".into()), jrl_file: None, log_directive: None, diff --git a/tools/generate-openapi/generate.sh b/tools/generate-openapi/generate.sh index 887d072ec..6ced9c4c5 100755 --- a/tools/generate-openapi/generate.sh +++ b/tools/generate-openapi/generate.sh @@ -1,2 +1,3 @@ #!/bin/bash -cargo run > ../../devolutions-gateway/doc/api.yaml \ No newline at end of file +cargo run > ../../devolutions-gateway/doc/api.yaml +cargo run -- subscriber > ../../devolutions-gateway/doc/subscriber-api.yaml diff --git a/tools/generate-openapi/src/main.rs b/tools/generate-openapi/src/main.rs index ef0083733..a574fca04 100644 --- a/tools/generate-openapi/src/main.rs +++ b/tools/generate-openapi/src/main.rs @@ -1,8 +1,20 @@ -use devolutions_gateway::openapi::ApiDoc; +use devolutions_gateway::openapi::{ApiDoc, SubscriberApiDoc}; use utoipa::OpenApi; fn main() { - let api = ApiDoc::openapi(); - let yaml = serde_yaml::to_string(&api).unwrap(); + let yaml = match std::env::args().nth(1).as_deref() { + Some("subscriber") => { + let mut api = SubscriberApiDoc::openapi(); + api.info.title = "devolutions-gateway-subscriber".to_owned(); + api.info.description = + Some("API a service must implement in order to receive Devolutions Gateway notifications".to_owned()); + serde_yaml::to_string(&api).unwrap() + } + Some("gateway") | None => { + let api = ApiDoc::openapi(); + serde_yaml::to_string(&api).unwrap() + } + _ => panic!("Unknown API doc"), + }; println!("{yaml}"); } diff --git a/tools/subscriber-dummy/Cargo.toml b/tools/subscriber-dummy/Cargo.toml new file mode 100644 index 000000000..cdc38aa73 --- /dev/null +++ b/tools/subscriber-dummy/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "subscriber-dummy" +version = "0.1.0" +authors = ["Devolutions Inc. "] +edition = "2021" +publish = false + +[workspace] +resolver = "2" +members = ["."] + +[dependencies] +axum = "0.5.15" +hyper = "0.14.20" +tokio = { version = "1.20.1", features = ["full"] } diff --git a/tools/subscriber-dummy/src/main.rs b/tools/subscriber-dummy/src/main.rs new file mode 100644 index 000000000..1081caadb --- /dev/null +++ b/tools/subscriber-dummy/src/main.rs @@ -0,0 +1,22 @@ +use axum::body::Body; +use axum::http::Request; +use axum::routing::post; +use axum::Router; + +#[tokio::main] +async fn main() { + let app = Router::new().route("/subscriber", post(post_message)); + let socket_addr = "0.0.0.0:9999".parse().unwrap(); + + axum::Server::bind(&socket_addr) + .serve(app.into_make_service()) + .await + .unwrap(); +} + +async fn post_message(req: Request) { + println!("Request: {req:?}"); + let body = hyper::body::to_bytes(req.into_body()).await.unwrap(); + let body = String::from_utf8_lossy(&body); + println!("Body: {body}"); +}