Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(csv codec): added additional csv encoding options #18149

Closed
wants to merge 8 commits into from
229 changes: 218 additions & 11 deletions lib/codecs/src/encoding/format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl CsvSerializerConfig {
if self.csv.fields.is_empty() {
Err("At least one CSV field must be specified".into())
} else {
Ok(CsvSerializer::new(self.csv.fields.clone()))
Ok(CsvSerializer::new(self.clone()))
}
}

Expand All @@ -45,10 +45,84 @@ impl CsvSerializerConfig {
}
}

/// The user configuration to choose the metric tag strategy.
#[crate::configurable_component]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum QuoteStyle {
/// This puts quotes around every field. Always.
Always,

/// This puts quotes around fields only when necessary.
/// They are necessary when fields contain a quote, delimiter or record terminator.
/// Quotes are also necessary when writing an empty record
/// (which is indistinguishable from a record with one empty field).
#[default]
Necessary,

/// This puts quotes around all fields that are non-numeric.
/// Namely, when writing a field that does not parse as a valid float or integer,
/// then quotes will be used even if they aren’t strictly necessary.
NonNumeric,

/// This never writes quotes, even if it would produce invalid CSV data.
Never,
}

const fn default_delimiter() -> u8 {
b','
}

const fn default_escape() -> u8 {
b','
}

const fn default_double_quote() -> bool {
true
}

/// Config used to build a `CsvSerializer`.
#[crate::configurable_component]
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct CsvSerializerOptions {
/// The field delimiter to use when writing CSV.
#[serde(
default = "default_delimiter",
with = "vector_core::serde::ascii_char",
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
pub delimiter: u8,

/// Enable double quote escapes.
///
/// This is enabled by default, but it may be disabled. When disabled, quotes in
/// field data are escaped instead of doubled.
#[serde(
default = "default_double_quote",
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
pub double_quote: bool,

/// The escape character to use when writing CSV.
///
/// In some variants of CSV, quotes are escaped using a special escape character
/// like \ (instead of escaping quotes by doubling them).
///
/// To use this `double_quotes` needs to be disabled as well otherwise it is ignored
#[serde(
default = "default_escape",
with = "vector_core::serde::ascii_char",
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
pub escape: u8,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, worth documenting what happens when double_quote is enabled.


/// The quoting style to use when writing CSV data.
#[serde(
default,
skip_serializing_if = "vector_core::serde::skip_serializing_if_default"
)]
pub quote_style: QuoteStyle,

/// Configures the fields that will be encoded, as well as the order in which they
/// appear in the output.
///
Expand All @@ -59,16 +133,27 @@ pub struct CsvSerializerOptions {
pub fields: Vec<ConfigTargetPath>,
}

impl CsvSerializerOptions {
fn csv_quote_style(&self) -> csv::QuoteStyle {
match self.quote_style {
QuoteStyle::Always => csv::QuoteStyle::Always,
QuoteStyle::NonNumeric => csv::QuoteStyle::NonNumeric,
QuoteStyle::Never => csv::QuoteStyle::Never,
_ => csv::QuoteStyle::Necessary,
}
}
}

/// Serializer that converts an `Event` to bytes using the CSV format.
#[derive(Debug, Clone)]
pub struct CsvSerializer {
fields: Vec<ConfigTargetPath>,
config: CsvSerializerConfig,
}

