Skip to content

Commit

Permalink
RUST-1512 Macro for fluent API definition, Database::drop conversion (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
abr-egn authored Jan 25, 2024
1 parent c070269 commit 0f928c9
Show file tree
Hide file tree
Showing 27 changed files with 498 additions and 531 deletions.
2 changes: 1 addition & 1 deletion manual/src/encryption.md
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ async fn main() -> Result<()> {
.build()
.await?;
let db = encrypted_client.database("test");
db.drop(None).await?;
db.drop().await?;
// Create the collection with encrypted fields.
db.create_collection(
Expand Down
140 changes: 140 additions & 0 deletions src/action.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//! Action builder types.
mod drop;
mod list_databases;
mod perf;
mod session;
mod shutdown;
mod watch;

pub use drop::{DropDatabase, DropDatabaseFuture};
pub use list_databases::{ListDatabases, ListSpecificationsFuture};
pub use perf::{WarmConnectionPool, WarmConnectionPoolFuture};
pub use session::{StartSession, StartSessionFuture};
pub use shutdown::{Shutdown, ShutdownFuture};
pub use watch::{Watch, WatchExplicitSessionFuture, WatchImplicitSessionFuture};

macro_rules! option_setters {
(
$opt_field:ident: $opt_field_ty:ty;
$(
$(#[$($attrss:tt)*])*
$opt_name:ident: $opt_ty:ty,
)+
) => {
fn options(&mut self) -> &mut $opt_field_ty {
self.$opt_field.get_or_insert_with(<$opt_field_ty>::default)
}

#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn with_options(mut self, value: impl Into<Option<$opt_field_ty>>) -> Self {
self.options = value.into();
self
}

$(
$(#[$($attrss)*])*
pub fn $opt_name(mut self, value: $opt_ty) -> Self {
self.options().$opt_name = Some(value);
self
}
)+
};
}
use option_setters;

/// Generates:
/// * an `IntoFuture` executing the given method body
/// * an opaque wrapper type for the future in case we want to do something more fancy than
/// BoxFuture.
/// * a `run` method for sync execution, optionally with a wrapper function
macro_rules! action_impl {
// Generate with no sync type conversion
(
impl Action$(<$lt:lifetime>)? for $action:ty {
type Future = $f_ty:ident;
async fn execute($($args:ident)+) -> $out:ty $code:block
}
) => {
crate::action::action_impl_inner! {
$action => $f_ty;
$($lt)?;
async fn($($args)+) -> $out $code
}

#[cfg(any(feature = "sync", feature = "tokio-sync"))]
impl$(<$lt>)? $action {
/// Synchronously execute this action.
pub fn run(self) -> $out {
crate::runtime::block_on(std::future::IntoFuture::into_future(self))
}
}
};
// Generate with a sync type conversion
(
impl Action$(<$lt:lifetime>)? for $action:ty {
type Future = $f_ty:ident;
async fn execute($($args:ident)+) -> $out:ty $code:block
fn sync_wrap($($wrap_args:ident)+) -> $sync_out:ty $wrap_code:block
}
) => {
crate::action::action_impl_inner! {
$action => $f_ty;
$($lt)?;
async fn($($args)+) -> $out $code
}

#[cfg(any(feature = "sync", feature = "tokio-sync"))]
impl$(<$lt>)? $action {
/// Synchronously execute this action.
pub fn run(self) -> $sync_out {
let $($wrap_args)+ = crate::runtime::block_on(std::future::IntoFuture::into_future(self));
return $wrap_code
}
}
}
}
pub(crate) use action_impl;

macro_rules! action_impl_inner {
(
$action:ty => $f_ty:ident;
$($lt:lifetime)?;
async fn($($args:ident)+) -> $out:ty $code:block
) => {
impl$(<$lt>)? std::future::IntoFuture for $action {
type Output = $out;
type IntoFuture = $f_ty$(<$lt>)?;

fn into_future($($args)+) -> Self::IntoFuture {
$f_ty(Box::pin(async move {
$code
}))
}
}

crate::action::action_impl_future_wrapper!($($lt)?, $f_ty, $out);

impl$(<$lt>)? std::future::Future for $f_ty$(<$lt>)? {
type Output = $out;

fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
self.0.as_mut().poll(cx)
}
}
}
}
pub(crate) use action_impl_inner;

macro_rules! action_impl_future_wrapper {
(, $f_ty:ident, $out:ty) => {
/// Opaque future type for action execution.
pub struct $f_ty(crate::BoxFuture<'static, $out>);
};
($lt:lifetime, $f_ty:ident, $out:ty) => {
/// Opaque future type for action execution.
pub struct $f_ty<$lt>(crate::BoxFuture<$lt, $out>);
};
}
pub(crate) use action_impl_future_wrapper;
69 changes: 69 additions & 0 deletions src/action/drop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::{
error::Result,
operation::drop_database as op,
options::WriteConcern,
ClientSession,
Database,
};

use super::{action_impl, option_setters};

impl Database {
/// Drops the database, deleting all data, collections, and indexes stored in it.
///
/// `await` will return `Result<()>`.
pub fn drop(&self) -> DropDatabase {
DropDatabase {
db: self,
options: None,
session: None,
}
}
}

#[cfg(any(feature = "sync", feature = "tokio-sync"))]
impl crate::sync::Database {
/// Drops the database, deleting all data, collections, and indexes stored in it.
///
/// [`run`](DropDatabase::run) will return `Result<()>`.
pub fn drop(&self) -> DropDatabase {
self.async_database.drop()
}
}

/// Drops the database, deleting all data, collections, and indexes stored in it. Create by calling
/// [`Database::drop`] and execute with `await` (or [`run`](DropDatabase::run) if using the sync
/// client).
#[must_use]
pub struct DropDatabase<'a> {
db: &'a Database,
options: Option<op::DropDatabaseOptions>,
session: Option<&'a mut ClientSession>,
}

impl<'a> DropDatabase<'a> {
option_setters!(options: op::DropDatabaseOptions;
/// The write concern for the operation.
write_concern: WriteConcern,
);

/// Runs the drop using the provided session.
pub fn session(mut self, value: impl Into<&'a mut ClientSession>) -> Self {
self.session = Some(value.into());
self
}
}

action_impl! {
impl Action<'a> for DropDatabase<'a> {
type Future = DropDatabaseFuture;

async fn execute(mut self) -> Result<()> {
resolve_options!(self.db, self.options, [write_concern]);
let op = op::DropDatabase::new(self.db.name().to_string(), self.options);
self.db.client()
.execute_operation(op, self.session)
.await
}
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
use std::{future::IntoFuture, marker::PhantomData};
use std::marker::PhantomData;

use bson::{Bson, Document};
use futures_util::FutureExt;

#[cfg(any(feature = "sync", feature = "tokio-sync"))]
use crate::sync::Client as SyncClient;
use crate::{
client::BoxFuture,
error::{ErrorKind, Result},
operation::list_databases as op,
results::DatabaseSpecification,
Client,
ClientSession,
};

use super::option_setters;
use super::{action_impl, option_setters};

impl Client {
/// Gets information about each database present in the cluster the Client is connected to.
Expand Down Expand Up @@ -96,34 +94,31 @@ impl<'a, M> ListDatabases<'a, M> {
}
}

impl<'a> IntoFuture for ListDatabases<'a, ListSpecifications> {
type Output = Result<Vec<DatabaseSpecification>>;
type IntoFuture = BoxFuture<'a, Self::Output>;
action_impl! {
impl Action<'a> for ListDatabases<'a, ListSpecifications> {
type Future = ListSpecificationsFuture;

fn into_future(self) -> Self::IntoFuture {
async {
async fn execute(self) -> Result<Vec<DatabaseSpecification>> {
let op = op::ListDatabases::new(false, self.options);
self.client
.execute_operation(op, self.session)
.await
.and_then(|dbs| {
dbs.into_iter()
.map(|db_spec| {
bson::from_slice(db_spec.as_bytes()).map_err(crate::error::Error::from)
})
.collect()
})
self.client
.execute_operation(op, self.session)
.await
.and_then(|dbs| {
dbs.into_iter()
.map(|db_spec| {
bson::from_slice(db_spec.as_bytes()).map_err(crate::error::Error::from)
})
.collect()
})
}
.boxed()
}
}

impl<'a> IntoFuture for ListDatabases<'a, ListNames> {
type Output = Result<Vec<String>>;
type IntoFuture = BoxFuture<'a, Self::Output>;
action_impl! {
impl Action<'a> for ListDatabases<'a, ListNames> {
type Future = ListNamesFuture;

fn into_future(self) -> Self::IntoFuture {
async {
async fn execute(self) -> Result<Vec<String>> {
let op = op::ListDatabases::new(true, self.options);
match self.client.execute_operation(op, self.session).await {
Ok(databases) => databases
Expand All @@ -133,7 +128,7 @@ impl<'a> IntoFuture for ListDatabases<'a, ListNames> {
.get_str("name")
.map_err(|_| ErrorKind::InvalidResponse {
message: "Expected \"name\" field in server response, but it was \
not found"
not found"
.to_string(),
})?;
Ok(name.to_string())
Expand All @@ -142,17 +137,5 @@ impl<'a> IntoFuture for ListDatabases<'a, ListNames> {
Err(e) => Err(e),
}
}
.boxed()
}
}

#[cfg(any(feature = "sync", feature = "tokio-sync"))]
impl<'a, M> ListDatabases<'a, M>
where
Self: IntoFuture,
{
/// Synchronously execute this action.
pub fn run(self) -> <Self as IntoFuture>::Output {
crate::runtime::block_on(self.into_future())
}
}
51 changes: 51 additions & 0 deletions src/action/perf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
pub use crate::client::action::perf::WarmConnectionPoolFuture;
use crate::Client;

impl Client {
/// Add connections to the connection pool up to `min_pool_size`. This is normally not needed -
/// the connection pool will be filled in the background, and new connections created as needed
/// up to `max_pool_size`. However, it can sometimes be preferable to pay the (larger) cost of
/// creating new connections up-front so that individual operations execute as quickly as
/// possible.
///
/// Note that topology changes require rebuilding the connection pool, so this method cannot
/// guarantee that the pool will always be filled for the lifetime of the `Client`.
///
/// Does nothing if `min_pool_size` is unset or zero.
///
/// `await` will return `()`.
pub fn warm_connection_pool(&self) -> WarmConnectionPool {
WarmConnectionPool { client: self }
}
}

#[cfg(any(feature = "sync", feature = "tokio-sync"))]
impl crate::sync::Client {
/// Add connections to the connection pool up to `min_pool_size`. This is normally not needed -
/// the connection pool will be filled in the background, and new connections created as needed
/// up to `max_pool_size`. However, it can sometimes be preferable to pay the (larger) cost of
/// creating new connections up-front so that individual operations execute as quickly as
/// possible.
///
/// Note that topology changes require rebuilding the connection pool, so this method cannot
/// guarantee that the pool will always be filled for the lifetime of the `Client`.
///
/// Does nothing if `min_pool_size` is unset or zero.
///
/// `await` will return `()`.
///
/// [`run`](WarmConnectionPool::run) will return `()`.
pub fn warm_connection_pool(&self) -> WarmConnectionPool {
self.async_client.warm_connection_pool()
}
}

/// Add connections to the connection pool up to `min_pool_size`. Create by calling
/// [`Client::warm_connection_pool`] and execute with `await` (or [`run`](WarmConnectionPool::run)
/// if using the sync client).
#[must_use]
pub struct WarmConnectionPool<'a> {
pub(crate) client: &'a Client,
}

// IntoFuture impl in src/client/action/perf.rs
Loading

0 comments on commit 0f928c9

Please sign in to comment.