Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(docker-logs source): old messages that are dropped should be a component_error #14449

Merged
merged 4 commits into from
Sep 22, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 33 additions & 10 deletions src/internal_events/docker_logs.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use bollard::errors::Error;
use chrono::ParseError;
use metrics::counter;
use vector_core::internal_event::InternalEvent;

use vector_common::internal_event::{error_stage, error_type};
use vector_core::internal_event::InternalEvent;

#[derive(Debug)]
pub struct DockerLogsEventsReceived<'a> {
Expand All @@ -12,7 +11,7 @@ pub struct DockerLogsEventsReceived<'a> {
pub container_name: &'a str,
}

impl<'a> InternalEvent for DockerLogsEventsReceived<'a> {
impl InternalEvent for DockerLogsEventsReceived<'_> {
fn emit(self) {
trace!(
message = "Events received.",
Expand Down Expand Up @@ -42,7 +41,7 @@ pub struct DockerLogsContainerEventReceived<'a> {
pub action: &'a str,
}

impl<'a> InternalEvent for DockerLogsContainerEventReceived<'a> {
impl InternalEvent for DockerLogsContainerEventReceived<'_> {
fn emit(self) {
debug!(
message = "Received one container event.",
Expand All @@ -58,7 +57,7 @@ pub struct DockerLogsContainerWatch<'a> {
pub container_id: &'a str,
}

impl<'a> InternalEvent for DockerLogsContainerWatch<'a> {
impl InternalEvent for DockerLogsContainerWatch<'_> {
fn emit(self) {
info!(
message = "Started watching for container logs.",
Expand All @@ -73,7 +72,7 @@ pub struct DockerLogsContainerUnwatch<'a> {
pub container_id: &'a str,
}

impl<'a> InternalEvent for DockerLogsContainerUnwatch<'a> {
impl InternalEvent for DockerLogsContainerUnwatch<'_> {
fn emit(self) {
info!(
message = "Stopped watching for container logs.",
Expand All @@ -89,7 +88,7 @@ pub struct DockerLogsCommunicationError<'a> {
pub container_id: Option<&'a str>,
}

impl<'a> InternalEvent for DockerLogsCommunicationError<'a> {
impl InternalEvent for DockerLogsCommunicationError<'_> {
fn emit(self) {
error!(
message = "Error in communication with Docker daemon.",
Expand All @@ -115,7 +114,7 @@ pub struct DockerLogsContainerMetadataFetchError<'a> {
pub container_id: &'a str,
}

impl<'a> InternalEvent for DockerLogsContainerMetadataFetchError<'a> {
impl InternalEvent for DockerLogsContainerMetadataFetchError<'_> {
fn emit(self) {
error!(
message = "Failed to fetch container metadata.",
Expand All @@ -142,7 +141,7 @@ pub struct DockerLogsTimestampParseError<'a> {
pub container_id: &'a str,
}

impl<'a> InternalEvent for DockerLogsTimestampParseError<'a> {
impl InternalEvent for DockerLogsTimestampParseError<'_> {
fn emit(self) {
error!(
message = "Failed to parse timestamp as RFC3339 timestamp.",
Expand All @@ -169,7 +168,7 @@ pub struct DockerLogsLoggingDriverUnsupportedError<'a> {
pub error: Error,
}

impl<'a> InternalEvent for DockerLogsLoggingDriverUnsupportedError<'a> {
impl InternalEvent for DockerLogsLoggingDriverUnsupportedError<'_> {
fn emit(self) {
error!(
message = "Docker engine is not using either the `jsonfile` or `journald` logging driver. Please enable one of these logging drivers to get logs from the Docker daemon.",
Expand All @@ -188,3 +187,27 @@ impl<'a> InternalEvent for DockerLogsLoggingDriverUnsupportedError<'a> {
counter!("logging_driver_errors_total", 1);
}
}

#[derive(Debug)]
pub struct DockerLogsReceivedOldLogError<'a> {
pub timestamp_str: &'a str,
pub container_id: &'a str,
}

impl InternalEvent for DockerLogsReceivedOldLogError<'_> {
fn emit(self) {
error!(
message = "Received out of order log message.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small thing but, 💭 Should we update the name of the event to more closely match this? E.g. DockerLogsReceivedOutOfOrderError or just DockerLogsOutOfOrderError ?

error_type = error_type::CONDITION_FAILED,
stage = error_stage::RECEIVING,
container_id = ?self.container_id,
timestamp = ?self.timestamp_str,
);
counter!(
"component_errors_total", 1,
"error_type" => error_type::CONDITION_FAILED,
"stage" => error_stage::RECEIVING,
"container_id" => self.container_id.to_owned(),
);
}
}
20 changes: 12 additions & 8 deletions src/sources/docker_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use crate::{
DockerLogsCommunicationError, DockerLogsContainerEventReceived,
DockerLogsContainerMetadataFetchError, DockerLogsContainerUnwatch,
DockerLogsContainerWatch, DockerLogsEventsReceived,
DockerLogsLoggingDriverUnsupportedError, DockerLogsTimestampParseError, StreamClosedError,
DockerLogsLoggingDriverUnsupportedError, DockerLogsReceivedOldLogError,
DockerLogsTimestampParseError, StreamClosedError,
},
line_agg::{self, LineAgg},
shutdown::ShutdownSignal,
Expand Down Expand Up @@ -854,19 +855,22 @@ impl ContainerLogInfo {
Ok(timestamp) => {
// Timestamp check
match self.last_log.as_ref() {
// Received log has not already been processed
// Received log has not already been processed.
Some(&(ref last, gen))
if *last < timestamp || (*last == timestamp && gen == self.generation) =>
{
// noop
}
// Received log is not from before of creation
None if self.created <= timestamp.with_timezone(&Utc) => (),
// Received log is after the time the container was created.
None if self.created <= timestamp.with_timezone(&Utc) => {
// noop
}
// Received log is older than the previously received entry.
_ => {
trace!(
message = "Received older log.",
timestamp = %timestamp_str
);
emit!(DockerLogsReceivedOldLogError {
container_id: self.id.as_str(),
timestamp_str,
});
return None;
}
}
Expand Down