
Implement setup for bitbucket #3

Merged
merged 5 commits into main from tr/setup on Oct 7, 2023

Conversation

tapishr (Member) commented Sep 15, 2023

Start with pubsub/listener.rs and follow the call flow for an easier review.

Summary by CodeRabbit

  • New Feature: Added support for Bitbucket integration, including OAuth2 authorization, workspace retrieval, repository fetching, and webhook handling.
  • New Feature: Implemented database operations for storing and retrieving authentication information, workspaces, repositories, users, and webhooks using the sled database.
  • New Feature: Introduced a new module for setting up Bitbucket installations, which includes cloning Git repositories and processing webhooks.
  • New Feature: Enhanced PubSub listener to process different message types asynchronously.
  • Refactor: Organized code into modules based on functionality for better maintainability and readability.

tapishr changed the base branch from tr/pubsub to main on September 15, 2023 18:55
coderabbitai bot (Contributor) commented Sep 16, 2023

Walkthrough

The pull request introduces a comprehensive integration with the Bitbucket API, including OAuth2 authorization, data retrieval from various endpoints (workspaces, repositories, users, webhooks), and saving the fetched data to a local sled database. It also includes PubSub message processing and setup for Bitbucket repository installation.

Changes

File(s) and summary:
  • .../bitbucket/auth.rs, .../bitbucket/config.rs: Adds functions for OAuth2 authorization and API calls to Bitbucket.
  • .../bitbucket/mod.rs, .../bitbucket/workspace.rs, .../bitbucket/repo.rs, .../bitbucket/user.rs, .../bitbucket/webhook.rs: Introduces modules for handling different aspects of Bitbucket data: workspaces, repositories, users, and webhooks.
  • .../core/mod.rs, .../core/setup.rs: Adds a new module and function for handling Bitbucket repository installation.
  • .../db/auth.rs, .../db/config.rs, .../db/mod.rs, .../db/owner.rs, .../db/repo.rs, .../db/user.rs, .../db/webhook.rs: Implements database operations for storing authentication info, workspaces, repositories, users, and webhooks using the sled database.
  • .../main.rs: Modifies the main function to include new imports and use environment variables.
  • .../pubsub/listener.rs, .../pubsub/mod.rs: Adds a new module and modifies existing code for processing PubSub messages.
  • .../utils/auth.rs, .../utils/mod.rs, .../utils/owner.rs, .../utils/repo.rs, .../utils/user.rs, .../utils/webhook.rs: Introduces new utility modules and data structures for handling authentication, workspaces, repositories, users, and webhooks.

Tips

Chat with CodeRabbit Bot (@coderabbitai)

  • Mention @coderabbitai in any review comment for bot assistance.
  • Note: Review comments are made on code diffs or files, not on the PR overview.

Pause Incremental Reviews

  • Insert @coderabbitai: ignore in the PR description to halt the bot's ongoing reviews. Remove the line to resume.

coderabbitai bot (Contributor) left a comment

Review Status

Actionable comments generated: 50

Commits: files that changed from the base of the PR and between commits 87ce51d and 419cdde.
Files selected for processing (25)
  • vibi-dpu/src/bitbucket/auth.rs (1 hunks)
  • vibi-dpu/src/bitbucket/config.rs (1 hunks)
  • vibi-dpu/src/bitbucket/mod.rs (1 hunks)
  • vibi-dpu/src/bitbucket/repo.rs (1 hunks)
  • vibi-dpu/src/bitbucket/user.rs (1 hunks)
  • vibi-dpu/src/bitbucket/webhook.rs (1 hunks)
  • vibi-dpu/src/bitbucket/workspace.rs (1 hunks)
  • vibi-dpu/src/core/mod.rs (1 hunks)
  • vibi-dpu/src/core/setup.rs (1 hunks)
  • vibi-dpu/src/db/auth.rs (1 hunks)
  • vibi-dpu/src/db/config.rs (1 hunks)
  • vibi-dpu/src/db/mod.rs (1 hunks)
  • vibi-dpu/src/db/owner.rs (1 hunks)
  • vibi-dpu/src/db/repo.rs (1 hunks)
  • vibi-dpu/src/db/user.rs (1 hunks)
  • vibi-dpu/src/db/webhook.rs (1 hunks)
  • vibi-dpu/src/main.rs (1 hunks)
  • vibi-dpu/src/pubsub/listener.rs (1 hunks)
  • vibi-dpu/src/pubsub/mod.rs (1 hunks)
  • vibi-dpu/src/utils/auth.rs (1 hunks)
  • vibi-dpu/src/utils/mod.rs (1 hunks)
  • vibi-dpu/src/utils/owner.rs (1 hunks)
  • vibi-dpu/src/utils/repo.rs (1 hunks)
  • vibi-dpu/src/utils/user.rs (1 hunks)
  • vibi-dpu/src/utils/webhook.rs (1 hunks)
Files skipped from review due to trivial changes (6)
  • vibi-dpu/src/bitbucket/mod.rs
  • vibi-dpu/src/bitbucket/repo.rs
  • vibi-dpu/src/core/mod.rs
  • vibi-dpu/src/db/mod.rs
  • vibi-dpu/src/pubsub/mod.rs
  • vibi-dpu/src/utils/mod.rs
Additional comments (Suppressed): 9
vibi-dpu/src/bitbucket/workspace.rs (1)
  • 4-14: Ensure that all calls to get_bitbucket_workspaces throughout the codebase have been updated to handle the returned Vec<Workspace>. Also, verify that the returned Vec<Workspace> is being used appropriately where this function is called.
vibi-dpu/src/db/auth.rs (1)
  • 12-12: The timestamp is set every time save_auth_info_to_db is called, so calling it repeatedly with the same AuthInfo overwrites the timestamp each time. If this is not the intended behavior, consider setting the timestamp only if it is not already set (see the sketch after this list).
vibi-dpu/src/bitbucket/auth.rs (1)
  • 48-48: Ensure that the save_auth_info_to_db function handles any potential errors or exceptions that might occur during the database operation.
vibi-dpu/src/utils/owner.rs (1)
  • 1-33: The Workspace struct and its methods look good. It's great to see the use of getter methods for accessing private fields, which is a good practice for encapsulation in Rust.
vibi-dpu/src/main.rs (1)
  • 8-21: Ensure that all necessary environment variables (GCP_CREDENTIALS, INSTALL_ID) are properly set in the deployment environment where this code will run.
vibi-dpu/src/utils/repo.rs (2)
  • 19-41: The constructor method new is correctly implemented. It's good that you are using Option<String> for local_dir as it might not always be available.

  • 44-78: Getter methods are correctly implemented and return references to avoid unnecessary cloning. Good job!

vibi-dpu/src/utils/user.rs (1)
  • 1-9: The ProviderEnum is well defined and implements the Display trait for easy conversion to string. This will be useful in logging and error messages.
vibi-dpu/src/bitbucket/config.rs (1)
  • 1-8:
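
For the db/auth.rs timestamp point above, a minimal sketch of stamping only on first save; the AuthInfo shape here is an assumption for illustration, the real struct lives in vibi-dpu/src/utils/auth.rs:

use std::time::{SystemTime, UNIX_EPOCH};

// Assumed shape for illustration; the real AuthInfo is defined in vibi-dpu/src/utils/auth.rs.
struct AuthInfo {
    access_token: String,
    timestamp: Option<u64>,
}

// Stamp only if unset, so repeated saves of the same AuthInfo keep the original timestamp.
fn stamp_if_unset(auth_info: &mut AuthInfo) {
    if auth_info.timestamp.is_none() {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is before the UNIX epoch")
            .as_secs();
        auth_info.timestamp = Some(now);
    }
}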

Comment on lines +9 to +11
let base_url = bitbucket_base_url();
let members_url = format!("{}/workspaces/{}/members", &base_url, workspace_id);
let response_json = get_api(&members_url, access_token, None).await;

The get_api function is called without error handling. If the API call fails, it could cause a panic or unexpected behavior. Consider adding error handling to ensure that the program can recover gracefully from errors.
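
A minimal sketch of what that could look like at this call site, assuming get_api is reworked to return a Result (as later comments in this review suggest):

let response_json = match get_api(&members_url, access_token, None).await {
    Ok(json) => json,
    Err(e) => {
        // Log and return early instead of panicking deeper in the call chain.
        eprintln!("Failed to fetch workspace members from {}: {}", members_url, e);
        return;
    }
};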

Comment on lines +12 to +22
for user_json in response_json {
    let provider_id = user_json["user"]["uuid"].to_string().replace('"', "");
    let user = User::new(
        Provider::new(
            provider_id,
            ProviderEnum::Bitbucket),
        user_json["user"]["display_name"].to_string().replace('"', ""),
        user_json["workspace"]["slug"].to_string().replace('"', ""),
        None);
    save_user_to_db(&user);
}

This code assumes that the JSON response will always have the expected structure and fields. If the API response changes or if there's an error in the response, this could lead to panics due to accessing non-existent fields. It would be better to use a library like serde_json to safely parse the JSON into a struct.
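
A sketch of that approach; the struct fields mirror the JSON paths accessed above and are an assumption about the payload shape:

use serde::Deserialize;

// Assumed shape, mirroring the ["user"]["uuid"], ["user"]["display_name"],
// and ["workspace"]["slug"] accesses above.
#[derive(Deserialize)]
struct MemberEntry {
    user: MemberUser,
    workspace: MemberWorkspace,
}

#[derive(Deserialize)]
struct MemberUser {
    uuid: String,
    display_name: String,
}

#[derive(Deserialize)]
struct MemberWorkspace {
    slug: String,
}

for user_json in response_json {
    // from_value returns Err on a shape mismatch instead of panicking.
    match serde_json::from_value::<MemberEntry>(user_json) {
        Ok(entry) => {
            // Build the User from entry.user.uuid, entry.user.display_name,
            // and entry.workspace.slug as before.
        }
        Err(e) => eprintln!("Skipping malformed member entry: {}", e),
    }
}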

user_json["user"]["display_name"].to_string().replace('"', ""),
user_json["workspace"]["slug"].to_string().replace('"', ""),
None);
save_user_to_db(&user);

The save_user_to_db function is called without checking its result. If saving to the database fails, this could lead to data loss. Consider checking the result of this function and handling any errors that may occur.
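
A short sketch, assuming save_user_to_db is changed to return sled's Result type:

// Assumes save_user_to_db now returns sled::Result<()>.
if let Err(e) = save_user_to_db(&user) {
    eprintln!("Failed to save user to db: {}", e);
}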

Comment on lines +1 to +14
use super::config::{bitbucket_base_url, get_api};
use crate::db::owner::save_workspace_to_db;
use crate::utils::owner::Workspace;

pub async fn get_bitbucket_workspaces(access_token: &str) -> Vec<Workspace> {
    let user_url = format!("{}/workspaces", bitbucket_base_url());
    let response = get_api(&user_url, access_token, None).await;
    let mut workspace_vec = Vec::new();
    for workspace_json in response {
        let val = serde_json::from_value::<Workspace>(workspace_json.clone()).expect("Unable to deserialize workspace");
        save_workspace_to_db(&val);
        workspace_vec.push(val);
    }
    return workspace_vec;
}

The function get_bitbucket_workspaces is making an API call and directly saving the response to the database without any error handling. If the API call fails or returns an error, this could lead to unexpected behavior or crashes. Consider adding error handling for the API call and the database operation.

pub async fn get_bitbucket_workspaces(access_token: &str) -> Vec<Workspace> {
    let user_url = format!("{}/workspaces", bitbucket_base_url());
    let response = match get_api(&user_url, access_token, None).await {
        Ok(res) => res,
        Err(e) => {
            eprintln!("Failed to fetch workspaces: {}", e);
            return Vec::new();
        }
    };
    let mut workspace_vec = Vec::new();
    for workspace_json in response {
        let val = serde_json::from_value::<Workspace>(workspace_json.clone()).expect("Unable to deserialize workspace");
        if let Err(e) = save_workspace_to_db(&val) {
            eprintln!("Failed to save workspace to DB: {}", e);
            continue;
        }
        workspace_vec.push(val);
    }
    return workspace_vec;
}

Comment on lines +11 to +30
pub async fn get_webhooks_in_repo(workspace_slug: &str, repo_slug: &str, access_token: &str) -> Vec<Webhook> {
    let url = format!("{}/repositories/{}/{}/hooks", bitbucket_base_url(), workspace_slug, repo_slug);
    println!("Getting webhooks from {}", url);
    let response_json = get_api(&url, access_token, None).await;
    let mut webhooks = Vec::new();
    for webhook_json in response_json {
        let active = matches!(webhook_json["active"].to_string().trim_matches('"'), "true" | "false");
        let webhook = Webhook::new(
            webhook_json["uuid"].to_string(),
            active,
            webhook_json["created_at"].to_string().replace('"', ""),
            webhook_json["events"].as_array().expect("Unable to deserialize events").into_iter()
                .map(|events| events.as_str().expect("Unable to convert event").to_string()).collect(),
            webhook_json["links"]["self"]["href"].to_string().replace('"', ""),
            webhook_json["url"].to_string().replace('"', ""),
        );
        webhooks.push(webhook);
    }
    return webhooks;
}

The function get_webhooks_in_repo does not handle potential errors that may occur during the API call in line 14. If the API call fails or returns an error status, this function will still attempt to process the response as if it were successful, which could lead to unexpected behavior or panics. Consider adding error handling for the API call.
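
A sketch of a soft-failing version, assuming get_api is reworked to return a Result. Note also that the matches! check above sets active to true for both "true" and "false"; parsing the JSON boolean directly avoids that:

pub async fn get_webhooks_in_repo(workspace_slug: &str, repo_slug: &str, access_token: &str) -> Vec<Webhook> {
    let url = format!("{}/repositories/{}/{}/hooks", bitbucket_base_url(), workspace_slug, repo_slug);
    let response_json = match get_api(&url, access_token, None).await {
        Ok(json) => json,
        Err(e) => {
            eprintln!("Failed to fetch webhooks from {}: {}", url, e);
            return Vec::new(); // fail soft: return no webhooks instead of panicking
        }
    };
    let mut webhooks = Vec::new();
    for webhook_json in response_json {
        // as_bool() reads the JSON boolean directly instead of string-matching.
        let active = webhook_json["active"].as_bool().unwrap_or(false);
        // ... construct the Webhook from the remaining fields as above and push it ...
    }
    webhooks
}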

Comment on lines +10 to +19
pub async fn get_api(url: &str, access_token: &str, params: Option<HashMap<&str, &str>>) -> Vec<Value> {
    let response_opt = call_get_api(url, access_token, &params).await;
    println!("response of get_api = {:?}", &response_opt);
    let (mut response_values, next_url) = deserialize_response(response_opt).await;
    if next_url.is_some() {
        let mut page_values = get_all_pages(next_url, access_token, &params).await;
        response_values.append(&mut page_values);
    }
    return response_values;
}

The function get_api is making a call to the Bitbucket API and handling pagination. However, it's using println! for logging which is not recommended in production code. Consider using a proper logging framework like log or slog for better control over log levels and outputs.
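
A minimal sketch of that swap, assuming the log and env_logger crates are added as dependencies:

use log::{debug, error};

fn init_logging() {
    // env_logger reads the RUST_LOG env var, e.g. RUST_LOG=vibi_dpu=debug.
    env_logger::init();
}

// Inside get_api and friends, the print macros then become log macros:
// println!("response of get_api = {:?}", &response_opt)  ->  debug!("response of get_api = {:?}", &response_opt);
// eprintln!("Failed to call API {}, ...", url)           ->  error!("Failed to call API {}, ...", url);

This keeps noisy response dumps out of production output unless explicitly enabled.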

Comment on lines +21 to +54
pub async fn call_get_api(url: &str, access_token: &str, params: &Option<HashMap<&str, &str>>) -> Option<Response> {
    println!("GET api url = {}", url);
    let client = reqwest::Client::new();
    let mut headers = reqwest::header::HeaderMap::new();
    headers.insert(reqwest::header::AUTHORIZATION,
        format!("Bearer {}", access_token).parse().expect("Invalid auth header"));
    headers.insert("Accept",
        "application/json".parse().expect("Invalid Accept header"));
    match params {
        Some(params) => {
            match client.get(url).headers(headers).query(params).send().await {
                Ok(response) => {
                    if response.status().is_success() {
                        return Some(response);
                    }
                    else { eprintln!("Failed to call API {}, status: {}", url, response.status()); }
                },
                Err(e) => { eprintln!("Error sending GET request to {}, error: {}", url, e); },
            };
        },
        None => {
            match client.get(url).headers(headers).send().await {
                Ok(response) => {
                    if response.status().is_success() {
                        return Some(response);
                    }
                    else { eprintln!("Failed to call API {}, status: {}", url, response.status()); }
                },
                Err(e) => { eprintln!("Error sending GET request to {}, error: {}", url, e); },
            };
        }
    };
    return None;
}

This function call_get_api is responsible for making GET requests to the Bitbucket API. It has some issues:

  1. The function is repeating code for the cases when params is Some and None. This repetition can be avoided by building the request separately from sending it.

  2. Error handling could be improved. Currently, the function prints an error message and returns None if the request fails. It would be more robust to return a Result type and let the caller decide how to handle errors.

  3. The function uses println! for logging which is not recommended in production code. Consider using a proper logging framework like log or slog.

Here's a suggested refactor:

pub async fn call_get_api(url: &str, access_token: &str, params: &Option<HashMap<&str, &str>>) -> Result<Response, reqwest::Error> {
    let client = reqwest::Client::new();
    let mut headers = HeaderMap::new();
    headers.insert(reqwest::header::AUTHORIZATION,
        format!("Bearer {}", access_token).parse().expect("Invalid auth header"));
    headers.insert("Accept", "application/json".parse().expect("Invalid Accept header"));

    // Build the request once and attach query params only if they were provided.
    let request_builder = client.get(url).headers(headers);
    let request_builder = match params {
        Some(params) => request_builder.query(params),
        None => request_builder,
    };

    // error_for_status() turns a non-2xx response into a reqwest::Error,
    // so the caller gets a Result instead of a printed message and None.
    let response = request_builder.send().await?;
    response.error_for_status()
}

Comment on lines +56 to +76
async fn deserialize_response(response_opt: Option<Response>) -> (Vec<Value>, Option<String>) {
    let values_vec = Vec::new();
    match response_opt {
        Some(response) => {
            match response.json::<serde_json::Value>().await {
                Ok(response_json) => {
                    let mut values_vec = Vec::new();
                    if let Some(values) = response_json["values"].as_array() {
                        for value in values {
                            values_vec.push(value.to_owned());
                        }
                        return (values_vec, Some(response_json["next"].to_string()));
                    }
                }
                Err(e) => { eprintln!("Unable to deserialize response: {}", e); }
            };
        },
        None => { eprintln!("Response is None"); }
    };
    return (values_vec, None);
}

The function deserialize_response is responsible for deserializing the response from the Bitbucket API. It has some issues:

  1. The function uses println! for logging which is not recommended in production code. Consider using a proper logging framework like log or slog.

  2. Error handling could be improved. Currently, the function prints an error message and returns an empty vector and None if deserialization fails. It would be more robust to return a Result type and let the caller decide how to handle errors.

  3. The function assumes that the next field in the response is always present. This might not be the case and could lead to runtime errors. Consider checking if the next field exists before trying to access it.
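
A sketch of a guarded version of that return, replacing the unconditional Some(response_json["next"].to_string()):

// Only yield a next URL when the field is present and is a string;
// this also avoids turning JSON null into the literal string "null".
let next_url = response_json
    .get("next")
    .and_then(serde_json::Value::as_str)
    .map(str::to_string);
return (values_vec, next_url);

With this in place, callers no longer need to compare the URL against the string "null".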

Comment on lines +78 to +93
async fn get_all_pages(next_url: Option<String>, access_token: &str, params: &Option<HashMap<&str, &str>>) -> Vec<Value> {
    let mut values_vec = Vec::new();
    let mut next_url = next_url;
    while next_url.is_some() {
        let url = next_url.as_ref().expect("next_url is none").trim_matches('"');
        if url != "null" {
            let response_opt = call_get_api(url, access_token, params).await;
            let (mut response_values, url_opt) = deserialize_response(response_opt).await;
            next_url = url_opt.clone();
            values_vec.append(&mut response_values);
        } else {
            break;
        }
    }
    return values_vec;
}

The function get_all_pages is responsible for fetching all pages of data from the Bitbucket API. It has some issues:

  1. The function uses expect on next_url.as_ref(), which will panic if next_url is None. This is not good practice as it can cause the program to crash. Consider using pattern matching or the if let construct to safely handle the Option.

  2. The function assumes that the next field in the response is always present. This might not be the case and could lead to runtime errors. Consider checking if the next field exists before trying to access it.

  3. The function checks if url is equal to "null". This is a bit of a hack and could lead to bugs if the API changes. Consider checking if url_opt is None instead.
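
A sketch addressing all three points, assuming deserialize_response is updated (as sketched above) to return None when there is no next page:

async fn get_all_pages(mut next_url: Option<String>, access_token: &str, params: &Option<HashMap<&str, &str>>) -> Vec<Value> {
    let mut values_vec = Vec::new();
    // while let consumes the Option safely: no expect(), no "null" string check.
    while let Some(url) = next_url {
        let response_opt = call_get_api(&url, access_token, params).await;
        let (mut response_values, url_opt) = deserialize_response(response_opt).await;
        values_vec.append(&mut response_values);
        next_url = url_opt;
    }
    values_vec
}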

Comment on lines +95 to +106
pub fn prepare_auth_headers(access_token: &str) -> HeaderMap {
    let mut headers_map = HeaderMap::new();
    let auth_header = format!("Bearer {}", access_token);
    let headervalres = HeaderValue::from_str(&auth_header);
    match headervalres {
        Ok(headerval) => {
            headers_map.insert("Authorization", headerval);
        },
        Err(e) => panic!("Could not parse header value: {}", e),
    };
    return headers_map;
}

The function prepare_auth_headers prepares the authorization headers for Bitbucket API calls. It has some issues:

  1. The function panics if it fails to parse the header value. This is not good practice as it can cause the program to crash. Consider returning a Result type and let the caller decide how to handle errors.

  2. The function only sets the Authorization header. Other headers like Accept might also be required for API calls. Consider making the function more flexible by allowing it to set multiple headers.

Here's a suggested refactor:

pub fn prepare_auth_headers(access_token: &str) -> Result<HeaderMap, reqwest::header::InvalidHeaderValue> {
    let mut headers_map = HeaderMap::new();
    let auth_header = format!("Bearer {}", access_token);
    let headerval = HeaderValue::from_str(&auth_header)?;
    headers_map.insert("Authorization", headerval);
    Ok(headers_map)
}

tapishr mentioned this pull request on Sep 17, 2023
coderabbitai bot (Contributor) left a comment

Review Status

Actionable comments generated: 4

Commits: files that changed from the base of the PR and between commits 419cdde and 393b8e2.
Files selected for processing (1)
  • vibi-dpu/src/pubsub/listener.rs (1 hunks)
Additional comments (Suppressed): 1
vibi-dpu/src/pubsub/listener.rs (1)
  • 1-16: The import statements are well organized and only necessary modules are imported. Good use of Rust's module system.

Comment on lines 25 to 57
async fn process_message(attributes: &HashMap<String, String>, data_bytes: &Vec<u8>) {
    let msgtype_opt = attributes.get("msgtype");
    if msgtype_opt.is_none() {
        eprintln!("msgtype attribute not found in message : {:?}", attributes);
        return;
    }
    let msgtype = msgtype_opt.expect("Empty msgtype");
    match msgtype.as_str() {
        "install_callback" => {
            println!("Processing install callback message");
            let msg_data_res = serde_json::from_slice::<InstallCallback>(data_bytes);
            if msg_data_res.is_err() {
                eprintln!("Error deserializing install callback: {:?}", msg_data_res);
                return;
            }
            let data = msg_data_res.expect("msg_data not found");
            let code_async = data.installation_code.clone();
            task::spawn(async move {
                handle_install_bitbucket(&code_async).await;
                println!("Processed install callback message");
            });
        },
        "webhook_callback" => {
            task::spawn(async move {
                // process_review(&data_bytes).await;
                println!("Processed webhook callback message");
            });
        }
        _ => {
            eprintln!("Message type not found for message : {:?}", attributes);
        }
    };
}

This function process_message handles different types of messages based on the msgtype attribute. It correctly checks for the existence of the msgtype attribute and logs an error if it is not found. The handling of install_callback and webhook_callback messages is done asynchronously using tokio::task::spawn, which is a good practice for handling potentially long-running tasks without blocking the main thread. However, there is no error handling in case the spawned task panics. Consider wrapping the body of the spawned task in a catch_unwind block to handle any potential panics and prevent them from bringing down the entire application.

use futures::FutureExt; // for catch_unwind on futures (assumes the futures crate)

task::spawn(async move {
    // std::panic::catch_unwind cannot await inside a sync closure;
    // wrap the async body and catch the unwind on the future instead.
    let result = std::panic::AssertUnwindSafe(async {
        handle_install_bitbucket(&code_async).await;
        println!("Processed install callback message");
    })
    .catch_unwind()
    .await;
    if let Err(err) = result {
        eprintln!("Error processing install callback: {:?}", err);
    }
});

Comment on lines 59 to 66
pub async fn get_pubsub_client_config(keypath: &str) -> ClientConfig {
    let credfile = CredentialsFile::new_from_file(keypath.to_string()).await
        .expect("Failed to locate credentials file");
    return ClientConfig::default()
        .with_credentials(credfile)
        .await
        .expect("Unable to get PubSub Client config");
}

The function get_pubsub_client_config reads the credentials file and creates a ClientConfig. It uses .expect() to unwrap the results, which will cause the program to panic if an error occurs. Consider returning a Result<ClientConfig> instead and handling the error at a higher level where more context is available.

pub async fn get_pubsub_client_config(keypath: &str) -> Result<ClientConfig, Box<dyn std::error::Error>> {
    let credfile = CredentialsFile::new_from_file(keypath.to_string()).await?;
    return Ok(ClientConfig::default()
        .with_credentials(credfile)
        .await?);
}

Comment on lines 68 to 101
async fn setup_subscription(keypath: &str, topicname: &str) -> Subscription {
    let config = get_pubsub_client_config(keypath).await;
    let client = Client::new(config).await
        .expect("Unable to create pubsub client to listen to messages");
    let topic = client.topic(topicname);
    let topic_res = topic.exists(None).await;
    if topic_res.is_err() {
        let e = topic_res.expect_err("No error found in topic_res");
        if e.code() == Code::NotFound {
            client.create_topic(topicname, None, None).await
                .expect("Unable to create topic");
        }
        else {
            eprintln!("Error getting topic: {:?}", e);
        }
    }
    let sub_config = SubscriptionConfig {
        enable_message_ordering: true,
        ..Default::default()
    };
    let subscription_name = format!("{topicname}-sub");
    let subscription = client.subscription(&subscription_name);
    let subconfig = SubscriptionConfig {
        enable_message_ordering: true,
        ..Default::default()
    };
    if !subscription.exists(None).await.expect("Unable to get subscription information") {
        subscription.create(
            topic.fully_qualified_name(), subconfig, None)
            .await.expect("Unable to create subscription for listening to messages");
    }
    println!("sub = {:?}", &subscription);
    subscription
}

The function setup_subscription sets up a subscription to a PubSub topic. It checks if the topic exists and creates it if it doesn't, which is good. However, it uses .expect() to unwrap results, which will cause the program to panic if an error occurs. Consider returning a Result<Subscription> instead and handling the error at a higher level where more context is available.

async fn setup_subscription(keypath: &str, topicname: &str) -> Result<Subscription, Box<dyn std::error::Error>> {
    let config = get_pubsub_client_config(keypath).await?;
    let client = Client::new(config).await?;
    // ... rest of the code ...
    Ok(subscription)
}

Comment on lines 103 to 126
pub async fn listen_messages(keypath: &str, topicname: &str) {
    let queue_cap = 100;
    let mut message_hashes = VecDeque::with_capacity(queue_cap);
    let subscription = setup_subscription(keypath, topicname).await;
    let mut stream = subscription.subscribe(None).await
        .expect("Unable to subscribe to messages");
    while let Some(message) = stream.next().await {
        println!("Listening for messages...");
        let attrmap: HashMap<String, String> = message.message.attributes.clone().into_iter().collect();
        let message_hash = digest(&*message.message.data);
        if !message_hashes.contains(&message_hash) {
            message_hashes.push_back(message_hash);
            if message_hashes.len() > queue_cap {
                while message_hashes.len() > queue_cap {
                    message_hashes.pop_front();
                }
            }
            let msg_bytes = message.message.data.clone();
            process_message(&attrmap, &msg_bytes).await;
        }
        // Ack or Nack message.
        let _ = message.ack().await;
    }
}

The function listen_messages listens for messages from a PubSub subscription and processes them. It correctly acknowledges each message after processing. However, it uses a VecDeque to store hashes of processed messages and prevent duplicate processing. This can lead to high memory usage if the number of unique messages is large. Consider using a data structure with a fixed size, such as a Bloom filter, to store the hashes. Also, consider moving the logic for checking and storing message hashes into a separate function for better modularity and testability.
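
A sketch of that extraction with a bounded structure; RecentHashes is a hypothetical helper name:

use std::collections::{HashSet, VecDeque};

// Bounded de-dup set: O(1) membership via the HashSet,
// FIFO eviction via the VecDeque so memory stays capped.
struct RecentHashes {
    cap: usize,
    order: VecDeque<String>,
    seen: HashSet<String>,
}

impl RecentHashes {
    fn new(cap: usize) -> Self {
        Self { cap, order: VecDeque::with_capacity(cap), seen: HashSet::with_capacity(cap) }
    }

    // Returns true if the hash is new (and records it), false if already seen.
    fn insert(&mut self, hash: String) -> bool {
        if self.seen.contains(&hash) {
            return false;
        }
        if self.order.len() == self.cap {
            if let Some(oldest) = self.order.pop_front() {
                self.seen.remove(&oldest);
            }
        }
        self.order.push_back(hash.clone());
        self.seen.insert(hash);
        true
    }
}

In listen_messages, the contains/push_back/pop_front bookkeeping then collapses to if message_hashes.insert(message_hash) { ... }. A Bloom filter (for example via the bloomfilter crate) would trade exact membership for strictly fixed memory, which matters at higher message volumes.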

sonarqubecloud bot commented Oct 7, 2023

Kudos, SonarCloud Quality Gate passed!

  • 0 Bugs (rated A)
  • 0 Vulnerabilities (rated A)
  • 0 Security Hotspots (rated A)
  • 0 Code Smells (rated A)
  • No Coverage information
  • No Duplication information

avikalpg merged commit d95c4d8 into main on Oct 7, 2023 (3 checks passed)
avikalpg deleted the tr/setup branch on October 7, 2023 07:40