diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs
index fc2a39e0e5743..481f1c697af84 100644
--- a/src/sources/http_client/client.rs
+++ b/src/sources/http_client/client.rs
@@ -20,7 +20,8 @@ use crate::{
     sources::util::{
         http::HttpMethod,
         http_client::{
-            build_url, call, default_interval, GenericHttpClientInputs, HttpClientBuilder,
+            build_url, call, default_interval, default_timeout, warn_if_interval_too_low,
+            GenericHttpClientInputs, HttpClientBuilder,
         },
     },
     tls::{TlsConfig, TlsSettings},
@@ -51,13 +52,22 @@ pub struct HttpClientConfig {
     #[configurable(metadata(docs::examples = "http://127.0.0.1:9898/logs"))]
     pub endpoint: String,
 
-    /// The interval between calls.
+    /// The interval between scrapes. Requests run concurrently, so if a scrape takes longer
+    /// than the interval, a new scrape is started. This can consume extra resources; set the
+    /// timeout to a value lower than the scrape interval to prevent this from happening.
     #[serde(default = "default_interval")]
     #[serde_as(as = "serde_with::DurationSeconds<u64>")]
     #[serde(rename = "scrape_interval_secs")]
     #[configurable(metadata(docs::human_name = "Scrape Interval"))]
     pub interval: Duration,
 
+    /// The timeout for each scrape request.
+    #[serde(default = "default_timeout")]
+    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
+    #[serde(rename = "scrape_timeout_secs")]
+    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
+    pub timeout: Duration,
+
     /// Custom parameters for the HTTP request query string.
     ///
     /// One or more values for the same parameter key can be provided.
@@ -153,6 +163,7 @@ impl Default for HttpClientConfig {
             endpoint: "http://localhost:9898/logs".to_string(),
             query: HashMap::new(),
             interval: default_interval(),
+            timeout: default_timeout(),
             decoding: default_decoding(),
             framing: default_framing_message_based(),
             headers: HashMap::new(),
@@ -193,9 +204,12 @@ impl SourceConfig for HttpClientConfig {
             log_namespace,
         };
 
+        warn_if_interval_too_low(self.timeout, self.interval);
+
         let inputs = GenericHttpClientInputs {
             urls,
             interval: self.interval,
+            timeout: self.timeout,
             headers: self.headers.clone(),
             content_type,
             auth: self.auth.clone(),
diff --git a/src/sources/http_client/integration_tests.rs b/src/sources/http_client/integration_tests.rs
index 4bd5552ccaf67..495941ed25d9e 100644
--- a/src/sources/http_client/integration_tests.rs
+++ b/src/sources/http_client/integration_tests.rs
@@ -19,7 +19,7 @@ use codecs::decoding::DeserializerConfig;
 use vector_core::config::log_schema;
 
 use super::{
-    tests::{run_compliance, INTERVAL},
+    tests::{run_compliance, INTERVAL, TIMEOUT},
     HttpClientConfig,
 };
 
@@ -53,6 +53,7 @@ async fn invalid_endpoint() {
     run_error(HttpClientConfig {
         endpoint: "http://nope".to_string(),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: default_decoding(),
         framing: default_framing_message_based(),
@@ -71,6 +72,7 @@ async fn collected_logs_bytes() {
     let events = run_compliance(HttpClientConfig {
         endpoint: format!("{}/logs/bytes", dufs_address()),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::Bytes,
         framing: default_framing_message_based(),
@@ -95,6 +97,7 @@ async fn collected_logs_json() {
     let events = run_compliance(HttpClientConfig {
         endpoint: format!("{}/logs/json.json", dufs_address()),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::Json(Default::default()),
         framing: default_framing_message_based(),
@@ -119,6 +122,7 @@ async fn collected_metrics_native_json() {
     let events = run_compliance(HttpClientConfig {
         endpoint: format!("{}/metrics/native.json", dufs_address()),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::NativeJson(Default::default()),
         framing: default_framing_message_based(),
@@ -148,6 +152,7 @@ async fn collected_trace_native_json() {
     let events = run_compliance(HttpClientConfig {
         endpoint: format!("{}/traces/native.json", dufs_address()),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::NativeJson(Default::default()),
         framing: default_framing_message_based(),
@@ -172,6 +177,7 @@ async fn unauthorized_no_auth() {
     run_error(HttpClientConfig {
         endpoint: format!("{}/logs/json.json", dufs_auth_address()),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::Json(Default::default()),
         framing: default_framing_message_based(),
@@ -190,6 +196,7 @@ async fn unauthorized_wrong_auth() {
     run_error(HttpClientConfig {
         endpoint: format!("{}/logs/json.json", dufs_auth_address()),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::Json(Default::default()),
         framing: default_framing_message_based(),
@@ -211,6 +218,7 @@ async fn authorized() {
     run_compliance(HttpClientConfig {
         endpoint: format!("{}/logs/json.json", dufs_auth_address()),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::Json(Default::default()),
         framing: default_framing_message_based(),
@@ -232,6 +240,7 @@ async fn tls_invalid_ca() {
     run_error(HttpClientConfig {
         endpoint: format!("{}/logs/json.json", dufs_https_address()),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::Json(Default::default()),
         framing: default_framing_message_based(),
@@ -253,6 +262,7 @@ async fn tls_valid() {
     run_compliance(HttpClientConfig {
         endpoint: format!("{}/logs/json.json", dufs_https_address()),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::Json(Default::default()),
         framing: default_framing_message_based(),
@@ -275,6 +285,7 @@ async fn shutdown() {
     let source = HttpClientConfig {
         endpoint: format!("{}/logs/json.json", dufs_address()),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::Json(Default::default()),
         framing: default_framing_message_based(),
diff --git a/src/sources/http_client/tests.rs b/src/sources/http_client/tests.rs
index ecd799a73696b..bb97289807fbd 100644
--- a/src/sources/http_client/tests.rs
+++ b/src/sources/http_client/tests.rs
@@ -16,6 +16,8 @@ use crate::test_util::{
 
 pub(crate) const INTERVAL: Duration = Duration::from_secs(1);
 
+pub(crate) const TIMEOUT: Duration = Duration::from_secs(1);
+
 /// The happy path should yield at least one event and must emit the required internal events for sources.
 pub(crate) async fn run_compliance(config: HttpClientConfig) -> Vec<Event> {
     let events =
@@ -47,6 +49,7 @@ async fn bytes_decoding() {
     run_compliance(HttpClientConfig {
         endpoint: format!("http://{}/endpoint", in_addr),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: default_decoding(),
         framing: default_framing_message_based(),
@@ -75,6 +78,7 @@ async fn json_decoding_newline_delimited() {
     run_compliance(HttpClientConfig {
         endpoint: format!("http://{}/endpoint", in_addr),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::Json(Default::default()),
         framing: FramingConfig::NewlineDelimited(Default::default()),
@@ -103,6 +107,7 @@ async fn json_decoding_character_delimited() {
     run_compliance(HttpClientConfig {
         endpoint: format!("http://{}/endpoint", in_addr),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::Json(Default::default()),
         framing: FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
@@ -135,6 +140,7 @@ async fn request_query_applied() {
     let events = run_compliance(HttpClientConfig {
         endpoint: format!("http://{}/endpoint?key1=val1", in_addr),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::from([
             ("key1".to_string(), vec!["val2".to_string()]),
             (
@@ -203,6 +209,7 @@ async fn headers_applied() {
     run_compliance(HttpClientConfig {
         endpoint: format!("http://{}/endpoint", in_addr),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: default_decoding(),
         framing: default_framing_message_based(),
@@ -234,6 +241,7 @@ async fn accept_header_override() {
     run_compliance(HttpClientConfig {
         endpoint: format!("http://{}/endpoint", in_addr),
         interval: INTERVAL,
+        timeout: TIMEOUT,
         query: HashMap::new(),
         decoding: DeserializerConfig::Bytes,
         framing: default_framing_message_based(),
diff --git a/src/sources/prometheus/scrape.rs b/src/sources/prometheus/scrape.rs
index 4a7c66425359e..408cde50097e9 100644
--- a/src/sources/prometheus/scrape.rs
+++ b/src/sources/prometheus/scrape.rs
@@ -11,6 +11,7 @@ use vector_core::{config::LogNamespace, event::Event};
 
 use super::parser;
 use crate::sources::util::http::HttpMethod;
+use crate::sources::util::http_client::{default_timeout, warn_if_interval_too_low};
 use crate::{
     config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
     http::Auth,
@@ -53,13 +54,22 @@ pub struct PrometheusScrapeConfig {
     #[serde(alias = "hosts")]
     endpoints: Vec<String>,
 
-    /// The interval between scrapes, in seconds.
+    /// The interval between scrapes. Requests run concurrently, so if a scrape takes longer
+    /// than the interval, a new scrape is started. This can consume extra resources; set the
+    /// timeout to a value lower than the scrape interval to prevent this from happening.
     #[serde(default = "default_interval")]
     #[serde_as(as = "serde_with::DurationSeconds<u64>")]
     #[serde(rename = "scrape_interval_secs")]
     #[configurable(metadata(docs::human_name = "Scrape Interval"))]
     interval: Duration,
 
+    /// The timeout for each scrape request.
+    #[serde(default = "default_timeout")]
+    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
+    #[serde(rename = "scrape_timeout_secs")]
+    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
+    timeout: Duration,
+
     /// The tag name added to each event representing the scraped instance's `host:port`.
     ///
     /// The tag value is the host and port of the scraped instance.
@@ -114,6 +124,7 @@ impl GenerateConfig for PrometheusScrapeConfig {
         toml::Value::try_from(Self {
             endpoints: vec!["http://localhost:9090/metrics".to_string()],
             interval: default_interval(),
+            timeout: default_timeout(),
             instance_tag: Some("instance".to_string()),
             endpoint_tag: Some("endpoint".to_string()),
             honor_labels: false,
@@ -143,9 +154,12 @@ impl SourceConfig for PrometheusScrapeConfig {
             endpoint_tag: self.endpoint_tag.clone(),
         };
 
+        warn_if_interval_too_low(self.timeout, self.interval);
+
         let inputs = GenericHttpClientInputs {
             urls,
             interval: self.interval,
+            timeout: self.timeout,
             headers: HashMap::new(),
             content_type: "text/plain".to_string(),
             auth: self.auth.clone(),
@@ -351,6 +365,7 @@ mod test {
         let config = PrometheusScrapeConfig {
             endpoints: vec![format!("http://{}/metrics", in_addr)],
             interval: Duration::from_secs(1),
+            timeout: default_timeout(),
             instance_tag: Some("instance".to_string()),
             endpoint_tag: Some("endpoint".to_string()),
             honor_labels: true,
@@ -384,6 +399,7 @@ mod test {
         let config = PrometheusScrapeConfig {
             endpoints: vec![format!("http://{}/metrics", in_addr)],
             interval: Duration::from_secs(1),
+            timeout: default_timeout(),
             instance_tag: Some("instance".to_string()),
             endpoint_tag: Some("endpoint".to_string()),
             honor_labels: true,
@@ -435,6 +451,7 @@ mod test {
         let config = PrometheusScrapeConfig {
             endpoints: vec![format!("http://{}/metrics", in_addr)],
             interval: Duration::from_secs(1),
+            timeout: default_timeout(),
             instance_tag: Some("instance".to_string()),
             endpoint_tag: Some("endpoint".to_string()),
             honor_labels: false,
@@ -500,6 +517,7 @@ mod test {
         let config = PrometheusScrapeConfig {
             endpoints: vec![format!("http://{}/metrics", in_addr)],
             interval: Duration::from_secs(1),
+            timeout: default_timeout(),
             instance_tag: Some("instance".to_string()),
             endpoint_tag: Some("endpoint".to_string()),
             honor_labels: true,
@@ -555,6 +573,7 @@ mod test {
         let config = PrometheusScrapeConfig {
             endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
             interval: Duration::from_secs(1),
+            timeout: default_timeout(),
             instance_tag: Some("instance".to_string()),
             endpoint_tag: Some("endpoint".to_string()),
             honor_labels: false,
@@ -668,6 +687,7 @@ mod test {
             honor_labels: false,
             query: HashMap::new(),
             interval: Duration::from_secs(1),
+            timeout: default_timeout(),
             tls: None,
             auth: None,
         },
@@ -753,6 +773,7 @@ mod integration_tests {
         let config = PrometheusScrapeConfig {
             endpoints: vec!["http://prometheus:9090/metrics".into()],
             interval: Duration::from_secs(1),
+            timeout: Duration::from_secs(1),
             instance_tag: Some("instance".to_string()),
             endpoint_tag: Some("endpoint".to_string()),
             honor_labels: false,
diff --git a/src/sources/util/http_client.rs b/src/sources/util/http_client.rs
index 08f14b878608f..25678a90ae344 100644
--- a/src/sources/util/http_client.rs
+++ b/src/sources/util/http_client.rs
@@ -25,7 +25,7 @@ use crate::{
     },
     sources::util::http::HttpMethod,
     tls::TlsSettings,
-    Error, SourceSender,
+    SourceSender,
 };
 use vector_common::shutdown::ShutdownSignal;
 use vector_core::{config::proxy::ProxyConfig, event::Event, EstimatedJsonEncodedSizeOf};
@@ -36,6 +36,8 @@ pub(crate) struct GenericHttpClientInputs {
     pub urls: Vec<Uri>,
     /// Interval between calls.
     pub interval: Duration,
+    /// Timeout for the HTTP request.
+    pub timeout: Duration,
     /// Map of Header+Value to apply to HTTP request.
     pub headers: HashMap<String, Vec<String>>,
     /// Content type of the HTTP request, determined by the source.
@@ -51,6 +53,11 @@ pub(crate) const fn default_interval() -> Duration {
     Duration::from_secs(15)
 }
 
+/// The default timeout for the HTTP request if none is configured.
+pub(crate) const fn default_timeout() -> Duration {
+    Duration::from_secs(5)
+}
+
 /// Builds the context, allowing the source-specific implementation to leverage data from the
 /// config and the current HTTP request.
 pub(crate) trait HttpClientBuilder {
@@ -101,6 +108,17 @@ pub(crate) fn build_url(uri: &Uri, query: &HashMap<String, Vec<String>>) -> Uri
         .expect("Failed to build URI from parsed arguments")
 }
 
+/// Warns if the scrape timeout is greater than the scrape interval.
+pub(crate) fn warn_if_interval_too_low(timeout: Duration, interval: Duration) {
+    if timeout > interval {
+        warn!(
+            interval_secs = %interval.as_secs_f64(),
+            timeout_secs = %timeout.as_secs_f64(),
+            message = "Having a scrape timeout that exceeds the scrape interval can lead to excessive resource consumption.",
+        );
+    }
+}
+
 /// Calls one or more urls at an interval.
 /// - The HTTP request is built per the options in provided generic inputs.
 /// - The HTTP response is decoded/parsed into events by the specific context.
@@ -114,15 +132,16 @@ pub(crate) async fn call<
     mut out: SourceSender,
     http_method: HttpMethod,
 ) -> Result<(), ()> {
+    // Building the HttpClient should not fail as it is just setting up the client with the
+    // proxy and tls settings.
+    let client =
+        HttpClient::new(inputs.tls.clone(), &inputs.proxy).expect("Building HTTP client failed");
     let mut stream = IntervalStream::new(tokio::time::interval(inputs.interval))
         .take_until(inputs.shutdown)
         .map(move |_| stream::iter(inputs.urls.clone()))
         .flatten()
         .map(move |url| {
-            // Building the HttpClient should not fail as it is just setting up the client with the
-            // proxy and tls settings.
-            let client = HttpClient::new(inputs.tls.clone(), &inputs.proxy)
-                .expect("Building HTTP client failed");
+            let client = client.clone();
            let endpoint = url.to_string();
 
             let context_builder = context_builder.clone();
@@ -157,9 +176,18 @@
             }
 
             let start = Instant::now();
-            client
-                .send(request)
-                .map_err(Error::from)
+            tokio::time::timeout(inputs.timeout, client.send(request))
+                .then(move |result| async move {
+                    match result {
+                        Ok(Ok(response)) => Ok(response),
+                        Ok(Err(error)) => Err(error.into()),
+                        Err(_) => Err(format!(
+                            "Timeout error: request exceeded {}s",
+                            inputs.timeout.as_secs_f64()
+                        )
+                        .into()),
+                    }
+                })
                 .and_then(|response| async move {
                     let (header, body) = response.into_parts();
                     let body = hyper::body::to_bytes(body).await?;
@@ -224,8 +252,9 @@
                     })
                 })
                 .flatten()
+                .boxed()
         })
-        .flatten()
+        .flatten_unordered(None)
         .boxed();
 
     match out.send_event_stream(&mut stream).await {
diff --git a/website/cue/reference/components/sources/base/http_client.cue b/website/cue/reference/components/sources/base/http_client.cue
index efe28fb6a7827..a7e0fa5f37d23 100644
--- a/website/cue/reference/components/sources/base/http_client.cue
+++ b/website/cue/reference/components/sources/base/http_client.cue
@@ -309,13 +309,25 @@ base: components: sources: http_client: configuration: {
 		}
 	}
 	scrape_interval_secs: {
-		description: "The interval between calls."
-		required:    false
+		description: """
+			The interval between scrapes. Requests run concurrently, so if a scrape takes longer
+			than the interval, a new scrape is started. This can consume extra resources; set the
+			timeout to a value lower than the scrape interval to prevent this from happening.
+ """ + required: false type: uint: { default: 15 unit: "seconds" } } + scrape_timeout_secs: { + description: "The timeout for each scrape request." + required: false + type: float: { + default: 5.0 + unit: "seconds" + } + } tls: { description: "TLS configuration." required: false diff --git a/website/cue/reference/components/sources/base/prometheus_scrape.cue b/website/cue/reference/components/sources/base/prometheus_scrape.cue index f0c9b02351f33..9e2aa806ce989 100644 --- a/website/cue/reference/components/sources/base/prometheus_scrape.cue +++ b/website/cue/reference/components/sources/base/prometheus_scrape.cue @@ -104,13 +104,25 @@ base: components: sources: prometheus_scrape: configuration: { } } scrape_interval_secs: { - description: "The interval between scrapes, in seconds." - required: false + description: """ + The interval between scrapes. Requests are run concurrently so if a scrape takes longer + than the interval a new scrape will be started. This can take extra resources, set the timeout + to a value lower than the scrape interval to prevent this from happening. + """ + required: false type: uint: { default: 15 unit: "seconds" } } + scrape_timeout_secs: { + description: "The timeout for each scrape request." + required: false + type: float: { + default: 5.0 + unit: "seconds" + } + } tls: { description: "TLS configuration." required: false