From 69ab3428a361a4de331eeb4277c815ee236286cd Mon Sep 17 00:00:00 2001 From: Joaquin Hoyos Date: Mon, 30 Oct 2023 16:07:19 -0300 Subject: [PATCH 1/2] add ordering key for pubsub --- src/sinks/gcp_pubsub/run.rs | 10 ++++++++-- src/sinks/gcp_pubsub/setup.rs | 18 ++++++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index d89bf52d..d7690e08 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -15,10 +15,15 @@ use crate::{ utils::{retry, Utils}, }; -async fn send_pubsub_msg(publisher: &Publisher, event: &Event) -> Result<(), crate::Error> { +async fn send_pubsub_msg( + publisher: &Publisher, + event: &Event, + ordering_key: String, +) -> Result<(), crate::Error> { let body = json!(event).to_string(); let msg = PubsubMessage { data: body.into(), + ordering_key, ..Default::default() }; @@ -37,6 +42,7 @@ pub fn writer_loop( topic_name: &str, error_policy: &ErrorPolicy, retry_policy: &retry::Policy, + ordering_key: &String, utils: Arc, ) -> Result<(), crate::Error> { let rt = tokio::runtime::Builder::new_current_thread() @@ -52,7 +58,7 @@ pub fn writer_loop( for event in input.iter() { let result = retry::retry_operation( - || rt.block_on(send_pubsub_msg(&publisher, &event)), + || rt.block_on(send_pubsub_msg(&publisher, &event, ordering_key.clone())), retry_policy, ); diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs index b27e562a..4435ac49 100644 --- a/src/sinks/gcp_pubsub/setup.rs +++ b/src/sinks/gcp_pubsub/setup.rs @@ -13,6 +13,7 @@ pub struct Config { pub topic: String, pub error_policy: Option, pub retry_policy: Option, + pub ordering_key: Option, #[warn(deprecated)] pub credentials: Option, @@ -30,12 +31,25 @@ impl SinkProvider for WithUtils { .unwrap_or(ErrorPolicy::Exit); let retry_policy = self.inner.retry_policy.unwrap_or_default(); + let ordering_key = self + .inner + .ordering_key + .as_ref() + .cloned() + .unwrap_or_default(); let utils = self.utils.clone(); let handle = std::thread::spawn(move || { - writer_loop(input, &topic_name, &error_policy, &retry_policy, utils) - .expect("writer loop failed"); + writer_loop( + input, + &topic_name, + &error_policy, + &retry_policy, + &ordering_key, + utils, + ) + .expect("writer loop failed"); }); Ok(handle) From 6e7ce8de6eb0d54e16584a0f765f0b964b0fd3ff Mon Sep 17 00:00:00 2001 From: Paulo Cesar <461084+pocesar@users.noreply.github.com> Date: Mon, 30 Oct 2023 19:08:16 -0300 Subject: [PATCH 2/2] chore: remove cloning --- src/sinks/gcp_pubsub/run.rs | 8 ++++---- src/sinks/gcp_pubsub/setup.rs | 3 +-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/sinks/gcp_pubsub/run.rs b/src/sinks/gcp_pubsub/run.rs index d7690e08..5244ffe1 100644 --- a/src/sinks/gcp_pubsub/run.rs +++ b/src/sinks/gcp_pubsub/run.rs @@ -18,12 +18,12 @@ use crate::{ async fn send_pubsub_msg( publisher: &Publisher, event: &Event, - ordering_key: String, + ordering_key: &str, ) -> Result<(), crate::Error> { let body = json!(event).to_string(); let msg = PubsubMessage { data: body.into(), - ordering_key, + ordering_key: ordering_key.into(), ..Default::default() }; @@ -42,7 +42,7 @@ pub fn writer_loop( topic_name: &str, error_policy: &ErrorPolicy, retry_policy: &retry::Policy, - ordering_key: &String, + ordering_key: &str, utils: Arc, ) -> Result<(), crate::Error> { let rt = tokio::runtime::Builder::new_current_thread() @@ -58,7 +58,7 @@ pub fn writer_loop( for event in input.iter() { let result = retry::retry_operation( - || rt.block_on(send_pubsub_msg(&publisher, &event, ordering_key.clone())), + || rt.block_on(send_pubsub_msg(&publisher, &event, ordering_key)), retry_policy, ); diff --git a/src/sinks/gcp_pubsub/setup.rs b/src/sinks/gcp_pubsub/setup.rs index 4435ac49..74abdc80 100644 --- a/src/sinks/gcp_pubsub/setup.rs +++ b/src/sinks/gcp_pubsub/setup.rs @@ -34,8 +34,7 @@ impl SinkProvider for WithUtils { let ordering_key = self .inner .ordering_key - .as_ref() - .cloned() + .to_owned() .unwrap_or_default(); let utils = self.utils.clone();