Skip to content

Commit

Permalink
feat(timestamp encoding): add unixtime formats (#18817)
Browse files Browse the repository at this point in the history
* feat(timestamp encoding): add unixtime formats

* `unix_ms`: milliseconds
* `unix_us`: microseconds
* `unix_ns`: nanoseconds
* `unix_float`: seconds float

* Fix needless borrow

* Format website cue docs

* Fix type conversion and cue formatting
  • Loading branch information
srstrickland authored Oct 19, 2023
1 parent 1913ee5 commit 53039e7
Show file tree
Hide file tree
Showing 40 changed files with 322 additions and 127 deletions.
137 changes: 88 additions & 49 deletions src/codecs/encoding/transformer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#![deny(missing_docs)]

use chrono::{DateTime, Utc};
use core::fmt::Debug;
use std::collections::BTreeMap;

use lookup::lookup_v2::ConfigValuePath;
use lookup::{event_path, PathPrefix};
use ordered_float::NotNan;
use serde::{Deserialize, Deserializer};
use vector_config::configurable_component;
use vector_core::event::{LogEvent, MaybeAsLogMut};
Expand Down Expand Up @@ -176,32 +178,46 @@ impl Transformer {
}
}

fn format_timestamps<F, T>(&self, log: &mut LogEvent, extract: F)
where
F: Fn(&DateTime<Utc>) -> T,
T: Into<Value>,
{
if log.value().is_object() {
let mut unix_timestamps = Vec::new();
for (k, v) in log.all_event_fields().expect("must be an object") {
if let Value::Timestamp(ts) = v {
unix_timestamps.push((k.clone(), extract(ts).into()));
}
}
for (k, v) in unix_timestamps {
log.parse_path_and_insert(k, v).unwrap();
}
} else {
// root is not an object
let timestamp = if let Value::Timestamp(ts) = log.value() {
Some(extract(ts))
} else {
None
};
if let Some(ts) = timestamp {
log.insert(event_path!(), ts.into());
}
}
}

