From 084dc77c556e144d5ff65102d22a0ebbc6d58964 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Thu, 10 Aug 2023 16:43:45 -0400 Subject: [PATCH 1/3] feat: Refactor dnstap to use 'OwnedValuePath's --- benches/dnstap/mod.rs | 14 +- src/sources/dnstap/mod.rs | 26 +- src/sources/dnstap/parser.rs | 811 +++++++++++++++-------------- src/sources/dnstap/schema.rs | 975 ++++++++++------------------------- 4 files changed, 705 insertions(+), 1121 deletions(-) diff --git a/benches/dnstap/mod.rs b/benches/dnstap/mod.rs index e4dda4b5f3d4b..37839b41ace06 100644 --- a/benches/dnstap/mod.rs +++ b/benches/dnstap/mod.rs @@ -1,14 +1,10 @@ use bytes::Bytes; use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput}; -use vector::{ - event::LogEvent, - sources::dnstap::{schema::DnstapEventSchema, DnstapParser}, -}; +use vector::event::LogEvent; +use vector::sources::dnstap::parser::DnstapParser; fn benchmark_query_parsing(c: &mut Criterion) { let mut event = LogEvent::default(); - let schema = DnstapEventSchema::new(); - let mut parser = DnstapParser::new(&schema, &mut event); let raw_dnstap_data = "ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zcnoIAxACGAEiEAAAAAAAAA\ AAAAAAAAAAAAAqECABBQJwlAAAAAAAAAAAADAw8+0CODVA7+zq9wVNMU3WNlI2kwIAAAABAAAAAAABCWZhY2Vib29rMQNjb\ 20AAAEAAQAAKQIAAACAAAAMAAoACOxjCAG9zVgzWgUDY29tAHgB"; @@ -19,7 +15,7 @@ fn benchmark_query_parsing(c: &mut Criterion) { group.bench_function("dns_query_parsing", |b| { b.iter_batched( || dnstap_data.clone(), - |dnstap_data| parser.parse_dnstap_data(Bytes::from(dnstap_data)).unwrap(), + |dnstap_data| DnstapParser::parse(&mut event, Bytes::from(dnstap_data)).unwrap(), BatchSize::SmallInput, ) }); @@ -29,8 +25,6 @@ fn benchmark_query_parsing(c: &mut Criterion) { fn benchmark_update_parsing(c: &mut Criterion) { let mut event = LogEvent::default(); - let schema = DnstapEventSchema::new(); - let mut parser = DnstapParser::new(&schema, &mut event); let raw_dnstap_data = "ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zcmsIDhABGAEiBH8AAA\ EqBH8AAAEwrG44AEC+iu73BU14gfofUh1wi6gAAAEAAAAAAAAHZXhhbXBsZQNjb20AAAYAAWC+iu73BW0agDwvch1wi6gAA\ AEAAAAAAAAHZXhhbXBsZQNjb20AAAYAAXgB"; @@ -41,7 +35,7 @@ fn benchmark_update_parsing(c: &mut Criterion) { group.bench_function("dns_update_parsing", |b| { b.iter_batched( || dnstap_data.clone(), - |dnstap_data| parser.parse_dnstap_data(Bytes::from(dnstap_data)).unwrap(), + |dnstap_data| DnstapParser::parse(&mut event, Bytes::from(dnstap_data)).unwrap(), BatchSize::SmallInput, ) }); diff --git a/src/sources/dnstap/mod.rs b/src/sources/dnstap/mod.rs index 44d213847ce6a..dbfde6c95a9cd 100644 --- a/src/sources/dnstap/mod.rs +++ b/src/sources/dnstap/mod.rs @@ -7,7 +7,7 @@ use vector_common::internal_event::{ ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered, }; use vector_config::configurable_component; -use vrl::event_path; +use vrl::path::PathPrefix; use vrl::value::{kind::Collection, Kind}; use super::util::framestream::{build_framestream_unix_source, FrameHandler}; @@ -19,9 +19,9 @@ use crate::{ }; pub mod parser; -pub use parser::{parse_dnstap_data, DnstapParser}; - pub mod schema; +use crate::sources::dnstap::parser::DnstapParser; +use crate::sources::dnstap::schema::DNSTAP_VALUE_PATHS; use dnsmsg_parser::{dns_message, dns_message_parser}; use lookup::lookup_v2::OptionalValuePath; pub use schema::DnstapEventSchema; @@ -109,19 +109,11 @@ impl DnstapConfig { "protobuf:dnstap.Dnstap".to_string() //content-type for framestream } - fn event_schema(timestamp_key: Option<&OwnedValuePath>) -> DnstapEventSchema { - let mut schema = DnstapEventSchema::new(); - schema - .dnstap_root_data_schema_mut() - .set_timestamp(timestamp_key.cloned()); - schema - } - pub fn schema_definition( &self, log_namespace: LogNamespace, ) -> vector_core::schema::Definition { - let event_schema = Self::event_schema(log_schema().timestamp_key()); + let event_schema = DnstapEventSchema; match log_namespace { LogNamespace::Legacy => { @@ -205,7 +197,6 @@ pub struct DnstapFrameHandler { max_frame_length: usize, socket_path: PathBuf, content_type: String, - schema: DnstapEventSchema, raw_data_only: bool, multithreaded: bool, max_frame_handling_tasks: u32, @@ -224,8 +215,6 @@ impl DnstapFrameHandler { let source_type_key = log_schema().source_type_key(); let timestamp_key = log_schema().timestamp_key(); - let schema = DnstapConfig::event_schema(timestamp_key); - let host_key = config .host_key .clone() @@ -235,7 +224,6 @@ impl DnstapFrameHandler { max_frame_length: config.max_frame_length, socket_path: config.socket_path.clone(), content_type: config.content_type(), - schema, raw_data_only: config.raw_data_only.unwrap_or(false), multithreaded: config.multithreaded.unwrap_or(false), max_frame_handling_tasks: config.max_frame_handling_tasks.unwrap_or(1000), @@ -281,10 +269,10 @@ impl FrameHandler for DnstapFrameHandler { if self.raw_data_only { log_event.insert( - event_path!(self.schema.dnstap_root_data_schema().raw_data()), + (PathPrefix::Event, &DNSTAP_VALUE_PATHS.raw_data), BASE64_STANDARD.encode(&frame), ); - } else if let Err(err) = parse_dnstap_data(&self.schema, &mut log_event, frame) { + } else if let Err(err) = DnstapParser::parse(&mut log_event, frame) { emit!(DnstapParseError { error: format!("Dnstap protobuf decode error {:?}.", err) }); @@ -408,7 +396,7 @@ mod tests { let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json))); event.as_mut_log().insert("timestamp", chrono::Utc::now()); - let definition = DnstapConfig::event_schema(Some(&owned_value_path!("timestamp"))); + let definition = DnstapEventSchema; let schema = vector_core::schema::Definition::empty_legacy_namespace() .with_standard_vector_source_metadata(); diff --git a/src/sources/dnstap/parser.rs b/src/sources/dnstap/parser.rs index 2ab05d4c86582..f41451f0bf9c0 100644 --- a/src/sources/dnstap/parser.rs +++ b/src/sources/dnstap/parser.rs @@ -15,6 +15,7 @@ use trust_dns_proto::{ rr::domain::Name, serialize::binary::{BinDecodable, BinDecoder}, }; +use vrl::{owned_value_path, path}; use crate::{ event::{LogEvent, Value}, @@ -27,12 +28,14 @@ mod dnstap_proto { include!(concat!(env!("OUT_DIR"), "/dnstap.rs")); } +use crate::sources::dnstap::schema::DNSTAP_VALUE_PATHS; use dnstap_proto::{ message::Type as DnstapMessageType, Dnstap, Message as DnstapMessage, SocketFamily, SocketProtocol, }; use lookup::lookup_v2::{OwnedValuePath, ValuePath}; use lookup::PathPrefix; +use vector_core::config::log_schema; use super::{ dns_message::{ @@ -40,11 +43,8 @@ use super::{ ZoneInfo, }, dns_message_parser::DnsMessageParser, - schema::DnstapEventSchema, }; -const MAX_DNSTAP_QUERY_MESSAGE_TYPE_ID: i32 = 12; - #[derive(Debug, Snafu)] enum DnstapParserError { #[snafu(display("Unsupported DNSTap message type: {}", "dnstap_message_type_id"))] @@ -78,86 +78,79 @@ static DNSTAP_MESSAGE_RESPONSE_TYPE_IDS: Lazy> = Lazy::new(|| { .collect() }); -pub struct DnstapParser<'a> { - event_schema: &'a DnstapEventSchema, - parent_key_path: OwnedValuePath, - log_event: &'a mut LogEvent, -} - -pub fn parse_dnstap_data( - event_schema: &DnstapEventSchema, - log_event: &mut LogEvent, - frame: Bytes, -) -> Result<()> { - DnstapParser::new(event_schema, log_event).parse_dnstap_data(frame) -} - -impl<'a> DnstapParser<'a> { - pub fn new(event_schema: &'a DnstapEventSchema, log_event: &'a mut LogEvent) -> Self { - Self { - event_schema, - parent_key_path: Vec::new().into(), - log_event, - } - } +#[derive(Default)] +pub struct DnstapParser; - fn insert<'b, 'c: 'b, V>(&'c mut self, key: impl ValuePath<'b>, value: V) -> Option +impl DnstapParser { + fn insert<'a, V>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, + path: impl ValuePath<'a>, + value: V, + ) -> Option where V: Into + Debug, { - self.log_event.insert( - (PathPrefix::Event, (&self.parent_key_path).concat(key)), - value, - ) + event.insert((PathPrefix::Event, prefix.concat(path)), value) } - pub fn parse_dnstap_data(&mut self, frame: Bytes) -> Result<()> { + pub fn parse(event: &mut LogEvent, frame: Bytes) -> Result<()> { //parse frame with dnstap protobuf let proto_msg = Dnstap::decode(frame.clone())?; - + let root = owned_value_path!(); if let Some(server_id) = proto_msg.identity { - self.insert( - self.event_schema - .dnstap_root_data_schema() - .server_identity(), - String::from_utf8(server_id).unwrap_or_default(), + DnstapParser::insert( + event, + &root, + &DNSTAP_VALUE_PATHS.server_identity, + String::from_utf8(server_id.clone()).unwrap_or_default(), ); } if let Some(version) = proto_msg.version { - self.insert( - self.event_schema.dnstap_root_data_schema().server_version(), + DnstapParser::insert( + event, + &root, + &DNSTAP_VALUE_PATHS.server_version, String::from_utf8(version).unwrap_or_default(), ); } if let Some(extra) = proto_msg.extra { - self.insert( - self.event_schema.dnstap_root_data_schema().extra(), + DnstapParser::insert( + event, + &root, + &DNSTAP_VALUE_PATHS.extra, String::from_utf8(extra).unwrap_or_default(), ); } let dnstap_data_type_id: i32 = proto_msg.r#type; let mut need_raw_data = false; - self.insert( - self.event_schema.dnstap_root_data_schema().data_type_id(), + DnstapParser::insert( + event, + &root, + &DNSTAP_VALUE_PATHS.data_type_id, dnstap_data_type_id, ); if let Some(dnstap_data_type) = to_dnstap_data_type(dnstap_data_type_id) { - self.insert( - self.event_schema.dnstap_root_data_schema().data_type(), + DnstapParser::insert( + event, + &root, + &DNSTAP_VALUE_PATHS.data_type, dnstap_data_type.clone(), ); if dnstap_data_type == "Message" { if let Some(message) = proto_msg.message { - if let Err(err) = self.parse_dnstap_message(message) { + if let Err(err) = DnstapParser::parse_dnstap_message(event, &root, message) { emit!(DnstapParseWarning { error: &err }); need_raw_data = true; - self.insert( - self.event_schema.dnstap_root_data_schema().error(), + DnstapParser::insert( + event, + &root, + &DNSTAP_VALUE_PATHS.error, err.to_string(), ); } @@ -171,8 +164,10 @@ impl<'a> DnstapParser<'a> { } if need_raw_data { - self.insert( - self.event_schema.dnstap_root_data_schema().raw_data(), + DnstapParser::insert( + event, + &root, + &DNSTAP_VALUE_PATHS.raw_data, BASE64_STANDARD.encode(&frame), ); } @@ -180,17 +175,28 @@ impl<'a> DnstapParser<'a> { Ok(()) } - fn parse_dnstap_message(&mut self, dnstap_message: DnstapMessage) -> Result<()> { + fn parse_dnstap_message<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, + dnstap_message: DnstapMessage, + ) -> Result<()> { if let Some(socket_family) = dnstap_message.socket_family { - self.parse_dnstap_message_socket_family(socket_family, &dnstap_message)?; + DnstapParser::parse_dnstap_message_socket_family( + event, + prefix.clone(), + socket_family, + &dnstap_message, + )?; } if let Some(query_zone) = dnstap_message.query_zone.as_ref() { let mut decoder: BinDecoder = BinDecoder::new(query_zone); match Name::read(&mut decoder) { Ok(raw_data) => { - self.insert( - self.event_schema.dnstap_message_schema().query_zone(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.query_zone, raw_data.to_utf8(), ); } @@ -199,71 +205,74 @@ impl<'a> DnstapParser<'a> { } let dnstap_message_type_id = dnstap_message.r#type; - self.insert( - self.event_schema - .dnstap_message_schema() - .dnstap_message_type_id(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.message_type_id, dnstap_message_type_id, ); - self.insert( - self.event_schema - .dnstap_message_schema() - .dnstap_message_type(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.message_type, to_dnstap_message_type(dnstap_message_type_id), ); - let request_message_key = self.event_schema.dnstap_message_schema().request_message(); - let response_message_key = self.event_schema.dnstap_message_schema().response_message(); - if let Some(query_time_sec) = dnstap_message.query_time_sec { - self.parse_dnstap_message_time( + DnstapParser::parse_dnstap_message_time( + event, + prefix.clone(), query_time_sec, dnstap_message.query_time_nsec, dnstap_message_type_id, - request_message_key, + &DNSTAP_VALUE_PATHS.request_message, dnstap_message.query_message.as_ref(), &DNSTAP_MESSAGE_REQUEST_TYPE_IDS, ); } if let Some(response_time_sec) = dnstap_message.response_time_sec { - self.parse_dnstap_message_time( + DnstapParser::parse_dnstap_message_time( + event, + prefix.clone(), response_time_sec, dnstap_message.response_time_nsec, dnstap_message_type_id, - response_message_key, + &DNSTAP_VALUE_PATHS.request_message, dnstap_message.response_message.as_ref(), &DNSTAP_MESSAGE_RESPONSE_TYPE_IDS, ); } - self.parse_dnstap_message_type( + DnstapParser::parse_dnstap_message_type( + event, + prefix.clone(), dnstap_message_type_id, dnstap_message, - request_message_key, - response_message_key, )?; Ok(()) } - fn parse_dnstap_message_type( - &mut self, + fn parse_dnstap_message_type<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, dnstap_message_type_id: i32, dnstap_message: DnstapMessage, - request_message_key: &'static str, - response_message_key: &'static str, ) -> Result<()> { match dnstap_message_type_id { 1..=12 => { if let Some(query_message) = dnstap_message.query_message { let mut query_message_parser = DnsMessageParser::new(query_message); - if let Err(error) = - self.parse_dns_query_message(request_message_key, &mut query_message_parser) - { - self.log_raw_dns_message( - request_message_key, + if let Err(error) = DnstapParser::parse_dns_query_message( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.request_message), + &mut query_message_parser, + ) { + DnstapParser::log_raw_dns_message( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.request_message), query_message_parser.raw_message(), ); @@ -273,11 +282,14 @@ impl<'a> DnstapParser<'a> { if let Some(response_message) = dnstap_message.response_message { let mut response_message_parser = DnsMessageParser::new(response_message); - if let Err(error) = self - .parse_dns_query_message(response_message_key, &mut response_message_parser) - { - self.log_raw_dns_message( - response_message_key, + if let Err(error) = DnstapParser::parse_dns_query_message( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.response_message), + &mut response_message_parser, + ) { + DnstapParser::log_raw_dns_message( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.response_message), response_message_parser.raw_message(), ); @@ -289,12 +301,14 @@ impl<'a> DnstapParser<'a> { if let Some(update_request_message) = dnstap_message.query_message { let mut update_request_message_parser = DnsMessageParser::new(update_request_message); - if let Err(error) = self.parse_dns_update_message( - request_message_key, + if let Err(error) = DnstapParser::parse_dns_update_message( + event, + &DNSTAP_VALUE_PATHS.request_message, &mut update_request_message_parser, ) { - self.log_raw_dns_message( - request_message_key, + DnstapParser::log_raw_dns_message( + event, + &DNSTAP_VALUE_PATHS.request_message, update_request_message_parser.raw_message(), ); @@ -305,12 +319,14 @@ impl<'a> DnstapParser<'a> { if let Some(update_response_message) = dnstap_message.response_message { let mut update_response_message_parser = DnsMessageParser::new(update_response_message); - if let Err(error) = self.parse_dns_update_message( - response_message_key, + if let Err(error) = DnstapParser::parse_dns_update_message( + event, + &DNSTAP_VALUE_PATHS.response_message, &mut update_response_message_parser, ) { - self.log_raw_dns_message( - response_message_key, + DnstapParser::log_raw_dns_message( + event, + &DNSTAP_VALUE_PATHS.response_message, update_response_message_parser.raw_message(), ); @@ -330,12 +346,13 @@ impl<'a> DnstapParser<'a> { Ok(()) } - fn parse_dnstap_message_time( - &mut self, + fn parse_dnstap_message_time<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, time_sec: u64, time_nsec: Option, dnstap_message_type_id: i32, - message_key: &str, + message_key: &'a OwnedValuePath, message: Option<&Vec>, type_ids: &HashSet, ) { @@ -345,66 +362,40 @@ impl<'a> DnstapParser<'a> { }; if type_ids.contains(&dnstap_message_type_id) { - self.log_time( - self.event_schema.dnstap_root_data_schema().time(), - time_in_nanosec, - self.event_schema.dnstap_root_data_schema().time_precision(), - "ns", - ); + DnstapParser::log_time(event, prefix.clone(), time_in_nanosec, "ns"); let timestamp = Utc .timestamp_opt(time_sec.try_into().unwrap(), query_time_nsec) .single() .expect("invalid timestamp"); - if let Some(timestamp_key) = self.event_schema.dnstap_root_data_schema().timestamp() { - self.insert(timestamp_key, timestamp); + if let Some(timestamp_key) = log_schema().timestamp_key() { + DnstapParser::insert(event, prefix.clone(), timestamp_key, timestamp); } } if message.is_none() { - self.parent_key_path.push_field(message_key); - - let time_key_name = if dnstap_message_type_id <= MAX_DNSTAP_QUERY_MESSAGE_TYPE_ID { - self.event_schema.dns_query_message_schema().time() - } else { - self.event_schema.dns_update_message_schema().time() - }; - - let time_precision_key_name = - if dnstap_message_type_id <= MAX_DNSTAP_QUERY_MESSAGE_TYPE_ID { - self.event_schema - .dns_query_message_schema() - .time_precision() - } else { - self.event_schema - .dns_update_message_schema() - .time_precision() - }; - - self.log_time( - time_key_name, - time_in_nanosec, - time_precision_key_name, - "ns", - ); - - self.parent_key_path.segments.pop(); + DnstapParser::log_time(event, prefix.concat(message_key), time_in_nanosec, "ns"); } } - fn parse_dnstap_message_socket_family( - &mut self, + fn parse_dnstap_message_socket_family<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, socket_family: i32, dnstap_message: &DnstapMessage, ) -> Result<()> { - self.insert( - self.event_schema.dnstap_message_schema().socket_family(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.socket_family, to_socket_family_name(socket_family)?.to_string(), ); if let Some(socket_protocol) = dnstap_message.socket_protocol { - self.insert( - self.event_schema.dnstap_message_schema().socket_protocol(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.socket_protocol, to_socket_protocol_name(socket_protocol)?.to_string(), ); } @@ -418,15 +409,19 @@ impl<'a> DnstapParser<'a> { IpAddr::V6(Ipv6Addr::from(address_buffer)) }; - self.insert( - self.event_schema.dnstap_message_schema().query_address(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.query_address, source_address.to_string(), ); } if let Some(query_port) = dnstap_message.query_port { - self.insert( - self.event_schema.dnstap_message_schema().query_port(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.query_port, query_port, ); } @@ -440,441 +435,472 @@ impl<'a> DnstapParser<'a> { IpAddr::V6(Ipv6Addr::from(address_buffer)) }; - self.insert( - self.event_schema.dnstap_message_schema().response_address(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.response_address, response_addr.to_string(), ); } Ok(if let Some(response_port) = dnstap_message.response_port { - self.insert( - self.event_schema.dnstap_message_schema().response_port(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.response_port, response_port, ); }) } - fn log_time( - &mut self, - time_key: &'static str, + fn log_time<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, time: i64, - time_precision_key: &'static str, time_precision: &str, ) { - self.insert(time_key, time); - - self.insert(time_precision_key, time_precision.to_string()); + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.time, time); + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.time_precision, + time_precision.to_string(), + ); } - fn log_raw_dns_message(&mut self, key_prefix: &'static str, raw_dns_message: &[u8]) { - self.parent_key_path.push_field(key_prefix); - - self.insert( - self.event_schema.dns_query_message_schema().raw_data(), + fn log_raw_dns_message<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, + raw_dns_message: &[u8], + ) { + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.raw_data, BASE64_STANDARD.encode(raw_dns_message), ); - - self.parent_key_path.segments.pop(); } - fn parse_dns_query_message( - &mut self, - key_prefix: &'static str, + fn parse_dns_query_message<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, dns_message_parser: &mut DnsMessageParser, ) -> Result<()> { let msg = dns_message_parser.parse_as_query_message()?; - self.parent_key_path.push_field(key_prefix); - - self.insert( - self.event_schema.dns_query_message_schema().response_code(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.response_code, msg.response_code, ); if let Some(response) = msg.response { - self.insert( - self.event_schema.dns_query_message_schema().response(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.response, response.to_string(), ); } - self.log_dns_query_message_header( - self.event_schema.dns_query_message_schema().header(), + DnstapParser::log_dns_query_message_header( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.header), &msg.header, ); - self.log_dns_query_message_query_section( - self.event_schema - .dns_query_message_schema() - .question_section(), + DnstapParser::log_dns_query_message_query_section( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.question_section), &msg.question_section, ); - self.log_dns_message_record_section( - self.event_schema - .dns_query_message_schema() - .answer_section(), + DnstapParser::log_dns_message_record_section( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.answer_section), &msg.answer_section, ); - self.log_dns_message_record_section( - self.event_schema - .dns_query_message_schema() - .authority_section(), + DnstapParser::log_dns_message_record_section( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.authority_section), &msg.authority_section, ); - self.log_dns_message_record_section( - self.event_schema - .dns_query_message_schema() - .additional_section(), + DnstapParser::log_dns_message_record_section( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.additional_section), &msg.additional_section, ); - self.log_edns( - self.event_schema - .dns_query_message_schema() - .opt_pseudo_section(), + DnstapParser::log_edns( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.opt_pseudo_section), &msg.opt_pseudo_section, ); - self.parent_key_path.segments.pop(); Ok(()) } - fn log_dns_query_message_header(&mut self, parent_key: &'static str, header: &QueryHeader) { - self.parent_key_path.push_field(parent_key); - - self.insert(self.event_schema.dns_query_header_schema().id(), header.id); - - self.insert( - self.event_schema.dns_query_header_schema().opcode(), + fn log_dns_query_message_header<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, + header: &QueryHeader, + ) { + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.id, header.id); + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.opcode, header.opcode, ); - - self.insert( - self.event_schema.dns_query_header_schema().rcode(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.rcode, u16::from(header.rcode), ); - - self.insert(self.event_schema.dns_query_header_schema().qr(), header.qr); - - self.insert(self.event_schema.dns_query_header_schema().aa(), header.aa); - - self.insert(self.event_schema.dns_query_header_schema().tc(), header.tc); - - self.insert(self.event_schema.dns_query_header_schema().rd(), header.rd); - - self.insert(self.event_schema.dns_query_header_schema().ra(), header.ra); - - self.insert(self.event_schema.dns_query_header_schema().ad(), header.ad); - - self.insert(self.event_schema.dns_query_header_schema().cd(), header.cd); - - self.insert( - self.event_schema.dns_query_header_schema().question_count(), + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.qr, header.qr); + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.aa, header.aa); + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.tc, header.tc); + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.rd, header.rd); + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.ra, header.ra); + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.ad, header.ad); + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.cd, header.cd); + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.question_count, header.question_count, ); - - self.insert( - self.event_schema.dns_query_header_schema().answer_count(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.answer_count, header.answer_count, ); - - self.insert( - self.event_schema - .dns_query_header_schema() - .authority_count(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.authority_count, header.authority_count, ); - - self.insert( - self.event_schema - .dns_query_header_schema() - .additional_count(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.ar_count, header.additional_count, ); - - self.parent_key_path.segments.pop(); } - fn log_dns_query_message_query_section( - &mut self, - key_path: &'static str, + fn log_dns_query_message_query_section<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, questions: &[QueryQuestion], ) { - self.parent_key_path.push_field(key_path); - for (i, query) in questions.iter().enumerate() { - self.parent_key_path.push_index(i as isize); - self.log_dns_query_question(query); - self.parent_key_path.segments.pop(); + let index_segment = path!(i as isize); + DnstapParser::log_dns_query_question(event, prefix.concat(index_segment), query); } - - self.parent_key_path.segments.pop(); } - fn log_dns_query_question(&mut self, question: &QueryQuestion) { - self.insert( - self.event_schema.dns_query_question_schema().name(), + fn log_dns_query_question<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, + question: &QueryQuestion, + ) { + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.domain_name, question.name.clone(), ); if let Some(record_type) = question.record_type.clone() { - self.insert( - self.event_schema - .dns_query_question_schema() - .question_type(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.question_type, record_type, ); } - self.insert( - self.event_schema - .dns_query_question_schema() - .question_type_id(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.question_type_id, question.record_type_id, ); - self.insert( - self.event_schema.dns_query_question_schema().class(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.class, question.class.clone(), ); } - fn parse_dns_update_message( - &mut self, - key_prefix: &'static str, + fn parse_dns_update_message<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, dns_message_parser: &mut DnsMessageParser, ) -> Result<()> { let msg = dns_message_parser.parse_as_update_message()?; - self.parent_key_path.push_field(key_prefix); - - self.insert( - self.event_schema - .dns_update_message_schema() - .response_code(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.response_code, msg.response_code, ); if let Some(response) = msg.response { - self.insert( - self.event_schema.dns_update_message_schema().response(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.response, response.to_string(), ); } - self.log_dns_update_message_header( - self.event_schema.dns_update_message_schema().header(), + DnstapParser::log_dns_update_message_header( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.header), &msg.header, ); - self.log_dns_update_message_zone_section( - self.event_schema.dns_update_message_schema().zone_section(), + DnstapParser::log_dns_update_message_zone_section( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.zone_section), &msg.zone_to_update, ); - self.log_dns_message_record_section( - self.event_schema - .dns_update_message_schema() - .prerequisite_section(), + DnstapParser::log_dns_message_record_section( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.prerequisite_section), &msg.prerequisite_section, ); - self.log_dns_message_record_section( - self.event_schema - .dns_update_message_schema() - .update_section(), + DnstapParser::log_dns_message_record_section( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.update_section), &msg.update_section, ); - self.log_dns_message_record_section( - self.event_schema - .dns_update_message_schema() - .additional_section(), + DnstapParser::log_dns_message_record_section( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.additional_section), &msg.additional_section, ); - self.parent_key_path.segments.pop(); Ok(()) } - fn log_dns_update_message_header(&mut self, key_prefix: &'static str, header: &UpdateHeader) { - self.parent_key_path.push_field(key_prefix); - - self.insert(self.event_schema.dns_update_header_schema().id(), header.id); + fn log_dns_update_message_header<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, + header: &UpdateHeader, + ) { + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.id, header.id); - self.insert( - self.event_schema.dns_update_header_schema().opcode(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.opcode, header.opcode, ); - self.insert( - self.event_schema.dns_update_header_schema().rcode(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.rcode, u16::from(header.rcode), ); - self.insert(self.event_schema.dns_update_header_schema().qr(), header.qr); + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.qr, header.qr); - self.insert( - self.event_schema.dns_update_header_schema().zone_count(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.zone_count, header.zone_count, ); - self.insert( - self.event_schema - .dns_update_header_schema() - .prerequisite_count(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.prerequisite_count, header.prerequisite_count, ); - self.insert( - self.event_schema.dns_update_header_schema().update_count(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.update_count, header.update_count, ); - self.insert( - self.event_schema - .dns_update_header_schema() - .additional_count(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.ad_count, header.additional_count, ); - - self.parent_key_path.segments.pop(); } - fn log_dns_update_message_zone_section(&mut self, key_path: &'static str, zone: &ZoneInfo) { - self.parent_key_path.push_field(key_path); - - self.insert( - self.event_schema.dns_update_zone_info_schema().zone_name(), + fn log_dns_update_message_zone_section<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, + zone: &ZoneInfo, + ) { + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.zone_name, zone.name.clone(), ); if let Some(zone_type) = zone.zone_type.clone() { - self.insert( - self.event_schema.dns_update_zone_info_schema().zone_type(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.zone_type, zone_type, ); } - self.insert( - self.event_schema - .dns_update_zone_info_schema() - .zone_type_id(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.zone_type_id, zone.zone_type_id, ); - self.insert( - self.event_schema.dns_update_zone_info_schema().zone_class(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.zone_class, zone.class.clone(), ); - - self.parent_key_path.segments.pop(); } - fn log_edns(&mut self, key_prefix: &'static str, opt_section: &Option) { - self.parent_key_path.push_field(key_prefix); - + fn log_edns<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, + opt_section: &Option, + ) { if let Some(edns) = opt_section { - self.insert( - self.event_schema - .dns_message_opt_pseudo_section_schema() - .extended_rcode(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.extended_rcode, edns.extended_rcode, ); - self.insert( - self.event_schema - .dns_message_opt_pseudo_section_schema() - .version(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.version, edns.version, ); - self.insert( - self.event_schema - .dns_message_opt_pseudo_section_schema() - .do_flag(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.do_flag, edns.dnssec_ok, ); - self.insert( - self.event_schema - .dns_message_opt_pseudo_section_schema() - .udp_max_payload_size(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.udp_max_payload_size, edns.udp_max_payload_size, ); - self.log_edns_options( - self.event_schema - .dns_message_opt_pseudo_section_schema() - .options(), + DnstapParser::log_edns_options( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.options), &edns.options, ); } - - self.parent_key_path.segments.pop(); } - fn log_edns_options(&mut self, key_path: &'static str, options: &[EdnsOptionEntry]) { - self.parent_key_path.push_field(key_path); - + fn log_edns_options<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, + options: &[EdnsOptionEntry], + ) { options.iter().enumerate().for_each(|(i, opt)| { - self.parent_key_path.push_index(i as isize); - self.log_edns_opt(opt); - self.parent_key_path.segments.pop(); + let index_segment = path!(i as isize); + DnstapParser::log_edns_opt(event, prefix.concat(index_segment), opt); }); - - self.parent_key_path.segments.pop(); } - fn log_edns_opt(&mut self, opt: &EdnsOptionEntry) { - self.insert( - self.event_schema.dns_message_option_schema().opt_code(), + fn log_edns_opt<'a>(event: &mut LogEvent, prefix: impl ValuePath<'a>, opt: &EdnsOptionEntry) { + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.opt_code, opt.opt_code, ); - self.insert( - self.event_schema.dns_message_option_schema().opt_name(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.opt_name, opt.opt_name.clone(), ); - self.insert( - self.event_schema.dns_message_option_schema().opt_data(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.opt_data, opt.opt_data.clone(), ); } - fn log_dns_message_record_section(&mut self, key_path: &'static str, records: &[DnsRecord]) { - self.parent_key_path.push_field(key_path); - + fn log_dns_message_record_section<'a>( + event: &mut LogEvent, + prefix: impl ValuePath<'a>, + records: &[DnsRecord], + ) { for (i, record) in records.iter().enumerate() { - self.parent_key_path.push_index(i as isize); - self.log_dns_record(record); - self.parent_key_path.segments.pop(); + let index_segment = path!(i as isize); + DnstapParser::log_dns_record(event, prefix.concat(index_segment), record); } - - self.parent_key_path.segments.pop(); } - fn log_dns_record(&mut self, record: &DnsRecord) { - self.insert( - self.event_schema.dns_record_schema().name(), + fn log_dns_record<'a>(event: &mut LogEvent, prefix: impl ValuePath<'a>, record: &DnsRecord) { + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.domain_name, record.name.clone(), ); if let Some(record_type) = record.record_type.clone() { - self.insert( - self.event_schema.dns_record_schema().record_type(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.record_type, record_type, ); } - self.insert( - self.event_schema.dns_record_schema().record_type_id(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.record_type_id, record.record_type_id, ); - self.insert(self.event_schema.dns_record_schema().ttl(), record.ttl); - self.insert( - self.event_schema.dns_record_schema().class(), + DnstapParser::insert(event, prefix.clone(), &DNSTAP_VALUE_PATHS.ttl, record.ttl); + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.class, record.class.clone(), ); if let Some(rdata) = &record.rdata { - self.insert( - self.event_schema.dns_record_schema().rdata(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.rdata, rdata.to_string(), ); }; if let Some(rdata_bytes) = &record.rdata_bytes { - self.insert( - self.event_schema.dns_record_schema().rdata_bytes(), + DnstapParser::insert( + event, + prefix.clone(), + &DNSTAP_VALUE_PATHS.rdata_bytes, BASE64_STANDARD.encode(rdata_bytes), ); }; @@ -944,21 +970,19 @@ fn to_dnstap_message_type(type_id: i32) -> String { #[cfg(test)] mod tests { - use super::{super::schema::DnstapEventSchema, *}; + use super::*; use crate::event::Value; #[test] fn test_parse_dnstap_data_with_query_message() { let mut log_event = LogEvent::default(); - let schema = DnstapEventSchema::new(); - let mut parser = DnstapParser::new(&schema, &mut log_event); let raw_dnstap_data = "ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zcnoIAxACGAEiEAAAAAAAAA\ AAAAAAAAAAAAAqECABBQJwlAAAAAAAAAAAADAw8+0CODVA7+zq9wVNMU3WNlI2kwIAAAABAAAAAAABCWZhY2Vib29rMQNjb\ 20AAAEAAQAAKQIAAACAAAAMAAoACOxjCAG9zVgzWgUDY29tAHgB"; let dnstap_data = BASE64_STANDARD .decode(raw_dnstap_data) .expect("Invalid base64 encoded data."); - let parse_result = parser.parse_dnstap_data(Bytes::from(dnstap_data)); + let parse_result = DnstapParser::parse(&mut log_event, Bytes::from(dnstap_data)); assert!(parse_result.is_ok()); assert!(log_event .all_fields() @@ -1002,15 +1026,13 @@ mod tests { #[test] fn test_parse_dnstap_data_with_update_message() { let mut log_event = LogEvent::default(); - let schema = DnstapEventSchema::new(); - let mut parser = DnstapParser::new(&schema, &mut log_event); let raw_dnstap_data = "ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zcmsIDhABGAEiBH8AAA\ EqBH8AAAEwrG44AEC+iu73BU14gfofUh1wi6gAAAEAAAAAAAAHZXhhbXBsZQNjb20AAAYAAWC+iu73BW0agDwvch1wi6gAA\ AEAAAAAAAAHZXhhbXBsZQNjb20AAAYAAXgB"; let dnstap_data = BASE64_STANDARD .decode(raw_dnstap_data) .expect("Invalid base64 encoded data."); - let parse_result = parser.parse_dnstap_data(Bytes::from(dnstap_data)); + let parse_result = DnstapParser::parse(&mut log_event, Bytes::from(dnstap_data)); assert!(parse_result.is_ok()); assert!(log_event .all_fields() @@ -1056,10 +1078,7 @@ mod tests { #[test] fn test_parse_dnstap_data_with_invalid_data() { let mut log_event = LogEvent::default(); - let schema = DnstapEventSchema::new(); - let mut parser = DnstapParser::new(&schema, &mut log_event); - let e = parser - .parse_dnstap_data(Bytes::from(vec![1, 2, 3])) + let e = DnstapParser::parse(&mut log_event, Bytes::from(vec![1, 2, 3])) .expect_err("Expected TrustDnsError."); assert!(e.to_string().contains("Protobuf message")); } diff --git a/src/sources/dnstap/schema.rs b/src/sources/dnstap/schema.rs index 90a76dfa33a44..23ce53cdbfd92 100644 --- a/src/sources/dnstap/schema.rs +++ b/src/sources/dnstap/schema.rs @@ -1,4 +1,5 @@ use lookup::{owned_value_path, OwnedValuePath}; +use once_cell::sync::Lazy; use std::collections::BTreeMap; use vrl::btreemap; use vrl::value::{ @@ -7,151 +8,121 @@ use vrl::value::{ }; #[derive(Debug, Default, Clone)] -pub struct DnstapEventSchema { - dnstap_root_data_schema: DnstapRootDataSchema, - dnstap_message_schema: DnstapMessageSchema, - dns_query_message_schema: DnsQueryMessageSchema, - dns_query_header_schema: DnsQueryHeaderSchema, - dns_update_message_schema: DnsUpdateMessageSchema, - dns_update_header_schema: DnsUpdateHeaderSchema, - dns_message_opt_pseudo_section_schema: DnsMessageOptPseudoSectionSchema, - dns_message_option_schema: DnsMessageOptionSchema, - dns_record_schema: DnsRecordSchema, - dns_query_question_schema: DnsQueryQuestionSchema, - dns_update_zone_info_schema: DnsUpdateZoneInfoSchema, -} +pub struct DnstapEventSchema; impl DnstapEventSchema { /// The message schema for the request and response message fields fn request_message_schema_definition(&self) -> Collection { - let mut result = BTreeMap::new(); + let mut result: BTreeMap = BTreeMap::new(); result.insert( - self.dns_query_message_schema().time().into(), + DNSTAP_VALUE_PATHS.time.to_string().into(), Kind::integer().or_undefined(), ); result.insert( - self.dns_update_message_schema().time().into(), + DNSTAP_VALUE_PATHS.time.to_string().into(), Kind::integer().or_undefined(), ); result.insert( - self.dns_query_message_schema().time_precision().into(), + DNSTAP_VALUE_PATHS.time_precision.to_string().into(), Kind::bytes().or_undefined(), ); result.insert( - self.dns_update_message_schema().time_precision().into(), + DNSTAP_VALUE_PATHS.time_precision.to_string().into(), Kind::bytes().or_undefined(), ); result.insert( - self.dns_query_message_schema().response_code().into(), + DNSTAP_VALUE_PATHS.response_code.to_string().into(), Kind::integer().or_undefined(), ); result.insert( - self.dns_update_message_schema().response_code().into(), + DNSTAP_VALUE_PATHS.response_code.to_string().into(), Kind::integer().or_undefined(), ); result.insert( - self.dns_query_message_schema().response().into(), + DNSTAP_VALUE_PATHS.response.to_string().into(), Kind::bytes().or_undefined(), ); result.insert( - self.dns_update_message_schema().response().into(), + DNSTAP_VALUE_PATHS.response.to_string().into(), Kind::bytes().or_undefined(), ); - if self.dns_query_message_schema().header() == self.dns_update_message_schema().header() { - // This branch will always be hit - - // we know that both headers are equal since they both pull the values from the common schema. - let mut schema = self.dns_query_header_schema().schema_definition(); - schema.merge(self.dns_update_header_schema().schema_definition(), true); - - result.insert( - self.dns_query_message_schema().header().into(), - Kind::object(schema).or_undefined(), - ); - } else { - result.insert( - self.dns_query_message_schema().header().into(), - Kind::object(self.dns_query_header_schema().schema_definition()).or_undefined(), - ); - result.insert( - self.dns_update_message_schema().header().into(), - Kind::object(self.dns_update_header_schema().schema_definition()).or_undefined(), - ); - } + let mut schema = DnsQueryHeaderSchema::schema_definition(); + schema.merge(DnsUpdateHeaderSchema::schema_definition(), true); + + result.insert( + DNSTAP_VALUE_PATHS.header.to_string().into(), + Kind::object(schema).or_undefined(), + ); + result.insert( - self.dns_update_message_schema().zone_section().into(), - Kind::object(self.dns_update_zone_info_schema().schema_definition()).or_undefined(), + DNSTAP_VALUE_PATHS.zone_section.to_string().into(), + Kind::object(DnsUpdateZoneInfoSchema::schema_definition()).or_undefined(), ); result.insert( - self.dns_query_message_schema().question_section().into(), + DNSTAP_VALUE_PATHS.question_section.to_string().into(), Kind::array(Collection::from_unknown(Kind::object( - self.dns_query_question_schema().schema_definition(), + DnsQueryQuestionSchema::schema_definition(), ))) .or_undefined(), ); result.insert( - self.dns_query_message_schema().answer_section().into(), + DNSTAP_VALUE_PATHS.answer_section.to_string().into(), Kind::array(Collection::from_unknown(Kind::object( - self.dns_record_schema().schema_definition(), + DnsRecordSchema::schema_definition(), ))) .or_undefined(), ); result.insert( - self.dns_query_message_schema().authority_section().into(), + DNSTAP_VALUE_PATHS.authority_section.to_string().into(), Kind::array(Collection::from_unknown(Kind::object( - self.dns_record_schema().schema_definition(), + DnsRecordSchema::schema_definition(), ))) .or_undefined(), ); result.insert( - self.dns_query_message_schema().additional_section().into(), + DNSTAP_VALUE_PATHS.additional_section.to_string().into(), Kind::array(Collection::from_unknown(Kind::object( - self.dns_record_schema().schema_definition(), + DnsRecordSchema::schema_definition(), ))) .or_undefined(), ); result.insert( - self.dns_query_message_schema().opt_pseudo_section().into(), - Kind::object( - self.dns_message_opt_pseudo_section_schema() - .schema_definition(self.dns_message_option_schema()), - ) - .or_undefined(), + DNSTAP_VALUE_PATHS.opt_pseudo_section.to_string().into(), + Kind::object(DnsMessageOptPseudoSectionSchema::schema_definition()).or_undefined(), ); result.insert( - self.dns_query_message_schema().raw_data().into(), + DNSTAP_VALUE_PATHS.raw_data.to_string().into(), Kind::bytes().or_undefined(), ); result.insert( - self.dns_update_message_schema() - .prerequisite_section() - .into(), + DNSTAP_VALUE_PATHS.prerequisite_section.to_string().into(), Kind::array(Collection::from_unknown(Kind::object( - self.dns_record_schema().schema_definition(), + DnsRecordSchema::schema_definition(), ))) .or_undefined(), ); result.insert( - self.dns_update_message_schema().update_section().into(), + DNSTAP_VALUE_PATHS.update_section.to_string().into(), Kind::array(Collection::from_unknown(Kind::object( - self.dns_record_schema().schema_definition(), + DnsRecordSchema::schema_definition(), ))) .or_undefined(), ); result.insert( - self.dns_update_message_schema().additional_section().into(), + DNSTAP_VALUE_PATHS.additional_section.to_string().into(), Kind::array(Collection::from_unknown(Kind::object( - self.dns_record_schema().schema_definition(), + DnsRecordSchema::schema_definition(), ))) .or_undefined(), ); @@ -165,51 +136,15 @@ impl DnstapEventSchema { schema: vector_core::schema::Definition, ) -> vector_core::schema::Definition { schema - .optional_field( - &owned_value_path!(self.dnstap_root_data_schema().server_identity()), - Kind::bytes(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_root_data_schema().server_version()), - Kind::bytes(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_root_data_schema().extra()), - Kind::bytes(), - None, - ) - .with_event_field( - &owned_value_path!(self.dnstap_root_data_schema().data_type_id()), - Kind::integer(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_root_data_schema().data_type()), - Kind::bytes(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_root_data_schema().error()), - Kind::bytes(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_root_data_schema().raw_data()), - Kind::bytes(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_root_data_schema().time()), - Kind::integer(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_root_data_schema().time_precision()), - Kind::bytes(), - None, - ) + .optional_field(&DNSTAP_VALUE_PATHS.server_identity, Kind::bytes(), None) + .optional_field(&DNSTAP_VALUE_PATHS.server_version, Kind::bytes(), None) + .optional_field(&DNSTAP_VALUE_PATHS.extra, Kind::bytes(), None) + .with_event_field(&DNSTAP_VALUE_PATHS.data_type_id, Kind::integer(), None) + .optional_field(&DNSTAP_VALUE_PATHS.data_type, Kind::bytes(), None) + .optional_field(&DNSTAP_VALUE_PATHS.error, Kind::bytes(), None) + .optional_field(&DNSTAP_VALUE_PATHS.raw_data, Kind::bytes(), None) + .optional_field(&DNSTAP_VALUE_PATHS.time, Kind::integer(), None) + .optional_field(&DNSTAP_VALUE_PATHS.time_precision, Kind::bytes(), None) } /// Schema definition from the message. @@ -218,58 +153,22 @@ impl DnstapEventSchema { schema: vector_core::schema::Definition, ) -> vector_core::schema::Definition { schema + .optional_field(&DNSTAP_VALUE_PATHS.socket_family, Kind::bytes(), None) + .optional_field(&DNSTAP_VALUE_PATHS.socket_protocol, Kind::bytes(), None) + .optional_field(&DNSTAP_VALUE_PATHS.query_address, Kind::bytes(), None) + .optional_field(&DNSTAP_VALUE_PATHS.query_port, Kind::integer(), None) + .optional_field(&DNSTAP_VALUE_PATHS.response_address, Kind::bytes(), None) + .optional_field(&DNSTAP_VALUE_PATHS.response_port, Kind::integer(), None) + .optional_field(&DNSTAP_VALUE_PATHS.query_zone, Kind::bytes(), None) + .with_event_field(&DNSTAP_VALUE_PATHS.message_type_id, Kind::integer(), None) + .optional_field(&DNSTAP_VALUE_PATHS.message_type, Kind::bytes(), None) .optional_field( - &owned_value_path!(self.dnstap_message_schema().socket_family()), - Kind::bytes(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_message_schema().socket_protocol()), - Kind::bytes(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_message_schema().query_address()), - Kind::bytes(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_message_schema().query_port()), - Kind::integer(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_message_schema().response_address()), - Kind::bytes(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_message_schema().response_port()), - Kind::integer(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_message_schema().query_zone()), - Kind::bytes(), - None, - ) - .with_event_field( - &owned_value_path!(self.dnstap_message_schema().dnstap_message_type_id()), - Kind::integer(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_message_schema().dnstap_message_type()), - Kind::bytes(), - None, - ) - .optional_field( - &owned_value_path!(self.dnstap_message_schema().request_message()), + &DNSTAP_VALUE_PATHS.request_message, Kind::object(self.request_message_schema_definition()), None, ) .optional_field( - &owned_value_path!(self.dnstap_message_schema().response_message()), + &DNSTAP_VALUE_PATHS.response_message, Kind::object(self.request_message_schema_definition()), None, ) @@ -282,616 +181,300 @@ impl DnstapEventSchema { ) -> vector_core::schema::Definition { self.root_schema_definition(self.message_schema_definition(schema)) } - - pub const fn dnstap_root_data_schema(&self) -> &DnstapRootDataSchema { - &self.dnstap_root_data_schema - } - - pub const fn dnstap_message_schema(&self) -> &DnstapMessageSchema { - &self.dnstap_message_schema - } - - pub const fn dns_query_message_schema(&self) -> &DnsQueryMessageSchema { - &self.dns_query_message_schema - } - - pub const fn dns_query_header_schema(&self) -> &DnsQueryHeaderSchema { - &self.dns_query_header_schema - } - - pub const fn dns_update_message_schema(&self) -> &DnsUpdateMessageSchema { - &self.dns_update_message_schema - } - - pub const fn dns_update_header_schema(&self) -> &DnsUpdateHeaderSchema { - &self.dns_update_header_schema - } - - pub const fn dns_message_opt_pseudo_section_schema(&self) -> &DnsMessageOptPseudoSectionSchema { - &self.dns_message_opt_pseudo_section_schema - } - - pub const fn dns_message_option_schema(&self) -> &DnsMessageOptionSchema { - &self.dns_message_option_schema - } - - pub const fn dns_record_schema(&self) -> &DnsRecordSchema { - &self.dns_record_schema - } - - pub const fn dns_query_question_schema(&self) -> &DnsQueryQuestionSchema { - &self.dns_query_question_schema - } - - pub const fn dns_update_zone_info_schema(&self) -> &DnsUpdateZoneInfoSchema { - &self.dns_update_zone_info_schema - } - - pub fn dnstap_root_data_schema_mut(&mut self) -> &mut DnstapRootDataSchema { - &mut self.dnstap_root_data_schema - } - - pub fn new() -> Self { - Self::default() - } } +/// Collection of owned value paths. #[derive(Debug, Clone)] -pub struct DnstapRootDataSchema { - timestamp: Option, -} - -impl Default for DnstapRootDataSchema { - fn default() -> Self { - Self { - timestamp: Some(owned_value_path!("timestamp")), - } - } -} - -impl DnstapRootDataSchema { - pub const fn server_identity(&self) -> &'static str { - "serverId" - } - - pub const fn server_version(&self) -> &'static str { - "serverVersion" - } - - pub const fn extra(&self) -> &'static str { - "extraInfo" - } - - pub const fn data_type(&self) -> &'static str { - "dataType" - } - - pub const fn data_type_id(&self) -> &'static str { - "dataTypeId" - } - - pub const fn timestamp(&self) -> Option<&OwnedValuePath> { - self.timestamp.as_ref() - } - - pub const fn time(&self) -> &'static str { - "time" - } - - pub const fn time_precision(&self) -> &'static str { - "timePrecision" - } - - pub const fn error(&self) -> &'static str { - "error" - } - - pub const fn raw_data(&self) -> &'static str { - "rawData" - } - - pub fn set_timestamp(&mut self, val: Option) -> &mut Self { - self.timestamp = val; - self - } -} - -#[derive(Debug, Default, Clone)] -pub struct DnstapMessageSchema; - -impl DnstapMessageSchema { - pub const fn socket_family(&self) -> &'static str { - "socketFamily" - } - - pub const fn socket_protocol(&self) -> &'static str { - "socketProtocol" - } - - pub const fn query_address(&self) -> &'static str { - "sourceAddress" - } - - pub const fn query_port(&self) -> &'static str { - "sourcePort" - } - - pub const fn response_address(&self) -> &'static str { - "responseAddress" - } - - pub const fn response_port(&self) -> &'static str { - "responsePort" - } - - pub const fn query_zone(&self) -> &'static str { - "queryZone" - } - - pub const fn dnstap_message_type(&self) -> &'static str { - "messageType" - } - - pub const fn dnstap_message_type_id(&self) -> &'static str { - "messageTypeId" - } - - pub const fn request_message(&self) -> &'static str { - "requestData" - } - - pub const fn response_message(&self) -> &'static str { - "responseData" - } +pub struct DnstapPaths { + // DnstapRootDataSchema + pub server_identity: OwnedValuePath, + pub server_version: OwnedValuePath, + pub extra: OwnedValuePath, + pub data_type: OwnedValuePath, + pub data_type_id: OwnedValuePath, + pub time: OwnedValuePath, + pub time_precision: OwnedValuePath, + pub error: OwnedValuePath, + pub raw_data: OwnedValuePath, + + // DnstapMessageSchema + pub socket_family: OwnedValuePath, + pub socket_protocol: OwnedValuePath, + pub query_address: OwnedValuePath, + pub query_port: OwnedValuePath, + pub response_address: OwnedValuePath, + pub response_port: OwnedValuePath, + pub query_zone: OwnedValuePath, + pub message_type: OwnedValuePath, + pub message_type_id: OwnedValuePath, + pub request_message: OwnedValuePath, + pub response_message: OwnedValuePath, + + // DnsQueryMessageSchema + pub response_code: OwnedValuePath, + pub response: OwnedValuePath, + pub header: OwnedValuePath, + pub question_section: OwnedValuePath, + pub answer_section: OwnedValuePath, + pub authority_section: OwnedValuePath, + pub additional_section: OwnedValuePath, + pub opt_pseudo_section: OwnedValuePath, + + // DnsUpdateMessageSchema + pub zone_section: OwnedValuePath, + pub prerequisite_section: OwnedValuePath, + pub update_section: OwnedValuePath, + + // DnsMessageHeaderCommonSchema + pub id: OwnedValuePath, + pub opcode: OwnedValuePath, + pub rcode: OwnedValuePath, + pub qr: OwnedValuePath, + + // DnsQueryHeaderSchema + pub aa: OwnedValuePath, + pub tc: OwnedValuePath, + pub rd: OwnedValuePath, + pub ra: OwnedValuePath, + pub ad: OwnedValuePath, + pub cd: OwnedValuePath, + pub question_count: OwnedValuePath, + pub answer_count: OwnedValuePath, + pub authority_count: OwnedValuePath, + pub ar_count: OwnedValuePath, + + // DnsUpdateHeaderSchema + pub zone_count: OwnedValuePath, + pub prerequisite_count: OwnedValuePath, + pub update_count: OwnedValuePath, + pub ad_count: OwnedValuePath, + + // DnsMessageOptPseudoSectionSchema + pub extended_rcode: OwnedValuePath, + pub version: OwnedValuePath, + pub do_flag: OwnedValuePath, + pub udp_max_payload_size: OwnedValuePath, + pub options: OwnedValuePath, + + // DnsMessageOptionSchema + pub opt_code: OwnedValuePath, + pub opt_name: OwnedValuePath, + pub opt_data: OwnedValuePath, + + // DnsRecordSchema + pub domain_name: OwnedValuePath, + pub record_type: OwnedValuePath, + pub record_type_id: OwnedValuePath, + pub ttl: OwnedValuePath, + pub class: OwnedValuePath, + pub rdata: OwnedValuePath, + pub rdata_bytes: OwnedValuePath, + + // DnsQueryQuestionSchema + pub question_type: OwnedValuePath, + pub question_type_id: OwnedValuePath, + + // DnsUpdateZoneInfoSchema + pub zone_name: OwnedValuePath, + pub zone_class: OwnedValuePath, + pub zone_type: OwnedValuePath, + pub zone_type_id: OwnedValuePath, } -#[derive(Debug, Default, Clone)] -pub struct DnsMessageCommonSchema; - -impl DnsMessageCommonSchema { - pub const fn response_code() -> &'static str { - "fullRcode" - } - - pub const fn response() -> &'static str { - "rcodeName" - } - - pub const fn time() -> &'static str { - "time" - } - - pub const fn time_precision() -> &'static str { - "timePrecision" - } - - pub const fn raw_data() -> &'static str { - "rawData" - } - - pub const fn header() -> &'static str { - "header" - } -} - -#[derive(Debug, Default, Clone)] -pub struct DnsQueryMessageSchema; - -impl DnsQueryMessageSchema { - pub const fn response_code(&self) -> &'static str { - DnsMessageCommonSchema::response_code() - } - - pub const fn response(&self) -> &'static str { - DnsMessageCommonSchema::response() - } - - pub const fn time(&self) -> &'static str { - DnsMessageCommonSchema::time() - } - - pub const fn time_precision(&self) -> &'static str { - DnsMessageCommonSchema::time_precision() - } - - pub const fn raw_data(&self) -> &'static str { - DnsMessageCommonSchema::raw_data() - } - - pub const fn header(&self) -> &'static str { - DnsMessageCommonSchema::header() - } - - pub const fn question_section(&self) -> &'static str { - "question" - } - - pub const fn answer_section(&self) -> &'static str { - "answers" - } - - pub const fn authority_section(&self) -> &'static str { - "authority" - } - - pub const fn additional_section(&self) -> &'static str { - "additional" - } - - pub const fn opt_pseudo_section(&self) -> &'static str { - "opt" - } -} - -#[derive(Debug, Default, Clone)] -pub struct DnsUpdateMessageSchema; - -impl DnsUpdateMessageSchema { - pub const fn response_code(&self) -> &'static str { - DnsMessageCommonSchema::response_code() - } - - pub const fn response(&self) -> &'static str { - DnsMessageCommonSchema::response() - } - - pub const fn time(&self) -> &'static str { - DnsMessageCommonSchema::time() - } - - pub const fn time_precision(&self) -> &'static str { - DnsMessageCommonSchema::time_precision() - } - - pub const fn raw_data(&self) -> &'static str { - DnsMessageCommonSchema::raw_data() - } - - pub const fn header(&self) -> &'static str { - DnsMessageCommonSchema::header() - } - - pub const fn zone_section(&self) -> &'static str { - "zone" - } - - pub const fn prerequisite_section(&self) -> &'static str { - "prerequisite" - } - - pub const fn update_section(&self) -> &'static str { - "update" - } - - pub const fn additional_section(&self) -> &'static str { - "additional" - } -} - -#[derive(Debug, Default, Clone)] -pub struct DnsMessageHeaderCommonSchema; - -impl DnsMessageHeaderCommonSchema { - pub const fn id() -> &'static str { - "id" - } - - pub const fn opcode() -> &'static str { - "opcode" - } - - pub const fn rcode() -> &'static str { - "rcode" - } - - pub const fn qr() -> &'static str { - "qr" - } -} +/// Lazily initialized singleton. +pub(crate) static DNSTAP_VALUE_PATHS: Lazy = Lazy::new(|| DnstapPaths { + server_identity: owned_value_path!("serverId"), + server_version: owned_value_path!("serverVersion"), + extra: owned_value_path!("extraInfo"), + data_type: owned_value_path!("dataType"), + data_type_id: owned_value_path!("dataTypeId"), + time: owned_value_path!("time"), + time_precision: owned_value_path!("timePrecision"), + error: owned_value_path!("error"), + raw_data: owned_value_path!("rawData"), + socket_family: owned_value_path!("socketFamily"), + socket_protocol: owned_value_path!("socketProtocol"), + query_address: owned_value_path!("sourceAddress"), + query_port: owned_value_path!("sourcePort"), + response_address: owned_value_path!("responseAddress"), + response_port: owned_value_path!("responsePort"), + query_zone: owned_value_path!("queryZone"), + message_type: owned_value_path!("messageType"), + message_type_id: owned_value_path!("messageTypeId"), + request_message: owned_value_path!("requestData"), + response_message: owned_value_path!("responseData"), + response_code: owned_value_path!("fullRcode"), + response: owned_value_path!("rcodeName"), + header: owned_value_path!("header"), + question_section: owned_value_path!("question"), + answer_section: owned_value_path!("answers"), + authority_section: owned_value_path!("authority"), + additional_section: owned_value_path!("additional"), + opt_pseudo_section: owned_value_path!("opt"), + zone_section: owned_value_path!("zone"), + prerequisite_section: owned_value_path!("prerequisite"), + update_section: owned_value_path!("update"), + id: owned_value_path!("id"), + opcode: owned_value_path!("opcode"), + rcode: owned_value_path!("rcode"), + qr: owned_value_path!("qr"), + aa: owned_value_path!("aa"), + tc: owned_value_path!("tc"), + rd: owned_value_path!("rd"), + ra: owned_value_path!("ra"), + ad: owned_value_path!("ad"), + cd: owned_value_path!("cd"), + question_count: owned_value_path!("qdCount"), + answer_count: owned_value_path!("anCount"), + authority_count: owned_value_path!("nsCount"), + ar_count: owned_value_path!("arCount"), + zone_count: owned_value_path!("zoCount"), + prerequisite_count: owned_value_path!("prCount"), + update_count: owned_value_path!("upCount"), + ad_count: owned_value_path!("adCount"), + extended_rcode: owned_value_path!("extendedRcode"), + version: owned_value_path!("ednsVersion"), + do_flag: owned_value_path!("do"), + udp_max_payload_size: owned_value_path!("udpPayloadSize"), + options: owned_value_path!("options"), + opt_code: owned_value_path!("optCode"), + opt_name: owned_value_path!("optName"), + opt_data: owned_value_path!("optValue"), + record_type: owned_value_path!("recordType"), + record_type_id: owned_value_path!("recordTypeId"), + ttl: owned_value_path!("ttl"), + class: owned_value_path!("class"), + rdata: owned_value_path!("rData"), + rdata_bytes: owned_value_path!("rDataBytes"), + domain_name: owned_value_path!("domainName"), + question_type: owned_value_path!("questionType"), + question_type_id: owned_value_path!("questionTypeId"), + zone_name: owned_value_path!("zName"), + zone_class: owned_value_path!("zClass"), + zone_type: owned_value_path!("zType"), + zone_type_id: owned_value_path!("zTypeId"), +}); #[derive(Debug, Default, Clone)] pub struct DnsQueryHeaderSchema; impl DnsQueryHeaderSchema { - pub fn schema_definition(&self) -> Collection { + pub fn schema_definition() -> Collection { btreemap! { - self.id() => Kind::integer(), - self.opcode() => Kind::integer(), - self.rcode() => Kind::integer(), - self.qr() => Kind::integer(), - self.aa() => Kind::boolean(), - self.tc() => Kind::boolean(), - self.rd() => Kind::boolean(), - self.ra() => Kind::boolean(), - self.ad() => Kind::boolean(), - self.cd() => Kind::boolean(), - self.additional_count() => Kind::integer().or_undefined(), - self.question_count() => Kind::integer().or_undefined(), - self.answer_count() => Kind::integer().or_undefined(), - self.authority_count() => Kind::integer().or_undefined(), + DNSTAP_VALUE_PATHS.id.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.opcode.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.rcode.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.qr.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.aa.to_string() => Kind::boolean(), + DNSTAP_VALUE_PATHS.tc.to_string() => Kind::boolean(), + DNSTAP_VALUE_PATHS.rd.to_string() => Kind::boolean(), + DNSTAP_VALUE_PATHS.ra.to_string() => Kind::boolean(), + DNSTAP_VALUE_PATHS.ad.to_string() => Kind::boolean(), + DNSTAP_VALUE_PATHS.cd.to_string() => Kind::boolean(), + DNSTAP_VALUE_PATHS.ar_count.to_string() => Kind::integer().or_undefined(), + DNSTAP_VALUE_PATHS.question_count.to_string() => Kind::integer().or_undefined(), + DNSTAP_VALUE_PATHS.answer_count.to_string() => Kind::integer().or_undefined(), + DNSTAP_VALUE_PATHS.authority_count.to_string() => Kind::integer().or_undefined(), } .into() } - - pub const fn id(&self) -> &'static str { - DnsMessageHeaderCommonSchema::id() - } - - pub const fn opcode(&self) -> &'static str { - DnsMessageHeaderCommonSchema::opcode() - } - - pub const fn rcode(&self) -> &'static str { - DnsMessageHeaderCommonSchema::rcode() - } - - pub const fn qr(&self) -> &'static str { - DnsMessageHeaderCommonSchema::qr() - } - - pub const fn aa(&self) -> &'static str { - "aa" - } - - pub const fn tc(&self) -> &'static str { - "tc" - } - - pub const fn rd(&self) -> &'static str { - "rd" - } - - pub const fn ra(&self) -> &'static str { - "ra" - } - - pub const fn ad(&self) -> &'static str { - "ad" - } - - pub const fn cd(&self) -> &'static str { - "cd" - } - - pub const fn question_count(&self) -> &'static str { - "qdCount" - } - - pub const fn answer_count(&self) -> &'static str { - "anCount" - } - - pub const fn authority_count(&self) -> &'static str { - "nsCount" - } - - pub const fn additional_count(&self) -> &'static str { - "arCount" - } } #[derive(Debug, Default, Clone)] pub struct DnsUpdateHeaderSchema; impl DnsUpdateHeaderSchema { - pub fn schema_definition(&self) -> Collection { + pub fn schema_definition() -> Collection { btreemap! { - self.id() => Kind::integer(), - self.opcode() => Kind::integer(), - self.rcode() => Kind::integer(), - self.qr() => Kind::integer(), - self.zone_count() => Kind::integer().or_undefined(), - self.prerequisite_count() => Kind::integer().or_undefined(), - self.update_count() => Kind::integer().or_undefined(), - self.additional_count() => Kind::integer().or_undefined(), + DNSTAP_VALUE_PATHS.id.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.opcode.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.rcode.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.qr.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.zone_count.to_string() => Kind::integer().or_undefined(), + DNSTAP_VALUE_PATHS.prerequisite_count.to_string() => Kind::integer().or_undefined(), + DNSTAP_VALUE_PATHS.update_count.to_string() => Kind::integer().or_undefined(), + DNSTAP_VALUE_PATHS.ad_count.to_string() => Kind::integer().or_undefined(), } .into() } - - pub const fn id(&self) -> &'static str { - DnsMessageHeaderCommonSchema::id() - } - - pub const fn opcode(&self) -> &'static str { - DnsMessageHeaderCommonSchema::opcode() - } - - pub const fn rcode(&self) -> &'static str { - DnsMessageHeaderCommonSchema::rcode() - } - - pub const fn qr(&self) -> &'static str { - DnsMessageHeaderCommonSchema::qr() - } - - pub const fn zone_count(&self) -> &'static str { - "zoCount" - } - - pub const fn prerequisite_count(&self) -> &'static str { - "prCount" - } - - pub const fn update_count(&self) -> &'static str { - "upCount" - } - - pub const fn additional_count(&self) -> &'static str { - "adCount" - } } #[derive(Debug, Default, Clone)] pub struct DnsMessageOptPseudoSectionSchema; impl DnsMessageOptPseudoSectionSchema { - pub fn schema_definition( - &self, - dns_message_option_schema: &DnsMessageOptionSchema, - ) -> Collection { + pub fn schema_definition() -> Collection { btreemap! { - self.extended_rcode() => Kind::integer(), - self.version() => Kind::integer(), - self.do_flag() => Kind::boolean(), - self.udp_max_payload_size() => Kind::integer(), - self.options() => Kind::array( - Collection::from_unknown(Kind::object(dns_message_option_schema.schema_definition())) + DNSTAP_VALUE_PATHS.extended_rcode.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.version.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.do_flag.to_string() => Kind::boolean(), + DNSTAP_VALUE_PATHS.udp_max_payload_size.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.options.to_string() => Kind::array( + Collection::from_unknown(Kind::object(DnsMessageOptionSchema::schema_definition())) ).or_undefined(), } .into() } - - pub const fn extended_rcode(&self) -> &'static str { - "extendedRcode" - } - - pub const fn version(&self) -> &'static str { - "ednsVersion" - } - - pub const fn do_flag(&self) -> &'static str { - "do" - } - - pub const fn udp_max_payload_size(&self) -> &'static str { - "udpPayloadSize" - } - - pub const fn options(&self) -> &'static str { - "options" - } } #[derive(Debug, Default, Clone)] pub struct DnsMessageOptionSchema; impl DnsMessageOptionSchema { - pub fn schema_definition(&self) -> Collection { + pub fn schema_definition() -> Collection { btreemap! { - self.opt_code() => Kind::integer(), - self.opt_name() => Kind::bytes(), - self.opt_data() => Kind::bytes(), + DNSTAP_VALUE_PATHS.opt_code.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.opt_name.to_string() => Kind::bytes(), + DNSTAP_VALUE_PATHS.opt_data.to_string() => Kind::bytes(), } .into() } - - pub const fn opt_code(&self) -> &'static str { - "optCode" - } - - pub const fn opt_name(&self) -> &'static str { - "optName" - } - - pub const fn opt_data(&self) -> &'static str { - "optValue" - } } #[derive(Debug, Default, Clone)] pub struct DnsRecordSchema; impl DnsRecordSchema { - pub fn schema_definition(&self) -> Collection { + pub fn schema_definition() -> Collection { btreemap! { - self.name() => Kind::bytes(), - self.record_type() => Kind::bytes().or_undefined(), - self.record_type_id() => Kind::integer(), - self.ttl() => Kind::integer(), - self.class() => Kind::bytes(), - self.rdata() => Kind::bytes(), - self.rdata_bytes() => Kind::bytes().or_undefined(), + DNSTAP_VALUE_PATHS.domain_name.to_string() => Kind::bytes(), + DNSTAP_VALUE_PATHS.record_type.to_string() => Kind::bytes().or_undefined(), + DNSTAP_VALUE_PATHS.record_type_id.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.ttl.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.class.to_string() => Kind::bytes(), + DNSTAP_VALUE_PATHS.rdata.to_string() => Kind::bytes(), + DNSTAP_VALUE_PATHS.rdata_bytes.to_string() => Kind::bytes().or_undefined(), } .into() } - - pub const fn name(&self) -> &'static str { - "domainName" - } - - pub const fn record_type(&self) -> &'static str { - "recordType" - } - - pub const fn record_type_id(&self) -> &'static str { - "recordTypeId" - } - - pub const fn ttl(&self) -> &'static str { - "ttl" - } - - pub const fn class(&self) -> &'static str { - "class" - } - - pub const fn rdata(&self) -> &'static str { - "rData" - } - - pub const fn rdata_bytes(&self) -> &'static str { - "rDataBytes" - } } #[derive(Debug, Default, Clone)] pub struct DnsQueryQuestionSchema; impl DnsQueryQuestionSchema { - pub fn schema_definition(&self) -> Collection { + pub fn schema_definition() -> Collection { btreemap! { - self.class() => Kind::bytes(), - self.name() => Kind::bytes(), - self.question_type() => Kind::bytes(), - self.question_type_id() => Kind::integer(), + DNSTAP_VALUE_PATHS.class.to_string() => Kind::bytes(), + DNSTAP_VALUE_PATHS.domain_name.to_string() => Kind::bytes(), + DNSTAP_VALUE_PATHS.question_type.to_string() => Kind::bytes(), + DNSTAP_VALUE_PATHS.question_type_id.to_string() => Kind::integer(), } .into() } - - pub const fn name(&self) -> &'static str { - "domainName" - } - - pub const fn question_type(&self) -> &'static str { - "questionType" - } - - pub const fn question_type_id(&self) -> &'static str { - "questionTypeId" - } - - pub const fn class(&self) -> &'static str { - "class" - } } #[derive(Debug, Default, Clone)] pub struct DnsUpdateZoneInfoSchema; impl DnsUpdateZoneInfoSchema { - pub fn schema_definition(&self) -> Collection { + pub fn schema_definition() -> Collection { btreemap! { - self.zone_name() => Kind::bytes(), - self.zone_type() => Kind::bytes().or_undefined(), - self.zone_type_id() => Kind::integer(), - self.zone_class() => Kind::bytes(), + DNSTAP_VALUE_PATHS.zone_name.to_string() => Kind::bytes(), + DNSTAP_VALUE_PATHS.zone_type.to_string() => Kind::bytes().or_undefined(), + DNSTAP_VALUE_PATHS.zone_type_id.to_string() => Kind::integer(), + DNSTAP_VALUE_PATHS.zone_class.to_string() => Kind::bytes(), } .into() } - - pub const fn zone_name(&self) -> &'static str { - "zName" - } - - pub const fn zone_class(&self) -> &'static str { - "zClass" - } - - pub const fn zone_type(&self) -> &'static str { - "zType" - } - - pub const fn zone_type_id(&self) -> &'static str { - "zTypeId" - } } From 4063f0d7e2f12de0baefd0e24750bafe016d16f7 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 11 Aug 2023 10:43:41 -0400 Subject: [PATCH 2/3] clippy fixes --- src/sources/dnstap/parser.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/sources/dnstap/parser.rs b/src/sources/dnstap/parser.rs index f41451f0bf9c0..b2a947f07df0c 100644 --- a/src/sources/dnstap/parser.rs +++ b/src/sources/dnstap/parser.rs @@ -33,7 +33,7 @@ use dnstap_proto::{ message::Type as DnstapMessageType, Dnstap, Message as DnstapMessage, SocketFamily, SocketProtocol, }; -use lookup::lookup_v2::{OwnedValuePath, ValuePath}; +use lookup::lookup_v2::ValuePath; use lookup::PathPrefix; use vector_core::config::log_schema; @@ -226,7 +226,6 @@ impl DnstapParser { query_time_sec, dnstap_message.query_time_nsec, dnstap_message_type_id, - &DNSTAP_VALUE_PATHS.request_message, dnstap_message.query_message.as_ref(), &DNSTAP_MESSAGE_REQUEST_TYPE_IDS, ); @@ -239,7 +238,6 @@ impl DnstapParser { response_time_sec, dnstap_message.response_time_nsec, dnstap_message_type_id, - &DNSTAP_VALUE_PATHS.request_message, dnstap_message.response_message.as_ref(), &DNSTAP_MESSAGE_RESPONSE_TYPE_IDS, ); @@ -352,7 +350,6 @@ impl DnstapParser { time_sec: u64, time_nsec: Option, dnstap_message_type_id: i32, - message_key: &'a OwnedValuePath, message: Option<&Vec>, type_ids: &HashSet, ) { @@ -374,7 +371,12 @@ impl DnstapParser { } if message.is_none() { - DnstapParser::log_time(event, prefix.concat(message_key), time_in_nanosec, "ns"); + DnstapParser::log_time( + event, + prefix.concat(&DNSTAP_VALUE_PATHS.request_message), + time_in_nanosec, + "ns", + ); } } From 79d3a3f0d39c4d3441e5315fcce467b28640959a Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 11 Aug 2023 13:37:04 -0400 Subject: [PATCH 3/3] strengthen tests --- src/sources/dnstap/parser.rs | 239 ++++++++++++++++++++++++----------- 1 file changed, 163 insertions(+), 76 deletions(-) diff --git a/src/sources/dnstap/parser.rs b/src/sources/dnstap/parser.rs index b2a947f07df0c..0a12f7426443a 100644 --- a/src/sources/dnstap/parser.rs +++ b/src/sources/dnstap/parser.rs @@ -974,6 +974,8 @@ fn to_dnstap_message_type(type_id: i32) -> String { mod tests { use super::*; use crate::event::Value; + use chrono::DateTime; + use std::collections::BTreeMap; #[test] fn test_parse_dnstap_data_with_query_message() { @@ -986,43 +988,91 @@ mod tests { .expect("Invalid base64 encoded data."); let parse_result = DnstapParser::parse(&mut log_event, Bytes::from(dnstap_data)); assert!(parse_result.is_ok()); - assert!(log_event - .all_fields() - .unwrap() - .any(|(key, value)| key == "time" - && match *value { - Value::Integer(time) => time == 1_593_489_007_920_014_129, - _ => false, - })); - assert!(log_event - .all_fields() - .unwrap() - .any(|(key, value)| key == "timestamp" - && match *value { - Value::Timestamp(timestamp) => - timestamp.timestamp_nanos() == 1_593_489_007_920_014_129, - _ => false, - })); - assert!(log_event - .all_fields() - .unwrap() - .any(|(key, value)| key == "requestData.header.qr" - && match *value { - Value::Integer(qr) => qr == 0, - _ => false, - })); - assert!(log_event.all_fields().unwrap().any(|(key, value)| key - == "requestData.opt.udpPayloadSize" - && match *value { - Value::Integer(udp_payload_size) => udp_payload_size == 512, - _ => false, - })); - assert!(log_event.all_fields().unwrap().any(|(key, value)| key - == "requestData.question[0].domainName" - && match value { - Value::Bytes(domain_name) => *domain_name == Bytes::from_static(b"facebook1.com."), - _ => false, - })); + + let expected_map: BTreeMap<&str, Value> = BTreeMap::from([ + ("dataType", Value::Bytes(Bytes::from("Message"))), + ("dataTypeId", Value::Integer(1)), + ("messageType", Value::Bytes(Bytes::from("ResolverQuery"))), + ("messageTypeId", Value::Integer(3)), + ("queryZone", Value::Bytes(Bytes::from("com."))), + ("requestData.fullRcode", Value::Integer(0)), + ("requestData.header.aa", Value::Boolean(false)), + ("requestData.header.ad", Value::Boolean(false)), + ("requestData.header.anCount", Value::Integer(0)), + ("requestData.header.arCount", Value::Integer(1)), + ("requestData.header.cd", Value::Boolean(false)), + ("requestData.header.id", Value::Integer(37634)), + ("requestData.header.nsCount", Value::Integer(0)), + ("requestData.header.opcode", Value::Integer(0)), + ("requestData.header.qdCount", Value::Integer(1)), + ("requestData.header.qr", Value::Integer(0)), + ("requestData.header.ra", Value::Boolean(false)), + ("requestData.header.rcode", Value::Integer(0)), + ("requestData.header.rd", Value::Boolean(false)), + ("requestData.header.tc", Value::Boolean(false)), + ("requestData.opt.do", Value::Boolean(true)), + ("requestData.opt.ednsVersion", Value::Integer(0)), + ("requestData.opt.extendedRcode", Value::Integer(0)), + ("requestData.opt.options[0].optCode", Value::Integer(10)), + ( + "requestData.opt.options[0].optName", + Value::Bytes(Bytes::from("Cookie")), + ), + ( + "requestData.opt.options[0].optValue", + Value::Bytes(Bytes::from("7GMIAb3NWDM=")), + ), + ("requestData.opt.udpPayloadSize", Value::Integer(512)), + ( + "requestData.question[0].class", + Value::Bytes(Bytes::from("IN")), + ), + ( + "requestData.question[0].domainName", + Value::Bytes(Bytes::from("facebook1.com.")), + ), + ( + "requestData.question[0].questionType", + Value::Bytes(Bytes::from("A")), + ), + ("requestData.question[0].questionTypeId", Value::Integer(1)), + ( + "requestData.rcodeName", + Value::Bytes(Bytes::from("NoError")), + ), + ( + "responseAddress", + Value::Bytes(Bytes::from("2001:502:7094::30")), + ), + ("responsePort", Value::Integer(53)), + ( + "serverId", + Value::Bytes(Bytes::from("james-Virtual-Machine")), + ), + ("serverVersion", Value::Bytes(Bytes::from("BIND 9.16.3"))), + ("socketFamily", Value::Bytes(Bytes::from("INET6"))), + ("socketProtocol", Value::Bytes(Bytes::from("UDP"))), + ("sourceAddress", Value::Bytes(Bytes::from("::"))), + ("sourcePort", Value::Integer(46835)), + ("time", Value::Integer(1_593_489_007_920_014_129)), + ("timePrecision", Value::Bytes(Bytes::from("ns"))), + ( + "timestamp", + Value::Timestamp( + Utc.from_utc_datetime( + &DateTime::parse_from_rfc3339("2020-06-30T03:50:07.920014129Z") + .unwrap() + .naive_utc(), + ), + ), + ), + ]); + + // The maps need to contain identical keys and values. + for (exp_key, exp_value) in expected_map { + let value = log_event.get(exp_key).unwrap(); + assert_eq!(*value, exp_value); + } } #[test] @@ -1036,45 +1086,82 @@ mod tests { .expect("Invalid base64 encoded data."); let parse_result = DnstapParser::parse(&mut log_event, Bytes::from(dnstap_data)); assert!(parse_result.is_ok()); - assert!(log_event - .all_fields() - .unwrap() - .any(|(key, value)| key == "time" - && match *value { - Value::Integer(time) => time == 1_593_541_950_792_494_106, - _ => false, - })); - assert!(log_event - .all_fields() - .unwrap() - .any(|(key, value)| key == "timestamp" - && match *value { - Value::Timestamp(timestamp) => - timestamp.timestamp_nanos() == 1_593_541_950_792_494_106, - _ => false, - })); - assert!(log_event - .all_fields() - .unwrap() - .any(|(key, value)| key == "requestData.header.qr" - && match *value { - Value::Integer(qr) => qr == 1, - _ => false, - })); - assert!(log_event - .all_fields() - .unwrap() - .any(|(key, value)| key == "messageType" - && match value { - Value::Bytes(data_type) => *data_type == Bytes::from_static(b"UpdateResponse"), - _ => false, - })); - assert!(log_event.all_fields().unwrap().any(|(key, value)| key - == "requestData.zone.zName" - && match value { - Value::Bytes(domain_name) => *domain_name == Bytes::from_static(b"example.com."), - _ => false, - })); + + let expected_map: BTreeMap<&str, Value> = BTreeMap::from([ + ("dataType", Value::Bytes(Bytes::from("Message"))), + ("dataTypeId", Value::Integer(1)), + ("messageType", Value::Bytes(Bytes::from("UpdateResponse"))), + ("messageTypeId", Value::Integer(14)), + ("requestData.fullRcode", Value::Integer(0)), + ("requestData.header.adCount", Value::Integer(0)), + ("requestData.header.id", Value::Integer(28811)), + ("requestData.header.opcode", Value::Integer(5)), + ("requestData.header.prCount", Value::Integer(0)), + ("requestData.header.qr", Value::Integer(1)), + ("requestData.header.rcode", Value::Integer(0)), + ("requestData.header.upCount", Value::Integer(0)), + ("requestData.header.zoCount", Value::Integer(1)), + ( + "requestData.rcodeName", + Value::Bytes(Bytes::from("NoError")), + ), + ("requestData.zone.zClass", Value::Bytes(Bytes::from("IN"))), + ( + "requestData.zone.zName", + Value::Bytes(Bytes::from("example.com.")), + ), + ("requestData.zone.zType", Value::Bytes(Bytes::from("SOA"))), + ("requestData.zone.zTypeId", Value::Integer(6)), + ("responseAddress", Value::Bytes(Bytes::from("127.0.0.1"))), + ("responseData.fullRcode", Value::Integer(0)), + ("responseData.header.adCount", Value::Integer(0)), + ("responseData.header.id", Value::Integer(28811)), + ("responseData.header.opcode", Value::Integer(5)), + ("responseData.header.prCount", Value::Integer(0)), + ("responseData.header.qr", Value::Integer(1)), + ("responseData.header.rcode", Value::Integer(0)), + ("responseData.header.upCount", Value::Integer(0)), + ("responseData.header.zoCount", Value::Integer(1)), + ( + "responseData.rcodeName", + Value::Bytes(Bytes::from("NoError")), + ), + ("responseData.zone.zClass", Value::Bytes(Bytes::from("IN"))), + ( + "responseData.zone.zName", + Value::Bytes(Bytes::from("example.com.")), + ), + ("responseData.zone.zType", Value::Bytes(Bytes::from("SOA"))), + ("responseData.zone.zTypeId", Value::Integer(6)), + ("responsePort", Value::Integer(0)), + ( + "serverId", + Value::Bytes(Bytes::from("james-Virtual-Machine")), + ), + ("serverVersion", Value::Bytes(Bytes::from("BIND 9.16.3"))), + ("socketFamily", Value::Bytes(Bytes::from("INET"))), + ("socketProtocol", Value::Bytes(Bytes::from("UDP"))), + ("sourceAddress", Value::Bytes(Bytes::from("127.0.0.1"))), + ("sourcePort", Value::Integer(14124)), + ("time", Value::Integer(1_593_541_950_792_494_106)), + ("timePrecision", Value::Bytes(Bytes::from("ns"))), + ( + "timestamp", + Value::Timestamp( + Utc.from_utc_datetime( + &DateTime::parse_from_rfc3339("2020-06-30T18:32:30.792494106Z") + .unwrap() + .naive_utc(), + ), + ), + ), + ]); + + // The maps need to contain identical keys and values. + for (exp_key, exp_value) in expected_map { + let value = log_event.get(exp_key).unwrap(); + assert_eq!(*value, exp_value); + } } #[test]