Skip to content

Commit

Permalink
Add customizable queue groups to service API
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Sep 19, 2023
1 parent fe79b4d commit 163ca2d
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 24 deletions.
5 changes: 5 additions & 0 deletions async-nats/src/service/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ pub(crate) struct Inner {
pub(crate) last_error: Option<error::Error>,
/// Custom data added by [Config::stats_handler]
pub(crate) data: String,
/// Queue group to which this endpoint is assigned to.
pub(crate) queue_group: String,
}

impl From<Inner> for Stats {
Expand All @@ -146,6 +148,7 @@ impl From<Inner> for Stats {
average_processing_time: inner.average_processing_time,
last_error: inner.last_error,
data: inner.data,
queue_group: inner.queue_group,
}
}
}
Expand Down Expand Up @@ -177,4 +180,6 @@ pub struct Stats {
pub last_error: Option<error::Error>,
/// Custom data added by [crate::service::Config::stats_handler]
pub data: String,
/// Queue group to which this endpoint is assigned to.
pub queue_group: String,
}
115 changes: 91 additions & 24 deletions async-nats/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{Client, Error, HeaderMap, Message, PublishError, Subscriber};
use self::endpoint::Endpoint;

const SERVICE_API_PREFIX: &str = "$SRV";
const QUEUE_GROUP: &str = "q";
const DEFAULT_QUEUE_GROUP: &str = "q";
pub const NATS_SERVICE_ERROR: &str = "Nats-Service-Error";
pub const NATS_SERVICE_ERROR_CODE: &str = "Nats-Service-Error-Code";

Expand Down Expand Up @@ -113,13 +113,16 @@ pub struct Config {
pub stats_handler: Option<StatsHandler>,
/// Additional service metadata
pub metadata: Option<HashMap<String, String>>,
/// Custom queue group config
pub queue_group: Option<String>,
}

pub struct ServiceBuilder {
client: Client,
description: Option<String>,
stats_handler: Option<StatsHandler>,
metadata: Option<HashMap<String, String>>,
queue_group: Option<String>,
}

impl ServiceBuilder {
Expand All @@ -129,6 +132,7 @@ impl ServiceBuilder {
description: None,
stats_handler: None,
metadata: None,
queue_group: None,
}
}

Expand All @@ -153,7 +157,13 @@ impl ServiceBuilder {
self
}

/// Stats the service with configured options.
/// Custom queue group. Default is `q`.
pub fn queue_group<S: ToString>(mut self, queue_group: S) -> Self {
self.queue_group = Some(queue_group.to_string());
self
}

/// Starts the service with configured options.
pub async fn start<S: ToString>(self, name: S, version: S) -> Result<Service, Error> {
Service::add(
self.client,
Expand All @@ -163,6 +173,7 @@ impl ServiceBuilder {
description: self.description,
stats_handler: self.stats_handler,
metadata: self.metadata,
queue_group: self.queue_group,
},
)
.await
Expand Down Expand Up @@ -208,6 +219,7 @@ pub trait ServiceExt {
/// description: None,
/// stats_handler: None,
/// metadata: None,
/// queue_group: None,
/// })
/// .await?;
///
Expand Down Expand Up @@ -273,16 +285,7 @@ impl ServiceExt for crate::Client {
/// use async_nats::service::ServiceExt;
/// use futures::StreamExt;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut service = client
/// .add_service(async_nats::service::Config {
/// name: "generator".to_string(),
/// version: "1.0.0".to_string(),
/// description: None,
/// stats_handler: None,
/// metadata: None,
/// })
/// .await?;
///
/// let mut service = client.service_builder().start("generator", "1.0.0").await?;
/// let mut endpoint = service.endpoint("get").await?;
///
/// if let Some(request) = endpoint.next().await {
Expand All @@ -300,6 +303,7 @@ pub struct Service {
handle: JoinHandle<Result<(), Error>>,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
subjects: Arc<Mutex<Vec<String>>>,
queue_group: String,
}

impl Service {
Expand All @@ -318,6 +322,9 @@ impl Service {
"service name is not a valid string (only A-Z, a-z, 0-9, _, - are allowed)",
)));
}
let queue_group = config
.queue_group
.unwrap_or(DEFAULT_QUEUE_GROUP.to_string());
let id = nuid::next().to_string();
let started = time::OffsetDateTime::now_utc();
let subjects = Arc::new(Mutex::new(Vec::new()));
Expand Down Expand Up @@ -404,6 +411,7 @@ impl Service {
handle,
shutdown_tx,
subjects,
queue_group,
})
}
/// Stops this instance of the [Service].
Expand Down Expand Up @@ -442,7 +450,7 @@ impl Service {
self.info.clone()
}

/// Creates a group for endpoints under common prefix prefix.
/// Creates a group for endpoints under common prefix.
///
/// # Examples
///
Expand All @@ -459,12 +467,37 @@ impl Service {
/// # }
/// ```
pub fn group<S: ToString>(&self, prefix: S) -> Group {
self.group_with_queue_group(prefix, self.queue_group.clone())
}

/// Creates a group for endpoints under common prefix with custom queue group.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::service::ServiceExt;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut service = client.service_builder().start("service", "1.0.0").await?;
///
/// let v1 = service.group("v1");
/// let products = v1.endpoint("products").await?;
/// # Ok(())
/// # }
/// ```
pub fn group_with_queue_group<S: ToString, Z: ToString>(
&self,
prefix: S,
queue_group: Z,
) -> Group {
Group {
subjects: self.subjects.clone(),
prefix: prefix.to_string(),
stats: self.endpoints_state.clone(),
client: self.client.clone(),
shutdown_tx: self.shutdown_tx.clone(),
queue_group: queue_group.to_string(),
}
}

Expand Down Expand Up @@ -493,6 +526,7 @@ impl Service {
self.endpoints_state.clone(),
self.shutdown_tx.clone(),
self.subjects.clone(),
self.queue_group.clone(),
)
}

