diff --git a/crates/analytics/src/opensearch.rs b/crates/analytics/src/opensearch.rs index 1031c8154481..11ea4e4f7ab3 100644 --- a/crates/analytics/src/opensearch.rs +++ b/crates/analytics/src/opensearch.rs @@ -1,6 +1,7 @@ use api_models::{ analytics::search::SearchIndex, errors::types::{ApiError, ApiErrorResponse}, + payments::TimeRange, }; use aws_config::{self, meta::region::RegionProviderChain, Region}; use common_utils::errors::{CustomResult, ErrorSwitch}; @@ -18,8 +19,9 @@ use opensearch::{ }, MsearchParts, OpenSearch, SearchParts, }; -use serde_json::{json, Value}; +use serde_json::{json, Map, Value}; use storage_impl::errors::ApplicationError; +use time::PrimitiveDateTime; use super::{health_check::HealthCheck, query::QueryResult, types::QueryExecutionError}; use crate::query::QueryBuildingError; @@ -40,6 +42,23 @@ pub struct OpenSearchIndexes { pub disputes: String, } +#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash)] +pub struct OpensearchTimeRange { + #[serde(with = "common_utils::custom_serde::iso8601")] + pub gte: PrimitiveDateTime, + #[serde(default, with = "common_utils::custom_serde::iso8601::option")] + pub lte: Option, +} + +impl From for OpensearchTimeRange { + fn from(time_range: TimeRange) -> Self { + Self { + gte: time_range.start_time, + lte: time_range.end_time, + } + } +} + #[derive(Clone, Debug, serde::Deserialize)] pub struct OpenSearchConfig { host: String, @@ -377,6 +396,7 @@ pub struct OpenSearchQueryBuilder { pub offset: Option, pub count: Option, pub filters: Vec<(String, Vec)>, + pub time_range: Option, } impl OpenSearchQueryBuilder { @@ -387,6 +407,7 @@ impl OpenSearchQueryBuilder { offset: Default::default(), count: Default::default(), filters: Default::default(), + time_range: Default::default(), } } @@ -396,27 +417,110 @@ impl OpenSearchQueryBuilder { Ok(()) } + pub fn set_time_range(&mut self, time_range: OpensearchTimeRange) -> QueryResult<()> { + self.time_range = Some(time_range); + Ok(()) + } + pub fn add_filter_clause(&mut self, lhs: String, rhs: Vec) -> QueryResult<()> { self.filters.push((lhs, rhs)); Ok(()) } + pub fn get_status_field(&self, index: &SearchIndex) -> &str { + match index { + SearchIndex::Refunds => "refund_status.keyword", + SearchIndex::Disputes => "dispute_status.keyword", + _ => "status.keyword", + } + } + + pub fn replace_status_field(&self, filters: &[Value], index: &SearchIndex) -> Vec { + filters + .iter() + .map(|filter| { + if let Some(terms) = filter.get("terms").and_then(|v| v.as_object()) { + let mut new_filter = filter.clone(); + if let Some(new_terms) = + new_filter.get_mut("terms").and_then(|v| v.as_object_mut()) + { + let key = "status.keyword"; + if let Some(status_terms) = terms.get(key) { + new_terms.remove(key); + new_terms.insert( + self.get_status_field(index).to_string(), + status_terms.clone(), + ); + } + } + new_filter + } else { + filter.clone() + } + }) + .collect() + } + + /// # Panics + /// + /// This function will panic if: + /// + /// * The structure of the JSON query is not as expected (e.g., missing keys or incorrect types). + /// + /// Ensure that the input data and the structure of the query are valid and correctly handled. pub fn construct_payload(&self, indexes: &[SearchIndex]) -> QueryResult> { - let mut query = - vec![json!({"multi_match": {"type": "phrase", "query": self.query, "lenient": true}})]; + let mut query_obj = Map::new(); + let mut bool_obj = Map::new(); + let mut filter_array = Vec::new(); + + filter_array.push(json!({ + "multi_match": { + "type": "phrase", + "query": self.query, + "lenient": true + } + })); let mut filters = self .filters .iter() - .map(|(k, v)| json!({"terms" : {k : v}})) + .map(|(k, v)| json!({"terms": {k: v}})) .collect::>(); - query.append(&mut filters); + filter_array.append(&mut filters); + + if let Some(ref time_range) = self.time_range { + let range = json!(time_range); + filter_array.push(json!({ + "range": { + "timestamp": range + } + })); + } + + bool_obj.insert("filter".to_string(), Value::Array(filter_array)); + query_obj.insert("bool".to_string(), Value::Object(bool_obj)); + + let mut query = Map::new(); + query.insert("query".to_string(), Value::Object(query_obj)); - // TODO add index specific filters Ok(indexes .iter() - .map(|_index| json!({"query": {"bool": {"filter": query}}})) + .map(|index| { + let updated_query = query + .get("query") + .and_then(|q| q.get("bool")) + .and_then(|b| b.get("filter")) + .and_then(|f| f.as_array()) + .map(|filters| self.replace_status_field(filters, index)) + .unwrap_or_default(); + + let mut final_query = Map::new(); + final_query.insert("bool".to_string(), json!({ "filter": updated_query })); + + let payload = json!({ "query": Value::Object(final_query) }); + payload + }) .collect::>()) } } diff --git a/crates/analytics/src/search.rs b/crates/analytics/src/search.rs index 12ef7fb8d968..95b7f204b977 100644 --- a/crates/analytics/src/search.rs +++ b/crates/analytics/src/search.rs @@ -97,6 +97,10 @@ pub async fn msearch_results( }; }; + if let Some(time_range) = req.time_range { + query_builder.set_time_range(time_range.into()).switch()?; + }; + let response_text: OpenMsearchOutput = client .execute(query_builder) .await @@ -221,6 +225,11 @@ pub async fn search_results( } }; }; + + if let Some(time_range) = search_req.time_range { + query_builder.set_time_range(time_range.into()).switch()?; + }; + query_builder .set_offset_n_count(search_req.offset, search_req.count) .switch()?; diff --git a/crates/api_models/src/analytics/search.rs b/crates/api_models/src/analytics/search.rs index f27af75936d3..b962f60cae07 100644 --- a/crates/api_models/src/analytics/search.rs +++ b/crates/api_models/src/analytics/search.rs @@ -2,6 +2,8 @@ use common_utils::hashing::HashedString; use masking::WithType; use serde_json::Value; +use crate::payments::TimeRange; + #[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)] pub struct SearchFilters { pub payment_method: Option>, @@ -26,6 +28,8 @@ pub struct GetGlobalSearchRequest { pub query: String, #[serde(default)] pub filters: Option, + #[serde(default)] + pub time_range: Option, } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -36,6 +40,8 @@ pub struct GetSearchRequest { pub query: String, #[serde(default)] pub filters: Option, + #[serde(default)] + pub time_range: Option, } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]