diff --git a/src/client/options.rs b/src/client/options.rs index 315a0585a..64e0684f3 100644 --- a/src/client/options.rs +++ b/src/client/options.rs @@ -12,7 +12,6 @@ use std::{ hash::{Hash, Hasher}, path::PathBuf, str::FromStr, - sync::Arc, time::Duration, }; @@ -32,7 +31,7 @@ use crate::{ compression::Compressor, concern::{Acknowledgment, ReadConcern, WriteConcern}, error::{Error, ErrorKind, Result}, - event::{cmap::CmapEventHandler, sdam::SdamEventHandler}, + event::EventHandler, options::ReadConcernLevel, sdam::{verify_max_staleness, DEFAULT_HEARTBEAT_FREQUENCY, MIN_HEARTBEAT_FREQUENCY}, selection_criteria::{ReadPreference, SelectionCriteria, TagSet}, @@ -404,22 +403,19 @@ pub struct ClientOptions { #[serde(skip)] pub compressors: Option>, - /// The handler that should process all Connection Monitoring and Pooling events. See the - /// CmapEventHandler type documentation for more details. + /// The handler that should process all Connection Monitoring and Pooling events. #[derivative(Debug = "ignore", PartialEq = "ignore")] - #[builder(default)] + #[builder(default, setter(strip_option))] #[serde(skip)] - pub cmap_event_handler: Option>, + pub cmap_event_handler: Option>, - /// The handler that should process all command-related events. See the CommandEventHandler - /// type documentation for more details. + /// The handler that should process all command-related events. /// /// Note that monitoring command events may incur a performance penalty. #[derivative(Debug = "ignore", PartialEq = "ignore")] #[builder(default, setter(strip_option))] #[serde(skip)] - pub command_event_handler: - Option>, + pub command_event_handler: Option>, /// The connect timeout passed to each underlying TcpStream when attemtping to connect to the /// server. @@ -520,12 +516,11 @@ pub struct ClientOptions { #[builder(default)] pub retry_writes: Option, - /// The handler that should process all Server Discovery and Monitoring events. See the - /// [`SdamEventHandler`] type documentation for more details. + /// The handler that should process all Server Discovery and Monitoring events. #[derivative(Debug = "ignore", PartialEq = "ignore")] - #[builder(default)] + #[builder(default, setter(strip_option))] #[serde(skip)] - pub sdam_event_handler: Option>, + pub sdam_event_handler: Option>, /// The default selection criteria for operations performed on the Client. See the /// SelectionCriteria type documentation for more details. diff --git a/src/client/session/test.rs b/src/client/session/test.rs index 0253e8774..cb8d00891 100644 --- a/src/client/session/test.rs +++ b/src/client/session/test.rs @@ -9,19 +9,12 @@ use crate::{ bson::{doc, Bson}, coll::options::{CountOptions, InsertManyOptions}, error::Result, + event::sdam::SdamEvent, options::{Acknowledgment, FindOptions, ReadConcern, ReadPreference, WriteConcern}, runtime, sdam::ServerInfo, selection_criteria::SelectionCriteria, - test::{ - get_client_options, - log_uncaptured, - Event, - EventClient, - EventHandler, - SdamEvent, - TestClient, - }, + test::{get_client_options, log_uncaptured, Event, EventClient, EventHandler, TestClient}, Client, Collection, }; @@ -306,7 +299,7 @@ async fn cluster_time_in_commands() { let mut options = get_client_options().await.clone(); options.heartbeat_freq = Some(Duration::from_secs(1000)); options.command_event_handler = Some(handler.clone().into()); - options.sdam_event_handler = Some(handler.clone()); + options.sdam_event_handler = Some(handler.clone().into()); // Ensure we only connect to one server so the monitor checks from other servers // don't affect the TopologyDescription's clusterTime value between commands. diff --git a/src/cmap/options.rs b/src/cmap/options.rs index 784f65f4f..ae760d89d 100644 --- a/src/cmap/options.rs +++ b/src/cmap/options.rs @@ -1,6 +1,6 @@ #[cfg(test)] use std::cmp::Ordering; -use std::{sync::Arc, time::Duration}; +use std::time::Duration; use derivative::Derivative; #[cfg(test)] @@ -9,7 +9,10 @@ use serde::Deserialize; use crate::{ client::auth::Credential, - event::cmap::{CmapEventHandler, ConnectionPoolOptions as EventOptions}, + event::{ + cmap::{CmapEvent, ConnectionPoolOptions as EventOptions}, + EventHandler, + }, options::ClientOptions, serde_util, }; @@ -26,7 +29,7 @@ pub(crate) struct ConnectionPoolOptions { /// Processes all events generated by the pool. #[derivative(Debug = "ignore", PartialEq = "ignore")] #[serde(skip)] - pub(crate) cmap_event_handler: Option>, + pub(crate) cmap_event_handler: Option>, /// Interval between background thread maintenance runs (e.g. ensure minPoolSize). #[cfg(test)] diff --git a/src/cmap/test.rs b/src/cmap/test.rs index ca0fe6b9d..cfc48ef1b 100644 --- a/src/cmap/test.rs +++ b/src/cmap/test.rs @@ -7,7 +7,7 @@ use std::{collections::HashMap, ops::Deref, sync::Arc, time::Duration}; use tokio::sync::{Mutex, RwLock}; use self::{ - event::EventHandler, + event::TestEventHandler, file::{Operation, TestFile, ThreadedOperation}, }; @@ -70,7 +70,7 @@ struct Executor { #[derive(Debug)] struct State { - handler: Arc, + handler: Arc, connections: RwLock>, unlabeled_connections: Mutex>, threads: RwLock>, @@ -126,11 +126,11 @@ impl CmapThread { impl Executor { async fn new(test_file: TestFile) -> Self { - let handler = Arc::new(EventHandler::new()); + let handler = Arc::new(TestEventHandler::new()); let error = test_file.error; let mut pool_options = test_file.pool_options.unwrap_or_default(); - pool_options.cmap_event_handler = Some(handler.clone()); + pool_options.cmap_event_handler = Some(handler.clone().into()); let state = State { handler, diff --git a/src/cmap/test/event.rs b/src/cmap/test/event.rs index 699850aca..0a3f6a5a2 100644 --- a/src/cmap/test/event.rs +++ b/src/cmap/test/event.rs @@ -9,12 +9,12 @@ use crate::{event::cmap::*, options::ServerAddress, test::util::EventSubscriber} use tokio::sync::broadcast::error::SendError; #[derive(Clone, Debug)] -pub struct EventHandler { +pub struct TestEventHandler { pub(crate) events: Arc>>, channel_sender: tokio::sync::broadcast::Sender, } -impl EventHandler { +impl TestEventHandler { pub fn new() -> Self { let (channel_sender, _) = tokio::sync::broadcast::channel(500); Self { @@ -31,12 +31,13 @@ impl EventHandler { self.events.write().unwrap().push(event); } - pub(crate) fn subscribe(&self) -> EventSubscriber<'_, EventHandler, CmapEvent> { + pub(crate) fn subscribe(&self) -> EventSubscriber<'_, TestEventHandler, CmapEvent> { EventSubscriber::new(self, self.channel_sender.subscribe()) } } -impl CmapEventHandler for EventHandler { +#[allow(deprecated)] +impl CmapEventHandler for TestEventHandler { fn handle_pool_created_event(&self, event: PoolCreatedEvent) { self.handle(event); } diff --git a/src/cmap/test/integration.rs b/src/cmap/test/integration.rs index bc463b7ca..224bfb90d 100644 --- a/src/cmap/test/integration.rs +++ b/src/cmap/test/integration.rs @@ -1,6 +1,6 @@ use serde::Deserialize; -use super::{event::EventHandler, EVENT_TIMEOUT}; +use super::{event::TestEventHandler, EVENT_TIMEOUT}; use crate::{ bson::{doc, Document}, cmap::{ @@ -9,7 +9,7 @@ use crate::{ Command, ConnectionPool, }, - event::cmap::{CmapEvent, CmapEventHandler, ConnectionClosedReason}, + event::cmap::{CmapEvent, ConnectionClosedReason}, hello::LEGACY_HELLO_COMMAND_NAME, operation::CommandResponse, runtime, @@ -114,11 +114,10 @@ async fn concurrent_connections() { .await .expect("failpoint should succeed"); - let handler = Arc::new(EventHandler::new()); + let handler = Arc::new(TestEventHandler::new()); let client_options = get_client_options().await.clone(); let mut options = ConnectionPoolOptions::from_client_options(&client_options); - options.cmap_event_handler = - Some(handler.clone() as Arc); + options.cmap_event_handler = Some(handler.clone().into()); options.ready = Some(true); let pool = ConnectionPool::new( @@ -203,13 +202,12 @@ async fn connection_error_during_establishment() { ); let _fp_guard = client.enable_failpoint(failpoint, None).await.unwrap(); - let handler = Arc::new(EventHandler::new()); + let handler = Arc::new(TestEventHandler::new()); let mut subscriber = handler.subscribe(); let mut options = ConnectionPoolOptions::from_client_options(&client_options); options.ready = Some(true); - options.cmap_event_handler = - Some(handler.clone() as Arc); + options.cmap_event_handler = Some(handler.clone().into()); let pool = ConnectionPool::new( client_options.hosts[0].clone(), ConnectionEstablisher::new(EstablisherOptions::from_client_options(&client_options)) @@ -238,8 +236,8 @@ async fn connection_error_during_establishment() { async fn connection_error_during_operation() { let mut options = get_client_options().await.clone(); - let handler = Arc::new(EventHandler::new()); - options.cmap_event_handler = Some(handler.clone() as Arc); + let handler = Arc::new(TestEventHandler::new()); + options.cmap_event_handler = Some(handler.clone().into()); options.hosts.drain(1..); options.max_pool_size = Some(1); diff --git a/src/event.rs b/src/event.rs index 4cddf89a5..7c634e72f 100644 --- a/src/event.rs +++ b/src/event.rs @@ -10,6 +10,8 @@ use futures_core::future::BoxFuture; use crate::event::command::CommandEvent; +use self::{cmap::CmapEvent, sdam::SdamEvent}; + /// A destination for events. Allows implicit conversion via [`From`] for concrete types for /// convenience with [`crate::options::ClientOptions`] construction: /// @@ -75,6 +77,44 @@ impl From> } } +#[allow(deprecated)] +impl From> for EventHandler { + fn from(value: Arc) -> Self { + use CmapEvent::*; + Self::callback(move |ev| match ev { + PoolCreated(ev) => value.handle_pool_created_event(ev), + PoolReady(ev) => value.handle_pool_ready_event(ev), + PoolCleared(ev) => value.handle_pool_cleared_event(ev), + PoolClosed(ev) => value.handle_pool_closed_event(ev), + ConnectionCreated(ev) => value.handle_connection_created_event(ev), + ConnectionReady(ev) => value.handle_connection_ready_event(ev), + ConnectionClosed(ev) => value.handle_connection_closed_event(ev), + ConnectionCheckoutStarted(ev) => value.handle_connection_checkout_started_event(ev), + ConnectionCheckoutFailed(ev) => value.handle_connection_checkout_failed_event(ev), + ConnectionCheckedOut(ev) => value.handle_connection_checked_out_event(ev), + ConnectionCheckedIn(ev) => value.handle_connection_checked_in_event(ev), + }) + } +} + +#[allow(deprecated)] +impl From> for EventHandler { + fn from(value: Arc) -> Self { + use SdamEvent::*; + Self::callback(move |ev| match ev { + ServerDescriptionChanged(ev) => value.handle_server_description_changed_event(*ev), + ServerOpening(ev) => value.handle_server_opening_event(ev), + ServerClosed(ev) => value.handle_server_closed_event(ev), + TopologyDescriptionChanged(ev) => value.handle_topology_description_changed_event(*ev), + TopologyOpening(ev) => value.handle_topology_opening_event(ev), + TopologyClosed(ev) => value.handle_topology_closed_event(ev), + ServerHeartbeatStarted(ev) => value.handle_server_heartbeat_started_event(ev), + ServerHeartbeatSucceeded(ev) => value.handle_server_heartbeat_succeeded_event(ev), + ServerHeartbeatFailed(ev) => value.handle_server_heartbeat_failed_event(ev), + }) + } +} + impl EventHandler { /// Construct a new event handler with a callback. pub fn callback(f: impl Fn(T) + Send + Sync + 'static) -> Self { diff --git a/src/event/cmap.rs b/src/event/cmap.rs index a3a989ba5..4d8fcfb39 100644 --- a/src/event/cmap.rs +++ b/src/event/cmap.rs @@ -1,7 +1,7 @@ //! Contains the events and functionality for monitoring behavior of the connection pooling of a //! `Client`. -use std::{sync::Arc, time::Duration}; +use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -17,6 +17,8 @@ use crate::trace::{ CONNECTION_TRACING_EVENT_TARGET, }; +use super::EventHandler; + /// We implement `Deserialize` for all of the event types so that we can more easily parse the CMAP /// spec tests. However, we have no need to parse the address field from the JSON files (if it's /// even present). To facilitate populating the address field with an empty value when @@ -281,10 +283,13 @@ fn default_connection_id() -> u32 { 42 } +/// Usage of this trait is deprecated. Applications should use the [`EventHandler`] API. +/// /// Applications can implement this trait to specify custom logic to run on each CMAP event sent /// by the driver. /// /// ```rust +/// # #![allow(deprecated)] /// # use std::sync::Arc; /// # /// # use mongodb::{ @@ -309,7 +314,7 @@ fn default_connection_id() -> u32 { /// } /// /// # fn do_stuff() -> Result<()> { -/// let handler: Arc = Arc::new(FailedCheckoutLogger); +/// let handler = Arc::new(FailedCheckoutLogger); /// let options = ClientOptions::builder() /// .cmap_event_handler(handler) /// .build(); @@ -319,6 +324,7 @@ fn default_connection_id() -> u32 { /// # Ok(()) /// # } /// ``` +#[deprecated = "use the EventHandler API"] pub trait CmapEventHandler: Send + Sync { /// A [`Client`](../../struct.Client.html) will call this method on each registered handler /// whenever a connection pool is created. @@ -369,7 +375,9 @@ pub trait CmapEventHandler: Send + Sync { } #[derive(Clone, Debug, PartialEq, From)] -pub(crate) enum CmapEvent { +#[non_exhaustive] +#[allow(missing_docs)] +pub enum CmapEvent { PoolCreated(PoolCreatedEvent), PoolReady(PoolReadyEvent), PoolCleared(PoolClearedEvent), @@ -385,7 +393,7 @@ pub(crate) enum CmapEvent { #[derive(Clone)] pub(crate) struct CmapEventEmitter { - user_handler: Option>, + user_handler: Option>, #[cfg(feature = "tracing-unstable")] tracing_emitter: ConnectionTracingEventEmitter, @@ -395,7 +403,7 @@ impl CmapEventEmitter { // the topology ID is only used when the tracing feature is on. #[allow(unused_variables)] pub(crate) fn new( - user_handler: Option>, + user_handler: Option>, topology_id: ObjectId, ) -> CmapEventEmitter { Self { @@ -408,7 +416,7 @@ impl CmapEventEmitter { #[cfg(not(feature = "tracing-unstable"))] pub(crate) fn emit_event(&self, generate_event: impl FnOnce() -> CmapEvent) { if let Some(ref handler) = self.user_handler { - handle_cmap_event(handler.as_ref(), generate_event()); + handler.handle(generate_event()); } } @@ -429,39 +437,17 @@ impl CmapEventEmitter { (None, None) => {} (None, Some(tracing_emitter)) => { let event = generate_event(); - handle_cmap_event(tracing_emitter, event); + tracing_emitter.handle(event); } (Some(user_handler), None) => { let event = generate_event(); - handle_cmap_event(user_handler.as_ref(), event); + user_handler.handle(event); } (Some(user_handler), Some(tracing_emitter)) => { let event = generate_event(); - handle_cmap_event(user_handler.as_ref(), event.clone()); - handle_cmap_event(tracing_emitter, event); + user_handler.handle(event.clone()); + tracing_emitter.handle(event); } }; } } - -fn handle_cmap_event(handler: &dyn CmapEventHandler, event: CmapEvent) { - match event { - CmapEvent::PoolCreated(event) => handler.handle_pool_created_event(event), - CmapEvent::PoolReady(event) => handler.handle_pool_ready_event(event), - CmapEvent::PoolCleared(event) => handler.handle_pool_cleared_event(event), - CmapEvent::PoolClosed(event) => handler.handle_pool_closed_event(event), - CmapEvent::ConnectionCreated(event) => handler.handle_connection_created_event(event), - CmapEvent::ConnectionReady(event) => handler.handle_connection_ready_event(event), - CmapEvent::ConnectionClosed(event) => handler.handle_connection_closed_event(event), - CmapEvent::ConnectionCheckoutStarted(event) => { - handler.handle_connection_checkout_started_event(event) - } - CmapEvent::ConnectionCheckoutFailed(event) => { - handler.handle_connection_checkout_failed_event(event) - } - CmapEvent::ConnectionCheckedOut(event) => { - handler.handle_connection_checked_out_event(event) - } - CmapEvent::ConnectionCheckedIn(event) => handler.handle_connection_checked_in_event(event), - } -} diff --git a/src/event/command.rs b/src/event/command.rs index e05cdd351..2624acf9c 100644 --- a/src/event/command.rs +++ b/src/event/command.rs @@ -93,7 +93,7 @@ pub struct CommandFailedEvent { pub service_id: Option, } -/// Usage of this trait is deprecated. Applications should use the simpler +/// Usage of this trait is deprecated. Applications should use the /// [`EventHandler`](crate::event::EventHandler) API. /// /// Applications can implement this trait to specify custom logic to run on each command event sent @@ -134,7 +134,7 @@ pub struct CommandFailedEvent { /// # Ok(()) /// # } /// ``` -#[deprecated] +#[deprecated = "use the EventHandler API"] pub trait CommandEventHandler: Send + Sync { /// A [`Client`](../../struct.Client.html) will call this method on each registered handler /// whenever a database command is initiated. diff --git a/src/event/sdam.rs b/src/event/sdam.rs index 0703fc8fc..f8f5d126d 100644 --- a/src/event/sdam.rs +++ b/src/event/sdam.rs @@ -183,8 +183,11 @@ pub struct ServerHeartbeatFailedEvent { pub server_connection_id: Option, } -#[derive(Clone, Debug)] -pub(crate) enum SdamEvent { +#[derive(Clone, Debug, Serialize)] +#[allow(missing_docs)] +#[non_exhaustive] +#[serde(untagged)] +pub enum SdamEvent { ServerDescriptionChanged(Box), ServerOpening(ServerOpeningEvent), ServerClosed(ServerClosedEvent), @@ -196,10 +199,14 @@ pub(crate) enum SdamEvent { ServerHeartbeatFailed(ServerHeartbeatFailedEvent), } +/// Usage of this trait is deprecated. Applications should use the +/// [`EventHandler`](crate::event::EventHandler) API. +/// /// Applications can implement this trait to specify custom logic to run on each SDAM event sent /// by the driver. /// /// ```rust +/// # #![allow(deprecated)] /// # use std::sync::Arc; /// # /// # use mongodb::{ @@ -224,7 +231,7 @@ pub(crate) enum SdamEvent { /// } /// /// # fn do_stuff() -> Result<()> { -/// let handler: Arc = Arc::new(FailedHeartbeatLogger); +/// let handler = Arc::new(FailedHeartbeatLogger); /// let options = ClientOptions::builder() /// .sdam_event_handler(handler) /// .build(); @@ -234,6 +241,7 @@ pub(crate) enum SdamEvent { /// # Ok(()) /// # } /// ``` +#[deprecated = "use the EventHandler API"] pub trait SdamEventHandler: Send + Sync { /// A [`Client`](../../struct.Client.html) will call this method on each registered handler when /// a server description changes. @@ -272,23 +280,3 @@ pub trait SdamEventHandler: Send + Sync { /// a server heartbeat fails. fn handle_server_heartbeat_failed_event(&self, _event: ServerHeartbeatFailedEvent) {} } - -pub(crate) fn handle_sdam_event(handler: &dyn SdamEventHandler, event: SdamEvent) { - match event { - SdamEvent::ServerClosed(event) => handler.handle_server_closed_event(event), - SdamEvent::ServerDescriptionChanged(e) => { - handler.handle_server_description_changed_event(*e) - } - SdamEvent::ServerOpening(e) => handler.handle_server_opening_event(e), - SdamEvent::TopologyDescriptionChanged(e) => { - handler.handle_topology_description_changed_event(*e) - } - SdamEvent::TopologyOpening(e) => handler.handle_topology_opening_event(e), - SdamEvent::TopologyClosed(e) => handler.handle_topology_closed_event(e), - SdamEvent::ServerHeartbeatStarted(e) => handler.handle_server_heartbeat_started_event(e), - SdamEvent::ServerHeartbeatSucceeded(e) => { - handler.handle_server_heartbeat_succeeded_event(e) - } - SdamEvent::ServerHeartbeatFailed(e) => handler.handle_server_heartbeat_failed_event(e), - } -} diff --git a/src/sdam/description/topology/test/event.rs b/src/sdam/description/topology/test/event.rs index 8ade3494a..7f87ceda2 100644 --- a/src/sdam/description/topology/test/event.rs +++ b/src/sdam/description/topology/test/event.rs @@ -3,6 +3,7 @@ use serde::Deserialize; use crate::{ client::options::ServerAddress, event::sdam::{ + SdamEvent, ServerClosedEvent, ServerDescriptionChangedEvent, ServerOpeningEvent, @@ -11,7 +12,6 @@ use crate::{ TopologyOpeningEvent, }, sdam::{ServerDescription, ServerType, TopologyDescription}, - test::SdamEvent, }; #[derive(Debug, Deserialize)] diff --git a/src/sdam/description/topology/test/sdam.rs b/src/sdam/description/topology/test/sdam.rs index 4c0acd4a9..eab4394bd 100644 --- a/src/sdam/description/topology/test/sdam.rs +++ b/src/sdam/description/topology/test/sdam.rs @@ -10,6 +10,7 @@ use crate::{ client::Client, cmap::{conn::ConnectionGeneration, PoolGeneration}, error::{BulkWriteFailure, CommandError, Error, ErrorKind}, + event::sdam::SdamEvent, hello::{HelloCommandResponse, HelloReply, LastWrite, LEGACY_HELLO_COMMAND_NAME}, options::{ClientOptions, ReadPreference, SelectionCriteria, ServerAddress}, sdam::{ @@ -33,7 +34,6 @@ use crate::{ FailCommandOptions, FailPoint, FailPointMode, - SdamEvent, TestClient, }, }; @@ -275,7 +275,7 @@ async fn run_test(test_file: TestFile) { .expect(test_description); let handler = Arc::new(EventHandler::new()); - options.sdam_event_handler = Some(handler.clone()); + options.sdam_event_handler = Some(handler.clone().into()); options.test_options_mut().disable_monitoring_threads = true; let mut event_subscriber = handler.subscribe(); diff --git a/src/sdam/test.rs b/src/sdam/test.rs index a93f686e8..cb1eed5aa 100644 --- a/src/sdam/test.rs +++ b/src/sdam/test.rs @@ -11,7 +11,7 @@ use crate::{ client::options::{ClientOptions, ServerAddress}, cmap::RawCommandResponse, error::{Error, ErrorKind}, - event::{cmap::CmapEvent, sdam::SdamEventHandler}, + event::{cmap::CmapEvent, sdam::SdamEvent}, hello::{LEGACY_HELLO_COMMAND_NAME, LEGACY_HELLO_COMMAND_NAME_LOWERCASE}, sdam::{ServerDescription, Topology}, test::{ @@ -23,7 +23,6 @@ use crate::{ FailCommandOptions, FailPoint, FailPointMode, - SdamEvent, TestClient, }, Client, @@ -201,7 +200,7 @@ async fn hello_ok_true() { let mut subscriber = handler.subscribe(); let mut options = setup_client_options.clone(); - options.sdam_event_handler = Some(handler.clone()); + options.sdam_event_handler = Some(handler.clone().into()); options.direct_connection = Some(true); options.heartbeat_freq = Some(Duration::from_millis(500)); let _client = Client::with_options(options).expect("client creation should succeed"); @@ -277,7 +276,7 @@ async fn removed_server_monitor_stops() -> crate::error::Result<()> { ServerAddress::parse("localhost:49154")?, ]) .heartbeat_freq(Duration::from_millis(50)) - .sdam_event_handler(handler.clone() as Arc) + .sdam_event_handler(handler.clone()) .repl_set_name("foo".to_string()) .build(); diff --git a/src/sdam/topology.rs b/src/sdam/topology.rs index ce44b0559..ed80ceadc 100644 --- a/src/sdam/topology.rs +++ b/src/sdam/topology.rs @@ -26,7 +26,6 @@ use crate::{ }, error::{load_balanced_mode_mismatch, Error, Result}, event::sdam::{ - handle_sdam_event, SdamEvent, ServerClosedEvent, ServerDescriptionChangedEvent, @@ -86,12 +85,12 @@ impl Topology { if let Some(ref user_handler) = user_handler { #[cfg(feature = "tracing-unstable")] - handle_sdam_event(user_handler.as_ref(), event.clone()); + user_handler.handle(event.clone()); #[cfg(not(feature = "tracing-unstable"))] - handle_sdam_event(user_handler.as_ref(), event); + user_handler.handle(event); } #[cfg(feature = "tracing-unstable")] - handle_sdam_event(&tracing_emitter, event); + tracing_emitter.handle(event); ack.acknowledge(()); } diff --git a/src/test.rs b/src/test.rs index 378ddea6e..67ef6ff44 100644 --- a/src/test.rs +++ b/src/test.rs @@ -36,7 +36,6 @@ pub(crate) use self::{ FailPointMode, MatchErrExt, Matchable, - SdamEvent, TestClient, }, }; diff --git a/src/test/client.rs b/src/test/client.rs index c052d8b3e..ba24ed299 100644 --- a/src/test/client.rs +++ b/src/test/client.rs @@ -7,7 +7,7 @@ use crate::{ bson::{doc, Bson}, coll::options::FindOptions, error::{CommandError, Error, ErrorKind}, - event::cmap::CmapEvent, + event::{cmap::CmapEvent, sdam::SdamEvent}, hello::LEGACY_HELLO_COMMAND_NAME, options::{AuthMechanism, ClientOptions, Credential, ServerAddress}, runtime, @@ -21,7 +21,6 @@ use crate::{ FailCommandOptions, FailPoint, FailPointMode, - SdamEvent, SERVER_API, }, Client, @@ -722,8 +721,8 @@ async fn retry_commit_txn_check_out() { let mut options = get_client_options().await.clone(); let handler = Arc::new(EventHandler::new()); - options.cmap_event_handler = Some(handler.clone()); - options.sdam_event_handler = Some(handler.clone()); + options.cmap_event_handler = Some(handler.clone().into()); + options.sdam_event_handler = Some(handler.clone().into()); options.heartbeat_freq = Some(Duration::from_secs(120)); options.app_name = Some("retry_commit_txn_check_out".to_string()); let client = Client::with_options(options).unwrap(); diff --git a/src/test/csfle.rs b/src/test/csfle.rs index 81de13cdf..d4044f83b 100644 --- a/src/test/csfle.rs +++ b/src/test/csfle.rs @@ -32,7 +32,10 @@ use tokio::net::TcpListener; use crate::{ client_encryption::{ClientEncryption, EncryptKey, MasterKey, RangeOptions}, error::{ErrorKind, WriteError, WriteFailure}, - event::command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent}, + event::{ + command::{CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent}, + sdam::SdamEvent, + }, options::{ CollectionOptions, CreateCollectionOptions, @@ -47,7 +50,7 @@ use crate::{ WriteConcern, }, runtime, - test::{Event, EventHandler, SdamEvent}, + test::{Event, EventHandler}, Client, Collection, IndexModel, @@ -1623,7 +1626,7 @@ impl DeadlockTestCase { let mut opts = get_client_options().await.clone(); opts.max_pool_size = Some(self.max_pool_size); opts.command_event_handler = Some(event_handler.clone().into()); - opts.sdam_event_handler = Some(event_handler.clone()); + opts.sdam_event_handler = Some(event_handler.clone().into()); let client_encrypted = Client::encrypted_builder(opts, KV_NAMESPACE.clone(), LOCAL_KMS.clone())? .bypass_auto_encryption(self.bypass_auto_encryption) diff --git a/src/test/spec/retryable_reads.rs b/src/test/spec/retryable_reads.rs index 05d9039f7..b0ca50458 100644 --- a/src/test/spec/retryable_reads.rs +++ b/src/test/spec/retryable_reads.rs @@ -5,7 +5,7 @@ use bson::doc; use crate::{ error::Result, event::{ - cmap::{CmapEvent, CmapEventHandler, ConnectionCheckoutFailedReason}, + cmap::{CmapEvent, ConnectionCheckoutFailedReason}, command::CommandEvent, }, runtime, @@ -78,7 +78,7 @@ async fn retry_read_pool_cleared() { let mut client_options = get_client_options().await.clone(); client_options.retry_reads = Some(true); client_options.max_pool_size = Some(1); - client_options.cmap_event_handler = Some(handler.clone() as Arc); + client_options.cmap_event_handler = Some(handler.clone().into()); client_options.command_event_handler = Some(handler.clone().into()); // on sharded clusters, ensure only a single mongos is used if client_options.repl_set_name.is_none() { diff --git a/src/test/spec/retryable_writes.rs b/src/test/spec/retryable_writes.rs index d36c2708c..8061cf817 100644 --- a/src/test/spec/retryable_writes.rs +++ b/src/test/spec/retryable_writes.rs @@ -13,7 +13,7 @@ use crate::{ bson::{doc, Document}, error::{ErrorKind, Result, RETRYABLE_WRITE_ERROR}, event::{ - cmap::{CmapEvent, CmapEventHandler, ConnectionCheckoutFailedReason}, + cmap::{CmapEvent, ConnectionCheckoutFailedReason}, command::CommandEvent, }, options::{ClientOptions, FindOptions, InsertManyOptions}, @@ -400,7 +400,7 @@ async fn retry_write_pool_cleared() { let mut client_options = get_client_options().await.clone(); client_options.retry_writes = Some(true); client_options.max_pool_size = Some(1); - client_options.cmap_event_handler = Some(handler.clone() as Arc); + client_options.cmap_event_handler = Some(handler.clone().into()); client_options.command_event_handler = Some(handler.clone().into()); // on sharded clusters, ensure only a single mongos is used if client_options.repl_set_name.is_none() { diff --git a/src/test/spec/sdam.rs b/src/test/spec/sdam.rs index d9d7052c6..4c9608cca 100644 --- a/src/test/spec/sdam.rs +++ b/src/test/spec/sdam.rs @@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration}; use bson::{doc, Document}; use crate::{ + event::sdam::SdamEvent, hello::LEGACY_HELLO_COMMAND_NAME, runtime, test::{ @@ -14,7 +15,6 @@ use crate::{ FailCommandOptions, FailPoint, FailPointMode, - SdamEvent, TestClient, }, Client, @@ -57,7 +57,7 @@ async fn streaming_min_heartbeat_frequency() { let handler = Arc::new(EventHandler::new()); let mut options = get_client_options().await.clone(); options.heartbeat_freq = Some(Duration::from_millis(500)); - options.sdam_event_handler = Some(handler.clone()); + options.sdam_event_handler = Some(handler.clone().into()); let hosts = options.hosts.clone(); @@ -108,7 +108,7 @@ async fn heartbeat_frequency_is_respected() { let handler = Arc::new(EventHandler::new()); let mut options = get_client_options().await.clone(); options.heartbeat_freq = Some(Duration::from_millis(1000)); - options.sdam_event_handler = Some(handler.clone()); + options.sdam_event_handler = Some(handler.clone().into()); let hosts = options.hosts.clone(); @@ -174,7 +174,7 @@ async fn rtt_is_updated() { let mut options = get_client_options().await.clone(); options.heartbeat_freq = Some(Duration::from_millis(500)); options.app_name = Some(app_name.to_string()); - options.sdam_event_handler = Some(handler.clone()); + options.sdam_event_handler = Some(handler.clone().into()); options.hosts.drain(1..); options.direct_connection = Some(true); diff --git a/src/test/spec/unified_runner/matcher.rs b/src/test/spec/unified_runner/matcher.rs index 980f2154a..3bf7eee70 100644 --- a/src/test/spec/unified_runner/matcher.rs +++ b/src/test/spec/unified_runner/matcher.rs @@ -3,8 +3,12 @@ use std::fmt::Debug; use crate::{ bson::{doc, spec::ElementType, Bson, Document}, bson_util::get_int, - event::{cmap::CmapEvent, command::CommandEvent, sdam::ServerDescription}, - test::{Event, SdamEvent}, + event::{ + cmap::CmapEvent, + command::CommandEvent, + sdam::{SdamEvent, ServerDescription}, + }, + test::Event, }; use super::{ diff --git a/src/test/spec/unified_runner/test_event.rs b/src/test/spec/unified_runner/test_event.rs index 76e59531c..ab27f7c4e 100644 --- a/src/test/spec/unified_runner/test_event.rs +++ b/src/test/spec/unified_runner/test_event.rs @@ -3,8 +3,9 @@ use crate::{ event::{ cmap::{CmapEvent, ConnectionCheckoutFailedReason, ConnectionClosedReason}, command::CommandEvent, + sdam::SdamEvent, }, - test::{Event, SdamEvent}, + test::Event, ServerType, }; use serde::Deserialize; diff --git a/src/test/spec/unified_runner/test_runner.rs b/src/test/spec/unified_runner/test_runner.rs index 8b047c0b1..9c79edbb7 100644 --- a/src/test/spec/unified_runner/test_runner.rs +++ b/src/test/spec/unified_runner/test_runner.rs @@ -465,9 +465,9 @@ impl TestRunner { }); update_options_for_testing(&mut options); let handler = Arc::new(EventHandler::new()); - options.command_event_handler = Some(handler.clone().receive_command().into()); - options.cmap_event_handler = Some(handler.clone()); - options.sdam_event_handler = Some(handler.clone()); + options.command_event_handler = Some(handler.clone().command_sender().into()); + options.cmap_event_handler = Some(handler.clone().cmap_sender().into()); + options.sdam_event_handler = Some(handler.clone().sdam_sender().into()); options.server_api = server_api; diff --git a/src/test/util.rs b/src/test/util.rs index e7f642ce5..fe3a93fb8 100644 --- a/src/test/util.rs +++ b/src/test/util.rs @@ -6,7 +6,7 @@ mod subscriber; mod trace; pub(crate) use self::{ - event::{Event, EventClient, EventHandler, SdamEvent}, + event::{Event, EventClient, EventHandler}, failpoint::{FailCommandOptions, FailPoint, FailPointGuard, FailPointMode}, matchable::{assert_matches, eq_matches, is_expected_type, MatchErrExt, Matchable}, subscriber::EventSubscriber, @@ -142,8 +142,8 @@ impl TestClientBuilder { if let Some(handler) = self.handler { options.command_event_handler = Some(handler.clone().into()); - options.cmap_event_handler = Some(handler.clone()); - options.sdam_event_handler = Some(handler); + options.cmap_event_handler = Some(handler.clone().into()); + options.sdam_event_handler = Some(handler.clone().into()); } if let Some(freq) = self.min_heartbeat_freq { diff --git a/src/test/util/event.rs b/src/test/util/event.rs index d690ab4cb..5893306d9 100644 --- a/src/test/util/event.rs +++ b/src/test/util/event.rs @@ -7,7 +7,6 @@ use std::{ }; use derive_more::From; -use serde::Serialize; use time::OffsetDateTime; use tokio::sync::broadcast::error::SendError; @@ -17,7 +16,6 @@ use crate::{ event::{ cmap::{ CmapEvent, - CmapEventHandler, ConnectionCheckedInEvent, ConnectionCheckedOutEvent, ConnectionCheckoutFailedEvent, @@ -32,7 +30,7 @@ use crate::{ }, command::{CommandEvent, CommandFailedEvent, CommandStartedEvent, CommandSucceededEvent}, sdam::{ - SdamEventHandler, + SdamEvent, ServerClosedEvent, ServerDescriptionChangedEvent, ServerHeartbeatFailedEvent, @@ -93,20 +91,6 @@ impl Event { } } -#[derive(Clone, Debug, Serialize)] -#[serde(untagged)] -pub(crate) enum SdamEvent { - ServerDescriptionChanged(Box), - ServerOpening(ServerOpeningEvent), - ServerClosed(ServerClosedEvent), - TopologyDescriptionChanged(Box), - TopologyOpening(TopologyOpeningEvent), - TopologyClosed(TopologyClosedEvent), - ServerHeartbeatStarted(ServerHeartbeatStartedEvent), - ServerHeartbeatSucceeded(ServerHeartbeatSucceededEvent), - ServerHeartbeatFailed(ServerHeartbeatFailedEvent), -} - impl SdamEvent { pub fn name(&self) -> &str { match self { @@ -184,7 +168,7 @@ impl EventHandler { } } - pub(crate) fn receive_command(self: Arc) -> tokio::sync::mpsc::Sender { + pub(crate) fn command_sender(self: Arc) -> tokio::sync::mpsc::Sender { let (tx, mut rx) = tokio::sync::mpsc::channel::(100); crate::runtime::spawn(async move { while let Some(ev) = rx.recv().await { @@ -195,6 +179,37 @@ impl EventHandler { tx } + pub(crate) fn cmap_sender(self: Arc) -> tokio::sync::mpsc::Sender { + let (tx, mut rx) = tokio::sync::mpsc::channel::(100); + crate::runtime::spawn(async move { + while let Some(ev) = rx.recv().await { + match &ev { + CmapEvent::ConnectionCheckedOut(_) => { + *self.connections_checked_out.lock().unwrap() += 1 + } + CmapEvent::ConnectionCheckedIn(_) => { + *self.connections_checked_out.lock().unwrap() -= 1 + } + _ => (), + } + self.handle(ev.clone()); + add_event_to_queue(&self.cmap_events, ev); + } + }); + tx + } + + pub(crate) fn sdam_sender(self: Arc) -> tokio::sync::mpsc::Sender { + let (tx, mut rx) = tokio::sync::mpsc::channel::(100); + crate::runtime::spawn(async move { + while let Some(ev) = rx.recv().await { + self.handle(ev.clone()); + add_event_to_queue(&self.sdam_events, ev); + } + }); + tx + } + fn handle(&self, event: impl Into) { // this only errors if no receivers are listening which isn't a concern here. let _: std::result::Result> = @@ -327,7 +342,8 @@ impl EventHandler { } } -impl CmapEventHandler for EventHandler { +#[allow(deprecated)] +impl crate::event::cmap::CmapEventHandler for EventHandler { fn handle_connection_checked_out_event(&self, event: ConnectionCheckedOutEvent) { *self.connections_checked_out.lock().unwrap() += 1; let event = CmapEvent::ConnectionCheckedOut(event); @@ -397,7 +413,8 @@ impl CmapEventHandler for EventHandler { } } -impl SdamEventHandler for EventHandler { +#[allow(deprecated)] +impl crate::event::sdam::SdamEventHandler for EventHandler { fn handle_server_description_changed_event(&self, event: ServerDescriptionChangedEvent) { let event = SdamEvent::ServerDescriptionChanged(Box::new(event)); self.handle(event.clone()); diff --git a/src/trace/connection.rs b/src/trace/connection.rs index 6c9226eb0..27734a2ba 100644 --- a/src/trace/connection.rs +++ b/src/trace/connection.rs @@ -1,22 +1,7 @@ use bson::oid::ObjectId; use crate::{ - event::cmap::{ - CmapEventHandler, - ConnectionCheckedInEvent, - ConnectionCheckedOutEvent, - ConnectionCheckoutFailedEvent, - ConnectionCheckoutFailedReason, - ConnectionCheckoutStartedEvent, - ConnectionClosedEvent, - ConnectionClosedReason, - ConnectionCreatedEvent, - ConnectionReadyEvent, - PoolClearedEvent, - PoolClosedEvent, - PoolCreatedEvent, - PoolReadyEvent, - }, + event::cmap::{CmapEvent, ConnectionCheckoutFailedReason, ConnectionClosedReason}, trace::{TracingRepresentation, CONNECTION_TRACING_EVENT_TARGET}, }; @@ -29,134 +14,127 @@ impl ConnectionTracingEventEmitter { pub(crate) fn new(topology_id: ObjectId) -> ConnectionTracingEventEmitter { Self { topology_id } } -} - -impl CmapEventHandler for ConnectionTracingEventEmitter { - fn handle_pool_created_event(&self, event: PoolCreatedEvent) { - let options_ref = event.options.as_ref(); - tracing::debug!( - target: CONNECTION_TRACING_EVENT_TARGET, - topologyId = self.topology_id.tracing_representation(), - serverHost = event.address.host().as_ref(), - serverPort = event.address.port_tracing_representation(), - maxIdleTimeMS = options_ref.and_then(|o| o.max_idle_time.map(|m| m.as_millis())), - maxPoolSize = options_ref.and_then(|o| o.max_pool_size), - minPoolSize = options_ref.and_then(|o| o.min_pool_size), - "Connection pool created", - ); - } - - fn handle_pool_ready_event(&self, event: PoolReadyEvent) { - tracing::debug!( - target: CONNECTION_TRACING_EVENT_TARGET, - topologyId = self.topology_id.tracing_representation(), - serverHost = event.address.host().as_ref(), - serverPort = event.address.port_tracing_representation(), - "Connection pool ready", - ); - } - - fn handle_pool_cleared_event(&self, event: PoolClearedEvent) { - tracing::debug!( - target: CONNECTION_TRACING_EVENT_TARGET, - topologyId = self.topology_id.tracing_representation(), - serverHost = event.address.host().as_ref(), - serverPort = event.address.port_tracing_representation(), - serviceId = event.service_id.map(|id| id.tracing_representation()), - "Connection pool cleared", - ); - } - - fn handle_pool_closed_event(&self, event: PoolClosedEvent) { - tracing::debug!( - target: CONNECTION_TRACING_EVENT_TARGET, - topologyId = self.topology_id.tracing_representation(), - serverHost = event.address.host().as_ref(), - serverPort = event.address.port_tracing_representation(), - "Connection pool closed", - ); - } - - fn handle_connection_created_event(&self, event: ConnectionCreatedEvent) { - tracing::debug!( - target: CONNECTION_TRACING_EVENT_TARGET, - topologyId = self.topology_id.tracing_representation(), - serverHost = event.address.host().as_ref(), - serverPort = event.address.port_tracing_representation(), - driverConnectionId = event.connection_id, - "Connection created", - ); - } - - fn handle_connection_ready_event(&self, event: ConnectionReadyEvent) { - tracing::debug!( - target: CONNECTION_TRACING_EVENT_TARGET, - topologyId = self.topology_id.tracing_representation(), - serverHost = event.address.host().as_ref(), - serverPort = event.address.port_tracing_representation(), - driverConnectionId = event.connection_id, - durationMS = event.duration.as_millis(), - "Connection ready", - ); - } - - fn handle_connection_closed_event(&self, event: ConnectionClosedEvent) { - tracing::debug!( - target: CONNECTION_TRACING_EVENT_TARGET, - topologyId = self.topology_id.tracing_representation(), - serverHost = event.address.host().as_ref(), - serverPort = event.address.port_tracing_representation(), - driverConnectionId = event.connection_id, - reason = event.reason.tracing_representation(), - error = event.error.map(|e| e.tracing_representation()), - "Connection closed", - ); - } - - fn handle_connection_checkout_started_event(&self, event: ConnectionCheckoutStartedEvent) { - tracing::debug!( - target: CONNECTION_TRACING_EVENT_TARGET, - topologyId = self.topology_id.tracing_representation(), - serverHost = event.address.host().as_ref(), - serverPort = event.address.port_tracing_representation(), - "Connection checkout started", - ); - } - fn handle_connection_checkout_failed_event(&self, event: ConnectionCheckoutFailedEvent) { - tracing::debug!( - target: CONNECTION_TRACING_EVENT_TARGET, - topologyId = self.topology_id.tracing_representation(), - serverHost = event.address.host().as_ref(), - serverPort = event.address.port_tracing_representation(), - reason = event.reason.tracing_representation(), - error = event.error.map(|e| e.tracing_representation()), - durationMS = event.duration.as_millis(), - "Connection checkout failed", - ); - } - - fn handle_connection_checked_out_event(&self, event: ConnectionCheckedOutEvent) { - tracing::debug!( - target: CONNECTION_TRACING_EVENT_TARGET, - topologyId = self.topology_id.tracing_representation(), - serverHost = event.address.host().as_ref(), - serverPort = event.address.port_tracing_representation(), - driverConnectionId = event.connection_id, - durationMS = event.duration.as_millis(), - "Connection checked out", - ); - } - - fn handle_connection_checked_in_event(&self, event: ConnectionCheckedInEvent) { - tracing::debug!( - target: CONNECTION_TRACING_EVENT_TARGET, - topologyId = self.topology_id.tracing_representation(), - serverHost = event.address.host().as_ref(), - serverPort = event.address.port_tracing_representation(), - driverConnectionId = event.connection_id, - "Connection checked in", - ); + pub(crate) fn handle(&self, event: CmapEvent) { + use CmapEvent::*; + match event { + PoolCreated(event) => { + let options_ref = event.options.as_ref(); + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host().as_ref(), + serverPort = event.address.port_tracing_representation(), + maxIdleTimeMS = options_ref.and_then(|o| o.max_idle_time.map(|m| m.as_millis())), + maxPoolSize = options_ref.and_then(|o| o.max_pool_size), + minPoolSize = options_ref.and_then(|o| o.min_pool_size), + "Connection pool created", + ); + } + PoolReady(event) => { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host().as_ref(), + serverPort = event.address.port_tracing_representation(), + "Connection pool ready", + ); + } + PoolCleared(event) => { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host().as_ref(), + serverPort = event.address.port_tracing_representation(), + serviceId = event.service_id.map(|id| id.tracing_representation()), + "Connection pool cleared", + ); + } + PoolClosed(event) => { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host().as_ref(), + serverPort = event.address.port_tracing_representation(), + "Connection pool closed", + ); + } + ConnectionCreated(event) => { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host().as_ref(), + serverPort = event.address.port_tracing_representation(), + driverConnectionId = event.connection_id, + "Connection created", + ); + } + ConnectionReady(event) => { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host().as_ref(), + serverPort = event.address.port_tracing_representation(), + driverConnectionId = event.connection_id, + durationMS = event.duration.as_millis(), + "Connection ready", + ); + } + ConnectionClosed(event) => { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host().as_ref(), + serverPort = event.address.port_tracing_representation(), + driverConnectionId = event.connection_id, + reason = event.reason.tracing_representation(), + error = event.error.map(|e| e.tracing_representation()), + "Connection closed", + ); + } + ConnectionCheckoutStarted(event) => { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host().as_ref(), + serverPort = event.address.port_tracing_representation(), + "Connection checkout started", + ); + } + ConnectionCheckoutFailed(event) => { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host().as_ref(), + serverPort = event.address.port_tracing_representation(), + reason = event.reason.tracing_representation(), + error = event.error.map(|e| e.tracing_representation()), + durationMS = event.duration.as_millis(), + "Connection checkout failed", + ); + } + ConnectionCheckedOut(event) => { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host().as_ref(), + serverPort = event.address.port_tracing_representation(), + driverConnectionId = event.connection_id, + durationMS = event.duration.as_millis(), + "Connection checked out", + ); + } + ConnectionCheckedIn(event) => { + tracing::debug!( + target: CONNECTION_TRACING_EVENT_TARGET, + topologyId = self.topology_id.tracing_representation(), + serverHost = event.address.host().as_ref(), + serverPort = event.address.port_tracing_representation(), + driverConnectionId = event.connection_id, + "Connection checked in", + ); + } + } } } diff --git a/src/trace/topology.rs b/src/trace/topology.rs index 0d2486702..cd3b71161 100644 --- a/src/trace/topology.rs +++ b/src/trace/topology.rs @@ -2,7 +2,7 @@ use bson::oid::ObjectId; use crate::{ event::sdam::{ - SdamEventHandler, + SdamEvent, ServerClosedEvent, ServerDescriptionChangedEvent, ServerHeartbeatFailedEvent, @@ -51,7 +51,22 @@ impl TopologyTracingEventEmitter { } } -impl SdamEventHandler for TopologyTracingEventEmitter { +impl TopologyTracingEventEmitter { + pub(crate) fn handle(&self, event: SdamEvent) { + use SdamEvent::*; + match event { + ServerDescriptionChanged(ev) => self.handle_server_description_changed_event(*ev), + ServerOpening(ev) => self.handle_server_opening_event(ev), + ServerClosed(ev) => self.handle_server_closed_event(ev), + TopologyDescriptionChanged(ev) => self.handle_topology_description_changed_event(*ev), + TopologyOpening(ev) => self.handle_topology_opening_event(ev), + TopologyClosed(ev) => self.handle_topology_closed_event(ev), + ServerHeartbeatStarted(ev) => self.handle_server_heartbeat_started_event(ev), + ServerHeartbeatSucceeded(ev) => self.handle_server_heartbeat_succeeded_event(ev), + ServerHeartbeatFailed(ev) => self.handle_server_heartbeat_failed_event(ev), + } + } + fn handle_server_description_changed_event(&self, _event: ServerDescriptionChangedEvent) { // this is tentatively a no-op based on my proposal to not do separate "topology changed" // and "server changed" log messages due to the redundancy, but that could change