From d8ca608861ee14e572a3a41cc49728678b9b7c91 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Thu, 22 Sep 2022 12:42:26 +0100 Subject: [PATCH] fix(docker-logs source): old messages that are dropped should be a component_error (#14449) * Old messages should be a component_error. Signed-off-by: Stephen Wakely * Formatting Signed-off-by: Stephen Wakely * Feedback from Kyle. Signed-off-by: Stephen Wakely * Rename event to DockerLogsReceivedOutOfOrderError Signed-off-by: Stephen Wakely Signed-off-by: Stephen Wakely --- src/internal_events/docker_logs.rs | 43 +++++++++++++++++++++++------- src/sources/docker_logs.rs | 20 ++++++++------ 2 files changed, 45 insertions(+), 18 deletions(-) diff --git a/src/internal_events/docker_logs.rs b/src/internal_events/docker_logs.rs index 4c5157e368532..1c7043e8e8188 100644 --- a/src/internal_events/docker_logs.rs +++ b/src/internal_events/docker_logs.rs @@ -1,9 +1,8 @@ use bollard::errors::Error; use chrono::ParseError; use metrics::counter; -use vector_core::internal_event::InternalEvent; - use vector_common::internal_event::{error_stage, error_type}; +use vector_core::internal_event::InternalEvent; #[derive(Debug)] pub struct DockerLogsEventsReceived<'a> { @@ -12,7 +11,7 @@ pub struct DockerLogsEventsReceived<'a> { pub container_name: &'a str, } -impl<'a> InternalEvent for DockerLogsEventsReceived<'a> { +impl InternalEvent for DockerLogsEventsReceived<'_> { fn emit(self) { trace!( message = "Events received.", @@ -42,7 +41,7 @@ pub struct DockerLogsContainerEventReceived<'a> { pub action: &'a str, } -impl<'a> InternalEvent for DockerLogsContainerEventReceived<'a> { +impl InternalEvent for DockerLogsContainerEventReceived<'_> { fn emit(self) { debug!( message = "Received one container event.", @@ -58,7 +57,7 @@ pub struct DockerLogsContainerWatch<'a> { pub container_id: &'a str, } -impl<'a> InternalEvent for DockerLogsContainerWatch<'a> { +impl InternalEvent for DockerLogsContainerWatch<'_> { fn emit(self) { info!( message = "Started watching for container logs.", @@ -73,7 +72,7 @@ pub struct DockerLogsContainerUnwatch<'a> { pub container_id: &'a str, } -impl<'a> InternalEvent for DockerLogsContainerUnwatch<'a> { +impl InternalEvent for DockerLogsContainerUnwatch<'_> { fn emit(self) { info!( message = "Stopped watching for container logs.", @@ -89,7 +88,7 @@ pub struct DockerLogsCommunicationError<'a> { pub container_id: Option<&'a str>, } -impl<'a> InternalEvent for DockerLogsCommunicationError<'a> { +impl InternalEvent for DockerLogsCommunicationError<'_> { fn emit(self) { error!( message = "Error in communication with Docker daemon.", @@ -115,7 +114,7 @@ pub struct DockerLogsContainerMetadataFetchError<'a> { pub container_id: &'a str, } -impl<'a> InternalEvent for DockerLogsContainerMetadataFetchError<'a> { +impl InternalEvent for DockerLogsContainerMetadataFetchError<'_> { fn emit(self) { error!( message = "Failed to fetch container metadata.", @@ -142,7 +141,7 @@ pub struct DockerLogsTimestampParseError<'a> { pub container_id: &'a str, } -impl<'a> InternalEvent for DockerLogsTimestampParseError<'a> { +impl InternalEvent for DockerLogsTimestampParseError<'_> { fn emit(self) { error!( message = "Failed to parse timestamp as RFC3339 timestamp.", @@ -169,7 +168,7 @@ pub struct DockerLogsLoggingDriverUnsupportedError<'a> { pub error: Error, } -impl<'a> InternalEvent for DockerLogsLoggingDriverUnsupportedError<'a> { +impl InternalEvent for DockerLogsLoggingDriverUnsupportedError<'_> { fn emit(self) { error!( message = "Docker engine is not using either the `jsonfile` or `journald` logging driver. Please enable one of these logging drivers to get logs from the Docker daemon.", @@ -188,3 +187,27 @@ impl<'a> InternalEvent for DockerLogsLoggingDriverUnsupportedError<'a> { counter!("logging_driver_errors_total", 1); } } + +#[derive(Debug)] +pub struct DockerLogsReceivedOutOfOrderError<'a> { + pub timestamp_str: &'a str, + pub container_id: &'a str, +} + +impl InternalEvent for DockerLogsReceivedOutOfOrderError<'_> { + fn emit(self) { + error!( + message = "Received out of order log message.", + error_type = error_type::CONDITION_FAILED, + stage = error_stage::RECEIVING, + container_id = ?self.container_id, + timestamp = ?self.timestamp_str, + ); + counter!( + "component_errors_total", 1, + "error_type" => error_type::CONDITION_FAILED, + "stage" => error_stage::RECEIVING, + "container_id" => self.container_id.to_owned(), + ); + } +} diff --git a/src/sources/docker_logs.rs b/src/sources/docker_logs.rs index 5c01d88173f10..b8d602cff430c 100644 --- a/src/sources/docker_logs.rs +++ b/src/sources/docker_logs.rs @@ -33,7 +33,8 @@ use crate::{ DockerLogsCommunicationError, DockerLogsContainerEventReceived, DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch, DockerLogsContainerWatch, DockerLogsEventsReceived, - DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError, + DockerLogsLoggingDriverUnsupportedError, DockerLogsReceivedOutOfOrderError, + DockerLogsTimestampParseError, StreamClosedError, }, line_agg::{self, LineAgg}, shutdown::ShutdownSignal, @@ -854,19 +855,22 @@ impl ContainerLogInfo { Ok(timestamp) => { // Timestamp check match self.last_log.as_ref() { - // Received log has not already been processed + // Received log has not already been processed. Some(&(ref last, gen)) if *last < timestamp || (*last == timestamp && gen == self.generation) => { // noop } - // Received log is not from before of creation - None if self.created <= timestamp.with_timezone(&Utc) => (), + // Received log is after the time the container was created. + None if self.created <= timestamp.with_timezone(&Utc) => { + // noop + } + // Received log is older than the previously received entry. _ => { - trace!( - message = "Received older log.", - timestamp = %timestamp_str - ); + emit!(DockerLogsReceivedOutOfOrderError { + container_id: self.id.as_str(), + timestamp_str, + }); return None; } }