Skip to content

Commit

Permalink
Merge pull request #28 from Alokit-Innovations/mkp/bitbucket-bug-fix-…
Browse files Browse the repository at this point in the history
…condition

Added condition for processing review data after webhook callback
  • Loading branch information
avikalpg authored Oct 31, 2023
2 parents 552658f + 1ada131 commit 3198ed6
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 12 deletions.
4 changes: 1 addition & 3 deletions vibi-dpu/src/bitbucket/prs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use reqwest::header::HeaderMap;
use serde_json::Value;
use std::collections::HashMap;
use std::str;
use std::env;
use crate::utils::pr_info::PrInfo;
use crate::db::prs::save_pr_info_to_db;

Expand Down Expand Up @@ -72,7 +71,6 @@ async fn get_list_prs(headers: &HeaderMap, params: &HashMap<String, String>, rep
pub async fn get_pr_info(workspace_slug: &str, repo_slug: &str, access_token: &str, pr_number: &str) -> Option<PrInfo> {
let base_url = bitbucket_base_url();
let url = format!("{}/repositories/{}/{}/pullrequests/{}", &base_url, workspace_slug, repo_slug, pr_number);

let client = get_client();
let response_result = client.get(&url)
.header("Authorization", format!("Bearer {}", access_token))
Expand All @@ -87,7 +85,7 @@ pub async fn get_pr_info(workspace_slug: &str, repo_slug: &str, access_token: &s
}
let response = response_result.expect("Uncaught error in response");
if !response.status().is_success() {
println!("Failed to get PR info, status: {:?}", response.status());
println!("Failed to get PR info, response: {:?}", response);
return None;
}
let pr_data: Value = response.json().await.expect("Error parsing PR data");
Expand Down
4 changes: 3 additions & 1 deletion vibi-dpu/src/core/review.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ async fn process_review_changes(review: &Review) -> Option<HunkMap>{
}
let (_, smallfiles) = fileopt.expect("fileopt is empty");
let diffmap = generate_diff(&review, &smallfiles);
println!("diffmap = {:?}", &diffmap);
println!("[process_review_changes] diffmap = {:?}", &diffmap);
let linemap = process_diffmap(&diffmap);
println!("[process_review_changes] linemap = {:?}", &linemap);
let blamevec = generate_blame(&review, &linemap).await;
println!("[process_review_changes] blamevec = {:?}", &blamevec);
let hmapitem = PrHunkItem::new(
review.id().to_string(),
review.author().to_string(),
Expand Down
74 changes: 73 additions & 1 deletion vibi-dpu/src/db/prs.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use serde_json::Value;
use sled::IVec;
use std::error::Error;
use crate::db::config::get_db;
Expand Down Expand Up @@ -27,7 +28,7 @@ pub async fn save_pr_info_to_db(workspace_slug: &str,repo_slug: &str,pr_info: Pr
}


pub async fn update_pr_info_in_db(workspace_slug: &str, repo_slug: &str, pr_info: PrInfo, pr_number: &str) {
pub async fn update_pr_info_in_db(workspace_slug: &str, repo_slug: &str, pr_info: &PrInfo, pr_number: &str) {
let key = format!("{}/{}/{}/{}", "bitbucket", workspace_slug, repo_slug, pr_number);
let db = get_db();

Expand All @@ -51,4 +52,75 @@ pub async fn update_pr_info_in_db(workspace_slug: &str, repo_slug: &str, pr_info
}

println!("PR info updated successfully in the database. {:?} {:?}", key, pr_info);
}

pub async fn process_and_update_pr_if_different(webhook_data: &Value, workspace_slug: &str, repo_slug: &str, pr_number: &str, repo_provider: &str) -> bool {
println!("[process_and_update_pr_if_different] {:?}, {:?}, {:?}, {:?}", workspace_slug, repo_slug, pr_number, repo_provider);
let pr_info_parsed_opt = parse_webhook_data(webhook_data);
if pr_info_parsed_opt.is_none() {
eprintln!("[process_and_update_pr_if_different] Unable to parse webhook data");
return false;
}
let pr_info_parsed = pr_info_parsed_opt.expect("Empty pr_info_parsed_opt");
// Retrieve the existing pr_head_commit from the database
let pr_info_db_opt = get_pr_info_from_db(workspace_slug, repo_slug, pr_number, repo_provider, &pr_info_parsed).await;
if pr_info_db_opt.is_none() {
eprintln!("[process_and_update_pr_if_different] No pr_info in db, parsed: {:?}", pr_info_parsed);
return false;
}
let pr_info_db = pr_info_db_opt.expect("Empty pr_info_db_opt");
if pr_info_db.pr_head_commit().to_string().eq_ignore_ascii_case(pr_info_parsed.pr_head_commit()){
return false; // commits are the same
} else {
update_pr_info_in_db(&workspace_slug, &repo_slug, &pr_info_parsed, &pr_number).await;
return true; // commits are different, and PR info should be updated
}
}

fn parse_webhook_data(webhook_data: &Value) -> Option<PrInfo> {
println!("[parse_webhook_data] webhook_data: {:?}", &webhook_data);
let pr_head_commit_raw = webhook_data["pullrequest"]["source"]["commit"]["hash"].to_string();
let pr_head_commit = pr_head_commit_raw.trim_matches('"');
let base_head_commit_raw = webhook_data["pullrequest"]["destination"]["commit"]["hash"].to_string();
let base_head_commit = base_head_commit_raw.trim_matches('"');
let pr_state_raw = webhook_data["pullrequest"]["state"].to_string();
let pr_state = pr_state_raw.trim_matches('"');
let pr_branch_raw = webhook_data["pullrequest"]["source"]["branch"]["name"].to_string();
let pr_branch = pr_branch_raw.trim_matches('"');
let pr_info = PrInfo { base_head_commit: base_head_commit.to_string(),
pr_head_commit: pr_head_commit.to_string(),
state: pr_state.to_string(),
pr_branch: pr_branch.to_string()
};
println!("[parse_webhook_data] pr_info :{:?}", &pr_info);
return Some(pr_info);
}

pub async fn get_pr_info_from_db(workspace_slug: &str, repo_slug: &str, pr_number: &str, repo_provider: &str, pr_info_parsed: &PrInfo) -> Option<PrInfo> {
let db = get_db();
let db_pr_key = format!("{}/{}/{}/{}", repo_provider, workspace_slug, repo_slug, pr_number);
let pr_info_res = db.get(IVec::from(db_pr_key.as_bytes()));

if pr_info_res.is_err() {
let e = pr_info_res.expect_err("No error in pr_info_res");
eprintln!("Unable to get bb pr info from db: {:?}", e);
return None;
};

let pr_info_opt = pr_info_res.expect("Uncaught error in pr_info res");
if pr_info_opt.is_none() {
eprintln!("No bitbucket pr info in db");
update_pr_info_in_db(&workspace_slug, &repo_slug, pr_info_parsed, &pr_number).await;
return None; //If no info in db then it will be considered as new commit
}

let pr_info_ivec = pr_info_opt.expect("Empty pr_info_opt");
let pr_info_parse = serde_json::from_slice(&pr_info_ivec);
if pr_info_parse.is_err() {
let e = pr_info_parse.expect_err("No error in pr_info_parse");
eprintln!("Unable to deserialize pr_Info: {:?}", e);
return None;
}
let pr_info: PrInfo = pr_info_parse.expect("Failed to deserialize PR info");
return Some(pr_info);
}
66 changes: 60 additions & 6 deletions vibi-dpu/src/pubsub/listener.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::HashMap;

use futures_util::StreamExt;
use google_cloud_auth::credentials::CredentialsFile;
use google_cloud_default::WithAuthExt;
Expand All @@ -8,10 +7,12 @@ use google_cloud_pubsub::{
subscription::{SubscriptionConfig, Subscription},
};
use serde::Deserialize;
use serde_json::Value;
use tokio::task;
use std::collections::VecDeque;
use sha256::digest;
use tonic::Code;
use crate::{db::prs::process_and_update_pr_if_different, utils::user::ProviderEnum};
use crate::core::{setup::handle_install_bitbucket, review::process_review};

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -45,17 +46,58 @@ async fn process_message(attributes: &HashMap<String, String>, data_bytes: &Vec<
},
"webhook_callback" => {
let data_bytes_async = data_bytes.to_owned();
task::spawn(async move {
process_review(&data_bytes_async).await;
println!("Processed webhook callback message");
});
let deserialized_data_opt = deserialized_data(&data_bytes_async);
let deserialised_msg_data = deserialized_data_opt.expect("Failed to deserialize data");

let repo_provider = deserialised_msg_data["repositoryProvider"].to_string().trim_matches('"').to_string();
let workspace_slug = deserialised_msg_data["eventPayload"]["repository"]["workspace"]["slug"].to_string().trim_matches('"').to_string();
let repo_slug = deserialised_msg_data["eventPayload"]["repository"]["name"].to_string().trim_matches('"').to_string();
let pr_number = deserialised_msg_data["eventPayload"]["pullrequest"]["id"].to_string().trim_matches('"').to_string();
let event_type = deserialised_msg_data["eventType"].to_string().trim_matches('"').to_string();
let mut is_reviewable = false;

if event_type == "pullrequest:updated" {
is_reviewable = process_and_update_pr_if_different(&deserialised_msg_data["eventPayload"], &workspace_slug, &repo_slug, &pr_number, &repo_provider).await;
}
if is_reviewable || event_type == "pullrequest:created" || event_type == "pullrequest:approved" {
task::spawn(async move {
process_review(&data_bytes_async).await;
println!("Processed webhook callback message");
});
}
}
_ => {
_=> {
eprintln!("Message type not found for message : {:?}", attributes);
}
};
}


async fn prcoess_install_callback(data_bytes: &[u8]) {
println!("Processing install callback message");
let msg_data_res = serde_json::from_slice::<InstallCallback>(data_bytes);
if msg_data_res.is_err() {
eprintln!("Error deserializing install callback: {:?}", msg_data_res);
return;
}
let data = msg_data_res.expect("msg_data not found");
if data.repository_provider == ProviderEnum::Github.to_string().to_lowercase() {
println!("To be Implemented");
// let code_async = data.installation_code.clone();
// task::spawn(async move {
// handle_install_github(&code_async).await;
// println!("Processed install callback message");
// });
}
if data.repository_provider == ProviderEnum::Bitbucket.to_string().to_lowercase() {
let code_async = data.installation_code.clone();
task::spawn(async move {
handle_install_bitbucket(&code_async).await;
println!("Processed install callback message");
});
}
}

pub async fn get_pubsub_client_config(keypath: &str) -> ClientConfig {
let credfile = CredentialsFile::new_from_file(keypath.to_string()).await
.expect("Failed to locate credentials file");
Expand Down Expand Up @@ -123,4 +165,16 @@ pub async fn listen_messages(keypath: &str, topicname: &str) {
// Ack or Nack message.
let _ = message.ack().await;
}
}

pub fn deserialized_data(message_data: &Vec<u8>) -> Option<Value> {
let msg_data_res = serde_json::from_slice::<Value>(message_data);
if msg_data_res.is_err() {
let e = msg_data_res.expect_err("No error in data_res");
eprintln!("Incoming message does not contain valid reviews: {:?}", e);
return None;
}
let deserialized_data = msg_data_res.expect("Uncaught error in deserializing message_data");
println!("deserialized_data == {:?}", &deserialized_data["eventPayload"]["repository"]);
Some(deserialized_data)
}
1 change: 0 additions & 1 deletion vibi-dpu/src/utils/pr_info.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use serde::{Deserialize, Serialize};
use std::fmt;

#[derive(Debug, Serialize, Deserialize)]
pub struct PrInfo {
Expand Down

0 comments on commit 3198ed6

Please sign in to comment.