Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(http sink): refactor to new style #18200

Merged
merged 22 commits into from
Aug 19, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/tutorials/sinks/2_http_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,12 @@ where the data is actually sent.

# Service

**⚠ NOTE! This section implements an HTTP tower `Service` from scratch, for the
purpose of demonstration only. Many sinks will require implementing `Service`
in this way. Any new HTTP based sink should ideally utilize the structure
`HttpService`, which handles most of what this section discusses in a shared
structure that HTTP based sinks can utilize.**
neuronull marked this conversation as resolved.
Show resolved Hide resolved

We need to create a [`Tower`][tower] service that is responsible for actually
sending our final encoded data.

Expand Down
3 changes: 3 additions & 0 deletions src/sinks/appsignal/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use super::request_builder::AppsignalRequest;

#[derive(Clone)]
pub(super) struct AppsignalService {
// TODO: `HttpBatchService` has been deprecated for direct use in sinks.
// This sink should undergo a refactor to utilize the `HttpService`
// instead, which extracts much of the boilerplate code for `Service`.
pub(super) batch_service:
HttpBatchService<Ready<Result<http::Request<Bytes>, crate::Error>>, AppsignalRequest>,
}
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/datadog/events/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ impl DriverResponse for DatadogEventsResponse {

#[derive(Clone)]
pub struct DatadogEventsService {
// TODO: `HttpBatchService` has been deprecated for direct use in sinks.
// This sink should undergo a refactor to utilize the `HttpService`
// instead, which extracts much of the boilerplate code for `Service`.
batch_http_service:
HttpBatchService<Ready<Result<http::Request<Bytes>, crate::Error>>, DatadogEventsRequest>,
}
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/elasticsearch/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ impl MetaDescriptive for ElasticsearchRequest {

#[derive(Clone)]
pub struct ElasticsearchService {
// TODO: `HttpBatchService` has been deprecated for direct use in sinks.
// This sink should undergo a refactor to utilize the `HttpService`
// instead, which extracts much of the boilerplate code for `Service`.
batch_service: HttpBatchService<
BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>>,
ElasticsearchRequest,
Expand Down
4 changes: 3 additions & 1 deletion src/sinks/http/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ pub(super) struct HttpBatchSizer {
impl ItemBatchSize<Event> for HttpBatchSizer {
fn size(&self, item: &Event) -> usize {
match self.encoder.serializer() {
codecs::encoding::Serializer::Json(_) => item.estimated_json_encoded_size_of().get(),
codecs::encoding::Serializer::Json(_) | codecs::encoding::Serializer::NativeJson(_) => {
item.estimated_json_encoded_size_of().get()
}
_ => item.size_of(),
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/http/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl SinkConfig for HttpSinkConfig {
.to_string()
});

let http_service_request_builder = HttpSinkRequestBuilder::new(
let http_sink_request_builder = HttpSinkRequestBuilder::new(
self.uri.with_default_parts(),
self.method,
self.auth.choose_one(&self.uri.auth)?,
Expand All @@ -284,7 +284,7 @@ impl SinkConfig for HttpSinkConfig {
content_encoding,
);

let service = HttpService::new(client, http_service_request_builder);
let service = HttpService::new(client, http_sink_request_builder);

let request_limits = self.request.tower.unwrap_with(&Default::default());

Expand Down
17 changes: 9 additions & 8 deletions src/sinks/http/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ impl SinkEncoder<Vec<Event>> for HttpEncoder {
let mut byte_size = telemetry().create_request_count_byte_size();
let mut body = BytesMut::new();

match (self.encoder.serializer(), self.encoder.framer()) {
(Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
body.put(self.payload_prefix.as_bytes());
body.put_u8(b'[');
}
_ => {}
}

for mut event in events {
self.transformer.transform(&mut event);

Expand All @@ -72,14 +80,7 @@ impl SinkEncoder<Vec<Event>> for HttpEncoder {
}
}
(Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
// TODO(https://github.com/vectordotdev/vector/issues/11253):
// Prepend before building a request body to eliminate the
// additional copy here.
let message = body.split();
body.put(self.payload_prefix.as_bytes());
body.put_u8(b'[');
if !message.is_empty() {
body.unsplit(message);
if !body.is_empty() {
// remove trailing comma from last record
body.truncate(body.len() - 1);
}
Expand Down
3 changes: 1 addition & 2 deletions src/sinks/http/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ where
}

async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let service = ServiceBuilder::new().service(self.service);
input
// Batch the input stream with size calculation based on the configured codec
.batched(self.batch_settings.into_item_size_config(HttpBatchSizer {
Expand All @@ -51,7 +50,7 @@ where
})
// Generate the driver that will send requests and handle retries,
// event finalization, and logging/internal metric reporting.
.into_driver(service)
.into_driver(self.service)
.run()
.await
}
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/splunk_hec/common/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ pub fn create_client(
Ok(HttpClient::new(tls_settings, proxy_config)?)
}

// TODO: `HttpBatchService` has been deprecated for direct use in sinks.
// This sink should undergo a refactor to utilize the `HttpService`
// instead, which extracts much of the boilerplate code for `Service`.
pub fn build_http_batch_service(
client: HttpClient,
http_request_builder: Arc<HttpRequestBuilder>,
Expand Down
19 changes: 14 additions & 5 deletions src/sinks/util/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf};

use super::{
retries::{RetryAction, RetryLogic},
sink, uri, Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink,
TowerRequestConfig, TowerRequestSettings,
sink::{self, Response as _},
uri, Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
TowerRequestSettings,
};
use crate::{
event::Event,
Expand Down Expand Up @@ -356,6 +357,12 @@ where
}
}

/// @struct HttpBatchService
///
/// NOTE: This has been deprecated, please do not use <directly> when creating new sinks.
/// The `HttpService` currently wraps this structure. Eventually all sinks currently using the
/// HttpBatchService directly should be updated to use `HttpService`. At which time we can
/// remove this struct and inline the functionality into the `HttpService` directly.
pub struct HttpBatchService<F, B = Bytes> {
inner: HttpClient<Body>,
request_builder: Arc<dyn Fn(B) -> F + Send + Sync>,
Expand Down Expand Up @@ -653,8 +660,10 @@ pub struct HttpResponse {

impl DriverResponse for HttpResponse {
fn event_status(&self) -> EventStatus {
if self.http_response.status().is_success() {
if self.http_response.is_successful() {
EventStatus::Delivered
} else if self.http_response.is_transient() {
EventStatus::Errored
} else {
EventStatus::Rejected
}
Expand All @@ -669,12 +678,12 @@ impl DriverResponse for HttpResponse {
}
}

/// HTTP request builder for batched HTTP stream sinks using the generic `HttpService`
/// HTTP request builder for HTTP stream sinks using the generic `HttpService`
pub trait HttpServiceRequestBuilder {
fn build(&self, body: Bytes) -> Request<Bytes>;
}

/// Generic 'Service' implementation for batched HTTP stream sinks.
/// Generic 'Service' implementation for HTTP stream sinks.
#[derive(Clone)]
pub struct HttpService<B> {
batch_service:
Expand Down