diff --git a/vibi-dpu/src/pubsub/listener.rs b/vibi-dpu/src/pubsub/listener.rs index eb2fe655..cbc048bd 100644 --- a/vibi-dpu/src/pubsub/listener.rs +++ b/vibi-dpu/src/pubsub/listener.rs @@ -25,6 +25,7 @@ async fn process_message(attributes: &HashMap, data_bytes: &Vec< let msgtype_opt = attributes.get("msgtype"); if msgtype_opt.is_none() { eprintln!("msgtype attribute not found in message : {:?}", attributes); + return; } let msgtype = msgtype_opt.expect("Empty msgtype"); match msgtype.as_str() { @@ -32,9 +33,8 @@ async fn process_message(attributes: &HashMap, data_bytes: &Vec< println!("Processing install callback message"); let msg_data_res = serde_json::from_slice::(data_bytes); if msg_data_res.is_err() { - eprintln!("Error deserializing install callback: {:?}", - msg_data_res.expect_err("No error in msg_data")); - return; + eprintln!("Error deserializing install callback: {:?}", msg_data_res); + return; } let data = msg_data_res.expect("msg_data not found"); let code_async = data.installation_code.clone(); @@ -81,11 +81,11 @@ async fn setup_subscription(keypath: &str, topicname: &str) -> Subscription{ eprintln!("Error getting topic: {:?}", e); } } - let subconfig = SubscriptionConfig { + let sub_config = SubscriptionConfig { enable_message_ordering: true, ..Default::default() }; - let subscriptionname = format!("{topicname}-sub"); + let subscription_name = format!("{topicname}-sub"); let subscription = client.subscription(&subscriptionname); if !subscription.exists(None).await.expect("Unable to get subscription information") { subscription.create(