Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Refactor dnstap to use 'OwnedValuePath's #18212

Merged
merged 3 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions benches/dnstap/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
use vector::{
event::LogEvent,
sources::dnstap::{schema::DnstapEventSchema, DnstapParser},
};
use vector::event::LogEvent;
use vector::sources::dnstap::parser::DnstapParser;

fn benchmark_query_parsing(c: &mut Criterion) {
let mut event = LogEvent::default();
let schema = DnstapEventSchema::new();
let mut parser = DnstapParser::new(&schema, &mut event);
let raw_dnstap_data = "ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zcnoIAxACGAEiEAAAAAAAAA\
AAAAAAAAAAAAAqECABBQJwlAAAAAAAAAAAADAw8+0CODVA7+zq9wVNMU3WNlI2kwIAAAABAAAAAAABCWZhY2Vib29rMQNjb\
20AAAEAAQAAKQIAAACAAAAMAAoACOxjCAG9zVgzWgUDY29tAHgB";
Expand All @@ -19,7 +15,7 @@ fn benchmark_query_parsing(c: &mut Criterion) {
group.bench_function("dns_query_parsing", |b| {
b.iter_batched(
|| dnstap_data.clone(),
|dnstap_data| parser.parse_dnstap_data(Bytes::from(dnstap_data)).unwrap(),
|dnstap_data| DnstapParser::parse(&mut event, Bytes::from(dnstap_data)).unwrap(),
BatchSize::SmallInput,
)
});
Expand All @@ -29,8 +25,6 @@ fn benchmark_query_parsing(c: &mut Criterion) {

fn benchmark_update_parsing(c: &mut Criterion) {
let mut event = LogEvent::default();
let schema = DnstapEventSchema::new();
let mut parser = DnstapParser::new(&schema, &mut event);
let raw_dnstap_data = "ChVqYW1lcy1WaXJ0dWFsLU1hY2hpbmUSC0JJTkQgOS4xNi4zcmsIDhABGAEiBH8AAA\
EqBH8AAAEwrG44AEC+iu73BU14gfofUh1wi6gAAAEAAAAAAAAHZXhhbXBsZQNjb20AAAYAAWC+iu73BW0agDwvch1wi6gAA\
AEAAAAAAAAHZXhhbXBsZQNjb20AAAYAAXgB";
Expand All @@ -41,7 +35,7 @@ fn benchmark_update_parsing(c: &mut Criterion) {
group.bench_function("dns_update_parsing", |b| {
b.iter_batched(
|| dnstap_data.clone(),
|dnstap_data| parser.parse_dnstap_data(Bytes::from(dnstap_data)).unwrap(),
|dnstap_data| DnstapParser::parse(&mut event, Bytes::from(dnstap_data)).unwrap(),
BatchSize::SmallInput,
)
});
Expand Down
26 changes: 7 additions & 19 deletions src/sources/dnstap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use vector_common::internal_event::{
ByteSize, BytesReceived, InternalEventHandle as _, Protocol, Registered,
};
use vector_config::configurable_component;
use vrl::event_path;
use vrl::path::PathPrefix;
use vrl::value::{kind::Collection, Kind};

use super::util::framestream::{build_framestream_unix_source, FrameHandler};
Expand All @@ -19,9 +19,9 @@ use crate::{
};

pub mod parser;
pub use parser::{parse_dnstap_data, DnstapParser};

pub mod schema;
use crate::sources::dnstap::parser::DnstapParser;
use crate::sources::dnstap::schema::DNSTAP_VALUE_PATHS;
use dnsmsg_parser::{dns_message, dns_message_parser};
use lookup::lookup_v2::OptionalValuePath;
pub use schema::DnstapEventSchema;
Expand Down Expand Up @@ -109,19 +109,11 @@ impl DnstapConfig {
"protobuf:dnstap.Dnstap".to_string() //content-type for framestream
}

fn event_schema(timestamp_key: Option<&OwnedValuePath>) -> DnstapEventSchema {
let mut schema = DnstapEventSchema::new();
schema
.dnstap_root_data_schema_mut()
.set_timestamp(timestamp_key.cloned());
schema
}

pub fn schema_definition(
&self,
log_namespace: LogNamespace,
) -> vector_core::schema::Definition {
let event_schema = Self::event_schema(log_schema().timestamp_key());
let event_schema = DnstapEventSchema;

match log_namespace {
LogNamespace::Legacy => {
Expand Down Expand Up @@ -205,7 +197,6 @@ pub struct DnstapFrameHandler {
max_frame_length: usize,
socket_path: PathBuf,
content_type: String,
schema: DnstapEventSchema,
raw_data_only: bool,
multithreaded: bool,
max_frame_handling_tasks: u32,
Expand All @@ -224,8 +215,6 @@ impl DnstapFrameHandler {
let source_type_key = log_schema().source_type_key();
let timestamp_key = log_schema().timestamp_key();

let schema = DnstapConfig::event_schema(timestamp_key);

let host_key = config
.host_key
.clone()
Expand All @@ -235,7 +224,6 @@ impl DnstapFrameHandler {
max_frame_length: config.max_frame_length,
socket_path: config.socket_path.clone(),
content_type: config.content_type(),
schema,
raw_data_only: config.raw_data_only.unwrap_or(false),
multithreaded: config.multithreaded.unwrap_or(false),
max_frame_handling_tasks: config.max_frame_handling_tasks.unwrap_or(1000),
Expand Down Expand Up @@ -281,10 +269,10 @@ impl FrameHandler for DnstapFrameHandler {

if self.raw_data_only {
log_event.insert(
event_path!(self.schema.dnstap_root_data_schema().raw_data()),
(PathPrefix::Event, &DNSTAP_VALUE_PATHS.raw_data),
BASE64_STANDARD.encode(&frame),
);
} else if let Err(err) = parse_dnstap_data(&self.schema, &mut log_event, frame) {
} else if let Err(err) = DnstapParser::parse(&mut log_event, frame) {
emit!(DnstapParseError {
error: format!("Dnstap protobuf decode error {:?}.", err)
});
Expand Down Expand Up @@ -408,7 +396,7 @@ mod tests {
let mut event = Event::from(LogEvent::from(vrl::value::Value::from(json)));
event.as_mut_log().insert("timestamp", chrono::Utc::now());

let definition = DnstapConfig::event_schema(Some(&owned_value_path!("timestamp")));
let definition = DnstapEventSchema;
let schema = vector_core::schema::Definition::empty_legacy_namespace()
.with_standard_vector_source_metadata();

Expand Down
Loading