Skip to content

Commit

Permalink
Tr/ablejobs (#46)
Browse files Browse the repository at this point in the history
* fix keys in webhook data parsing

* bug fix: flow in webhook callback didn't run process_review on pr update, and didn't run update-pr-in-db on creation

* 1. bug fix: pubsub webhook message control flow corrected.
2. removed duplicated function (save and update pr_info functions were doing the same thing)

* bug fix: PR not getting detected - no entry in database (#54)

* bug fix: removed IVec of value while inserting pr_info

* bug fix: database key changed for PR-info to resolve key collision.

---------

Co-authored-by: Avikalp Kumar Gupta <avikalpgupta@gmail.com>
Co-authored-by: Avikalp Kumar Gupta <avikalpg@users.noreply.github.com>
Co-authored-by: Muskan Paliwal <muskan10112002@gmail.com>
  • Loading branch information
4 people authored Oct 31, 2023
1 parent 4d9e654 commit 9da8060
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 66 deletions.
14 changes: 8 additions & 6 deletions vibi-dpu/src/bitbucket/prs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ use reqwest::header::HeaderMap;
use serde_json::Value;
use std::collections::HashMap;
use std::str;
use crate::utils::pr_info::PrInfo;
use crate::db::prs::save_pr_info_to_db;
use crate::{utils::pr_info::PrInfo, db::prs::update_pr_info_in_db};

use super::config::{get_client, prepare_auth_headers, bitbucket_base_url};

Expand Down Expand Up @@ -71,6 +70,8 @@ async fn get_list_prs(headers: &HeaderMap, params: &HashMap<String, String>, rep
pub async fn get_pr_info(workspace_slug: &str, repo_slug: &str, access_token: &str, pr_number: &str) -> Option<PrInfo> {
let base_url = bitbucket_base_url();
let url = format!("{}/repositories/{}/{}/pullrequests/{}", &base_url, workspace_slug, repo_slug, pr_number);
println!("[get_pr_info] url: {:?}", &url);
println!("[get_pr_info] access token: {:?}", access_token);
let client = get_client();
let response_result = client.get(&url)
.header("Authorization", format!("Bearer {}", access_token))
Expand All @@ -89,19 +90,20 @@ pub async fn get_pr_info(workspace_slug: &str, repo_slug: &str, access_token: &s
return None;
}
let pr_data: Value = response.json().await.expect("Error parsing PR data");

Some(PrInfo {
let pr_info = PrInfo {
base_head_commit: pr_data["destination"]["commit"]["hash"].to_string().trim_matches('"').to_string(),
pr_head_commit: pr_data["source"]["commit"]["hash"].to_string().trim_matches('"').to_string(),
state: pr_data["state"].to_string().trim_matches('"').to_string(),
pr_branch: pr_data["source"]["branch"]["name"].to_string().trim_matches('"').to_string(),
})
};
println!("[get_pr_info] pr_info: {:?}", &pr_info);
Some(pr_info)
}

pub async fn get_and_store_pr_info(workspace_slug: &str,repo_slug: &str,access_token: &str, pr_number: &str) {
if let Some(pr_info) = get_pr_info(workspace_slug, repo_slug, access_token, pr_number).await {
// If PR information is available, store it in the database
save_pr_info_to_db(workspace_slug, repo_slug, pr_info, pr_number).await;
update_pr_info_in_db(workspace_slug, repo_slug, &pr_info, pr_number).await;
} else {
eprintln!("No PR info available for PR number: {:?} repository: {:?} repo_owner{:?}", pr_number, repo_slug, workspace_slug);
}
Expand Down
33 changes: 4 additions & 29 deletions vibi-dpu/src/db/prs.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,10 @@
use serde_json::Value;
use sled::IVec;
use std::error::Error;
use crate::db::config::get_db;
use crate::utils::pr_info::PrInfo;


pub async fn save_pr_info_to_db(workspace_slug: &str,repo_slug: &str,pr_info: PrInfo, pr_number: &str) {
let db = get_db();
let key = format!("{}/{}/{}/{}", "bitbucket", workspace_slug, repo_slug, pr_number);

let pr_info_bytes = serde_json::to_vec(&pr_info);
if pr_info_bytes.is_err() {
let e = pr_info_bytes.expect_err("Empty error in pr_info_bytes");
eprintln!("Unable to serialize pr info: {:?}, error: {:?}", pr_info, e);
return;
}
let pr_info_json = pr_info_bytes.expect("Uncaught error in parse_res repo");

let insert_result = db.insert(key.as_bytes(), IVec::from(pr_info_json));
if insert_result.is_err() {
let e = insert_result.expect_err("No error in inserting pr_info");
eprintln!("Failed to insert PR info into the database. {:?}", e);
return;
}

println!("PR succesfully upserted: {:?} {:?}", key, pr_info);
}


pub async fn update_pr_info_in_db(workspace_slug: &str, repo_slug: &str, pr_info: &PrInfo, pr_number: &str) {
let key = format!("{}/{}/{}/{}", "bitbucket", workspace_slug, repo_slug, pr_number);
let key = format!("pr_info/{}/{}/{}/{}", "bitbucket", workspace_slug, repo_slug, pr_number);
let db = get_db();

let pr_info_json_result = serde_json::to_vec(&pr_info);
Expand All @@ -43,7 +18,7 @@ pub async fn update_pr_info_in_db(workspace_slug: &str, repo_slug: &str, pr_info
let pr_info_bytes = pr_info_json_result.expect("empty pr_info_json_result");

// Update the entry in the database. It will create a new entry if the key does not exist.
let update_result = db.insert(IVec::from(key.as_bytes()), IVec::from(pr_info_bytes));
let update_result = db.insert(IVec::from(key.as_bytes()), pr_info_bytes);

if update_result.is_err() {
let e = update_result.expect_err("No error in updating pr_info");
Expand All @@ -66,7 +41,7 @@ pub async fn process_and_update_pr_if_different(webhook_data: &Value, workspace_
let pr_info_db_opt = get_pr_info_from_db(workspace_slug, repo_slug, pr_number, repo_provider, &pr_info_parsed).await;
if pr_info_db_opt.is_none() {
eprintln!("[process_and_update_pr_if_different] No pr_info in db, parsed: {:?}", pr_info_parsed);
return false;
return true; // new pr
}
let pr_info_db = pr_info_db_opt.expect("Empty pr_info_db_opt");
if pr_info_db.pr_head_commit().to_string().eq_ignore_ascii_case(pr_info_parsed.pr_head_commit()){
Expand Down Expand Up @@ -98,7 +73,7 @@ fn parse_webhook_data(webhook_data: &Value) -> Option<PrInfo> {

pub async fn get_pr_info_from_db(workspace_slug: &str, repo_slug: &str, pr_number: &str, repo_provider: &str, pr_info_parsed: &PrInfo) -> Option<PrInfo> {
let db = get_db();
let db_pr_key = format!("{}/{}/{}/{}", repo_provider, workspace_slug, repo_slug, pr_number);
let db_pr_key = format!("pr_info/{}/{}/{}/{}", repo_provider, workspace_slug, repo_slug, pr_number);
let pr_info_res = db.get(IVec::from(db_pr_key.as_bytes()));

if pr_info_res.is_err() {
Expand Down
36 changes: 5 additions & 31 deletions vibi-dpu/src/pubsub/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::task;
use std::collections::VecDeque;
use sha256::digest;
use tonic::Code;
use crate::{db::prs::process_and_update_pr_if_different, utils::user::ProviderEnum};
use crate::db::prs::process_and_update_pr_if_different;
use crate::core::{setup::handle_install_bitbucket, review::process_review};

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -54,12 +54,12 @@ async fn process_message(attributes: &HashMap<String, String>, data_bytes: &Vec<
let repo_slug = deserialised_msg_data["eventPayload"]["repository"]["name"].to_string().trim_matches('"').to_string();
let pr_number = deserialised_msg_data["eventPayload"]["pullrequest"]["id"].to_string().trim_matches('"').to_string();
let event_type = deserialised_msg_data["eventType"].to_string().trim_matches('"').to_string();
let mut is_reviewable = false;
let is_reviewable = process_and_update_pr_if_different(&deserialised_msg_data["eventPayload"], &workspace_slug, &repo_slug, &pr_number, &repo_provider).await;

if event_type == "pullrequest:updated" {
is_reviewable = process_and_update_pr_if_different(&deserialised_msg_data["eventPayload"], &workspace_slug, &repo_slug, &pr_number, &repo_provider).await;
if event_type == "pullrequest:approved" {
todo!("Process approved event");
}
if is_reviewable || event_type == "pullrequest:created" || event_type == "pullrequest:approved" {
if is_reviewable && (event_type == "pullrequest:created" || event_type == "pullrequest:updated" ) {
task::spawn(async move {
process_review(&data_bytes_async).await;
println!("Processed webhook callback message");
Expand All @@ -72,32 +72,6 @@ async fn process_message(attributes: &HashMap<String, String>, data_bytes: &Vec<
};
}


async fn prcoess_install_callback(data_bytes: &[u8]) {
println!("Processing install callback message");
let msg_data_res = serde_json::from_slice::<InstallCallback>(data_bytes);
if msg_data_res.is_err() {
eprintln!("Error deserializing install callback: {:?}", msg_data_res);
return;
}
let data = msg_data_res.expect("msg_data not found");
if data.repository_provider == ProviderEnum::Github.to_string().to_lowercase() {
println!("To be Implemented");
// let code_async = data.installation_code.clone();
// task::spawn(async move {
// handle_install_github(&code_async).await;
// println!("Processed install callback message");
// });
}
if data.repository_provider == ProviderEnum::Bitbucket.to_string().to_lowercase() {
let code_async = data.installation_code.clone();
task::spawn(async move {
handle_install_bitbucket(&code_async).await;
println!("Processed install callback message");
});
}
}

pub async fn get_pubsub_client_config(keypath: &str) -> ClientConfig {
let credfile = CredentialsFile::new_from_file(keypath.to_string()).await
.expect("Failed to locate credentials file");
Expand Down

0 comments on commit 9da8060

Please sign in to comment.