Skip to content

Commit

Permalink
RUST-1505 Convert remaining event handler types to new API (#1012)
Browse files Browse the repository at this point in the history
  • Loading branch information
abr-egn authored Jan 23, 2024
1 parent 7a6091c commit c070269
Show file tree
Hide file tree
Showing 27 changed files with 321 additions and 303 deletions.
23 changes: 9 additions & 14 deletions src/client/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use std::{
hash::{Hash, Hasher},
path::PathBuf,
str::FromStr,
sync::Arc,
time::Duration,
};

Expand All @@ -32,7 +31,7 @@ use crate::{
compression::Compressor,
concern::{Acknowledgment, ReadConcern, WriteConcern},
error::{Error, ErrorKind, Result},
event::{cmap::CmapEventHandler, sdam::SdamEventHandler},
event::EventHandler,
options::ReadConcernLevel,
sdam::{verify_max_staleness, DEFAULT_HEARTBEAT_FREQUENCY, MIN_HEARTBEAT_FREQUENCY},
selection_criteria::{ReadPreference, SelectionCriteria, TagSet},
Expand Down Expand Up @@ -404,22 +403,19 @@ pub struct ClientOptions {
#[serde(skip)]
pub compressors: Option<Vec<Compressor>>,

/// The handler that should process all Connection Monitoring and Pooling events. See the
/// CmapEventHandler type documentation for more details.
/// The handler that should process all Connection Monitoring and Pooling events.
#[derivative(Debug = "ignore", PartialEq = "ignore")]
#[builder(default)]
#[builder(default, setter(strip_option))]
#[serde(skip)]
pub cmap_event_handler: Option<Arc<dyn CmapEventHandler>>,
pub cmap_event_handler: Option<EventHandler<crate::event::cmap::CmapEvent>>,

/// The handler that should process all command-related events. See the CommandEventHandler
/// type documentation for more details.
/// The handler that should process all command-related events.
///
/// Note that monitoring command events may incur a performance penalty.
#[derivative(Debug = "ignore", PartialEq = "ignore")]
#[builder(default, setter(strip_option))]
#[serde(skip)]
pub command_event_handler:
Option<crate::event::EventHandler<crate::event::command::CommandEvent>>,
pub command_event_handler: Option<EventHandler<crate::event::command::CommandEvent>>,

/// The connect timeout passed to each underlying TcpStream when attemtping to connect to the
/// server.
Expand Down Expand Up @@ -520,12 +516,11 @@ pub struct ClientOptions {
#[builder(default)]
pub retry_writes: Option<bool>,

/// The handler that should process all Server Discovery and Monitoring events. See the
/// [`SdamEventHandler`] type documentation for more details.
/// The handler that should process all Server Discovery and Monitoring events.
#[derivative(Debug = "ignore", PartialEq = "ignore")]
#[builder(default)]
#[builder(default, setter(strip_option))]
#[serde(skip)]
pub sdam_event_handler: Option<Arc<dyn SdamEventHandler>>,
pub sdam_event_handler: Option<EventHandler<crate::event::sdam::SdamEvent>>,

/// The default selection criteria for operations performed on the Client. See the
/// SelectionCriteria type documentation for more details.
Expand Down
13 changes: 3 additions & 10 deletions src/client/session/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,12 @@ use crate::{
bson::{doc, Bson},
coll::options::{CountOptions, InsertManyOptions},
error::Result,
event::sdam::SdamEvent,
options::{Acknowledgment, FindOptions, ReadConcern, ReadPreference, WriteConcern},
runtime,
sdam::ServerInfo,
selection_criteria::SelectionCriteria,
test::{
get_client_options,
log_uncaptured,
Event,
EventClient,
EventHandler,
SdamEvent,
TestClient,
},
test::{get_client_options, log_uncaptured, Event, EventClient, EventHandler, TestClient},
Client,
Collection,
};
Expand Down Expand Up @@ -306,7 +299,7 @@ async fn cluster_time_in_commands() {
let mut options = get_client_options().await.clone();
options.heartbeat_freq = Some(Duration::from_secs(1000));
options.command_event_handler = Some(handler.clone().into());
options.sdam_event_handler = Some(handler.clone());
options.sdam_event_handler = Some(handler.clone().into());

// Ensure we only connect to one server so the monitor checks from other servers
// don't affect the TopologyDescription's clusterTime value between commands.
Expand Down
9 changes: 6 additions & 3 deletions src/cmap/options.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(test)]
use std::cmp::Ordering;
use std::{sync::Arc, time::Duration};
use std::time::Duration;

use derivative::Derivative;
#[cfg(test)]
Expand All @@ -9,7 +9,10 @@ use serde::Deserialize;

use crate::{
client::auth::Credential,
event::cmap::{CmapEventHandler, ConnectionPoolOptions as EventOptions},
event::{
cmap::{CmapEvent, ConnectionPoolOptions as EventOptions},
EventHandler,
},
options::ClientOptions,
serde_util,
};
Expand All @@ -26,7 +29,7 @@ pub(crate) struct ConnectionPoolOptions {
/// Processes all events generated by the pool.
#[derivative(Debug = "ignore", PartialEq = "ignore")]
#[serde(skip)]
pub(crate) cmap_event_handler: Option<Arc<dyn CmapEventHandler>>,
pub(crate) cmap_event_handler: Option<EventHandler<CmapEvent>>,

/// Interval between background thread maintenance runs (e.g. ensure minPoolSize).
#[cfg(test)]
Expand Down
8 changes: 4 additions & 4 deletions src/cmap/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{collections::HashMap, ops::Deref, sync::Arc, time::Duration};
use tokio::sync::{Mutex, RwLock};

use self::{
event::EventHandler,
event::TestEventHandler,
file::{Operation, TestFile, ThreadedOperation},
};

Expand Down Expand Up @@ -70,7 +70,7 @@ struct Executor {

#[derive(Debug)]
struct State {
handler: Arc<EventHandler>,
handler: Arc<TestEventHandler>,
connections: RwLock<HashMap<String, Connection>>,
unlabeled_connections: Mutex<Vec<Connection>>,
threads: RwLock<HashMap<String, CmapThread>>,
Expand Down Expand Up @@ -126,11 +126,11 @@ impl CmapThread {

impl Executor {
async fn new(test_file: TestFile) -> Self {
let handler = Arc::new(EventHandler::new());
let handler = Arc::new(TestEventHandler::new());
let error = test_file.error;

let mut pool_options = test_file.pool_options.unwrap_or_default();
pool_options.cmap_event_handler = Some(handler.clone());
pool_options.cmap_event_handler = Some(handler.clone().into());

let state = State {
handler,
Expand Down
9 changes: 5 additions & 4 deletions src/cmap/test/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use crate::{event::cmap::*, options::ServerAddress, test::util::EventSubscriber}
use tokio::sync::broadcast::error::SendError;

#[derive(Clone, Debug)]
pub struct EventHandler {
pub struct TestEventHandler {
pub(crate) events: Arc<RwLock<Vec<CmapEvent>>>,
channel_sender: tokio::sync::broadcast::Sender<CmapEvent>,
}

impl EventHandler {
impl TestEventHandler {
pub fn new() -> Self {
let (channel_sender, _) = tokio::sync::broadcast::channel(500);
Self {
Expand All @@ -31,12 +31,13 @@ impl EventHandler {
self.events.write().unwrap().push(event);
}

pub(crate) fn subscribe(&self) -> EventSubscriber<'_, EventHandler, CmapEvent> {
pub(crate) fn subscribe(&self) -> EventSubscriber<'_, TestEventHandler, CmapEvent> {
EventSubscriber::new(self, self.channel_sender.subscribe())
}
}

impl CmapEventHandler for EventHandler {
#[allow(deprecated)]
impl CmapEventHandler for TestEventHandler {
fn handle_pool_created_event(&self, event: PoolCreatedEvent) {
self.handle(event);
}
Expand Down
18 changes: 8 additions & 10 deletions src/cmap/test/integration.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::Deserialize;

use super::{event::EventHandler, EVENT_TIMEOUT};
use super::{event::TestEventHandler, EVENT_TIMEOUT};
use crate::{
bson::{doc, Document},
cmap::{
Expand All @@ -9,7 +9,7 @@ use crate::{
Command,
ConnectionPool,
},
event::cmap::{CmapEvent, CmapEventHandler, ConnectionClosedReason},
event::cmap::{CmapEvent, ConnectionClosedReason},
hello::LEGACY_HELLO_COMMAND_NAME,
operation::CommandResponse,
runtime,
Expand Down Expand Up @@ -114,11 +114,10 @@ async fn concurrent_connections() {
.await
.expect("failpoint should succeed");

let handler = Arc::new(EventHandler::new());
let handler = Arc::new(TestEventHandler::new());
let client_options = get_client_options().await.clone();
let mut options = ConnectionPoolOptions::from_client_options(&client_options);
options.cmap_event_handler =
Some(handler.clone() as Arc<dyn crate::event::cmap::CmapEventHandler>);
options.cmap_event_handler = Some(handler.clone().into());
options.ready = Some(true);

let pool = ConnectionPool::new(
Expand Down Expand Up @@ -203,13 +202,12 @@ async fn connection_error_during_establishment() {
);
let _fp_guard = client.enable_failpoint(failpoint, None).await.unwrap();

let handler = Arc::new(EventHandler::new());
let handler = Arc::new(TestEventHandler::new());
let mut subscriber = handler.subscribe();

let mut options = ConnectionPoolOptions::from_client_options(&client_options);
options.ready = Some(true);
options.cmap_event_handler =
Some(handler.clone() as Arc<dyn crate::event::cmap::CmapEventHandler>);
options.cmap_event_handler = Some(handler.clone().into());
let pool = ConnectionPool::new(
client_options.hosts[0].clone(),
ConnectionEstablisher::new(EstablisherOptions::from_client_options(&client_options))
Expand Down Expand Up @@ -238,8 +236,8 @@ async fn connection_error_during_establishment() {

async fn connection_error_during_operation() {
let mut options = get_client_options().await.clone();
let handler = Arc::new(EventHandler::new());
options.cmap_event_handler = Some(handler.clone() as Arc<dyn CmapEventHandler>);
let handler = Arc::new(TestEventHandler::new());
options.cmap_event_handler = Some(handler.clone().into());
options.hosts.drain(1..);
options.max_pool_size = Some(1);

Expand Down
40 changes: 40 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use futures_core::future::BoxFuture;

use crate::event::command::CommandEvent;

use self::{cmap::CmapEvent, sdam::SdamEvent};

/// A destination for events. Allows implicit conversion via [`From`] for concrete types for
/// convenience with [`crate::options::ClientOptions`] construction:
///
Expand Down Expand Up @@ -75,6 +77,44 @@ impl<T: crate::event::command::CommandEventHandler + 'static> From<Arc<T>>
}
}

#[allow(deprecated)]
impl<T: crate::event::cmap::CmapEventHandler + 'static> From<Arc<T>> for EventHandler<CmapEvent> {
fn from(value: Arc<T>) -> Self {
use CmapEvent::*;
Self::callback(move |ev| match ev {
PoolCreated(ev) => value.handle_pool_created_event(ev),
PoolReady(ev) => value.handle_pool_ready_event(ev),
PoolCleared(ev) => value.handle_pool_cleared_event(ev),
PoolClosed(ev) => value.handle_pool_closed_event(ev),
ConnectionCreated(ev) => value.handle_connection_created_event(ev),
ConnectionReady(ev) => value.handle_connection_ready_event(ev),
ConnectionClosed(ev) => value.handle_connection_closed_event(ev),
ConnectionCheckoutStarted(ev) => value.handle_connection_checkout_started_event(ev),
ConnectionCheckoutFailed(ev) => value.handle_connection_checkout_failed_event(ev),
ConnectionCheckedOut(ev) => value.handle_connection_checked_out_event(ev),
ConnectionCheckedIn(ev) => value.handle_connection_checked_in_event(ev),
})
}
}

#[allow(deprecated)]
impl<T: crate::event::sdam::SdamEventHandler + 'static> From<Arc<T>> for EventHandler<SdamEvent> {
fn from(value: Arc<T>) -> Self {
use SdamEvent::*;
Self::callback(move |ev| match ev {
ServerDescriptionChanged(ev) => value.handle_server_description_changed_event(*ev),
ServerOpening(ev) => value.handle_server_opening_event(ev),
ServerClosed(ev) => value.handle_server_closed_event(ev),
TopologyDescriptionChanged(ev) => value.handle_topology_description_changed_event(*ev),
TopologyOpening(ev) => value.handle_topology_opening_event(ev),
TopologyClosed(ev) => value.handle_topology_closed_event(ev),
ServerHeartbeatStarted(ev) => value.handle_server_heartbeat_started_event(ev),
ServerHeartbeatSucceeded(ev) => value.handle_server_heartbeat_succeeded_event(ev),
ServerHeartbeatFailed(ev) => value.handle_server_heartbeat_failed_event(ev),
})
}
}

impl<T: Send + Sync + 'static> EventHandler<T> {
/// Construct a new event handler with a callback.
pub fn callback(f: impl Fn(T) + Send + Sync + 'static) -> Self {
Expand Down
Loading

0 comments on commit c070269

Please sign in to comment.