Skip to content

Commit

Permalink
enhancement(elasticsearch sink): Add option to specify a default inde…
Browse files Browse the repository at this point in the history
…x if template cannot be resolved for Elasticsearch destination index (#21953)

* add default fallback index option for elasticsearch destination

* add changelog

* fix tests

* update docs

* update changelog with suggestion
  • Loading branch information
ArunPiduguDD authored Dec 18, 2024
1 parent 485c38b commit 6de31c3
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Add an option to Elasticsearch sink to set a fallback index if the provided template in the `bulk.index` field
cannot be resolved

authors: ArunPiduguDD
6 changes: 6 additions & 0 deletions src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl ElasticsearchConfig {
match self.mode {
ElasticsearchMode::Bulk => Ok(ElasticsearchCommonMode::Bulk {
index: self.bulk.index.clone(),
template_fallback_index: self.bulk.template_fallback_index.clone(),
action: self.bulk.action.clone(),
version: self.bulk.version.clone(),
version_type: self.bulk.version_type,
Expand Down Expand Up @@ -296,6 +297,10 @@ pub struct BulkConfig {
#[configurable(metadata(docs::examples = "{{ index }}"))]
pub index: Template,

/// The default index to write events to if the template in `bulk.index` cannot be resolved
#[configurable(metadata(docs::examples = "test-index"))]
pub template_fallback_index: Option<String>,

/// Version field value.
#[configurable(metadata(docs::examples = "{{ obj_version }}-%Y-%m-%d"))]
#[configurable(metadata(docs::examples = "123"))]
Expand Down Expand Up @@ -329,6 +334,7 @@ impl Default for BulkConfig {
Self {
action: default_bulk_action(),
index: default_index(),
template_fallback_index: Default::default(),
version: Default::default(),
version_type: default_version_type(),
}
Expand Down
29 changes: 22 additions & 7 deletions src/sinks/elasticsearch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ impl_generate_config_from_default!(ElasticsearchConfig);
pub enum ElasticsearchCommonMode {
Bulk {
index: Template,
template_fallback_index: Option<String>,
action: Template,
version: Option<Template>,
version_type: VersionType,
Expand All @@ -195,14 +196,28 @@ impl fmt::Display for VersionValueParseError<'_> {
impl ElasticsearchCommonMode {
fn index(&self, log: &LogEvent) -> Option<String> {
match self {
Self::Bulk { index, .. } => index
Self::Bulk {
index,
template_fallback_index,
..
} => index
.render_string(log)
.map_err(|error| {
emit!(TemplateRenderingError {
error,
field: Some("index"),
drop_event: true,
});
.or_else(|error| {
if let Some(fallback) = template_fallback_index {
emit!(TemplateRenderingError {
error,
field: Some("index"),
drop_event: false,
});
Ok(fallback.clone())
} else {
emit!(TemplateRenderingError {
error,
field: Some("index"),
drop_event: true,
});
Err(())
}
})
.ok(),
Self::DataStream(ds) => ds.index(log),
Expand Down
4 changes: 4 additions & 0 deletions src/sinks/elasticsearch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ async fn sets_create_action_when_configured() {
bulk: BulkConfig {
action: parse_template("{{ action }}te"),
index: parse_template("vector"),
template_fallback_index: None,
version: None,
version_type: VersionType::Internal,
},
Expand Down Expand Up @@ -70,6 +71,7 @@ async fn encoding_with_external_versioning_without_version_set_does_not_include_
let config = ElasticsearchConfig {
bulk: BulkConfig {
action: parse_template("create"),
template_fallback_index: None,
index: parse_template("vector"),
version: None,
version_type: VersionType::External,
Expand All @@ -92,6 +94,7 @@ async fn encoding_with_external_versioning_with_version_set_includes_version() {
bulk: BulkConfig {
action: parse_template("create"),
index: parse_template("vector"),
template_fallback_index: None,
version: Some(parse_template("{{ my_field }}")),
version_type: VersionType::External,
},
Expand Down Expand Up @@ -140,6 +143,7 @@ async fn encoding_with_external_gte_versioning_with_version_set_includes_version
bulk: BulkConfig {
action: parse_template("create"),
index: parse_template("vector"),
template_fallback_index: None,
version: Some(parse_template("{{ my_field }}")),
version_type: VersionType::ExternalGte,
},
Expand Down
5 changes: 5 additions & 0 deletions website/cue/reference/components/sinks/base/elasticsearch.cue
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ base: components: sinks: elasticsearch: configuration: {
syntax: "template"
}
}
template_fallback_index: {
description: "The default index to write events to if the template in `bulk.index` cannot be resolved"
required: false
type: string: examples: ["test-index"]
}
version: {
description: "Version field value."
required: false
Expand Down

0 comments on commit 6de31c3

Please sign in to comment.