Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(observability)!: remove peer_addr internal metric tag #18982

Merged
merged 2 commits into from
Oct 30, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/internal_events/tcp.rs
Original file line number Diff line number Diff line change
@@ -144,8 +144,7 @@ impl InternalEvent for TcpBytesReceived {
);
counter!(
"component_received_bytes_total", self.byte_size as u64,
"protocol" => "tcp",
"peer_addr" => self.peer_addr.to_string()
"protocol" => "tcp"
);
}
}
54 changes: 27 additions & 27 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
@@ -376,7 +376,7 @@ mod test {
sources::util::net::SocketListenAddr,
test_util::{
collect_n, collect_n_limited,
components::{assert_source_compliance, SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS},
components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
next_addr, random_string, send_lines, send_lines_tls, wait_for_tcp,
},
tls::{self, TlsConfig, TlsEnableableConfig, TlsSourceConfig},
@@ -391,7 +391,7 @@ mod test {
//////// TCP TESTS ////////
#[tokio::test]
async fn tcp_it_includes_host() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

@@ -416,7 +416,7 @@ mod test {

#[tokio::test]
async fn tcp_it_includes_vector_namespaced_fields() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();
let mut conf = TcpConfig::from_address(addr.into());
@@ -456,7 +456,7 @@ mod test {

#[tokio::test]
async fn tcp_splits_on_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let addr = next_addr();

@@ -488,7 +488,7 @@ mod test {

#[tokio::test]
async fn tcp_it_includes_source_type() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

@@ -514,7 +514,7 @@ mod test {

#[tokio::test]
async fn tcp_continue_after_long_line() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

@@ -555,7 +555,7 @@ mod test {

#[tokio::test]
async fn tcp_with_tls() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

@@ -619,7 +619,7 @@ mod test {

#[tokio::test]
async fn tcp_with_tls_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();

@@ -694,7 +694,7 @@ mod test {

#[tokio::test]
async fn tcp_shutdown_simple() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let source_id = ComponentKey::from("tcp_shutdown_simple");
let (tx, mut rx) = SourceSender::new_test();
let addr = next_addr();
@@ -962,7 +962,7 @@ mod test {

#[tokio::test]
async fn udp_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

@@ -979,7 +979,7 @@ mod test {

#[tokio::test]
async fn udp_message_preserves_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

@@ -996,7 +996,7 @@ mod test {

#[tokio::test]
async fn udp_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

@@ -1017,7 +1017,7 @@ mod test {

#[tokio::test]
async fn udp_max_length() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
@@ -1053,7 +1053,7 @@ mod test {
/// Windows will drop the entire packet if we exceed the max_length so we are unable to
/// extract anything.
async fn udp_max_length_delimited() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = next_addr();
let mut config = UdpConfig::from_address(address.into());
@@ -1084,7 +1084,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_host() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

@@ -1099,7 +1099,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_vector_namespaced_fields() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, true).await;

@@ -1127,7 +1127,7 @@ mod test {

#[tokio::test]
async fn udp_it_includes_source_type() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let address = init_udp(tx, false).await;

@@ -1144,7 +1144,7 @@ mod test {

#[tokio::test]
async fn udp_shutdown_simple() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let source_id = ComponentKey::from("udp_shutdown_simple");

@@ -1174,7 +1174,7 @@ mod test {

#[tokio::test]
async fn udp_shutdown_infinite_stream() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let source_id = ComponentKey::from("udp_shutdown_infinite_stream");

@@ -1334,7 +1334,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", false, false).await;
let events = collect_n(rx, 1).await;

@@ -1401,7 +1401,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message_with_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", false, true).await;
let events = collect_n(rx, 1).await;
let log = events[0].as_log();
@@ -1426,7 +1426,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_message_preserves_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("foo\nbar", false, false).await;
let events = collect_n(rx, 1).await;

@@ -1446,7 +1446,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_datagram_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
unix_multiple_packets(false).await
})
.await;
@@ -1513,7 +1513,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", true, false).await;
let events = collect_n(rx, 1).await;

@@ -1533,7 +1533,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message_with_vector_namespace() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("test", true, true).await;
let events = collect_n(rx, 1).await;
let log = events[0].as_log();
@@ -1556,7 +1556,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_message_splits_on_newline() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let (_, rx) = unix_message("foo\nbar", true, false).await;
let events = collect_n(rx, 2).await;

@@ -1584,7 +1584,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn unix_stream_multiple_packets() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
unix_multiple_packets(true).await
})
.await;
8 changes: 4 additions & 4 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
@@ -351,7 +351,7 @@ mod test {
collect_limited,
components::{
assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS,
SOCKET_PUSH_SOURCE_TAGS,
},
metrics::{assert_counter, assert_distribution, assert_gauge, assert_set},
next_addr,
@@ -365,7 +365,7 @@ mod test {

#[tokio::test]
async fn test_statsd_udp() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_addr = next_addr();
let config = StatsdConfig::Udp(UdpConfig::from_address(in_addr.into()));
let (sender, mut receiver) = mpsc::channel(200);
@@ -384,7 +384,7 @@ mod test {

#[tokio::test]
async fn test_statsd_tcp() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_addr = next_addr();
let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
let (sender, mut receiver) = mpsc::channel(200);
@@ -427,7 +427,7 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn test_statsd_unix() {
assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async move {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
let in_path = tempfile::tempdir().unwrap().into_path().join("unix_test");
let config = StatsdConfig::Unix(UnixConfig {
path: in_path.clone(),
4 changes: 2 additions & 2 deletions src/sources/syslog.rs
Original file line number Diff line number Diff line change
@@ -1190,14 +1190,14 @@ mod test {
#[cfg(unix)]
#[tokio::test]
async fn test_unix_stream_syslog() {
use crate::test_util::components::SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS;
use crate::test_util::components::SOCKET_PUSH_SOURCE_TAGS;
use futures_util::{stream, SinkExt};
use std::os::unix::net::UnixStream as StdUnixStream;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use tokio_util::codec::{FramedWrite, LinesCodec};

assert_source_compliance(&SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS, async {
assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
let num_messages: usize = 1;
let in_path = tempfile::tempdir().unwrap().into_path().join("stream_test");

7 changes: 1 addition & 6 deletions src/test_util/components.rs
Original file line number Diff line number Diff line change
@@ -42,12 +42,7 @@ pub const HTTP_PULL_SOURCE_TAGS: [&str; 2] = ["endpoint", "protocol"];
pub const HTTP_PUSH_SOURCE_TAGS: [&str; 2] = ["http_path", "protocol"];

/// The standard set of tags for all generic socket-based sources that accept connections i.e. `TcpSource`.
pub const SOCKET_PUSH_SOURCE_TAGS: [&str; 2] = ["peer_addr", "protocol"];

/// The standard set of tags for all generic socket-based sources that accept connections i.e. `TcpSource`, but
/// specifically sources that experience high cardinality i.e. many many clients, where emitting metrics with the peer
/// address as a tag would represent too high of a cost to pay.
pub const SOCKET_HIGH_CARDINALITY_PUSH_SOURCE_TAGS: [&str; 1] = ["protocol"];
pub const SOCKET_PUSH_SOURCE_TAGS: [&str; 1] = ["protocol"];

/// The standard set of tags for all generic socket-based sources that poll connections i.e. Redis.
pub const SOCKET_PULL_SOURCE_TAGS: [&str; 2] = ["remote_addr", "protocol"];
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ Vector's 0.34.0 release includes **breaking changes**:

1. [Removal of Deprecated Datadog Component Config Options](#datadog-deprecated-config-options)
1. [Removal of Deprecated `component_name` Metric Tag](#deprecated-component-name)
1. [Removal of `peer_addr` Metric Tag](#remove-peer-addr)
1. [Blackhole sink no longer reports by default](#blackhole-sink-reporting)

We cover them below to help you upgrade quickly:
@@ -33,6 +34,9 @@ been removed from the Enterprise configuration. Instead of `region`, `site` shou

The deprecated `component_name` tag has been removed from all internal metrics. Instead the `component_id` tag should be used.

#### Removal of `peer_addr` Metric Tag {#remove-peer-addr}
The `peer_addr` tag has been removed from the `component_received_bytes_total` internal metric for TCP-based sources due to its unbounded cardinality.

#### Blackhole sink no longer reports by default {#blackhole-sink-reporting}

The `blackhole` sink no longer reports events processed every second by default. Instead this
Loading