Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(opensearch): show search results only if user has access permission to the index #5097

Merged
merged 11 commits into from
Jun 27, 2024
40 changes: 26 additions & 14 deletions crates/analytics/src/opensearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use opensearch::{
};
use serde_json::{json, Value};
use storage_impl::errors::ApplicationError;
use strum::IntoEnumIterator;

use super::{health_check::HealthCheck, query::QueryResult, types::QueryExecutionError};
use crate::query::QueryBuildingError;
Expand Down Expand Up @@ -78,6 +77,10 @@ pub enum OpenSearchError {
QueryBuildingError,
#[error("Opensearch deserialisation error")]
DeserialisationError,
#[error("Opensearch index access not present error: {0:?}")]
IndexAccessNotPermittedError(SearchIndex),
#[error("Opensearch unknown error")]
UnknownError,
}

impl ErrorSwitch<OpenSearchError> for QueryBuildingError {
Expand All @@ -97,28 +100,39 @@ impl ErrorSwitch<ApiErrorResponse> for OpenSearchError {
)),
Self::ResponseNotOK(response) => ApiErrorResponse::InternalServerError(ApiError::new(
"IR",
0,
1,
format!("Something went wrong {}", response),
None,
)),
Self::ResponseError => ApiErrorResponse::InternalServerError(ApiError::new(
"IR",
0,
2,
"Something went wrong",
None,
)),
Self::QueryBuildingError => ApiErrorResponse::InternalServerError(ApiError::new(
"IR",
0,
3,
"Query building error",
None,
)),
Self::DeserialisationError => ApiErrorResponse::InternalServerError(ApiError::new(
"IR",
0,
4,
"Deserialisation error",
None,
)),
Self::IndexAccessNotPermittedError(index) => {
ApiErrorResponse::ForbiddenCommonResource(ApiError::new(
"IR",
5,
format!("Index access not permitted: {index:?}"),
None,
))
}
Self::UnknownError => {
ApiErrorResponse::InternalServerError(ApiError::new("IR", 4, "Unknown error", None))
}
}
}
}
Expand Down Expand Up @@ -179,18 +193,16 @@ impl OpenSearchClient {
query_builder: OpenSearchQueryBuilder,
) -> CustomResult<Response, OpenSearchError> {
match query_builder.query_type {
OpenSearchQuery::Msearch => {
let search_indexes = SearchIndex::iter();

OpenSearchQuery::Msearch(ref indexes) => {
let payload = query_builder
.construct_payload(search_indexes.clone().collect())
.construct_payload(indexes)
.change_context(OpenSearchError::QueryBuildingError)?;

let payload_with_indexes = payload.into_iter().zip(search_indexes).fold(
let payload_with_indexes = payload.into_iter().zip(indexes).fold(
Vec::new(),
|mut payload_with_indexes, (index_hit, index)| {
payload_with_indexes.push(
json!({"index": self.search_index_to_opensearch_index(index)}).into(),
json!({"index": self.search_index_to_opensearch_index(*index)}).into(),
);
payload_with_indexes.push(JsonBody::new(index_hit.clone()));
payload_with_indexes
Expand All @@ -207,7 +219,7 @@ impl OpenSearchClient {
OpenSearchQuery::Search(index) => {
let payload = query_builder
.clone()
.construct_payload(vec![index])
.construct_payload(&[index])
.change_context(OpenSearchError::QueryBuildingError)?;

let final_payload = payload.first().unwrap_or(&Value::Null);
Expand Down Expand Up @@ -349,7 +361,7 @@ pub struct OpenSearchHealth {

#[derive(Debug, Clone)]
pub enum OpenSearchQuery {
Msearch,
Msearch(Vec<SearchIndex>),
Search(SearchIndex),
}

Expand Down Expand Up @@ -384,7 +396,7 @@ impl OpenSearchQueryBuilder {
Ok(())
}

pub fn construct_payload(&self, indexes: Vec<SearchIndex>) -> QueryResult<Vec<Value>> {
pub fn construct_payload(&self, indexes: &[SearchIndex]) -> QueryResult<Vec<Value>> {
let mut query =
vec![json!({"multi_match": {"type": "phrase", "query": self.query, "lenient": true}})];

Expand Down
75 changes: 29 additions & 46 deletions crates/analytics/src/search.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use api_models::analytics::search::{
GetGlobalSearchRequest, GetSearchRequestWithIndex, GetSearchResponse, OpenMsearchOutput,
OpensearchOutput, SearchIndex,
OpensearchOutput, SearchIndex, SearchStatus,
};
use common_utils::errors::{CustomResult, ReportSwitchExt};
use error_stack::ResultExt;
use router_env::tracing;
use strum::IntoEnumIterator;

use crate::opensearch::{
OpenSearchClient, OpenSearchError, OpenSearchQuery, OpenSearchQueryBuilder,
Expand All @@ -15,8 +14,10 @@ pub async fn msearch_results(
client: &OpenSearchClient,
req: GetGlobalSearchRequest,
merchant_id: &String,
indexes: Vec<SearchIndex>,
) -> CustomResult<Vec<GetSearchResponse>, OpenSearchError> {
let mut query_builder = OpenSearchQueryBuilder::new(OpenSearchQuery::Msearch, req.query);
let mut query_builder =
OpenSearchQueryBuilder::new(OpenSearchQuery::Msearch(indexes.clone()), req.query);

query_builder
.add_filter_clause("merchant_id".to_string(), merchant_id.to_string())
Expand All @@ -40,29 +41,19 @@ pub async fn msearch_results(
Ok(response_body
.responses
.into_iter()
.zip(SearchIndex::iter())
.zip(indexes)
.map(|(index_hit, index)| match index_hit {
OpensearchOutput::Success(success) => {
if success.status == 200 {
GetSearchResponse {
count: success.hits.total.value,
index,
hits: success
.hits
.hits
.into_iter()
.map(|hit| hit.source)
.collect(),
}
} else {
tracing::error!("Unexpected status code: {}", success.status,);
GetSearchResponse {
count: 0,
index,
hits: Vec::new(),
}
}
}
OpensearchOutput::Success(success) => GetSearchResponse {
count: success.hits.total.value,
index,
hits: success
.hits
.hits
.into_iter()
.map(|hit| hit.source)
.collect(),
status: SearchStatus::Success,
},
OpensearchOutput::Error(error) => {
tracing::error!(
index = ?index,
Expand All @@ -73,6 +64,7 @@ pub async fn msearch_results(
count: 0,
index,
hits: Vec::new(),
status: SearchStatus::Failure,
}
}
})
Expand Down Expand Up @@ -113,27 +105,17 @@ pub async fn search_results(
let response_body: OpensearchOutput = response_text;

match response_body {
OpensearchOutput::Success(success) => {
if success.status == 200 {
Ok(GetSearchResponse {
count: success.hits.total.value,
index: req.index,
hits: success
.hits
.hits
.into_iter()
.map(|hit| hit.source)
.collect(),
})
} else {
tracing::error!("Unexpected status code: {}", success.status);
Ok(GetSearchResponse {
count: 0,
index: req.index,
hits: Vec::new(),
})
}
}
OpensearchOutput::Success(success) => Ok(GetSearchResponse {
count: success.hits.total.value,
index: req.index,
hits: success
.hits
.hits
.into_iter()
.map(|hit| hit.source)
.collect(),
status: SearchStatus::Success,
}),
OpensearchOutput::Error(error) => {
tracing::error!(
index = ?req.index,
Expand All @@ -144,6 +126,7 @@ pub async fn search_results(
count: 0,
index: req.index,
hits: Vec::new(),
status: SearchStatus::Failure,
})
}
}
Expand Down
13 changes: 11 additions & 2 deletions crates/api_models/src/analytics/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ pub struct GetSearchRequestWithIndex {
pub search_req: GetSearchRequest,
}

#[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy)]
#[derive(
Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy, Eq, PartialEq,
)]
#[serde(rename_all = "snake_case")]
pub enum SearchIndex {
PaymentAttempts,
Expand All @@ -39,12 +41,20 @@ pub enum SearchIndex {
Disputes,
}

#[derive(Debug, strum::EnumIter, Clone, serde::Deserialize, serde::Serialize, Copy)]
pub enum SearchStatus {
Success,
Failure,
NoAccess,
}

#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetSearchResponse {
pub count: u64,
pub index: SearchIndex,
pub hits: Vec<Value>,
pub status: SearchStatus,
}

#[derive(Debug, serde::Deserialize)]
Expand Down Expand Up @@ -74,7 +84,6 @@ pub struct OpensearchErrorDetails {

#[derive(Debug, serde::Deserialize)]
pub struct OpensearchSuccess {
pub status: u16,
pub hits: OpensearchHits,
}

Expand Down
Loading
Loading