From 1cebe977020c33a695b59263c64409201cf45a47 Mon Sep 17 00:00:00 2001 From: Jason Ish Date: Tue, 25 Jun 2024 16:30:41 -0600 Subject: [PATCH] elastic: break out events query into its own file --- src/elastic/eventrepo/events.rs | 117 ++++++++++++++++++++++++++++++++ src/elastic/eventrepo/mod.rs | 107 +---------------------------- 2 files changed, 119 insertions(+), 105 deletions(-) create mode 100644 src/elastic/eventrepo/events.rs diff --git a/src/elastic/eventrepo/events.rs b/src/elastic/eventrepo/events.rs new file mode 100644 index 00000000..47b10db8 --- /dev/null +++ b/src/elastic/eventrepo/events.rs @@ -0,0 +1,117 @@ +// SPDX-FileCopyrightText: (C) 2020 Jason Ish +// SPDX-License-Identifier: MIT + +use super::ElasticEventRepo; +use crate::elastic::request; +use crate::eventrepo::{self, DatastoreError}; +use crate::LOG_QUERIES; +use serde_json::json; +use tracing::error; +use tracing::info; +use tracing::warn; + +const MINIMUM_SHOULD_MATCH: &str = "minimum_should_match"; + +impl ElasticEventRepo { + pub async fn events( + &self, + params: eventrepo::EventQueryParams, + ) -> Result<serde_json::Value, DatastoreError> { + let mut filters = vec![request::exists_filter(&self.map_field("event_type"))]; + let mut should = vec![]; + let mut must_not = vec![]; + + if let Some(event_type) = params.event_type { + filters.push(request::term_filter( + &self.map_field("event_type"), + &event_type, + )); + } + + self.apply_query_string( + &params.query_string, + &mut filters, + &mut should, + &mut must_not, + ); + + if let Some(ts) = params.min_timestamp { + warn!("Unexpected min_timestamp of {}", &ts); + } + + if let Some(ts) = params.max_timestamp { + warn!("Unexpected max_timestamp of {}", &ts); + } + + let sort_by = params.sort_by.unwrap_or_else(|| "@timestamp".to_string()); + let sort_order = params.order.unwrap_or_else(|| "desc".to_string()); + let size = params.size.unwrap_or(500); + + let mut body = json!({ + "query": { + "bool": { + "filter": filters, + "must_not": must_not, + } + }, + "sort": [{sort_by: 
{"order": sort_order}}], + "size": size, + }); + + if !should.is_empty() { + body["query"]["bool"]["should"] = should.into(); + body["query"]["bool"][MINIMUM_SHOULD_MATCH] = 1.into(); + } + + if *LOG_QUERIES { + info!("{}", &body); + } + + let response = self.search(&body).await?; + let response: serde_json::Value = response.json().await?; + + if let Some(error) = response["error"].as_object() { + // Find the first reason, may be deeply nested. + if let serde_json::Value::String(reason) = &error["caused_by"]["reason"] { + error!( + "Failed to execute event query: error={}; query={}", + reason, + serde_json::to_string(&body).unwrap() + ); + return Err(anyhow::anyhow!("{}", reason))?; + } + } + + // Another way we can get errors from + // Elasticsearch/Opensearch, even with a 200 status code. + if let Some(failure) = response["_shards"]["failures"] + .as_array() + .and_then(|v| v.first()) + { + warn!( + "Elasticsearch reported failures, the first being: {:?}", + failure + ); + } + + let hits = &response["hits"]["hits"]; + + let mut events = vec![]; + if let Some(hits) = hits.as_array() { + for hit in hits { + let mut hit = hit.clone(); + if self.ecs { + self.transform_ecs(&mut hit); + } + events.push(hit); + } + } + + let response = json!({ + "ecs": self.ecs, + "events": events, + }); + + Ok(response) + } +} diff --git a/src/elastic/eventrepo/mod.rs b/src/elastic/eventrepo/mod.rs index 9ec411d3..d16cb478 100644 --- a/src/elastic/eventrepo/mod.rs +++ b/src/elastic/eventrepo/mod.rs @@ -11,24 +11,23 @@ use crate::datetime; use crate::elastic::importer::ElasticEventSink; use crate::elastic::request::exists_filter; use crate::elastic::{request, ElasticResponse, TAGS_ARCHIVED, TAGS_ESCALATED, TAG_ARCHIVED}; -use crate::eventrepo::{self, DatastoreError}; +use crate::eventrepo::DatastoreError; use crate::queryparser; use crate::queryparser::QueryElement; use crate::queryparser::QueryParser; use crate::server::api; use crate::server::session::Session; use crate::util; -use 
crate::LOG_QUERIES; use serde::{Deserialize, Serialize}; use serde_json::json; use std::sync::Arc; use tracing::debug; use tracing::error; -use tracing::info; use tracing::warn; mod alerts; mod dhcp; +mod events; mod stats; const MINIMUM_SHOULD_MATCH: &str = "minimum_should_match"; @@ -491,108 +490,6 @@ impl ElasticEventRepo { .await } - pub async fn events( - &self, - params: eventrepo::EventQueryParams, - ) -> Result<serde_json::Value, DatastoreError> { - let mut filters = vec![request::exists_filter(&self.map_field("event_type"))]; - let mut should = vec![]; - let mut must_not = vec![]; - - if let Some(event_type) = params.event_type { - filters.push(request::term_filter( - &self.map_field("event_type"), - &event_type, - )); - } - - self.apply_query_string( - &params.query_string, - &mut filters, - &mut should, - &mut must_not, - ); - - if let Some(ts) = params.min_timestamp { - warn!("Unexpected min_timestamp of {}", &ts); - } - - if let Some(ts) = params.max_timestamp { - warn!("Unexpected max_timestamp of {}", &ts); - } - - let sort_by = params.sort_by.unwrap_or_else(|| "@timestamp".to_string()); - let sort_order = params.order.unwrap_or_else(|| "desc".to_string()); - let size = params.size.unwrap_or(500); - - let mut body = json!({ - "query": { - "bool": { - "filter": filters, - "must_not": must_not, - } - }, - "sort": [{sort_by: {"order": sort_order}}], - "size": size, - }); - - if !should.is_empty() { - body["query"]["bool"]["should"] = should.into(); - body["query"]["bool"][MINIMUM_SHOULD_MATCH] = 1.into(); - } - - if *LOG_QUERIES { - info!("{}", &body); - } - - let response = self.search(&body).await?; - let response: serde_json::Value = response.json().await?; - - if let Some(error) = response["error"].as_object() { - // Find the first reason, may be deeply nested. 
- if let serde_json::Value::String(reason) = &error["caused_by"]["reason"] { - error!( - "Failed to execute event query: error={}; query={}", - reason, - serde_json::to_string(&body).unwrap() - ); - return Err(anyhow::anyhow!("{}", reason))?; - } - } - - // Another way we can get errors from - // Elasticsearch/Opensearch, even with a 200 status code. - if let Some(failure) = response["_shards"]["failures"] - .as_array() - .and_then(|v| v.first()) - { - warn!( - "Elasticsearch reported failures, the first being: {:?}", - failure - ); - } - - let hits = &response["hits"]["hits"]; - - let mut events = vec![]; - if let Some(hits) = hits.as_array() { - for hit in hits { - let mut hit = hit.clone(); - if self.ecs { - self.transform_ecs(&mut hit); - } - events.push(hit); - } - } - - let response = json!({ - "ecs": self.ecs, - "events": events, - }); - - Ok(response) - } - async fn get_earliest_timestamp( &self, ) -> Result, DatastoreError> {