Skip to content

Commit

Permalink
feat(console): add support for Unix domain sockets
Browse files Browse the repository at this point in the history
Add support for console connections that use Unix domain sockets rather
than TCP.

Fix #296.
  • Loading branch information
benesch committed Dec 11, 2022
1 parent d98f159 commit bac69ec
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ env-filter = ["tracing-subscriber/env-filter"]

crossbeam-utils = "0.8.7"
tokio = { version = "^1.15", features = ["sync", "time", "macros", "tracing"] }
tokio-stream = "0.1"
tokio-stream = { version = "0.1", features = ["net"] }
thread_local = "1.1.3"
console-api = { version = "0.4.0", path = "../console-api", features = ["transport"] }
tonic = { version = "0.8", features = ["transport"] }
Expand Down
23 changes: 23 additions & 0 deletions console-subscriber/examples/uds.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::time::Duration;
use tokio::{fs, task, time};
use tracing::info;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cwd = fs::canonicalize(".").await?;
let addr = cwd.join("console-server");
console_subscriber::ConsoleLayer::builder()
.server_addr(&*addr)
.init();
info!(
"listening for console connections at file://localhost{}",
addr.display()
);
task::Builder::default()
.name("sleepy")
.spawn(async move { time::sleep(Duration::from_secs(90)).await })
.unwrap()
.await?;

