Skip to content

Commit

Permalink
Add "drain" feature for subscriptions and connections
Browse files Browse the repository at this point in the history
Co-authored-by: jsudano <jsudano@cisco.com>
  • Loading branch information
jsudano and jsudano authored Nov 16, 2024
1 parent 79bf679 commit b5d0bdf
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 1 deletion.
43 changes: 43 additions & 0 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,39 @@ impl Client {
Ok(())
}

/// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
/// messages, then closes the connection. Once completed, any associated streams associated with the
/// client will be closed, and further client commands will fail
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::StreamExt;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut subscription = client.subscribe("events.>").await?;
///
/// client.drain().await?;
///
/// # // existing subscriptions are closed and further commands will fail
/// assert!(subscription.next().await.is_none());
/// client
/// .subscribe("events.>")
/// .await
/// .expect_err("Expected further commands to fail");
///
/// # Ok(())
/// # }
/// ```
pub async fn drain(&self) -> Result<(), DrainError> {
// Drain all subscriptions
self.sender.send(Command::Drain { sid: None }).await?;

// Remaining process is handled on the handler-side
Ok(())
}

/// Returns the current state of the connection.
///
/// # Examples
Expand Down Expand Up @@ -808,6 +841,16 @@ impl From<tokio::sync::mpsc::error::SendError<Command>> for SubscribeError {
}
}

