Skip to content

Commit

Permalink
Upgrade file sink for tokio 0.2
Browse files Browse the repository at this point in the history
Signed-off-by: MOZGIII <mike-n@narod.ru>
  • Loading branch information
MOZGIII committed Mar 5, 2020
1 parent 910afda commit 147b53c
Show file tree
Hide file tree
Showing 10 changed files with 380 additions and 428 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ tracing-limit = { path = "lib/tracing-limit" }
futures01 = { package = "futures", version = "0.1.25" }
futures = { version = "0.3", default-features = false, features = ["compat"] }
tokio = { version = "0.1.22", features = ["io", "uds", "tcp", "rt-full", "experimental-tracing"], default-features = false }
tokio02 = { package = "tokio", version = "0.2", features = ["blocking"] }
tokio02 = { package = "tokio", version = "0.2", features = ["blocking", "fs", "sync"] }
tokio-retry = "0.2.0"
tokio-signal = "0.2.7"
tokio-tls = "0.2.1"
tokio-compat = { version = "0.1", features = ["rt-full"] }
async-trait = "0.1"

# Tracing
tracing = "0.1.9"
Expand Down
118 changes: 118 additions & 0 deletions src/expiring_hash_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use futures::ready;
use futures::stream::Stream;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio02::time::{delay_queue, DelayQueue, Error};

pub type ExpiredItem<K, V> = (V, delay_queue::Expired<K>);

pub struct ExpiringHashMap<K, V> {
map: HashMap<K, (V, delay_queue::Key)>,
expiration_queue: DelayQueue<K>,
}

impl<K, V> ExpiringHashMap<K, V>
where
K: Eq + Hash + Clone,
{
pub fn new() -> Self {
Self {
map: HashMap::new(),
expiration_queue: DelayQueue::new(),
}
}

pub fn insert(&mut self, key: K, value_with_ttl: (V, Duration)) {
let (value, ttl) = value_with_ttl;
let delay = self.expiration_queue.insert(key.clone(), ttl);
self.map.insert(key, (value, delay));
}

pub fn insert_at(&mut self, key: K, value_with_deadline: (V, Instant)) {
let (value, deadline) = value_with_deadline;
let delay = self
.expiration_queue
.insert_at(key.clone(), deadline.into());
self.map.insert(key, (value, delay));
}

pub fn get<Q: ?Sized>(&self, k: &Q) -> Option<&V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
self.map.get(k).map(|&(ref v, _)| v)
}

pub fn get_mut<Q: ?Sized>(&mut self, k: &Q) -> Option<&mut V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
self.map.get_mut(k).map(|&mut (ref mut v, _)| v)
}

pub fn reset_at<Q: ?Sized>(&mut self, k: &Q, when: Instant) -> Option<&mut V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let (value, delay_queue_key) = self.map.get_mut(k)?;
self.expiration_queue.reset_at(delay_queue_key, when.into());
Some(value)
}

pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<(V, delay_queue::Expired<K>)>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let (value, expiration_queue_key) = self.map.remove(k)?;
let expired = self.expiration_queue.remove(&expiration_queue_key);
Some((value, expired))
}

pub fn poll_expired(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<ExpiredItem<K, V>, Error>>> {
let key = ready!(self.expiration_queue.poll_expired(cx));
let key = match key {
None => return Poll::Ready(None),
Some(Err(err)) => return Poll::Ready(Some(Err(err))),
Some(Ok(key)) => key,
};
let (value, _) = self.map.remove(key.get_ref()).unwrap();
Poll::Ready(Some(Ok((value, key))))
}
}

impl<K, V> Stream for ExpiringHashMap<K, V>
where
K: Unpin + Eq + Hash + Clone,
V: Unpin,
{
type Item = Result<ExpiredItem<K, V>, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(self).poll_expired(cx)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.expiration_queue.size_hint()
}
}

impl<K, V> fmt::Debug for ExpiringHashMap<K, V>
where
K: Eq + Hash + Clone,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ExpiringHashMap").finish()
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod conditions;
pub mod config_paths;
pub mod dns;
pub mod event;
pub mod expiring_hash_map;
pub mod generate;
#[cfg(feature = "rdkafka")]
pub mod kafka;
Expand Down
38 changes: 38 additions & 0 deletions src/sinks/file/bytes_path.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//! Fun little hack around bytes and OsStr

use bytes::Bytes;
use std::path::Path;

#[derive(Debug, Clone)]
pub struct BytesPath {
#[cfg(unix)]
path: Bytes,
#[cfg(windows)]
path: std::path::PathBuf,
}

impl BytesPath {
#[cfg(unix)]
pub fn new(path: Bytes) -> Self {
Self { path }
}
#[cfg(windows)]
pub fn new(path: Bytes) -> Self {
let utf8_string = String::from_utf8_lossy(&path[..]);
let path = path::PathBuf::from(utf8_string.as_ref());
Self { path }
}
}

impl AsRef<Path> for BytesPath {
#[cfg(unix)]
fn as_ref(&self) -> &Path {
use std::os::unix::ffi::OsStrExt;
let os_str = std::ffi::OsStr::from_bytes(&self.path);
&Path::new(os_str)
}
#[cfg(windows)]
fn as_ref(&self) -> &Path {
&self.path.as_ref()
}
}
13 changes: 13 additions & 0 deletions src/sinks/file/encoding.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use super::Encoding;
use crate::event::{self, Event};

pub fn encode_event(encoding: &Encoding, event: Event) -> Vec<u8> {
let log = event.into_log();
match encoding {
Encoding::Ndjson => serde_json::to_vec(&log).expect("Unable to encode event as JSON."),
Encoding::Text => log
.get(&event::log_schema().message_key())
.map(|v| v.to_string_lossy().into_bytes())
.unwrap_or_default(),
}
}
Loading

0 comments on commit 147b53c

Please sign in to comment.