Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-1505 Convert remaining event handler types to new API #1012

Merged
merged 3 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions src/client/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use std::{
hash::{Hash, Hasher},
path::PathBuf,
str::FromStr,
sync::Arc,
time::Duration,
};

Expand All @@ -32,7 +31,7 @@ use crate::{
compression::Compressor,
concern::{Acknowledgment, ReadConcern, WriteConcern},
error::{Error, ErrorKind, Result},
event::{cmap::CmapEventHandler, sdam::SdamEventHandler},
event::EventHandler,
options::ReadConcernLevel,
sdam::{verify_max_staleness, DEFAULT_HEARTBEAT_FREQUENCY, MIN_HEARTBEAT_FREQUENCY},
selection_criteria::{ReadPreference, SelectionCriteria, TagSet},
Expand Down Expand Up @@ -404,22 +403,19 @@ pub struct ClientOptions {
#[serde(skip)]
pub compressors: Option<Vec<Compressor>>,

/// The handler that should process all Connection Monitoring and Pooling events. See the
/// CmapEventHandler type documentation for more details.
/// The handler that should process all Connection Monitoring and Pooling events.
#[derivative(Debug = "ignore", PartialEq = "ignore")]
#[builder(default)]
#[builder(default, setter(strip_option))]
#[serde(skip)]
pub cmap_event_handler: Option<Arc<dyn CmapEventHandler>>,
pub cmap_event_handler: Option<EventHandler<crate::event::cmap::CmapEvent>>,

/// The handler that should process all command-related events. See the CommandEventHandler
/// type documentation for more details.
/// The handler that should process all command-related events.
///
/// Note that monitoring command events may incur a performance penalty.
#[derivative(Debug = "ignore", PartialEq = "ignore")]
#[builder(default, setter(strip_option))]
#[serde(skip)]
pub command_event_handler:
Option<crate::event::EventHandler<crate::event::command::CommandEvent>>,
pub command_event_handler: Option<EventHandler<crate::event::command::CommandEvent>>,

/// The connect timeout passed to each underlying TcpStream when attemtping to connect to the
/// server.
Expand Down Expand Up @@ -520,12 +516,11 @@ pub struct ClientOptions {
#[builder(default)]
pub retry_writes: Option<bool>,

/// The handler that should process all Server Discovery and Monitoring events. See the
/// [`SdamEventHandler`] type documentation for more details.
/// The handler that should process all Server Discovery and Monitoring events.
#[derivative(Debug = "ignore", PartialEq = "ignore")]
#[builder(default)]
#[builder(default, setter(strip_option))]
#[serde(skip)]
pub sdam_event_handler: Option<Arc<dyn SdamEventHandler>>,
pub sdam_event_handler: Option<EventHandler<crate::event::sdam::SdamEvent>>,

/// The default selection criteria for operations performed on the Client. See the
/// SelectionCriteria type documentation for more details.
Expand Down
13 changes: 3 additions & 10 deletions src/client/session/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,12 @@ use crate::{
bson::{doc, Bson},
coll::options::{CountOptions, InsertManyOptions},
error::Result,
event::sdam::SdamEvent,
options::{Acknowledgment, FindOptions, ReadConcern, ReadPreference, WriteConcern},
runtime,
sdam::ServerInfo,
selection_criteria::SelectionCriteria,
test::{
get_client_options,
log_uncaptured,
Event,
EventClient,
EventHandler,
SdamEvent,
TestClient,
},
test::{get_client_options, log_uncaptured, Event, EventClient, EventHandler, TestClient},
Client,
Collection,
};
Expand Down Expand Up @@ -306,7 +299,7 @@ async fn cluster_time_in_commands() {
let mut options = get_client_options().await.clone();
options.heartbeat_freq = Some(Duration::from_secs(1000));
options.command_event_handler = Some(handler.clone().into());
options.sdam_event_handler = Some(handler.clone());
options.sdam_event_handler = Some(handler.clone().into());

// Ensure we only connect to one server so the monitor checks from other servers
// don't affect the TopologyDescription's clusterTime value between commands.
Expand Down
9 changes: 6 additions & 3 deletions src/cmap/options.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#[cfg(test)]
use std::cmp::Ordering;
use std::{sync::Arc, time::Duration};
use std::time::Duration;

use derivative::Derivative;
#[cfg(test)]
Expand All @@ -9,7 +9,10 @@ use serde::Deserialize;

use crate::{
client::auth::Credential,
event::cmap::{CmapEventHandler, ConnectionPoolOptions as EventOptions},
event::{
cmap::{CmapEvent, ConnectionPoolOptions as EventOptions},
EventHandler,
},
options::ClientOptions,
serde_util,
};
Expand All @@ -26,7 +29,7 @@ pub(crate) struct ConnectionPoolOptions {
/// Processes all events generated by the pool.
#[derivative(Debug = "ignore", PartialEq = "ignore")]
#[serde(skip)]
pub(crate) cmap_event_handler: Option<Arc<dyn CmapEventHandler>>,
pub(crate) cmap_event_handler: Option<EventHandler<CmapEvent>>,

/// Interval between background thread maintenance runs (e.g. ensure minPoolSize).
#[cfg(test)]
Expand Down
8 changes: 4 additions & 4 deletions src/cmap/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{collections::HashMap, ops::Deref, sync::Arc, time::Duration};
use tokio::sync::{Mutex, RwLock};

use self::{
event::EventHandler,
event::TestEventHandler,
file::{Operation, TestFile, ThreadedOperation},
};

Expand Down Expand Up @@ -70,7 +70,7 @@ struct Executor {

#[derive(Debug)]
struct State {
handler: Arc<EventHandler>,
handler: Arc<TestEventHandler>,
connections: RwLock<HashMap<String, Connection>>,
unlabeled_connections: Mutex<Vec<Connection>>,
threads: RwLock<HashMap<String, CmapThread>>,
Expand Down Expand Up @@ -126,11 +126,11 @@ impl CmapThread {

impl Executor {
async fn new(test_file: TestFile) -> Self {
let handler = Arc::new(EventHandler::new());
let handler = Arc::new(TestEventHandler::new());
let error = test_file.error;

let mut pool_options = test_file.pool_options.unwrap_or_default();
pool_options.cmap_event_handler = Some(handler.clone());
pool_options.cmap_event_handler = Some(handler.clone().into());

let state = State {
handler,
Expand Down
9 changes: 5 additions & 4 deletions src/cmap/test/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ use crate::{event::cmap::*, options::ServerAddress, test::util::EventSubscriber}
use tokio::sync::broadcast::error::SendError;

#[derive(Clone, Debug)]
pub struct EventHandler {
pub struct TestEventHandler {
pub(crate) events: Arc<RwLock<Vec<CmapEvent>>>,
channel_sender: tokio::sync::broadcast::Sender<CmapEvent>,
}

impl EventHandler {
impl TestEventHandler {
pub fn new() -> Self {
let (channel_sender, _) = tokio::sync::broadcast::channel(500);
Self {
Expand All @@ -31,12 +31,13 @@ impl EventHandler {
self.events.write().unwrap().push(event);
}

pub(crate) fn subscribe(&self) -> EventSubscriber<'_, EventHandler, CmapEvent> {
pub(crate) fn subscribe(&self) -> EventSubscriber<'_, TestEventHandler, CmapEvent> {
EventSubscriber::new(self, self.channel_sender.subscribe())
}
}

impl CmapEventHandler for EventHandler {
#[allow(deprecated)]
impl CmapEventHandler for TestEventHandler {
fn handle_pool_created_event(&self, event: PoolCreatedEvent) {
self.handle(event);
}
Expand Down
18 changes: 8 additions & 10 deletions src/cmap/test/integration.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde::Deserialize;

use super::{event::EventHandler, EVENT_TIMEOUT};
use super::{event::TestEventHandler, EVENT_TIMEOUT};
use crate::{
bson::{doc, Document},
cmap::{
Expand All @@ -9,7 +9,7 @@ use crate::{
Command,
ConnectionPool,
},
event::cmap::{CmapEvent, CmapEventHandler, ConnectionClosedReason},
event::cmap::{CmapEvent, ConnectionClosedReason},
hello::LEGACY_HELLO_COMMAND_NAME,
operation::CommandResponse,
runtime,
Expand Down Expand Up @@ -114,11 +114,10 @@ async fn concurrent_connections() {
.await
.expect("failpoint should succeed");

let handler = Arc::new(EventHandler::new());
let handler = Arc::new(TestEventHandler::new());
let client_options = get_client_options().await.clone();
let mut options = ConnectionPoolOptions::from_client_options(&client_options);
options.cmap_event_handler =
Some(handler.clone() as Arc<dyn crate::event::cmap::CmapEventHandler>);
options.cmap_event_handler = Some(handler.clone().into());
options.ready = Some(true);

let pool = ConnectionPool::new(
Expand Down Expand Up @@ -203,13 +202,12 @@ async fn connection_error_during_establishment() {
);
let _fp_guard = client.enable_failpoint(failpoint, None).await.unwrap();

let handler = Arc::new(EventHandler::new());
let handler = Arc::new(TestEventHandler::new());
let mut subscriber = handler.subscribe();

let mut options = ConnectionPoolOptions::from_client_options(&client_options);
options.ready = Some(true);
options.cmap_event_handler =
Some(handler.clone() as Arc<dyn crate::event::cmap::CmapEventHandler>);
options.cmap_event_handler = Some(handler.clone().into());
let pool = ConnectionPool::new(
client_options.hosts[0].clone(),
ConnectionEstablisher::new(EstablisherOptions::from_client_options(&client_options))
Expand Down Expand Up @@ -238,8 +236,8 @@ async fn connection_error_during_establishment() {

async fn connection_error_during_operation() {
let mut options = get_client_options().await.clone();
let handler = Arc::new(EventHandler::new());
options.cmap_event_handler = Some(handler.clone() as Arc<dyn CmapEventHandler>);
let handler = Arc::new(TestEventHandler::new());
options.cmap_event_handler = Some(handler.clone().into());
options.hosts.drain(1..);
options.max_pool_size = Some(1);

Expand Down
40 changes: 40 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use futures_core::future::BoxFuture;

use crate::event::command::CommandEvent;

use self::{cmap::CmapEvent, sdam::SdamEvent};

/// A destination for events. Allows implicit conversion via [`From`] for concrete types for
/// convenience with [`crate::options::ClientOptions`] construction:
///
Expand Down Expand Up @@ -75,6 +77,44 @@ impl<T: crate::event::command::CommandEventHandler + 'static> From<Arc<T>>
}
}

#[allow(deprecated)]
impl<T: crate::event::cmap::CmapEventHandler + 'static> From<Arc<T>> for EventHandler<CmapEvent> {
fn from(value: Arc<T>) -> Self {
use CmapEvent::*;
Self::callback(move |ev| match ev {
PoolCreated(ev) => value.handle_pool_created_event(ev),
PoolReady(ev) => value.handle_pool_ready_event(ev),
PoolCleared(ev) => value.handle_pool_cleared_event(ev),
PoolClosed(ev) => value.handle_pool_closed_event(ev),
ConnectionCreated(ev) => value.handle_connection_created_event(ev),
ConnectionReady(ev) => value.handle_connection_ready_event(ev),
ConnectionClosed(ev) => value.handle_connection_closed_event(ev),
ConnectionCheckoutStarted(ev) => value.handle_connection_checkout_started_event(ev),
ConnectionCheckoutFailed(ev) => value.handle_connection_checkout_failed_event(ev),
ConnectionCheckedOut(ev) => value.handle_connection_checked_out_event(ev),
ConnectionCheckedIn(ev) => value.handle_connection_checked_in_event(ev),
})
}
}

#[allow(deprecated)]
impl<T: crate::event::sdam::SdamEventHandler + 'static> From<Arc<T>> for EventHandler<SdamEvent> {
fn from(value: Arc<T>) -> Self {
use SdamEvent::*;
Self::callback(move |ev| match ev {
ServerDescriptionChanged(ev) => value.handle_server_description_changed_event(*ev),
ServerOpening(ev) => value.handle_server_opening_event(ev),
ServerClosed(ev) => value.handle_server_closed_event(ev),
TopologyDescriptionChanged(ev) => value.handle_topology_description_changed_event(*ev),
TopologyOpening(ev) => value.handle_topology_opening_event(ev),
TopologyClosed(ev) => value.handle_topology_closed_event(ev),
ServerHeartbeatStarted(ev) => value.handle_server_heartbeat_started_event(ev),
ServerHeartbeatSucceeded(ev) => value.handle_server_heartbeat_succeeded_event(ev),
ServerHeartbeatFailed(ev) => value.handle_server_heartbeat_failed_event(ev),
})
}
}

impl<T: Send + Sync + 'static> EventHandler<T> {
/// Construct a new event handler with a callback.
pub fn callback(f: impl Fn(T) + Send + Sync + 'static) -> Self {
Expand Down
Loading