Skip to content

Commit

Permalink
feat(dgw): improve logs (#528)
Browse files Browse the repository at this point in the history
- Records additional info on running sessions
- Improves file rotation
  • Loading branch information
CBenoit authored Sep 6, 2023
1 parent b181908 commit 433e253
Show file tree
Hide file tree
Showing 16 changed files with 110 additions and 81 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ inherits = "release"
lto = true

[patch.crates-io]
tracing-appender = { git = "https://github.com/CBenoit/tracing.git", rev = "454313f66da3a662" }
tracing-appender = { git = "https://github.com/CBenoit/tracing.git", rev = "42097daf92e683cf18da7639ddccb056721a796c" }
40 changes: 20 additions & 20 deletions crates/jmux-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,13 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
pending_channels.insert(id, (destination_url.clone(), api_response_tx));
msg_to_send_tx
.send(Message::open(id, MAXIMUM_PACKET_SIZE_IN_BYTES as u16, destination_url))
.context("Couldn’t send CHANNEL OPEN message through mpsc channel")?;
.context("couldn’t send CHANNEL OPEN message through mpsc channel")?;
}
None => warn!("Couldn’t allocate ID for API request: {}", destination_url),
}
}
JmuxApiRequest::Start { id, stream, leftover } => {
let channel = jmux_ctx.get_channel(id).with_context(|| format!("Couldn’t find channel with id {id}"))?;
let channel = jmux_ctx.get_channel(id).with_context(|| format!("couldn’t find channel with id {id}"))?;

let (data_tx, data_rx) = mpsc::unbounded_channel::<Vec<u8>>();

Expand Down Expand Up @@ -361,7 +361,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
Some(internal_msg) = internal_msg_rx.recv() => {
match internal_msg {
InternalMessage::Eof { id } => {
let channel = jmux_ctx.get_channel_mut(id).with_context(|| format!("Couldn’t find channel with id {id}"))?;
let channel = jmux_ctx.get_channel_mut(id).with_context(|| format!("couldn’t find channel with id {id}"))?;
let channel_span = channel.span.clone();
let local_id = channel.local_id;
let distant_id = channel.distant_id;
Expand All @@ -371,19 +371,19 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
channel.local_state = JmuxChannelState::Eof;
msg_to_send_tx
.send(Message::eof(distant_id))
.context("Couldn’t send EOF message")?;
.context("couldn’t send EOF message")?;
},
JmuxChannelState::Eof => {
channel.local_state = JmuxChannelState::Closed;
msg_to_send_tx
.send(Message::close(distant_id))
.context("Couldn’t send CLOSE message")?;
.context("couldn’t send CLOSE message")?;
},
JmuxChannelState::Closed => {
jmux_ctx.unregister(local_id);
msg_to_send_tx
.send(Message::close(distant_id))
.context("Couldn’t send CLOSE message")?;
.context("couldn’t send CLOSE message")?;
channel_span.in_scope(|| {
debug!("Channel closed");
});
Expand Down Expand Up @@ -411,7 +411,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc

msg_to_send_tx
.send(Message::open_success(distant_id, local_id, initial_window_size, maximum_packet_size))
.context("Couldn’t send OPEN SUCCESS message through mpsc channel")?;
.context("couldn’t send OPEN SUCCESS message through mpsc channel")?;

channel_span.in_scope(|| {
debug!("Channel accepted");
Expand Down Expand Up @@ -481,7 +481,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
debug!(%error, %msg.destination_url, %peer_id, "Invalid destination requested");
msg_to_send_tx
.send(Message::open_failure(peer_id, ReasonCode::CONNECTION_NOT_ALLOWED_BY_RULESET, error.to_string()))
.context("Couldn’t send OPEN FAILURE message through mpsc channel")?;
.context("couldn’t send OPEN FAILURE message through mpsc channel")?;
continue;
}

Expand All @@ -491,7 +491,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
warn!("Couldn’t allocate local ID for distant peer {}: no more ID available", peer_id);
msg_to_send_tx
.send(Message::open_failure(peer_id, ReasonCode::GENERAL_FAILURE, "no more ID available"))
.context("Couldn’t send OPEN FAILURE message through mpsc channel")?;
.context("couldn’t send OPEN FAILURE message through mpsc channel")?;
continue;
}
};
Expand Down Expand Up @@ -596,7 +596,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
// Simplest flow control logic for now: just send back a WINDOW ADJUST message to
// increase back peer’s window size.
msg_to_send_tx.send(Message::window_adjust(distant_id, data_length))
.context("Couldn’t send WINDOW ADJUST message")?;
.context("couldn’t send WINDOW ADJUST message")?;
}
Message::Eof(msg) => {
// Per the spec:
Expand Down Expand Up @@ -628,7 +628,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
channel.local_state = JmuxChannelState::Closed;
msg_to_send_tx
.send(Message::close(channel.distant_id))
.context("Couldn’t send CLOSE message")?;
.context("couldn’t send CLOSE message")?;
},
JmuxChannelState::Closed => {},
}
Expand Down Expand Up @@ -671,7 +671,7 @@ async fn scheduler_task_impl<T: AsyncRead + Unpin + Send + 'static>(task: JmuxSc
channel.local_state = JmuxChannelState::Closed;
msg_to_send_tx
.send(Message::close(distant_id))
.context("Couldn’t send CLOSE message")?;
.context("couldn’t send CLOSE message")?;
}

