Skip to content

Commit

Permalink
fix(elasticsearch sink): Elasticsearch sink with api_version set to "…
Browse files Browse the repository at this point in the history
…auto"

  does not recognize the API version of ES6 as V6 (vectordotdev#17226)
  • Loading branch information
syedriko committed Apr 27, 2023
1 parent 752d424 commit a26bd23
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl ElasticsearchCommon {
%assumed_version,
%config.suppress_type_name
);
warn!(message = "Failed to determine Elasticsearch version from `/_cluster/state/version`. Please fix the reported error or set an API version explicitly via `api_version`.",
warn!(message = "Failed to determine Elasticsearch version. Please fix the reported error or set an API version explicitly via `api_version`.",
%assumed_version,
%error
);
Expand Down Expand Up @@ -276,29 +276,35 @@ async fn get_version(
tls_settings: &TlsSettings,
proxy_config: &ProxyConfig,
) -> crate::Result<usize> {
#[derive(Deserialize)]
struct Version {
number: Option<String>,
}
#[derive(Deserialize)]
struct ClusterState {
version: Option<usize>,
version: Option<Version>,
}

let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
let response = get(
base_url,
http_auth,
aws_auth,
region,
request,
client,
"/_cluster/state/version",
)
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
let response = get(base_url, http_auth, aws_auth, region, request, client, "/")
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;

let (_, body) = response.into_parts();
let mut body = body::aggregate(body).await?;
let body = body.copy_to_bytes(body.remaining());
let ClusterState { version } = serde_json::from_slice(&body)?;
version.ok_or_else(||"Unexpected response from Elasticsearch endpoint `/_cluster/state/version`. Missing `version`. Consider setting `api_version` option.".into())
if let Some(version) = version {
if let Some(number) = version.number {
let v: Vec<&str> = number.split('.').collect();
if v.len() > 0 {
if let Ok(major_version) = v[0].parse::<usize>() {
return Ok(major_version);
}
}
}
}
return Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into());
}

async fn get(
Expand Down

0 comments on commit a26bd23

Please sign in to comment.