Skip to content

Commit

Permalink
add _elastic/_field_caps API
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed Jan 12, 2024
1 parent 44360ac commit 8de1c1c
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 46 deletions.
70 changes: 43 additions & 27 deletions quickwit/quickwit-serve/src/elastic_search_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use super::model::{
use crate::elastic_search_api::model::{
ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams,
};
use crate::search_api::extract_index_id_patterns;
use crate::search_api::{extract_index_id_patterns, extract_index_id_patterns_default};

const BODY_LENGTH_LIMIT: ByteSize = ByteSize::mib(1);
const CONTENT_LENGTH_LIMIT: ByteSize = ByteSize::mib(10);
Expand Down Expand Up @@ -79,6 +79,31 @@ pub(crate) fn elastic_bulk_filter(
.and(serde_qs::warp::query(serde_qs::Config::default()))
}

#[utoipa::path(
post,
tag = "Ingest",
path = "/{index}/_bulk",
request_body(content = String, description = "Elasticsearch compatible bulk request body limited to 10MB", content_type = "application/json"),
responses(
(status = 200, description = "Successfully ingested documents.", body = IngestResponse)
),
params(
("refresh" = Option<ElasticRefresh>, Query, description = "Force or wait for commit at the end of the indexing operation."),
)
)]
pub(crate) fn elastic_index_bulk_filter(
) -> impl Filter<Extract = (String, Bytes, ElasticBulkOptions), Error = Rejection> + Clone {
warp::path!("_elastic" / String / "_bulk")
.and(warp::post())
.and(warp::body::content_length_limit(
CONTENT_LENGTH_LIMIT.as_u64(),
))
.and(warp::body::bytes())
.and(serde_qs::warp::query::<ElasticBulkOptions>(
serde_qs::Config::default(),
))
}

/// Like the warp json filter, but accepts an empty body and interprets it as `T::default`.
fn json_or_empty<T: DeserializeOwned + Send + Default>(
) -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
Expand All @@ -102,7 +127,7 @@ fn json_or_empty<T: DeserializeOwned + Send + Default>(
.unify()
}

#[utoipa::path(get, tag = "metadata", path = "/{index}/_field_caps")]
#[utoipa::path(get, tag = "Metadata", path = "/{index}/_field_caps")]
pub(crate) fn elastic_index_field_capabilities_filter() -> impl Filter<
Extract = (
Vec<String>,
Expand All @@ -118,6 +143,22 @@ pub(crate) fn elastic_index_field_capabilities_filter() -> impl Filter<
.and(json_or_empty())
}

#[utoipa::path(get, tag = "Metadata", path = "/_field_caps")]
pub(crate) fn elastic_field_capabilities_filter() -> impl Filter<
Extract = (
Vec<String>,
FieldCapabilityQueryParams,
FieldCapabilityRequestBody,
),
Error = Rejection,
> + Clone {
warp::path!("_elastic" / "_field_caps")
.and_then(extract_index_id_patterns_default)
.and(warp::get().or(warp::post()).unify())
.and(serde_qs::warp::query(serde_qs::Config::default()))
.and(json_or_empty())
}

#[utoipa::path(get, tag = "Search", path = "/{index}/_search")]
pub(crate) fn elastic_index_search_filter(
) -> impl Filter<Extract = (Vec<String>, SearchQueryParams, SearchBody), Error = Rejection> + Clone
Expand All @@ -129,31 +170,6 @@ pub(crate) fn elastic_index_search_filter(
.and(json_or_empty())
}

#[utoipa::path(
post,
tag = "Ingest",
path = "/{index}/_bulk",
request_body(content = String, description = "Elasticsearch compatible bulk request body limited to 10MB", content_type = "application/json"),
responses(
(status = 200, description = "Successfully ingested documents.", body = IngestResponse)
),
params(
("refresh" = Option<ElasticRefresh>, Query, description = "Force or wait for commit at the end of the indexing operation."),
)
)]
pub(crate) fn elastic_index_bulk_filter(
) -> impl Filter<Extract = (String, Bytes, ElasticBulkOptions), Error = Rejection> + Clone {
warp::path!("_elastic" / String / "_bulk")
.and(warp::post())
.and(warp::body::content_length_limit(
CONTENT_LENGTH_LIMIT.as_u64(),
))
.and(warp::body::bytes())
.and(serde_qs::warp::query::<ElasticBulkOptions>(
serde_qs::Config::default(),
))
}

