Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(splunk_hec source): Update to futures 0.3 #7225

Merged
merged 2 commits into from
Apr 25, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 46 additions & 59 deletions src/sources/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ use crate::{
use bytes::{Buf, Bytes};
use chrono::{DateTime, TimeZone, Utc};
use flate2::read::MultiGzDecoder;
use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt};
use futures01::{Async, Stream};
use futures::{stream, FutureExt, SinkExt, StreamExt, TryFutureExt};
use http::StatusCode;
use serde::{de, Deserialize, Serialize};
use serde_json::{de::IoRead, json, Deserializer, Value as JsonValue};
Expand Down Expand Up @@ -173,7 +172,20 @@ impl SplunkSource {
host: Option<String>,
gzip: bool,
body: Bytes| {
process_service_request(out.clone(), channel, host, gzip, body)
let out = out.clone();
async move {
let reader: Box<dyn Read + Send> = if gzip {
Box::new(MultiGzDecoder::new(body.reader()))
} else {
Box::new(body.reader())
};

let mut events = stream::iter(EventIterator::new(reader, channel, host));

out.sink_map_err(|_| Rejection::from(ApiError::ServerShutdown))
.send_all(&mut events)
.await
}
},
)
.map(finish_ok)
Expand Down Expand Up @@ -293,37 +305,9 @@ impl SplunkSource {
}
}

async fn process_service_request(
out: Pipeline,
channel: Option<String>,
host: Option<String>,
gzip: bool,
body: Bytes,
) -> Result<(), Rejection> {
use futures::compat::Stream01CompatExt;

let mut out = out.sink_map_err(|_| Rejection::from(ApiError::ServerShutdown));

let reader: Box<dyn Read + Send> = if gzip {
Box::new(MultiGzDecoder::new(body.reader()))
} else {
Box::new(body.reader())
};

let stream = EventStream::new(reader, channel, host).compat();

let res = stream.forward(&mut out).await;

out.flush()
.map_err(|_| Rejection::from(ApiError::ServerShutdown))
.await?;

res.map(|_| ())
}

/// Constructs one ore more events from json-s coming from reader.
/// Constructs one or more events from json-s coming from reader.
/// If errors, it's done with input.
struct EventStream<R: Read> {
struct EventIterator<R: Read> {
/// Remaining request with JSON events
data: R,
/// Count of sent events
Expand All @@ -336,9 +320,9 @@ struct EventStream<R: Read> {
extractors: [DefaultExtractor; 4],
}

impl<R: Read> EventStream<R> {
impl<R: Read> EventIterator<R> {
fn new(data: R, channel: Option<String>, host: Option<String>) -> Self {
EventStream {
EventIterator {
data,
events: 0,
channel: channel.map(Value::from),
Expand All @@ -365,30 +349,8 @@ impl<R: Read> EventStream<R> {
Some(_) => Deserialize::deserialize(&mut Deserializer::new(reader)).map(Some),
}
}
}

impl<R: Read> Stream for EventStream<R> {
type Item = Event;
type Error = Rejection;
fn poll(&mut self) -> Result<Async<Option<Event>>, Rejection> {
// Parse JSON object
let mut json = match self.from_reader_take::<JsonValue>() {
Ok(Some(json)) => json,
Ok(None) => {
return if self.events == 0 {
Err(ApiError::NoData.into())
} else {
Ok(Async::Ready(None))
};
}
Err(error) => {
emit!(SplunkHecRequestBodyInvalid {
error: error.into()
});
return Err(ApiError::InvalidDataFormat { event: self.events }.into());
}
};

fn build_event(&mut self, mut json: JsonValue) -> Result<Event, Rejection> {
// Construct Event from parsed json event
let mut event = Event::new_empty_log();
let log = event.as_mut_log();
Expand Down Expand Up @@ -486,7 +448,32 @@ impl<R: Read> Stream for EventStream<R> {
emit!(SplunkHecEventReceived);
self.events += 1;

Ok(Async::Ready(Some(event)))
Ok(event)
}
}

impl<R: Read> Iterator for EventIterator<R> {
type Item = Result<Event, Rejection>;

fn next(&mut self) -> Option<Self::Item> {
match self.from_reader_take::<JsonValue>() {
Ok(Some(json)) => Some(self.build_event(json)),
Ok(None) => {
if self.events == 0 {
Some(Err(ApiError::NoData.into()))
} else {
None
}
}
Err(error) => {
emit!(SplunkHecRequestBodyInvalid {
error: error.into()
});
Some(Err(
ApiError::InvalidDataFormat { event: self.events }.into()
))
}
}
}
}

Expand Down