Expand All @@ -517,6 +551,7 @@ impl Service {
self.endpoints_state.clone(),
self.shutdown_tx.clone(),
self.subjects.clone(),
self.queue_group.clone(),
)
.add(subject)
.await
Expand All @@ -529,10 +564,11 @@ pub struct Group {
client: Client,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
subjects: Arc<Mutex<Vec<String>>>,
queue_group: String,
}

impl Group {
/// Creates a group for endpoints under common prefix prefix.
/// Creates a group for [Endpoints][Endpoint] under common prefix.
///
/// # Examples
///
Expand All @@ -549,12 +585,37 @@ impl Group {
/// # }
/// ```
pub fn group<S: ToString>(&self, prefix: S) -> Group {
self.group_with_queue_group(prefix, self.queue_group.clone())
}

/// Creates a group for [Endpoints][Endpoint] under common prefix with custom queue group.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use async_nats::service::ServiceExt;
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut service = client.service_builder().start("service", "1.0.0").await?;
///
/// let v1 = service.group("v1");
/// let products = v1.endpoint("products").await?;
/// # Ok(())
/// # }
/// ```
pub fn group_with_queue_group<S: ToString, Z: ToString>(
&self,
prefix: S,
queue_group: Z,
) -> Group {
Group {
prefix: prefix.to_string(),
stats: self.stats.clone(),
client: self.client.clone(),
shutdown_tx: self.shutdown_tx.clone(),
subjects: self.subjects.clone(),
queue_group: queue_group.to_string(),
}
}

Expand All @@ -580,6 +641,7 @@ impl Group {
self.stats.clone(),
self.shutdown_tx.clone(),
self.subjects.clone(),
self.queue_group.clone(),
)
.add(format!("{}.{}", self.prefix, subject.to_string()))
.await
Expand Down Expand Up @@ -607,6 +669,7 @@ impl Group {
self.stats.clone(),
self.shutdown_tx.clone(),
self.subjects.clone(),
self.queue_group.clone(),
)
}
}
Expand Down Expand Up @@ -654,14 +717,8 @@ impl Request {
/// use async_nats::service::ServiceExt;
/// use futures::StreamExt;
/// # let client = async_nats::connect("demo.nats.io").await?;
/// # let mut service = client.add_service(async_nats::service::Config {
/// # name: "generator".to_string(),
/// # version: "1.0.0".to_string(),
/// # description: None,
/// # stats_handler: None,
/// # metadata: None,
/// # }).await?;
///
/// # let mut service = client
/// # .service_builder().start("serviceA", "1.0.0.1").await?;
/// let mut endpoint = service.endpoint("endpoint").await?;
/// let request = endpoint.next().await.unwrap();
/// request.respond(Ok("hello".into())).await?;
Expand Down Expand Up @@ -693,7 +750,6 @@ impl Request {
};
let elapsed = self.issued.elapsed();
let mut stats = self.stats.lock().unwrap();
// let mut stats = stats.endpoints.entry(key)
let stats = stats.endpoints.get_mut(self.endpoint.as_str()).unwrap();
stats.requests += 1;
stats.processing_time += elapsed;
Expand All @@ -710,6 +766,7 @@ pub struct EndpointBuilder {
name: Option<String>,
metadata: Option<HashMap<String, String>>,
subjects: Arc<Mutex<Vec<String>>>,
queue_group: String,
}

impl EndpointBuilder {
Expand All @@ -718,6 +775,7 @@ impl EndpointBuilder {
stats: Arc<Mutex<Endpoints>>,
shutdown_tx: Sender<()>,
subjects: Arc<Mutex<Vec<String>>>,
queue_group: String,
) -> EndpointBuilder {
EndpointBuilder {
client,
Expand All @@ -726,6 +784,7 @@ impl EndpointBuilder {
shutdown_tx,
name: None,
metadata: None,
queue_group,
}
}

Expand All @@ -741,13 +800,19 @@ impl EndpointBuilder {
self
}

/// Custom queue group for the [Endpoint]. Otherwise it will be derived from group or service.
pub fn queue_group<S: ToString>(mut self, queue_group: S) -> EndpointBuilder {
self.queue_group = queue_group.to_string();
self
}

/// Finalizes the builder and adds the [Endpoint].
pub async fn add<S: ToString>(self, subject: S) -> Result<Endpoint, Error> {
let subject = subject.to_string();
let name = self.name.clone().unwrap_or_else(|| subject.clone());
let requests = self
.client
.queue_subscribe(subject.clone(), QUEUE_GROUP.to_string())
.queue_subscribe(subject.clone(), self.queue_group.clone())
.await?;
debug!("created service for endpoint {subject}");

Expand All @@ -759,7 +824,9 @@ impl EndpointBuilder {
.entry(subject.clone())
.or_insert(endpoint::Inner {
name,
subject: subject.clone(),
metadata: self.metadata.unwrap_or_default(),
queue_group: self.queue_group.clone(),
..Default::default()
});
self.subjects.lock().unwrap().push(subject.clone());
Expand Down
Loading

0 comments on commit 163ca2d

Please sign in to comment.