fn apply_timestamp_format(&self, log: &mut LogEvent) {
if let Some(timestamp_format) = self.timestamp_format.as_ref() {
match timestamp_format {
TimestampFormat::Unix => {
if log.value().is_object() {
let mut unix_timestamps = Vec::new();
for (k, v) in log.all_event_fields().expect("must be an object") {
if let Value::Timestamp(ts) = v {
unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp())));
}
}
for (k, v) in unix_timestamps {
log.parse_path_and_insert(k, v).unwrap();
}
} else {
// root is not an object
let timestamp = if let Value::Timestamp(ts) = log.value() {
Some(ts.timestamp())
} else {
None
};
if let Some(ts) = timestamp {
log.insert(event_path!(), Value::Integer(ts));
}
}
}
TimestampFormat::Unix => self.format_timestamps(log, |ts| ts.timestamp()),
TimestampFormat::UnixMs => self.format_timestamps(log, |ts| ts.timestamp_millis()),
TimestampFormat::UnixUs => self.format_timestamps(log, |ts| ts.timestamp_micros()),
TimestampFormat::UnixNs => self.format_timestamps(log, |ts| {
ts.timestamp_nanos_opt().expect("Timestamp out of range")
}),
TimestampFormat::UnixFloat => self.format_timestamps(log, |ts| {
NotNan::new(ts.timestamp_micros() as f64 / 1e6).unwrap()
}),
// RFC3339 is the default serialization of a timestamp.
TimestampFormat::Rfc3339 => (),
}
Expand All @@ -225,14 +241,26 @@ impl Transformer {

#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
#[serde(rename_all = "snake_case")]
/// The format in which a timestamp should be represented.
pub enum TimestampFormat {
/// Represent the timestamp as a Unix timestamp.
Unix,

/// Represent the timestamp as a RFC 3339 timestamp.
Rfc3339,

/// Represent the timestamp as a Unix timestamp in milliseconds.
UnixMs,

/// Represent the timestamp as a Unix timestamp in microseconds
UnixUs,

/// Represent the timestamp as a Unix timestamp in nanoseconds.
UnixNs,

/// Represent the timestamp as a Unix timestamp in floating point.
UnixFloat,
}

#[cfg(test)]
Expand Down Expand Up @@ -344,9 +372,8 @@ mod tests {

#[test]
fn deserialize_and_transform_timestamp() {
let transformer: Transformer = toml::from_str(r#"timestamp_format = "unix""#).unwrap();
let mut event = Event::Log(LogEvent::from("Demo"));
let timestamp = event
let mut base = Event::Log(LogEvent::from("Demo"));
let timestamp = base
.as_mut_log()
.get((
lookup::PathPrefix::Event,
Expand All @@ -355,32 +382,44 @@ mod tests {
.unwrap()
.clone();
let timestamp = timestamp.as_timestamp().unwrap();
event
.as_mut_log()
base.as_mut_log()
.insert("another", Value::Timestamp(*timestamp));

transformer.transform(&mut event);

match event
.as_mut_log()
.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
))
.unwrap()
{
Value::Integer(_) => {}
e => panic!(
"Timestamp was not transformed into a Unix timestamp. Was {:?}",
e
let cases = [
("unix", Value::from(timestamp.timestamp())),
("unix_ms", Value::from(timestamp.timestamp_millis())),
("unix_us", Value::from(timestamp.timestamp_micros())),
(
"unix_ns",
Value::from(timestamp.timestamp_nanos_opt().unwrap()),
),
}
match event.as_mut_log().get("another").unwrap() {
Value::Integer(_) => {}
e => panic!(
"Timestamp was not transformed into a Unix timestamp. Was {:?}",
e
(
"unix_float",
Value::from(timestamp.timestamp_micros() as f64 / 1e6),
),
];
for (fmt, expected) in cases {
let config: String = format!(r#"timestamp_format = "{}""#, fmt);
let transformer: Transformer = toml::from_str(&config).unwrap();
let mut event = base.clone();
transformer.transform(&mut event);
let log = event.as_mut_log();

for actual in [
// original key
log.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
))
.unwrap(),
// second key
log.get("another").unwrap(),
] {
// type matches
assert_eq!(expected.kind_str(), actual.kind_str());
// value matches
assert_eq!(&expected, actual);
}
}
}

Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks.cue
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,12 @@ components: sinks: [Name=string]: {
type: string: {
default: "rfc3339"
enum: {
rfc3339: "Formats as a RFC3339 string"
unix: "Formats as a unix timestamp"
rfc3339: "Formats as a RFC3339 string"
unix: "Formats as a unix timestamp"
unix_ms: "Formats as a unix timestamp in milliseconds"
unix_us: "Formats as a unix timestamp in microseconds"
unix_ns: "Formats as a unix timestamp in nanoseconds"
unix_float: "Formats as a unix timestamp in floating point"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/amqp.cue
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,12 @@ base: components: sinks: amqp: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/appsignal.cue
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,12 @@ base: components: sinks: appsignal: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,8 +437,12 @@ base: components: sinks: aws_cloudwatch_logs: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,12 @@ base: components: sinks: aws_kinesis_firehose: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,12 @@ base: components: sinks: aws_kinesis_streams: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/aws_s3.cue
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,12 @@ base: components: sinks: aws_s3: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/aws_sns.cue
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,12 @@ base: components: sinks: aws_sns: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/aws_sqs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,12 @@ base: components: sinks: aws_sqs: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/azure_blob.cue
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,12 @@ base: components: sinks: azure_blob: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,12 @@ base: components: sinks: azure_monitor_logs: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/clickhouse.cue
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,12 @@ base: components: sinks: clickhouse: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/console.cue
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,12 @@ base: components: sinks: console: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions website/cue/reference/components/sinks/base/databend.cue
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,12 @@ base: components: sinks: databend: configuration: {
description: "Format used for timestamp fields."
required: false
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_float: "Represent the timestamp as a Unix timestamp in floating point."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds."
unix_us: "Represent the timestamp as a Unix timestamp in microseconds"
}
}
}
Expand Down
Loading

0 comments on commit 53039e7

Please sign in to comment.