Skip to content

Commit

Permalink
chore(splunk_hec source): Update to futures 0.3 (#7225)
Browse files Browse the repository at this point in the history
* Update source

Signed-off-by: ktf <krunotf@gmail.com>

* Use forward

Signed-off-by: ktf <krunotf@gmail.com>
  • Loading branch information
ktff authored Apr 25, 2021
1 parent 990ee46 commit e12ac0e
Showing 1 changed file with 52 additions and 59 deletions.
111 changes: 52 additions & 59 deletions src/sources/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ use crate::{
use bytes::{Buf, Bytes};
use chrono::{DateTime, TimeZone, Utc};
use flate2::read::MultiGzDecoder;
use futures::{FutureExt, SinkExt, StreamExt, TryFutureExt};
use futures01::{Async, Stream};
use futures::{stream, FutureExt, SinkExt, StreamExt, TryFutureExt};
use http::StatusCode;
use serde::{de, Deserialize, Serialize};
use serde_json::{de::IoRead, json, Deserializer, Value as JsonValue};
Expand Down Expand Up @@ -173,7 +172,26 @@ impl SplunkSource {
host: Option<String>,
gzip: bool,
body: Bytes| {
process_service_request(out.clone(), channel, host, gzip, body)
let mut out = out
.clone()
.sink_map_err(|_| Rejection::from(ApiError::ServerShutdown));
async move {
let reader: Box<dyn Read + Send> = if gzip {
Box::new(MultiGzDecoder::new(body.reader()))
} else {
Box::new(body.reader())
};

let events = stream::iter(EventIterator::new(reader, channel, host));

// `fn send_all` can be used once https://github.com/rust-lang/futures-rs/issues/2402
// is resolved.
let res = events.forward(&mut out).await;

out.flush().await?;

res
}
},
)
.map(finish_ok)
Expand Down Expand Up @@ -293,37 +311,9 @@ impl SplunkSource {
}
}

async fn process_service_request(
out: Pipeline,
channel: Option<String>,
host: Option<String>,
gzip: bool,
body: Bytes,
) -> Result<(), Rejection> {
use futures::compat::Stream01CompatExt;

let mut out = out.sink_map_err(|_| Rejection::from(ApiError::ServerShutdown));

let reader: Box<dyn Read + Send> = if gzip {
Box::new(MultiGzDecoder::new(body.reader()))
} else {
Box::new(body.reader())
};

let stream = EventStream::new(reader, channel, host).compat();

let res = stream.forward(&mut out).await;

out.flush()
.map_err(|_| Rejection::from(ApiError::ServerShutdown))
.await?;

res.map(|_| ())
}

/// Constructs one ore more events from json-s coming from reader.
/// Constructs one or more events from json-s coming from reader.
/// If errors, it's done with input.
struct EventStream<R: Read> {
struct EventIterator<R: Read> {
/// Remaining request with JSON events
data: R,
/// Count of sent events
Expand All @@ -336,9 +326,9 @@ struct EventStream<R: Read> {
extractors: [DefaultExtractor; 4],
}

impl<R: Read> EventStream<R> {
impl<R: Read> EventIterator<R> {
fn new(data: R, channel: Option<String>, host: Option<String>) -> Self {
EventStream {
EventIterator {
data,
events: 0,
channel: channel.map(Value::from),
Expand All @@ -365,30 +355,8 @@ impl<R: Read> EventStream<R> {
Some(_) => Deserialize::deserialize(&mut Deserializer::new(reader)).map(Some),
}
}
}

impl<R: Read> Stream for EventStream<R> {
type Item = Event;
type Error = Rejection;
fn poll(&mut self) -> Result<Async<Option<Event>>, Rejection> {
// Parse JSON object
let mut json = match self.from_reader_take::<JsonValue>() {
Ok(Some(json)) => json,
Ok(None) => {
return if self.events == 0 {
Err(ApiError::NoData.into())
} else {
Ok(Async::Ready(None))
};
}
Err(error) => {
emit!(SplunkHecRequestBodyInvalid {
error: error.into()
});
return Err(ApiError::InvalidDataFormat { event: self.events }.into());
}
};

fn build_event(&mut self, mut json: JsonValue) -> Result<Event, Rejection> {
// Construct Event from parsed json event
let mut event = Event::new_empty_log();
let log = event.as_mut_log();
Expand Down Expand Up @@ -486,7 +454,32 @@ impl<R: Read> Stream for EventStream<R> {
emit!(SplunkHecEventReceived);
self.events += 1;

Ok(Async::Ready(Some(event)))
Ok(event)
}
}

impl<R: Read> Iterator for EventIterator<R> {
type Item = Result<Event, Rejection>;

fn next(&mut self) -> Option<Self::Item> {
match self.from_reader_take::<JsonValue>() {
Ok(Some(json)) => Some(self.build_event(json)),
Ok(None) => {
if self.events == 0 {
Some(Err(ApiError::NoData.into()))
} else {
None
}
}
Err(error) => {
emit!(SplunkHecRequestBodyInvalid {
error: error.into()
});
Some(Err(
ApiError::InvalidDataFormat { event: self.events }.into()
))
}
}
}
}

Expand Down

0 comments on commit e12ac0e

Please sign in to comment.