Skip to content

Commit

Permalink
feat(new source): ecs/fargate metrics source (vectordotdev#4698)
Browse files Browse the repository at this point in the history
* fargate metrics

Signed-off-by: drunkirishcoder <daniel.jin@gmail.com>

* Addressed some feedback (#1)

Signed-off-by: James Turnbull <james@lovedthanlost.net>

* address comments

Signed-off-by: drunkirishcoder <daniel.jin@gmail.com>

* redo metrics scraping

Signed-off-by: drunkirishcoder <daniel.jin@gmail.com>

* fix default endpoint

Signed-off-by: drunkirishcoder <daniel.jin@gmail.com>

* code style fix

Signed-off-by: drunkirishcoder <daniel.jin@gmail.com>

* add documentation and metric type fixes

Signed-off-by: drunkirishcoder <daniel.jin@gmail.com>

* rename metrics

Signed-off-by: drunkirishcoder <daniel.jin@gmail.com>

* address comments

Signed-off-by: drunkirishcoder <daniel.jin@gmail.com>

* add integration test

Signed-off-by: drunkirishcoder <daniel.jin@gmail.com>

* tweak endpoint auto detection

Signed-off-by: drunkirishcoder <daniel.jin@gmail.com>

* forgot docs

Signed-off-by: drunkirishcoder <daniel.jin@gmail.com>

* Add integration tests for v2 and v4

Signed-off-by: Jesse Szwedko <jesse@szwedko.me>

Co-authored-by: James Turnbull <james@lovedthanlost.net>
Co-authored-by: Jesse Szwedko <jesse@szwedko.me>
Signed-off-by: Brian Menges <brian.menges@anaplan.com>
  • Loading branch information
3 people authored and Brian Menges committed Dec 9, 2020
1 parent 6ea0fbd commit 6126503
Show file tree
Hide file tree
Showing 9 changed files with 2,017 additions and 2 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ api-client = [
# Sources
sources = [
"sources-apache_metrics",
"sources-aws_ecs_metrics",
"sources-aws_kinesis_firehose",
"sources-aws_s3",
"sources-docker_logs",
Expand All @@ -321,6 +322,7 @@ sources = [
"sources-kubernetes-logs",
]
sources-apache_metrics = []
sources-aws_ecs_metrics = []
sources-aws_kinesis_firehose = ["base64", "tls", "warp"]
sources-aws_s3 = ["rusoto_core", "rusoto_credential", "rusoto_signature", "rusoto_sts", "rusoto_s3", "rusoto_sqs"]
sources-docker_logs = ["bollard"]
Expand Down Expand Up @@ -503,13 +505,15 @@ aws-integration-tests = [
"aws-cloudwatch-logs-integration-tests",
"aws-cloudwatch-metrics-integration-tests",
"aws-ec2-metadata-integration-tests",
"aws-ecs-metrics-integration-tests",
"aws-kinesis-firehose-integration-tests",
"aws-kinesis-streams-integration-tests",
"aws-s3-integration-tests",
]
aws-cloudwatch-logs-integration-tests = ["sinks-aws_cloudwatch_logs"]
aws-cloudwatch-metrics-integration-tests = ["sinks-aws_cloudwatch_metrics"]
aws-ec2-metadata-integration-tests = ["transforms-aws_ec2_metadata"]
aws-ecs-metrics-integration-tests = ["sources-aws_ecs_metrics"]
aws-kinesis-firehose-integration-tests = ["sinks-aws_kinesis_firehose", "sinks-elasticsearch", "rusoto_es"]
aws-kinesis-streams-integration-tests = ["sinks-aws_kinesis_streams"]
aws-s3-integration-tests = ["sources-aws_s3", "sinks-aws_s3"]
Expand Down
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,16 @@ stop-test-integration: stop-integration-splunk
.PHONY: start-integration-aws
start-integration-aws:
ifeq ($(CONTAINER_TOOL),podman)
$(CONTAINER_TOOL) $(CONTAINER_ENCLOSURE) create --replace --name vector-test-integration-aws -p 4566:4566 -p 4571:4571 -p 6000:6000
$(CONTAINER_TOOL) $(CONTAINER_ENCLOSURE) create --replace --name vector-test-integration-aws -p 4566:4566 -p 4571:4571 -p 6000:6000 -p 9088:80
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws --name vector_ec2_metadata \
timberiodev/mock-ec2-metadata:latest
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws --name vector_localstack_aws \
-e SERVICES=kinesis,s3,cloudwatch,elasticsearch,es,firehose,sqs \
localstack/localstack-full:0.11.6
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws --name vector_mockwatchlogs \
-e RUST_LOG=trace luciofranco/mockwatchlogs:latest
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws -v /var/run:/var/run --name vector_local_ecs \
-e RUST_LOG=trace amazon/amazon-ecs-local-container-endpoints:latest
else
$(CONTAINER_TOOL) $(CONTAINER_ENCLOSURE) create vector-test-integration-aws
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws -p 8111:8111 --name vector_ec2_metadata \
Expand All @@ -330,11 +332,13 @@ else
localstack/localstack-full:0.11.6
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws -p 6000:6000 --name vector_mockwatchlogs \
-e RUST_LOG=trace luciofranco/mockwatchlogs:latest
$(CONTAINER_TOOL) run -d --$(CONTAINER_ENCLOSURE)=vector-test-integration-aws -v /var/run:/var/run -p 9088:80 --name vector_local_ecs \
-e RUST_LOG=trace amazon/amazon-ecs-local-container-endpoints:latest
endif

.PHONY: stop-integration-aws
stop-integration-aws:
$(CONTAINER_TOOL) rm --force vector_ec2_metadata vector_mockwatchlogs vector_localstack_aws 2>/dev/null; true
$(CONTAINER_TOOL) rm --force vector_ec2_metadata vector_mockwatchlogs vector_localstack_aws vector_local_ecs 2>/dev/null; true
ifeq ($(CONTAINER_TOOL),podman)
$(CONTAINER_TOOL) $(CONTAINER_ENCLOSURE) stop --name=vector-test-integration-aws 2>/dev/null; true
$(CONTAINER_TOOL) $(CONTAINER_ENCLOSURE) rm --force --name=vector-test-integration-aws 2>/dev/null; true
Expand Down
231 changes: 231 additions & 0 deletions docs/reference/components/sources/aws_ecs_metrics.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package metadata

components: sources: aws_ecs_metrics: {
title: "AWS ECS Metrics"
description: "The ECS metrics source collects the docker container stats for tasks running in Amazon ECS or Fargate."

classes: {
commonly_used: false
delivery: "at_least_once"
deployment_roles: ["sidecar"]
development: "beta"
egress_method: "batch"
}

features: {
collect: {
checkpoint: enabled: false
from: {
name: "Amazon ECS"
thing: "an \(name) container"
url: urls.aws_ecs
versions: null

interface: {
socket: {
api: {
title: "Amazon ECS task metadata endpoint"
url: urls.aws_ecs_task_metadata
}
direction: "outgoing"
protocols: ["http"]
ssl: "disabled"
}
}
}
}
multiline: enabled: false
}

support: {
platforms: {
"aarch64-unknown-linux-gnu": true
"aarch64-unknown-linux-musl": true
"x86_64-apple-darwin": true
"x86_64-pc-windows-msv": true
"x86_64-unknown-linux-gnu": true
"x86_64-unknown-linux-musl": true
}

requirements: []
warnings: []
notices: []
}

configuration: {
endpoint: {
description: """
Base URI of the task metadata endpoint.
If empty, the URI will be automatically discovered based on the latest version detected.
The version 2 endpoint base URI is `169.254.170.2/v2/`.
The version 3 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI`.
The version 4 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI_V4`.
"""
common: false
required: false
type: string: {
default: "${ECS_CONTAINER_METADATA_URI_V4}"
}
}
version: {
description: """
The version of the metadata endpoint.
If empty, the version will be automatically discovered based on environment variables.
"""
common: false
required: false
type: string: {
default: "v4"
enum: {
v4: "When the environment variable `ECS_CONTAINER_METADATA_URI_V4` is defined."
v3: "When the v4 check fails, but the environment variable `ECS_CONTAINER_METADATA_URI` is defined."
v2: "When both the v4 and v3 checks fail."
}
}
}
scrape_interval_secs: {
description: "The interval between scrapes, in seconds."
common: true
required: false
type: uint: {
default: 15
unit: "seconds"
}
}
namespace: {
description: "The namespace of the metric. Disabled if empty."
common: true
required: false
type: string: {
default: "awsecs"
}
}
}

output: metrics: {
_tags: {
container_id: {
description: "The identifier of the ECS container."
required: true
examples: ["0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352"]
}
container_name: {
description: "The name of the ECS container."
required: true
examples: ["myapp"]
}
}

_gauge: {
type: "gauge"
tags: _tags
}

_counter: {
type: "counter"
tags: _tags
}

_blkio_counter: {
type: "counter"
tags: _tags & {
device: {
description: "Device identified by its major and minor numbers."
required: true
examples: ["202:26368"]
}
op: {
description: "The operation type."
required: true
examples: ["read", "write", "sync", "async", "total"]
}
}
}

blkio_recursive_io_merged_total: _blkio_counter & {description: "Total number of bios/requests merged into requests."}
blkio_recursive_io_queued_total: _blkio_counter & {description: "Total number of requests queued up at any given instant."}
blkio_recursive_io_service_bytes_total: _blkio_counter & {description: "Number of bytes transferred to/from the disk."}
blkio_recursive_io_service_time_seconds_total: _blkio_counter & {description: "Total amount of time in seconds between request dispatch and request completion for the IOs done."}
blkio_recursive_io_serviced_total: _blkio_counter & {description: "Number of IOs completed to/from the disk."}
blkio_recursive_io_time_seconds_total: _blkio_counter & {description: "Disk time allocated per device in seconds."}
blkio_recursive_io_wait_time_seconds_total: _blkio_counter & {description: "Total amount of time in seconds the IOs spent waiting in the scheduler queues for service."}
blkio_recursive_sectors_total: _blkio_counter & {description: "Number of sectors transferred to/from disk."}

cpu_online_cpus: _gauge & {description: "Number of CPU cores."}
cpu_usage_system_jiffies_total: _counter & {description: "Jiffies of CPU time used by the system."}
cpu_usage_usermode_jiffies_total: _counter & {description: "Jiffies of CPU time spent in user mode by the container."}
cpu_usage_kernelmode_jiffies_total: _counter & {description: "Jiffies of CPU time spent in kernel mode by the container."}
cpu_usage_total_jiffies_total: _counter & {description: "Jiffies of CPU time used by the container."}
cpu_throttling_periods_total: _counter & {description: "Number of periods."}
cpu_throttled_periods_total: _counter & {description: "Number of periods throttled."}
cpu_throttled_time_seconds_total: _counter & {description: "Throttling time in seconds."}

cpu_usage_percpu_jiffies_total: {
description: "Jiffies of CPU time used by the container, per CPU core."
type: "counter"
tags: _tags & {
cpu: {
description: "CPU core identifier."
required: true
examples: ["0", "1"]
}
}
}

memory_used_bytes: _gauge & {description: "Memory used by the container, in bytes."}
memory_max_used_bytes: _gauge & {description: "Maximum measured memory usage of the container, in bytes."}
memory_limit_bytes: _gauge & {description: "Memory usage limit of the container, in bytes."}
memory_active_anonymous_bytes: _gauge & {description: "Amount of memory that has been identified as active by the kernel. Anonymous memory is memory that is not linked to disk pages."}
memory_active_file_bytes: _gauge & {description: "Amount of active file cache memory. Cache memory = active_file + inactive_file + tmpfs."}
memory_cache_bytes: _gauge & {description: "The amount of memory used by the processes of this cgroup that can be associated with a block on a block device. Also accounts for memory used by tmpfs."}
memory_dirty_bytes: _gauge & {description: "The amount of memory waiting to get written to disk."}
memory_inactive_anonymous_bytes: _gauge & {description: "Amount of memory that has been identified as inactive by the kernel."}
memory_inactive_file_bytes: _gauge & {description: "Amount of inactive file cache memory."}
memory_mapped_file_bytes: _gauge & {description: "Indicates the amount of memory mapped by the processes in the cgroup. It doesn’t give you information about how much memory is used; it rather tells you how it is used."}
memory_page_faults_total: _counter & {description: "Number of times that a process of the cgroup triggered a page fault."}
memory_major_faults_total: _counter & {description: "Number of times that a process of the cgroup triggered a major page fault."}
memory_page_charged_total: _counter & {description: "Number of charging events to the memory cgroup. Charging events happen each time a page is accounted as either mapped anon page(RSS) or cache page to the cgroup."}
memory_page_uncharged_total: _counter & {description: "Number of uncharging events to the memory cgroup. Uncharging events happen each time a page is unaccounted from the cgroup."}
memory_rss_bytes: _gauge & {description: "The amount of memory that doesn’t correspond to anything on disk: stacks, heaps, and anonymous memory maps."}
memory_rss_hugepages_bytes: _gauge & {description: "Amount of memory due to anonymous transparent hugepages."}
memory_unevictable_bytes: _gauge & {description: "The amount of memory that cannot be reclaimed."}
memory_writeback_bytes: _gauge & {description: "The amount of memory from file/anon cache that are queued for syncing to the disk."}
memory_total_active_anonymous_bytes: _gauge & {description: "Total amount of memory that has been identified as active by the kernel."}
memory_total_active_file_bytes: _gauge & {description: "Total amount of active file cache memory."}
memory_total_cache_bytes: _gauge & {description: "Total amount of memory used by the processes of this cgroup that can be associated with a block on a block device."}
memory_total_dirty_bytes: _gauge & {description: "Total amount of memory waiting to get written to disk."}
memory_total_inactive_anonymous_bytes: _gauge & {description: "Total amount of memory that has been identified as inactive by the kernel."}
memory_total_inactive_file_bytes: _gauge & {description: "Total amount of inactive file cache memory."}
memory_total_mapped_file_bytes: _gauge & {description: "Total amount of memory mapped by the processes in the cgroup."}
memory_total_page_faults_total: _counter & {description: "Total number of page faults."}
memory_total_major_faults_total: _counter & {description: "Total number of major page faults."}
memory_total_page_charged_total: _counter & {description: "Total number of charging events."}
memory_total_page_uncharged_total: _counter & {description: "Total number of uncharging events."}
memory_total_rss_bytes: _gauge & {description: "Total amount of memory that doesn’t correspond to anything on disk: stacks, heaps, and anonymous memory maps."}
memory_total_rss_hugepages_bytes: _gauge & {description: "Total amount of memory due to anonymous transparent hugepages."}
memory_total_unevictable_bytes: _gauge & {description: "Total amount of memory that cannot be reclaimed."}
memory_total_writeback_bytes: _gauge & {description: "Total amount of memory from file/anon cache that are queued for syncing to the disk."}
memory_hierarchical_memory_limit_bytes: _gauge & {description: "The memory limit in place by the hierarchy cgroup."}
memory_hierarchical_memsw_limit_bytes: _gauge & {description: "The memory + swap limit in place by the hierarchy cgroup."}

_network_counter: {
type: "counter"
tags: _tags & {
device: {
description: "The network interface."
required: true
examples: ["eth1"]
}
}
}

network_receive_bytes_total: _network_counter & {description: "Bytes received by the container via the network interface."}
network_receive_packets_total: _network_counter & {description: "Number of packets received by the container via the network interface."}
network_receive_packets_drop_total: _network_counter & {description: "Number of inbound packets dropped by the container."}
network_receive_errs_total: _network_counter & {description: "Errors receiving packets."}
network_transmit_bytes_total: _network_counter & {description: "Bytes sent by the container via the network interface."}
network_transmit_packets_total: _network_counter & {description: "Number of packets sent by the container via the network interface."}
network_transmit_packets_drop_total: _network_counter & {description: "Number of outbound packets dropped by the container."}
network_transmit_errs_total: _network_counter & {description: "Errors sending packets."}
}
}
2 changes: 2 additions & 0 deletions docs/reference/urls.cue
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ urls: {
aws_canonical_user_id: "https://docs.aws.amazon.com/general/latest/gr/acct-identifiers.html#FindingCanonicalId"
aws_cloudwatch_logs_sink_source: "https://github.com/timberio/vector/blob/master/src/sinks/aws_cloudwatch_logs/"
aws_ec2_instance_metadata: "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html"
aws_ecs: "https://aws.amazon.com/ecs/"
aws_ecs_task_metadata: "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html"
aws_elb: "https://aws.amazon.com/elasticloadbalancing/"
aws_cloudwatch: "https://aws.amazon.com/cloudwatch/"
aws_cloudwatch_logs: "https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/WhatIsCloudWatchLogs.html"
Expand Down
92 changes: 92 additions & 0 deletions src/internal_events/aws_ecs_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use super::InternalEvent;
use metrics::{counter, histogram};
use std::borrow::Cow;
use std::time::Instant;

/// Internal event carrying the size of a batch of scraped events.
///
/// Its `InternalEvent` impl logs "Scraped events." at debug level and bumps
/// the `events_processed_total` and `processed_bytes_total` counters.
#[derive(Debug)]
pub struct AwsEcsMetricsReceived {
/// Amount added to the `processed_bytes_total` counter.
pub byte_size: usize,
/// Amount added to the `events_processed_total` counter; also attached to
/// the debug log record.
pub count: usize,
}

impl InternalEvent for AwsEcsMetricsReceived {
    /// Writes a debug-level "Scraped events." record that carries the event count.
    fn emit_logs(&self) {
        debug!(
            message = "Scraped events.",
            ?self.count
        );
    }

    /// Adds the event count and the byte size to their respective counters.
    fn emit_metrics(&self) {
        let events = self.count as u64;
        let bytes = self.byte_size as u64;

        counter!("events_processed_total", events);
        counter!("processed_bytes_total", bytes);
    }
}

/// Internal event describing a completed request.
///
/// Its `InternalEvent` impl increments `requests_completed_total` and records
/// `end - start` in the `request_duration_nanoseconds` histogram.
#[derive(Debug)]
pub struct AwsEcsMetricsRequestCompleted {
/// Earlier of the two instants; subtracted from `end`.
/// NOTE(review): presumably taken just before the request is sent — confirm at the call site.
pub start: Instant,
/// Later of the two instants.
/// NOTE(review): presumably taken once the response has been received — confirm at the call site.
pub end: Instant,
}

impl InternalEvent for AwsEcsMetricsRequestCompleted {
    /// Writes a debug-level "Request completed." record.
    fn emit_logs(&self) {
        debug!(message = "Request completed.");
    }

    /// Counts the completed request, then records the time between `start`
    /// and `end` in the duration histogram.
    fn emit_metrics(&self) {
        counter!("requests_completed_total", 1);

        // The counter is bumped first, exactly as before, so it is recorded
        // even if the subtraction below were to fail.
        let elapsed = self.end - self.start;
        histogram!("request_duration_nanoseconds", elapsed);
    }
}

/// Internal event describing a response body that failed to parse as JSON.
///
/// Its `InternalEvent` impl logs the error and URL at error level, logs the
/// escaped body at debug level, and increments `parse_errors_total`.
#[derive(Debug)]
pub struct AwsEcsMetricsParseError<'a> {
/// The `serde_json` error; rendered with `Display` in the error log.
pub error: serde_json::Error,
/// URL attached to both log records.
pub url: &'a str,
/// The body that could not be parsed; included (debug-escaped) in the
/// debug-level log record only.
pub body: Cow<'a, str>,
}

// The impl needs no named lifetime: the anonymous `'_` on the self type is
// sufficient, matching the other borrowed-event impls in this file.
impl InternalEvent for AwsEcsMetricsParseError<'_> {
    /// Logs the failure twice: an error-level record with the URL and parse
    /// error, and a debug-level record that additionally carries the
    /// debug-escaped response body.
    fn emit_logs(&self) {
        error!(message = "Parsing error.", url = %self.url, error = %self.error);
        // The body is escaped with `escape_debug` before being embedded in
        // the message, and this record carries a `rate_limit_secs` field.
        debug!(
            message = %format!("Failed to parse response:\\n\\n{}\\n\\n", self.body.escape_debug()),
            url = %self.url,
            rate_limit_secs = 10
        );
    }

    /// Increments the `parse_errors_total` counter by one.
    fn emit_metrics(&self) {
        counter!("parse_errors_total", 1);
    }
}

/// Internal event describing an HTTP error response.
///
/// Its `InternalEvent` impl logs the URL and status code at error level and
/// increments `http_error_response_total`.
#[derive(Debug)]
pub struct AwsEcsMetricsErrorResponse<'a> {
/// HTTP status code; rendered with `Display` in the error log.
pub code: hyper::StatusCode,
/// URL attached to the error log record.
pub url: &'a str,
}

impl InternalEvent for AwsEcsMetricsErrorResponse<'_> {
fn emit_logs(&self) {
error!(message = "HTTP error response.", url = %self.url, code = %self.code);
}

fn emit_metrics(&self) {
counter!("http_error_response_total", 1);
}
}

/// Internal event describing a failure while processing an HTTP request.
///
/// Its `InternalEvent` impl logs the URL and error at error level and
/// increments `http_request_errors_total`.
#[derive(Debug)]
pub struct AwsEcsMetricsHttpError<'a> {
/// The `hyper` error; rendered with `Display` in the error log.
pub error: hyper::Error,
/// URL attached to the error log record.
pub url: &'a str,
}

impl InternalEvent for AwsEcsMetricsHttpError<'_> {
fn emit_logs(&self) {
error!(message = "HTTP request processing error.", url = %self.url, error = %self.error);
}

fn emit_metrics(&self) {
counter!("http_request_errors_total", 1);
}
}
Loading

0 comments on commit 6126503

Please sign in to comment.