Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(timestamp encoding): add unixtime formats #18817

Merged
merged 4 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 88 additions & 49 deletions src/codecs/encoding/transformer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#![deny(missing_docs)]

use chrono::{DateTime, Utc};
use core::fmt::Debug;
use std::collections::BTreeMap;

use lookup::lookup_v2::ConfigValuePath;
use lookup::{event_path, PathPrefix};
use ordered_float::NotNan;
use serde::{Deserialize, Deserializer};
use vector_config::configurable_component;
use vector_core::event::{LogEvent, MaybeAsLogMut};
Expand Down Expand Up @@ -176,32 +178,46 @@ impl Transformer {
}
}

fn format_timestamps<F, T>(&self, log: &mut LogEvent, extract: F)
where
F: Fn(&DateTime<Utc>) -> T,
T: Into<Value>,
{
if log.value().is_object() {
let mut unix_timestamps = Vec::new();
for (k, v) in log.all_event_fields().expect("must be an object") {
if let Value::Timestamp(ts) = v {
unix_timestamps.push((k.clone(), extract(ts).into()));
}
}
for (k, v) in unix_timestamps {
log.parse_path_and_insert(k, v).unwrap();
}
} else {
// root is not an object
let timestamp = if let Value::Timestamp(ts) = log.value() {
Some(extract(ts))
} else {
None
};
if let Some(ts) = timestamp {
log.insert(event_path!(), ts.into());
}
}
}

fn apply_timestamp_format(&self, log: &mut LogEvent) {
if let Some(timestamp_format) = self.timestamp_format.as_ref() {
match timestamp_format {
TimestampFormat::Unix => {
if log.value().is_object() {
let mut unix_timestamps = Vec::new();
for (k, v) in log.all_event_fields().expect("must be an object") {
if let Value::Timestamp(ts) = v {
unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp())));
}
}
for (k, v) in unix_timestamps {
log.parse_path_and_insert(k, v).unwrap();
}
} else {
// root is not an object
let timestamp = if let Value::Timestamp(ts) = log.value() {
Some(ts.timestamp())
} else {
None
};
if let Some(ts) = timestamp {
log.insert(event_path!(), Value::Integer(ts));
}
}
}
TimestampFormat::Unix => self.format_timestamps(log, |ts| ts.timestamp()),
TimestampFormat::UnixMs => self.format_timestamps(log, |ts| ts.timestamp_millis()),
TimestampFormat::UnixUs => self.format_timestamps(log, |ts| ts.timestamp_micros()),
TimestampFormat::UnixNs => self.format_timestamps(log, |ts| {
ts.timestamp_nanos_opt().expect("Timestamp out of range")
}),
TimestampFormat::UnixFloat => self.format_timestamps(log, |ts| {
NotNan::new(ts.timestamp_micros() as f64 / 1e6).unwrap()
}),
// RFC3339 is the default serialization of a timestamp.
TimestampFormat::Rfc3339 => (),
}
Expand All @@ -225,14 +241,26 @@ impl Transformer {

#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
#[serde(rename_all = "snake_case")]
/// The format in which a timestamp should be represented.
pub enum TimestampFormat {
/// Represent the timestamp as a Unix timestamp.
Unix,

/// Represent the timestamp as a RFC 3339 timestamp.
Rfc3339,

/// Represent the timestamp as a Unix timestamp in milliseconds.
UnixMs,

/// Represent the timestamp as a Unix timestamp in microseconds
UnixUs,

/// Represent the timestamp as a Unix timestamp in nanoseconds.
UnixNs,

/// Represent the timestamp as a Unix timestamp in floating point.
UnixFloat,
}

#[cfg(test)]
Expand Down Expand Up @@ -344,9 +372,8 @@ mod tests {

#[test]
fn deserialize_and_transform_timestamp() {
let transformer: Transformer = toml::from_str(r#"timestamp_format = "unix""#).unwrap();
let mut event = Event::Log(LogEvent::from("Demo"));
let timestamp = event
let mut base = Event::Log(LogEvent::from("Demo"));
let timestamp = base
.as_mut_log()
.get((
lookup::PathPrefix::Event,
Expand All @@ -355,32 +382,44 @@ mod tests {
.unwrap()
.clone();
let timestamp = timestamp.as_timestamp().unwrap();
event
.as_mut_log()
base.as_mut_log()
.insert("another", Value::Timestamp(*timestamp));

transformer.transform(&mut event);

match event
.as_mut_log()
.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
))
.unwrap()
{
Value::Integer(_) => {}
e => panic!(
"Timestamp was not transformed into a Unix timestamp. Was {:?}",
e
let cases = [
("unix", Value::from(timestamp.timestamp())),
("unix_ms", Value::from(timestamp.timestamp_millis())),
("unix_us", Value::from(timestamp.timestamp_micros())),
(
"unix_ns",
Value::from(timestamp.timestamp_nanos_opt().unwrap()),
),
}
match event.as_mut_log().get("another").unwrap() {
Value::Integer(_) => {}
e => panic!(
"Timestamp was not transformed into a Unix timestamp. Was {:?}",
e
(
"unix_float",
Value::from(timestamp.timestamp_micros() as f64 / 1e6),
),
];
for (fmt, expected) in cases {
let config: String = format!(r#"timestamp_format = "{}""#, fmt);
let transformer: Transformer = toml::from_str(&config).unwrap();
let mut event = base.clone();
transformer.transform(&mut event);
let log = event.as_mut_log();

for actual in [
// original key
log.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
))
.unwrap(),
// second key
log.get("another").unwrap(),
] {
// type matches
assert_eq!(expected.kind_str(), actual.kind_str());
// value matches
assert_eq!(&expected, actual);
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks.cue
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,12 @@ components: sinks: [Name=string]: {
type: string: {
default: "rfc3339"
enum: {
rfc3339: "Formats as a RFC3339 string"
unix: "Formats as a unix timestamp"
rfc3339: "Formats as a RFC3339 string"
unix: "Formats as a unix timestamp"
unix_ms: "Formats as a unix timestamp in milliseconds"
unix_us: "Formats as a unix timestamp in microseconds"
unix_ns: "Formats as a unix timestamp in nanoseconds"
unix_float: "Formats as a unix timestamp in floating point"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/amqp.cue
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,12 @@ base: components: sinks: amqp: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/appsignal.cue
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ base: components: sinks: appsignal: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,12 @@ base: components: sinks: aws_cloudwatch_logs: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,12 @@ base: components: sinks: aws_kinesis_firehose: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,12 @@ base: components: sinks: aws_kinesis_streams: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/aws_s3.cue
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,12 @@ base: components: sinks: aws_s3: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/aws_sns.cue
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,12 @@ base: components: sinks: aws_sns: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/aws_sqs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,12 @@ base: components: sinks: aws_sqs: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/azure_blob.cue
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,12 @@ base: components: sinks: azure_blob: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ base: components: sinks: azure_monitor_logs: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/clickhouse.cue
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,12 @@ base: components: sinks: clickhouse: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/console.cue
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,12 @@ base: components: sinks: console: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/databend.cue
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,12 @@ base: components: sinks: databend: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
Loading
Loading