diff --git a/src/sinks/amqp/config.rs b/src/sinks/amqp/config.rs index 1a22198cea06d..3af34e943d7b2 100644 --- a/src/sinks/amqp/config.rs +++ b/src/sinks/amqp/config.rs @@ -8,12 +8,40 @@ use crate::{ }; use codecs::TextSerializerConfig; use futures::FutureExt; +use lapin::{types::ShortString, BasicProperties}; use std::sync::Arc; use vector_config::configurable_component; use vector_core::config::AcknowledgementsConfig; use super::sink::AmqpSink; +/// AMQP properties configuration. +#[configurable_component] +#[configurable(title = "Configure the AMQP message properties.")] +#[derive(Clone, Debug, Default)] +pub struct AmqpPropertiesConfig { + /// Content-Type for the AMQP messages. + #[configurable(derived)] + pub(crate) content_type: Option, + + /// Content-Encoding for the AMQP messages. + #[configurable(derived)] + pub(crate) content_encoding: Option, +} + +impl AmqpPropertiesConfig { + pub(super) fn build(&self) -> BasicProperties { + let mut prop = BasicProperties::default(); + if let Some(content_type) = &self.content_type { + prop = prop.with_content_type(ShortString::from(content_type.clone())); + } + if let Some(content_encoding) = &self.content_encoding { + prop = prop.with_content_encoding(ShortString::from(content_encoding.clone())); + } + prop + } +} + /// Configuration for the `amqp` sink. /// /// Supports AMQP version 0.9.1 @@ -26,6 +54,9 @@ pub struct AmqpSinkConfig { /// Template used to generate a routing key which corresponds to a queue binding. pub(crate) routing_key: Option