#[utoipa::path(post, tag = "Search", path = "/_msearch")]
pub(crate) fn elastic_multi_search_filter(
) -> impl Filter<Extract = (Bytes, MultiSearchQueryParams), Error = Rejection> + Clone {
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-serve/src/elastic_search_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ pub fn elastic_api_handlers(
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
es_compat_cluster_info_handler(node_config, BuildInfo::get())
.or(es_compat_search_handler(search_service.clone()))
.or(es_compat_index_search_handler(search_service.clone()))
.or(es_compat_scroll_handler(search_service.clone()))
.or(es_compat_index_multi_search_handler(search_service.clone()))
.or(es_compat_index_field_capabilities_handler(
search_service.clone(),
))
.or(es_compat_index_search_handler(search_service.clone()))
.or(es_compat_scroll_handler(search_service.clone()))
.or(es_compat_index_multi_search_handler(search_service))
.or(es_compat_bulk_handler(
ingest_service.clone(),
ingest_router.clone(),
Expand Down
29 changes: 15 additions & 14 deletions quickwit/quickwit-serve/src/elastic_search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ use serde_json::json;
use warp::{Filter, Rejection};

use super::filter::{
elastic_cluster_info_filter, elastic_index_field_capabilities_filter,
elastic_index_search_filter, elastic_multi_search_filter, elastic_scroll_filter,
elastic_search_filter,
elastic_cluster_info_filter, elastic_field_capabilities_filter,
elastic_index_field_capabilities_filter, elastic_index_search_filter,
elastic_multi_search_filter, elastic_scroll_filter, elastic_search_filter,
};
use super::model::{
build_list_field_request_for_es_api, convert_to_es_field_capabilities_response,
Expand Down Expand Up @@ -98,11 +98,12 @@ pub fn es_compat_search_handler(
}

/// GET or POST _elastic/{index}/_field_caps
/// TODO: add route handling for _elastic/_field_caps
pub fn es_compat_index_field_capabilities_handler(
search_service: Arc<dyn SearchService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_index_field_capabilities_filter()
.or(elastic_field_capabilities_filter())
.unify()
.and(with_arg(search_service))
.then(es_compat_index_field_capabilities)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
Expand All @@ -118,16 +119,6 @@ pub fn es_compat_index_search_handler(
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// GET or POST _elastic/_search/scroll
pub fn es_compat_scroll_handler(
search_service: Arc<dyn SearchService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_scroll_filter()
.and(with_arg(search_service))
.then(es_scroll)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// POST _elastic/_search
pub fn es_compat_index_multi_search_handler(
search_service: Arc<dyn SearchService>,
Expand All @@ -144,6 +135,16 @@ pub fn es_compat_index_multi_search_handler(
})
}

/// GET or POST _elastic/_search/scroll
pub fn es_compat_scroll_handler(
search_service: Arc<dyn SearchService>,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_scroll_filter()
.and(with_arg(search_service))
.then(es_scroll)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

fn build_request_for_es_api(
index_id_patterns: Vec<String>,
search_params: SearchQueryParams,
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-serve/src/search_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ mod grpc_adapter;
mod rest_handler;

pub use self::grpc_adapter::GrpcSearchAdapter;
pub(crate) use self::rest_handler::extract_index_id_patterns;
pub(crate) use self::rest_handler::{extract_index_id_patterns, extract_index_id_patterns_default};
pub use self::rest_handler::{
search_get_handler, search_post_handler, search_request_from_api_request,
search_stream_handler, SearchApi, SearchRequestQueryString, SortBy,
Expand Down
5 changes: 5 additions & 0 deletions quickwit/quickwit-serve/src/search_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ use crate::{with_arg, BodyFormat};
)]
pub struct SearchApi;

pub(crate) async fn extract_index_id_patterns_default() -> Result<Vec<String>, Rejection> {
let index_id_patterns = Vec::new();
Ok(index_id_patterns)
}

pub(crate) async fn extract_index_id_patterns(
comma_separated_index_id_patterns: String,
) -> Result<Vec<String>, Rejection> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,34 @@ expected:
indices:
- fieldcaps-2
---
# _field_caps without index endpoint
method: [GET]
engines:
- quickwit
endpoint: _field_caps?fields=tags*
expected:
indices:
- fieldcaps
- fieldcaps-2
fields:
tags:
keyword:
type: keyword
metadata_field: false
searchable: true
aggregatable: true
indices:
- fieldcaps
- fieldcaps-2
tags-2:
keyword:
type: keyword
metadata_field: false
searchable: true
aggregatable: true
indices:
- fieldcaps-2
---
# Wildcard on index name + Wildcard without match
method: [GET]
engines:
Expand All @@ -318,7 +346,6 @@ expected:
metadata_field: false
searchable: true
aggregatable: true

---
# Exact match index + Non matching excact index
method: [GET]
Expand Down

0 comments on commit 8de1c1c

Please sign in to comment.