Skip to content

Commit

Permalink
fix(docker-logs source): old messages that are dropped should be a co…
Browse files Browse the repository at this point in the history
…mponent_error (#14449)

* Old messages should be a component_error.

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Formatting

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Feedback from Kyle.

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Rename event to DockerLogsReceivedOutOfOrderError

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
  • Loading branch information
StephenWakely authored Sep 22, 2022
1 parent 5149fef commit d8ca608
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 18 deletions.
43 changes: 33 additions & 10 deletions src/internal_events/docker_logs.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use bollard::errors::Error;
use chrono::ParseError;
use metrics::counter;
use vector_core::internal_event::InternalEvent;

use vector_common::internal_event::{error_stage, error_type};
use vector_core::internal_event::InternalEvent;

#[derive(Debug)]
pub struct DockerLogsEventsReceived<'a> {
Expand All @@ -12,7 +11,7 @@ pub struct DockerLogsEventsReceived<'a> {
pub container_name: &'a str,
}

impl<'a> InternalEvent for DockerLogsEventsReceived<'a> {
impl InternalEvent for DockerLogsEventsReceived<'_> {
fn emit(self) {
trace!(
message = "Events received.",
Expand Down Expand Up @@ -42,7 +41,7 @@ pub struct DockerLogsContainerEventReceived<'a> {
pub action: &'a str,
}

impl<'a> InternalEvent for DockerLogsContainerEventReceived<'a> {
impl InternalEvent for DockerLogsContainerEventReceived<'_> {
fn emit(self) {
debug!(
message = "Received one container event.",
Expand All @@ -58,7 +57,7 @@ pub struct DockerLogsContainerWatch<'a> {
pub container_id: &'a str,
}

impl<'a> InternalEvent for DockerLogsContainerWatch<'a> {
impl InternalEvent for DockerLogsContainerWatch<'_> {
fn emit(self) {
info!(
message = "Started watching for container logs.",
Expand All @@ -73,7 +72,7 @@ pub struct DockerLogsContainerUnwatch<'a> {
pub container_id: &'a str,
}

impl<'a> InternalEvent for DockerLogsContainerUnwatch<'a> {
impl InternalEvent for DockerLogsContainerUnwatch<'_> {
fn emit(self) {
info!(
message = "Stopped watching for container logs.",
Expand All @@ -89,7 +88,7 @@ pub struct DockerLogsCommunicationError<'a> {
pub container_id: Option<&'a str>,
}

impl<'a> InternalEvent for DockerLogsCommunicationError<'a> {
impl InternalEvent for DockerLogsCommunicationError<'_> {
fn emit(self) {
error!(
message = "Error in communication with Docker daemon.",
Expand All @@ -115,7 +114,7 @@ pub struct DockerLogsContainerMetadataFetchError<'a> {
pub container_id: &'a str,
}

impl<'a> InternalEvent for DockerLogsContainerMetadataFetchError<'a> {
impl InternalEvent for DockerLogsContainerMetadataFetchError<'_> {
fn emit(self) {
error!(
message = "Failed to fetch container metadata.",
Expand All @@ -142,7 +141,7 @@ pub struct DockerLogsTimestampParseError<'a> {
pub container_id: &'a str,
}

impl<'a> InternalEvent for DockerLogsTimestampParseError<'a> {
impl InternalEvent for DockerLogsTimestampParseError<'_> {
fn emit(self) {
error!(
message = "Failed to parse timestamp as RFC3339 timestamp.",
Expand All @@ -169,7 +168,7 @@ pub struct DockerLogsLoggingDriverUnsupportedError<'a> {
pub error: Error,
}

impl<'a> InternalEvent for DockerLogsLoggingDriverUnsupportedError<'a> {
impl InternalEvent for DockerLogsLoggingDriverUnsupportedError<'_> {
fn emit(self) {
error!(
message = "Docker engine is not using either the `jsonfile` or `journald` logging driver. Please enable one of these logging drivers to get logs from the Docker daemon.",
Expand All @@ -188,3 +187,27 @@ impl<'a> InternalEvent for DockerLogsLoggingDriverUnsupportedError<'a> {
counter!("logging_driver_errors_total", 1);
}
}

#[derive(Debug)]
pub struct DockerLogsReceivedOutOfOrderError<'a> {
pub timestamp_str: &'a str,
pub container_id: &'a str,
}

impl InternalEvent for DockerLogsReceivedOutOfOrderError<'_> {
fn emit(self) {
error!(
message = "Received out of order log message.",
error_type = error_type::CONDITION_FAILED,
stage = error_stage::RECEIVING,
container_id = ?self.container_id,
timestamp = ?self.timestamp_str,
);
counter!(
"component_errors_total", 1,
"error_type" => error_type::CONDITION_FAILED,
"stage" => error_stage::RECEIVING,
"container_id" => self.container_id.to_owned(),
);
}
}
20 changes: 12 additions & 8 deletions src/sources/docker_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use crate::{
DockerLogsCommunicationError, DockerLogsContainerEventReceived,
DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
DockerLogsContainerWatch, DockerLogsEventsReceived,
DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
DockerLogsLoggingDriverUnsupportedError, DockerLogsReceivedOutOfOrderError,
DockerLogsTimestampParseError, StreamClosedError,
},
line_agg::{self, LineAgg},
shutdown::ShutdownSignal,
Expand Down Expand Up @@ -854,19 +855,22 @@ impl ContainerLogInfo {
Ok(timestamp) => {
// Timestamp check
match self.last_log.as_ref() {
// Received log has not already been processed
// Received log has not already been processed.
Some(&(ref last, gen))
if *last < timestamp || (*last == timestamp && gen == self.generation) =>
{
// noop
}
// Received log is not from before of creation
None if self.created <= timestamp.with_timezone(&Utc) => (),
// Received log is after the time the container was created.
None if self.created <= timestamp.with_timezone(&Utc) => {
// noop
}
// Received log is older than the previously received entry.
_ => {
trace!(
message = "Received older log.",
timestamp = %timestamp_str
);
emit!(DockerLogsReceivedOutOfOrderError {
container_id: self.id.as_str(),
timestamp_str,
});
return None;
}
}
Expand Down

0 comments on commit d8ca608

Please sign in to comment.