diff --git a/manual/src/encryption.md b/manual/src/encryption.md index 9f5a16e8d..6b50b3a6a 100644 --- a/manual/src/encryption.md +++ b/manual/src/encryption.md @@ -524,7 +524,7 @@ async fn main() -> Result<()> { .build() .await?; let db = encrypted_client.database("test"); - db.drop(None).await?; + db.drop().await?; // Create the collection with encrypted fields. db.create_collection( diff --git a/src/action.rs b/src/action.rs new file mode 100644 index 000000000..5a47a31b1 --- /dev/null +++ b/src/action.rs @@ -0,0 +1,140 @@ +//! Action builder types. + +mod drop; +mod list_databases; +mod perf; +mod session; +mod shutdown; +mod watch; + +pub use drop::{DropDatabase, DropDatabaseFuture}; +pub use list_databases::{ListDatabases, ListSpecificationsFuture}; +pub use perf::{WarmConnectionPool, WarmConnectionPoolFuture}; +pub use session::{StartSession, StartSessionFuture}; +pub use shutdown::{Shutdown, ShutdownFuture}; +pub use watch::{Watch, WatchExplicitSessionFuture, WatchImplicitSessionFuture}; + +macro_rules! option_setters { + ( + $opt_field:ident: $opt_field_ty:ty; + $( + $(#[$($attrss:tt)*])* + $opt_name:ident: $opt_ty:ty, + )+ + ) => { + fn options(&mut self) -> &mut $opt_field_ty { + self.$opt_field.get_or_insert_with(<$opt_field_ty>::default) + } + + #[cfg(test)] + #[allow(dead_code)] + pub(crate) fn with_options(mut self, value: impl Into>) -> Self { + self.options = value.into(); + self + } + + $( + $(#[$($attrss)*])* + pub fn $opt_name(mut self, value: $opt_ty) -> Self { + self.options().$opt_name = Some(value); + self + } + )+ + }; +} +use option_setters; + +/// Generates: +/// * an `IntoFuture` executing the given method body +/// * an opaque wrapper type for the future in case we want to do something more fancy than +/// BoxFuture. +/// * a `run` method for sync execution, optionally with a wrapper function +macro_rules! action_impl { + // Generate with no sync type conversion + ( + impl Action$(<$lt:lifetime>)? for $action:ty { + type Future = $f_ty:ident; + async fn execute($($args:ident)+) -> $out:ty $code:block + } + ) => { + crate::action::action_impl_inner! { + $action => $f_ty; + $($lt)?; + async fn($($args)+) -> $out $code + } + + #[cfg(any(feature = "sync", feature = "tokio-sync"))] + impl$(<$lt>)? $action { + /// Synchronously execute this action. + pub fn run(self) -> $out { + crate::runtime::block_on(std::future::IntoFuture::into_future(self)) + } + } + }; + // Generate with a sync type conversion + ( + impl Action$(<$lt:lifetime>)? for $action:ty { + type Future = $f_ty:ident; + async fn execute($($args:ident)+) -> $out:ty $code:block + fn sync_wrap($($wrap_args:ident)+) -> $sync_out:ty $wrap_code:block + } + ) => { + crate::action::action_impl_inner! { + $action => $f_ty; + $($lt)?; + async fn($($args)+) -> $out $code + } + + #[cfg(any(feature = "sync", feature = "tokio-sync"))] + impl$(<$lt>)? $action { + /// Synchronously execute this action. + pub fn run(self) -> $sync_out { + let $($wrap_args)+ = crate::runtime::block_on(std::future::IntoFuture::into_future(self)); + return $wrap_code + } + } + } +} +pub(crate) use action_impl; + +macro_rules! action_impl_inner { + ( + $action:ty => $f_ty:ident; + $($lt:lifetime)?; + async fn($($args:ident)+) -> $out:ty $code:block + ) => { + impl$(<$lt>)? std::future::IntoFuture for $action { + type Output = $out; + type IntoFuture = $f_ty$(<$lt>)?; + + fn into_future($($args)+) -> Self::IntoFuture { + $f_ty(Box::pin(async move { + $code + })) + } + } + + crate::action::action_impl_future_wrapper!($($lt)?, $f_ty, $out); + + impl$(<$lt>)? std::future::Future for $f_ty$(<$lt>)? { + type Output = $out; + + fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { + self.0.as_mut().poll(cx) + } + } + } +} +pub(crate) use action_impl_inner; + +macro_rules! action_impl_future_wrapper { + (, $f_ty:ident, $out:ty) => { + /// Opaque future type for action execution. + pub struct $f_ty(crate::BoxFuture<'static, $out>); + }; + ($lt:lifetime, $f_ty:ident, $out:ty) => { + /// Opaque future type for action execution. + pub struct $f_ty<$lt>(crate::BoxFuture<$lt, $out>); + }; +} +pub(crate) use action_impl_future_wrapper; diff --git a/src/action/drop.rs b/src/action/drop.rs new file mode 100644 index 000000000..85a32cea3 --- /dev/null +++ b/src/action/drop.rs @@ -0,0 +1,69 @@ +use crate::{ + error::Result, + operation::drop_database as op, + options::WriteConcern, + ClientSession, + Database, +}; + +use super::{action_impl, option_setters}; + +impl Database { + /// Drops the database, deleting all data, collections, and indexes stored in it. + /// + /// `await` will return `Result<()>`. + pub fn drop(&self) -> DropDatabase { + DropDatabase { + db: self, + options: None, + session: None, + } + } +} + +#[cfg(any(feature = "sync", feature = "tokio-sync"))] +impl crate::sync::Database { + /// Drops the database, deleting all data, collections, and indexes stored in it. + /// + /// [`run`](DropDatabase::run) will return `Result<()>`. + pub fn drop(&self) -> DropDatabase { + self.async_database.drop() + } +} + +/// Drops the database, deleting all data, collections, and indexes stored in it. Create by calling +/// [`Database::drop`] and execute with `await` (or [`run`](DropDatabase::run) if using the sync +/// client). +#[must_use] +pub struct DropDatabase<'a> { + db: &'a Database, + options: Option, + session: Option<&'a mut ClientSession>, +} + +impl<'a> DropDatabase<'a> { + option_setters!(options: op::DropDatabaseOptions; + /// The write concern for the operation. + write_concern: WriteConcern, + ); + + /// Runs the drop using the provided session. + pub fn session(mut self, value: impl Into<&'a mut ClientSession>) -> Self { + self.session = Some(value.into()); + self + } +} + +action_impl! { + impl Action<'a> for DropDatabase<'a> { + type Future = DropDatabaseFuture; + + async fn execute(mut self) -> Result<()> { + resolve_options!(self.db, self.options, [write_concern]); + let op = op::DropDatabase::new(self.db.name().to_string(), self.options); + self.db.client() + .execute_operation(op, self.session) + .await + } + } +} diff --git a/src/client/action/list_databases.rs b/src/action/list_databases.rs similarity index 74% rename from src/client/action/list_databases.rs rename to src/action/list_databases.rs index 7430622d7..752da8ea9 100644 --- a/src/client/action/list_databases.rs +++ b/src/action/list_databases.rs @@ -1,12 +1,10 @@ -use std::{future::IntoFuture, marker::PhantomData}; +use std::marker::PhantomData; use bson::{Bson, Document}; -use futures_util::FutureExt; #[cfg(any(feature = "sync", feature = "tokio-sync"))] use crate::sync::Client as SyncClient; use crate::{ - client::BoxFuture, error::{ErrorKind, Result}, operation::list_databases as op, results::DatabaseSpecification, @@ -14,7 +12,7 @@ use crate::{ ClientSession, }; -use super::option_setters; +use super::{action_impl, option_setters}; impl Client { /// Gets information about each database present in the cluster the Client is connected to. @@ -96,34 +94,31 @@ impl<'a, M> ListDatabases<'a, M> { } } -impl<'a> IntoFuture for ListDatabases<'a, ListSpecifications> { - type Output = Result>; - type IntoFuture = BoxFuture<'a, Self::Output>; +action_impl! { + impl Action<'a> for ListDatabases<'a, ListSpecifications> { + type Future = ListSpecificationsFuture; - fn into_future(self) -> Self::IntoFuture { - async { + async fn execute(self) -> Result> { let op = op::ListDatabases::new(false, self.options); - self.client - .execute_operation(op, self.session) - .await - .and_then(|dbs| { - dbs.into_iter() - .map(|db_spec| { - bson::from_slice(db_spec.as_bytes()).map_err(crate::error::Error::from) - }) - .collect() - }) + self.client + .execute_operation(op, self.session) + .await + .and_then(|dbs| { + dbs.into_iter() + .map(|db_spec| { + bson::from_slice(db_spec.as_bytes()).map_err(crate::error::Error::from) + }) + .collect() + }) } - .boxed() } } -impl<'a> IntoFuture for ListDatabases<'a, ListNames> { - type Output = Result>; - type IntoFuture = BoxFuture<'a, Self::Output>; +action_impl! { + impl Action<'a> for ListDatabases<'a, ListNames> { + type Future = ListNamesFuture; - fn into_future(self) -> Self::IntoFuture { - async { + async fn execute(self) -> Result> { let op = op::ListDatabases::new(true, self.options); match self.client.execute_operation(op, self.session).await { Ok(databases) => databases @@ -133,7 +128,7 @@ impl<'a> IntoFuture for ListDatabases<'a, ListNames> { .get_str("name") .map_err(|_| ErrorKind::InvalidResponse { message: "Expected \"name\" field in server response, but it was \ - not found" + not found" .to_string(), })?; Ok(name.to_string()) @@ -142,17 +137,5 @@ impl<'a> IntoFuture for ListDatabases<'a, ListNames> { Err(e) => Err(e), } } - .boxed() - } -} - -#[cfg(any(feature = "sync", feature = "tokio-sync"))] -impl<'a, M> ListDatabases<'a, M> -where - Self: IntoFuture, -{ - /// Synchronously execute this action. - pub fn run(self) -> ::Output { - crate::runtime::block_on(self.into_future()) } } diff --git a/src/action/perf.rs b/src/action/perf.rs new file mode 100644 index 000000000..42129fa3d --- /dev/null +++ b/src/action/perf.rs @@ -0,0 +1,51 @@ +pub use crate::client::action::perf::WarmConnectionPoolFuture; +use crate::Client; + +impl Client { + /// Add connections to the connection pool up to `min_pool_size`. This is normally not needed - + /// the connection pool will be filled in the background, and new connections created as needed + /// up to `max_pool_size`. However, it can sometimes be preferable to pay the (larger) cost of + /// creating new connections up-front so that individual operations execute as quickly as + /// possible. + /// + /// Note that topology changes require rebuilding the connection pool, so this method cannot + /// guarantee that the pool will always be filled for the lifetime of the `Client`. + /// + /// Does nothing if `min_pool_size` is unset or zero. + /// + /// `await` will return `()`. + pub fn warm_connection_pool(&self) -> WarmConnectionPool { + WarmConnectionPool { client: self } + } +} + +#[cfg(any(feature = "sync", feature = "tokio-sync"))] +impl crate::sync::Client { + /// Add connections to the connection pool up to `min_pool_size`. This is normally not needed - + /// the connection pool will be filled in the background, and new connections created as needed + /// up to `max_pool_size`. However, it can sometimes be preferable to pay the (larger) cost of + /// creating new connections up-front so that individual operations execute as quickly as + /// possible. + /// + /// Note that topology changes require rebuilding the connection pool, so this method cannot + /// guarantee that the pool will always be filled for the lifetime of the `Client`. + /// + /// Does nothing if `min_pool_size` is unset or zero. + /// + /// `await` will return `()`. + /// + /// [`run`](WarmConnectionPool::run) will return `()`. + pub fn warm_connection_pool(&self) -> WarmConnectionPool { + self.async_client.warm_connection_pool() + } +} + +/// Add connections to the connection pool up to `min_pool_size`. Create by calling +/// [`Client::warm_connection_pool`] and execute with `await` (or [`run`](WarmConnectionPool::run) +/// if using the sync client). +#[must_use] +pub struct WarmConnectionPool<'a> { + pub(crate) client: &'a Client, +} + +// IntoFuture impl in src/client/action/perf.rs diff --git a/src/client/action/session.rs b/src/action/session.rs similarity index 77% rename from src/client/action/session.rs rename to src/action/session.rs index fb4b33482..f807c54a2 100644 --- a/src/client/action/session.rs +++ b/src/action/session.rs @@ -1,18 +1,11 @@ -use std::future::IntoFuture; - -use futures_util::FutureExt; - use crate::{ - client::{ - options::{SessionOptions, TransactionOptions}, - BoxFuture, - }, + client::options::{SessionOptions, TransactionOptions}, error::Result, Client, ClientSession, }; -use super::option_setters; +use super::{action_impl, option_setters}; impl Client { /// Starts a new [`ClientSession`]. @@ -67,25 +60,19 @@ impl<'a> StartSession<'a> { ); } -impl<'a> IntoFuture for StartSession<'a> { - type Output = Result; - type IntoFuture = BoxFuture<'a, Self::Output>; +action_impl! { + impl Action<'a> for StartSession<'a> { + type Future = StartSessionFuture; - fn into_future(self) -> Self::IntoFuture { - async { + async fn execute(self) -> Result { if let Some(options) = &self.options { options.validate()?; } Ok(ClientSession::new(self.client.clone(), self.options, false).await) } - .boxed() - } -} -#[cfg(any(feature = "sync", feature = "tokio-sync"))] -impl<'a> StartSession<'a> { - /// Synchronously execute this action. - pub fn run(self) -> Result { - crate::runtime::block_on(self.into_future()).map(Into::into) + fn sync_wrap(out) -> Result { + out.map(Into::into) + } } } diff --git a/src/action/shutdown.rs b/src/action/shutdown.rs new file mode 100644 index 000000000..532a73fb5 --- /dev/null +++ b/src/action/shutdown.rs @@ -0,0 +1,144 @@ +use crate::Client; + +pub use crate::client::action::shutdown::ShutdownFuture; + +impl Client { + /// Shut down this `Client`, terminating background thread workers and closing connections. + /// Using this method is not required under most circumstances (resources will be cleaned up in + /// the background when dropped) but can be needed when creating `Client`s in a loop or precise + /// control of lifespan timing is required. This will wait for any live handles to + /// server-side resources (see below) to be dropped and any associated server-side + /// operations to finish. + /// + /// IMPORTANT: Any live resource handles that are not dropped will cause this method to wait + /// indefinitely. It's strongly recommended to structure your usage to avoid this, e.g. by + /// only using those types in shorter-lived scopes than the `Client`. If this is not possible, + /// see [`immediate`](Shutdown::immediate). For example: + /// + /// ```rust + /// # use mongodb::{Client, GridFsBucket, error::Result}; + /// async fn upload_data(bucket: &GridFsBucket) { + /// let stream = bucket.open_upload_stream("test", None); + /// // .. write to the stream .. + /// } + /// + /// # async fn run() -> Result<()> { + /// let client = Client::with_uri_str("mongodb://example.com").await?; + /// let bucket = client.database("test").gridfs_bucket(None); + /// upload_data(&bucket).await; + /// client.shutdown().await; + /// // Background cleanup work from `upload_data` is guaranteed to have run. + /// # Ok(()) + /// # } + /// ``` + /// + /// If the handle is used in the same scope as `shutdown`, explicit `drop` may be needed: + /// + /// ```rust + /// # use mongodb::{Client, error::Result}; + /// # async fn run() -> Result<()> { + /// let client = Client::with_uri_str("mongodb://example.com").await?; + /// let bucket = client.database("test").gridfs_bucket(None); + /// let stream = bucket.open_upload_stream("test", None); + /// // .. write to the stream .. + /// drop(stream); + /// client.shutdown().await; + /// // Background cleanup work for `stream` is guaranteed to have run. + /// # Ok(()) + /// # } + /// ``` + /// + /// Calling any methods on clones of this `Client` or derived handles after this will return + /// errors. + /// + /// Handles to server-side resources are `Cursor`, `SessionCursor`, `Session`, or + /// `GridFsUploadStream`. + /// + /// `await` will return `()`. + pub fn shutdown(self) -> Shutdown { + Shutdown { + client: self, + immediate: false, + } + } +} + +#[cfg(any(feature = "sync", feature = "tokio-sync"))] +impl crate::sync::Client { + /// Shut down this `Client`, terminating background thread workers and closing connections. + /// Using this method is not required under most circumstances (resources will be cleaned up in + /// the background when dropped) but can be needed when creating `Client`s in a loop or precise + /// control of lifespan timing is required. This will wait for any live handles to + /// server-side resources (see below) to be dropped and any associated server-side + /// operations to finish. + /// + /// IMPORTANT: Any live resource handles that are not dropped will cause this method to wait + /// indefinitely. It's strongly recommended to structure your usage to avoid this, e.g. by + /// only using those types in shorter-lived scopes than the `Client`. If this is not possible, + /// see [`immediate`](Shutdown::immediate). For example: + /// + /// ```rust + /// # use mongodb::{sync::{Client, gridfs::GridFsBucket}, error::Result}; + /// fn upload_data(bucket: &GridFsBucket) { + /// let stream = bucket.open_upload_stream("test", None); + /// // .. write to the stream .. + /// } + /// + /// # fn run() -> Result<()> { + /// let client = Client::with_uri_str("mongodb://example.com")?; + /// let bucket = client.database("test").gridfs_bucket(None); + /// upload_data(&bucket); + /// client.shutdown(); + /// // Background cleanup work from `upload_data` is guaranteed to have run. + /// # Ok(()) + /// # } + /// ``` + /// + /// If the handle is used in the same scope as `shutdown`, explicit `drop` may be needed: + /// + /// ```rust + /// # use mongodb::{sync::Client, error::Result}; + /// # fn run() -> Result<()> { + /// let client = Client::with_uri_str("mongodb://example.com")?; + /// let bucket = client.database("test").gridfs_bucket(None); + /// let stream = bucket.open_upload_stream("test", None); + /// // .. write to the stream .. + /// drop(stream); + /// client.shutdown(); + /// // Background cleanup work for `stream` is guaranteed to have run. + /// # Ok(()) + /// # } + /// ``` + /// + /// Calling any methods on clones of this `Client` or derived handles after this will return + /// errors. + /// + /// Handles to server-side resources are `Cursor`, `SessionCursor`, `Session`, or + /// `GridFsUploadStream`. + /// + /// [`run`](Shutdown::run) will return `()`. + pub fn shutdown(self) -> Shutdown { + self.async_client.shutdown() + } +} + +/// Shut down this `Client`, terminating background thread workers and closing connections. Create +/// by calling [`Client::shutdown`] and execute with `await` (or [`run`](Shutdown::run) if using the +/// sync client). +#[must_use] +pub struct Shutdown { + pub(crate) client: Client, + pub(crate) immediate: bool, +} + +impl Shutdown { + /// If `true`, execution will not wait for pending resources to be cleaned up, + /// which may cause both client-side errors and server-side resource leaks. Defaults to + /// `false`. + pub fn immediate(mut self, value: bool) -> Self { + self.immediate = value; + self + } +} + +// IntoFuture impl in src/client/action/shutdown.rs diff --git a/src/client/action/watch.rs b/src/action/watch.rs similarity index 91% rename from src/client/action/watch.rs rename to src/action/watch.rs index 53ee1e09e..2157d656d 100644 --- a/src/client/action/watch.rs +++ b/src/action/watch.rs @@ -1,9 +1,8 @@ -use std::{future::IntoFuture, time::Duration}; +use std::time::Duration; use bson::{Bson, Document, Timestamp}; -use futures_util::FutureExt; -use super::option_setters; +use super::{action_impl, option_setters}; use crate::{ change_stream::{ event::{ChangeStreamEvent, ResumeToken}, @@ -11,7 +10,6 @@ use crate::{ session::SessionChangeStream, ChangeStream, }, - client::BoxFuture, collation::Collation, error::{ErrorKind, Result}, operation::AggregateTarget, @@ -293,12 +291,11 @@ impl<'a> Watch<'a, ImplicitSession> { } } -impl<'a> IntoFuture for Watch<'a, ImplicitSession> { - type Output = Result>>; - type IntoFuture = BoxFuture<'a, Self::Output>; +action_impl! { + impl Action<'a> for Watch<'a, ImplicitSession> { + type Future = WatchImplicitSessionFuture; - fn into_future(mut self) -> Self::IntoFuture { - async move { + async fn execute(mut self) -> Result>> { resolve_options!( self.client, self.options, @@ -313,16 +310,18 @@ impl<'a> IntoFuture for Watch<'a, ImplicitSession> { .execute_watch(self.pipeline, self.options, self.target, None) .await } - .boxed() + + fn sync_wrap(out) -> Result>> { + out.map(crate::sync::ChangeStream::new) + } } } -impl<'a> IntoFuture for Watch<'a, ExplicitSession<'a>> { - type Output = Result>>; - type IntoFuture = BoxFuture<'a, Self::Output>; +action_impl! { + impl Action<'a> for Watch<'a, ExplicitSession<'a>> { + type Future = WatchExplicitSessionFuture; - fn into_future(mut self) -> Self::IntoFuture { - async move { + async fn execute(mut self) -> Result>> { resolve_read_concern_with_session!( self.client, self.options, @@ -348,22 +347,9 @@ impl<'a> IntoFuture for Watch<'a, ExplicitSession<'a>> { ) .await } - .boxed() - } -} -#[cfg(any(feature = "sync", feature = "tokio-sync"))] -impl<'a> Watch<'a, ImplicitSession> { - /// Synchronously execute this action. - pub fn run(self) -> Result>> { - crate::runtime::block_on(self.into_future()).map(crate::sync::ChangeStream::new) - } -} - -#[cfg(any(feature = "sync", feature = "tokio-sync"))] -impl<'a> Watch<'a, ExplicitSession<'a>> { - /// Synchronously execute this action. - pub fn run(self) -> Result>> { - crate::runtime::block_on(self.into_future()).map(crate::sync::SessionChangeStream::new) + fn sync_wrap(out) -> Result>> { + out.map(crate::sync::SessionChangeStream::new) + } } } diff --git a/src/client/action.rs b/src/client/action.rs index 29076c985..622eb321a 100644 --- a/src/client/action.rs +++ b/src/client/action.rs @@ -1,42 +1,2 @@ -//! Action builder types. - -pub(crate) mod list_databases; pub(crate) mod perf; -pub(crate) mod session; pub(crate) mod shutdown; -pub(crate) mod watch; - -pub use list_databases::ListDatabases; -pub use perf::WarmConnectionPool; -pub use session::StartSession; -pub use shutdown::Shutdown; -pub use watch::Watch; - -macro_rules! option_setters { - ( - $opt_field:ident: $opt_field_ty:ty; - $( - $(#[$($attrss:tt)*])* - $opt_name:ident: $opt_ty:ty, - )+ - ) => { - fn options(&mut self) -> &mut $opt_field_ty { - self.$opt_field.get_or_insert_with(<$opt_field_ty>::default) - } - - #[cfg(test)] - pub(crate) fn with_options(mut self, value: impl Into>) -> Self { - self.options = value.into(); - self - } - - $( - $(#[$($attrss)*])* - pub fn $opt_name(mut self, value: $opt_ty) -> Self { - self.options().$opt_name = Some(value); - self - } - )+ - }; -} -use option_setters; diff --git a/src/client/action/perf.rs b/src/client/action/perf.rs index df3da9516..3905c03d6 100644 --- a/src/client/action/perf.rs +++ b/src/client/action/perf.rs @@ -1,62 +1,10 @@ -use std::future::IntoFuture; +use crate::action::action_impl; -use futures_util::FutureExt; +action_impl! { + impl Action<'a> for crate::action::WarmConnectionPool<'a> { + type Future = WarmConnectionPoolFuture; -use crate::{client::BoxFuture, Client}; - -impl Client { - /// Add connections to the connection pool up to `min_pool_size`. This is normally not needed - - /// the connection pool will be filled in the background, and new connections created as needed - /// up to `max_pool_size`. However, it can sometimes be preferable to pay the (larger) cost of - /// creating new connections up-front so that individual operations execute as quickly as - /// possible. - /// - /// Note that topology changes require rebuilding the connection pool, so this method cannot - /// guarantee that the pool will always be filled for the lifetime of the `Client`. - /// - /// Does nothing if `min_pool_size` is unset or zero. - /// - /// `await` will return `()`. - pub fn warm_connection_pool(&self) -> WarmConnectionPool { - WarmConnectionPool { client: self } - } -} - -#[cfg(any(feature = "sync", feature = "tokio-sync"))] -impl crate::sync::Client { - /// Add connections to the connection pool up to `min_pool_size`. This is normally not needed - - /// the connection pool will be filled in the background, and new connections created as needed - /// up to `max_pool_size`. However, it can sometimes be preferable to pay the (larger) cost of - /// creating new connections up-front so that individual operations execute as quickly as - /// possible. - /// - /// Note that topology changes require rebuilding the connection pool, so this method cannot - /// guarantee that the pool will always be filled for the lifetime of the `Client`. - /// - /// Does nothing if `min_pool_size` is unset or zero. - /// - /// `await` will return `()`. - /// - /// [`run`](WarmConnectionPool::run) will return `()`. - pub fn warm_connection_pool(&self) -> WarmConnectionPool { - self.async_client.warm_connection_pool() - } -} - -/// Add connections to the connection pool up to `min_pool_size`. Create by calling -/// [`Client::warm_connection_pool`] and execute with `await` (or [`run`](WarmConnectionPool::run) -/// if using the sync client). -#[must_use] -pub struct WarmConnectionPool<'a> { - client: &'a Client, -} - -impl<'a> IntoFuture for WarmConnectionPool<'a> { - type Output = (); - type IntoFuture = BoxFuture<'a, ()>; - - fn into_future(self) -> Self::IntoFuture { - async { + async fn execute(self) -> () { if !self .client .inner @@ -69,14 +17,5 @@ impl<'a> IntoFuture for WarmConnectionPool<'a> { } self.client.inner.topology.warm_pool().await; } - .boxed() - } -} - -#[cfg(any(feature = "sync", feature = "tokio-sync"))] -impl<'a> WarmConnectionPool<'a> { - /// Synchronously execute this action. - pub fn run(self) { - crate::runtime::block_on(self.into_future()) } } diff --git a/src/client/action/shutdown.rs b/src/client/action/shutdown.rs index 42f5d1590..904b4ecc3 100644 --- a/src/client/action/shutdown.rs +++ b/src/client/action/shutdown.rs @@ -1,155 +1,14 @@ -use std::{future::IntoFuture, sync::atomic::Ordering}; +use std::sync::atomic::Ordering; -use futures_util::{future::join_all, FutureExt}; +use futures_util::future::join_all; -use crate::{client::BoxFuture, Client}; +use crate::action::action_impl; -impl Client { - /// Shut down this `Client`, terminating background thread workers and closing connections. - /// Using this method is not required under most circumstances (resources will be cleaned up in - /// the background when dropped) but can be needed when creating `Client`s in a loop or precise - /// control of lifespan timing is required. This will wait for any live handles to - /// server-side resources (see below) to be dropped and any associated server-side - /// operations to finish. - /// - /// IMPORTANT: Any live resource handles that are not dropped will cause this method to wait - /// indefinitely. It's strongly recommended to structure your usage to avoid this, e.g. by - /// only using those types in shorter-lived scopes than the `Client`. If this is not possible, - /// see [`immediate`](Shutdown::immediate). For example: - /// - /// ```rust - /// # use mongodb::{Client, GridFsBucket, error::Result}; - /// async fn upload_data(bucket: &GridFsBucket) { - /// let stream = bucket.open_upload_stream("test", None); - /// // .. write to the stream .. - /// } - /// - /// # async fn run() -> Result<()> { - /// let client = Client::with_uri_str("mongodb://example.com").await?; - /// let bucket = client.database("test").gridfs_bucket(None); - /// upload_data(&bucket).await; - /// client.shutdown().await; - /// // Background cleanup work from `upload_data` is guaranteed to have run. - /// # Ok(()) - /// # } - /// ``` - /// - /// If the handle is used in the same scope as `shutdown`, explicit `drop` may be needed: - /// - /// ```rust - /// # use mongodb::{Client, error::Result}; - /// # async fn run() -> Result<()> { - /// let client = Client::with_uri_str("mongodb://example.com").await?; - /// let bucket = client.database("test").gridfs_bucket(None); - /// let stream = bucket.open_upload_stream("test", None); - /// // .. write to the stream .. - /// drop(stream); - /// client.shutdown().await; - /// // Background cleanup work for `stream` is guaranteed to have run. - /// # Ok(()) - /// # } - /// ``` - /// - /// Calling any methods on clones of this `Client` or derived handles after this will return - /// errors. - /// - /// Handles to server-side resources are `Cursor`, `SessionCursor`, `Session`, or - /// `GridFsUploadStream`. - /// - /// `await` will return `()`. - pub fn shutdown(self) -> Shutdown { - Shutdown { - client: self, - immediate: false, - } - } -} - -#[cfg(any(feature = "sync", feature = "tokio-sync"))] -impl crate::sync::Client { - /// Shut down this `Client`, terminating background thread workers and closing connections. - /// Using this method is not required under most circumstances (resources will be cleaned up in - /// the background when dropped) but can be needed when creating `Client`s in a loop or precise - /// control of lifespan timing is required. This will wait for any live handles to - /// server-side resources (see below) to be dropped and any associated server-side - /// operations to finish. - /// - /// IMPORTANT: Any live resource handles that are not dropped will cause this method to wait - /// indefinitely. It's strongly recommended to structure your usage to avoid this, e.g. by - /// only using those types in shorter-lived scopes than the `Client`. If this is not possible, - /// see [`immediate`](Shutdown::immediate). For example: - /// - /// ```rust - /// # use mongodb::{sync::{Client, gridfs::GridFsBucket}, error::Result}; - /// fn upload_data(bucket: &GridFsBucket) { - /// let stream = bucket.open_upload_stream("test", None); - /// // .. write to the stream .. - /// } - /// - /// # fn run() -> Result<()> { - /// let client = Client::with_uri_str("mongodb://example.com")?; - /// let bucket = client.database("test").gridfs_bucket(None); - /// upload_data(&bucket); - /// client.shutdown(); - /// // Background cleanup work from `upload_data` is guaranteed to have run. - /// # Ok(()) - /// # } - /// ``` - /// - /// If the handle is used in the same scope as `shutdown`, explicit `drop` may be needed: - /// - /// ```rust - /// # use mongodb::{sync::Client, error::Result}; - /// # fn run() -> Result<()> { - /// let client = Client::with_uri_str("mongodb://example.com")?; - /// let bucket = client.database("test").gridfs_bucket(None); - /// let stream = bucket.open_upload_stream("test", None); - /// // .. write to the stream .. - /// drop(stream); - /// client.shutdown(); - /// // Background cleanup work for `stream` is guaranteed to have run. - /// # Ok(()) - /// # } - /// ``` - /// - /// Calling any methods on clones of this `Client` or derived handles after this will return - /// errors. - /// - /// Handles to server-side resources are `Cursor`, `SessionCursor`, `Session`, or - /// `GridFsUploadStream`. - /// - /// [`run`](Shutdown::run) will return `()`. - pub fn shutdown(self) -> Shutdown { - self.async_client.shutdown() - } -} - -/// Shut down this `Client`, terminating background thread workers and closing connections. Create -/// by calling [`Client::shutdown`] and execute with `await` (or [`run`](Shutdown::run) if using the -/// sync client). -#[must_use] -pub struct Shutdown { - client: Client, - immediate: bool, -} +action_impl! { + impl Action for crate::action::Shutdown { + type Future = ShutdownFuture; -impl Shutdown { - /// If `true`, execution will not wait for pending resources to be cleaned up, - /// which may cause both client-side errors and server-side resource leaks. Defaults to - /// `false`. - pub fn immediate(mut self, value: bool) -> Self { - self.immediate = value; - self - } -} - -impl IntoFuture for Shutdown { - type Output = (); - - type IntoFuture = BoxFuture<'static, ()>; - - fn into_future(self) -> Self::IntoFuture { - async move { + async fn execute(self) -> () { if !self.immediate { // Subtle bug: if this is inlined into the `join_all(..)` call, Rust will extend the // lifetime of the temporary unnamed `MutexLock` until the end of the *statement*, @@ -172,14 +31,5 @@ impl IntoFuture for Shutdown { .executed .store(true, Ordering::SeqCst); } - .boxed() - } -} - -#[cfg(any(feature = "sync", feature = "tokio-sync"))] -impl Shutdown { - /// Synchronously execute this action. - pub fn run(self) { - crate::runtime::block_on(self.into_future()) } } diff --git a/src/client/session/test.rs b/src/client/session/test.rs index b2dfd7c77..1391d5af2 100644 --- a/src/client/session/test.rs +++ b/src/client/session/test.rs @@ -182,7 +182,7 @@ macro_rules! for_each_op { db_op!($test_name, db, db.create_collection("sessionopcoll", None)), ) .await; - $test_func("dropDatabase", db_op!($test_name, db, db.drop(None))).await; + $test_func("dropDatabase", db_op!($test_name, db, db.drop())).await; // client operations $test_func("listDatabases", client_op!(client, client.list_databases())).await; diff --git a/src/db.rs b/src/db.rs index 7ae56126a..af3e656fc 100644 --- a/src/db.rs +++ b/src/db.rs @@ -14,13 +14,12 @@ use crate::{ cursor::Cursor, error::{Error, ErrorKind, Result}, gridfs::{options::GridFsBucketOptions, GridFsBucket}, - operation::{Aggregate, Create, DropDatabase, ListCollections, RunCommand, RunCursorCommand}, + operation::{Aggregate, Create, ListCollections, RunCommand, RunCursorCommand}, options::{ AggregateOptions, CollectionOptions, CreateCollectionOptions, DatabaseOptions, - DropDatabaseOptions, ListCollectionsOptions, RunCursorCommandOptions, }, @@ -165,35 +164,6 @@ impl Database { Collection::new(self.clone(), name, Some(options)) } - async fn drop_common( - &self, - options: impl Into>, - session: impl Into>, - ) -> Result<()> { - let mut options = options.into(); - resolve_options!(self, options, [write_concern]); - - let drop_database = DropDatabase::new(self.name().to_string(), options); - self.client() - .execute_operation(drop_database, session) - .await - } - - /// Drops the database, deleting all data, collections, and indexes stored in it. - pub async fn drop(&self, options: impl Into>) -> Result<()> { - self.drop_common(options, None).await - } - - /// Drops the database, deleting all data, collections, and indexes stored in it using the - /// provided `ClientSession`. - pub async fn drop_with_session( - &self, - options: impl Into>, - session: &mut ClientSession, - ) -> Result<()> { - self.drop_common(options, session).await - } - /// Gets information about each of the collections in the database. The cursor will yield a /// document pertaining to each collection in the database. pub async fn list_collections( diff --git a/src/db/options.rs b/src/db/options.rs index 6f29650a4..9bc66e29b 100644 --- a/src/db/options.rs +++ b/src/db/options.rs @@ -247,16 +247,6 @@ pub enum TimeseriesGranularity { Hours, } -/// Specifies the options to a [`Database::drop`](../struct.Database.html#method.drop) operation. -#[derive(Clone, Debug, Default, TypedBuilder, Serialize)] -#[serde(rename_all = "camelCase")] -#[builder(field_defaults(default, setter(into)))] -#[non_exhaustive] -pub struct DropDatabaseOptions { - /// The write concern for the operation. - pub write_concern: Option, -} - /// Specifies the options to a /// [`Database::list_collections`](../struct.Database.html#method.list_collections) operation. #[skip_serializing_none] diff --git a/src/lib.rs b/src/lib.rs index 6b610da15..b5d59582c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -318,6 +318,7 @@ pub use ::bson; #[cfg(feature = "in-use-encryption-unstable")] pub use ::mongocrypt; +pub mod action; mod bson_util; pub mod change_stream; mod client; @@ -363,7 +364,7 @@ pub use crate::{ gridfs::{GridFsBucket, GridFsDownloadStream, GridFsUploadStream}, }; -pub use {client::action, client::session::ClusterTime, coll::Namespace, index::IndexModel, sdam::public::*, search_index::SearchIndexModel}; +pub use {client::session::ClusterTime, coll::Namespace, index::IndexModel, sdam::public::*, search_index::SearchIndexModel}; /// A boxed future. pub type BoxFuture<'a, T> = std::pin::Pin + Send + 'a>>; diff --git a/src/operation.rs b/src/operation.rs index 61a6c1e5f..21c224da4 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -8,7 +8,7 @@ mod create_indexes; mod delete; mod distinct; mod drop_collection; -mod drop_database; +pub(crate) mod drop_database; mod drop_indexes; mod find; mod find_and_modify; @@ -61,7 +61,6 @@ pub(crate) use create_indexes::CreateIndexes; pub(crate) use delete::Delete; pub(crate) use distinct::Distinct; pub(crate) use drop_collection::DropCollection; -pub(crate) use drop_database::DropDatabase; pub(crate) use drop_indexes::DropIndexes; pub(crate) use find::Find; pub(crate) use find_and_modify::FindAndModify; diff --git a/src/operation/drop_database.rs b/src/operation/drop_database.rs index 2cd2b8cf7..82f6250f0 100644 --- a/src/operation/drop_database.rs +++ b/src/operation/drop_database.rs @@ -1,7 +1,5 @@ -#[cfg(test)] -mod test; - use bson::Document; +use serde::Serialize; use crate::{ bson::doc, @@ -13,7 +11,7 @@ use crate::{ OperationWithDefaults, WriteConcernOnlyBody, }, - options::{DropDatabaseOptions, WriteConcern}, + options::WriteConcern, }; #[derive(Debug)] @@ -22,12 +20,14 @@ pub(crate) struct DropDatabase { options: Option, } -impl DropDatabase { - #[cfg(test)] - fn empty() -> Self { - Self::new(String::new(), None) - } +#[derive(Clone, Debug, Default, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct DropDatabaseOptions { + /// The write concern for the operation. + pub(crate) write_concern: Option, +} +impl DropDatabase { pub(crate) fn new(target_db: String, options: Option) -> Self { Self { target_db, options } } diff --git a/src/operation/drop_database/test.rs b/src/operation/drop_database/test.rs deleted file mode 100644 index f092a2b37..000000000 --- a/src/operation/drop_database/test.rs +++ /dev/null @@ -1,83 +0,0 @@ -use crate::{ - bson::doc, - cmap::StreamDescription, - concern::{Acknowledgment, WriteConcern}, - error::{ErrorKind, WriteFailure}, - operation::{test::handle_response_test, DropDatabase, Operation}, - options::DropDatabaseOptions, -}; - -#[test] -fn build() { - let mut op = DropDatabase { - target_db: "test_db".to_string(), - options: Some(DropDatabaseOptions { - write_concern: Some(WriteConcern { - w: Some(Acknowledgment::Custom("abc".to_string())), - ..Default::default() - }), - }), - }; - - let description = StreamDescription::new_testing(); - let cmd = op.build(&description).expect("build should succeed"); - - assert_eq!(cmd.name.as_str(), "dropDatabase"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - assert_eq!( - cmd.body, - doc! { - "dropDatabase": 1, - "writeConcern": { "w": "abc" } - } - ); - - let mut op = DropDatabase { - target_db: "test_db".to_string(), - options: None, - }; - let cmd = op.build(&description).expect("build should succeed"); - assert_eq!(cmd.name.as_str(), "dropDatabase"); - assert_eq!(cmd.target_db.as_str(), "test_db"); - assert_eq!( - cmd.body, - doc! { - "dropDatabase": 1, - } - ); -} - -#[test] -fn handle_success() { - let op = DropDatabase::empty(); - - let ok_response = doc! { "ok": 1.0 }; - handle_response_test(&op, ok_response).unwrap(); - let ok_extra = doc! { "ok": 1.0, "hello": "world" }; - handle_response_test(&op, ok_extra).unwrap(); -} - -#[test] -fn handle_write_concern_error() { - let op = DropDatabase::empty(); - - let response = doc! { - "writeConcernError": { - "code": 100, - "codeName": "hello world", - "errmsg": "12345" - }, - "ok": 1 - }; - - let err = handle_response_test(&op, response).unwrap_err(); - - match *err.kind { - ErrorKind::Write(WriteFailure::WriteConcernError(ref wc_err)) => { - assert_eq!(wc_err.code, 100); - assert_eq!(wc_err.code_name, "hello world"); - assert_eq!(wc_err.message, "12345"); - } - ref e => panic!("expected write concern error, got {:?}", e), - } -} diff --git a/src/sync/db.rs b/src/sync/db.rs index a94e32606..7d96bed18 100644 --- a/src/sync/db.rs +++ b/src/sync/db.rs @@ -8,7 +8,6 @@ use crate::{ AggregateOptions, CollectionOptions, CreateCollectionOptions, - DropDatabaseOptions, GridFsBucketOptions, ListCollectionsOptions, ReadConcern, @@ -106,24 +105,6 @@ impl Database { Collection::new(self.async_database.collection_with_options(name, options)) } - /// Drops the database, deleting all data, collections, users, and indexes stored in it. - pub fn drop(&self, options: impl Into>) -> Result<()> { - runtime::block_on(self.async_database.drop(options.into())) - } - - /// Drops the database, deleting all data, collections, users, and indexes stored in it using - /// the provided `ClientSession`. - pub fn drop_with_session( - &self, - options: impl Into>, - session: &mut ClientSession, - ) -> Result<()> { - runtime::block_on( - self.async_database - .drop_with_session(options.into(), &mut session.async_client_session), - ) - } - /// Gets information about each of the collections in the database. The cursor will yield a /// document pertaining to each collection in the database. pub fn list_collections( diff --git a/src/sync/test.rs b/src/sync/test.rs index b8284c10a..ab40620da 100644 --- a/src/sync/test.rs +++ b/src/sync/test.rs @@ -405,7 +405,7 @@ fn mixed_sync_and_async() -> Result<()> { let sync_client = Client::with_options(CLIENT_OPTIONS.clone())?; let async_client = runtime::block_on(async { AsyncTestClient::new().await }); let sync_db = sync_client.database(DB_NAME); - sync_db.drop(None)?; + sync_db.drop().run()?; sync_db .collection::(COLL_NAME) .insert_one(doc! { "a": 1 }, None)?; diff --git a/src/test/client.rs b/src/test/client.rs index 95d1a8637..e9af578d1 100644 --- a/src/test/client.rs +++ b/src/test/client.rs @@ -189,7 +189,7 @@ async fn list_databases() { let client = TestClient::new().await; for name in expected_dbs { - client.database(name).drop(None).await.unwrap(); + client.database(name).drop().await.unwrap(); } let prev_dbs = client.list_databases().await.unwrap(); @@ -235,7 +235,7 @@ async fn list_database_names() { ]; for name in expected_dbs { - client.database(name).drop(None).await.unwrap(); + client.database(name).drop().await.unwrap(); } let prev_dbs = client.list_database_names().await.unwrap(); @@ -311,7 +311,7 @@ async fn list_authorized_databases() { } for name in dbs { - client.database(name).drop(None).await.unwrap(); + client.database(name).drop().await.unwrap(); } } @@ -843,7 +843,7 @@ async fn manual_shutdown_with_resources() { return; } let db = client.database("shutdown_test"); - db.drop(None).await.unwrap(); + db.drop().await.unwrap(); let coll = db.collection::("test"); coll.insert_many([doc! {}, doc! {}], None).await.unwrap(); let bucket = db.gridfs_bucket(None); @@ -908,7 +908,7 @@ async fn manual_shutdown_immediate_with_resources() { return; } let db = client.database("shutdown_test"); - db.drop(None).await.unwrap(); + db.drop().await.unwrap(); let coll = db.collection::("test"); coll.insert_many([doc! {}, doc! {}], None).await.unwrap(); let bucket = db.gridfs_bucket(None); diff --git a/src/test/csfle.rs b/src/test/csfle.rs index d4044f83b..91ae9f5a5 100644 --- a/src/test/csfle.rs +++ b/src/test/csfle.rs @@ -3038,7 +3038,7 @@ async fn auto_encryption_keys(master_key: MasterKey) -> Result<()> { return Ok(()); } let db = client.database("test_auto_encryption_keys"); - db.drop(None).await?; + db.drop().await?; let ce = ClientEncryption::new( client.into_client(), KV_NAMESPACE.clone(), @@ -3549,7 +3549,7 @@ async fn fle2_example() -> Result<()> { .collection::("datakeys") .drop(None) .await?; - test_client.database("docsExamples").drop(None).await?; + test_client.database("docsExamples").drop().await?; // Create two data keys. let ce = ClientEncryption::new( diff --git a/src/test/db.rs b/src/test/db.rs index 44983d400..88c64d050 100644 --- a/src/test/db.rs +++ b/src/test/db.rs @@ -39,7 +39,7 @@ async fn get_coll_info(db: &Database, filter: Option) -> Vec> = db .list_collections(None, None) @@ -78,7 +78,7 @@ async fn list_collections() { async fn list_collections_filter() { let client = TestClient::new().await; let db = client.database(function_name!()); - db.drop(None).await.unwrap(); + db.drop().await.unwrap(); let colls: Result> = db .list_collections(None, None) @@ -119,7 +119,7 @@ async fn list_collections_filter() { async fn list_collection_names() { let client = TestClient::new().await; let db = client.database(function_name!()); - db.drop(None).await.unwrap(); + db.drop().await.unwrap(); assert!(db.list_collection_names(None).await.unwrap().is_empty()); @@ -148,7 +148,7 @@ async fn list_collection_names() { async fn collection_management() { let client = TestClient::new().await; let db = client.database(function_name!()); - db.drop(None).await.unwrap(); + db.drop().await.unwrap(); assert!(db.list_collection_names(None).await.unwrap().is_empty()); @@ -339,7 +339,7 @@ async fn index_option_defaults_test(defaults: Option, name: .index_option_defaults(defaults.clone()) .build(); db.create_collection(name, options).await.unwrap(); - db.drop(None).await.unwrap(); + db.drop().await.unwrap(); let events = client.get_command_started_events(&["create"]); assert_eq!(events.len(), 1); diff --git a/src/test/documentation_examples.rs b/src/test/documentation_examples.rs index 794b8c5c1..4d6b095e2 100644 --- a/src/test/documentation_examples.rs +++ b/src/test/documentation_examples.rs @@ -1507,7 +1507,7 @@ async fn stable_api_examples() -> GenericResult<()> { async fn aggregation_examples() -> GenericResult<()> { let client = TestClient::new().await; let db = client.database("aggregation_examples"); - db.drop(None).await?; + db.drop().await?; aggregation_data::populate(&db).await?; // Each example is within its own scope to allow the example to include @@ -1649,7 +1649,7 @@ async fn aggregation_examples() -> GenericResult<()> { async fn run_command_examples() -> Result<()> { let client = TestClient::new().await; let db = client.database("run_command_examples"); - db.drop(None).await?; + db.drop().await?; db.collection::("restaurants") .insert_one( doc! { @@ -1681,7 +1681,7 @@ async fn run_command_examples() -> Result<()> { async fn index_examples() -> Result<()> { let client = TestClient::new().await; let db = client.database("index_examples"); - db.drop(None).await?; + db.drop().await?; db.collection::("records") .insert_many( vec![ @@ -1765,7 +1765,7 @@ async fn change_streams_examples() -> Result<()> { return Ok(()); } let db = client.database("change_streams_examples"); - db.drop(None).await?; + db.drop().await?; let inventory = db.collection::("inventory"); // Populate an item so the collection exists for the change stream to watch. inventory.insert_one(doc! {}, None).await?; diff --git a/src/test/spec/retryable_writes.rs b/src/test/spec/retryable_writes.rs index 8061cf817..fb1f5270b 100644 --- a/src/test/spec/retryable_writes.rs +++ b/src/test/spec/retryable_writes.rs @@ -175,7 +175,7 @@ async fn run_legacy() { .unwrap(); assert_eq!(test_case.outcome.collection.data, actual_data); - client.database(&db_name).drop(None).await.unwrap(); + client.database(&db_name).drop().await.unwrap(); } } diff --git a/src/test/spec/write_error.rs b/src/test/spec/write_error.rs index 99f9d4499..cdfd6471b 100644 --- a/src/test/spec/write_error.rs +++ b/src/test/spec/write_error.rs @@ -18,7 +18,7 @@ async fn details() { } let db = client.database("write_error_details"); - db.drop(None).await.unwrap(); + db.drop().await.unwrap(); db.create_collection( "test", CreateCollectionOptions::builder() diff --git a/src/test/timeseries.rs b/src/test/timeseries.rs index acb534b50..93c006f0d 100644 --- a/src/test/timeseries.rs +++ b/src/test/timeseries.rs @@ -18,7 +18,7 @@ async fn list_collections_timeseries() -> Result<()> { return Ok(()); } let db = client.database("list_collections_timeseries"); - db.drop(None).await?; + db.drop().await?; db.create_collection( "test", CreateCollectionOptions::builder()