Skip to content

Commit

Permalink
Merge pull request #2 from Alokit-Innovations/tr/pubsub
Browse files Browse the repository at this point in the history
Implement PubSub listener with different message types
  • Loading branch information
avikalpg authored Oct 5, 2023
2 parents 5c91338 + c0bf8b4 commit 7923a84
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 3 deletions.
24 changes: 21 additions & 3 deletions vibi-dpu/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
fn main() {
println!("Hello, world!");
}
use std::env;
mod pubsub;
// mod db;
// mod core;
// mod bitbucket;
// mod utils;

#[tokio::main]
async fn main() {
// Get topic subscription and Listen to messages
let gcp_credentials = //"/home/tapishr/dev-profiler/pubsub-sa.json".to_owned();
env::var("GCP_CREDENTIALS").expect("GCP_CREDENTIALS must be set");
let topic_name = //"rtapish-fromserver".to_owned();
env::var("INSTALL_ID").expect("INSTALL_ID must be set");
println!("env vars = {}, {}", &gcp_credentials, &topic_name);

pubsub::listener::listen_messages(
&gcp_credentials,
&topic_name,
).await;
}
121 changes: 121 additions & 0 deletions vibi-dpu/src/pubsub/listener.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use std::collections::HashMap;

use futures_util::StreamExt;
use google_cloud_auth::credentials::CredentialsFile;
use google_cloud_default::WithAuthExt;
use google_cloud_pubsub::{
client::{Client, ClientConfig},
subscription::{SubscriptionConfig, Subscription},
};
use serde::Deserialize;
use tokio::task;
use std::collections::VecDeque;
use sha256::digest;
use tonic::Code;
// use crate::core::{setup::handle_install_bitbucket, review::process_review}; // To be added in future PR

#[derive(Debug, Deserialize)]
struct InstallCallback {
repository_provider: String,
installation_code: String,
}


async fn process_message(attributes: &HashMap<String, String>, data_bytes: &Vec<u8>) {
let msgtype_opt = attributes.get("msgtype");
if msgtype_opt.is_none() {
eprintln!("msgtype attribute not found in message : {:?}", attributes);
return;
}
let msgtype = msgtype_opt.expect("Empty msgtype");
match msgtype.as_str() {
"install_callback" => {
println!("Processing install callback message");
let msg_data_res = serde_json::from_slice::<InstallCallback>(data_bytes);
if msg_data_res.is_err() {
eprintln!("Error deserializing install callback: {:?}", msg_data_res);
return;
}
let data = msg_data_res.expect("msg_data not found");
let code_async = data.installation_code.clone();
task::spawn(async move {
// handle_install_bitbucket(&code_async).await;
println!("Processed install callback message");
});
},
"webhook_callback" => {
task::spawn(async move {
// process_review(&data_bytes).await;
println!("Processed webhook callback message");
});
}
_ => {
eprintln!("Message type not found for message : {:?}", attributes);
}
};
}

pub async fn get_pubsub_client_config(keypath: &str) -> ClientConfig {
let credfile = CredentialsFile::new_from_file(keypath.to_string()).await
.expect("Failed to locate credentials file");
return ClientConfig::default()
.with_credentials(credfile)
.await
.expect("Unable to get PubSub Client config");
}

async fn setup_subscription(keypath: &str, topicname: &str) -> Subscription{
let config = get_pubsub_client_config(keypath).await;
let client = Client::new(config).await
.expect("Unable to create pubsub client to listen to messages");
let topic = client.topic(topicname);
let topic_res = topic.exists(None).await;
if topic_res.is_err() {
let e = topic_res.expect_err("No error found in topic_res");
if e.code() == Code::NotFound {
client.create_topic(topicname, None, None).await
.expect("Unable to create topic");
}
else {
eprintln!("Error getting topic: {:?}", e);
}
}
let sub_config = SubscriptionConfig {
enable_message_ordering: true,
..Default::default()
};
let subscription_name = format!("{topicname}-sub");
let subscription = client.subscription(&subscriptionname);
if !subscription.exists(None).await.expect("Unable to get subscription information") {
subscription.create(
topic.fully_qualified_name(), subconfig, None)
.await.expect("Unable to create subscription for listening to messages");
}
println!("sub = {:?}", &subscription);
subscription
}

pub async fn listen_messages(keypath: &str, topicname: &str) {
let queue_cap = 100;
let mut message_hashes = VecDeque::with_capacity(queue_cap);
let subscription = setup_subscription(keypath, topicname).await;
let mut stream = subscription.subscribe(None).await
.expect("Unable to subscribe to messages");
while let Some(message) = stream.next().await {
println!("Listening for messages...");
let attrmap: HashMap<String, String> = message.message.attributes.clone().into_iter().collect();
let message_hash = digest(&*message.message.data);
if !message_hashes.contains(&message_hash) {
message_hashes.push_back(message_hash);
if message_hashes.len() > queue_cap {
while message_hashes.len() > queue_cap {
message_hashes.pop_front();
}
}
let msg_bytes = message.message.data.clone();
process_message(&attrmap, &msg_bytes).await;
}
// Ack or Nack message.
let _ = message.ack().await;
}
}
1 change: 1 addition & 0 deletions vibi-dpu/src/pubsub/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod listener;

0 comments on commit 7923a84

Please sign in to comment.