diff --git a/vibi-dpu/src/bitbucket/prs.rs b/vibi-dpu/src/bitbucket/prs.rs index 7b1c0a89..1480d12a 100644 --- a/vibi-dpu/src/bitbucket/prs.rs +++ b/vibi-dpu/src/bitbucket/prs.rs @@ -2,7 +2,6 @@ use reqwest::header::HeaderMap; use serde_json::Value; use std::collections::HashMap; use std::str; -use std::env; use crate::utils::pr_info::PrInfo; use crate::db::prs::save_pr_info_to_db; @@ -72,7 +71,6 @@ async fn get_list_prs(headers: &HeaderMap, params: &HashMap, rep pub async fn get_pr_info(workspace_slug: &str, repo_slug: &str, access_token: &str, pr_number: &str) -> Option { let base_url = bitbucket_base_url(); let url = format!("{}/repositories/{}/{}/pullrequests/{}", &base_url, workspace_slug, repo_slug, pr_number); - let client = get_client(); let response_result = client.get(&url) .header("Authorization", format!("Bearer {}", access_token)) @@ -87,7 +85,7 @@ pub async fn get_pr_info(workspace_slug: &str, repo_slug: &str, access_token: &s } let response = response_result.expect("Uncaught error in response"); if !response.status().is_success() { - println!("Failed to get PR info, status: {:?}", response.status()); + println!("Failed to get PR info, response: {:?}", response); return None; } let pr_data: Value = response.json().await.expect("Error parsing PR data"); diff --git a/vibi-dpu/src/core/review.rs b/vibi-dpu/src/core/review.rs index bb544ad5..6ed6a07c 100644 --- a/vibi-dpu/src/core/review.rs +++ b/vibi-dpu/src/core/review.rs @@ -72,9 +72,11 @@ async fn process_review_changes(review: &Review) -> Option{ } let (_, smallfiles) = fileopt.expect("fileopt is empty"); let diffmap = generate_diff(&review, &smallfiles); - println!("diffmap = {:?}", &diffmap); + println!("[process_review_changes] diffmap = {:?}", &diffmap); let linemap = process_diffmap(&diffmap); + println!("[process_review_changes] linemap = {:?}", &linemap); let blamevec = generate_blame(&review, &linemap).await; + println!("[process_review_changes] blamevec = {:?}", &blamevec); let hmapitem = PrHunkItem::new( review.id().to_string(), review.author().to_string(), diff --git a/vibi-dpu/src/db/prs.rs b/vibi-dpu/src/db/prs.rs index d1aea410..89b28ef3 100644 --- a/vibi-dpu/src/db/prs.rs +++ b/vibi-dpu/src/db/prs.rs @@ -1,3 +1,4 @@ +use serde_json::Value; use sled::IVec; use std::error::Error; use crate::db::config::get_db; @@ -27,7 +28,7 @@ pub async fn save_pr_info_to_db(workspace_slug: &str,repo_slug: &str,pr_info: Pr } -pub async fn update_pr_info_in_db(workspace_slug: &str, repo_slug: &str, pr_info: PrInfo, pr_number: &str) { +pub async fn update_pr_info_in_db(workspace_slug: &str, repo_slug: &str, pr_info: &PrInfo, pr_number: &str) { let key = format!("{}/{}/{}/{}", "bitbucket", workspace_slug, repo_slug, pr_number); let db = get_db(); @@ -51,4 +52,75 @@ pub async fn update_pr_info_in_db(workspace_slug: &str, repo_slug: &str, pr_info } println!("PR info updated successfully in the database. {:?} {:?}", key, pr_info); +} + +pub async fn process_and_update_pr_if_different(webhook_data: &Value, workspace_slug: &str, repo_slug: &str, pr_number: &str, repo_provider: &str) -> bool { + println!("[process_and_update_pr_if_different] {:?}, {:?}, {:?}, {:?}", workspace_slug, repo_slug, pr_number, repo_provider); + let pr_info_parsed_opt = parse_webhook_data(webhook_data); + if pr_info_parsed_opt.is_none() { + eprintln!("[process_and_update_pr_if_different] Unable to parse webhook data"); + return false; + } + let pr_info_parsed = pr_info_parsed_opt.expect("Empty pr_info_parsed_opt"); + // Retrieve the existing pr_head_commit from the database + let pr_info_db_opt = get_pr_info_from_db(workspace_slug, repo_slug, pr_number, repo_provider, &pr_info_parsed).await; + if pr_info_db_opt.is_none() { + eprintln!("[process_and_update_pr_if_different] No pr_info in db, parsed: {:?}", pr_info_parsed); + return false; + } + let pr_info_db = pr_info_db_opt.expect("Empty pr_info_db_opt"); + if pr_info_db.pr_head_commit().to_string().eq_ignore_ascii_case(pr_info_parsed.pr_head_commit()){ + return false; // commits are the same + } else { + update_pr_info_in_db(&workspace_slug, &repo_slug, &pr_info_parsed, &pr_number).await; + return true; // commits are different, and PR info should be updated + } +} + +fn parse_webhook_data(webhook_data: &Value) -> Option { + println!("[parse_webhook_data] webhook_data: {:?}", &webhook_data); + let pr_head_commit_raw = webhook_data["pullrequest"]["source"]["commit"]["hash"].to_string(); + let pr_head_commit = pr_head_commit_raw.trim_matches('"'); + let base_head_commit_raw = webhook_data["pullrequest"]["destination"]["commit"]["hash"].to_string(); + let base_head_commit = base_head_commit_raw.trim_matches('"'); + let pr_state_raw = webhook_data["pullrequest"]["state"].to_string(); + let pr_state = pr_state_raw.trim_matches('"'); + let pr_branch_raw = webhook_data["pullrequest"]["source"]["branch"]["name"].to_string(); + let pr_branch = pr_branch_raw.trim_matches('"'); + let pr_info = PrInfo { base_head_commit: base_head_commit.to_string(), + pr_head_commit: pr_head_commit.to_string(), + state: pr_state.to_string(), + pr_branch: pr_branch.to_string() + }; + println!("[parse_webhook_data] pr_info :{:?}", &pr_info); + return Some(pr_info); +} + +pub async fn get_pr_info_from_db(workspace_slug: &str, repo_slug: &str, pr_number: &str, repo_provider: &str, pr_info_parsed: &PrInfo) -> Option { + let db = get_db(); + let db_pr_key = format!("{}/{}/{}/{}", repo_provider, workspace_slug, repo_slug, pr_number); + let pr_info_res = db.get(IVec::from(db_pr_key.as_bytes())); + + if pr_info_res.is_err() { + let e = pr_info_res.expect_err("No error in pr_info_res"); + eprintln!("Unable to get bb pr info from db: {:?}", e); + return None; + }; + + let pr_info_opt = pr_info_res.expect("Uncaught error in pr_info res"); + if pr_info_opt.is_none() { + eprintln!("No bitbucket pr info in db"); + update_pr_info_in_db(&workspace_slug, &repo_slug, pr_info_parsed, &pr_number).await; + return None; //If no info in db then it will be considered as new commit + } + + let pr_info_ivec = pr_info_opt.expect("Empty pr_info_opt"); + let pr_info_parse = serde_json::from_slice(&pr_info_ivec); + if pr_info_parse.is_err() { + let e = pr_info_parse.expect_err("No error in pr_info_parse"); + eprintln!("Unable to deserialize pr_Info: {:?}", e); + return None; + } + let pr_info: PrInfo = pr_info_parse.expect("Failed to deserialize PR info"); + return Some(pr_info); } \ No newline at end of file diff --git a/vibi-dpu/src/pubsub/listener.rs b/vibi-dpu/src/pubsub/listener.rs index ee4cbfed..618b0ea4 100644 --- a/vibi-dpu/src/pubsub/listener.rs +++ b/vibi-dpu/src/pubsub/listener.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; - use futures_util::StreamExt; use google_cloud_auth::credentials::CredentialsFile; use google_cloud_default::WithAuthExt; @@ -8,10 +7,12 @@ use google_cloud_pubsub::{ subscription::{SubscriptionConfig, Subscription}, }; use serde::Deserialize; +use serde_json::Value; use tokio::task; use std::collections::VecDeque; use sha256::digest; use tonic::Code; +use crate::{db::prs::process_and_update_pr_if_different, utils::user::ProviderEnum}; use crate::core::{setup::handle_install_bitbucket, review::process_review}; #[derive(Debug, Deserialize)] @@ -45,17 +46,58 @@ async fn process_message(attributes: &HashMap, data_bytes: &Vec< }, "webhook_callback" => { let data_bytes_async = data_bytes.to_owned(); - task::spawn(async move { - process_review(&data_bytes_async).await; - println!("Processed webhook callback message"); - }); + let deserialized_data_opt = deserialized_data(&data_bytes_async); + let deserialised_msg_data = deserialized_data_opt.expect("Failed to deserialize data"); + + let repo_provider = deserialised_msg_data["repositoryProvider"].to_string().trim_matches('"').to_string(); + let workspace_slug = deserialised_msg_data["eventPayload"]["repository"]["workspace"]["slug"].to_string().trim_matches('"').to_string(); + let repo_slug = deserialised_msg_data["eventPayload"]["repository"]["name"].to_string().trim_matches('"').to_string(); + let pr_number = deserialised_msg_data["eventPayload"]["pullrequest"]["id"].to_string().trim_matches('"').to_string(); + let event_type = deserialised_msg_data["eventType"].to_string().trim_matches('"').to_string(); + let mut is_reviewable = false; + + if event_type == "pullrequest:updated" { + is_reviewable = process_and_update_pr_if_different(&deserialised_msg_data["eventPayload"], &workspace_slug, &repo_slug, &pr_number, &repo_provider).await; + } + if is_reviewable || event_type == "pullrequest:created" || event_type == "pullrequest:approved" { + task::spawn(async move { + process_review(&data_bytes_async).await; + println!("Processed webhook callback message"); + }); + } } - _ => { + _=> { eprintln!("Message type not found for message : {:?}", attributes); } }; } + +async fn prcoess_install_callback(data_bytes: &[u8]) { + println!("Processing install callback message"); + let msg_data_res = serde_json::from_slice::(data_bytes); + if msg_data_res.is_err() { + eprintln!("Error deserializing install callback: {:?}", msg_data_res); + return; + } + let data = msg_data_res.expect("msg_data not found"); + if data.repository_provider == ProviderEnum::Github.to_string().to_lowercase() { + println!("To be Implemented"); + // let code_async = data.installation_code.clone(); + // task::spawn(async move { + // handle_install_github(&code_async).await; + // println!("Processed install callback message"); + // }); + } + if data.repository_provider == ProviderEnum::Bitbucket.to_string().to_lowercase() { + let code_async = data.installation_code.clone(); + task::spawn(async move { + handle_install_bitbucket(&code_async).await; + println!("Processed install callback message"); + }); + } +} + pub async fn get_pubsub_client_config(keypath: &str) -> ClientConfig { let credfile = CredentialsFile::new_from_file(keypath.to_string()).await .expect("Failed to locate credentials file"); @@ -123,4 +165,16 @@ pub async fn listen_messages(keypath: &str, topicname: &str) { // Ack or Nack message. let _ = message.ack().await; } +} + +pub fn deserialized_data(message_data: &Vec) -> Option { + let msg_data_res = serde_json::from_slice::(message_data); + if msg_data_res.is_err() { + let e = msg_data_res.expect_err("No error in data_res"); + eprintln!("Incoming message does not contain valid reviews: {:?}", e); + return None; + } + let deserialized_data = msg_data_res.expect("Uncaught error in deserializing message_data"); + println!("deserialized_data == {:?}", &deserialized_data["eventPayload"]["repository"]); + Some(deserialized_data) } \ No newline at end of file diff --git a/vibi-dpu/src/utils/pr_info.rs b/vibi-dpu/src/utils/pr_info.rs index 09bf9f00..38e02422 100644 --- a/vibi-dpu/src/utils/pr_info.rs +++ b/vibi-dpu/src/utils/pr_info.rs @@ -1,5 +1,4 @@ use serde::{Deserialize, Serialize}; -use std::fmt; #[derive(Debug, Serialize, Deserialize)] pub struct PrInfo {