diff --git a/src/sources/splunk_hec.rs b/src/sources/splunk_hec.rs index 50b49d5a253bd..1a9f6d9eed76f 100644 --- a/src/sources/splunk_hec.rs +++ b/src/sources/splunk_hec.rs @@ -11,8 +11,7 @@ use crate::{ use bytes::{Buf, Bytes}; use chrono::{DateTime, TimeZone, Utc}; use flate2::read::MultiGzDecoder; -use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt}; -use futures01::{Async, Stream}; +use futures::{stream, FutureExt, SinkExt, StreamExt, TryFutureExt}; use http::StatusCode; use serde::{de, Deserialize, Serialize}; use serde_json::{de::IoRead, json, Deserializer, Value as JsonValue}; @@ -173,7 +172,26 @@ impl SplunkSource { host: Option, gzip: bool, body: Bytes| { - process_service_request(out.clone(), channel, host, gzip, body) + let mut out = out + .clone() + .sink_map_err(|_| Rejection::from(ApiError::ServerShutdown)); + async move { + let reader: Box = if gzip { + Box::new(MultiGzDecoder::new(body.reader())) + } else { + Box::new(body.reader()) + }; + + let events = stream::iter(EventIterator::new(reader, channel, host)); + + // `fn send_all` can be used once https://github.com/rust-lang/futures-rs/issues/2402 + // is resolved. + let res = events.forward(&mut out).await; + + out.flush().await?; + + res + } }, ) .map(finish_ok) @@ -293,37 +311,9 @@ impl SplunkSource { } } -async fn process_service_request( - out: Pipeline, - channel: Option, - host: Option, - gzip: bool, - body: Bytes, -) -> Result<(), Rejection> { - use futures::compat::Stream01CompatExt; - - let mut out = out.sink_map_err(|_| Rejection::from(ApiError::ServerShutdown)); - - let reader: Box = if gzip { - Box::new(MultiGzDecoder::new(body.reader())) - } else { - Box::new(body.reader()) - }; - - let stream = EventStream::new(reader, channel, host).compat(); - - let res = stream.forward(&mut out).await; - - out.flush() - .map_err(|_| Rejection::from(ApiError::ServerShutdown)) - .await?; - - res.map(|_| ()) -} - -/// Constructs one ore more events from json-s coming from reader. +/// Constructs one or more events from json-s coming from reader. /// If errors, it's done with input. -struct EventStream { +struct EventIterator { /// Remaining request with JSON events data: R, /// Count of sent events @@ -336,9 +326,9 @@ struct EventStream { extractors: [DefaultExtractor; 4], } -impl EventStream { +impl EventIterator { fn new(data: R, channel: Option, host: Option) -> Self { - EventStream { + EventIterator { data, events: 0, channel: channel.map(Value::from), @@ -365,30 +355,8 @@ impl EventStream { Some(_) => Deserialize::deserialize(&mut Deserializer::new(reader)).map(Some), } } -} - -impl Stream for EventStream { - type Item = Event; - type Error = Rejection; - fn poll(&mut self) -> Result>, Rejection> { - // Parse JSON object - let mut json = match self.from_reader_take::() { - Ok(Some(json)) => json, - Ok(None) => { - return if self.events == 0 { - Err(ApiError::NoData.into()) - } else { - Ok(Async::Ready(None)) - }; - } - Err(error) => { - emit!(SplunkHecRequestBodyInvalid { - error: error.into() - }); - return Err(ApiError::InvalidDataFormat { event: self.events }.into()); - } - }; + fn build_event(&mut self, mut json: JsonValue) -> Result { // Construct Event from parsed json event let mut event = Event::new_empty_log(); let log = event.as_mut_log(); @@ -486,7 +454,32 @@ impl Stream for EventStream { emit!(SplunkHecEventReceived); self.events += 1; - Ok(Async::Ready(Some(event))) + Ok(event) + } +} + +impl Iterator for EventIterator { + type Item = Result; + + fn next(&mut self) -> Option { + match self.from_reader_take::() { + Ok(Some(json)) => Some(self.build_event(json)), + Ok(None) => { + if self.events == 0 { + Some(Err(ApiError::NoData.into())) + } else { + None + } + } + Err(error) => { + emit!(SplunkHecRequestBodyInvalid { + error: error.into() + }); + Some(Err( + ApiError::InvalidDataFormat { event: self.events }.into() + )) + } + } } }