Skip to content

Commit

Permalink
Fix subscriptions and optional max_messages
Browse files Browse the repository at this point in the history
  • Loading branch information
davidMcneil committed Apr 21, 2018
1 parent 4a8bcce commit af4ff4b
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 10 deletions.
1 change: 0 additions & 1 deletion src/http_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub struct Config {
pub port: u16,
pub default_message_ttl: Duration,
pub default_ack_deadline: Duration,
pub default_return_immediately: bool,
pub default_max_messages: usize,
pub cleanup_interval: Duration,
pub max_pull_wait: Duration,
Expand Down
8 changes: 5 additions & 3 deletions src/http_protocol/subscription_handlers.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#![cfg_attr(feature = "cargo-clippy", allow(needless_pass_by_value))]

use super::Config;
use super::types;
use super::Config;
use chrono::Duration;
use courier::SubscriptionMeta;
use registry::SharedRegistry;
use rocket::State;
use rocket::http::Status;
use rocket::response::status::Custom;
use rocket::State;
use rocket_contrib::Json;
use uuid::Uuid;

Expand Down Expand Up @@ -96,13 +96,15 @@ pub fn list(reg: State<SharedRegistry>) -> Json<types::SubscriptionList> {

#[post("/<name>/pull", data = "<config>")]
pub fn pull(
cfg: State<Config>,
reg: State<SharedRegistry>,
name: String,
config: Json<types::PullConfig>,
) -> Option<Json<types::MessageList>> {
let mut reg = reg.write().unwrap();
let config = config.into_inner();
reg.pull(&name, config.max_messages)
let max = config.max_messages.unwrap_or(cfg.default_max_messages);
reg.pull(&name, max)
.map(|messages| Json(types::MessageList::new(messages)))
}

Expand Down
18 changes: 16 additions & 2 deletions src/http_protocol/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ fn get_client() -> (Config, Client) {
port: 3140,
default_message_ttl: Duration::seconds(3600),
default_ack_deadline: Duration::seconds(60),
default_return_immediately: false,
default_max_messages: 1,
cleanup_interval: Duration::seconds(1),
max_pull_wait: Duration::seconds(5),
Expand Down Expand Up @@ -440,6 +439,21 @@ fn http_protocol_lists() {
let mut actual = response_as::<SubscriptionNameList>(&mut response);
actual.subscriptions.sort();
assert_eq!(expected, actual);
// Delete a subscription
client
.delete("api/v0/subscriptions/subscription1")
.header(ContentType::JSON)
.dispatch();
// List the topic subscriptions
let expected = SubscriptionNameList::new(vec![String::from("subscription0")]);
let mut response = client
.get("api/v0/topics/topic0/subscriptions")
.header(ContentType::JSON)
.dispatch();
assert_eq!(Status::Ok, response.status());
let mut actual = response_as::<SubscriptionNameList>(&mut response);
actual.subscriptions.sort();
assert_eq!(expected, actual);
}

#[test]
Expand Down Expand Up @@ -497,7 +511,7 @@ fn http_protocol_basic() {
.dispatch();
let messages = response_as::<MessageList>(&mut response).messages;
assert_eq!(Status::Ok, response.status());
assert_eq!(pull_config.max_messages, messages.len());
assert_eq!(pull_config.max_messages.unwrap(), messages.len());
assert_eq!(messages[0].data, String::from("first"));
assert_eq!(messages[1].data, String::from("second"));
// Try and pull messages from historical subscription1
Expand Down
6 changes: 4 additions & 2 deletions src/http_protocol/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,14 @@ impl MessageList {

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct PullConfig {
pub max_messages: usize,
pub max_messages: Option<usize>,
}

impl PullConfig {
#[allow(dead_code)]
pub fn new(max_messages: usize) -> Self {
Self { max_messages }
Self {
max_messages: Some(max_messages),
}
}
}
1 change: 0 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ pub fn main() {
port: opt.port,
default_message_ttl: Duration::seconds(3600),
default_ack_deadline: Duration::seconds(60),
default_return_immediately: false,
default_max_messages: 1,
cleanup_interval: Duration::seconds(1),
max_pull_wait: Duration::seconds(5),
Expand Down
11 changes: 10 additions & 1 deletion src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,16 @@ impl Registry {
}

pub fn delete_subscription(&mut self, subscription_name: &str) -> bool {
self.subscriptions.remove(subscription_name).is_some()
let subscription = self.subscriptions.remove(subscription_name);
match subscription {
Some(sub) => {
if let Some(topic_store) = self.topics.get_mut(&sub.topic) {
topic_store.subscriptions.remove(&sub.name);
}
true
}
None => false,
}
}

pub fn get_subscription(&self, subscription_name: &str) -> Option<SubscriptionMeta> {
Expand Down

0 comments on commit af4ff4b

Please sign in to comment.