Skip to content

Commit

Permalink
Performance Improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Jan 12, 2024
1 parent c05120b commit 411cf3c
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/http/controllers/debug_controller/get_queues_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn handle_request(
{
let topic_data = topic.get_access().await;

for queue in topic_data.queues.get_queues() {
for queue in topic_data.queues.get_all() {
let mut subscribers = Vec::new();

if let Some(the_subscribers) = queue.subscribers.get_all() {
Expand Down
2 changes: 1 addition & 1 deletion src/http/controllers/queues/get_list_of_queues_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn handle_request(

{
let topic_data = topic.get_access().await;
for queue in topic_data.queues.get_queues() {
for queue in topic_data.queues.get_all() {
result.push(queue.queue_id.clone());
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/operations/queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub async fn delete_queue(

let mut topic_data = topic.get_access().await;

topic_data.queues.delete_queue(queue_id);
topic_data.queues.remove(queue_id);

app.prometheus.queue_is_deleted(topic_id, queue_id);

Expand Down
86 changes: 48 additions & 38 deletions src/queues/queues_list.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
use std::collections::{
btree_map::{Values, ValuesMut},
BTreeMap,
};

use my_service_bus::abstractions::{
queue_with_intervals::QueueWithIntervals, subscriber::TopicQueueType,
};
Expand All @@ -12,14 +7,14 @@ use crate::{queue_subscribers::QueueSubscriber, sessions::SessionId, topics::Top
use super::queue::TopicQueue;

pub struct TopicQueuesList {
queues: BTreeMap<String, TopicQueue>,
queues: Vec<TopicQueue>,
snapshot_id: usize,
}

impl TopicQueuesList {
pub fn new() -> Self {
TopicQueuesList {
queues: BTreeMap::new(),
queues: Vec::new(),
snapshot_id: 0,
}
}
Expand All @@ -28,21 +23,51 @@ impl TopicQueuesList {
self.snapshot_id
}

fn has_the_queue(&self, queue_id: &str) -> bool {
for queue in &self.queues {
if queue.queue_id == queue_id {
return true;
}
}

return false;
}

pub fn get(&self, queue_id: &str) -> Option<&TopicQueue> {
for queue in &self.queues {
if queue.queue_id == queue_id {
return Some(queue);
}
}

return None;
}

pub fn get_mut(&mut self, queue_id: &str) -> Option<&mut TopicQueue> {
for queue in &mut self.queues {
if queue.queue_id == queue_id {
return Some(queue);
}
}

return None;
}

pub fn add_queue_if_not_exists(
&mut self,
topic_id: String,
queue_id: String,
queue_type: TopicQueueType,
) -> &mut TopicQueue {
if !self.queues.contains_key(queue_id.as_str()) {
if !self.has_the_queue(queue_id.as_str()) {
let queue = TopicQueue::new(topic_id, queue_id.to_string(), queue_type);

self.queues.insert(queue_id.to_string(), queue);
self.queues.push(queue);

self.snapshot_id += 1;
}

let result = self.queues.get_mut(queue_id.as_str()).unwrap();
let result = self.get_mut(queue_id.as_str()).unwrap();

result.update_queue_type(queue_type);

Expand All @@ -58,43 +83,32 @@ impl TopicQueuesList {
) -> &TopicQueue {
let topic_queue = TopicQueue::restore(topic_id, queue_id.to_string(), queue_type, queue);

self.queues.insert(queue_id.to_string(), topic_queue);
self.queues.push(topic_queue);

self.snapshot_id += 1;

return self.queues.get(queue_id.as_str()).unwrap();
}

pub fn get(&self, queue_id: &str) -> Option<&TopicQueue> {
return self.queues.get(queue_id);
}

pub fn get_mut(&mut self, queue_id: &str) -> Option<&mut TopicQueue> {
return self.queues.get_mut(queue_id);
return self.get(queue_id.as_str()).unwrap();
}

pub fn delete_queue(&mut self, queue_id: &str) -> Option<TopicQueue> {
let result = self.queues.remove(queue_id);
pub fn remove(&mut self, queue_id: &str) -> Option<TopicQueue> {
let index = self.queues.iter().position(|x| x.queue_id == queue_id)?;
let result = self.queues.remove(index);
self.snapshot_id += 1;
result
}

pub fn get_queues(&self) -> Values<String, TopicQueue> {
self.queues.values()
Some(result)
}

pub fn get_all(&self) -> Values<String, TopicQueue> {
self.queues.values()
pub fn get_all(&self) -> impl Iterator<Item = &TopicQueue> {
self.queues.iter()
}

pub fn get_all_mut(&mut self) -> ValuesMut<String, TopicQueue> {
self.queues.values_mut()
pub fn get_all_mut(&mut self) -> impl Iterator<Item = &mut TopicQueue> {
self.queues.iter_mut()
}

pub fn get_snapshot_to_persist(&self) -> Vec<TopicQueueSnapshot> {
let mut result = Vec::new();

for queue in self.queues.values() {
for queue in self.get_all() {
let get_snapshot_to_persist_result = queue.get_snapshot_to_persist();

if let Some(snapshot_to_persist) = get_snapshot_to_persist_result {
Expand All @@ -104,14 +118,10 @@ impl TopicQueuesList {
return result;
}

pub fn remove(&mut self, queue_id: &str) -> Option<TopicQueue> {
self.delete_queue(queue_id)
}

pub fn get_queues_with_no_subscribers(&self) -> Option<Vec<&TopicQueue>> {
let mut result = None;

for queue in self.queues.values() {
for queue in self.get_all() {
if queue.subscribers.get_amount() > 0 {
continue;
}
Expand All @@ -132,7 +142,7 @@ impl TopicQueuesList {
) -> Option<Vec<(&mut TopicQueue, QueueSubscriber)>> {
let mut result = None;

for queue in self.queues.values_mut() {
for queue in self.get_all_mut() {
let remove_result = queue.subscribers.remove_by_session_id(session_id);
if let Some(sub) = remove_result {
if result.is_none() {
Expand Down
6 changes: 3 additions & 3 deletions src/topics/topics_list_inner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::sync::Arc;

use my_service_bus::abstractions::MessageId;
Expand All @@ -7,14 +7,14 @@ use my_service_bus::shared::validators::InvalidTopicName;
use super::Topic;

pub struct TopicListInner {
topics: BTreeMap<String, Arc<Topic>>,
topics: HashMap<String, Arc<Topic>>,
snapshot_id: usize,
}

impl TopicListInner {
pub fn new() -> Self {
Self {
topics: BTreeMap::new(),
topics: HashMap::new(),
snapshot_id: 0,
}
}
Expand Down

0 comments on commit 411cf3c

Please sign in to comment.