Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(opensearch): Updated status filter field name to match index and added time-range based search #5468

Merged
merged 8 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 111 additions & 7 deletions crates/analytics/src/opensearch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use api_models::{
analytics::search::SearchIndex,
errors::types::{ApiError, ApiErrorResponse},
payments::TimeRange,
};
use aws_config::{self, meta::region::RegionProviderChain, Region};
use common_utils::errors::{CustomResult, ErrorSwitch};
Expand All @@ -18,8 +19,9 @@ use opensearch::{
},
MsearchParts, OpenSearch, SearchParts,
};
use serde_json::{json, Value};
use serde_json::{json, Map, Value};
use storage_impl::errors::ApplicationError;
use time::PrimitiveDateTime;

use super::{health_check::HealthCheck, query::QueryResult, types::QueryExecutionError};
use crate::query::QueryBuildingError;
Expand All @@ -40,6 +42,23 @@ pub struct OpenSearchIndexes {
pub disputes: String,
}

#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash)]
pub struct OpensearchTimeRange {
#[serde(with = "common_utils::custom_serde::iso8601")]
pub gte: PrimitiveDateTime,
#[serde(default, with = "common_utils::custom_serde::iso8601::option")]
pub lte: Option<PrimitiveDateTime>,
}

impl From<TimeRange> for OpensearchTimeRange {
fn from(time_range: TimeRange) -> Self {
Self {
gte: time_range.start_time,
lte: time_range.end_time,
}
}
}

#[derive(Clone, Debug, serde::Deserialize)]
pub struct OpenSearchConfig {
host: String,
Expand Down Expand Up @@ -377,6 +396,7 @@ pub struct OpenSearchQueryBuilder {
pub offset: Option<i64>,
pub count: Option<i64>,
pub filters: Vec<(String, Vec<String>)>,
pub time_range: Option<OpensearchTimeRange>,
}

impl OpenSearchQueryBuilder {
Expand All @@ -387,6 +407,7 @@ impl OpenSearchQueryBuilder {
offset: Default::default(),
count: Default::default(),
filters: Default::default(),
time_range: Default::default(),
}
}

Expand All @@ -396,27 +417,110 @@ impl OpenSearchQueryBuilder {
Ok(())
}

pub fn set_time_range(&mut self, time_range: OpensearchTimeRange) -> QueryResult<()> {
self.time_range = Some(time_range);
Ok(())
}

pub fn add_filter_clause(&mut self, lhs: String, rhs: Vec<String>) -> QueryResult<()> {
self.filters.push((lhs, rhs));
Ok(())
}

pub fn get_status_field(&self, index: &SearchIndex) -> &str {
match index {
SearchIndex::Refunds => "refund_status.keyword",
SearchIndex::Disputes => "dispute_status.keyword",
_ => "status.keyword",
}
}

pub fn replace_status_field(&self, filters: &[Value], index: &SearchIndex) -> Vec<Value> {
filters
.iter()
.map(|filter| {
if let Some(terms) = filter.get("terms").and_then(|v| v.as_object()) {
let mut new_filter = filter.clone();
if let Some(new_terms) =
new_filter.get_mut("terms").and_then(|v| v.as_object_mut())
{
let key = "status.keyword";
if let Some(status_terms) = terms.get(key) {
new_terms.remove(key);
new_terms.insert(
self.get_status_field(index).to_string(),
status_terms.clone(),
);
}
}
new_filter
} else {
filter.clone()
}
})
.collect()
}

/// # Panics
///
/// This function will panic if:
///
/// * The structure of the JSON query is not as expected (e.g., missing keys or incorrect types).
///
/// Ensure that the input data and the structure of the query are valid and correctly handled.
pub fn construct_payload(&self, indexes: &[SearchIndex]) -> QueryResult<Vec<Value>> {
let mut query =
vec![json!({"multi_match": {"type": "phrase", "query": self.query, "lenient": true}})];
let mut query_obj = Map::new();
let mut bool_obj = Map::new();
let mut filter_array = Vec::new();

filter_array.push(json!({
"multi_match": {
"type": "phrase",
"query": self.query,
"lenient": true
}
}));

let mut filters = self
.filters
.iter()
.map(|(k, v)| json!({"terms" : {k : v}}))
.map(|(k, v)| json!({"terms": {k: v}}))
.collect::<Vec<Value>>();

query.append(&mut filters);
filter_array.append(&mut filters);

if let Some(ref time_range) = self.time_range {
let range = json!(time_range);
filter_array.push(json!({
"range": {
"timestamp": range
}
}));
}

bool_obj.insert("filter".to_string(), Value::Array(filter_array));
query_obj.insert("bool".to_string(), Value::Object(bool_obj));

let mut query = Map::new();
query.insert("query".to_string(), Value::Object(query_obj));

// TODO add index specific filters
Ok(indexes
.iter()
.map(|_index| json!({"query": {"bool": {"filter": query}}}))
.map(|index| {
let updated_query = query
.get("query")
.and_then(|q| q.get("bool"))
.and_then(|b| b.get("filter"))
.and_then(|f| f.as_array())
.map(|filters| self.replace_status_field(filters, index))
.unwrap_or_default();

let mut final_query = Map::new();
final_query.insert("bool".to_string(), json!({ "filter": updated_query }));

let payload = json!({ "query": Value::Object(final_query) });
payload
})
.collect::<Vec<Value>>())
}
}
9 changes: 9 additions & 0 deletions crates/analytics/src/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ pub async fn msearch_results(
};
};

if let Some(time_range) = req.time_range {
query_builder.set_time_range(time_range.into()).switch()?;
};

let response_text: OpenMsearchOutput = client
.execute(query_builder)
.await
Expand Down Expand Up @@ -221,6 +225,11 @@ pub async fn search_results(
}
};
};

if let Some(time_range) = search_req.time_range {
query_builder.set_time_range(time_range.into()).switch()?;
};

query_builder
.set_offset_n_count(search_req.offset, search_req.count)
.switch()?;
Expand Down
6 changes: 6 additions & 0 deletions crates/api_models/src/analytics/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use common_utils::hashing::HashedString;
use masking::WithType;
use serde_json::Value;

use crate::payments::TimeRange;

#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize)]
pub struct SearchFilters {
pub payment_method: Option<Vec<String>>,
Expand All @@ -26,6 +28,8 @@ pub struct GetGlobalSearchRequest {
pub query: String,
#[serde(default)]
pub filters: Option<SearchFilters>,
#[serde(default)]
pub time_range: Option<TimeRange>,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
Expand All @@ -36,6 +40,8 @@ pub struct GetSearchRequest {
pub query: String,
#[serde(default)]
pub filters: Option<SearchFilters>,
#[serde(default)]
pub time_range: Option<TimeRange>,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
Expand Down
Loading