if channel.local_state == JmuxChannelState::Closed {
Expand Down Expand Up @@ -734,7 +734,7 @@ impl DataReaderTask {
trace!("Started forwarding");

while let Some(bytes) = bytes_stream.next().await {
let bytes = bytes.context("Couldn’t read next bytes from stream")?;
let bytes = bytes.context("couldn’t read next bytes from stream")?;

let chunk_size = maximum_packet_size - Header::SIZE - ChannelData::FIXED_PART_SIZE;

Expand All @@ -755,15 +755,15 @@ impl DataReaderTask {
window_size.fetch_sub(bytes_to_send_now.len(), Ordering::SeqCst);
msg_to_send_tx
.send(Message::data(distant_id, bytes_to_send_now))
.context("Couldn’t send DATA message")?;
.context("couldn’t send DATA message")?;
}

window_size_updated.notified().await;
} else {
window_size.fetch_sub(bytes.len(), Ordering::SeqCst);
msg_to_send_tx
.send(Message::data(distant_id, bytes))
.context("Couldn’t send DATA message")?;
.context("couldn’t send DATA message")?;
break;
}
}
Expand All @@ -773,7 +773,7 @@ impl DataReaderTask {
trace!("Finished forwarding (EOF)");
internal_msg_tx
.send(InternalMessage::Eof { id: local_id })
.context("Couldn’t send EOF notification")?;
.context("couldn’t send EOF notification")?;

Ok(())
}
Expand Down Expand Up @@ -856,8 +856,8 @@ impl StreamResolverTask {
ReasonCode::from(error.kind()),
error.to_string(),
))
.context("Couldn’t send OPEN FAILURE message through mpsc channel")?;
anyhow::bail!("Couldn't resolve {}:{}: {}", host, port, error);
.context("couldn’t send OPEN FAILURE message through mpsc channel")?;
anyhow::bail!("couldn't resolve {}:{}: {}", host, port, error);
}
};
let socket_addr = addrs.next().expect("at least one resolved address should be present");
Expand All @@ -867,7 +867,7 @@ impl StreamResolverTask {
Ok(stream) => {
internal_msg_tx
.send(InternalMessage::StreamResolved { channel, stream })
.context("Could't send back resolved stream through internal mpsc channel")?;
.context("could't send back resolved stream through internal mpsc channel")?;
}
Err(error) => {
debug!(?error, "TcpStream::connect failed");
Expand All @@ -877,7 +877,7 @@ impl StreamResolverTask {
ReasonCode::from(error.kind()),
error.to_string(),
))
.context("Couldn’t send OPEN FAILURE message through mpsc channel")?;
.context("couldn’t send OPEN FAILURE message through mpsc channel")?;
anyhow::bail!("Couldn’t connect TCP socket to {}:{}: {}", host, port, error);
}
},
Expand Down
33 changes: 21 additions & 12 deletions devolutions-gateway/src/api/fwd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use axum::response::Response;
use axum::routing::get;
use axum::Router;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::Instrument as _;
use tracing::{field, Instrument as _};
use typed_builder::TypedBuilder;
use uuid::Uuid;