Ok(())
}
77 changes: 68 additions & 9 deletions console-subscriber/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use super::{ConsoleLayer, Server};
#[cfg(unix)]
use std::path::Path;
use std::{
net::{SocketAddr, ToSocketAddrs},
net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6, ToSocketAddrs},
path::PathBuf,
thread,
time::Duration,
Expand Down Expand Up @@ -32,7 +34,7 @@ pub struct Builder {
pub(crate) retention: Duration,

/// The address on which to serve the RPC server.
pub(super) server_addr: SocketAddr,
pub(super) server_addr: ServerAddr,

/// If and where to save a recording of the events.
pub(super) recording_path: Option<PathBuf>,
Expand All @@ -58,7 +60,7 @@ impl Default for Builder {
publish_interval: ConsoleLayer::DEFAULT_PUBLISH_INTERVAL,
retention: ConsoleLayer::DEFAULT_RETENTION,
poll_duration_max: ConsoleLayer::DEFAULT_POLL_DURATION_MAX,
server_addr: SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT),
server_addr: ServerAddr::Tcp(SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT)),
recording_path: None,
filter_env_var: "RUST_LOG".to_string(),
self_trace: false,
Expand Down Expand Up @@ -138,7 +140,7 @@ impl Builder {
/// defaults.
///
/// [environment variable]: `Builder::with_default_env`
pub fn server_addr(self, server_addr: impl Into<SocketAddr>) -> Self {
pub fn server_addr(self, server_addr: impl Into<ServerAddr>) -> Self {
Self {
server_addr: server_addr.into(),
..self
Expand Down Expand Up @@ -231,11 +233,14 @@ impl Builder {
}

if let Ok(bind) = std::env::var("TOKIO_CONSOLE_BIND") {
self.server_addr = bind
.to_socket_addrs()
.expect("TOKIO_CONSOLE_BIND must be formatted as HOST:PORT, such as localhost:4321")
.next()
.expect("tokio console could not resolve TOKIO_CONSOLE_BIND");
self.server_addr = ServerAddr::Tcp(
bind.to_socket_addrs()
.expect(
"TOKIO_CONSOLE_BIND must be formatted as HOST:PORT, such as localhost:4321",
)
.next()
.expect("tokio console could not resolve TOKIO_CONSOLE_BIND"),
);
}

if let Some(interval) = duration_from_env("TOKIO_CONSOLE_PUBLISH_INTERVAL") {
Expand Down Expand Up @@ -456,6 +461,60 @@ impl Builder {
}
}

/// Specifies the address on which a [`Server`] should listen.
///
/// [`Server`]: crate::Server
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ServerAddr {
/// A TCP address.
Tcp(SocketAddr),
/// A Unix socket address.
#[cfg(unix)]
Unix(PathBuf),
}

impl From<SocketAddr> for ServerAddr {
fn from(addr: SocketAddr) -> ServerAddr {
ServerAddr::Tcp(addr)
}
}

impl From<SocketAddrV4> for ServerAddr {
fn from(addr: SocketAddrV4) -> ServerAddr {
ServerAddr::Tcp(addr.into())
}
}

impl From<SocketAddrV6> for ServerAddr {
fn from(addr: SocketAddrV6) -> ServerAddr {
ServerAddr::Tcp(addr.into())
}
}

impl<I> From<(I, u16)> for ServerAddr
where
I: Into<IpAddr>,
{
fn from(pieces: (I, u16)) -> ServerAddr {
ServerAddr::Tcp(pieces.into())
}
}

#[cfg(unix)]
impl From<PathBuf> for ServerAddr {
fn from(path: PathBuf) -> ServerAddr {
ServerAddr::Unix(path)
}
}

#[cfg(unix)]
impl<'a> From<&'a Path> for ServerAddr {
fn from(path: &'a Path) -> ServerAddr {
ServerAddr::Unix(path.to_path_buf())
}
}

/// Initializes the console [tracing `Subscriber`][sub] and starts the console
/// subscriber [`Server`] on its own background thread.
///
Expand Down
32 changes: 23 additions & 9 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ use serde::Serialize;
use std::{
cell::RefCell,
fmt,
net::{IpAddr, Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
};
use thread_local::ThreadLocal;
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{mpsc, oneshot};
#[cfg(unix)]
use tokio_stream::wrappers::UnixListenerStream;
use tracing_core::{
span::{self, Id},
subscriber::{self, Subscriber},
Expand All @@ -37,6 +41,7 @@ mod visitors;

use aggregator::Aggregator;
pub use builder::Builder;
use builder::ServerAddr;
use callsites::Callsites;
use record::Recorder;
use stack::SpanStack;
Expand Down Expand Up @@ -134,7 +139,7 @@ pub struct ConsoleLayer {
/// [cli]: https://crates.io/crates/tokio-console
pub struct Server {
subscribe: mpsc::Sender<Command>,
addr: SocketAddr,
addr: ServerAddr,
aggregator: Option<Aggregator>,
client_buffer: usize,
}
Expand Down Expand Up @@ -945,13 +950,22 @@ impl Server {
.take()
.expect("cannot start server multiple times");
let aggregate = spawn_named(aggregate.run(), "console::aggregate");
let addr = self.addr;
let serve = builder
.add_service(proto::instrument::instrument_server::InstrumentServer::new(
self,
))
.serve(addr);
let res = spawn_named(serve, "console::serve").await;
let addr = self.addr.clone();
let router = builder.add_service(
proto::instrument::instrument_server::InstrumentServer::new(self),
);
let res = match addr {
ServerAddr::Tcp(addr) => {
let serve = router.serve(addr);
spawn_named(serve, "console::serve").await
}
#[cfg(unix)]
ServerAddr::Unix(path) => {
let incoming = UnixListener::bind(path)?;
let serve = router.serve_with_incoming(UnixListenerStream::new(incoming));
spawn_named(serve, "console::serve").await
}
};
aggregate.abort();
res?.map_err(Into::into)
}
Expand Down
1 change: 1 addition & 0 deletions tokio-console/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tokio = { version = "1", features = ["full", "rt-multi-thread"] }
tonic = { version = "0.8", features = ["transport"] }
futures = "0.3"
tui = { version = "0.16.0", default-features = false, features = ["crossterm"] }
tower = "0.4.12"
tracing = "0.1"
tracing-subscriber = { version = "0.3.0", features = ["env-filter"] }
tracing-journald = { version = "0.2", optional = true }
Expand Down
4 changes: 4 additions & 0 deletions tokio-console/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ pub struct Config {
///
/// This may be an IP address and port, or a DNS name.
///
/// On Unix platforms, this may also be a URI with the `file` scheme that
/// specifies the path to a Unix domain socket, as in
/// `file://localhost/path/to/socket`.
///
/// [default: http://127.0.0.1:6669]
#[clap(value_hint = ValueHint::Url)]
pub(crate) target_addr: Option<Uri>,
Expand Down
33 changes: 31 additions & 2 deletions tokio-console/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ use console_api::instrument::{
use console_api::tasks::TaskDetails;
use futures::stream::StreamExt;
use std::{error::Error, pin::Pin, time::Duration};
use tonic::{transport::Channel, transport::Uri, Streaming};
#[cfg(unix)]
use tokio::net::UnixStream;
use tonic::{
transport::{Channel, Endpoint, Uri},
Streaming,
};

#[derive(Debug)]
pub struct Connection {
Expand Down Expand Up @@ -78,7 +83,31 @@ impl Connection {
tokio::time::sleep(backoff).await;
}
let try_connect = async {
let mut client = InstrumentClient::connect(self.target.clone()).await?;
let channel = match self.target.scheme_str() {
#[cfg(unix)]
Some("file") => {
// Dummy endpoint is ignored by the connector.
let endpoint = Endpoint::from_static("http://localhost");
if !matches!(self.target.host(), None | Some("localhost")) {
return Err("cannot connect to non-localhost unix domain socket".into());
}
let path = self.target.path().to_owned();
endpoint
.connect_with_connector(tower::service_fn(move |_| {
UnixStream::connect(path.clone())
}))
.await?
}
#[cfg(not(unix))]
Some("file") => {
return Err("unix domain sockets are not supported on this platform".into());
}
_ => {
let endpoint = Endpoint::try_from(self.target.clone())?;
endpoint.connect().await?
}
};
let mut client = InstrumentClient::new(channel);
let request = tonic::Request::new(InstrumentRequest {});
let stream = Box::new(client.watch_updates(request).await?.into_inner());
Ok::<State, Box<dyn Error + Send + Sync>>(State::Connected { client, stream })
Expand Down

0 comments on commit bac69ec

Please sign in to comment.