From 4271c23a337e766763954bdad11e23cb35c1dc9a Mon Sep 17 00:00:00 2001
From: Stephen Wakely <fungus.humungus@gmail.com>
Date: Fri, 16 Sep 2022 12:23:40 +0100
Subject: [PATCH 1/4] Old messages should be a component_error.

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
---
 src/internal_events/docker_logs.rs | 41 ++++++++++++++++++++++++------
 src/sources/docker_logs.rs         | 11 ++++----
 2 files changed, 39 insertions(+), 13 deletions(-)

diff --git a/src/internal_events/docker_logs.rs b/src/internal_events/docker_logs.rs
index ee512604533df..52d39e0384139 100644
--- a/src/internal_events/docker_logs.rs
+++ b/src/internal_events/docker_logs.rs
@@ -12,7 +12,7 @@ pub struct DockerLogsEventsReceived<'a> {
     pub container_name: &'a str,
 }
 
-impl<'a> InternalEvent for DockerLogsEventsReceived<'a> {
+impl InternalEvent for DockerLogsEventsReceived<'_> {
     fn emit(self) {
         trace!(
             message = "Events received.",
@@ -42,7 +42,7 @@ pub struct DockerLogsContainerEventReceived<'a> {
     pub action: &'a str,
 }
 
-impl<'a> InternalEvent for DockerLogsContainerEventReceived<'a> {
+impl InternalEvent for DockerLogsContainerEventReceived<'_> {
     fn emit(self) {
         debug!(
             message = "Received one container event.",
@@ -58,7 +58,7 @@ pub struct DockerLogsContainerWatch<'a> {
     pub container_id: &'a str,
 }
 
-impl<'a> InternalEvent for DockerLogsContainerWatch<'a> {
+impl InternalEvent for DockerLogsContainerWatch<'_> {
     fn emit(self) {
         info!(
             message = "Started watching for container logs.",
@@ -73,7 +73,7 @@ pub struct DockerLogsContainerUnwatch<'a> {
     pub container_id: &'a str,
 }
 
-impl<'a> InternalEvent for DockerLogsContainerUnwatch<'a> {
+impl InternalEvent for DockerLogsContainerUnwatch<'_> {
     fn emit(self) {
         info!(
             message = "Stopped watching for container logs.",
@@ -89,7 +89,7 @@ pub struct DockerLogsCommunicationError<'a> {
     pub container_id: Option<&'a str>,
 }
 
-impl<'a> InternalEvent for DockerLogsCommunicationError<'a> {
+impl InternalEvent for DockerLogsCommunicationError<'_> {
     fn emit(self) {
         error!(
             message = "Error in communication with Docker daemon.",
@@ -115,7 +115,7 @@ pub struct DockerLogsContainerMetadataFetchError<'a> {
     pub container_id: &'a str,
 }
 
-impl<'a> InternalEvent for DockerLogsContainerMetadataFetchError<'a> {
+impl InternalEvent for DockerLogsContainerMetadataFetchError<'_> {
     fn emit(self) {
         error!(
             message = "Failed to fetch container metadata.",
@@ -142,7 +142,7 @@ pub struct DockerLogsTimestampParseError<'a> {
     pub container_id: &'a str,
 }
 
-impl<'a> InternalEvent for DockerLogsTimestampParseError<'a> {
+impl InternalEvent for DockerLogsTimestampParseError<'_> {
     fn emit(self) {
         error!(
             message = "Failed to parse timestamp as RFC3339 timestamp.",
@@ -169,7 +169,7 @@ pub struct DockerLogsLoggingDriverUnsupportedError<'a> {
     pub error: Error,
 }
 
-impl<'a> InternalEvent for DockerLogsLoggingDriverUnsupportedError<'a> {
+impl InternalEvent for DockerLogsLoggingDriverUnsupportedError<'_> {
     fn emit(self) {
         error!(
             message = "Docker engine is not using either the `jsonfile` or `journald` logging driver. Please enable one of these logging drivers to get logs from the Docker daemon.",
@@ -188,3 +188,28 @@ impl<'a> InternalEvent for DockerLogsLoggingDriverUnsupportedError<'a> {
         counter!("logging_driver_errors_total", 1);
     }
 }
+
+#[derive(Debug)]
+pub struct DockerLogsReceivedOldLogError<'a> {
+    pub timestamp_str: &'a str,
+    pub container_id: &'a str,
+}
+
+impl InternalEvent for DockerLogsReceivedOldLogError<'_> {
+    fn emit(self) {
+        error!(
+            message = "Received older log.",
+            error_type = error_type::CONDITION_FAILED,
+            stage = error_stage::RECEIVING,
+            container_id = ?self.container_id,
+            timestamp = ?self.timestamp_str,
+        );
+        counter!(
+            "component_errors_total", 1,
+            "error_type" => error_type::CONDITION_FAILED,
+            "stage" => error_stage::RECEIVING,
+            "container_id" => self.container_id.to_owned(),
+        );
+
+    }
+}
diff --git a/src/sources/docker_logs.rs b/src/sources/docker_logs.rs
index 5c01d88173f10..ee79687febf55 100644
--- a/src/sources/docker_logs.rs
+++ b/src/sources/docker_logs.rs
@@ -33,7 +33,8 @@ use crate::{
         DockerLogsCommunicationError, DockerLogsContainerEventReceived,
         DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
         DockerLogsContainerWatch, DockerLogsEventsReceived,
-        DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
+        DockerLogsLoggingDriverUnsupportedError, DockerLogsReceivedOldLogError,
+        DockerLogsTimestampParseError, StreamClosedError,
     },
     line_agg::{self, LineAgg},
     shutdown::ShutdownSignal,
@@ -863,10 +864,10 @@ impl ContainerLogInfo {
                     // Received log is not from before of creation
                     None if self.created <= timestamp.with_timezone(&Utc) => (),
                     _ => {
-                        trace!(
-                            message = "Received older log.",
-                            timestamp = %timestamp_str
-                        );
+                        emit!(DockerLogsReceivedOldLogError {
+                            container_id: self.id.as_str(),
+                            timestamp_str,
+                        });
                         return None;
                     }
                 }

From 75192e56fe13de38704b6cd88e91f7769c486507 Mon Sep 17 00:00:00 2001
From: Stephen Wakely <fungus.humungus@gmail.com>
Date: Fri, 16 Sep 2022 13:21:24 +0100
Subject: [PATCH 2/4] Formatting

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
---
 src/internal_events/docker_logs.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/internal_events/docker_logs.rs b/src/internal_events/docker_logs.rs
index 52d39e0384139..7efad1e97ee38 100644
--- a/src/internal_events/docker_logs.rs
+++ b/src/internal_events/docker_logs.rs
@@ -1,9 +1,8 @@
 use bollard::errors::Error;
 use chrono::ParseError;
 use metrics::counter;
-use vector_core::internal_event::InternalEvent;
-
 use vector_common::internal_event::{error_stage, error_type};
+use vector_core::internal_event::InternalEvent;
 
 #[derive(Debug)]
 pub struct DockerLogsEventsReceived<'a> {
@@ -210,6 +209,5 @@ impl InternalEvent for DockerLogsReceivedOldLogError<'_> {
             "stage" => error_stage::RECEIVING,
             "container_id" => self.container_id.to_owned(),
         );
-
     }
 }

From 4f5960f5fcc559e4ddc47819e42acbdda7a3af78 Mon Sep 17 00:00:00 2001
From: Stephen Wakely <fungus.humungus@gmail.com>
Date: Wed, 21 Sep 2022 11:35:08 +0100
Subject: [PATCH 3/4] Feedback from Kyle.

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
---
 src/internal_events/docker_logs.rs | 2 +-
 src/sources/docker_logs.rs         | 9 ++++++---
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/src/internal_events/docker_logs.rs b/src/internal_events/docker_logs.rs
index 7efad1e97ee38..81e9a644522a3 100644
--- a/src/internal_events/docker_logs.rs
+++ b/src/internal_events/docker_logs.rs
@@ -197,7 +197,7 @@ pub struct DockerLogsReceivedOldLogError<'a> {
 impl InternalEvent for DockerLogsReceivedOldLogError<'_> {
     fn emit(self) {
         error!(
-            message = "Received older log.",
+            message = "Received out of order log message.",
             error_type = error_type::CONDITION_FAILED,
             stage = error_stage::RECEIVING,
             container_id = ?self.container_id,
diff --git a/src/sources/docker_logs.rs b/src/sources/docker_logs.rs
index ee79687febf55..4042e6e6fb951 100644
--- a/src/sources/docker_logs.rs
+++ b/src/sources/docker_logs.rs
@@ -855,14 +855,17 @@ impl ContainerLogInfo {
             Ok(timestamp) => {
                 // Timestamp check
                 match self.last_log.as_ref() {
-                    // Received log has not already been processed
+                    // Received log has not already been processed.
                     Some(&(ref last, gen))
                         if *last < timestamp || (*last == timestamp && gen == self.generation) =>
                     {
                         // noop
                     }
-                    // Received log is not from before of creation
-                    None if self.created <= timestamp.with_timezone(&Utc) => (),
+                    // Received log is after the time the container was created.
+                    None if self.created <= timestamp.with_timezone(&Utc) => {
+                        // noop
+                    }
+                    // Received log is older than the previously received entry.
                     _ => {
                         emit!(DockerLogsReceivedOldLogError {
                             container_id: self.id.as_str(),

From 802d5a1a57a80d212b4ef45a208cde2bacfbbab5 Mon Sep 17 00:00:00 2001
From: Stephen Wakely <fungus.humungus@gmail.com>
Date: Thu, 22 Sep 2022 11:13:16 +0100
Subject: [PATCH 4/4] Rename event to DockerLogsReceivedOutOfOrderError

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
---
 src/internal_events/docker_logs.rs | 4 ++--
 src/sources/docker_logs.rs         | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/internal_events/docker_logs.rs b/src/internal_events/docker_logs.rs
index 81e9a644522a3..4ab2bd10aa0d1 100644
--- a/src/internal_events/docker_logs.rs
+++ b/src/internal_events/docker_logs.rs
@@ -189,12 +189,12 @@ impl InternalEvent for DockerLogsLoggingDriverUnsupportedError<'_> {
 }
 
 #[derive(Debug)]
-pub struct DockerLogsReceivedOldLogError<'a> {
+pub struct DockerLogsReceivedOutOfOrderError<'a> {
     pub timestamp_str: &'a str,
     pub container_id: &'a str,
 }
 
-impl InternalEvent for DockerLogsReceivedOldLogError<'_> {
+impl InternalEvent for DockerLogsReceivedOutOfOrderError<'_> {
     fn emit(self) {
         error!(
             message = "Received out of order log message.",
diff --git a/src/sources/docker_logs.rs b/src/sources/docker_logs.rs
index 4042e6e6fb951..b8d602cff430c 100644
--- a/src/sources/docker_logs.rs
+++ b/src/sources/docker_logs.rs
@@ -33,7 +33,7 @@ use crate::{
         DockerLogsCommunicationError, DockerLogsContainerEventReceived,
         DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
         DockerLogsContainerWatch, DockerLogsEventsReceived,
-        DockerLogsLoggingDriverUnsupportedError, DockerLogsReceivedOldLogError,
+        DockerLogsLoggingDriverUnsupportedError, DockerLogsReceivedOutOfOrderError,
         DockerLogsTimestampParseError, StreamClosedError,
     },
     line_agg::{self, LineAgg},
@@ -867,7 +867,7 @@ impl ContainerLogInfo {
                     }
                     // Received log is older than the previously received entry.
                     _ => {
-                        emit!(DockerLogsReceivedOldLogError {
+                        emit!(DockerLogsReceivedOutOfOrderError {
                             container_id: self.id.as_str(),
                             timestamp_str,
                         });