Expand Down Expand Up @@ -71,7 +71,13 @@ async fn handle_fwd_tcp(
.with_tls(false)
.build()
.run()
.instrument(info_span!("tcp", client = %source_addr))
.instrument(info_span!(
"tcp",
client = %source_addr,
session_id = field::Empty,
protocol = field::Empty,
target = field::Empty
))
.await;

if let Err(error) = result {
Expand Down Expand Up @@ -122,7 +128,13 @@ async fn handle_fwd_tls(
.with_tls(true)
.build()
.run()
.instrument(info_span!("tls", client = %source_addr))
.instrument(info_span!(
"tls",
client = %source_addr,
session_id = field::Empty,
protocol = field::Empty,
target = field::Empty
))
.await;

if let Err(error) = result {
Expand All @@ -145,7 +157,6 @@ impl<S> Forward<S>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
#[instrument(skip_all)]
async fn run(self) -> anyhow::Result<()> {
let Self {
conf,
Expand All @@ -165,12 +176,16 @@ where
anyhow::bail!("invalid connection mode")
};

tracing::Span::current().record("session_id", claims.jet_aid.to_string());
tracing::Span::current().record("protocol", claims.jet_ap.to_string());

trace!("Select and connect to target");

let ((server_stream, server_addr), selected_target) =
utils::successive_try(&targets, utils::tcp_connect).await?;

trace!(%selected_target, "Connected");
tracing::Span::current().record("target", selected_target.to_string());

if with_tls {
trace!("Establishing TLS connection with server");
Expand All @@ -181,10 +196,7 @@ where
.await
.context("TLS connect")?;

info!(
"Starting WebSocket-TLS forwarding with application protocol {}",
claims.jet_ap
);
info!(protocol = %claims.jet_ap, target = %server_addr, "WebSocket-TLS forwarding");

let info = SessionInfo::new(
claims.jet_aid,
Expand All @@ -211,10 +223,7 @@ where
.await
.context("Encountered a failure during plain tls traffic proxying")
} else {
info!(
"Starting WebSocket-TCP forwarding with application protocol {}",
claims.jet_ap
);
info!("WebSocket-TCP forwarding");

let info = SessionInfo::new(
claims.jet_aid,
Expand Down
4 changes: 2 additions & 2 deletions devolutions-gateway/src/api/rdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use axum::extract::ws::WebSocket;
use axum::extract::{ConnectInfo, State, WebSocketUpgrade};
use axum::response::Response;
use tracing::Instrument as _;
use tracing::{field, Instrument as _};

use crate::config::Conf;
use crate::http::HttpError;
Expand Down Expand Up @@ -68,7 +68,7 @@ async fn handle_socket(
subscriber_tx,
&active_recordings,
)
.instrument(info_span!("rdp", client = %source_addr))
.instrument(info_span!("rdp", client = %source_addr, target = field::Empty, session_id = field::Empty))
.await;

if let Err(error) = result {
Expand Down
16 changes: 8 additions & 8 deletions devolutions-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl ConfHandle {
fn save_config(conf: &dto::ConfFile) -> anyhow::Result<()> {
let conf_file_path = get_conf_file_path();
let json = serde_json::to_string_pretty(conf).context("failed JSON serialization of configuration")?;
std::fs::write(&conf_file_path, json).with_context(|| format!("Failed to write file at {conf_file_path}"))?;
std::fs::write(&conf_file_path, json).with_context(|| format!("failed to write file at {conf_file_path}"))?;
Ok(())
}

Expand Down Expand Up @@ -333,9 +333,9 @@ fn load_conf_file(conf_path: &Utf8Path) -> anyhow::Result<Option<dto::ConfFile>>
Ok(file) => BufReader::new(file)
.pipe(serde_json::from_reader)
.map(Some)
.with_context(|| format!("Invalid config file at {conf_path}")),
.with_context(|| format!("invalid config file at {conf_path}")),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(anyhow::anyhow!(e).context(format!("Couldn't open config file at {conf_path}"))),
Err(e) => Err(anyhow::anyhow!(e).context(format!("couldn't open config file at {conf_path}"))),
}
}

Expand Down Expand Up @@ -373,7 +373,7 @@ fn read_rustls_certificate(
(Some(path), _) => {
let mut x509_chain_file = normalize_data_path(path, &get_data_dir())
.pipe_ref(File::open)
.with_context(|| format!("Couldn't open file at {path}"))?
.with_context(|| format!("couldn't open file at {path}"))?
.pipe(std::io::BufReader::new);

let mut x509_chain = Vec::new();
Expand Down Expand Up @@ -402,7 +402,7 @@ fn read_rustls_certificate(
}
Err(e) => {
return anyhow::Error::new(e)
.context(format!("Couldn't parse pem document at position {}", x509_chain.len()))
.context(format!("couldn't parse pem document at position {}", x509_chain.len()))
.pipe(Err)
}
}
Expand Down Expand Up @@ -432,7 +432,7 @@ fn read_pub_key(
match (path, data) {
(Some(path), _) => normalize_data_path(path, &get_data_dir())
.pipe_ref(std::fs::read_to_string)
.with_context(|| format!("Couldn't read file at {path}"))?
.with_context(|| format!("couldn't read file at {path}"))?
.pipe_deref(PublicKey::from_pem_str)
.context("couldn't parse pem document")
.map(Some),
Expand Down Expand Up @@ -461,7 +461,7 @@ fn read_rustls_priv_key(
(Some(path), _) => {
let pem: Pem = normalize_data_path(path, &get_data_dir())
.pipe_ref(std::fs::read_to_string)
.with_context(|| format!("Couldn't read file at {path}"))?
.with_context(|| format!("couldn't read file at {path}"))?
.pipe_deref(str::parse)
.context("couldn't parse pem document")?;

Expand All @@ -488,7 +488,7 @@ fn read_priv_key(
match (path, data) {
(Some(path), _) => normalize_data_path(path, &get_data_dir())
.pipe_ref(std::fs::read_to_string)
.with_context(|| format!("Couldn't read file at {path}"))?
.with_context(|| format!("couldn't read file at {path}"))?
.pipe_deref(PrivateKey::from_pem_str)
.context("couldn't parse pem document")
.map(Some),
Expand Down
18 changes: 8 additions & 10 deletions devolutions-gateway/src/generic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,23 @@ where

match connection_mode {
ConnectionMode::Rdv => {
info!(
"Starting TCP rendezvous redirection for application protocol {}",
application_protocol
);
anyhow::bail!("not yet supported");
anyhow::bail!("TCP rendezvous not supported");
}
ConnectionMode::Fwd { targets, creds: None } => {
info!(
"Starting plain TCP forward redirection for application protocol {}",
application_protocol
);

if association_claims.jet_rec {
anyhow::bail!("can't meet recording policy");
}

let ((mut server_stream, server_addr), selected_target) =
utils::successive_try(&targets, utils::tcp_connect).await?;

info!(
session = %association_claims.jet_aid,
protocol = %application_protocol,
target = %server_addr,
"plain TCP forwarding"
);

server_stream
.write_buf(&mut leftover_bytes)
.await
Expand Down
2 changes: 1 addition & 1 deletion devolutions-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ fn main() -> anyhow::Result<()> {
CliAction::ConfigInitOnly => {
let conf_file = devolutions_gateway::config::load_conf_file_or_generate_new()?;
let conf_file_json =
serde_json::to_string_pretty(&conf_file).context("Couldn't represent config file as JSON")?;
serde_json::to_string_pretty(&conf_file).context("couldn't represent config file as JSON")?;
println!("{conf_file_json}");
}
CliAction::Run { service_mode } => {
Expand Down
Loading

0 comments on commit 433e253

Please sign in to comment.