diff --git a/docs/tutorials/sinks/2_http_sink.md b/docs/tutorials/sinks/2_http_sink.md
index 179ff4192c8cc..6b52372b16ac3 100644
--- a/docs/tutorials/sinks/2_http_sink.md
+++ b/docs/tutorials/sinks/2_http_sink.md
@@ -290,6 +290,12 @@ where the data is actually sent.
 
 # Service
 
+**⚠ NOTE! This section implements an HTTP tower `Service` from scratch, for the
+purpose of demonstration only. Most sinks will not need to implement `Service`
+in this way. Any new HTTP-based sink should ideally utilize the
+`HttpService` structure, which abstracts away most of the logic shared
+amongst HTTP-based sinks.**
+
 We need to create a [`Tower`][tower] service that is responsible for actually
 sending our final encoded data.
 
diff --git a/src/sinks/appsignal/service.rs b/src/sinks/appsignal/service.rs
index c2fb558d6ac9d..959f573f752d0 100644
--- a/src/sinks/appsignal/service.rs
+++ b/src/sinks/appsignal/service.rs
@@ -24,6 +24,9 @@ use super::request_builder::AppsignalRequest;
 
 #[derive(Clone)]
 pub(super) struct AppsignalService {
+    // TODO: `HttpBatchService` has been deprecated for direct use in sinks.
+    // This sink should undergo a refactor to utilize the `HttpService`
+    // instead, which extracts much of the boilerplate code for `Service`.
     pub(super) batch_service: HttpBatchService<
         BoxFuture<'static, Result<http::Response<Bytes>, crate::Error>>,
         AppsignalRequest,
     >,
 }
diff --git a/src/sinks/datadog/events/service.rs b/src/sinks/datadog/events/service.rs
index 374bd3268b802..0dfeb4c5b9f69 100644
--- a/src/sinks/datadog/events/service.rs
+++ b/src/sinks/datadog/events/service.rs
@@ -43,6 +43,9 @@ impl DriverResponse for DatadogEventsResponse {
 
 #[derive(Clone)]
 pub struct DatadogEventsService {
+    // TODO: `HttpBatchService` has been deprecated for direct use in sinks.
+    // This sink should undergo a refactor to utilize the `HttpService`
+    // instead, which extracts much of the boilerplate code for `Service`.
     batch_http_service: HttpBatchService<
         BoxFuture<'static, Result<http::Response<Bytes>, crate::Error>>,
         DatadogEventsRequest,
     >,
 }
diff --git a/src/sinks/elasticsearch/service.rs b/src/sinks/elasticsearch/service.rs
index 38ba9a41b3af3..4e68218cc95bc 100644
--- a/src/sinks/elasticsearch/service.rs
+++ b/src/sinks/elasticsearch/service.rs
@@ -68,6 +68,9 @@ impl MetaDescriptive for ElasticsearchRequest {
 
 #[derive(Clone)]
 pub struct ElasticsearchService {
+    // TODO: `HttpBatchService` has been deprecated for direct use in sinks.
+    // This sink should undergo a refactor to utilize the `HttpService`
+    // instead, which extracts much of the boilerplate code for `Service`.
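+    // A hedged sketch of that refactor (`ElasticsearchSinkRequestBuilder` is
+    // illustrative, not an existing type):
+    //
+    //     let service = HttpService::new(client, ElasticsearchSinkRequestBuilder { .. });
+    //
+    // mirroring how the rewritten `http` sink below constructs
+    // `HttpService::new(client, http_sink_request_builder)`.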
batch_service: HttpBatchService< BoxFuture<'static, Result, crate::Error>>, ElasticsearchRequest, diff --git a/src/sinks/http.rs b/src/sinks/http.rs deleted file mode 100644 index 2b0b5e7b36f26..0000000000000 --- a/src/sinks/http.rs +++ /dev/null @@ -1,1037 +0,0 @@ -use std::io::Write; - -use bytes::{BufMut, Bytes, BytesMut}; -use codecs::encoding::{CharacterDelimitedEncoder, Framer, Serializer}; -use futures::{future, FutureExt, SinkExt}; -use http::{ - header::{HeaderName, HeaderValue, AUTHORIZATION}, - Method, Request, StatusCode, Uri, -}; -use hyper::Body; -use indexmap::IndexMap; -use tokio_util::codec::Encoder as _; -use vector_config::configurable_component; - -use crate::{ - codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, - components::validation::*, - config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, - event::Event, - http::{Auth, HttpClient, MaybeAuth}, - register_validatable_component, - sinks::util::{ - self, - http::{BatchedHttpSink, HttpEventEncoder, RequestConfig}, - BatchConfig, Buffer, Compression, Compressor, RealtimeSizeBasedDefaultBatchSettings, - TowerRequestConfig, UriSerde, - }, - tls::{TlsConfig, TlsSettings}, -}; - -/// Configuration for the `http` sink. -#[configurable_component(sink("http", "Deliver observability event data to an HTTP server."))] -#[derive(Clone, Debug)] -#[serde(deny_unknown_fields)] -pub struct HttpSinkConfig { - /// The full URI to make HTTP requests to. - /// - /// This should include the protocol and host, but can also include the port, path, and any other valid part of a URI. - #[configurable(metadata(docs::examples = "https://10.22.212.22:9000/endpoint"))] - pub uri: UriSerde, - - /// The HTTP method to use when making the request. - #[serde(default)] - pub method: HttpMethod, - - #[configurable(derived)] - pub auth: Option, - - /// A list of custom headers to add to each request. - #[configurable(deprecated)] - #[configurable(metadata( - docs::additional_props_description = "An HTTP request header and it's value." - ))] - pub headers: Option>, - - #[configurable(derived)] - #[serde(default)] - pub compression: Compression, - - #[serde(flatten)] - pub encoding: EncodingConfigWithFraming, - - /// A string to prefix the payload with. - /// - /// This option is ignored if the encoding is not character delimited JSON. - /// - /// If specified, the `payload_suffix` must also be specified and together they must produce a valid JSON object. - #[configurable(metadata(docs::examples = "{\"data\":"))] - #[serde(default)] - pub payload_prefix: String, - - /// A string to suffix the payload with. - /// - /// This option is ignored if the encoding is not character delimited JSON. - /// - /// If specified, the `payload_prefix` must also be specified and together they must produce a valid JSON object. - #[configurable(metadata(docs::examples = "}"))] - #[serde(default)] - pub payload_suffix: String, - - #[configurable(derived)] - #[serde(default)] - pub batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - pub request: RequestConfig, - - #[configurable(derived)] - pub tls: Option, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - pub acknowledgements: AcknowledgementsConfig, -} - -/// HTTP method. -/// -/// A subset of the HTTP methods described in [RFC 9110, section 9.1][rfc9110] are supported. 
-/// -/// [rfc9110]: https://datatracker.ietf.org/doc/html/rfc9110#section-9.1 -#[configurable_component] -#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)] -#[serde(rename_all = "snake_case")] -#[derivative(Default)] -pub enum HttpMethod { - /// GET. - Get, - - /// HEAD. - Head, - - /// POST. - #[derivative(Default)] - Post, - - /// PUT. - Put, - - /// DELETE. - Delete, - - /// OPTIONS. - Options, - - /// TRACE. - Trace, - - /// PATCH. - Patch, -} - -impl From for Method { - fn from(http_method: HttpMethod) -> Self { - match http_method { - HttpMethod::Head => Self::HEAD, - HttpMethod::Get => Self::GET, - HttpMethod::Post => Self::POST, - HttpMethod::Put => Self::PUT, - HttpMethod::Patch => Self::PATCH, - HttpMethod::Delete => Self::DELETE, - HttpMethod::Options => Self::OPTIONS, - HttpMethod::Trace => Self::TRACE, - } - } -} - -impl GenerateConfig for HttpSinkConfig { - fn generate_config() -> toml::Value { - toml::from_str( - r#"uri = "https://10.22.212.22:9000/endpoint" - encoding.codec = "json""#, - ) - .unwrap() - } -} - -impl HttpSinkConfig { - fn build_http_client(&self, cx: &SinkContext) -> crate::Result { - let tls = TlsSettings::from_options(&self.tls)?; - Ok(HttpClient::new(tls, cx.proxy())?) - } -} - -struct HttpSink { - pub uri: UriSerde, - pub method: HttpMethod, - pub auth: Option, - pub payload_prefix: String, - pub payload_suffix: String, - pub compression: Compression, - pub transformer: Transformer, - pub encoder: Encoder, - pub batch: BatchConfig, - pub tower: TowerRequestConfig, - pub headers: IndexMap, -} - -#[cfg(test)] -fn default_sink(encoding: EncodingConfigWithFraming) -> HttpSink { - let (framing, serializer) = encoding.build(SinkType::MessageBased).unwrap(); - let encoder = Encoder::::new(framing, serializer); - - HttpSink { - uri: Default::default(), - method: Default::default(), - auth: Default::default(), - compression: Default::default(), - transformer: Default::default(), - encoder, - payload_prefix: Default::default(), - payload_suffix: Default::default(), - batch: Default::default(), - tower: Default::default(), - headers: Default::default(), - } -} - -#[async_trait::async_trait] -#[typetag::serde(name = "http")] -impl SinkConfig for HttpSinkConfig { - async fn build( - &self, - cx: SinkContext, - ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let client = self.build_http_client(&cx)?; - - let healthcheck = match cx.healthcheck.uri { - Some(healthcheck_uri) => { - healthcheck(healthcheck_uri, self.auth.clone(), client.clone()).boxed() - } - None => future::ok(()).boxed(), - }; - - let mut request = self.request.clone(); - request.add_old_option(self.headers.clone()); - let headers = validate_headers(&request.headers, self.auth.is_some())?; - - let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; - let encoder = Encoder::::new(framer, serializer); - - let (payload_prefix, payload_suffix) = - validate_payload_wrapper(&self.payload_prefix, &self.payload_suffix, &encoder)?; - - let sink = HttpSink { - uri: self.uri.with_default_parts(), - method: self.method, - auth: self.auth.choose_one(&self.uri.auth)?, - compression: self.compression, - transformer: self.encoding.transformer(), - encoder, - batch: self.batch, - tower: request.tower, - headers, - payload_prefix, - payload_suffix, - }; - - let request = sink.tower.unwrap_with(&TowerRequestConfig::default()); - - let batch = sink.batch.into_batch_settings()?; - let sink = BatchedHttpSink::new( - sink, - Buffer::new(batch.size, Compression::None), - request, - 
batch.timeout, - client, - ) - .sink_map_err(|error| error!(message = "Fatal HTTP sink error.", %error)); - - #[allow(deprecated)] - let sink = super::VectorSink::from_event_sink(sink); - - Ok((sink, healthcheck)) - } - - fn input(&self) -> Input { - Input::new(self.encoding.config().1.input_type()) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -impl ValidatableComponent for HttpSinkConfig { - fn validation_configuration() -> ValidationConfiguration { - use codecs::{JsonSerializerConfig, MetricTagValues}; - use std::str::FromStr; - - let config = Self { - uri: UriSerde::from_str("http://127.0.0.1:9000/endpoint") - .expect("should never fail to parse"), - method: HttpMethod::Post, - encoding: EncodingConfigWithFraming::new( - None, - JsonSerializerConfig::new(MetricTagValues::Full).into(), - Transformer::default(), - ), - auth: None, - headers: None, - compression: Compression::default(), - batch: BatchConfig::default(), - request: RequestConfig::default(), - tls: None, - acknowledgements: AcknowledgementsConfig::default(), - payload_prefix: String::new(), - payload_suffix: String::new(), - }; - - let external_resource = ExternalResource::new( - ResourceDirection::Push, - HttpResourceConfig::from_parts(config.uri.uri.clone(), Some(config.method.into())), - config.encoding.clone(), - ); - - ValidationConfiguration::from_sink(Self::NAME, config, Some(external_resource)) - } -} - -register_validatable_component!(HttpSinkConfig); - -pub struct HttpSinkEventEncoder { - encoder: Encoder, - transformer: Transformer, -} - -impl HttpEventEncoder for HttpSinkEventEncoder { - fn encode_event(&mut self, mut event: Event) -> Option { - self.transformer.transform(&mut event); - - let mut body = BytesMut::new(); - self.encoder.encode(event, &mut body).ok()?; - - Some(body) - } -} - -#[async_trait::async_trait] -impl util::http::HttpSink for HttpSink { - type Input = BytesMut; - type Output = BytesMut; - type Encoder = HttpSinkEventEncoder; - - fn build_encoder(&self) -> Self::Encoder { - HttpSinkEventEncoder { - encoder: self.encoder.clone(), - transformer: self.transformer.clone(), - } - } - - async fn build_request(&self, mut body: Self::Output) -> crate::Result> { - let method: Method = self.method.into(); - let uri: Uri = self.uri.uri.clone(); - - let content_type = { - use Framer::*; - use Serializer::*; - match (self.encoder.serializer(), self.encoder.framer()) { - (RawMessage(_) | Text(_), _) => Some("text/plain"), - (Json(_), NewlineDelimited(_)) => { - if !body.is_empty() { - // Remove trailing newline for backwards-compatibility - // with Vector `0.20.x`. - body.truncate(body.len() - 1); - } - Some("application/x-ndjson") - } - (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => { - // TODO(https://github.com/vectordotdev/vector/issues/11253): - // Prepend before building a request body to eliminate the - // additional copy here. 
- let message = body.split(); - body.put(self.payload_prefix.as_bytes()); - body.put_u8(b'['); - if !message.is_empty() { - body.unsplit(message); - // remove trailing comma from last record - body.truncate(body.len() - 1); - } - body.put_u8(b']'); - body.put(self.payload_suffix.as_bytes()); - Some("application/json") - } - _ => None, - } - }; - - let mut builder = Request::builder().method(method).uri(uri); - - if let Some(content_type) = content_type { - builder = builder.header("Content-Type", content_type); - } - - let compression = self.compression; - - if compression.is_compressed() { - builder = builder.header( - "Content-Encoding", - compression - .content_encoding() - .expect("Encoding should be specified."), - ); - - let mut compressor = Compressor::from(compression); - compressor - .write_all(&body) - .expect("Writing to Vec can't fail."); - body = compressor.finish().expect("Writing to Vec can't fail."); - } - - let headers = builder - .headers_mut() - // The request builder should not have errors at this point, and if it did it would fail in the call to `body()` also. - .expect("Failed to access headers in http::Request builder- builder has errors."); - for (header, value) in self.headers.iter() { - headers.insert(header, value.clone()); - } - - let mut request = builder.body(body.freeze()).unwrap(); - - if let Some(auth) = &self.auth { - auth.apply(&mut request); - } - - Ok(request) - } -} - -async fn healthcheck(uri: UriSerde, auth: Option, client: HttpClient) -> crate::Result<()> { - let auth = auth.choose_one(&uri.auth)?; - let uri = uri.with_default_parts(); - let mut request = Request::head(&uri.uri).body(Body::empty()).unwrap(); - - if let Some(auth) = auth { - auth.apply(&mut request); - } - - let response = client.send(request).await?; - - match response.status() { - StatusCode::OK => Ok(()), - status => Err(super::HealthcheckError::UnexpectedStatus { status }.into()), - } -} - -fn validate_headers( - headers: &IndexMap, - configures_auth: bool, -) -> crate::Result> { - let headers = util::http::validate_headers(headers)?; - - for name in headers.keys() { - if configures_auth && name == AUTHORIZATION { - return Err("Authorization header can not be used with defined auth options".into()); - } - } - - Ok(headers) -} - -fn validate_payload_wrapper( - payload_prefix: &str, - payload_suffix: &str, - encoder: &Encoder, -) -> crate::Result<(String, String)> { - let payload = [payload_prefix, "{}", payload_suffix].join(""); - match ( - encoder.serializer(), - encoder.framer(), - serde_json::from_str::(&payload), - ) { - ( - Serializer::Json(_), - Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }), - Err(_), - ) => Err("Payload prefix and suffix wrapper must produce a valid JSON object.".into()), - _ => Ok((payload_prefix.to_owned(), payload_suffix.to_owned())), - } -} - -#[cfg(test)] -mod tests { - use std::{ - io::{BufRead, BufReader}, - sync::{atomic, Arc}, - }; - - use bytes::{Buf, Bytes}; - use codecs::{ - encoding::FramingConfig, JsonSerializerConfig, NewlineDelimitedEncoderConfig, - TextSerializerConfig, - }; - use flate2::{read::MultiGzDecoder, read::ZlibDecoder}; - use futures::{channel::mpsc, stream, StreamExt}; - use headers::{Authorization, HeaderMapExt}; - use http::request::Parts; - use hyper::{Method, Response, StatusCode}; - use serde::{de, Deserialize}; - use vector_core::event::{BatchNotifier, BatchStatus, LogEvent}; - - use super::*; - use crate::{ - assert_downcast_matches, - config::SinkContext, - sinks::util::{ - 
http::{HeaderValidationError, HttpSink}, - test::{build_test_server, build_test_server_generic, build_test_server_status}, - }, - test_util::{ - components, - components::{COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS}, - next_addr, random_lines_with_stream, - }, - }; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::(); - } - - #[test] - fn http_encode_event_text() { - let event = Event::Log(LogEvent::from("hello world")); - - let sink = default_sink((None::, TextSerializerConfig::default()).into()); - let mut encoder = sink.build_encoder(); - let bytes = encoder.encode_event(event).unwrap(); - - assert_eq!(bytes, Vec::from("hello world\n")); - } - - #[test] - fn http_encode_event_ndjson() { - let event = Event::Log(LogEvent::from("hello world")); - - let sink = default_sink( - ( - Some(NewlineDelimitedEncoderConfig::new()), - JsonSerializerConfig::default(), - ) - .into(), - ); - let mut encoder = sink.build_encoder(); - let bytes = encoder.encode_event(event).unwrap(); - - #[derive(Deserialize, Debug)] - #[serde(deny_unknown_fields)] - #[allow(dead_code)] // deserialize all fields - struct ExpectedEvent { - message: String, - timestamp: chrono::DateTime, - } - - let output = serde_json::from_slice::(&bytes[..]).unwrap(); - - assert_eq!(output.message, "hello world".to_string()); - } - - #[test] - fn http_validates_normal_headers() { - let config = r#" - uri = "http://$IN_ADDR/frames" - encoding.codec = "text" - [request.headers] - Auth = "token:thing_and-stuff" - X-Custom-Nonsense = "_%_{}_-_&_._`_|_~_!_#_&_$_" - "#; - let config: HttpSinkConfig = toml::from_str(config).unwrap(); - - assert!(super::validate_headers(&config.request.headers, false).is_ok()); - } - - #[test] - fn http_catches_bad_header_names() { - let config = r#" - uri = "http://$IN_ADDR/frames" - encoding.codec = "text" - [request.headers] - "\u0001" = "bad" - "#; - let config: HttpSinkConfig = toml::from_str(config).unwrap(); - - assert_downcast_matches!( - super::validate_headers(&config.request.headers, false).unwrap_err(), - HeaderValidationError, - HeaderValidationError::InvalidHeaderName { .. } - ); - } - - #[test] - fn http_validates_payload_prefix_and_suffix() { - let config = r#" - uri = "http://$IN_ADDR/" - encoding.codec = "json" - payload_prefix = '{"data":' - payload_suffix = "}" - "#; - let config: HttpSinkConfig = toml::from_str(config).unwrap(); - let (framer, serializer) = config.encoding.build(SinkType::MessageBased).unwrap(); - let encoder = Encoder::::new(framer, serializer); - assert!(super::validate_payload_wrapper( - &config.payload_prefix, - &config.payload_suffix, - &encoder - ) - .is_ok()); - } - - #[test] - fn http_validates_payload_prefix_and_suffix_fails_on_invalid_json() { - let config = r#" - uri = "http://$IN_ADDR/" - encoding.codec = "json" - payload_prefix = '{"data":' - payload_suffix = "" - "#; - let config: HttpSinkConfig = toml::from_str(config).unwrap(); - let (framer, serializer) = config.encoding.build(SinkType::MessageBased).unwrap(); - let encoder = Encoder::::new(framer, serializer); - assert!(super::validate_payload_wrapper( - &config.payload_prefix, - &config.payload_suffix, - &encoder - ) - .is_err()); - } - - // TODO: Fix failure on GH Actions using macos-latest image. 
- #[cfg(not(target_os = "macos"))] - #[tokio::test] - #[should_panic(expected = "Authorization header can not be used with defined auth options")] - async fn http_headers_auth_conflict() { - let config = r#" - uri = "http://$IN_ADDR/" - encoding.codec = "text" - [request.headers] - Authorization = "Basic base64encodedstring" - [auth] - strategy = "basic" - user = "user" - password = "password" - "#; - let config: HttpSinkConfig = toml::from_str(config).unwrap(); - - let cx = SinkContext::default(); - - _ = config.build(cx).await.unwrap(); - } - - #[tokio::test] - async fn http_happy_path_post() { - run_sink( - r#" - [auth] - strategy = "basic" - user = "waldo" - password = "hunter2" - "#, - |parts| { - assert_eq!(Method::POST, parts.method); - assert_eq!("/frames", parts.uri.path()); - assert_eq!( - Some(Authorization::basic("waldo", "hunter2")), - parts.headers.typed_get() - ); - }, - ) - .await; - } - - #[tokio::test] - async fn http_happy_path_put() { - run_sink( - r#" - method = "put" - [auth] - strategy = "basic" - user = "waldo" - password = "hunter2" - "#, - |parts| { - assert_eq!(Method::PUT, parts.method); - assert_eq!("/frames", parts.uri.path()); - assert_eq!( - Some(Authorization::basic("waldo", "hunter2")), - parts.headers.typed_get() - ); - }, - ) - .await; - } - - #[tokio::test] - async fn http_passes_custom_headers() { - run_sink( - r#" - [request.headers] - foo = "bar" - baz = "quux" - "#, - |parts| { - assert_eq!(Method::POST, parts.method); - assert_eq!("/frames", parts.uri.path()); - assert_eq!( - Some("bar"), - parts.headers.get("foo").map(|v| v.to_str().unwrap()) - ); - assert_eq!( - Some("quux"), - parts.headers.get("baz").map(|v| v.to_str().unwrap()) - ); - }, - ) - .await; - } - - #[tokio::test] - async fn retries_on_no_connection() { - components::assert_sink_compliance(&HTTP_SINK_TAGS, async { - let num_lines = 10; - - let (in_addr, sink) = build_sink("").await; - - let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch)); - let pump = tokio::spawn(sink.run(events)); - - // This ordering starts the sender before the server has built - // its accepting socket. The delay below ensures that the sink - // attempts to connect at least once before creating the - // listening socket. 
- tokio::time::sleep(std::time::Duration::from_secs(2)).await; - let (rx, trigger, server) = build_test_server(in_addr); - tokio::spawn(server); - - pump.await.unwrap().unwrap(); - drop(trigger); - - assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); - - let output_lines = get_received(rx, |parts| { - assert_eq!(Method::POST, parts.method); - assert_eq!("/frames", parts.uri.path()); - }) - .await; - - assert_eq!(num_lines, output_lines.len()); - assert_eq!(input_lines, output_lines); - }) - .await; - } - - #[tokio::test] - async fn retries_on_temporary_error() { - components::assert_sink_compliance(&HTTP_SINK_TAGS, async { - const NUM_LINES: usize = 1000; - const NUM_FAILURES: usize = 2; - - let (in_addr, sink) = build_sink("").await; - - let counter = Arc::new(atomic::AtomicUsize::new(0)); - let in_counter = Arc::clone(&counter); - let (rx, trigger, server) = build_test_server_generic(in_addr, move || { - let count = in_counter.fetch_add(1, atomic::Ordering::Relaxed); - if count < NUM_FAILURES { - // Send a temporary error for the first two responses - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::empty()) - .unwrap_or_else(|_| unreachable!()) - } else { - Response::new(Body::empty()) - } - }); - - let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let (input_lines, events) = random_lines_with_stream(100, NUM_LINES, Some(batch)); - let pump = sink.run(events); - - tokio::spawn(server); - - pump.await.unwrap(); - drop(trigger); - - assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); - - let output_lines = get_received(rx, |parts| { - assert_eq!(Method::POST, parts.method); - assert_eq!("/frames", parts.uri.path()); - }) - .await; - - let tries = counter.load(atomic::Ordering::Relaxed); - assert!(tries > NUM_FAILURES); - assert_eq!(NUM_LINES, output_lines.len()); - assert_eq!(input_lines, output_lines); - }) - .await; - } - - #[tokio::test] - async fn fails_on_permanent_error() { - components::assert_sink_error(&COMPONENT_ERROR_TAGS, async { - let num_lines = 1000; - - let (in_addr, sink) = build_sink("").await; - - let (rx, trigger, server) = build_test_server_status(in_addr, StatusCode::FORBIDDEN); - - let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let (_input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch)); - let pump = sink.run(events); - - tokio::spawn(server); - - pump.await.unwrap(); - drop(trigger); - - assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected)); - - let output_lines = get_received(rx, |_| unreachable!("There should be no lines")).await; - assert!(output_lines.is_empty()); - }) - .await; - } - - #[tokio::test] - async fn json_gzip_compression() { - json_compression("gzip").await; - } - - #[tokio::test] - async fn json_zstd_compression() { - json_compression("zstd").await; - } - - #[tokio::test] - async fn json_zlib_compression() { - json_compression("zlib").await; - } - - #[tokio::test] - async fn json_gzip_compression_with_payload_wrapper() { - json_compression_with_payload_wrapper("gzip").await; - } - - #[tokio::test] - async fn json_zlib_compression_with_payload_wrapper() { - json_compression_with_payload_wrapper("zlib").await; - } - - #[tokio::test] - async fn json_zstd_compression_with_payload_wrapper() { - json_compression_with_payload_wrapper("zstd").await; - } - - async fn json_compression(compression: &str) { - components::assert_sink_compliance(&HTTP_SINK_TAGS, async { - let num_lines = 1000; - - let in_addr = next_addr(); - - let config = r#" - uri 
= "http://$IN_ADDR/frames" - compression = "$COMPRESSION" - encoding.codec = "json" - method = "post" - - [auth] - strategy = "basic" - user = "waldo" - password = "hunter2" - "# - .replace("$IN_ADDR", &in_addr.to_string()) - .replace("$COMPRESSION", compression); - - let config: HttpSinkConfig = toml::from_str(&config).unwrap(); - - let cx = SinkContext::default(); - - let (sink, _) = config.build(cx).await.unwrap(); - let (rx, trigger, server) = build_test_server(in_addr); - - let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch)); - let pump = sink.run(events); - - tokio::spawn(server); - - pump.await.unwrap(); - drop(trigger); - - assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); - - let output_lines = rx - .flat_map(|(parts, body)| { - assert_eq!(Method::POST, parts.method); - assert_eq!("/frames", parts.uri.path()); - assert_eq!( - Some(Authorization::basic("waldo", "hunter2")), - parts.headers.typed_get() - ); - let lines: Vec = parse_compressed_json(compression, body); - stream::iter(lines) - }) - .map(|line| line.get("message").unwrap().as_str().unwrap().to_owned()) - .collect::>() - .await; - - assert_eq!(num_lines, output_lines.len()); - assert_eq!(input_lines, output_lines); - }) - .await; - } - - async fn json_compression_with_payload_wrapper(compression: &str) { - components::assert_sink_compliance(&HTTP_SINK_TAGS, async { - let num_lines = 1000; - - let in_addr = next_addr(); - - let config = r#" - uri = "http://$IN_ADDR/frames" - compression = "$COMPRESSION" - encoding.codec = "json" - payload_prefix = '{"data":' - payload_suffix = "}" - method = "post" - - [auth] - strategy = "basic" - user = "waldo" - password = "hunter2" - "# - .replace("$IN_ADDR", &in_addr.to_string()) - .replace("$COMPRESSION", compression); - - let config: HttpSinkConfig = toml::from_str(&config).unwrap(); - - let cx = SinkContext::default(); - - let (sink, _) = config.build(cx).await.unwrap(); - let (rx, trigger, server) = build_test_server(in_addr); - - let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch)); - let pump = sink.run(events); - - tokio::spawn(server); - - pump.await.unwrap(); - drop(trigger); - - assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); - - let output_lines = rx - .flat_map(|(parts, body)| { - assert_eq!(Method::POST, parts.method); - assert_eq!("/frames", parts.uri.path()); - assert_eq!( - Some(Authorization::basic("waldo", "hunter2")), - parts.headers.typed_get() - ); - - let message: serde_json::Value = parse_compressed_json(compression, body); - - let lines: Vec = - message["data"].as_array().unwrap().to_vec(); - stream::iter(lines) - }) - .map(|line| line.get("message").unwrap().as_str().unwrap().to_owned()) - .collect::>() - .await; - - assert_eq!(num_lines, output_lines.len()); - assert_eq!(input_lines, output_lines); - }) - .await; - } - - fn parse_compressed_json(compression: &str, buf: Bytes) -> T - where - T: de::DeserializeOwned, - { - match compression { - "gzip" => serde_json::from_reader(MultiGzDecoder::new(buf.reader())).unwrap(), - "zstd" => serde_json::from_reader(zstd::Decoder::new(buf.reader()).unwrap()).unwrap(), - "zlib" => serde_json::from_reader(ZlibDecoder::new(buf.reader())).unwrap(), - _ => panic!("undefined compression: {}", compression), - } - } - - async fn get_received( - rx: mpsc::Receiver<(Parts, Bytes)>, - assert_parts: impl Fn(Parts), - ) 
-> Vec { - rx.flat_map(|(parts, body)| { - assert_parts(parts); - stream::iter(BufReader::new(MultiGzDecoder::new(body.reader())).lines()) - }) - .map(Result::unwrap) - .map(|line| { - let val: serde_json::Value = serde_json::from_str(&line).unwrap(); - val.get("message").unwrap().as_str().unwrap().to_owned() - }) - .collect::>() - .await - } - - async fn run_sink(extra_config: &str, assert_parts: impl Fn(http::request::Parts)) { - let num_lines = 1000; - - let (in_addr, sink) = build_sink(extra_config).await; - - let (rx, trigger, server) = build_test_server(in_addr); - tokio::spawn(server); - - let (batch, mut receiver) = BatchNotifier::new_with_receiver(); - let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch)); - components::run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await; - drop(trigger); - - assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); - - let output_lines = get_received(rx, assert_parts).await; - - assert_eq!(num_lines, output_lines.len()); - assert_eq!(input_lines, output_lines); - } - - async fn build_sink(extra_config: &str) -> (std::net::SocketAddr, crate::sinks::VectorSink) { - let in_addr = next_addr(); - - let config = format!( - r#" - uri = "http://{addr}/frames" - compression = "gzip" - framing.method = "newline_delimited" - encoding.codec = "json" - {extras} - "#, - addr = in_addr, - extras = extra_config, - ); - let config: HttpSinkConfig = toml::from_str(&config).unwrap(); - - let cx = SinkContext::default(); - - let (sink, _) = config.build(cx).await.unwrap(); - (in_addr, sink) - } -} diff --git a/src/sinks/http/batch.rs b/src/sinks/http/batch.rs new file mode 100644 index 0000000000000..38718631fadd7 --- /dev/null +++ b/src/sinks/http/batch.rs @@ -0,0 +1,25 @@ +//! Batch settings for the `http` sink. + +use codecs::encoding::Framer; +use vector_core::{ + event::Event, stream::batcher::limiter::ItemBatchSize, ByteSizeOf, EstimatedJsonEncodedSizeOf, +}; + +use crate::codecs::Encoder; + +/// Uses the configured encoder to determine batch sizing. +#[derive(Default)] +pub(super) struct HttpBatchSizer { + pub(super) encoder: Encoder, +} + +impl ItemBatchSize for HttpBatchSizer { + fn size(&self, item: &Event) -> usize { + match self.encoder.serializer() { + codecs::encoding::Serializer::Json(_) | codecs::encoding::Serializer::NativeJson(_) => { + item.estimated_json_encoded_size_of().get() + } + _ => item.size_of(), + } + } +} diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs new file mode 100644 index 0000000000000..5c933c1265eae --- /dev/null +++ b/src/sinks/http/config.rs @@ -0,0 +1,347 @@ +//! Configuration for the `http` sink. + +use codecs::{ + encoding::{Framer, Serializer}, + CharacterDelimitedEncoder, +}; +use http::{header::AUTHORIZATION, HeaderName, HeaderValue, Method, Request, StatusCode}; +use hyper::Body; +use indexmap::IndexMap; + +use crate::{ + codecs::{EncodingConfigWithFraming, SinkType}, + http::{Auth, HttpClient, MaybeAuth}, + sinks::{ + prelude::*, + util::{ + http::{HttpResponse, HttpService, HttpStatusRetryLogic, RequestConfig}, + RealtimeSizeBasedDefaultBatchSettings, UriSerde, + }, + }, +}; + +use super::{ + encoder::HttpEncoder, request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder, + sink::HttpSink, +}; + +const CONTENT_TYPE_TEXT: &str = "text/plain"; +const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson"; +const CONTENT_TYPE_JSON: &str = "application/json"; + +/// Configuration for the `http` sink. 
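+///
+/// A minimal sketch of deserializing a working configuration, mirroring
+/// `generate_config` below:
+///
+/// ```rust,ignore
+/// let config: HttpSinkConfig = toml::from_str(
+///     r#"uri = "https://10.22.212.22:9000/endpoint"
+///        encoding.codec = "json""#,
+/// )
+/// .unwrap();
+/// ```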
+#[configurable_component(sink("http", "Deliver observability event data to an HTTP server."))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub(super) struct HttpSinkConfig { + /// The full URI to make HTTP requests to. + /// + /// This should include the protocol and host, but can also include the port, path, and any other valid part of a URI. + #[configurable(metadata(docs::examples = "https://10.22.212.22:9000/endpoint"))] + pub(super) uri: UriSerde, + + /// The HTTP method to use when making the request. + #[serde(default)] + pub(super) method: HttpMethod, + + #[configurable(derived)] + pub(super) auth: Option, + + /// A list of custom headers to add to each request. + #[configurable(deprecated = "This option has been deprecated, use `request.headers` instead.")] + #[configurable(metadata( + docs::additional_props_description = "An HTTP request header and it's value." + ))] + pub(super) headers: Option>, + + #[configurable(derived)] + #[serde(default)] + pub(super) compression: Compression, + + #[serde(flatten)] + pub(super) encoding: EncodingConfigWithFraming, + + /// A string to prefix the payload with. + /// + /// This option is ignored if the encoding is not character delimited JSON. + /// + /// If specified, the `payload_suffix` must also be specified and together they must produce a valid JSON object. + #[configurable(metadata(docs::examples = "{\"data\":"))] + #[serde(default)] + pub(super) payload_prefix: String, + + /// A string to suffix the payload with. + /// + /// This option is ignored if the encoding is not character delimited JSON. + /// + /// If specified, the `payload_prefix` must also be specified and together they must produce a valid JSON object. + #[configurable(metadata(docs::examples = "}"))] + #[serde(default)] + pub(super) payload_suffix: String, + + #[configurable(derived)] + #[serde(default)] + pub(super) batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + pub(super) request: RequestConfig, + + #[configurable(derived)] + pub(super) tls: Option, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub(super) acknowledgements: AcknowledgementsConfig, +} + +/// HTTP method. +/// +/// A subset of the HTTP methods described in [RFC 9110, section 9.1][rfc9110] are supported. +/// +/// [rfc9110]: https://datatracker.ietf.org/doc/html/rfc9110#section-9.1 +#[configurable_component] +#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)] +#[serde(rename_all = "snake_case")] +#[derivative(Default)] +pub(super) enum HttpMethod { + /// GET. + Get, + + /// HEAD. + Head, + + /// POST. + #[derivative(Default)] + Post, + + /// PUT. + Put, + + /// DELETE. + Delete, + + /// OPTIONS. + Options, + + /// TRACE. + Trace, + + /// PATCH. + Patch, +} + +impl From for Method { + fn from(http_method: HttpMethod) -> Self { + match http_method { + HttpMethod::Head => Self::HEAD, + HttpMethod::Get => Self::GET, + HttpMethod::Post => Self::POST, + HttpMethod::Put => Self::PUT, + HttpMethod::Patch => Self::PATCH, + HttpMethod::Delete => Self::DELETE, + HttpMethod::Options => Self::OPTIONS, + HttpMethod::Trace => Self::TRACE, + } + } +} + +impl HttpSinkConfig { + fn build_http_client(&self, cx: &SinkContext) -> crate::Result { + let tls = TlsSettings::from_options(&self.tls)?; + Ok(HttpClient::new(tls, cx.proxy())?) 
+ } + + pub(super) fn build_encoder(&self) -> crate::Result> { + let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; + Ok(Encoder::::new(framer, serializer)) + } +} + +impl GenerateConfig for HttpSinkConfig { + fn generate_config() -> toml::Value { + toml::from_str( + r#"uri = "https://10.22.212.22:9000/endpoint" + encoding.codec = "json""#, + ) + .unwrap() + } +} + +async fn healthcheck(uri: UriSerde, auth: Option, client: HttpClient) -> crate::Result<()> { + let auth = auth.choose_one(&uri.auth)?; + let uri = uri.with_default_parts(); + let mut request = Request::head(&uri.uri).body(Body::empty()).unwrap(); + + if let Some(auth) = auth { + auth.apply(&mut request); + } + + let response = client.send(request).await?; + + match response.status() { + StatusCode::OK => Ok(()), + status => Err(HealthcheckError::UnexpectedStatus { status }.into()), + } +} + +pub(super) fn validate_headers( + headers: &IndexMap, + configures_auth: bool, +) -> crate::Result> { + let headers = crate::sinks::util::http::validate_headers(headers)?; + + for name in headers.keys() { + if configures_auth && name == AUTHORIZATION { + return Err("Authorization header can not be used with defined auth options".into()); + } + } + + Ok(headers) +} + +pub(super) fn validate_payload_wrapper( + payload_prefix: &str, + payload_suffix: &str, + encoder: &Encoder, +) -> crate::Result<(String, String)> { + let payload = [payload_prefix, "{}", payload_suffix].join(""); + match ( + encoder.serializer(), + encoder.framer(), + serde_json::from_str::(&payload), + ) { + ( + Serializer::Json(_), + Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }), + Err(_), + ) => Err("Payload prefix and suffix wrapper must produce a valid JSON object.".into()), + _ => Ok((payload_prefix.to_owned(), payload_suffix.to_owned())), + } +} + +#[async_trait] +#[typetag::serde(name = "http")] +impl SinkConfig for HttpSinkConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let batch_settings = self.batch.validate()?.into_batcher_settings()?; + + let encoder = self.build_encoder()?; + let transformer = self.encoding.transformer(); + + let mut request = self.request.clone(); + request.add_old_option(self.headers.clone()); + + let headers = validate_headers(&request.headers, self.auth.is_some())?; + + let (payload_prefix, payload_suffix) = + validate_payload_wrapper(&self.payload_prefix, &self.payload_suffix, &encoder)?; + + let client = self.build_http_client(&cx)?; + + let healthcheck = match cx.healthcheck.uri { + Some(healthcheck_uri) => { + healthcheck(healthcheck_uri, self.auth.clone(), client.clone()).boxed() + } + None => future::ok(()).boxed(), + }; + + let content_type = { + use Framer::*; + use Serializer::*; + match (encoder.serializer(), encoder.framer()) { + (RawMessage(_) | Text(_), _) => Some(CONTENT_TYPE_TEXT.to_owned()), + (Json(_), NewlineDelimited(_)) => Some(CONTENT_TYPE_NDJSON.to_owned()), + (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => { + Some(CONTENT_TYPE_JSON.to_owned()) + } + _ => None, + } + }; + + let request_builder = HttpRequestBuilder { + encoder: HttpEncoder::new(encoder, transformer, payload_prefix, payload_suffix), + compression: self.compression, + }; + + let content_encoding = self.compression.is_compressed().then(|| { + self.compression + .content_encoding() + .expect("Encoding should be specified for compression.") + .to_string() + }); + + let http_sink_request_builder = HttpSinkRequestBuilder::new( + 
self.uri.with_default_parts(), + self.method, + self.auth.choose_one(&self.uri.auth)?, + headers, + content_type, + content_encoding, + ); + + let service = HttpService::new(client, http_sink_request_builder); + + let request_limits = self.request.tower.unwrap_with(&Default::default()); + + let retry_logic = + HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status()); + + let service = ServiceBuilder::new() + .settings(request_limits, retry_logic) + .service(service); + + let sink = HttpSink::new(service, batch_settings, request_builder); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::new(self.encoding.config().1.input_type()) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +impl ValidatableComponent for HttpSinkConfig { + fn validation_configuration() -> ValidationConfiguration { + use codecs::{JsonSerializerConfig, MetricTagValues}; + use std::str::FromStr; + + let config = Self { + uri: UriSerde::from_str("http://127.0.0.1:9000/endpoint") + .expect("should never fail to parse"), + method: HttpMethod::Post, + encoding: EncodingConfigWithFraming::new( + None, + JsonSerializerConfig::new(MetricTagValues::Full).into(), + Transformer::default(), + ), + auth: None, + headers: None, + compression: Compression::default(), + batch: BatchConfig::default(), + request: RequestConfig::default(), + tls: None, + acknowledgements: AcknowledgementsConfig::default(), + payload_prefix: String::new(), + payload_suffix: String::new(), + }; + + let external_resource = ExternalResource::new( + ResourceDirection::Push, + HttpResourceConfig::from_parts(config.uri.uri.clone(), Some(config.method.into())), + config.encoding.clone(), + ); + + ValidationConfiguration::from_sink(Self::NAME, config, Some(external_resource)) + } +} + +register_validatable_component!(HttpSinkConfig); diff --git a/src/sinks/http/encoder.rs b/src/sinks/http/encoder.rs new file mode 100644 index 0000000000000..ad13df3e4fb92 --- /dev/null +++ b/src/sinks/http/encoder.rs @@ -0,0 +1,97 @@ +//! Encoding for the `http` sink. + +use crate::{ + event::Event, + sinks::util::encoding::{write_all, Encoder as SinkEncoder}, +}; +use bytes::{BufMut, BytesMut}; +use codecs::{ + encoding::{ + Framer, + Framer::{CharacterDelimited, NewlineDelimited}, + Serializer::Json, + }, + CharacterDelimitedEncoder, +}; +use std::io; +use tokio_util::codec::Encoder as _; + +use crate::sinks::prelude::*; + +#[derive(Clone, Debug)] +pub(super) struct HttpEncoder { + pub(super) encoder: Encoder, + transformer: Transformer, + payload_prefix: String, + payload_suffix: String, +} + +impl HttpEncoder { + /// Creates a new `HttpEncoder`. 
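+    ///
+    /// With character-delimited JSON encoding, `encode_input` below wraps the
+    /// batch as `{prefix}[event,event,...]{suffix}`; for example, the documented
+    /// `payload_prefix = '{"data":'` and `payload_suffix = "}"` yield a single
+    /// JSON object of the form `{"data": [...]}`.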
+    pub(super) const fn new(
+        encoder: Encoder<Framer>,
+        transformer: Transformer,
+        payload_prefix: String,
+        payload_suffix: String,
+    ) -> Self {
+        Self {
+            encoder,
+            transformer,
+            payload_prefix,
+            payload_suffix,
+        }
+    }
+}
+
+impl SinkEncoder<Vec<Event>> for HttpEncoder {
+    fn encode_input(
+        &self,
+        events: Vec<Event>,
+        writer: &mut dyn io::Write,
+    ) -> io::Result<(usize, GroupedCountByteSize)> {
+        let mut encoder = self.encoder.clone();
+        let mut byte_size = telemetry().create_request_count_byte_size();
+        let mut body = BytesMut::new();
+        let n_events = events.len();
+
+        if let (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) =
+            (self.encoder.serializer(), self.encoder.framer())
+        {
+            body.put(self.payload_prefix.as_bytes());
+            body.put_u8(b'[');
+        }
+
+        for mut event in events {
+            self.transformer.transform(&mut event);
+
+            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
+
+            encoder
+                .encode(event, &mut body)
+                .map_err(|_| io::Error::new(io::ErrorKind::Other, "unable to encode event"))?;
+        }
+
+        match (self.encoder.serializer(), self.encoder.framer()) {
+            (Json(_), NewlineDelimited(_)) => {
+                if !body.is_empty() {
+                    // Remove trailing newline for backwards-compatibility
+                    // with Vector `0.20.x`.
+                    body.truncate(body.len() - 1);
+                }
+            }
+            (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
+                if !body.is_empty() {
+                    // remove trailing comma from last record
+                    body.truncate(body.len() - 1);
+                }
+                body.put_u8(b']');
+                body.put(self.payload_suffix.as_bytes());
+            }
+            _ => {}
+        }
+
+        let body = body.freeze();
+
+        write_all(writer, n_events, body.as_ref()).map(|()| (body.len(), byte_size))
+    }
+}
diff --git a/src/sinks/http/mod.rs b/src/sinks/http/mod.rs
new file mode 100644
index 0000000000000..8b4680e9f41cb
--- /dev/null
+++ b/src/sinks/http/mod.rs
@@ -0,0 +1,14 @@
+//! The HTTP [`vector_core::sink::VectorSink`].
+//!
+//! This module contains the [`vector_core::sink::VectorSink`] instance that is responsible for
+//! taking a stream of [`vector_core::event::Event`]s and forwarding them to an HTTP server.
+
+mod batch;
+mod config;
+mod encoder;
+mod request_builder;
+mod service;
+mod sink;
+
+#[cfg(test)]
+mod tests;
diff --git a/src/sinks/http/request_builder.rs b/src/sinks/http/request_builder.rs
new file mode 100644
index 0000000000000..7c102a4dd9f9a
--- /dev/null
+++ b/src/sinks/http/request_builder.rs
@@ -0,0 +1,48 @@
+//! `RequestBuilder` implementation for the `http` sink.
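+//!
+//! `split_input` detaches the event finalizers as request metadata, the
+//! `HttpEncoder` serializes the batch into a byte payload, and `build_request`
+//! packages the result as an `HttpRequest` for the service layer. The sink
+//! drives it roughly as follows (a hedged sketch; see `sink.rs`):
+//!
+//! ```rust,ignore
+//! batched_input.request_builder(None, HttpRequestBuilder { encoder, compression });
+//! ```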
+
+use bytes::Bytes;
+use std::io;
+
+use crate::sinks::{prelude::*, util::http::HttpRequest};
+
+use super::encoder::HttpEncoder;
+
+pub(super) struct HttpRequestBuilder {
+    pub(super) encoder: HttpEncoder,
+    pub(super) compression: Compression,
+}
+
+impl RequestBuilder<Vec<Event>> for HttpRequestBuilder {
+    type Metadata = EventFinalizers;
+    type Events = Vec<Event>;
+    type Encoder = HttpEncoder;
+    type Payload = Bytes;
+    type Request = HttpRequest;
+    type Error = io::Error;
+
+    fn compression(&self) -> Compression {
+        self.compression
+    }
+
+    fn encoder(&self) -> &Self::Encoder {
+        &self.encoder
+    }
+
+    fn split_input(
+        &self,
+        mut events: Vec<Event>,
+    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
+        let finalizers = events.take_finalizers();
+        let builder = RequestMetadataBuilder::from_events(&events);
+        (finalizers, builder, events)
+    }
+
+    fn build_request(
+        &self,
+        metadata: Self::Metadata,
+        request_metadata: RequestMetadata,
+        payload: EncodeResult<Self::Payload>,
+    ) -> Self::Request {
+        HttpRequest::new(payload.into_payload(), metadata, request_metadata)
+    }
+}
diff --git a/src/sinks/http/service.rs b/src/sinks/http/service.rs
new file mode 100644
index 0000000000000..8a096ebe6f3f4
--- /dev/null
+++ b/src/sinks/http/service.rs
@@ -0,0 +1,79 @@
+//! Service implementation for the `http` sink.
+
+use bytes::Bytes;
+use http::{HeaderName, HeaderValue, Method, Request, Uri};
+use indexmap::IndexMap;
+
+use crate::{
+    http::Auth,
+    sinks::util::{http::HttpServiceRequestBuilder, UriSerde},
+};
+
+use super::config::HttpMethod;
+
+#[derive(Debug, Clone)]
+pub(super) struct HttpSinkRequestBuilder {
+    uri: UriSerde,
+    method: HttpMethod,
+    auth: Option<Auth>,
+    headers: IndexMap<HeaderName, HeaderValue>,
+    content_type: Option<String>,
+    content_encoding: Option<String>,
+}
+
+impl HttpSinkRequestBuilder {
+    /// Creates a new `HttpSinkRequestBuilder`.
+    pub(super) const fn new(
+        uri: UriSerde,
+        method: HttpMethod,
+        auth: Option<Auth>,
+        headers: IndexMap<HeaderName, HeaderValue>,
+        content_type: Option<String>,
+        content_encoding: Option<String>,
+    ) -> Self {
+        Self {
+            uri,
+            method,
+            auth,
+            headers,
+            content_type,
+            content_encoding,
+        }
+    }
+}
+
+impl HttpServiceRequestBuilder for HttpSinkRequestBuilder {
+    fn build(&self, body: Bytes) -> Request<Bytes> {
+        let method: Method = self.method.into();
+        let uri: Uri = self.uri.uri.clone();
+        let mut builder = Request::builder().method(method).uri(uri);
+
+        if let Some(content_type) = &self.content_type {
+            builder = builder.header("Content-Type", content_type);
+        }
+
+        if let Some(content_encoding) = &self.content_encoding {
+            builder = builder.header("Content-Encoding", content_encoding);
+        }
+
+        let headers = builder
+            .headers_mut()
+            // The request builder should not have errors at this point, and if it did it would fail in the call to `body()` also.
+            .expect("Failed to access headers in http::Request builder - builder has errors.");
+
+        for (header, value) in self.headers.iter() {
+            headers.insert(header, value.clone());
+        }
+
+        // The request builder should not have errors at this point.
+        let mut request = builder
+            .body(body)
+            .expect("Failed to assign body to request - builder has errors");
+
+        if let Some(auth) = &self.auth {
+            auth.apply(&mut request);
+        }
+
+        request
+    }
+}
diff --git a/src/sinks/http/sink.rs b/src/sinks/http/sink.rs
new file mode 100644
index 0000000000000..ee997cd82ad33
--- /dev/null
+++ b/src/sinks/http/sink.rs
@@ -0,0 +1,73 @@
+//! Implementation of the `http` sink.
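+//!
+//! The sink is a thin stream pipeline: events are batched (sized per the
+//! configured codec by `HttpBatchSizer`), turned into requests by
+//! `HttpRequestBuilder`, and handed to the driver, which owns retries, event
+//! finalization, and internal telemetry reporting.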
+
+use crate::sinks::{prelude::*, util::http::HttpRequest};
+
+use super::{batch::HttpBatchSizer, request_builder::HttpRequestBuilder};
+
+pub(super) struct HttpSink<S> {
+    service: S,
+    batch_settings: BatcherSettings,
+    request_builder: HttpRequestBuilder,
+}
+
+impl<S> HttpSink<S>
+where
+    S: Service<HttpRequest> + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse + Send + 'static,
+    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
+{
+    /// Creates a new `HttpSink`.
+    pub(super) const fn new(
+        service: S,
+        batch_settings: BatcherSettings,
+        request_builder: HttpRequestBuilder,
+    ) -> Self {
+        Self {
+            service,
+            batch_settings,
+            request_builder,
+        }
+    }
+
+    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        input
+            // Batch the input stream with size calculation based on the configured codec
+            .batched(self.batch_settings.into_item_size_config(HttpBatchSizer {
+                encoder: self.request_builder.encoder.encoder.clone(),
+            }))
+            // Build requests with no concurrency limit.
+            .request_builder(None, self.request_builder)
+            // Filter out any errors that occurred in the request building.
+            .filter_map(|request| async move {
+                match request {
+                    Err(error) => {
+                        emit!(SinkRequestBuildError { error });
+                        None
+                    }
+                    Ok(req) => Some(req),
+                }
+            })
+            // Generate the driver that will send requests and handle retries,
+            // event finalization, and logging/internal metric reporting.
+            .into_driver(self.service)
+            .run()
+            .await
+    }
+}
+
+#[async_trait::async_trait]
+impl<S> StreamSink<Event> for HttpSink<S>
+where
+    S: Service<HttpRequest> + Send + 'static,
+    S::Future: Send + 'static,
+    S::Response: DriverResponse + Send + 'static,
+    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
+{
+    async fn run(
+        self: Box<Self>,
+        input: futures_util::stream::BoxStream<'_, Event>,
+    ) -> Result<(), ()> {
+        self.run_inner(input).await
+    }
+}
diff --git a/src/sinks/http/tests.rs b/src/sinks/http/tests.rs
new file mode 100644
index 0000000000000..4cda846156d64
--- /dev/null
+++ b/src/sinks/http/tests.rs
@@ -0,0 +1,602 @@
+//! Unit tests for the `http` sink.
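+//!
+//! Covers encoding (text and NDJSON), header and payload-wrapper validation,
+//! happy-path POST/PUT delivery, custom headers, retry behavior on missing
+//! connections and temporary errors, failure on permanent errors, and
+//! gzip/zstd/zlib-compressed JSON payloads.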
+ +use std::{ + io::{BufRead, BufReader}, + sync::{atomic, Arc}, +}; + +use bytes::{Buf, Bytes}; +use codecs::{ + encoding::{Framer, FramingConfig}, + JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig, +}; +use flate2::{read::MultiGzDecoder, read::ZlibDecoder}; +use futures::{channel::mpsc, stream}; +use headers::{Authorization, HeaderMapExt}; +use http::request::Parts; +use hyper::{Body, Method, Response, StatusCode}; +use serde::{de, Deserialize}; + +use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; + +use crate::{ + assert_downcast_matches, + codecs::{EncodingConfigWithFraming, SinkType}, + sinks::{ + prelude::*, + util::{ + encoding::Encoder as _, + http::HeaderValidationError, + test::{build_test_server, build_test_server_generic, build_test_server_status}, + }, + }, + test_util::{ + components, + components::{COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS}, + next_addr, random_lines_with_stream, + }, +}; + +use super::{ + config::HttpSinkConfig, + config::{validate_headers, validate_payload_wrapper}, + encoder::HttpEncoder, +}; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::(); +} + +fn default_cfg(encoding: EncodingConfigWithFraming) -> HttpSinkConfig { + HttpSinkConfig { + uri: Default::default(), + method: Default::default(), + auth: Default::default(), + headers: Default::default(), + compression: Default::default(), + encoding, + payload_prefix: Default::default(), + payload_suffix: Default::default(), + batch: Default::default(), + request: Default::default(), + tls: Default::default(), + acknowledgements: Default::default(), + } +} + +#[test] +fn http_encode_event_text() { + let event = Event::Log(LogEvent::from("hello world")); + + let cfg = default_cfg((None::, TextSerializerConfig::default()).into()); + let encoder = cfg.build_encoder().unwrap(); + let transformer = cfg.encoding.transformer(); + + let encoder = HttpEncoder::new(encoder, transformer, "".to_owned(), "".to_owned()); + + let mut encoded = vec![]; + let (encoded_size, _byte_size) = encoder.encode_input(vec![event], &mut encoded).unwrap(); + + assert_eq!(encoded, Vec::from("hello world\n")); + assert_eq!(encoded.len(), encoded_size); +} + +#[test] +fn http_encode_event_ndjson() { + let event = Event::Log(LogEvent::from("hello world")); + + let cfg = default_cfg( + ( + Some(NewlineDelimitedEncoderConfig::new()), + JsonSerializerConfig::default(), + ) + .into(), + ); + let encoder = cfg.build_encoder().unwrap(); + let transformer = cfg.encoding.transformer(); + + let encoder = HttpEncoder::new(encoder, transformer, "".to_owned(), "".to_owned()); + + let mut encoded = vec![]; + encoder.encode_input(vec![event], &mut encoded).unwrap(); + + #[derive(Deserialize, Debug)] + #[serde(deny_unknown_fields)] + #[allow(dead_code)] // deserialize all fields + struct ExpectedEvent { + message: String, + timestamp: chrono::DateTime, + } + + let output = serde_json::from_slice::(&encoded[..]).unwrap(); + + assert_eq!(output.message, "hello world".to_string()); +} + +#[test] +fn http_validates_normal_headers() { + let config = r#" + uri = "http://$IN_ADDR/frames" + encoding.codec = "text" + [request.headers] + Auth = "token:thing_and-stuff" + X-Custom-Nonsense = "_%_{}_-_&_._`_|_~_!_#_&_$_" + "#; + let config: HttpSinkConfig = toml::from_str(config).unwrap(); + + assert!(validate_headers(&config.request.headers, false).is_ok()); +} + +#[test] +fn http_catches_bad_header_names() { + let config = r#" + uri = "http://$IN_ADDR/frames" + encoding.codec = "text" + 
[request.headers] + "\u0001" = "bad" + "#; + let config: HttpSinkConfig = toml::from_str(config).unwrap(); + + assert_downcast_matches!( + validate_headers(&config.request.headers, false).unwrap_err(), + HeaderValidationError, + HeaderValidationError::InvalidHeaderName { .. } + ); +} + +#[test] +fn http_validates_payload_prefix_and_suffix() { + let config = r#" + uri = "http://$IN_ADDR/" + encoding.codec = "json" + payload_prefix = '{"data":' + payload_suffix = "}" + "#; + let config: HttpSinkConfig = toml::from_str(config).unwrap(); + let (framer, serializer) = config.encoding.build(SinkType::MessageBased).unwrap(); + let encoder = Encoder::::new(framer, serializer); + assert!( + validate_payload_wrapper(&config.payload_prefix, &config.payload_suffix, &encoder).is_ok() + ); +} + +#[test] +fn http_validates_payload_prefix_and_suffix_fails_on_invalid_json() { + let config = r#" + uri = "http://$IN_ADDR/" + encoding.codec = "json" + payload_prefix = '{"data":' + payload_suffix = "" + "#; + let config: HttpSinkConfig = toml::from_str(config).unwrap(); + let (framer, serializer) = config.encoding.build(SinkType::MessageBased).unwrap(); + let encoder = Encoder::::new(framer, serializer); + assert!( + validate_payload_wrapper(&config.payload_prefix, &config.payload_suffix, &encoder).is_err() + ); +} + +// TODO: Fix failure on GH Actions using macos-latest image. +#[cfg(not(target_os = "macos"))] +#[tokio::test] +#[should_panic(expected = "Authorization header can not be used with defined auth options")] +async fn http_headers_auth_conflict() { + let config = r#" + uri = "http://$IN_ADDR/" + encoding.codec = "text" + [request.headers] + Authorization = "Basic base64encodedstring" + [auth] + strategy = "basic" + user = "user" + password = "password" + "#; + let config: HttpSinkConfig = toml::from_str(config).unwrap(); + + let cx = SinkContext::default(); + + _ = config.build(cx).await.unwrap(); +} + +#[tokio::test] +async fn http_happy_path_post() { + run_sink( + r#" + [auth] + strategy = "basic" + user = "waldo" + password = "hunter2" + "#, + |parts| { + assert_eq!(Method::POST, parts.method); + assert_eq!("/frames", parts.uri.path()); + assert_eq!( + Some(Authorization::basic("waldo", "hunter2")), + parts.headers.typed_get() + ); + }, + ) + .await; +} + +#[tokio::test] +async fn http_happy_path_put() { + run_sink( + r#" + method = "put" + [auth] + strategy = "basic" + user = "waldo" + password = "hunter2" + "#, + |parts| { + assert_eq!(Method::PUT, parts.method); + assert_eq!("/frames", parts.uri.path()); + assert_eq!( + Some(Authorization::basic("waldo", "hunter2")), + parts.headers.typed_get() + ); + }, + ) + .await; +} + +#[tokio::test] +async fn http_passes_custom_headers() { + run_sink( + r#" + [request.headers] + foo = "bar" + baz = "quux" + "#, + |parts| { + assert_eq!(Method::POST, parts.method); + assert_eq!("/frames", parts.uri.path()); + assert_eq!( + Some("bar"), + parts.headers.get("foo").map(|v| v.to_str().unwrap()) + ); + assert_eq!( + Some("quux"), + parts.headers.get("baz").map(|v| v.to_str().unwrap()) + ); + }, + ) + .await; +} + +#[tokio::test] +async fn retries_on_no_connection() { + components::assert_sink_compliance(&HTTP_SINK_TAGS, async { + let num_lines = 10; + + let (in_addr, sink) = build_sink("").await; + + let (batch, mut receiver) = BatchNotifier::new_with_receiver(); + let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch)); + let pump = tokio::spawn(sink.run(events)); + + // This ordering starts the sender before the server has built + // 
+
+// TODO: Fix failure on GH Actions using macos-latest image.
+#[cfg(not(target_os = "macos"))]
+#[tokio::test]
+#[should_panic(expected = "Authorization header can not be used with defined auth options")]
+async fn http_headers_auth_conflict() {
+    let config = r#"
+        uri = "http://$IN_ADDR/"
+        encoding.codec = "text"
+        [request.headers]
+        Authorization = "Basic base64encodedstring"
+        [auth]
+        strategy = "basic"
+        user = "user"
+        password = "password"
+    "#;
+    let config: HttpSinkConfig = toml::from_str(config).unwrap();
+
+    let cx = SinkContext::default();
+
+    _ = config.build(cx).await.unwrap();
+}
+
+#[tokio::test]
+async fn http_happy_path_post() {
+    run_sink(
+        r#"
+        [auth]
+        strategy = "basic"
+        user = "waldo"
+        password = "hunter2"
+        "#,
+        |parts| {
+            assert_eq!(Method::POST, parts.method);
+            assert_eq!("/frames", parts.uri.path());
+            assert_eq!(
+                Some(Authorization::basic("waldo", "hunter2")),
+                parts.headers.typed_get()
+            );
+        },
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn http_happy_path_put() {
+    run_sink(
+        r#"
+        method = "put"
+        [auth]
+        strategy = "basic"
+        user = "waldo"
+        password = "hunter2"
+        "#,
+        |parts| {
+            assert_eq!(Method::PUT, parts.method);
+            assert_eq!("/frames", parts.uri.path());
+            assert_eq!(
+                Some(Authorization::basic("waldo", "hunter2")),
+                parts.headers.typed_get()
+            );
+        },
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn http_passes_custom_headers() {
+    run_sink(
+        r#"
+        [request.headers]
+        foo = "bar"
+        baz = "quux"
+        "#,
+        |parts| {
+            assert_eq!(Method::POST, parts.method);
+            assert_eq!("/frames", parts.uri.path());
+            assert_eq!(
+                Some("bar"),
+                parts.headers.get("foo").map(|v| v.to_str().unwrap())
+            );
+            assert_eq!(
+                Some("quux"),
+                parts.headers.get("baz").map(|v| v.to_str().unwrap())
+            );
+        },
+    )
+    .await;
+}
+
+#[tokio::test]
+async fn retries_on_no_connection() {
+    components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
+        let num_lines = 10;
+
+        let (in_addr, sink) = build_sink("").await;
+
+        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+        let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
+        let pump = tokio::spawn(sink.run(events));
+
+        // This ordering starts the sender before the server has built
+        // its accepting socket. The delay below ensures that the sink
+        // attempts to connect at least once before creating the
+        // listening socket.
+        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+        let (rx, trigger, server) = build_test_server(in_addr);
+        tokio::spawn(server);
+
+        pump.await.unwrap().unwrap();
+        drop(trigger);
+
+        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+        let output_lines = get_received(rx, |parts| {
+            assert_eq!(Method::POST, parts.method);
+            assert_eq!("/frames", parts.uri.path());
+        })
+        .await;
+
+        assert_eq!(num_lines, output_lines.len());
+        assert_eq!(input_lines, output_lines);
+    })
+    .await;
+}
+
+#[tokio::test]
+async fn retries_on_temporary_error() {
+    components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
+        const NUM_LINES: usize = 1000;
+        const NUM_FAILURES: usize = 2;
+
+        let (in_addr, sink) = build_sink("").await;
+
+        let counter = Arc::new(atomic::AtomicUsize::new(0));
+        let in_counter = Arc::clone(&counter);
+        let (rx, trigger, server) = build_test_server_generic(in_addr, move || {
+            let count = in_counter.fetch_add(1, atomic::Ordering::Relaxed);
+            if count < NUM_FAILURES {
+                // Send a temporary error for the first two responses
+                Response::builder()
+                    .status(StatusCode::INTERNAL_SERVER_ERROR)
+                    .body(Body::empty())
+                    .unwrap_or_else(|_| unreachable!())
+            } else {
+                Response::new(Body::empty())
+            }
+        });
+
+        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+        let (input_lines, events) = random_lines_with_stream(100, NUM_LINES, Some(batch));
+        let pump = sink.run(events);
+
+        tokio::spawn(server);
+
+        pump.await.unwrap();
+        drop(trigger);
+
+        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+        let output_lines = get_received(rx, |parts| {
+            assert_eq!(Method::POST, parts.method);
+            assert_eq!("/frames", parts.uri.path());
+        })
+        .await;
+
+        let tries = counter.load(atomic::Ordering::Relaxed);
+        assert!(tries > NUM_FAILURES);
+        assert_eq!(NUM_LINES, output_lines.len());
+        assert_eq!(input_lines, output_lines);
+    })
+    .await;
+}
+
+#[tokio::test]
+async fn fails_on_permanent_error() {
+    components::assert_sink_error(&COMPONENT_ERROR_TAGS, async {
+        let num_lines = 1000;
+
+        let (in_addr, sink) = build_sink("").await;
+
+        let (rx, trigger, server) = build_test_server_status(in_addr, StatusCode::FORBIDDEN);
+
+        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+        let (_input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
+        let pump = sink.run(events);
+
+        tokio::spawn(server);
+
+        pump.await.unwrap();
+        drop(trigger);
+
+        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
+
+        let output_lines = get_received(rx, |_| unreachable!("There should be no lines")).await;
+        assert!(output_lines.is_empty());
+    })
+    .await;
+}
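Together, the three tests above pin down the retry contract these sinks are expected to honor: connection failures and 5xx responses are retried until they succeed, while a 4xx such as `FORBIDDEN` rejects the batch permanently. A rough sketch of that classification (ours; the real decision lives in the sink's `RetryLogic` implementation and is more nuanced):

```rust
use http::StatusCode;

/// Illustration only: the acknowledgement outcomes the three tests above
/// imply. Vector's actual behavior is implemented by the sink's `RetryLogic`.
fn classify(status: StatusCode) -> &'static str {
    if status.is_success() {
        "deliver" // batch is acknowledged as Delivered
    } else if status.is_server_error() {
        "retry" // e.g. 500 INTERNAL_SERVER_ERROR: transient, request is retried
    } else {
        "reject" // e.g. 403 FORBIDDEN: permanent, batch is acknowledged as Rejected
    }
}
```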
+
+#[tokio::test]
+async fn json_gzip_compression() {
+    json_compression("gzip").await;
+}
+
+#[tokio::test]
+async fn json_zstd_compression() {
+    json_compression("zstd").await;
+}
+
+#[tokio::test]
+async fn json_zlib_compression() {
+    json_compression("zlib").await;
+}
+
+#[tokio::test]
+async fn json_gzip_compression_with_payload_wrapper() {
+    json_compression_with_payload_wrapper("gzip").await;
+}
+
+#[tokio::test]
+async fn json_zlib_compression_with_payload_wrapper() {
+    json_compression_with_payload_wrapper("zlib").await;
+}
+
+#[tokio::test]
+async fn json_zstd_compression_with_payload_wrapper() {
+    json_compression_with_payload_wrapper("zstd").await;
+}
+
+async fn json_compression(compression: &str) {
+    components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
+        let num_lines = 1000;
+
+        let in_addr = next_addr();
+
+        let config = r#"
+            uri = "http://$IN_ADDR/frames"
+            compression = "$COMPRESSION"
+            encoding.codec = "json"
+            method = "post"
+
+            [auth]
+            strategy = "basic"
+            user = "waldo"
+            password = "hunter2"
+        "#
+        .replace("$IN_ADDR", &in_addr.to_string())
+        .replace("$COMPRESSION", compression);
+
+        let config: HttpSinkConfig = toml::from_str(&config).unwrap();
+
+        let cx = SinkContext::default();
+
+        let (sink, _) = config.build(cx).await.unwrap();
+        let (rx, trigger, server) = build_test_server(in_addr);
+
+        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+        let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
+        let pump = sink.run(events);
+
+        tokio::spawn(server);
+
+        pump.await.unwrap();
+        drop(trigger);
+
+        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+        let output_lines = rx
+            .flat_map(|(parts, body)| {
+                assert_eq!(Method::POST, parts.method);
+                assert_eq!("/frames", parts.uri.path());
+                assert_eq!(
+                    Some(Authorization::basic("waldo", "hunter2")),
+                    parts.headers.typed_get()
+                );
+                let lines: Vec<serde_json::Value> = parse_compressed_json(compression, body);
+                stream::iter(lines)
+            })
+            .map(|line| line.get("message").unwrap().as_str().unwrap().to_owned())
+            .collect::<Vec<_>>()
+            .await;
+
+        assert_eq!(num_lines, output_lines.len());
+        assert_eq!(input_lines, output_lines);
+    })
+    .await;
+}
+
+async fn json_compression_with_payload_wrapper(compression: &str) {
+    components::assert_sink_compliance(&HTTP_SINK_TAGS, async {
+        let num_lines = 1000;
+
+        let in_addr = next_addr();
+
+        let config = r#"
+            uri = "http://$IN_ADDR/frames"
+            compression = "$COMPRESSION"
+            encoding.codec = "json"
+            payload_prefix = '{"data":'
+            payload_suffix = "}"
+            method = "post"
+
+            [auth]
+            strategy = "basic"
+            user = "waldo"
+            password = "hunter2"
+        "#
+        .replace("$IN_ADDR", &in_addr.to_string())
+        .replace("$COMPRESSION", compression);
+
+        let config: HttpSinkConfig = toml::from_str(&config).unwrap();
+
+        let cx = SinkContext::default();
+
+        let (sink, _) = config.build(cx).await.unwrap();
+        let (rx, trigger, server) = build_test_server(in_addr);
+
+        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+        let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
+        let pump = sink.run(events);
+
+        tokio::spawn(server);
+
+        pump.await.unwrap();
+        drop(trigger);
+
+        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+        let output_lines = rx
+            .flat_map(|(parts, body)| {
+                assert_eq!(Method::POST, parts.method);
+                assert_eq!("/frames", parts.uri.path());
+                assert_eq!(
+                    Some(Authorization::basic("waldo", "hunter2")),
+                    parts.headers.typed_get()
+                );
+
+                let message: serde_json::Value = parse_compressed_json(compression, body);
+
+                let lines: Vec<serde_json::Value> = message["data"].as_array().unwrap().to_vec();
+                stream::iter(lines)
+            })
+            .map(|line| line.get("message").unwrap().as_str().unwrap().to_owned())
+            .collect::<Vec<_>>()
+            .await;
+
+        assert_eq!(num_lines, output_lines.len());
+        assert_eq!(input_lines, output_lines);
+    })
+    .await;
+}
+
+fn parse_compressed_json<T>(compression: &str, buf: Bytes) -> T
+where
+    T: de::DeserializeOwned,
+{
+    match compression {
+        "gzip" => serde_json::from_reader(MultiGzDecoder::new(buf.reader())).unwrap(),
+        "zstd" => serde_json::from_reader(zstd::Decoder::new(buf.reader()).unwrap()).unwrap(),
+        "zlib" => serde_json::from_reader(ZlibDecoder::new(buf.reader())).unwrap(),
+        _ => panic!("undefined compression: {}", compression),
+    }
+}
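`parse_compressed_json` is the receiving half of a round trip; on the sending side, the sink compresses the encoded payload before the request goes out. A minimal sketch of the gzip case (our illustration using `flate2`, which these tests reverse with `MultiGzDecoder`):

```rust
use std::io::Write;

use flate2::{write::GzEncoder, Compression};

/// Illustration only: roughly what happens to the encoded batch when the
/// sink is configured with `compression = "gzip"`.
fn gzip_payload(json: &[u8]) -> Vec<u8> {
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder
        .write_all(json)
        .expect("writing to an in-memory buffer cannot fail");
    encoder
        .finish()
        .expect("finishing an in-memory gzip stream cannot fail")
}
```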
+
+async fn get_received(
+    rx: mpsc::Receiver<(Parts, Bytes)>,
+    assert_parts: impl Fn(Parts),
+) -> Vec<String> {
+    rx.flat_map(|(parts, body)| {
+        assert_parts(parts);
+        stream::iter(BufReader::new(MultiGzDecoder::new(body.reader())).lines())
+    })
+    .map(Result::unwrap)
+    .map(|line| {
+        let val: serde_json::Value = serde_json::from_str(&line).unwrap();
+        val.get("message").unwrap().as_str().unwrap().to_owned()
+    })
+    .collect::<Vec<_>>()
+    .await
+}
+
+async fn run_sink(extra_config: &str, assert_parts: impl Fn(http::request::Parts)) {
+    let num_lines = 1000;
+
+    let (in_addr, sink) = build_sink(extra_config).await;
+
+    let (rx, trigger, server) = build_test_server(in_addr);
+    tokio::spawn(server);
+
+    let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+    let (input_lines, events) = random_lines_with_stream(100, num_lines, Some(batch));
+    components::run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
+    drop(trigger);
+
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
+
+    let output_lines = get_received(rx, assert_parts).await;
+
+    assert_eq!(num_lines, output_lines.len());
+    assert_eq!(input_lines, output_lines);
+}
+
+async fn build_sink(extra_config: &str) -> (std::net::SocketAddr, crate::sinks::VectorSink) {
+    let in_addr = next_addr();
+
+    let config = format!(
+        r#"
+            uri = "http://{addr}/frames"
+            compression = "gzip"
+            framing.method = "newline_delimited"
+            encoding.codec = "json"
+            {extras}
+        "#,
+        addr = in_addr,
+        extras = extra_config,
+    );
+    let config: HttpSinkConfig = toml::from_str(&config).unwrap();
+
+    let cx = SinkContext::default();
+
+    let (sink, _) = config.build(cx).await.unwrap();
+    (in_addr, sink)
+}
diff --git a/src/sinks/prelude.rs b/src/sinks/prelude.rs
index 9c7e81cf39418..ecc62097aaef2 100644
--- a/src/sinks/prelude.rs
+++ b/src/sinks/prelude.rs
@@ -3,9 +3,14 @@
 pub use crate::{
     codecs::{Encoder, EncodingConfig, Transformer},
+    components::validation::{
+        ExternalResource, HttpResourceConfig, ResourceDirection, ValidatableComponent,
+        ValidationConfiguration,
+    },
     config::{DataType, GenerateConfig, SinkConfig, SinkContext},
     event::{Event, LogEvent},
     internal_events::{SinkRequestBuildError, TemplateRenderingError},
+    register_validatable_component,
     sinks::{
         util::{
             builder::SinkBuilderExt,
@@ -35,7 +40,7 @@ pub use vector_common::{
 pub use vector_config::configurable_component;
 pub use vector_core::{
-    config::{AcknowledgementsConfig, Input},
+    config::{telemetry, AcknowledgementsConfig, Input},
     event::Value,
     partition::Partitioner,
     schema::Requirement,
diff --git a/src/sinks/splunk_hec/common/util.rs b/src/sinks/splunk_hec/common/util.rs
index afdb097b49304..b81974e1d5288 100644
--- a/src/sinks/splunk_hec/common/util.rs
+++ b/src/sinks/splunk_hec/common/util.rs
@@ -50,6 +50,9 @@ pub fn create_client(
     Ok(HttpClient::new(tls_settings, proxy_config)?)
 }
 
+// TODO: `HttpBatchService` has been deprecated for direct use in sinks.
+// This sink should undergo a refactor to utilize the `HttpService`
+// instead, which extracts much of the boilerplate code for `Service`.
 pub fn build_http_batch_service(
     client: HttpClient,
     http_request_builder: Arc<HttpRequestBuilder>,
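This TODO is repeated on several sinks in this patch, and it points at the types added to `src/sinks/util/http.rs` just below: the intended end state is a small sink-specific `HttpServiceRequestBuilder` wrapped in the generic `HttpService`, instead of a hand-rolled `HttpBatchService`. A hedged sketch of what that could look like; `ExampleRequestBuilder`, its `endpoint` field, and the header choice are ours, not from this patch:

```rust
use bytes::Bytes;
use http::Request;

use crate::{
    http::HttpClient,
    sinks::util::http::{HttpService, HttpServiceRequestBuilder},
};

/// Hypothetical request builder a refactored sink might provide.
#[derive(Clone)]
struct ExampleRequestBuilder {
    endpoint: String,
}

impl HttpServiceRequestBuilder for ExampleRequestBuilder {
    fn build(&self, body: Bytes) -> Request<Bytes> {
        Request::post(self.endpoint.as_str())
            .header("Content-Type", "application/json")
            .body(body)
            .expect("building a request from valid static parts should not fail")
    }
}

/// With the builder in place, the sink's `Service` comes for free.
fn make_service(client: HttpClient, endpoint: String) -> HttpService<ExampleRequestBuilder> {
    HttpService::new(client, ExampleRequestBuilder { endpoint })
}
```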
diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs
index 3f943db17eeb9..4dbfcab0ab4f8 100644
--- a/src/sinks/util/http.rs
+++ b/src/sinks/util/http.rs
@@ -12,7 +12,7 @@ use std::{
 use bytes::{Buf, Bytes};
 use futures::{future::BoxFuture, Sink};
 use headers::HeaderName;
-use http::{header, HeaderValue, StatusCode};
+use http::{header, HeaderValue, Request, Response, StatusCode};
 use hyper::{body, Body};
 use indexmap::IndexMap;
 use pin_project::pin_project;
@@ -24,13 +24,15 @@ use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf};
 use super::{
     retries::{RetryAction, RetryLogic},
-    sink, uri, Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink,
-    TowerRequestConfig, TowerRequestSettings,
+    sink::{self, Response as _},
+    uri, Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
+    TowerRequestSettings,
 };
 use crate::{
     event::Event,
     http::{HttpClient, HttpError},
     internal_events::{EndpointBytesSent, SinkRequestBuildError},
+    sinks::prelude::*,
 };
 
 pub trait HttpEventEncoder<Output> {
@@ -355,6 +357,12 @@
     }
 }
 
+/// @struct HttpBatchService
+///
+/// NOTE: This has been deprecated; please do not use it directly when creating new sinks.
+/// The `HttpService` currently wraps this structure. Eventually, all sinks currently using
+/// `HttpBatchService` directly should be updated to use `HttpService`, at which point we can
+/// remove this struct and inline the functionality into `HttpService` directly.
 pub struct HttpBatchService<F, B = Bytes> {
     inner: HttpClient,
     request_builder: Arc<dyn Fn(B) -> F + Send + Sync>,
@@ -598,6 +606,147 @@ pub fn validate_headers(
     Ok(validated_headers)
 }
 
+/// Request type for use in the `Service` implementation of HTTP stream sinks.
+#[derive(Clone)]
+pub struct HttpRequest {
+    payload: Bytes,
+    finalizers: EventFinalizers,
+    request_metadata: RequestMetadata,
+}
+
+impl HttpRequest {
+    /// Creates a new `HttpRequest`.
+    pub fn new(
+        payload: Bytes,
+        finalizers: EventFinalizers,
+        request_metadata: RequestMetadata,
+    ) -> Self {
+        Self {
+            payload,
+            finalizers,
+            request_metadata,
+        }
+    }
+}
+
+impl Finalizable for HttpRequest {
+    fn take_finalizers(&mut self) -> EventFinalizers {
+        self.finalizers.take_finalizers()
+    }
+}
+
+impl MetaDescriptive for HttpRequest {
+    fn get_metadata(&self) -> &RequestMetadata {
+        &self.request_metadata
+    }
+
+    fn metadata_mut(&mut self) -> &mut RequestMetadata {
+        &mut self.request_metadata
+    }
+}
+
+impl ByteSizeOf for HttpRequest {
+    fn allocated_bytes(&self) -> usize {
+        self.payload.allocated_bytes() + self.finalizers.allocated_bytes()
+    }
+}
+
+/// Response type for use in the `Service` implementation of HTTP stream sinks.
+pub struct HttpResponse {
+    pub http_response: Response<Bytes>,
+    events_byte_size: GroupedCountByteSize,
+    raw_byte_size: usize,
+}
+
+impl DriverResponse for HttpResponse {
+    fn event_status(&self) -> EventStatus {
+        if self.http_response.is_successful() {
+            EventStatus::Delivered
+        } else if self.http_response.is_transient() {
+            EventStatus::Errored
+        } else {
+            EventStatus::Rejected
+        }
+    }
+
+    fn events_sent(&self) -> &GroupedCountByteSize {
+        &self.events_byte_size
+    }
+
+    fn bytes_sent(&self) -> Option<usize> {
+        Some(self.raw_byte_size)
+    }
+}
+
+/// HTTP request builder for HTTP stream sinks using the generic `HttpService`.
+pub trait HttpServiceRequestBuilder {
+    fn build(&self, body: Bytes) -> Request<Bytes>;
+}
+
+/// Generic `Service` implementation for HTTP stream sinks.
+#[derive(Clone)]
+pub struct HttpService<B> {
+    batch_service:
+        HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HttpRequest>,
+    _phantom: PhantomData<B>,
+}
+
+impl<B> HttpService<B>
+where
+    B: HttpServiceRequestBuilder + std::marker::Sync + std::marker::Send + 'static,
+{
+    pub fn new(http_client: HttpClient, http_request_builder: B) -> Self {
+        let http_request_builder = Arc::new(http_request_builder);
+
+        let batch_service = HttpBatchService::new(http_client, move |req: HttpRequest| {
+            let request_builder = Arc::clone(&http_request_builder);
+
+            let fut: BoxFuture<'static, Result<Request<Bytes>, crate::Error>> =
+                Box::pin(async move { Ok(request_builder.build(req.payload)) });
+
+            fut
+        });
+        Self {
+            batch_service,
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<B> Service<HttpRequest> for HttpService<B>
+where
+    B: HttpServiceRequestBuilder + std::marker::Sync + std::marker::Send + 'static,
+{
+    type Response = HttpResponse;
+    type Error = crate::Error;
+    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        Poll::Ready(Ok(()))
+    }
+
+    fn call(&mut self, mut request: HttpRequest) -> Self::Future {
+        let mut http_service = self.batch_service.clone();
+
+        let raw_byte_size = request.payload.len();
+
+        // NOTE: By taking the metadata here, when passing the request to `call()` below,
+        // that function does not have access to the metadata anymore.
+        let metadata = std::mem::take(request.metadata_mut());
+        let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
+
+        Box::pin(async move {
+            let http_response = http_service.call(request).await?;
+
+            Ok(HttpResponse {
+                http_response,
+                events_byte_size,
+                raw_byte_size,
+            })
+        })
+    }
+}
+
 #[cfg(test)]
 mod test {
     #![allow(clippy::print_stderr)] //tests
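To close the loop on the new types above, here is roughly how a stream sink ends up driving the service. This is a hedged sketch (ours, not from the patch): it reuses the hypothetical `ExampleRequestBuilder` from the earlier sketch, and in a real sink the driver in `vector_core` performs these steps rather than hand-written code:

```rust
use tower::{Service, ServiceExt};
use vector_core::stream::DriverResponse;

use crate::sinks::util::http::{HttpRequest, HttpService};

/// Hedged sketch (ours): driving the service by hand. `HttpResponse`'s
/// `DriverResponse` impl is what lets the driver acknowledge events.
async fn send_one(
    mut service: HttpService<ExampleRequestBuilder>,
    request: HttpRequest,
) -> crate::Result<()> {
    // Wait until the service is ready, then issue the call.
    let response = service.ready().await?.call(request).await?;

    // Delivered, Errored, or Rejected, derived from the HTTP status class.
    let _status = response.event_status();
    Ok(())
}
```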
diff --git a/website/cue/reference/components/sinks/base/http.cue b/website/cue/reference/components/sinks/base/http.cue
index 36381b0119f58..7ee407825918d 100644
--- a/website/cue/reference/components/sinks/base/http.cue
+++ b/website/cue/reference/components/sinks/base/http.cue
@@ -305,9 +305,10 @@
 		}
 	}
 	headers: {
-		deprecated:  true
-		description: "A list of custom headers to add to each request."
-		required:    false
+		deprecated:         true
+		deprecated_message: "This option has been deprecated, use `request.headers` instead."
+		description:        "A list of custom headers to add to each request."
+		required:           false
 		type: object: options: "*": {
 			description: "An HTTP request header and its value."
 			required:    true
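Since the deprecation documented here is user-facing, a quick illustration of the migration may help. These configs are hypothetical (ours), written in the same TOML-in-Rust style as the tests above; the former top-level `headers` table simply moves under `request.headers`:

```rust
/// Hypothetical illustration (ours): the deprecated top-level `headers`
/// table moves under `request.headers`; both express the same header set.
fn header_migration_examples() -> (&'static str, &'static str) {
    let deprecated = r#"
        uri = "http://localhost:9000/endpoint"
        encoding.codec = "json"
        [headers]
        X-Custom-Header = "foo"
    "#;
    let preferred = r#"
        uri = "http://localhost:9000/endpoint"
        encoding.codec = "json"
        [request.headers]
        X-Custom-Header = "foo"
    "#;
    (deprecated, preferred)
}
```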