Skip to content

Commit

Permalink
enhancement(amqp sink): Support AMQ Properties (content-type) in AMQP…
Browse files Browse the repository at this point in the history
… sink (#17174)

* Support AMQ Properties in AMQP sinks

* Refacto removing properties.rs

* Update src/sinks/amqp/config.rs

Co-authored-by: Stephen Wakely <stephen@lisp.space>

* Update src/sinks/amqp/config.rs

Co-authored-by: neuronull <kyle.criddle@datadoghq.com>

* Update config.rs

Refacto for documentation consistency.

* Rustfmt

* Generate component docs

---------

Co-authored-by: Stephen Wakely <stephen@lisp.space>
Co-authored-by: neuronull <kyle.criddle@datadoghq.com>
  • Loading branch information
3 people authored Apr 24, 2023
1 parent 5dff0ed commit c10d30b
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 4 deletions.
32 changes: 32 additions & 0 deletions src/sinks/amqp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,40 @@ use crate::{
};
use codecs::TextSerializerConfig;
use futures::FutureExt;
use lapin::{types::ShortString, BasicProperties};
use std::sync::Arc;
use vector_config::configurable_component;
use vector_core::config::AcknowledgementsConfig;

use super::sink::AmqpSink;

/// AMQP properties configuration.
#[configurable_component]
#[configurable(title = "Configure the AMQP message properties.")]
#[derive(Clone, Debug, Default)]
pub struct AmqpPropertiesConfig {
/// Content-Type for the AMQP messages.
#[configurable(derived)]
pub(crate) content_type: Option<String>,

/// Content-Encoding for the AMQP messages.
#[configurable(derived)]
pub(crate) content_encoding: Option<String>,
}

impl AmqpPropertiesConfig {
pub(super) fn build(&self) -> BasicProperties {
let mut prop = BasicProperties::default();
if let Some(content_type) = &self.content_type {
prop = prop.with_content_type(ShortString::from(content_type.clone()));
}
if let Some(content_encoding) = &self.content_encoding {
prop = prop.with_content_encoding(ShortString::from(content_encoding.clone()));
}
prop
}
}

/// Configuration for the `amqp` sink.
///
/// Supports AMQP version 0.9.1
Expand All @@ -26,6 +54,9 @@ pub struct AmqpSinkConfig {
/// Template used to generate a routing key which corresponds to a queue binding.
pub(crate) routing_key: Option<Template>,

/// AMQP message properties.
pub(crate) properties: Option<AmqpPropertiesConfig>,

#[serde(flatten)]
pub(crate) connection: AmqpConfig,

Expand All @@ -46,6 +77,7 @@ impl Default for AmqpSinkConfig {
Self {
exchange: Template::try_from("vector").unwrap(),
routing_key: None,
properties: None,
encoding: TextSerializerConfig::default().into(),
connection: AmqpConfig::default(),
acknowledgements: AcknowledgementsConfig::default(),
Expand Down
4 changes: 4 additions & 0 deletions src/sinks/amqp/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
},
};
use bytes::Bytes;
use lapin::BasicProperties;
use std::io;
use vector_common::{
finalization::{EventFinalizers, Finalizable},
Expand All @@ -20,6 +21,7 @@ use super::{encoder::AmqpEncoder, service::AmqpRequest, sink::AmqpEvent};
pub(super) struct AmqpMetadata {
exchange: String,
routing_key: String,
properties: BasicProperties,
finalizers: EventFinalizers,
}

Expand Down Expand Up @@ -55,6 +57,7 @@ impl RequestBuilder<AmqpEvent> for AmqpRequestBuilder {
let metadata = AmqpMetadata {
exchange: input.exchange,
routing_key: input.routing_key,
properties: input.properties,
finalizers: input.event.take_finalizers(),
};

Expand All @@ -72,6 +75,7 @@ impl RequestBuilder<AmqpEvent> for AmqpRequestBuilder {
body,
amqp_metadata.exchange,
amqp_metadata.routing_key,
amqp_metadata.properties,
amqp_metadata.finalizers,
metadata,
)
Expand Down
5 changes: 4 additions & 1 deletion src/sinks/amqp/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub(super) struct AmqpRequest {
body: Bytes,
exchange: String,
routing_key: String,
properties: BasicProperties,
finalizers: EventFinalizers,
metadata: RequestMetadata,
}
Expand All @@ -32,13 +33,15 @@ impl AmqpRequest {
body: Bytes,
exchange: String,
routing_key: String,
properties: BasicProperties,
finalizers: EventFinalizers,
metadata: RequestMetadata,
) -> Self {
Self {
body,
exchange,
routing_key,
properties,
finalizers,
metadata,
}
Expand Down Expand Up @@ -117,7 +120,7 @@ impl Service<AmqpRequest> for AmqpService {
&req.routing_key,
BasicPublishOptions::default(),
req.body.as_ref(),
BasicProperties::default(),
req.properties,
)
.await;

Expand Down
18 changes: 15 additions & 3 deletions src/sinks/amqp/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ use crate::{
use async_trait::async_trait;
use futures::StreamExt;
use futures_util::stream::BoxStream;
use lapin::options::ConfirmSelectOptions;
use lapin::{options::ConfirmSelectOptions, BasicProperties};
use serde::Serialize;
use std::sync::Arc;
use tower::ServiceBuilder;
use vector_buffers::EventCount;
use vector_core::{sink::StreamSink, ByteSizeOf, EstimatedJsonEncodedSizeOf};

use super::{
config::AmqpSinkConfig, encoder::AmqpEncoder, request_builder::AmqpRequestBuilder,
service::AmqpService, BuildError,
config::{AmqpPropertiesConfig, AmqpSinkConfig},
encoder::AmqpEncoder,
request_builder::AmqpRequestBuilder,
service::AmqpService,
BuildError,
};

/// Stores the event together with the rendered exchange and routing_key values.
Expand All @@ -29,6 +32,7 @@ pub(super) struct AmqpEvent {
pub(super) event: Event,
pub(super) exchange: String,
pub(super) routing_key: String,
pub(super) properties: BasicProperties,
}

impl EventCount for AmqpEvent {
Expand All @@ -54,6 +58,7 @@ pub(super) struct AmqpSink {
pub(super) channel: Arc<lapin::Channel>,
exchange: Template,
routing_key: Option<Template>,
properties: Option<AmqpPropertiesConfig>,
transformer: Transformer,
encoder: crate::codecs::Encoder<()>,
}
Expand Down Expand Up @@ -81,6 +86,7 @@ impl AmqpSink {
channel: Arc::new(channel),
exchange: config.exchange,
routing_key: config.routing_key,
properties: config.properties,
transformer,
encoder,
})
Expand Down Expand Up @@ -115,10 +121,16 @@ impl AmqpSink {
.ok()?,
};

let properties = match &self.properties {
None => BasicProperties::default(),
Some(prop) => prop.build(),
};

Some(AmqpEvent {
event,
exchange,
routing_key,
properties,
})
}

Expand Down
20 changes: 20 additions & 0 deletions website/cue/reference/components/sinks/base/amqp.cue
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,26 @@ base: components: sinks: amqp: configuration: {
required: true
type: string: syntax: "template"
}
properties: {
description: """
Configure the AMQP message properties.
AMQP message properties.
"""
required: false
type: object: options: {
content_encoding: {
description: "Content-Encoding for the AMQP messages."
required: false
type: string: {}
}
content_type: {
description: "Content-Type for the AMQP messages."
required: false
type: string: {}
}
}
}
routing_key: {
description: "Template used to generate a routing key which corresponds to a queue binding."
required: false
Expand Down

0 comments on commit c10d30b

Please sign in to comment.