From b1b089e42744820102a95f11211a311cfe99b885 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 1 Mar 2023 20:16:24 +0000 Subject: [PATCH] chore(azure_monitor_logs sink): update log namespacing (#16475) * Update sinks to use log namespacing Signed-off-by: Stephen Wakely * Use the function Signed-off-by: Stephen Wakely * add new option to avoid breaking changes * +fix clippy * +pr feedback * +dh feedback * +regen docs * +kc feedback --------- Signed-off-by: Stephen Wakely Co-authored-by: Spencer Gilbert --- src/sinks/azure_monitor_logs.rs | 80 +++++++++++++++---- .../2023-02-28-0-28-0-upgrade-guide.md | 13 ++- .../sinks/base/azure_monitor_logs.cue | 14 ++++ 3 files changed, 91 insertions(+), 16 deletions(-) diff --git a/src/sinks/azure_monitor_logs.rs b/src/sinks/azure_monitor_logs.rs index ee14a1b88eda8..6fe254e608aeb 100644 --- a/src/sinks/azure_monitor_logs.rs +++ b/src/sinks/azure_monitor_logs.rs @@ -6,13 +6,17 @@ use http::{ Request, StatusCode, Uri, }; use hyper::Body; +use lookup::lookup_v2::{parse_value_path, OptionalValuePath}; +use lookup::{OwnedValuePath, PathPrefix}; use once_cell::sync::Lazy; use openssl::{base64, hash, pkey, sign}; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; +use value::Kind; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; +use vector_core::schema; use crate::{ codecs::Transformer, @@ -36,7 +40,7 @@ fn default_host() -> String { /// Configuration for the `azure_monitor_logs` sink. #[configurable_component(sink("azure_monitor_logs"))] -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] #[serde(deny_unknown_fields)] pub struct AzureMonitorLogsConfig { /// The [unique identifier][uniq_id] for the Log Analytics workspace. @@ -99,6 +103,17 @@ pub struct AzureMonitorLogsConfig { #[serde(default)] pub request: TowerRequestConfig, + /// Use this option to customize the log field used as [`TimeGenerated`][1] in Azure. 
+ /// + /// The setting of `log_schema.timestamp_key`, usually `timestamp`, is used here by default. + /// This field should be used in rare cases where `TimeGenerated` should point to a specific log + /// field. For example, use this field to set the log field `source_timestamp` as holding the + /// value that should be used as `TimeGenerated` on the Azure side. + /// + /// [1]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated + #[configurable(metadata(docs::examples = "time_generated"))] + pub time_generated_key: Option, + #[configurable(derived)] pub tls: Option, @@ -111,6 +126,24 @@ pub struct AzureMonitorLogsConfig { acknowledgements: AcknowledgementsConfig, } +impl Default for AzureMonitorLogsConfig { + fn default() -> Self { + Self { + customer_id: "my-customer-id".to_string(), + shared_key: Default::default(), + log_type: "MyRecordType".to_string(), + azure_resource_id: None, + host: default_host(), + encoding: Default::default(), + batch: Default::default(), + request: Default::default(), + time_generated_key: None, + tls: None, + acknowledgements: Default::default(), + } + } +} + #[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Clone, Derivative)] #[serde(rename_all = "snake_case")] #[derivative(Default)] @@ -152,10 +185,12 @@ impl SinkConfig for AzureMonitorLogsConfig { .limit_max_bytes(MAX_BATCH_SIZE)? 
.into_batch_settings()?; + let time_generated_key = self.time_generated_key.clone().and_then(|k| k.path); + let tls_settings = TlsSettings::from_options(&self.tls)?; let client = HttpClient::new(Some(tls_settings), &cx.proxy)?; - let sink = AzureMonitorLogsSink::new(self)?; + let sink = AzureMonitorLogsSink::new(self, time_generated_key)?; let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); let healthcheck = healthcheck(sink.clone(), client.clone()).boxed(); @@ -173,7 +208,10 @@ impl SinkConfig for AzureMonitorLogsConfig { } fn input(&self) -> Input { - Input::log() + let requirements = + schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp()); + + Input::log().with_schema_requirement(requirements) } fn acknowledgements(&self) -> &AcknowledgementsConfig { @@ -185,6 +223,7 @@ impl SinkConfig for AzureMonitorLogsConfig { struct AzureMonitorLogsSink { uri: Uri, customer_id: String, + time_generated_key: OwnedValuePath, transformer: Transformer, shared_key: pkey::PKey, default_headers: HeaderMap, @@ -192,6 +231,7 @@ struct AzureMonitorLogsSink { struct AzureMonitorLogsEventEncoder { transformer: Transformer, + time_generated_key: OwnedValuePath, } impl HttpEventEncoder for AzureMonitorLogsEventEncoder { @@ -201,21 +241,22 @@ impl HttpEventEncoder for AzureMonitorLogsEventEncoder { // it seems like Azure Monitor doesn't support full 9-digit nanosecond precision // adjust the timestamp format accordingly, keeping only milliseconds let mut log = event.into_log(); - let timestamp_key = log_schema().timestamp_key(); - let timestamp = if let Some(Value::Timestamp(ts)) = log.remove(timestamp_key) { + // `.remove_timestamp()` will return the `timestamp` value regardless of location in Event or + // Metadata, the following `insert()` ensures it's encoded in the request. 
+ let timestamp = if let Some(Value::Timestamp(ts)) = log.remove_timestamp() { ts } else { chrono::Utc::now() }; - let mut entry = serde_json::json!(&log); - let object_entry = entry.as_object_mut().unwrap(); - object_entry.insert( - timestamp_key.to_string(), + log.insert( + (PathPrefix::Event, &self.time_generated_key), JsonValue::String(timestamp.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)), ); + let entry = serde_json::json!(&log); + Some(entry) } } @@ -229,6 +270,7 @@ impl HttpSink for AzureMonitorLogsSink { fn build_encoder(&self) -> Self::Encoder { AzureMonitorLogsEventEncoder { transformer: self.transformer.clone(), + time_generated_key: self.time_generated_key.clone(), } } @@ -238,7 +280,10 @@ impl HttpSink for AzureMonitorLogsSink { } impl AzureMonitorLogsSink { - fn new(config: &AzureMonitorLogsConfig) -> crate::Result { + fn new( + config: &AzureMonitorLogsConfig, + time_generated_key: Option, + ) -> crate::Result { let url = format!( "https://{}.{}{}?api-version={}", config.customer_id, config.host, RESOURCE, API_VERSION @@ -249,6 +294,11 @@ impl AzureMonitorLogsSink { return Err("shared_key can't be an empty string".into()); } + let time_generated_key = time_generated_key.unwrap_or_else(|| { + parse_value_path(log_schema().timestamp_key()) + .expect("global log_schema.timestamp_key to be valid path") + }); + let shared_key_bytes = base64::decode_block(config.shared_key.inner())?; let shared_key = pkey::PKey::hmac(&shared_key_bytes)?; let mut default_headers = HeaderMap::with_capacity(3); @@ -263,10 +313,9 @@ impl AzureMonitorLogsSink { let log_type = HeaderValue::from_str(&config.log_type)?; default_headers.insert(LOG_TYPE_HEADER.clone(), log_type); - let timestamp_key = log_schema().timestamp_key(); default_headers.insert( TIME_GENERATED_FIELD_HEADER.clone(), - HeaderValue::from_str(timestamp_key)?, + HeaderValue::try_from(time_generated_key.to_string())?, ); if let Some(azure_resource_id) = &config.azure_resource_id { @@ -288,6 +337,7 @@ 
impl AzureMonitorLogsSink { customer_id: config.customer_id.clone(), shared_key, default_headers, + time_generated_key, }) } @@ -396,6 +446,8 @@ mod tests { let sink = AzureMonitorLogsSink { uri: mock_endpoint, customer_id: "weee".to_string(), + time_generated_key: parse_value_path(log_schema().timestamp_key()) + .expect("log_schema.timestamp_key to be valid path"), transformer: Default::default(), shared_key, default_headers: HeaderMap::new(), @@ -448,7 +500,7 @@ mod tests { ) .unwrap(); - let sink = AzureMonitorLogsSink::new(&config).unwrap(); + let sink = AzureMonitorLogsSink::new(&config, None).unwrap(); let mut log = [("message", "hello world")] .iter() .copied() @@ -478,7 +530,7 @@ mod tests { ) .unwrap(); - let sink = AzureMonitorLogsSink::new(&config).unwrap(); + let sink = AzureMonitorLogsSink::new(&config, None).unwrap(); let mut encoder = sink.build_encoder(); let mut log1 = [("message", "hello")].iter().copied().collect::(); diff --git a/website/content/en/highlights/2023-02-28-0-28-0-upgrade-guide.md b/website/content/en/highlights/2023-02-28-0-28-0-upgrade-guide.md index a5be7140dcb13..2a7399f6d992c 100644 --- a/website/content/en/highlights/2023-02-28-0-28-0-upgrade-guide.md +++ b/website/content/en/highlights/2023-02-28-0-28-0-upgrade-guide.md @@ -26,6 +26,7 @@ and **deprecations**: and **potentially impactful changes**: 1. [AWS components now use OpenSSL as the TLS implementation](#aws-openssl) +2. [The `azure_monitor_logs` sink now encodes the timestamp at a configurable path](#azure-timestamp) We cover them below to help you upgrade quickly: @@ -56,7 +57,7 @@ syntax to use to migrate. #### Removal of the `apex` sink {#apex-removal} The `apex` sink has been removed from Vector. This follows the EOL of the Apex service that took -effect in December of 2022. +effect in December 2022. #### Removal of the `disk_v1` buffer type {#disk_v1-removal} @@ -108,7 +109,7 @@ will be removed in the `0.29.0` release. 
#### AWS components now use OpenSSL as the TLS implementation {#aws-openssl} In this release, Vector's AWS integrations had their TLS implementation swapped from -[`rustls`][rusttls] to OpenSSL. We don't expect any user visible impact, +[`rustls`][rustls] to OpenSSL. We don't expect any user visible impact, but [please let us know][bug_report] if this change causes you any issues. We made this change primarily since most of Vector's dependencies use OpenSSL and so it reduces our @@ -117,3 +118,11 @@ as more of the Rust ecosystem supports `rustls`. [rustls]: https://github.com/rustls/rustls [bug_report]: https://github.com/vectordotdev/vector/issues/new?assignees=&labels=type%3A+bug&template=bug.yml + +#### The `azure_monitor_logs` sink now encodes the timestamp at a configurable path {#azure-timestamp} + +Previously this sink would include a properly formatted timestamp at a path determined by the `log_schema` +as well as setting a `time-generated-field` header with a value of that path. Azure Monitor will use the contained value +to populate the [`TimeGenerated` standard column](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated). +In this release we've added a `time_generated_key` option that determines where this field is encoded. If unset, it defaults +to the value of `log_schema.timestamp_key`. 
diff --git a/website/cue/reference/components/sinks/base/azure_monitor_logs.cue b/website/cue/reference/components/sinks/base/azure_monitor_logs.cue index 8ba750dbad7b2..c570d0ed25638 100644 --- a/website/cue/reference/components/sinks/base/azure_monitor_logs.cue +++ b/website/cue/reference/components/sinks/base/azure_monitor_logs.cue @@ -276,6 +276,20 @@ base: components: sinks: azure_monitor_logs: configuration: { required: true type: string: examples: ["SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA==", "${AZURE_MONITOR_SHARED_KEY_ENV_VAR}"] } + time_generated_key: { + description: """ + Use this option to customize the log field used as [`TimeGenerated`][1] in Azure. + + The setting of `log_schema.timestamp_key`, usually `timestamp`, is used here by default. + This field should be used in rare cases where `TimeGenerated` should point to a specific log + field. For example, use this field to set the log field `source_timestamp` as holding the + value that should be used as `TimeGenerated` on the Azure side. + + [1]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated + """ + required: false + type: string: examples: ["time_generated"] + } tls: { description: "TLS configuration." required: false