Skip to content

Commit

Permalink
feat(dgw): subscriber API
Browse files Browse the repository at this point in the history
  • Loading branch information
CBenoit committed Aug 23, 2022
1 parent 62832c8 commit a80282e
Show file tree
Hide file tree
Showing 26 changed files with 648 additions and 153 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ Currently stable options are:

Host segment may be abridged with `*`.

- `Subscriber`: subscriber configuration:

* `Url`: HTTP URL where notification messages are to be sent,
* `Token`: bearer token to use when making HTTP requests.

## Sample Usage

### RDP routing
Expand Down
27 changes: 25 additions & 2 deletions devolutions-gateway/doc/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ info:
description: Protocol-aware fine-grained relay server
contact:
name: Devolutions Inc.
email: ''
email: infos@devolutions.net
license:
name: MIT/Apache-2.0
version: 2022.2.2
Expand Down Expand Up @@ -241,6 +241,8 @@ components:
format: uuid
SubProvisionerPublicKey:
$ref: '#/components/schemas/SubProvisionerKey'
Subscriber:
$ref: '#/components/schemas/Subscriber'
ConnectionMode:
type: string
enum:
Expand All @@ -267,7 +269,7 @@ components:
jti:
type: string
format: uuid
description: Unique ID for current  JRL
description: Unique ID for current JRL
ListenerUrls:
type: object
required:
Expand Down Expand Up @@ -323,4 +325,25 @@ components:
type: string
Value:
type: string
Subscriber:
type: object
required:
- Url
- Token
properties:
Token:
type: string
Url:
type: string
securitySchemes:
jrl_token:
type: http
scheme: bearer
bearerFormat: JWT
description: Contains the JRL to apply if newer
scope_token:
type: http
scheme: bearer
bearerFormat: JWT
description: Token allowing a single HTTP request for a specific scope

78 changes: 78 additions & 0 deletions devolutions-gateway/doc/subscriber-api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
openapi: 3.0.3
info:
title: devolutions-gateway-subscriber
description: API a service must implement in order to receive Devolutions Gateway
notifications
contact:
name: Devolutions Inc.
email: infos@devolutions.net
license:
name: MIT/Apache-2.0
version: 2022.2.2
paths:
/:
post:
tags:
- crate::subscriber
summary: Process a message originating from a Devolutions Gateway instance
description: |
Process a message originating from a Devolutions Gateway instance
operationId: PostMessage
requestBody:
description: Message
content:
application/json:
schema:
$ref: '#/components/schemas/SubscriberMessage'
required: true
responses:
'200':
description: Message received and processed successfuly
'400':
description: Bad message
'401':
description: Invalid or missing authorization token
'403':
description: Insufficient permissions
deprecated: false
security:
- subscriber_token: []
components:
schemas:
SubscriberMessage:
type: object
required:
- kind
properties:
kind:
$ref: '#/components/schemas/SubscriberMessageKind'
session:
$ref: '#/components/schemas/SubscriberSessionInfo'
session_list:
type: array
items:
$ref: '#/components/schemas/SubscriberSessionInfo'
SubscriberMessageKind:
type: string
enum:
- session.started
- session.ended
- session.list
SubscriberSessionInfo:
type: object
required:
- association_id
- start_timestamp
properties:
association_id:
type: string
format: uuid
start_timestamp:
type: string
format: date-time
securitySchemes:
subscriber_token:
type: http
scheme: bearer
description: Token allowing to push messages

