Skip to content

Commit

Permalink
fix(sinks): set fixed buffer size for distributed service (#18699)
Browse files Browse the repository at this point in the history
* fix(sinks): set fixed buffer size for distributed service

* add comments

* add regression test

* fmt

* fmt
  • Loading branch information
dsmith3197 authored Nov 10, 2023
1 parent 0cd6fd2 commit 2e340d0
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 2 deletions.
Empty file.
16 changes: 16 additions & 0 deletions regression/cases/http_elasticsearch/lading/lading.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
generator:
- http:
seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53,
59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131]
target_uri: "http://localhost:8282/"
bytes_per_second: "500 Mb"
parallel_connections: 10
method:
post:
variant: "json"
maximum_prebuild_cache_size_bytes: "256 Mb"
headers: {}

blackhole:
- http:
binding_addr: "0.0.0.0:8080"
44 changes: 44 additions & 0 deletions regression/cases/http_elasticsearch/vector/vector.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
data_dir: "/var/lib/vector"

##
## Sources
##

sources:
internal_metrics:
type: "internal_metrics"

logs:
type: "http"
address: "0.0.0.0:8282"
decoding:
codec: "json"

##
## Transforms
##

## No transforms specified in the original TOML configuration.

##
## Sinks
##

sinks:
prometheus:
type: "prometheus_exporter"
inputs: [ "internal_metrics" ]
address: "0.0.0.0:9090"

elasticsearch:
type: "elasticsearch"
inputs: [ "logs" ]
endpoints: [ "http://localhost:8080" ]
api_version: "v8"
mode: "bulk"
pipeline: "pipeline-name"
compression: "none"
bulk:
index: "vector-%F"
healthcheck:
enabled: false
1 change: 1 addition & 0 deletions src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ impl SinkConfig for ElasticsearchConfig {
services,
health_config,
ElasticsearchHealthLogic,
1,
);

let sink = ElasticsearchSink::new(&common, self, service)?;
Expand Down
8 changes: 6 additions & 2 deletions src/sinks/util/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,16 @@ impl TowerRequestSettings {
}

/// Distributes requests to services [(Endpoint, service, healthcheck)]
///
/// [BufferLayer] suggests that the `buffer_bound` should be at least equal to
/// the number of the callers of the service. For sinks, this should typically be 1.
pub fn distributed_service<Req, RL, HL, S>(
self,
retry_logic: RL,
services: Vec<(String, S)>,
health_config: HealthConfig,
health_logic: HL,
buffer_bound: usize,
) -> DistributedService<S, RL, HL, usize, Req>
where
Req: Clone + Send + 'static,
Expand All @@ -360,7 +364,6 @@ impl TowerRequestSettings {

// Build services
let open = OpenGauge::new();
let max_concurrency = services.len() * AdaptiveConcurrencySettings::max_concurrency();
let services = services
.into_iter()
.map(|(endpoint, inner)| {
Expand Down Expand Up @@ -390,7 +393,8 @@ impl TowerRequestSettings {
ServiceBuilder::new()
.rate_limit(self.rate_limit_num, self.rate_limit_duration)
.retry(policy)
.layer(BufferLayer::new(max_concurrency))
// [Balance] must be wrapped with a [BufferLayer] so that the overall service implements Clone.
.layer(BufferLayer::new(buffer_bound))
.service(Balance::new(Box::pin(stream::iter(services)) as Pin<Box<_>>))
}
}
Expand Down

0 comments on commit 2e340d0

Please sign in to comment.