From 758ba9c603e0a78c050c83b317efa1aca0d262d4 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Tue, 18 Jul 2023 16:18:23 +0200 Subject: [PATCH 1/5] feat(subscriber) expose server parts The `ConsoleLayer` builder provides the user with a console layer and a server, which is used to start the gRPC server. However, it may be desireable to expose the instrumentation server together with other services on the same Tonic router. This was requested explicitly in #428. Additionally, to add tests which make use of the instrumentation server (as part of improving test coverage for #450), more flexibility is needed than what is provided by the current API. Specifically we would like to connect a client and server via an in memory channel, rather than a TCP connection. This change adds an additional method to `console_subscriber::Server` called `into_parts` which allows the user to access the `InstrumentServer` directly. A handle which controls the lifetime of the `Aggregator` is also provided, as the user must ensure that the aggregator lives at least as long as the instrument server. To facilitate the addition of functionality which would result in more "parts" in the future, `into_parts` returns a non-exhaustive struct, rather than a tuple of parts. Closes: #428 --- console-subscriber/src/lib.rs | 125 ++++++++++++++++++++++++++++++---- 1 file changed, 113 insertions(+), 12 deletions(-) diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index 6b0c1a75e..7b6daecf2 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -1,6 +1,6 @@ #![doc = include_str!("../README.md")] use console_api as proto; -use proto::resources::resource; +use proto::{instrument::instrument_server::InstrumentServer, resources::resource}; use serde::Serialize; use std::{ cell::RefCell, @@ -15,7 +15,10 @@ use std::{ use thread_local::ThreadLocal; #[cfg(unix)] use tokio::net::UnixListener; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, +}; #[cfg(unix)] use tokio_stream::wrappers::UnixListenerStream; use tracing_core::{ @@ -933,18 +936,15 @@ impl Server { /// /// [`tonic`]: https://docs.rs/tonic/ pub async fn serve_with( - mut self, + self, mut builder: tonic::transport::Server, ) -> Result<(), Box> { - let aggregate = self - .aggregator - .take() - .expect("cannot start server multiple times"); - let aggregate = spawn_named(aggregate.run(), "console::aggregate"); let addr = self.addr.clone(); - let router = builder.add_service( - proto::instrument::instrument_server::InstrumentServer::new(self), - ); + let ServerParts { + instrument_server: service, + aggregator_handle: aggregate, + } = self.into_parts(); + let router = builder.add_service(service); let res = match addr { ServerAddr::Tcp(addr) => { let serve = router.serve(addr); @@ -957,9 +957,110 @@ impl Server { spawn_named(serve, "console::serve").await } }; - aggregate.abort(); + drop(aggregate); res?.map_err(Into::into) } + + /// Returns the parts needed to spawn a gRPC server and keep the aggregation + /// worker running. + /// + /// Note that a server spawned in this way will overwrite any value set by + /// [`Builder::server_addr`] as the user becomes responsible for defining + /// the address when calling [`Router::serve`]. + /// + /// # Examples + /// + /// The parts can be used to serve the instrument server together with + /// other endpoints from the same gRPC server. + /// + /// ``` + /// use console_subscriber::{ConsoleLayer, ServerParts}; + /// + /// # let runtime = tokio::runtime::Builder::new_current_thread() + /// # .enable_all() + /// # .build() + /// # .unwrap(); + /// # runtime.block_on(async { + /// let (console_layer, server) = ConsoleLayer::builder().build(); + /// let ServerParts { + /// instrument_server, + /// aggregator_handle, + /// .. + /// } = server.into_parts(); + /// + /// let router = tonic::transport::Server::builder() + /// //.add_service(some_other_service) + /// .add_service(instrument_server); + /// let serve = router.serve(std::net::SocketAddr::new( + /// std::net::IpAddr::V4(std::net::Ipv4Addr::new(127, 0, 0, 1)), + /// 6669, + /// )); + /// + /// // Finally, spawn the server. + /// tokio::spawn(serve); + /// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused. + /// # drop(console_layer); + /// # drop(aggregator_handle); + /// # }); + /// ``` + /// + /// [`Router::serve`]: fn@tonic::transport::server::Router::serve + pub fn into_parts(mut self) -> ServerParts { + let aggregate = self + .aggregator + .take() + .expect("cannot start server multiple times"); + let aggregate = spawn_named(aggregate.run(), "console::aggregate"); + + let service = proto::instrument::instrument_server::InstrumentServer::new(self); + + ServerParts { + instrument_server: service, + aggregator_handle: AggregatorHandle { + join_handle: aggregate, + }, + } + } +} + +/// Server Parts +/// +/// This struct contains the parts returned by [`Server::into_parts`]. It may contain +/// further parts in the future, an as such is marked as `non_exhaustive`. +/// +/// The `InstrumentServer` can be used to construct a router which +/// can be added to a [`tonic`] gRPC server. +/// +/// The [`AggregatorHandle`] must be kept until after the server has been +/// shut down. +/// +/// See the [`Server::into_parts`] documentation for usage. +#[non_exhaustive] +pub struct ServerParts { + /// The instrument server. + /// + /// See the documentation for [`InstrumentServer`] for details. + pub instrument_server: InstrumentServer, + + /// The aggregate handle. + /// + /// See the documentation for [`AggregatorHandle`] for details. + pub aggregator_handle: AggregatorHandle, +} + +/// Aggregator handle. +/// +/// This object is returned from [`Server::into_parts`] and must be +/// kept as long as the `InstrumentServer` - which is also +/// returned - is in use. +pub struct AggregatorHandle { + join_handle: JoinHandle<()>, +} + +impl Drop for AggregatorHandle { + fn drop(&mut self) { + self.join_handle.abort(); + } } #[tonic::async_trait] From 1d0213de2034810a6003c0bafffbaedecd1c3f76 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 19 Jul 2023 12:34:59 +0200 Subject: [PATCH 2/5] Apply suggestions from code review Co-authored-by: Eliza Weisman --- console-subscriber/src/lib.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index 7b6daecf2..0edfb7176 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -964,8 +964,8 @@ impl Server { /// Returns the parts needed to spawn a gRPC server and keep the aggregation /// worker running. /// - /// Note that a server spawned in this way will overwrite any value set by - /// [`Builder::server_addr`] as the user becomes responsible for defining + /// Note that a server spawned in this way will disregard any value set by + /// [`Builder::server_addr`], as the user becomes responsible for defining /// the address when calling [`Router::serve`]. /// /// # Examples @@ -1042,7 +1042,7 @@ pub struct ServerParts { /// See the documentation for [`InstrumentServer`] for details. pub instrument_server: InstrumentServer, - /// The aggregate handle. + /// A handle to the background worker task responsible for aggregating trace data. /// /// See the documentation for [`AggregatorHandle`] for details. pub aggregator_handle: AggregatorHandle, From 95937c5c447a81300927cc1659867ec3afa0a729 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Mon, 24 Jul 2023 10:36:09 +0200 Subject: [PATCH 3/5] make `AggregatorHandle` abort explcit Instead of aborting the aggregator task upon dropping the `AggregatorHandle`, we now provide an explicit `abort` method. If the user chooses, they can discard the aggregator handle and while they will lose the possibility to clean up the aggregator task, they will be otherwise unaffected. --- console-subscriber/src/lib.rs | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index 0edfb7176..a5bf2d137 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -1000,7 +1000,8 @@ impl Server { /// tokio::spawn(serve); /// # // Avoid a warning that `console_layer` and `aggregator_handle` are unused. /// # drop(console_layer); - /// # drop(aggregator_handle); + /// # let mut aggregator_handle = aggregator_handle; + /// # aggregator_handle.abort(); /// # }); /// ``` /// @@ -1031,8 +1032,8 @@ impl Server { /// The `InstrumentServer` can be used to construct a router which /// can be added to a [`tonic`] gRPC server. /// -/// The [`AggregatorHandle`] must be kept until after the server has been -/// shut down. +/// The [`AggregatorHandle`] can be used to abort the associated aggregator task +/// after the server has been shut down. /// /// See the [`Server::into_parts`] documentation for usage. #[non_exhaustive] @@ -1050,15 +1051,32 @@ pub struct ServerParts { /// Aggregator handle. /// -/// This object is returned from [`Server::into_parts`] and must be -/// kept as long as the `InstrumentServer` - which is also -/// returned - is in use. +/// This object is returned from [`Server::into_parts`]. It can be +/// used to abort the aggregator task. +/// +/// The aggregator collects the traces that implement the async runtime +/// being observed and prepares them to be served by the gRPC server. +/// +/// Normally, if the server, started with [`Server::serve`] or +/// [`Server::serve_with`] stops for any reason, the aggregator is aborted, +/// hoewver, if the server was started with the [`InstrumentServer`] returned +/// from [`Server::into_parts`], then it is the responsibility of the user +/// of the API to stop the aggregator task by calling [`abort`] on this +/// object. +/// +/// [`abort`]: fn@crate::Aggregator::abort pub struct AggregatorHandle { join_handle: JoinHandle<()>, } -impl Drop for AggregatorHandle { - fn drop(&mut self) { +impl AggregatorHandle { + /// Aborts the task running this aggregator. + /// + /// To avoid having a disconnected aggregator running forever, this + /// method should be called when the [`tonic::transport::Server`] started + /// with the [`InstrumentServer`] also returned from [`Server::into_parts`] + /// stops running. + pub fn abort(&mut self) { self.join_handle.abort(); } } From adca82c3a1012590c4e69e8bd6137fd88c403c8b Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Mon, 24 Jul 2023 11:05:13 +0200 Subject: [PATCH 4/5] fixed doc link --- console-subscriber/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index a5bf2d137..d9d39f53d 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -1064,7 +1064,7 @@ pub struct ServerParts { /// of the API to stop the aggregator task by calling [`abort`] on this /// object. /// -/// [`abort`]: fn@crate::Aggregator::abort +/// [`abort`]: fn@crate::AggregatorHandle::abort pub struct AggregatorHandle { join_handle: JoinHandle<()>, } From 7a2912c8123aedcb74bd9d86ac15d574beb1205e Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 27 Jul 2023 23:05:01 +0200 Subject: [PATCH 5/5] expose the Aggregator instead of a handle to abort it As discussed in code review, for such a low level api as `Server::into_parts`, it makes sense to allow the user to spawn the aggregator where they like, rather than spawning it internally. Since I couldn't find a way to return and use a `dyn Future` (boxed or otherwise), the `Aggregator` has been made public with a (async) single function `run()` which will start it's run loop. --- console-subscriber/src/aggregator/mod.rs | 15 ++++++- console-subscriber/src/lib.rs | 51 ++++++++++++++---------- 2 files changed, 44 insertions(+), 22 deletions(-) diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index 7359970b5..4496cba28 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -22,7 +22,13 @@ mod shrink; use self::id_data::{IdData, Include}; use self::shrink::{ShrinkMap, ShrinkVec}; -pub(crate) struct Aggregator { +/// Aggregates instrumentation traces and prepares state for the instrument +/// server. +/// +/// The `Aggregator` is responsible for receiving and organizing the +/// instrumentated events and preparing the data to be served to a instrument +/// client. +pub struct Aggregator { /// Channel of incoming events emitted by `TaskLayer`s. events: mpsc::Receiver, @@ -157,7 +163,12 @@ impl Aggregator { } } - pub(crate) async fn run(mut self) { + /// Runs the aggregator. + /// + /// This method will start the aggregator loop and should run as long as + /// the instrument server is running. If the instrument server stops, + /// this future can be aborted. + pub async fn run(mut self) { let mut publish = tokio::time::interval(self.publish_interval); loop { let should_send = tokio::select! { diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index d9d39f53d..8df105cce 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -42,7 +42,7 @@ mod stats; pub(crate) mod sync; mod visitors; -use aggregator::Aggregator; +pub use aggregator::Aggregator; pub use builder::{Builder, ServerAddr}; use callsites::Callsites; use record::Recorder; @@ -941,10 +941,11 @@ impl Server { ) -> Result<(), Box> { let addr = self.addr.clone(); let ServerParts { - instrument_server: service, - aggregator_handle: aggregate, + instrument_server, + aggregator, } = self.into_parts(); - let router = builder.add_service(service); + let aggregate = spawn_named(aggregator.run(), "console::aggregate"); + let router = builder.add_service(instrument_server); let res = match addr { ServerAddr::Tcp(addr) => { let serve = router.serve(addr); @@ -957,17 +958,21 @@ impl Server { spawn_named(serve, "console::serve").await } }; - drop(aggregate); + aggregate.abort(); res?.map_err(Into::into) } - /// Returns the parts needed to spawn a gRPC server and keep the aggregation - /// worker running. + /// Returns the parts needed to spawn a gRPC server and the aggregator that + /// supplies it. /// /// Note that a server spawned in this way will disregard any value set by /// [`Builder::server_addr`], as the user becomes responsible for defining /// the address when calling [`Router::serve`]. /// + /// Additionally, the user of this API must ensure that the [`Aggregator`] + /// is running for as long as the gRPC server is. If the server stops + /// running, the aggregator task can be aborted. + /// /// # Examples /// /// The parts can be used to serve the instrument server together with @@ -984,10 +989,11 @@ impl Server { /// let (console_layer, server) = ConsoleLayer::builder().build(); /// let ServerParts { /// instrument_server, - /// aggregator_handle, + /// aggregator, /// .. /// } = server.into_parts(); /// + /// let aggregator_handle = tokio::spawn(aggregator.run()); /// let router = tonic::transport::Server::builder() /// //.add_service(some_other_service) /// .add_service(instrument_server); @@ -1007,19 +1013,16 @@ impl Server { /// /// [`Router::serve`]: fn@tonic::transport::server::Router::serve pub fn into_parts(mut self) -> ServerParts { - let aggregate = self + let aggregator = self .aggregator .take() .expect("cannot start server multiple times"); - let aggregate = spawn_named(aggregate.run(), "console::aggregate"); - let service = proto::instrument::instrument_server::InstrumentServer::new(self); + let instrument_server = proto::instrument::instrument_server::InstrumentServer::new(self); ServerParts { - instrument_server: service, - aggregator_handle: AggregatorHandle { - join_handle: aggregate, - }, + instrument_server, + aggregator, } } } @@ -1032,8 +1035,9 @@ impl Server { /// The `InstrumentServer` can be used to construct a router which /// can be added to a [`tonic`] gRPC server. /// -/// The [`AggregatorHandle`] can be used to abort the associated aggregator task -/// after the server has been shut down. +/// The `aggregator` is a future which should be running as long as the server is. +/// Generally, this future should be spawned onto an appropriate runtime and then +/// aborted if the server gets shut down. /// /// See the [`Server::into_parts`] documentation for usage. #[non_exhaustive] @@ -1043,10 +1047,17 @@ pub struct ServerParts { /// See the documentation for [`InstrumentServer`] for details. pub instrument_server: InstrumentServer, - /// A handle to the background worker task responsible for aggregating trace data. + /// The aggregator. + /// + /// Responsible for collecting and preparing traces for the instrument server + /// to send its clients. + /// + /// The aggregator should be [`run`] when the instrument server is started. + /// If the server stops running for any reason, the aggregator task can be + /// aborted. /// - /// See the documentation for [`AggregatorHandle`] for details. - pub aggregator_handle: AggregatorHandle, + /// [`run`]: fn@crate::Aggregator::run + pub aggregator: Aggregator, } /// Aggregator handle.