#[derive(Error, Debug)]
#[error("failed to send drain: {0}")]
pub struct DrainError(#[source] crate::Error);

impl From<tokio::sync::mpsc::error::SendError<Command>> for DrainError {
fn from(err: tokio::sync::mpsc::error::SendError<Command>) -> Self {
DrainError(Box::new(err))
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum RequestErrorKind {
/// There are services listening on requested subject, but they didn't respond
Expand Down
93 changes: 92 additions & 1 deletion async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ pub(crate) enum Command {
Flush {
observer: oneshot::Sender<()>,
},
Drain {
sid: Option<u64>,
},
Reconnect,
}

Expand Down Expand Up @@ -411,6 +414,7 @@ struct Subscription {
queue_group: Option<String>,
delivered: u64,
max: Option<u64>,
is_draining: bool,
}

#[derive(Debug)]
Expand All @@ -431,6 +435,7 @@ pub(crate) struct ConnectionHandler {
ping_interval: Interval,
should_reconnect: bool,
flush_observers: Vec<oneshot::Sender<()>>,
is_draining: bool,
}

impl ConnectionHandler {
Expand All @@ -453,6 +458,7 @@ impl ConnectionHandler {
ping_interval,
should_reconnect: false,
flush_observers: Vec::new(),
is_draining: false,
}
}

Expand Down Expand Up @@ -532,6 +538,20 @@ impl ConnectionHandler {
}
}

// Before handling any commands, drop any subscriptions which are draining
// Note: safe to assume subscription drain has completed at this point, as we would have flushed
// all outgoing UNSUB messages in the previous call to this fn, and we would have processed and
// delivered any remaining messages to the subscription in the loop above.
self.handler.subscriptions.retain(|_, s| !s.is_draining);

if self.handler.is_draining {
// The entire connection is draining. This means we flushed outgoing messages in the previous
// call to this fn, we handled any remaining messages from the server in the loop above, and
// all subs were drained, so drain is complete and we should exit instead of processing any
// further messages
return Poll::Ready(ExitReason::Closed);
}

// WARNING: after the following loop `handle_command`,
// or other functions which call `enqueue_write_op`,
// cannot be called anymore. Runtime wakeups won't
Expand Down Expand Up @@ -630,7 +650,11 @@ impl ConnectionHandler {
};
debug!("reconnected");
}
ExitReason::Closed => break,
ExitReason::Closed => {
// Safe to ignore result as we're shutting down anyway
self.connector.events_tx.try_send(Event::Closed).ok();
break;
}
ExitReason::ReconnectRequested => {
debug!("reconnect requested");
// Should be ok to ingore error, as that means we are not in connected state.
Expand Down Expand Up @@ -773,6 +797,26 @@ impl ConnectionHandler {
Command::Flush { observer } => {
self.flush_observers.push(observer);
}
Command::Drain { sid } => {
let mut drain_sub = |sid: u64, sub: &mut Subscription| {
sub.is_draining = true;
self.connection
.enqueue_write_op(&ClientOp::Unsubscribe { sid, max: None });
};

if let Some(sid) = sid {
if let Some(sub) = self.subscriptions.get_mut(&sid) {
drain_sub(sid, sub);
}
} else {
// sid isn't set, so drain the whole client
self.connector.events_tx.try_send(Event::Draining).ok();
self.is_draining = true;
for (&sid, sub) in self.subscriptions.iter_mut() {
drain_sub(sid, sub);
}
}
}
Command::Subscribe {
sid,
subject,
Expand All @@ -785,6 +829,7 @@ impl ConnectionHandler {
max: None,
subject: subject.to_owned(),
queue_group: queue_group.to_owned(),
is_draining: false,
};

self.subscriptions.insert(sid, subscription);
Expand Down Expand Up @@ -1026,6 +1071,8 @@ pub enum Event {
Connected,
Disconnected,
LameDuckMode,
Draining,
Closed,
SlowConsumer(u64),
ServerError(ServerError),
ClientError(ClientError),
Expand All @@ -1037,6 +1084,8 @@ impl fmt::Display for Event {
Event::Connected => write!(f, "connected"),
Event::Disconnected => write!(f, "disconnected"),
Event::LameDuckMode => write!(f, "lame duck mode detected"),
Event::Draining => write!(f, "draining"),
Event::Closed => write!(f, "closed"),
Event::SlowConsumer(sid) => write!(f, "slow consumers for subscription {sid}"),
Event::ServerError(err) => write!(f, "server error: {err}"),
Event::ClientError(err) => write!(f, "client error: {err}"),
Expand Down Expand Up @@ -1251,6 +1300,48 @@ impl Subscriber {
.await?;
Ok(())
}

/// Unsubscribes immediately but leaves the subscription open to allow any in-flight messages
/// on the subscription to be delivered. The stream will be closed after any remaining messages
/// are delivered
///
/// # Examples
/// ```no_run
/// # use futures::StreamExt;
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
///
/// let mut subscriber = client.subscribe("test").await?;
///
/// tokio::spawn({
/// let task_client = client.clone();
/// async move {
/// loop {
/// _ = task_client.publish("test", "data".into()).await;
/// }
/// }
/// });
///
/// client.flush().await?;
/// subscriber.drain().await?;
///
/// while let Some(message) = subscriber.next().await {
/// println!("message received: {:?}", message);
/// }
/// println!("no more messages, unsubscribed");
/// # Ok(())
/// # }
/// ```
pub async fn drain(&mut self) -> Result<(), UnsubscribeError> {
self.sender
.send(Command::Drain {
sid: Some(self.sid),
})
.await?;

Ok(())
}
}

#[derive(Error, Debug, PartialEq)]
Expand Down
138 changes: 138 additions & 0 deletions async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1042,4 +1042,142 @@ mod client {

assert_eq!(client.timeout(), None);
}

#[tokio::test]
async fn drain_subscription_basic() {
use std::error::Error;
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let mut sub = client.subscribe("test").await.unwrap();

// publish some data
client.publish("test", "data".into()).await.unwrap();
client.flush().await.unwrap();

// confirm we receive that data
assert!(sub.next().await.is_some());

// now drain the subscription
let result = sub.drain().await;
match result {
Ok(()) => println!("ok"),
Err(err) => {
println!("error: {}", err);
println!("source: {:?}", err.source())
}
}

// assert the stream is closed after draining
assert!(sub.next().await.is_none());

// confirm we can still reconnect and send messages on a new subscription
let mut sub2 = client.subscribe("test2").await.unwrap();
client.publish("test2", "data".into()).await.unwrap();
client.flush().await.unwrap();
assert!(sub2.next().await.is_some());
}

#[tokio::test]
async fn drain_subscription_unsub_after() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let mut sub = client.subscribe("test").await.unwrap();

sub.unsubscribe_after(120)
.await
.expect("Expected to send unsub_after");

// publish some data
client.publish("test", "data".into()).await.unwrap();
client.publish("test", "data".into()).await.unwrap();
client.flush().await.unwrap();

// Send the drain command
sub.drain().await.expect("Expected to drain the sub");

// we should receive all published data then close immediately
assert!(sub.next().await.is_some());
assert!(sub.next().await.is_some());
assert!(sub.next().await.is_none());
}

#[tokio::test]
async fn drain_subscription_active() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

// spawn a task to constantly write to the subscription
let constant_writer = tokio::spawn({
let client = client.clone();
async move {
loop {
client.publish("test", "data".into()).await.unwrap();
client.flush().await.unwrap();
}
}
});

let mut sub = client.subscribe("test").await.unwrap();

// confirm we receive some data
assert!(sub.next().await.is_some());

// now drain the subscription
sub.drain().await.unwrap();

// yield to the runtime to ensure constant_writer gets a chance to publish a message or two to the subject
tokio::time::sleep(Duration::from_millis(1)).await;

// assert the subscription stream is closed after draining
let sleep_fut = async move { while sub.next().await.is_some() {} };
tokio::time::timeout(Duration::from_secs(10), sleep_fut)
.await
.expect("Expected stream to drain within 10s");

// assert constant_writer doesn't fail to write after the only sub is drained (i.e. client operations still work fine)
assert!(!constant_writer.is_finished());

// confirm we can still reconnect and receive messages on the same subject on a new subscription
let mut sub2 = client.subscribe("test").await.unwrap();
assert!(sub2.next().await.is_some());
}

#[tokio::test]
async fn drain_client_basic() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let mut sub = client.subscribe("test").await.unwrap();

// publish some data
client.publish("test", "data".into()).await.unwrap();
client.flush().await.unwrap();

// confirm we receive that data
assert!(sub.next().await.is_some());

// now drain the client
client.drain().await.unwrap();

// assert the sub's stream is closed after draining
assert!(sub.next().await.is_none());

// we should not be able to perform any more operations on a drained client
client
.subscribe("test2")
.await
.expect_err("Expected client to be drained");

client
.publish("test", "data".into())
.await
.expect_err("Expected client to be drained");

// we should be able to connect with a new client
let _client2 = async_nats::connect(server.client_url())
.await
.expect("Expected to be able to create a new client");
}
}

0 comments on commit b5d0bdf

Please sign in to comment.