Skip to content

Commit

Permalink
Merge branch 'tr/pubsub' into tr/setup
Browse files Browse the repository at this point in the history
  • Loading branch information
tapishr committed Sep 18, 2023
2 parents 419cdde + c0bf8b4 commit 3312b0d
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions vibi-dpu/src/pubsub/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ async fn process_message(attributes: &HashMap<String, String>, data_bytes: &Vec<
let msgtype_opt = attributes.get("msgtype");
if msgtype_opt.is_none() {
eprintln!("msgtype attribute not found in message : {:?}", attributes);
return;
}
let msgtype = msgtype_opt.expect("Empty msgtype");
match msgtype.as_str() {
"install_callback" => {
println!("Processing install callback message");
let msg_data_res = serde_json::from_slice::<InstallCallback>(data_bytes);
if msg_data_res.is_err() {
eprintln!("Error deserializing install callback: {:?}",
msg_data_res.expect_err("No error in msg_data"));
return;
eprintln!("Error deserializing install callback: {:?}", msg_data_res);
return;
}
let data = msg_data_res.expect("msg_data not found");
let code_async = data.installation_code.clone();
Expand Down Expand Up @@ -81,11 +81,11 @@ async fn setup_subscription(keypath: &str, topicname: &str) -> Subscription{
eprintln!("Error getting topic: {:?}", e);
}
}
let subconfig = SubscriptionConfig {
let sub_config = SubscriptionConfig {
enable_message_ordering: true,
..Default::default()
};
let subscriptionname = format!("{topicname}-sub");
let subscription_name = format!("{topicname}-sub");
let subscription = client.subscription(&subscriptionname);
if !subscription.exists(None).await.expect("Unable to get subscription information") {
subscription.create(
Expand Down

0 comments on commit 3312b0d

Please sign in to comment.