29 changes: 27 additions & 2 deletions devolutions-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;
use tap::prelude::*;
use tokio::sync::Notify;
use tokio_rustls::rustls;
use url::Url;
use uuid::Uuid;
Expand Down Expand Up @@ -80,8 +81,9 @@ impl Tls {
#[derive(Debug, Clone)]
pub struct Conf {
pub id: Option<Uuid>,
pub listeners: Vec<ListenerUrls>,
pub hostname: String,
pub listeners: Vec<ListenerUrls>,
pub subscriber: Option<dto::Subscriber>,
pub capture_path: Option<Utf8PathBuf>,
pub log_file: Utf8PathBuf,
pub log_directive: Option<String>,
Expand Down Expand Up @@ -176,8 +178,9 @@ impl Conf {

Ok(Conf {
id: conf_file.id,
listeners,
hostname,
listeners,
subscriber: conf_file.subscriber.clone(),
capture_path: conf_file.capture_path.clone(),
log_file,
log_directive: conf_file.log_directive.clone(),
Expand All @@ -203,6 +206,7 @@ pub struct ConfHandle {
struct ConfHandleInner {
conf: parking_lot::RwLock<Arc<Conf>>,
conf_file: parking_lot::RwLock<Arc<dto::ConfFile>>,
changed: Notify,
}

impl ConfHandle {
Expand All @@ -228,6 +232,7 @@ impl ConfHandle {
inner: Arc::new(ConfHandleInner {
conf: parking_lot::RwLock::new(Arc::new(conf)),
conf_file: parking_lot::RwLock::new(Arc::new(conf_file)),
changed: Notify::new(),
}),
})
}
Expand All @@ -242,13 +247,19 @@ impl ConfHandle {
self.inner.conf_file.read().clone()
}

/// Waits for configuration to be changed
pub async fn change_notified(&self) {
self.inner.changed.notified().await;
}

/// Atomically saves and replaces current configuration with a new one
#[instrument(skip(self))]
pub fn save_new_conf_file(&self, conf_file: dto::ConfFile) -> anyhow::Result<()> {
let conf = Conf::from_conf_file(&conf_file).context("Invalid configuration file")?;
save_config(&conf_file).context("Failed to save configuration")?;
*self.inner.conf.write() = Arc::new(conf);
*self.inner.conf_file.write() = Arc::new(conf_file);
self.inner.changed.notify_waiters();
trace!("success");
Ok(())
}
Expand Down Expand Up @@ -503,6 +514,10 @@ pub mod dto {
/// Listeners to launch at startup
pub listeners: Vec<ListenerConf>,

/// Subscriber API
#[serde(skip_serializing_if = "Option::is_none")]
pub subscriber: Option<Subscriber>,

/// (Unstable) Folder and prefix for log files
#[serde(skip_serializing_if = "Option::is_none")]
pub log_file: Option<Utf8PathBuf>,
Expand Down Expand Up @@ -561,6 +576,7 @@ pub mod dto {
external_url: "wss://*:7171".try_into().unwrap(),
},
],
subscriber: None,
log_file: None,
jrl_file: None,
log_directive: None,
Expand Down Expand Up @@ -735,4 +751,13 @@ pub mod dto {
pub internal_url: String,
pub external_url: String,
}

#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct Subscriber {
#[cfg_attr(feature = "openapi", schema(value_type = String))]
pub url: Url,
pub token: String,
}
}
16 changes: 11 additions & 5 deletions devolutions-gateway/src/generic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::config::Conf;
use crate::jet_client::JetAssociationsMap;
use crate::preconnection_pdu::{extract_association_claims, read_preconnection_pdu};
use crate::rdp::RdpClient;
use crate::subscriber::SubscriberSender;
use crate::token::{ApplicationProtocol, ConnectionMode, CurrentJrl, Protocol, TokenCache};
use crate::{utils, ConnectionModeDetails, GatewaySessionInfo, Proxy};
use anyhow::Context;
Expand All @@ -14,37 +15,40 @@ use typed_builder::TypedBuilder;

#[derive(TypedBuilder)]
pub struct GenericClient {
config: Arc<Conf>,
conf: Arc<Conf>,
associations: Arc<JetAssociationsMap>,
token_cache: Arc<TokenCache>,
jrl: Arc<CurrentJrl>,
client_addr: SocketAddr,
client_stream: TcpStream,
subscriber_tx: SubscriberSender,
}

impl GenericClient {
pub async fn serve(self) -> anyhow::Result<()> {
let Self {
config,
conf,
associations,
token_cache,
jrl,
client_addr,
mut client_stream,
subscriber_tx,
} = self;

let (pdu, mut leftover_bytes) = read_preconnection_pdu(&mut client_stream).await?;
let source_ip = client_addr.ip();
let association_claims = extract_association_claims(&pdu, source_ip, &config, &token_cache, &jrl)?;
let association_claims = extract_association_claims(&pdu, source_ip, &conf, &token_cache, &jrl)?;

match association_claims.jet_ap {
// We currently special case this because it may be the "RDP-TLS" protocol
ApplicationProtocol::Known(Protocol::Rdp) => {
RdpClient {
config,
conf,
associations,
token_cache,
jrl,
subscriber_tx,
}
.serve_with_association_claims_and_leftover_bytes(
client_addr,
Expand Down Expand Up @@ -72,6 +76,7 @@ impl GenericClient {
.associations(associations)
.association_id(association_id)
.client_transport(AnyStream::from(client_stream))
.subscriber_tx(subscriber_tx)
.build()
.start(&leftover_bytes)
.await
Expand Down Expand Up @@ -105,10 +110,11 @@ impl GenericClient {
.with_filtering_policy(filtering_policy);

Proxy::init()
.config(config)
.conf(conf)
.session_info(info)
.addrs(client_addr, server_transport.addr)
.transports(client_stream, server_transport)
.subscriber(subscriber_tx)
.select_dissector_and_forward()
.await
.context("Encountered a failure during plain tcp traffic proxying")
Expand Down
8 changes: 5 additions & 3 deletions devolutions-gateway/src/http/controllers/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::config::dto::{DataEncoding, PubKeyFormat};
use crate::config::dto::{DataEncoding, PubKeyFormat, Subscriber};
use crate::config::ConfHandle;
use crate::http::guards::access::{AccessGuard, TokenType};
use crate::http::HttpErrorStatus;
Expand Down Expand Up @@ -29,8 +29,6 @@ impl ConfigController {
}
}

const KEY_ALLOWLIST: &[&str] = &["Id", "SubProvisionerPublicKey"];

#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand All @@ -39,6 +37,8 @@ pub struct ConfigPatch {
pub id: Option<Uuid>,
#[serde(skip_serializing_if = "Option::is_none")]
pub sub_provisioner_public_key: Option<SubProvisionerKey>,
#[serde(skip_serializing_if = "Option::is_none")]
pub subscriber: Option<Subscriber>,
}

#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
Expand All @@ -51,6 +51,8 @@ pub struct SubProvisionerKey {
pub encoding: Option<DataEncoding>,
}

const KEY_ALLOWLIST: &[&str] = &["Id", "SubProvisionerPublicKey", "Subscriber"];

/// Modifies configuration
#[cfg_attr(feature = "openapi", utoipa::path(
patch,
Expand Down
2 changes: 1 addition & 1 deletion devolutions-gateway/src/http/controllers/jrl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async fn get_jrl_info(revocation_list: &CurrentJrl) -> Json<JrlInfo> {
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[derive(Serialize)]
pub struct JrlInfo {
/// Unique ID for current  JRL
/// Unique ID for current JRL
pub jti: Uuid,
/// JWT "Issued At" claim of JRL
pub iat: i64,
Expand Down
3 changes: 1 addition & 2 deletions devolutions-gateway/src/http/controllers/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ impl SessionsController {
security(("scope_token" = ["gateway.sessions.read"])),
))]
pub(crate) async fn get_sessions() -> Json<Vec<GatewaySessionInfo>> {
let sessions = SESSIONS_IN_PROGRESS.read().await;
let sessions_in_progress: Vec<GatewaySessionInfo> = sessions.values().cloned().collect();
let sessions_in_progress: Vec<GatewaySessionInfo> = SESSIONS_IN_PROGRESS.read().values().cloned().collect();
Json(sessions_in_progress)
}

Expand Down
Loading

0 comments on commit a80282e

Please sign in to comment.