impl CsvSerializer {
/// Creates a new `CsvSerializer`.
pub const fn new(fields: Vec<ConfigTargetPath>) -> Self {
Self { fields }
pub const fn new(config: CsvSerializerConfig) -> Self {
Self { config }
}
}

Expand All @@ -77,8 +162,18 @@ impl Encoder<Event> for CsvSerializer {

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let log = event.into_log();
let mut wtr = csv::Writer::from_writer(buffer.writer());
for field in &self.fields {

// 'flexible' is not needed since every event is a single context free csv line
let mut wtr = csv::WriterBuilder::new()
.delimiter(self.config.csv.delimiter)
.double_quote(self.config.csv.double_quote)
.escape(self.config.csv.escape)
.quote_style(self.config.csv.csv_quote_style())
.from_writer(buffer.writer());
// TODO: this is wanted after https://github.com/BurntSushi/rust-csv/pull/332 got merged
// .terminator(csv::Terminator::NONE)

for field in &self.config.csv.fields {
match log.get(field) {
Some(Value::Bytes(bytes)) => {
wtr.write_field(String::from_utf8_lossy(bytes).to_string())?
Expand All @@ -95,6 +190,10 @@ impl Encoder<Event> for CsvSerializer {
None => wtr.write_field("")?,
}
}

// TODO: this is wanted after https://github.com/BurntSushi/rust-csv/pull/332 got merged
//wtr.write_record(None::<&[u8]>)?; // terminate the line finishing quoting and adding \n

wtr.flush()?;
Ok(())
}
Expand All @@ -112,7 +211,7 @@ mod tests {

#[test]
fn build_error_on_empty_fields() {
let opts = CsvSerializerOptions { fields: vec![] };
let opts = CsvSerializerOptions::default();
let config = CsvSerializerConfig::new(opts);
let err = config.build().unwrap_err();
assert_eq!(err.to_string(), "At least one CSV field must be specified");
Expand All @@ -131,7 +230,6 @@ mod tests {
"bool" => Value::from(true),
"other" => Value::from("data"),
}));

let fields = vec![
ConfigTargetPath::try_from("foo".to_string()).unwrap(),
ConfigTargetPath::try_from("int".to_string()).unwrap(),
Expand All @@ -143,7 +241,12 @@ mod tests {
ConfigTargetPath::try_from("quote".to_string()).unwrap(),
ConfigTargetPath::try_from("bool".to_string()).unwrap(),
];
let config = CsvSerializerConfig::new(CsvSerializerOptions { fields });

let opts = CsvSerializerOptions {
fields,
..Default::default()
};
let config = CsvSerializerConfig::new(opts);
let mut serializer = config.build().unwrap();
let mut bytes = BytesMut::new();

Expand Down Expand Up @@ -171,14 +274,118 @@ mod tests {
ConfigTargetPath::try_from("field3".to_string()).unwrap(),
ConfigTargetPath::try_from("field2".to_string()).unwrap(),
];
let config = CsvSerializerConfig::new(CsvSerializerOptions { fields });
let opts = CsvSerializerOptions {
fields,
..Default::default()
};
let config = CsvSerializerConfig::new(opts);
let mut serializer = config.build().unwrap();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(
bytes.freeze(),
b"value1,value5,value5,value3,value2".as_slice()
);
}

#[test]
fn correct_quoting() {
let event = Event::Log(LogEvent::from(btreemap! {
// TODO: this test should write properly quoted field in last place
// TODO: this needs https://github.com/BurntSushi/rust-csv/issues/331
// "field1" => Value::from("foo\"bar"),
"field1" => Value::from("foo bar"),
}));
let fields = vec![ConfigTargetPath::try_from("field1".to_string()).unwrap()];
let opts = CsvSerializerOptions {
fields,
..Default::default()
};
let config = CsvSerializerConfig::new(opts);
let mut serializer = config.build().unwrap();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(
bytes.freeze(),
// TODO: this needs https://github.com/BurntSushi/rust-csv/issues/331
//b"\"value1 \"\" value2\"".as_slice()
b"foo bar".as_slice()
);
}

#[test]
fn custom_delimiter() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for providing these tests!

If custom_delimiter fails due to #17261 we can (1) make sure this test passes and (2) make a note on current behavior vs desired behavior.

Copy link
Contributor Author

@scMarkus scMarkus Aug 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my point of view it is the other way around. Writing custom_escape_char I could not get it to work. Digging into that I wrote custom_delimiter as well as found out about this issue which I think is a bug in the current version of vector?
EDIT: I miss read the comment. In fact I messed up in the initial description already. The delimiter test in is not failing but correct_quoting is (I will edit the description). custom_delimiter runs successfully.

let event = Event::Log(LogEvent::from(btreemap! {
"field1" => Value::from("value1"),
"field2" => Value::from("value2"),
}));
let fields = vec![
ConfigTargetPath::try_from("field1".to_string()).unwrap(),
ConfigTargetPath::try_from("field2".to_string()).unwrap(),
];
let opts = CsvSerializerOptions {
fields,
delimiter: b'\t',
..Default::default()
};
let config = CsvSerializerConfig::new(opts);
let mut serializer = config.build().unwrap();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(bytes.freeze(), b"value1\tvalue2".as_slice());
}

#[test]
fn custom_escape_char() {
// TODO: this tests utilizes csv quoting which currently
// has a bug of not adding closing quotes in the last column
// hence the additional 'field2'
let event = Event::Log(LogEvent::from(btreemap! {
"field1" => Value::from("foo\"bar"),
"field2" => Value::from("baz"),
}));
let fields = vec![
ConfigTargetPath::try_from("field1".to_string()).unwrap(),
ConfigTargetPath::try_from("field2".to_string()).unwrap(),
];
let opts = CsvSerializerOptions {
fields,
double_quote: false,
escape: b'\\',
..Default::default()
};
let config = CsvSerializerConfig::new(opts);
let mut serializer = config.build().unwrap();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(bytes.freeze(), b"\"foo\\\"bar\",baz".as_slice());
}

#[test]
fn custom_quote_style() {
let event = Event::Log(LogEvent::from(btreemap! {
"field1" => Value::from("foo\"bar"),
}));
let fields = vec![ConfigTargetPath::try_from("field1".to_string()).unwrap()];
let opts = CsvSerializerOptions {
fields,
quote_style: QuoteStyle::Never,
..Default::default()
};
let config = CsvSerializerConfig::new(opts);
let mut serializer = config.build().unwrap();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(bytes.freeze(), b"foo\"bar".as_slice());
}
}