Skip to content

Commit

Permalink
elastic: break out events query into its own file
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonish committed Jun 25, 2024
1 parent 780d57e commit 1cebe97
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 105 deletions.
117 changes: 117 additions & 0 deletions src/elastic/eventrepo/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// SPDX-FileCopyrightText: (C) 2020 Jason Ish <jason@codemonkey.net>
// SPDX-License-Identifier: MIT

use super::ElasticEventRepo;
use crate::elastic::request;
use crate::eventrepo::{self, DatastoreError};
use crate::LOG_QUERIES;
use serde_json::json;
use tracing::error;
use tracing::info;
use tracing::warn;

const MINIMUM_SHOULD_MATCH: &str = "minimum_should_match";

impl ElasticEventRepo {
pub async fn events(
&self,
params: eventrepo::EventQueryParams,
) -> Result<serde_json::Value, DatastoreError> {
let mut filters = vec![request::exists_filter(&self.map_field("event_type"))];
let mut should = vec![];
let mut must_not = vec![];

if let Some(event_type) = params.event_type {
filters.push(request::term_filter(
&self.map_field("event_type"),
&event_type,
));
}

self.apply_query_string(
&params.query_string,
&mut filters,
&mut should,
&mut must_not,
);

if let Some(ts) = params.min_timestamp {
warn!("Unexpected min_timestamp of {}", &ts);
}

if let Some(ts) = params.max_timestamp {
warn!("Unexpected max_timestamp of {}", &ts);
}

let sort_by = params.sort_by.unwrap_or_else(|| "@timestamp".to_string());
let sort_order = params.order.unwrap_or_else(|| "desc".to_string());
let size = params.size.unwrap_or(500);

let mut body = json!({
"query": {
"bool": {
"filter": filters,
"must_not": must_not,
}
},
"sort": [{sort_by: {"order": sort_order}}],
"size": size,
});

if !should.is_empty() {
body["query"]["bool"]["should"] = should.into();
body["query"]["bool"][MINIMUM_SHOULD_MATCH] = 1.into();
}

if *LOG_QUERIES {
info!("{}", &body);
}

let response = self.search(&body).await?;
let response: serde_json::Value = response.json().await?;

if let Some(error) = response["error"].as_object() {
// Find the first reason, may be deeply nested.
if let serde_json::Value::String(reason) = &error["caused_by"]["reason"] {
error!(
"Failed to execute event query: error={}; query={}",
reason,
serde_json::to_string(&body).unwrap()
);
return Err(anyhow::anyhow!("{}", reason))?;
}
}

// Another way we can get errors from
// Elasticsearch/Opensearch, even with a 200 status code.
if let Some(failure) = response["_shards"]["failures"]
.as_array()
.and_then(|v| v.first())
{
warn!(
"Elasticsearch reported failures, the first being: {:?}",
failure
);
}

let hits = &response["hits"]["hits"];

let mut events = vec![];
if let Some(hits) = hits.as_array() {
for hit in hits {
let mut hit = hit.clone();
if self.ecs {
self.transform_ecs(&mut hit);
}
events.push(hit);
}
}

let response = json!({
"ecs": self.ecs,
"events": events,
});

Ok(response)
}
}
107 changes: 2 additions & 105 deletions src/elastic/eventrepo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,23 @@ use crate::datetime;
use crate::elastic::importer::ElasticEventSink;
use crate::elastic::request::exists_filter;
use crate::elastic::{request, ElasticResponse, TAGS_ARCHIVED, TAGS_ESCALATED, TAG_ARCHIVED};
use crate::eventrepo::{self, DatastoreError};
use crate::eventrepo::DatastoreError;
use crate::queryparser;
use crate::queryparser::QueryElement;
use crate::queryparser::QueryParser;
use crate::server::api;
use crate::server::session::Session;
use crate::util;
use crate::LOG_QUERIES;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;

mod alerts;
mod dhcp;
mod events;
mod stats;

const MINIMUM_SHOULD_MATCH: &str = "minimum_should_match";
Expand Down Expand Up @@ -491,108 +490,6 @@ impl ElasticEventRepo {
.await
}

pub async fn events(
&self,
params: eventrepo::EventQueryParams,
) -> Result<serde_json::Value, DatastoreError> {
let mut filters = vec![request::exists_filter(&self.map_field("event_type"))];
let mut should = vec![];
let mut must_not = vec![];

if let Some(event_type) = params.event_type {
filters.push(request::term_filter(
&self.map_field("event_type"),
&event_type,
));
}

self.apply_query_string(
&params.query_string,
&mut filters,
&mut should,
&mut must_not,
);

if let Some(ts) = params.min_timestamp {
warn!("Unexpected min_timestamp of {}", &ts);
}

if let Some(ts) = params.max_timestamp {
warn!("Unexpected max_timestamp of {}", &ts);
}

let sort_by = params.sort_by.unwrap_or_else(|| "@timestamp".to_string());
let sort_order = params.order.unwrap_or_else(|| "desc".to_string());
let size = params.size.unwrap_or(500);

let mut body = json!({
"query": {
"bool": {
"filter": filters,
"must_not": must_not,
}
},
"sort": [{sort_by: {"order": sort_order}}],
"size": size,
});

if !should.is_empty() {
body["query"]["bool"]["should"] = should.into();
body["query"]["bool"][MINIMUM_SHOULD_MATCH] = 1.into();
}

if *LOG_QUERIES {
info!("{}", &body);
}

let response = self.search(&body).await?;
let response: serde_json::Value = response.json().await?;

if let Some(error) = response["error"].as_object() {
// Find the first reason, may be deeply nested.
if let serde_json::Value::String(reason) = &error["caused_by"]["reason"] {
error!(
"Failed to execute event query: error={}; query={}",
reason,
serde_json::to_string(&body).unwrap()
);
return Err(anyhow::anyhow!("{}", reason))?;
}
}

// Another way we can get errors from
// Elasticsearch/Opensearch, even with a 200 status code.
if let Some(failure) = response["_shards"]["failures"]
.as_array()
.and_then(|v| v.first())
{
warn!(
"Elasticsearch reported failures, the first being: {:?}",
failure
);
}

let hits = &response["hits"]["hits"];

let mut events = vec![];
if let Some(hits) = hits.as_array() {
for hit in hits {
let mut hit = hit.clone();
if self.ecs {
self.transform_ecs(&mut hit);
}
events.push(hit);
}
}

let response = json!({
"ecs": self.ecs,
"events": events,
});

Ok(response)
}

async fn get_earliest_timestamp(
&self,
) -> Result<Option<crate::datetime::DateTime>, DatastoreError> {
Expand Down

0 comments on commit 1cebe97

Please sign in to comment.