Skip to content

Commit

Permalink
enhancement(logdna sink): Support template syntax in hostname and tag…
Browse files Browse the repository at this point in the history
…s field (#4884)

* Template and partition

Signed-off-by: Duy Do <juchiast@gmail.com>

* testing

Signed-off-by: Duy Do <juchiast@gmail.com>

* partition test

Signed-off-by: Duy Do <juchiast@gmail.com>

* with_capacity

Signed-off-by: Duy Do <juchiast@gmail.com>

* remove clone

Signed-off-by: Duy Do <juchiast@gmail.com>
  • Loading branch information
juchiast authored Nov 6, 2020
1 parent f86da29 commit aade9e6
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 51 deletions.
136 changes: 89 additions & 47 deletions src/sinks/logdna.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ use crate::{
http::{Auth, HttpClient},
sinks::util::{
encoding::{EncodingConfigWithDefault, EncodingConfiguration},
http::{BatchedHttpSink, HttpSink},
BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, TowerRequestConfig, UriSerde,
http::{HttpSink, PartitionHttpSink},
BatchConfig, BatchSettings, BoxedRawValue, JsonArrayBuffer, PartitionBuffer,
PartitionInnerBuffer, TowerRequestConfig, UriSerde,
},
template::Template,
};
use futures::FutureExt;
use futures01::Sink;
Expand All @@ -28,10 +30,10 @@ pub struct LogdnaConfig {
#[serde(alias = "host")]
endpoint: Option<UriSerde>,

hostname: String,
hostname: Template,
mac: Option<String>,
ip: Option<String>,
tags: Option<Vec<String>>,
tags: Option<Vec<Template>>,

#[serde(
skip_serializing_if = "crate::serde::skip_serializing_if_default",
Expand Down Expand Up @@ -85,9 +87,9 @@ impl SinkConfig for LogdnaConfig {
.parse_config(self.batch)?;
let client = HttpClient::new(None)?;

let sink = BatchedHttpSink::new(
let sink = PartitionHttpSink::new(
self.clone(),
JsonArrayBuffer::new(batch_settings.size),
PartitionBuffer::new(JsonArrayBuffer::new(batch_settings.size)),
request_settings,
batch_settings.timeout,
client.clone(),
Expand All @@ -112,12 +114,29 @@ impl SinkConfig for LogdnaConfig {
}
}

#[derive(Hash, Eq, PartialEq, Clone)]
pub struct PartitionKey {
hostname: String,
tags: Option<Vec<String>>,
}

#[async_trait::async_trait]
impl HttpSink for LogdnaConfig {
type Input = serde_json::Value;
type Output = Vec<BoxedRawValue>;
type Input = PartitionInnerBuffer<serde_json::Value, PartitionKey>;
type Output = PartitionInnerBuffer<Vec<BoxedRawValue>, PartitionKey>;

fn encode_event(&self, mut event: Event) -> Option<Self::Input> {
let key = self
.render_key(&event)
.map_err(|missing| {
error!(
message = "Error rendering template.",
?missing,
rate_limit_secs = 30
);
})
.ok()?;

self.encoding.apply_rules(&mut event);
let mut log = event.into_log();

Expand Down Expand Up @@ -164,18 +183,19 @@ impl HttpSink for LogdnaConfig {
map.insert("meta".into(), json!(&log));
}

Some(map.into())
Some(PartitionInnerBuffer::new(map.into(), key))
}

async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
async fn build_request(&self, output: Self::Output) -> crate::Result<http::Request<Vec<u8>>> {
let (events, key) = output.into_parts();
let mut query = url::form_urlencoded::Serializer::new(String::new());

let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time can't drift behind the epoch!")
.as_millis();

query.append_pair("hostname", &self.hostname);
query.append_pair("hostname", &key.hostname);
query.append_pair("now", &format!("{}", now));

if let Some(mac) = &self.mac {
Expand All @@ -186,7 +206,7 @@ impl HttpSink for LogdnaConfig {
query.append_pair("ip", ip);
}

if let Some(tags) = &self.tags {
if let Some(tags) = &key.tags {
let tags = tags.join(",");
query.append_pair("tags", &tags);
}
Expand Down Expand Up @@ -227,6 +247,22 @@ impl LogdnaConfig {
uri.parse::<http::Uri>()
.expect("This should be a valid uri")
}

fn render_key(&self, event: &Event) -> Result<PartitionKey, Vec<String>> {
let hostname = self.hostname.render_string(&event)?;
let tags = self
.tags
.as_ref()
.map(|tags| -> Result<Option<Vec<String>>, Vec<String>> {
let mut vec = Vec::with_capacity(tags.len());
for tag in tags {
vec.push(tag.render_string(event)?);
}
Ok(Some(vec))
})
.unwrap_or(Ok(None))?;
Ok(PartitionKey { hostname, tags })
}
}

async fn healthcheck(config: LogdnaConfig, mut client: HttpClient) -> crate::Result<()> {
Expand Down Expand Up @@ -288,13 +324,13 @@ mod tests {
let mut event4 = Event::from("hello world");
event4.as_mut_log().insert("env", "staging");

let event1_out = config.encode_event(event1).unwrap();
let event1_out = config.encode_event(event1).unwrap().into_parts().0;
let event1_out = event1_out.as_object().unwrap();
let event2_out = config.encode_event(event2).unwrap();
let event2_out = config.encode_event(event2).unwrap().into_parts().0;
let event2_out = event2_out.as_object().unwrap();
let event3_out = config.encode_event(event3).unwrap();
let event3_out = config.encode_event(event3).unwrap().into_parts().0;
let event3_out = event3_out.as_object().unwrap();
let event4_out = config.encode_event(event4).unwrap();
let event4_out = config.encode_event(event4).unwrap().into_parts().0;
let event4_out = event4_out.as_object().unwrap();

assert_eq!(event1_out.get("app").unwrap(), &json!("notvector"));
Expand All @@ -313,7 +349,7 @@ mod tests {
api_key = "mylogtoken"
ip = "127.0.0.1"
mac = "some-mac-addr"
hostname = "vector"
hostname = "{{ hostname }}"
tags = ["test","maybeanothertest"]
"#,
)
Expand All @@ -335,55 +371,61 @@ mod tests {

let lines = random_lines(100).take(10).collect::<Vec<_>>();
let mut events = Vec::new();
let hosts = ["host0", "host1"];

let mut partitions = vec![Vec::new(), Vec::new()];
// Create 10 events where the first one contains custom
// fields that are not just `message`.
for (i, line) in lines.iter().enumerate() {
let event = if i == 0 {
let mut event = Event::from(line.as_str());
event.as_mut_log().insert("key1", "value1");
event
} else {
Event::from(line.as_str())
};
let mut event = Event::from(line.as_str());
let p = i % 2;
event.as_mut_log().insert("hostname", hosts[p]);

partitions[p].push(line);
events.push(event);
}

sink.run(stream::iter(events)).await.unwrap();

let output = rx.next().await.unwrap();
for _ in 0..partitions.len() {
let output = rx.next().await.unwrap();

let request = &output.0;
let body: serde_json::Value = serde_json::from_slice(&output.1[..]).unwrap();
let request = &output.0;
let body: serde_json::Value = serde_json::from_slice(&output.1[..]).unwrap();

let query = request.uri.query().unwrap();
assert!(query.contains("hostname=vector"));
assert!(query.contains("ip=127.0.0.1"));
assert!(query.contains("mac=some-mac-addr"));
assert!(query.contains("tags=test%2Cmaybeanothertest"));
let query = request.uri.query().unwrap();

let output = body
.as_object()
.unwrap()
.get("lines")
.unwrap()
.as_array()
.unwrap();
let (p, host) = hosts
.iter()
.enumerate()
.find(|(_, host)| query.contains(&format!("hostname={}", host)))
.expect("invalid hostname");
let lines = &partitions[p];

assert!(query.contains("ip=127.0.0.1"));
assert!(query.contains("mac=some-mac-addr"));
assert!(query.contains("tags=test%2Cmaybeanothertest"));

let output = body
.as_object()
.unwrap()
.get("lines")
.unwrap()
.as_array()
.unwrap();

for (i, line) in output.iter().enumerate() {
// All lines are json objects
let line = line.as_object().unwrap();
for (i, line) in output.iter().enumerate() {
// All lines are json objects
let line = line.as_object().unwrap();

assert_eq!(line.get("app").unwrap(), &json!("vector"));
assert_eq!(line.get("env").unwrap(), &json!("production"));
assert_eq!(line.get("line").unwrap(), &json!(lines[i]));
assert_eq!(line.get("app").unwrap(), &json!("vector"));
assert_eq!(line.get("env").unwrap(), &json!("production"));
assert_eq!(line.get("line").unwrap(), &json!(lines[i]));

if i == 0 {
assert_eq!(
line.get("meta").unwrap(),
&json!({
"key1": "value1"
"hostname": host,
})
);
}
Expand Down
127 changes: 126 additions & 1 deletion src/sinks/util/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::{
retries::{RetryAction, RetryLogic},
sink, Batch, TowerBatchedSink, TowerRequestSettings,
sink, Batch, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestSettings,
};
use crate::{buffers::Acker, event::Event, http::HttpClient};
use bytes::{Buf, Bytes};
Expand All @@ -11,6 +11,7 @@ use hyper::body::{self, Body};
use std::{
fmt,
future::Future,
hash::Hash,
sync::Arc,
task::{Context, Poll},
time::Duration,
Expand All @@ -26,6 +27,130 @@ pub trait HttpSink: Send + Sync + 'static {
async fn build_request(&self, events: Self::Output) -> crate::Result<http::Request<Vec<u8>>>;
}

pub struct PartitionHttpSink<T, B, K, L = HttpRetryLogic>
where
B: Batch,
B::Output: Clone + Send + 'static,
B::Input: Partition<K>,
K: Hash + Eq + Clone + Send + 'static,
L: RetryLogic<Response = http::Response<Bytes>> + Send + 'static,
T: HttpSink<Input = B::Input, Output = B::Output>,
{
sink: Arc<T>,
inner: TowerPartitionSink<
HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Vec<u8>>>>, B::Output>,
B,
L,
K,
B::Output,
>,
slot: Option<B::Input>,
}

impl<T, B, K> PartitionHttpSink<T, B, K, HttpRetryLogic>
where
B: Batch,
B::Output: Clone + Send + 'static,
B::Input: Partition<K>,
K: Hash + Eq + Clone + Send + 'static,
T: HttpSink<Input = B::Input, Output = B::Output>,
{
pub fn new(
sink: T,
batch: B,
request_settings: TowerRequestSettings,
batch_timeout: Duration,
client: HttpClient,
acker: Acker,
) -> Self {
Self::with_retry_logic(
sink,
batch,
HttpRetryLogic,
request_settings,
batch_timeout,
client,
acker,
)
}
}

impl<T, B, K, L> PartitionHttpSink<T, B, K, L>
where
B: Batch,
B::Output: Clone + Send + 'static,
B::Input: Partition<K>,
K: Hash + Eq + Clone + Send + 'static,
L: RetryLogic<Response = http::Response<Bytes>, Error = hyper::Error> + Send + 'static,
T: HttpSink<Input = B::Input, Output = B::Output>,
{
pub fn with_retry_logic(
sink: T,
batch: B,
logic: L,
request_settings: TowerRequestSettings,
batch_timeout: Duration,
client: HttpClient,
acker: Acker,
) -> Self {
let sink = Arc::new(sink);

let sink1 = Arc::clone(&sink);
let request_builder =
move |b| -> BoxFuture<'static, crate::Result<http::Request<Vec<u8>>>> {
let sink = Arc::clone(&sink1);
Box::pin(async move { sink.build_request(b).await })
};

let svc = HttpBatchService::new(client, request_builder);
let inner = request_settings.partition_sink(logic, svc, batch, batch_timeout, acker);

Self {
sink,
inner,
slot: None,
}
}
}

impl<T, B, K, L> Sink for PartitionHttpSink<T, B, K, L>
where
B: Batch,
B::Output: Clone + Send + 'static,
B::Input: Partition<K>,
K: Hash + Eq + Clone + Send + 'static,
T: HttpSink<Input = B::Input, Output = B::Output>,
L: RetryLogic<Response = http::Response<Bytes>> + Send + 'static,
{
type SinkItem = crate::Event;
type SinkError = crate::Error;

fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
if self.slot.is_some() && self.poll_complete()?.is_not_ready() {
return Ok(AsyncSink::NotReady(item));
}
assert!(self.slot.is_none(), "poll_complete did not clear slot");

if let Some(item) = self.sink.encode_event(item) {
self.slot = Some(item);
self.poll_complete()?;
}

Ok(AsyncSink::Ready)
}

fn poll_complete(&mut self) -> Poll01<(), Self::SinkError> {
if let Some(item) = self.slot.take() {
if let AsyncSink::NotReady(item) = self.inner.start_send(item)? {
self.slot = Some(item);
return Ok(Async::NotReady);
}
}

self.inner.poll_complete()
}
}

/// Provides a simple wrapper around internal tower and
/// batching sinks for http.
///
Expand Down
Loading

0 comments on commit aade9e6

Please sign in to comment.