Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dsmith3197 committed Oct 20, 2023
1 parent e6a3cfc commit 351ea53
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ impl SinkConfig for ElasticsearchConfig {
services,
health_config,
ElasticsearchHealthLogic,
1,
);

let sink = ElasticsearchSink::new(&common, self, service)?;
Expand Down
7 changes: 6 additions & 1 deletion src/sinks/util/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,16 @@ impl TowerRequestSettings {
}

/// Distributes requests to services [(Endpoint, service, healthcheck)]
///
/// [BufferLayer] suggests that the `buffer_bound` should be at least equal to
/// the number of the callers of the service. For sinks, this should typically be 1.
pub fn distributed_service<Req, RL, HL, S>(
self,
retry_logic: RL,
services: Vec<(String, S)>,
health_config: HealthConfig,
health_logic: HL,
buffer_bound: usize,
) -> DistributedService<S, RL, HL, usize, Req>
where
Req: Clone + Send + 'static,
Expand Down Expand Up @@ -389,7 +393,8 @@ impl TowerRequestSettings {
ServiceBuilder::new()
.rate_limit(self.rate_limit_num, self.rate_limit_duration)
.retry(policy)
.layer(BufferLayer::new(1))
// [Balance] must be wrapped with a [BufferLayer] so that the overall service implements Clone.
.layer(BufferLayer::new(buffer_bound))
.service(Balance::new(Box::pin(stream::iter(services)) as Pin<Box<_>>))
}
}
Expand Down

0 comments on commit 351ea53

Please sign in to comment.