From c10d30bd35494ea336d90d0abf9977349c38d154 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Rou=C3=ABn=C3=A9?= Date: Mon, 24 Apr 2023 11:27:59 +0200 Subject: [PATCH] enhancement(amqp sink): Support AMQ Properties (content-type) in AMQP sink (#17174) * Support AMQ Properties in AMQP sinks * Refacto removing properties.rs * Update src/sinks/amqp/config.rs Co-authored-by: Stephen Wakely * Update src/sinks/amqp/config.rs Co-authored-by: neuronull * Update config.rs Refacto for documentation consistency. * Rustfmt * Generate component docs --------- Co-authored-by: Stephen Wakely Co-authored-by: neuronull --- src/sinks/amqp/config.rs | 32 +++++++++++++++++++ src/sinks/amqp/request_builder.rs | 4 +++ src/sinks/amqp/service.rs | 5 ++- src/sinks/amqp/sink.rs | 18 +++++++++-- .../reference/components/sinks/base/amqp.cue | 20 ++++++++++++ 5 files changed, 75 insertions(+), 4 deletions(-) diff --git a/src/sinks/amqp/config.rs b/src/sinks/amqp/config.rs index 1a22198cea06d..3af34e943d7b2 100644 --- a/src/sinks/amqp/config.rs +++ b/src/sinks/amqp/config.rs @@ -8,12 +8,40 @@ use crate::{ }; use codecs::TextSerializerConfig; use futures::FutureExt; +use lapin::{types::ShortString, BasicProperties}; use std::sync::Arc; use vector_config::configurable_component; use vector_core::config::AcknowledgementsConfig; use super::sink::AmqpSink; +/// AMQP properties configuration. +#[configurable_component] +#[configurable(title = "Configure the AMQP message properties.")] +#[derive(Clone, Debug, Default)] +pub struct AmqpPropertiesConfig { + /// Content-Type for the AMQP messages. + #[configurable(derived)] + pub(crate) content_type: Option, + + /// Content-Encoding for the AMQP messages. + #[configurable(derived)] + pub(crate) content_encoding: Option, +} + +impl AmqpPropertiesConfig { + pub(super) fn build(&self) -> BasicProperties { + let mut prop = BasicProperties::default(); + if let Some(content_type) = &self.content_type { + prop = prop.with_content_type(ShortString::from(content_type.clone())); + } + if let Some(content_encoding) = &self.content_encoding { + prop = prop.with_content_encoding(ShortString::from(content_encoding.clone())); + } + prop + } +} + /// Configuration for the `amqp` sink. /// /// Supports AMQP version 0.9.1 @@ -26,6 +54,9 @@ pub struct AmqpSinkConfig { /// Template used to generate a routing key which corresponds to a queue binding. pub(crate) routing_key: Option