Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new source): ecs/fargate metrics source #4698

Merged
merged 13 commits into from
Nov 17, 2020
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ api-client = [
# Sources
sources = [
"sources-apache_metrics",
"sources-aws_ecs_metrics",
"sources-aws_kinesis_firehose",
"sources-aws_s3",
"sources-docker",
Expand All @@ -321,6 +322,7 @@ sources = [
"sources-kubernetes-logs",
]
sources-apache_metrics = []
sources-aws_ecs_metrics = []
sources-aws_kinesis_firehose = ["base64", "tls", "warp"]
sources-aws_s3 = ["rusoto_core", "rusoto_credential", "rusoto_signature", "rusoto_sts", "rusoto_s3", "rusoto_sqs"]
sources-docker = ["bollard"]
Expand Down Expand Up @@ -501,13 +503,15 @@ aws-integration-tests = [
"aws-cloudwatch-logs-integration-tests",
"aws-cloudwatch-metrics-integration-tests",
"aws-ec2-metadata-integration-tests",
"aws-ecs-metrics-integration-tests",
"aws-kinesis-firehose-integration-tests",
"aws-kinesis-streams-integration-tests",
"aws-s3-integration-tests",
]
aws-cloudwatch-logs-integration-tests = ["sinks-aws_cloudwatch_logs"]
aws-cloudwatch-metrics-integration-tests = ["sinks-aws_cloudwatch_metrics"]
aws-ec2-metadata-integration-tests = ["transforms-aws_ec2_metadata"]
aws-ecs-metrics-integration-tests = ["sources-aws_ecs_metrics"]
aws-kinesis-firehose-integration-tests = ["sinks-aws_kinesis_firehose", "sinks-elasticsearch", "rusoto_es"]
aws-kinesis-streams-integration-tests = ["sinks-aws_kinesis_streams"]
aws-s3-integration-tests = ["sources-aws_s3", "sinks-aws_s3"]
Expand Down
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,16 @@ stop-test-integration: stop-integration-splunk
.PHONY: start-integration-aws
start-integration-aws:
ifeq ($(CONTAINER_TOOL),podman)
$(CONTAINER_TOOL) $(CONTAINER_ENCLOSURE) create --replace --name vector-test-integration-aws -p 4566:4566 -p 4571:4571 -p 6000:6000
$(CONTAINER_TOOL) $(CONTAINER_ENCLOSURE) create --replace --name vector-test-integration-aws -p 4566:4566 -p 4571:4571 -p 6000:6000 -p 9088:80
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws --name vector_ec2_metadata \
timberiodev/mock-ec2-metadata:latest
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws --name vector_localstack_aws \
-e SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs \
localstack/localstack-full:0.11.6
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws --name vector_mockwatchlogs \
-e RUST_LOG=trace luciofranco/mockwatchlogs:latest
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws -v /var/run:/var/run --name vector_local_ecs \
-e RUST_LOG=trace amazon/amazon-ecs-local-container-endpoints:latest
else
$(CONTAINER_TOOL) $(CONTAINER_ENCLOSURE) create vector-test-integration-aws
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws -p 8111:8111 --name vector_ec2_metadata \
Expand All @@ -330,11 +332,13 @@ else
localstack/localstack-full:0.11.6
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws -p 6000:6000 --name vector_mockwatchlogs \
-e RUST_LOG=trace luciofranco/mockwatchlogs:latest
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws -v /var/run:/var/run -p 9088:80 --name vector_local_ecs \
-e RUST_LOG=trace amazon/amazon-ecs-local-container-endpoints:latest
endif

.PHONY: stop-integration-aws
stop-integration-aws:
$(CONTAINER_TOOL) rm --force vector_ec2_metadata vector_mockwatchlogs vector_localstack_aws 2>/dev/null; true
$(CONTAINER_TOOL) rm --force vector_ec2_metadata vector_mockwatchlogs vector_localstack_aws vector_local_ecs 2>/dev/null; true
ifeq ($(CONTAINER_TOOL),podman)
$(CONTAINER_TOOL) $(CONTAINER_ENCLOSURE) stop --name=vector-test-integration-aws 2>/dev/null; true
$(CONTAINER_TOOL) $(CONTAINER_ENCLOSURE) rm --force --name=vector-test-integration-aws 2>/dev/null; true
Expand Down
231 changes: 231 additions & 0 deletions docs/reference/components/sources/aws_ecs_metrics.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package metadata

components: sources: aws_ecs_metrics: {
title: "AWS ECS Metrics"
description: "The ECS metrics source collects the docker container stats for tasks running in Amazon ECS or Fargate."

classes: {
commonly_used: false
delivery: "at_least_once"
deployment_roles: ["sidecar"]
development: "beta"
egress_method: "batch"
}

features: {
collect: {
checkpoint: enabled: false
from: {
name: "Amazon ECS"
thing: "an \(name) container"
url: urls.aws_ecs
versions: null

interface: {
socket: {
api: {
title: "Amazon ECS task metadata endpoint"
url: urls.aws_ecs_task_metadata
}
direction: "outgoing"
protocols: ["http"]
ssl: "disabled"
}
}
}
}
multiline: enabled: false
}

support: {
platforms: {
"aarch64-unknown-linux-gnu": true
"aarch64-unknown-linux-musl": true
"x86_64-apple-darwin": true
"x86_64-pc-windows-msv": true
"x86_64-unknown-linux-gnu": true
"x86_64-unknown-linux-musl": true
}

requirements: []
warnings: []
notices: []
}

configuration: {
endpoint: {
description: """
Base URI of the task metadata endpoint.
If empty, the URI will be automatically discovered based on the latest version detected.
The version 2 endpoint base URI is `169.254.170.2/v2/`.
The version 3 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI`.
The version 4 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI_V4`.
"""
common: false
required: false
type: string: {
default: "${ECS_CONTAINER_METADATA_URI_V4}"
}
}
version: {
description: """
The version of the metadata endpoint.
If empty, the version will be automatically discovered based on environment variables.
"""
common: false
required: false
type: string: {
default: "v4"
enum: {
v4: "When the environment variable `ECS_CONTAINER_METADATA_URI_V4` is defined."
v3: "When the v4 check fails, but the environment variable `ECS_CONTAINER_METADATA_URI` is defined."
v2: "When both the v4 and v3 checks fail."
}
}
}
scrape_interval_secs: {
description: "The interval between scrapes, in seconds."
common: true
required: false
type: uint: {
default: 15
unit: "seconds"
}
}
namespace: {
description: "The namespace of the metric. Disabled if empty."
common: true
required: false
type: string: {
default: "awsecs"
}
}
}

output: metrics: {
_tags: {
container_id: {
description: "The identifier of the ECS container."
required: true
examples: ["0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352"]
}
container_name: {
description: "The name of the ECS container."
required: true
examples: ["myapp"]
}
}

_gauge: {
type: "gauge"
tags: _tags
}

_counter: {
type: "counter"
tags: _tags
}

_blkio_counter: {
type: "counter"
tags: _tags & {
device: {
description: "Device identified by its major and minor numbers."
required: true
examples: ["202:26368"]
}
op: {
description: "The operation type."
required: true
examples: ["read", "write", "sync", "async", "total"]
}
}
}

blkio_recursive_io_merged_total: _blkio_counter & {description: "Total number of bios/requests merged into requests."}
blkio_recursive_io_queued_total: _blkio_counter & {description: "Total number of requests queued up at any given instant."}
blkio_recursive_io_service_bytes_total: _blkio_counter & {description: "Number of bytes transferred to/from the disk."}
blkio_recursive_io_service_time_seconds_total: _blkio_counter & {description: "Total amount of time in seconds between request dispatch and request completion for the IOs done."}
blkio_recursive_io_serviced_total: _blkio_counter & {description: "Number of IOs completed to/from the disk."}
blkio_recursive_io_time_seconds_total: _blkio_counter & {description: "Disk time allocated per device in seconds."}
blkio_recursive_io_wait_time_seconds_total: _blkio_counter & {description: "Total amount of time in seconds the IOs spent waiting in the scheduler queues for service."}
blkio_recursive_sectors_total: _blkio_counter & {description: "Number of sectors transferred to/from disk."}

cpu_online_cpus: _gauge & {description: "Number of CPU cores."}
cpu_usage_system_jiffies_total: _counter & {description: "Jiffies of CPU time used by the system."}
cpu_usage_usermode_jiffies_total: _counter & {description: "Jiffies of CPU time spent in user mode by the container."}
cpu_usage_kernelmode_jiffies_total: _counter & {description: "Jiffies of CPU time spent in kernel mode by the container."}
cpu_usage_total_jiffies_total: _counter & {description: "Jiffies of CPU time used by the container."}
cpu_throttling_periods_total: _counter & {description: "Number of periods."}
cpu_throttled_periods_total: _counter & {description: "Number of periods throttled."}
cpu_throttled_time_seconds_total: _counter & {description: "Throttling time in seconds."}

cpu_usage_percpu_jiffies_total: {
description: "Jiffies of CPU time used by the container, per CPU core."
type: "counter"
tags: _tags & {
cpu: {
description: "CPU core identifier."
required: true
examples: ["0", "1"]
}
}
}

memory_used_bytes: _gauge & {description: "Memory used by the container, in bytes."}
memory_max_used_bytes: _gauge & {description: "Maximum measured memory usage of the container, in bytes."}
memory_limit_bytes: _gauge & {description: "Memory usage limit of the container, in bytes."}
memory_active_anonymous_bytes: _gauge & {description: "Amount of memory that has been identified as active by the kernel. Anonymous memory is memory that is not linked to disk pages."}
memory_active_file_bytes: _gauge & {description: "Amount of active file cache memory. Cache memory = active_file + inactive_file + tmpfs."}
memory_cache_bytes: _gauge & {description: "The amount of memory used by the processes of this cgroup that can be associated with a block on a block device. Also accounts for memory used by tmpfs."}
memory_dirty_bytes: _gauge & {description: "The amount of memory waiting to get written to disk."}
memory_inactive_anonymous_bytes: _gauge & {description: "Amount of memory that has been identified as inactive by the kernel."}
memory_inactive_file_bytes: _gauge & {description: "Amount of inactive file cache memory."}
memory_mapped_file_bytes: _gauge & {description: "Indicates the amount of memory mapped by the processes in the cgroup. It doesn’t give you information about how much memory is used; it rather tells you how it is used."}
memory_page_faults_total: _counter & {description: "Number of times that a process of the cgroup triggered a page fault."}
memory_major_faults_total: _counter & {description: "Number of times that a process of the cgroup triggered a major page fault."}
memory_page_charged_total: _counter & {description: "Number of charging events to the memory cgroup. Charging events happen each time a page is accounted as either mapped anon page(RSS) or cache page to the cgroup."}
memory_page_uncharged_total: _counter & {description: "Number of uncharging events to the memory cgroup. Uncharging events happen each time a page is unaccounted from the cgroup."}
memory_rss_bytes: _gauge & {description: "The amount of memory that doesn’t correspond to anything on disk: stacks, heaps, and anonymous memory maps."}
memory_rss_hugepages_bytes: _gauge & {description: "Amount of memory due to anonymous transparent hugepages."}
memory_unevictable_bytes: _gauge & {description: "The amount of memory that cannot be reclaimed."}
memory_writeback_bytes: _gauge & {description: "The amount of memory from file/anon cache that are queued for syncing to the disk."}
memory_total_active_anonymous_bytes: _gauge & {description: "Total amount of memory that has been identified as active by the kernel."}
memory_total_active_file_bytes: _gauge & {description: "Total amount of active file cache memory."}
memory_total_cache_bytes: _gauge & {description: "Total amount of memory used by the processes of this cgroup that can be associated with a block on a block device."}
memory_total_dirty_bytes: _gauge & {description: "Total amount of memory waiting to get written to disk."}
memory_total_inactive_anonymous_bytes: _gauge & {description: "Total amount of memory that has been identified as inactive by the kernel."}
memory_total_inactive_file_bytes: _gauge & {description: "Total amount of inactive file cache memory."}
memory_total_mapped_file_bytes: _gauge & {description: "Total amount of memory mapped by the processes in the cgroup."}
memory_total_page_faults_total: _counter & {description: "Total number of page faults."}
memory_total_major_faults_total: _counter & {description: "Total number of major page faults."}
memory_total_page_charged_total: _counter & {description: "Total number of charging events."}
memory_total_page_uncharged_total: _counter & {description: "Total number of uncharging events."}
memory_total_rss_bytes: _gauge & {description: "Total amount of memory that doesn’t correspond to anything on disk: stacks, heaps, and anonymous memory maps."}
memory_total_rss_hugepages_bytes: _gauge & {description: "Total amount of memory due to anonymous transparent hugepages."}
memory_total_unevictable_bytes: _gauge & {description: "Total amount of memory that cannot be reclaimed."}
memory_total_writeback_bytes: _gauge & {description: "Total amount of memory from file/anon cache that are queued for syncing to the disk."}
memory_hierarchical_memory_limit_bytes: _gauge & {description: "The memory limit in place by the hierarchy cgroup."}
memory_hierarchical_memsw_limit_bytes: _gauge & {description: "The memory + swap limit in place by the hierarchy cgroup."}

_network_counter: {
type: "counter"
tags: _tags & {
device: {
description: "The network interface."
required: true
examples: ["eth1"]
}
}
}

network_receive_bytes_total: _network_counter & {description: "Bytes received by the container via the network interface."}
network_receive_packets_total: _network_counter & {description: "Number of packets received by the container via the network interface."}
network_receive_packets_drop_total: _network_counter & {description: "Number of inbound packets dropped by the container."}
network_receive_errs_total: _network_counter & {description: "Errors receiving packets."}
network_transmit_bytes_total: _network_counter & {description: "Bytes sent by the container via the network interface."}
network_transmit_packets_total: _network_counter & {description: "Number of packets sent by the container via the network interface."}
network_transmit_packets_drop_total: _network_counter & {description: "Number of outbound packets dropped by the container."}
network_transmit_errs_total: _network_counter & {description: "Errors sending packets."}
}
}
2 changes: 2 additions & 0 deletions docs/reference/urls.cue
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ urls: {
aws_canonical_user_id: "https://docs.aws.amazon.com/general/latest/gr/acct-identifiers.html#FindingCanonicalId"
aws_cloudwatch_logs_sink_source: "https://github.com/timberio/vector/blob/master/src/sinks/aws_cloudwatch_logs/"
aws_ec2_instance_metadata: "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html"
aws_ecs: "https://aws.amazon.com/ecs/"
aws_ecs_task_metadata: "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html"
aws_elb: "https://aws.amazon.com/elasticloadbalancing/"
aws_cloudwatch: "https://aws.amazon.com/cloudwatch/"
aws_cloudwatch_logs: "https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/WhatIsCloudWatchLogs.html"
Expand Down
92 changes: 92 additions & 0 deletions src/internal_events/aws_ecs_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use super::InternalEvent;
use metrics::{counter, histogram};
use std::borrow::Cow;
use std::time::Instant;

/// Internal event carrying the size of a batch of scraped events.
///
/// Its `InternalEvent` impl logs `count` at debug level and feeds both
/// fields into counters.
#[derive(Debug)]
pub struct AwsEcsMetricsReceived {
/// Amount added to the `processed_bytes_total` counter.
pub byte_size: usize,
/// Number of events; logged and added to `events_processed_total`.
pub count: usize,
}

/// Reporting for a scraped batch: one debug log line plus two counters.
impl InternalEvent for AwsEcsMetricsReceived {
    /// Logs the number of scraped events at debug level.
    fn emit_logs(&self) {
        debug!(message = "Scraped events.", ?self.count);
    }

    /// Adds the event count and the byte size to their running totals.
    fn emit_metrics(&self) {
        let events = self.count as u64;
        counter!("events_processed_total", events);

        let bytes = self.byte_size as u64;
        counter!("processed_bytes_total", bytes);
    }
}

/// Internal event marking a completed request, bounded by two timestamps.
///
/// Its `InternalEvent` impl records `end - start` in the
/// `request_duration_nanoseconds` histogram.
#[derive(Debug)]
pub struct AwsEcsMetricsRequestCompleted {
/// Instant subtracted from `end` to obtain the request duration.
pub start: Instant,
/// Instant from which `start` is subtracted.
pub end: Instant,
}

/// Reporting for a completed request: a debug log line, a completion
/// counter, and a duration histogram sample.
impl InternalEvent for AwsEcsMetricsRequestCompleted {
    /// Logs a fixed "request completed" message at debug level.
    fn emit_logs(&self) {
        debug!(message = "Request completed.");
    }

    /// Counts the request and records how long it took.
    fn emit_metrics(&self) {
        counter!("requests_completed_total", 1);

        // Same computation as `end - start`; the resulting `Duration` is
        // handed to the histogram macro as-is.
        let elapsed = self.end.duration_since(self.start);
        histogram!("request_duration_nanoseconds", elapsed);
    }
}

/// Internal event describing a response body that failed JSON parsing.
///
/// Its `InternalEvent` impl logs the error and URL at error level, and the
/// escaped body at debug level.
#[derive(Debug)]
pub struct AwsEcsMetricsParseError<'a> {
/// The `serde_json` error; logged with its `Display` form.
pub error: serde_json::Error,
/// URL attached to both log lines.
pub url: &'a str,
/// Response body; logged after `escape_debug()`.
pub body: Cow<'a, str>,
}

/// Reporting for a parse failure: an error log line, a debug dump of the
/// offending body, and a parse-error counter.
///
/// The impl uses the anonymous lifetime only, matching the other
/// borrowed-event impls in this file (no unused named lifetime).
impl InternalEvent for AwsEcsMetricsParseError<'_> {
    /// Logs the error with its URL, then the escaped response body at debug
    /// level.
    fn emit_logs(&self) {
        error!(message = "Parsing error.", url = %self.url, error = %self.error);
        // The body goes through `escape_debug()` and the separators are the
        // two-character sequence `\n` (escaped backslash), not real newlines.
        // NOTE(review): `rate_limit_secs = 10` presumably throttles this line;
        // confirm against the logging layer.
        debug!(
            message = %format!("Failed to parse response:\\n\\n{}\\n\\n", self.body.escape_debug()),
            url = %self.url,
            rate_limit_secs = 10
        );
    }

    /// Increments the parse-error counter by one.
    fn emit_metrics(&self) {
        counter!("parse_errors_total", 1);
    }
}

/// Internal event describing an HTTP response reported as an error.
///
/// Its `InternalEvent` impl logs both fields at error level.
#[derive(Debug)]
pub struct AwsEcsMetricsErrorResponse<'a> {
/// HTTP status code; logged with its `Display` form.
pub code: hyper::StatusCode,
/// URL attached to the log line.
pub url: &'a str,
}

impl InternalEvent for AwsEcsMetricsErrorResponse<'_> {
fn emit_logs(&self) {
error!(message = "HTTP error response.", url = %self.url, code = %self.code);
}

fn emit_metrics(&self) {
counter!("http_error_response_total", 1);
}
}

/// Internal event describing a `hyper` error raised while processing a request.
///
/// Its `InternalEvent` impl logs both fields at error level.
#[derive(Debug)]
pub struct AwsEcsMetricsHttpError<'a> {
/// The `hyper` error; logged with its `Display` form.
pub error: hyper::Error,
/// URL attached to the log line.
pub url: &'a str,
}

impl InternalEvent for AwsEcsMetricsHttpError<'_> {
fn emit_logs(&self) {
error!(message = "HTTP request processing error.", url = %self.url, error = %self.error);
}

fn emit_metrics(&self) {
counter!("http_request_errors_total", 1);
}
}
Loading