Skip to content

Commit

Permalink
support _source_excludes,_source_includes in ES API (#4572)
Browse files Browse the repository at this point in the history
* support _source_excludes,_source_includes in ES API

* handle nested paths
  • Loading branch information
PSeitz authored Feb 15, 2024
1 parent 15979c2 commit b2799e0
Show file tree
Hide file tree
Showing 2 changed files with 321 additions and 11 deletions.
265 changes: 254 additions & 11 deletions quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,19 @@ async fn es_compat_index_search(
search_body: SearchBody,
search_service: Arc<dyn SearchService>,
) -> Result<ElasticsearchResponse, ElasticsearchError> {
let _source_excludes = search_params._source_excludes.clone();
let _source_includes = search_params._source_includes.clone();
let start_instant = Instant::now();
let (search_request, append_shard_doc) =
build_request_for_es_api(index_id_patterns, search_params, search_body)?;
let search_response: SearchResponse = search_service.root_search(search_request).await?;
let elapsed = start_instant.elapsed();
let mut search_response_rest: ElasticsearchResponse =
convert_to_es_search_response(search_response, append_shard_doc);
let mut search_response_rest: ElasticsearchResponse = convert_to_es_search_response(
search_response,
append_shard_doc,
_source_excludes,
_source_includes,
);
search_response_rest.took = elapsed.as_millis() as u32;
Ok(search_response_rest)
}
Expand Down Expand Up @@ -481,9 +487,113 @@ async fn es_compat_index_field_capabilities(
Ok(search_response_rest)
}

fn convert_hit(hit: quickwit_proto::search::Hit, append_shard_doc: bool) -> ElasticHit {
let fields: BTreeMap<String, serde_json::Value> =
serde_json::from_str(&hit.json).unwrap_or_default();
fn filter_source(
value: &mut serde_json::Value,
_source_excludes: &Option<Vec<String>>,
_source_includes: &Option<Vec<String>>,
) {
fn remove_path(value: &mut serde_json::Value, path: &str) {
for (prefix, suffix) in generate_path_variants_with_suffix(path) {
match value {
serde_json::Value::Object(ref mut map) => {
if let Some(suffix) = suffix {
if let Some(sub_value) = map.get_mut(prefix) {
remove_path(sub_value, suffix);
return;
}
} else {
map.remove(prefix);
}
}
_ => continue,
}
}
}
fn retain_includes(
value: &mut serde_json::Value,
current_path: &str,
include_paths: &Vec<String>,
) {
if let Some(ref mut map) = value.as_object_mut() {
map.retain(|key, sub_value| {
let path = if current_path.is_empty() {
key.to_string()
} else {
format!("{}.{}", current_path, key)
};

if include_paths.contains(&path) {
// Exact match keep whole node
return true;
}
// Check if the path is sub path of any allowed path
for allowed_path in include_paths {
if allowed_path.starts_with(path.as_str()) {
retain_includes(sub_value, &path, include_paths);
return true;
}
}
false
});
}
}

// Remove fields that are not included
if let Some(includes) = _source_includes {
retain_includes(value, "", includes);
}

// Remove fields that are excluded
if let Some(excludes) = _source_excludes {
for exclude in excludes {
remove_path(value, exclude);
}
}
}

/// Enumerates every way of splitting a dotted path into a leading key and the remaining path.
///
/// One entry is produced per `.` in `input` (the text before the dot, plus the text after it, or
/// `None` when nothing follows the dot), followed by a final entry holding the whole input.
///
/// "app.id.name" -> [("app", Some("id.name")), ("app.id", Some("name")), ("app.id.name", None)]
fn generate_path_variants_with_suffix(input: &str) -> Vec<(&str, Option<&str>)> {
    input
        .match_indices('.')
        .map(|(dot_pos, _)| {
            let head = &input[..dot_pos];
            let remainder = &input[dot_pos + 1..];
            // A trailing dot leaves nothing behind it, which is reported as "no suffix".
            let tail = if remainder.is_empty() {
                None
            } else {
                Some(remainder)
            };
            (head, tail)
        })
        // The complete path is always the last candidate key.
        .chain(std::iter::once((input, None)))
        .collect()
}

fn convert_hit(
hit: quickwit_proto::search::Hit,
append_shard_doc: bool,
_source_excludes: &Option<Vec<String>>,
_source_includes: &Option<Vec<String>>,
) -> ElasticHit {
let mut json: serde_json::Value = serde_json::from_str(&hit.json).unwrap_or(json!({}));
filter_source(&mut json, _source_excludes, _source_includes);
let source =
Source::from_string(serde_json::to_string(&json).unwrap_or_else(|_| "{}".to_string()))
.unwrap_or_else(|_| Source::from_string("{}".to_string()).unwrap());

let mut fields: BTreeMap<String, serde_json::Value> = Default::default();
if let serde_json::Value::Object(map) = json {
for (key, val) in map {
fields.insert(key, val);
}
}

let mut sort = Vec::new();
if let Some(partial_hit) = hit.partial_hit {
if let Some(sort_value) = partial_hit.sort_value {
Expand All @@ -506,8 +616,7 @@ fn convert_hit(hit: quickwit_proto::search::Hit, append_shard_doc: bool) -> Elas
id: "".to_string(),
score: None,
nested: None,
source: Source::from_string(hit.json)
.unwrap_or_else(|_| Source::from_string("{}".to_string()).unwrap()),
source,
highlight: Default::default(),
inner_hits: Default::default(),
matched_queries: Vec::default(),
Expand Down Expand Up @@ -578,7 +687,7 @@ async fn es_compat_index_multi_search(
search_service.clone().root_search(search_request).await?;
let elapsed = start_instant.elapsed();
let mut search_response_rest: ElasticsearchResponse =
convert_to_es_search_response(search_response, append_shard_doc);
convert_to_es_search_response(search_response, append_shard_doc, None, None);
search_response_rest.took = elapsed.as_millis() as u32;
Ok::<_, ElasticsearchError>(search_response_rest)
}
Expand Down Expand Up @@ -622,7 +731,7 @@ async fn es_scroll(
let search_response: SearchResponse = search_service.scroll(scroll_request).await?;
// TODO append_shard_doc depends on the initial request, but we don't have access to it
let mut search_response_rest: ElasticsearchResponse =
convert_to_es_search_response(search_response, false);
convert_to_es_search_response(search_response, false, None, None);
search_response_rest.took = start_instant.elapsed().as_millis() as u32;
Ok(search_response_rest)
}
Expand Down Expand Up @@ -685,11 +794,13 @@ fn convert_to_es_stats_response(
fn convert_to_es_search_response(
resp: SearchResponse,
append_shard_doc: bool,
_source_excludes: Option<Vec<String>>,
_source_includes: Option<Vec<String>>,
) -> ElasticsearchResponse {
let hits: Vec<ElasticHit> = resp
.hits
.into_iter()
.map(|hit| convert_hit(hit, append_shard_doc))
.map(|hit| convert_hit(hit, append_shard_doc, &_source_excludes, &_source_includes))
.collect();
let aggregations: Option<serde_json::Value> = if let Some(aggregation_json) = resp.aggregation {
serde_json::from_str(&aggregation_json).ok()
Expand Down Expand Up @@ -722,7 +833,7 @@ pub(crate) fn str_lines(body: &str) -> impl Iterator<Item = &str> {
mod tests {
use hyper::StatusCode;

use super::partial_hit_from_search_after_param;
use super::{partial_hit_from_search_after_param, *};

#[test]
fn test_partial_hit_from_search_after_param_invalid_length() {
Expand Down Expand Up @@ -768,4 +879,136 @@ mod tests {
u32}`"
);
}

#[test]
fn test_single_element() {
    // A path without any dot yields only the full path, with no remainder.
    let variants = generate_path_variants_with_suffix("app");
    let wanted: Vec<(&str, Option<&str>)> = vec![("app", None)];
    assert_eq!(variants, wanted);
}

#[test]
fn test_two_elements() {
    // One dot produces one split plus the full path.
    let variants = generate_path_variants_with_suffix("app.id");
    let wanted: Vec<(&str, Option<&str>)> = vec![("app", Some("id")), ("app.id", None)];
    assert_eq!(variants, wanted);
}

#[test]
fn test_multiple_elements() {
    // Each dot contributes one (prefix, remainder) pair; the full path comes last.
    let variants = generate_path_variants_with_suffix("app.id.name");
    let wanted: Vec<(&str, Option<&str>)> = vec![
        ("app", Some("id.name")),
        ("app.id", Some("name")),
        ("app.id.name", None),
    ];
    assert_eq!(variants, wanted);
}

#[test]
fn test_include_fields1() {
    // Including a nested path keeps its parent object but drops all sibling fields.
    let mut document = json!({
        "app": { "id": 123, "name": "Blub" },
        "user": { "id": 456, "name": "Fred" }
    });
    let include_list = Some(vec!["app.id".to_string()]);

    filter_source(&mut document, &None, &include_list);

    assert_eq!(
        document,
        json!({
            "app": { "id": 123 }
        })
    );
}
#[test]
fn test_include_fields2() {
    // A key that itself contains a dot ("app.id") is matched as an exact include path.
    let mut document = json!({
        "app": { "id": 123, "name": "Blub" },
        "app.id": { "id": 123, "name": "Blub" },
        "user": { "id": 456, "name": "Fred" }
    });
    let include_list = Some(vec!["app".to_string(), "app.id".to_string()]);

    filter_source(&mut document, &None, &include_list);

    assert_eq!(
        document,
        json!({
            "app": { "id": 123, "name": "Blub" },
            "app.id": { "id": 123, "name": "Blub" },
        })
    );
}

#[test]
fn test_exclude_fields() {
    // Excluded nested paths are removed while their sibling fields survive.
    let mut document = json!({
        "app": { "id": 123, "name": "Blub" },
        "user": { "id": 456, "name": "Fred" }
    });
    let exclude_list = Some(vec!["app.name".to_string(), "user.id".to_string()]);

    filter_source(&mut document, &exclude_list, &None);

    assert_eq!(
        document,
        json!({
            "app": { "id": 123 },
            "user": { "name": "Fred" }
        })
    );
}

#[test]
fn test_include_and_exclude_fields() {
    // Excludes are applied after includes, so "user.email" is dropped although it is included.
    let mut document = json!({
        "app": { "id": 123, "name": "Blub", "version": "1.0" },
        "user": { "id": 456, "name": "Fred", "email": "john@example.com" }
    });
    let include_list = Some(vec![
        "app".to_string(),
        "user.name".to_string(),
        "user.email".to_string(),
    ]);
    let exclude_list = Some(vec!["app.version".to_string(), "user.email".to_string()]);

    filter_source(&mut document, &exclude_list, &include_list);

    assert_eq!(
        document,
        json!({
            "app": { "id": 123, "name": "Blub" },
            "user": { "name": "Fred" }
        })
    );
}

#[test]
fn test_no_includes_or_excludes() {
    // Without any include or exclude list the document must come back unchanged.
    let mut document = json!({
        "app": { "id": 123, "name": "Blub" }
    });

    filter_source(&mut document, &None, &None);

    assert_eq!(
        document,
        json!({
            "app": { "id": 123, "name": "Blub" }
        })
    );
}
}
67 changes: 67 additions & 0 deletions quickwit/rest-api-tests/scenarii/es_compatibility/0022-source.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
--- # _source_excludes
params:
_source_excludes: ["actor"]
json:
size: 1
query:
match_all: {}
expected:
hits:
total:
value: 100
relation: eq
hits:
- _source:
$expect: "not 'actor' in val"
--- # _source_includes
params:
_source_includes: ["actor"]
json:
size: 1
query:
match_all: {}
expected:
hits:
total:
value: 100
relation: eq
hits:
- _source:
$expect: "len(val) == 1" # Contains only 'actor'
actor:
id: 5688
--- # _source_includes and _source_excludes
params:
_source_includes: "actor,id"
_source_excludes: ["actor"]
json:
size: 1
query:
match_all: {}
expected:
hits:
total:
value: 100
relation: eq
hits:
- _source:
$expect: "len(val) == 1" # Contains only 'id' ('actor' is excluded)
id: 5688
--- # _source_includes with path
params:
_source_includes: "actor.id"
json:
size: 1
query:
match_all: {}
expected:
hits:
total:
value: 100
relation: eq
hits:
- _source:
actor:
$expect: "len(val) == 1" # 'actor' contains only 'id'
id: 5688

0 comments on commit b2799e0

Please sign in to comment.