Skip to content

Commit

Permalink
fix(elasticsearch sink): Elasticsearch sink with api_version set to "…
Browse files Browse the repository at this point in the history
…auto"

  does not recognize the API version of ES6 as V6 (vectordotdev#17226)
  • Loading branch information
syedriko committed Apr 27, 2023
1 parent 752d424 commit e84f793
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,13 @@ async fn get_version(
tls_settings: &TlsSettings,
proxy_config: &ProxyConfig,
) -> crate::Result<usize> {
#[derive(Deserialize)]
struct Version {
number: Option<String>
}
#[derive(Deserialize)]
struct ClusterState {
version: Option<usize>,
version: Option<Version>
}

let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
Expand All @@ -289,7 +293,7 @@ async fn get_version(
region,
request,
client,
"/_cluster/state/version",
"/",
)
.await
.map_err(|error| format!("Failed to get Elasticsearch API version: {}", error))?;
Expand All @@ -298,7 +302,17 @@ async fn get_version(
let mut body = body::aggregate(body).await?;
let body = body.copy_to_bytes(body.remaining());
let ClusterState { version } = serde_json::from_slice(&body)?;
version.ok_or_else(||"Unexpected response from Elasticsearch endpoint `/_cluster/state/version`. Missing `version`. Consider setting `api_version` option.".into())
if let Some(version) = version {
if let Some(number) = version.number {
let v: Vec<&str> = number.split('.').collect();
if v.len() > 0 {
if let Ok(major_version) = v[0].parse::<usize>() {
return Ok(major_version)
}
}
}
}
return Err("Unexpected response from Elasticsearch endpoint `/`. Consider setting `api_version` option.".into())
}

async fn get(
Expand Down

0 comments on commit e84f793

Please sign in to comment.