From 1b117082a2ffdcd7246ef196ee8ba0b31e14bccf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Sun, 29 Aug 2021 23:10:42 +0200 Subject: [PATCH 1/8] Fix #483, #484 and above all handle errors happening in leaves. --- quickwit-search/src/client.rs | 44 ++++--- quickwit-search/src/error.rs | 2 +- .../src/search_stream/collector.rs | 39 +++++- quickwit-search/src/search_stream/leaf.rs | 111 +++++++++++++----- quickwit-search/src/search_stream/root.rs | 5 +- quickwit-search/src/service.rs | 4 +- .../src/grpc_adapter/search_adapter.rs | 14 ++- quickwit-storage/src/error.rs | 2 +- 8 files changed, 160 insertions(+), 61 deletions(-) diff --git a/quickwit-search/src/client.rs b/quickwit-search/src/client.rs index 0932b3b9245..8c6d65941d2 100644 --- a/quickwit-search/src/client.rs +++ b/quickwit-search/src/client.rs @@ -126,7 +126,7 @@ impl SearchServiceClient { pub async fn leaf_search_stream( &mut self, request: quickwit_proto::LeafSearchStreamRequest, - ) -> crate::Result>> { + ) -> crate::Result>> { match &mut self.client_impl { SearchServiceClientImpl::Grpc(grpc_client) => { let mut grpc_client_clone = grpc_client.clone(); @@ -138,25 +138,35 @@ impl SearchServiceClient { .await .map_err(|tonic_error| parse_grpc_error(&tonic_error))? .into_inner(); - - // TODO: returning stream instead of a channel may be better. - // But this seems to be difficult. Try it at your own expense. - while let Some(result) = results_stream - .message() - .await - .map_err(|status| parse_grpc_error(&status))? - { - // We want to stop doing unnecessary work on the leaves as soon as - // there is an issue sending the result. - // Terminating the task will drop the `result_stream` consequently - // canceling the gRPC request. - result_sender.send(Ok(result)).map_err(|_| { - SearchError::InternalError("Could not send leaf result".into()) - })?; + loop { + let grpc_result = results_stream.message().await; + if grpc_result.is_err() { + result_sender + .send(Err(parse_grpc_error(&grpc_result.unwrap_err()))) + .map_err(|_| { + SearchError::InternalError( + "Sender closed, could not send leaf result.".into(), + ) + })?; + // Stop the loop as soon as we receive a leaf error. + break; + } + + if let Some(result) = grpc_result.unwrap() { + result_sender.send(Ok(result)).map_err(|_| { + SearchError::InternalError( + "Sender closed, could not send leaf result.".into(), + ) + })?; + } else { + // Stop the loop as soon as we get an empty message as this means that + // there is no more result to receive. + break; + } } + Result::<_, SearchError>::Ok(()) }); - Ok(UnboundedReceiverStream::new(result_receiver)) } SearchServiceClientImpl::Local(service) => service.leaf_search_stream(request).await, diff --git a/quickwit-search/src/error.rs b/quickwit-search/src/error.rs index 9ca2b563fab..0460acfe399 100644 --- a/quickwit-search/src/error.rs +++ b/quickwit-search/src/error.rs @@ -29,7 +29,7 @@ use quickwit_storage::StorageResolverError; /// Possible SearchError #[allow(missing_docs)] -#[derive(Error, Debug, Serialize, Deserialize)] +#[derive(Error, Debug, Serialize, Deserialize, Clone)] pub enum SearchError { #[error("Index `{index_id}` does not exist.")] IndexDoesNotExist { index_id: String }, diff --git a/quickwit-search/src/search_stream/collector.rs b/quickwit-search/src/search_stream/collector.rs index c84a1cc2e5c..7df2bf6b38e 100644 --- a/quickwit-search/src/search_stream/collector.rs +++ b/quickwit-search/src/search_stream/collector.rs @@ -168,11 +168,13 @@ impl FastFieldCollectorBuilder { } pub fn fast_field_to_warm(&self) -> Vec { + let mut fields = vec![self.fast_field_name.clone()]; if let Some(timestamp_field_name) = &self.timestamp_field_name { - vec![timestamp_field_name.clone(), self.fast_field_name.clone()] - } else { - vec![self.fast_field_name.clone()] + if *timestamp_field_name != self.fast_field_name { + fields.push(timestamp_field_name.clone()); + } } + fields } pub fn typed_build(&self) -> FastFieldCollector { @@ -195,3 +197,34 @@ impl FastFieldCollectorBuilder { self.typed_build::() } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_fast_field_collector_builder() -> anyhow::Result<()> { + let builder = FastFieldCollectorBuilder::new( + Type::U64, + "field_name".to_string(), + Some("field_name".to_string()), + None, + None, + None, + )?; + assert_eq!(builder.fast_field_to_warm(), vec!["field_name"]); + let builder = FastFieldCollectorBuilder::new( + Type::U64, + "field_name".to_string(), + Some("timestamp_field_name".to_string()), + None, + None, + None, + )?; + assert_eq!( + builder.fast_field_to_warm(), + vec!["field_name", "timestamp_field_name"] + ); + Ok(()) + } +} diff --git a/quickwit-search/src/search_stream/leaf.rs b/quickwit-search/src/search_stream/leaf.rs index 43254de00b8..73086ed9ef4 100644 --- a/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit-search/src/search_stream/leaf.rs @@ -22,6 +22,7 @@ use super::FastFieldCollectorBuilder; use crate::leaf::open_index; use crate::leaf::warmup; use crate::SearchError; +use futures::{FutureExt, StreamExt}; use quickwit_index_config::IndexConfig; use quickwit_proto::LeafSearchStreamResult; use quickwit_proto::OutputFormat; @@ -30,9 +31,17 @@ use quickwit_proto::SearchStreamRequest; use quickwit_proto::SplitIdAndFooterOffsets; use quickwit_storage::Storage; use std::sync::Arc; +use tantivy::query::Query; use tantivy::schema::Type; +use tantivy::LeasedItem; use tantivy::ReloadPolicy; +use tantivy::Searcher; +use tokio::task::spawn_blocking; use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::error; + +// TODO: buffer of 5 seems to be sufficient to do the job locally, needs to be tested on a cluster. +const CONCURRENT_SPLIT_SEARCH_STREAM: usize = 5; /// `leaf` step of search stream. // Note: we return a stream of a result with a tonic::Status error @@ -46,39 +55,59 @@ pub async fn leaf_search_stream( storage: Arc, splits: Vec, index_config: Arc, -) -> UnboundedReceiverStream> { +) -> UnboundedReceiverStream> { let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel(); - for split in splits { - let index_config_clone = index_config.clone(); - let result_sender_clone = result_sender.clone(); - let request_clone = request.clone(); - let storage_clone = storage.clone(); - tokio::spawn(async move { - let leaf_split_result = leaf_search_stream_single_split( - split.clone(), - index_config_clone, - &request_clone, - storage_clone, - ) - .await - .map_err(SearchError::convert_to_tonic_status); - result_sender_clone.send(leaf_split_result).map_err(|_| { - SearchError::InternalError(format!( - "Unable to send leaf export result for split `{}`", - &split.split_id - )) - })?; - Result::<(), SearchError>::Ok(()) - }); - } + let request_clone = request.clone(); + tokio::spawn(async move { + let mut stream = + leaf_search_results_stream(request_clone, storage, splits, index_config).await; + while let Some(item) = stream.next().await { + if let Err(error) = result_sender.send(item) { + error!( + "Failed to send leaf search stream result. Stop sending. Cause: {}", + error + ); + break; + } + } + }); UnboundedReceiverStream::new(result_receiver) } +async fn leaf_search_results_stream( + request: SearchStreamRequest, + storage: Arc, + splits: Vec, + index_config: Arc, +) -> impl futures::Stream> + Sync + Send + 'static { + futures::stream::iter(splits) + .map(move |split| { + ( + split, + index_config.clone(), + request.clone(), + storage.clone(), + ) + }) + .map( + |(split, index_config_clone, request_clone, split_storage)| { + leaf_search_stream_single_split( + split, + index_config_clone, + request_clone, + split_storage, + ) + .shared() + }, + ) + .buffer_unordered(CONCURRENT_SPLIT_SEARCH_STREAM) +} + /// Apply a leaf search on a single split. async fn leaf_search_stream_single_split( split: SplitIdAndFooterOffsets, index_config: Arc, - stream_request: &SearchStreamRequest, + stream_request: SearchStreamRequest, storage: Arc, ) -> crate::Result { let index = open_index(storage, &split).await?; @@ -88,8 +117,8 @@ async fn leaf_search_stream_single_split( .get_field(&fast_field_to_extract) .ok_or_else(|| { SearchError::InvalidQuery(format!( - "Fast field `{}` does not exist.", - fast_field_to_extract + "Fast field `{}` does not exist for split {}.", + fast_field_to_extract, split.split_id, )) })?; let fast_field_type = split_schema.get_field_entry(fast_field).field_type(); @@ -103,7 +132,10 @@ async fn leaf_search_stream_single_split( )?; let output_format = OutputFormat::from_i32(stream_request.output_format).ok_or_else(|| { - SearchError::InternalError("Invalid output format specified.".to_string()) + SearchError::InternalError(format!( + "Invalid output format specified for split {}.", + split.split_id + )) })?; let search_request = SearchRequest::from(stream_request.clone()); let query = index_config.query(split_schema, &search_request)?; @@ -120,6 +152,27 @@ async fn leaf_search_stream_single_split( fast_field_collector_builder.fast_field_to_warm(), ) .await?; + let collect_handle = spawn_blocking(move || { + collect_fast_field_values( + &fast_field_collector_builder, + &searcher, + query, + output_format, + ) + }); + let buffer = collect_handle.await.map_err(|error| { + error!(split_id = %split.split_id, fast_field=%fast_field_to_extract, error_message=%error, "Failed to collect fast field"); + SearchError::InternalError(format!("Error when collecting fast field values for split {}: {:?}", split.split_id, error)) + })??; + Ok(LeafSearchStreamResult { data: buffer }) +} + +fn collect_fast_field_values( + fast_field_collector_builder: &FastFieldCollectorBuilder, + searcher: &LeasedItem, + query: Box, + output_format: OutputFormat, +) -> crate::Result> { let mut buffer = Vec::new(); match fast_field_collector_builder.value_type() { Type::I64 => { @@ -151,5 +204,5 @@ async fn leaf_search_stream_single_split( ))); } } - Ok(LeafSearchStreamResult { data: buffer }) + Ok(buffer) } diff --git a/quickwit-search/src/search_stream/root.rs b/quickwit-search/src/search_stream/root.rs index f167a44e8b1..fb6be98eeda 100644 --- a/quickwit-search/src/search_stream/root.rs +++ b/quickwit-search/src/search_stream/root.rs @@ -36,7 +36,6 @@ use quickwit_metastore::Metastore; use quickwit_proto::SearchRequest; use crate::client_pool::Job; -use crate::error::parse_grpc_error; use crate::list_relevant_splits; use crate::root::job_for_splits; use crate::root::NodeSearchError; @@ -96,8 +95,8 @@ pub async fn root_search_stream( let mut leaf_bytes: Vec = Vec::new(); while let Some(leaf_result) = receiver.next().await { - let leaf_data = leaf_result.map_err(|status| NodeSearchError { - search_error: parse_grpc_error(&status), + let leaf_data = leaf_result.map_err(|search_error| NodeSearchError { + search_error, split_ids: split_metadata_list .iter() .map(|split| split.split_id.clone()) diff --git a/quickwit-search/src/service.rs b/quickwit-search/src/service.rs index 8a82ba450ae..efdc8b1dd89 100644 --- a/quickwit-search/src/service.rs +++ b/quickwit-search/src/service.rs @@ -89,7 +89,7 @@ pub trait SearchService: 'static + Send + Sync { async fn leaf_search_stream( &self, _request: LeafSearchStreamRequest, - ) -> crate::Result>>; + ) -> crate::Result>>; } impl SearchServiceImpl { @@ -176,7 +176,7 @@ impl SearchService for SearchServiceImpl { async fn leaf_search_stream( &self, leaf_stream_request: LeafSearchStreamRequest, - ) -> crate::Result>> { + ) -> crate::Result>> { let stream_request = leaf_stream_request .request .ok_or_else(|| SearchError::InternalError("No search request.".to_string()))?; diff --git a/quickwit-serve/src/grpc_adapter/search_adapter.rs b/quickwit-serve/src/grpc_adapter/search_adapter.rs index 0e6a9c0aa03..1e406bf9f07 100644 --- a/quickwit-serve/src/grpc_adapter/search_adapter.rs +++ b/quickwit-serve/src/grpc_adapter/search_adapter.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use async_trait::async_trait; -use tokio_stream::wrappers::UnboundedReceiverStream; +use futures::TryStreamExt; use quickwit_proto::{ search_service_server as grpc, LeafSearchStreamRequest, LeafSearchStreamResult, @@ -79,8 +79,11 @@ impl grpc::SearchService for GrpcSearchAdapter { Ok(tonic::Response::new(fetch_docs_result)) } - type LeafSearchStreamStream = - UnboundedReceiverStream>; + type LeafSearchStreamStream = std::pin::Pin< + Box< + dyn futures::Stream> + Send + Sync, + >, + >; async fn leaf_search_stream( &self, request: tonic::Request, @@ -90,7 +93,8 @@ impl grpc::SearchService for GrpcSearchAdapter { .0 .leaf_search_stream(leaf_search_request) .await - .map_err(SearchError::convert_to_tonic_status)?; - Ok(tonic::Response::new(leaf_search_result)) + .map_err(SearchError::convert_to_tonic_status)? + .map_err(SearchError::convert_to_tonic_status); + Ok(tonic::Response::new(Box::pin(leaf_search_result))) } } diff --git a/quickwit-storage/src/error.rs b/quickwit-storage/src/error.rs index 6c1f170cd8e..f9803b8d86a 100644 --- a/quickwit-storage/src/error.rs +++ b/quickwit-storage/src/error.rs @@ -42,7 +42,7 @@ pub enum StorageErrorKind { /// Generic Storage Resolver Error. #[allow(missing_docs)] -#[derive(Error, Debug, Serialize, Deserialize)] +#[derive(Error, Debug, Serialize, Deserialize, Clone)] pub enum StorageResolverError { /// The input is not a valid URI. /// A protocol is required for the URI. From b05db9325cfff41d362c24bbf9238f0ee13cc2b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Mon, 30 Aug 2021 10:34:45 +0200 Subject: [PATCH 2/8] Add test on search stream with a leaf error. --- quickwit-search/src/search_stream/root.rs | 50 +++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/quickwit-search/src/search_stream/root.rs b/quickwit-search/src/search_stream/root.rs index fb6be98eeda..1dfffc95075 100644 --- a/quickwit-search/src/search_stream/root.rs +++ b/quickwit-search/src/search_stream/root.rs @@ -211,4 +211,54 @@ mod tests { assert_eq!(&result[1], &b"456"[..]); Ok(()) } + + #[tokio::test] + async fn test_root_search_stream_single_split_with_error() -> anyhow::Result<()> { + let request = quickwit_proto::SearchStreamRequest { + index_id: "test-idx".to_string(), + query: "test".to_string(), + search_fields: vec!["body".to_string()], + start_timestamp: None, + end_timestamp: None, + fast_field: "timestamp".to_string(), + output_format: OutputFormat::Csv as i32, + tags: vec![], + }; + let mut metastore = MockMetastore::new(); + metastore + .expect_index_metadata() + .returning(|_index_id: &str| { + Ok(IndexMetadata { + index_id: "test-idx".to_string(), + index_uri: "file:///path/to/index/test-idx".to_string(), + index_config: Arc::new(WikipediaIndexConfig::new()), + checkpoint: Checkpoint::default(), + }) + }); + metastore.expect_list_splits().returning( + |_index_id: &str, + _split_state: SplitState, + _time_range: Option>, + _tags: &[String]| { Ok(vec![mock_split_meta("split1")]) }, + ); + let mut mock_search_service = MockSearchService::new(); + let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel(); + result_sender.send(Ok(quickwit_proto::LeafSearchStreamResult { + data: b"123".to_vec(), + }))?; + result_sender.send(Err(SearchError::InternalError("error".to_string())))?; + mock_search_service.expect_leaf_search_stream().return_once( + |_leaf_search_req: quickwit_proto::LeafSearchStreamRequest| { + Ok(UnboundedReceiverStream::new(result_receiver)) + }, + ); + // The test will hang on indefinitely if we don't drop the receiver. + drop(result_sender); + let client_pool = + Arc::new(SearchClientPool::from_mocks(vec![Arc::new(mock_search_service)]).await?); + let result = root_search_stream(&request, &metastore, &client_pool).await; + assert_eq!(result.is_err(), true); + assert_eq!(result.unwrap_err().to_string(), "Internal error: `[NodeSearchError { search_error: InternalError(\"error\"), split_ids: [\"split1\"] }]`."); + Ok(()) + } } From 2447554045bae38098bdc7584491918011c0b00e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Mon, 30 Aug 2021 16:16:51 +0200 Subject: [PATCH 3/8] Add test on leaf search stream result. --- quickwit-search/src/search_stream/leaf.rs | 96 ++++++++++++++++++++--- 1 file changed, 86 insertions(+), 10 deletions(-) diff --git a/quickwit-search/src/search_stream/leaf.rs b/quickwit-search/src/search_stream/leaf.rs index 73086ed9ef4..451f4395fe1 100644 --- a/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit-search/src/search_stream/leaf.rs @@ -89,17 +89,10 @@ async fn leaf_search_results_stream( storage.clone(), ) }) - .map( - |(split, index_config_clone, request_clone, split_storage)| { - leaf_search_stream_single_split( - split, - index_config_clone, - request_clone, - split_storage, - ) + .map(|(split, index_config_clone, request_clone, storage)| { + leaf_search_stream_single_split(split, index_config_clone, request_clone, storage) .shared() - }, - ) + }) .buffer_unordered(CONCURRENT_SPLIT_SEARCH_STREAM) } @@ -206,3 +199,86 @@ fn collect_fast_field_values( } Ok(buffer) } + +#[cfg(test)] +mod tests { + use std::{str::from_utf8, sync::Arc}; + + use quickwit_core::TestSandbox; + use quickwit_index_config::DefaultIndexConfigBuilder; + + use super::*; + use serde_json::json; + + #[tokio::test] + async fn test_leaf_search_stream_to_csv_output_with_filtering() -> anyhow::Result<()> { + let index_config = r#"{ + "default_search_fields": ["body"], + "timestamp_field": "ts", + "tag_fields": [], + "field_mappings": [ + { + "name": "body", + "type": "text" + }, + { + "name": "ts", + "type": "i64", + "fast": true + } + ] + }"#; + let index_config = + Arc::new(serde_json::from_str::(index_config)?.build()?); + let index_id = "single-node-simple"; + let test_sandbox = TestSandbox::create("single-node-simple", index_config.clone()).await?; + + let mut docs = vec![]; + let mut filtered_timestamp_values = vec![]; + let end_timestamp = 20; + for i in 0..30 { + let body = format!("info @ t:{}", i + 1); + docs.push(json!({"body": body, "ts": i+1})); + if i + 1 < end_timestamp { + filtered_timestamp_values.push((i + 1).to_string()); + } + } + test_sandbox.add_documents(docs).await?; + + let request = SearchStreamRequest { + index_id: index_id.to_string(), + query: "info".to_string(), + search_fields: vec![], + start_timestamp: None, + end_timestamp: Some(end_timestamp), + fast_field: "ts".to_string(), + output_format: 0, + tags: vec![], + }; + let index_metadata = test_sandbox.metastore().index_metadata(index_id).await?; + let splits = test_sandbox.metastore().list_all_splits(index_id).await?; + let splits_offsets = splits + .into_iter() + .map(|split_meta| SplitIdAndFooterOffsets { + split_id: split_meta.split_metadata.split_id, + split_footer_start: split_meta.footer_offsets.start, + split_footer_end: split_meta.footer_offsets.end, + }) + .collect(); + let mut single_node_stream = leaf_search_stream( + &request, + test_sandbox + .storage_uri_resolver() + .resolve(&index_metadata.index_uri)?, + splits_offsets, + index_config, + ) + .await; + let res = single_node_stream.next().await.expect("no leaf result")?; + assert_eq!( + from_utf8(&res.data)?, + format!("{}\n", filtered_timestamp_values.join("\n")) + ); + Ok(()) + } +} From d87b4cefabae9cfc3273766d6cb68011adaa89d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Mon, 30 Aug 2021 21:35:08 +0200 Subject: [PATCH 4/8] Add cli serve tests on a local file index. --- quickwit-cli/tests/cli.rs | 86 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/quickwit-cli/tests/cli.rs b/quickwit-cli/tests/cli.rs index 7989ae11383..743017550e3 100644 --- a/quickwit-cli/tests/cli.rs +++ b/quickwit-cli/tests/cli.rs @@ -586,6 +586,92 @@ async fn test_cmd_dry_run_delete_on_s3_localstack() -> Result<()> { Ok(()) } +/// testing the api via cli commands +#[tokio::test] +#[serial] +async fn test_all_local_index() -> Result<()> { + // Implicit index_id defined in test env struct. + // TODO: change that after the metastore uri refactoring. + let index_id = "data"; + let test_env = create_test_env(TestStorageType::LocalFileSystem)?; + make_command( + format!( + "new --index-uri {} --metastore-uri {} --index-config-path {}", + test_env.index_uri(index_id), + test_env.metastore_uri, + test_env.resource_files["config"].display() + ) + .as_str(), + ) + .assert() + .success(); + + let metadata_file_exist = test_env.storage.exists(&Path::new(index_id).join("quickwit.json")).await?; + assert_eq!(metadata_file_exist, true); + + index_data( + index_id, + test_env.resource_files["logs"].as_path(), + &test_env.metastore_uri, + ); + + // serve & api-search + let mut server_process = spawn_command( + format!( + "serve --metastore-uri {} --host 127.0.0.1 --port 8182", + test_env.metastore_uri, + ) + .as_str(), + ) + .unwrap(); + sleep(Duration::from_secs(2)).await; + let mut data = vec![0; 512]; + server_process + .stdout + .as_mut() + .expect("Failed to get server process output") + .read_exact(&mut data) + .expect("Cannot read output"); + let process_output_str = String::from_utf8(data).unwrap(); + let query_response = reqwest::get(format!( + "http://127.0.0.1:8182/api/v1/{}/search?query=level:info", + index_id + )) + .await? + .text() + .await?; + + assert!(process_output_str.contains("http://127.0.0.1:8182")); + let result: Value = + serde_json::from_str(&query_response).expect("Couldn't deserialize response."); + assert_eq!(result["numHits"], Value::Number(Number::from(2i64))); + + let search_stream_response = reqwest::get(format!( + "http://127.0.0.1:8182/api/v1/{}/search/stream?query=level:info&outputFormat=csv&fastField=ts", + index_id + )) + .await? + .text() + .await?; + assert_eq!(search_stream_response, "2\n13\n"); + + server_process.kill().unwrap(); + + make_command( + format!( + "delete --index-id {} --metastore-uri {}", + index_id, test_env.metastore_uri + ) + .as_str(), + ) + .assert() + .success(); + let metadata_file_exist = test_env.storage.exists(&Path::new(index_id).join("quickwit.json")).await?; + assert_eq!(metadata_file_exist, false); + + Ok(()) +} + /// testing the api via cli commands #[tokio::test] #[serial] From 0b8b095c29c46d1d94a8f465d77c22cbbba49077 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Mon, 30 Aug 2021 21:38:03 +0200 Subject: [PATCH 5/8] Fix typo in tests. --- quickwit-cli/tests/cli.rs | 12 +++++++++--- quickwit-search/src/search_stream/root.rs | 4 ++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/quickwit-cli/tests/cli.rs b/quickwit-cli/tests/cli.rs index 743017550e3..054bbaa6e42 100644 --- a/quickwit-cli/tests/cli.rs +++ b/quickwit-cli/tests/cli.rs @@ -606,7 +606,10 @@ async fn test_all_local_index() -> Result<()> { .assert() .success(); - let metadata_file_exist = test_env.storage.exists(&Path::new(index_id).join("quickwit.json")).await?; + let metadata_file_exist = test_env + .storage + .exists(&Path::new(index_id).join("quickwit.json")) + .await?; assert_eq!(metadata_file_exist, true); index_data( @@ -654,7 +657,7 @@ async fn test_all_local_index() -> Result<()> { .text() .await?; assert_eq!(search_stream_response, "2\n13\n"); - + server_process.kill().unwrap(); make_command( @@ -666,7 +669,10 @@ async fn test_all_local_index() -> Result<()> { ) .assert() .success(); - let metadata_file_exist = test_env.storage.exists(&Path::new(index_id).join("quickwit.json")).await?; + let metadata_file_exist = test_env + .storage + .exists(&Path::new(index_id).join("quickwit.json")) + .await?; assert_eq!(metadata_file_exist, false); Ok(()) diff --git a/quickwit-search/src/search_stream/root.rs b/quickwit-search/src/search_stream/root.rs index 1dfffc95075..5cb41ccc0e7 100644 --- a/quickwit-search/src/search_stream/root.rs +++ b/quickwit-search/src/search_stream/root.rs @@ -201,7 +201,7 @@ mod tests { Ok(UnboundedReceiverStream::new(result_receiver)) }, ); - // The test will hang on indefinitely if we don't drop the receiver. + // The test will hang on indefinitely if we don't drop the sender. drop(result_sender); let client_pool = Arc::new(SearchClientPool::from_mocks(vec![Arc::new(mock_search_service)]).await?); @@ -252,7 +252,7 @@ mod tests { Ok(UnboundedReceiverStream::new(result_receiver)) }, ); - // The test will hang on indefinitely if we don't drop the receiver. + // The test will hang on indefinitely if we don't drop the sender. drop(result_sender); let client_pool = Arc::new(SearchClientPool::from_mocks(vec![Arc::new(mock_search_service)]).await?); From 17d181205ff4eb041ed669739b201d98a1da1c53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Wed, 1 Sep 2021 09:35:35 +0200 Subject: [PATCH 6/8] Fix typo in test variable name. Co-authored-by: Adrien Guillo --- quickwit-cli/tests/cli.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/quickwit-cli/tests/cli.rs b/quickwit-cli/tests/cli.rs index 054bbaa6e42..fa55120f51a 100644 --- a/quickwit-cli/tests/cli.rs +++ b/quickwit-cli/tests/cli.rs @@ -606,11 +606,11 @@ async fn test_all_local_index() -> Result<()> { .assert() .success(); - let metadata_file_exist = test_env + let metadata_file_exists = test_env .storage .exists(&Path::new(index_id).join("quickwit.json")) .await?; - assert_eq!(metadata_file_exist, true); + assert_eq!(metadata_file_exists, true); index_data( index_id, @@ -669,11 +669,11 @@ async fn test_all_local_index() -> Result<()> { ) .assert() .success(); - let metadata_file_exist = test_env + let metadata_file_exists = test_env .storage .exists(&Path::new(index_id).join("quickwit.json")) .await?; - assert_eq!(metadata_file_exist, false); + assert_eq!(metadata_file_exists, false); Ok(()) } From c51adaa688b8d77b68688dbf0b557148764f888d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Wed, 1 Sep 2021 11:22:56 +0200 Subject: [PATCH 7/8] Improve code syntax/readability. --- quickwit-search/src/client.rs | 38 ++++++------------- quickwit-search/src/collector.rs | 21 +++++----- quickwit-search/src/leaf.rs | 8 ++-- .../src/search_stream/collector.rs | 16 ++++---- quickwit-search/src/search_stream/leaf.rs | 17 +++------ quickwit-search/src/service.rs | 2 +- 6 files changed, 41 insertions(+), 61 deletions(-) diff --git a/quickwit-search/src/client.rs b/quickwit-search/src/client.rs index 8c6d65941d2..70b351734dc 100644 --- a/quickwit-search/src/client.rs +++ b/quickwit-search/src/client.rs @@ -19,6 +19,8 @@ * along with this program. If not, see . */ +use futures::StreamExt; +use futures::TryStreamExt; use http::Uri; use quickwit_proto::LeafSearchStreamResult; use std::fmt; @@ -137,32 +139,16 @@ impl SearchServiceClient { .leaf_search_stream(tonic_request) .await .map_err(|tonic_error| parse_grpc_error(&tonic_error))? - .into_inner(); - loop { - let grpc_result = results_stream.message().await; - if grpc_result.is_err() { - result_sender - .send(Err(parse_grpc_error(&grpc_result.unwrap_err()))) - .map_err(|_| { - SearchError::InternalError( - "Sender closed, could not send leaf result.".into(), - ) - })?; - // Stop the loop as soon as we receive a leaf error. - break; - } - - if let Some(result) = grpc_result.unwrap() { - result_sender.send(Ok(result)).map_err(|_| { - SearchError::InternalError( - "Sender closed, could not send leaf result.".into(), - ) - })?; - } else { - // Stop the loop as soon as we get an empty message as this means that - // there is no more result to receive. - break; - } + .into_inner() + .take_while(|grpc_result| futures::future::ready(grpc_result.is_ok())) + .map_err(|tonic_error| parse_grpc_error(&tonic_error)); + + while let Some(search_result) = results_stream.next().await { + result_sender.send(search_result).map_err(|_| { + SearchError::InternalError( + "Sender closed, could not send leaf result.".into(), + ) + })?; } Result::<_, SearchError>::Ok(()) diff --git a/quickwit-search/src/collector.rs b/quickwit-search/src/collector.rs index 37c09e1fa85..e68beebead6 100644 --- a/quickwit-search/src/collector.rs +++ b/quickwit-search/src/collector.rs @@ -21,6 +21,7 @@ use itertools::Itertools; use std::cmp::Ordering; use std::collections::BinaryHeap; +use std::collections::HashSet; use tantivy::schema::Schema; use quickwit_index_config::IndexConfig; @@ -222,7 +223,7 @@ impl SegmentCollector for QuickwitSegmentCollector { // TODO: seems not very useful, remove it and refactor it. pub trait GenericQuickwitCollector: Collector { - fn fast_field_names(&self) -> Vec; + fn fast_field_names(&self) -> HashSet; } /// The quickwit collector is the tantivy Collector used in Quickwit. @@ -235,14 +236,14 @@ pub struct QuickwitCollector { pub start_offset: usize, pub max_hits: usize, pub sort_by: SortBy, - pub fast_field_names: Vec, + pub fast_field_names: HashSet, pub timestamp_field_opt: Option, pub start_timestamp_opt: Option, pub end_timestamp_opt: Option, } impl GenericQuickwitCollector for QuickwitCollector { - fn fast_field_names(&self) -> Vec { + fn fast_field_names(&self) -> HashSet { self.fast_field_names.clone() } } @@ -352,18 +353,14 @@ fn top_k_partial_hits(mut partial_hits: Vec, num_hits: usize) -> Vec } /// Extracts all fast field names. -fn extract_fast_field_names(index_config: &dyn IndexConfig) -> Vec { - let mut fast_fields = vec![]; +fn extract_fast_field_names(index_config: &dyn IndexConfig) -> HashSet { + let mut fast_fields = HashSet::new(); if let Some(timestamp_field) = index_config.timestamp_field_name() { - fast_fields.push(timestamp_field); + fast_fields.insert(timestamp_field); } - if let SortBy::SortByFastField { field_name, .. } = index_config.default_sort_by() { - if !fast_fields.contains(&field_name) { - fast_fields.push(field_name); - } + fast_fields.insert(field_name); } - fast_fields } @@ -396,7 +393,7 @@ pub fn make_merge_collector(search_request: &SearchRequest) -> QuickwitCollector start_offset: search_request.start_offset as usize, max_hits: search_request.max_hits as usize, sort_by: SortBy::DocId, - fast_field_names: vec![], + fast_field_names: HashSet::new(), timestamp_field_opt: None, start_timestamp_opt: search_request.start_timestamp, end_timestamp_opt: search_request.end_timestamp, diff --git a/quickwit-search/src/leaf.rs b/quickwit-search/src/leaf.rs index cb0d8638fdf..7d770308f1a 100644 --- a/quickwit-search/src/leaf.rs +++ b/quickwit-search/src/leaf.rs @@ -27,7 +27,7 @@ use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory}; use quickwit_index_config::IndexConfig; use quickwit_proto::{LeafSearchResult, SearchRequest, SplitIdAndFooterOffsets, SplitSearchError}; use quickwit_storage::{BundleStorage, Storage}; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashSet}; use std::convert::TryInto; use std::path::PathBuf; use std::sync::Arc; @@ -81,7 +81,7 @@ pub(crate) async fn open_index( pub(crate) async fn warmup( searcher: &Searcher, query: &dyn Query, - fast_field_names: Vec, + fast_field_names: &HashSet, ) -> anyhow::Result<()> { warm_up_terms(searcher, query).await?; warm_up_fastfields(searcher, fast_field_names).await?; @@ -90,7 +90,7 @@ pub(crate) async fn warmup( async fn warm_up_fastfields( searcher: &Searcher, - fast_field_names: Vec, + fast_field_names: &HashSet, ) -> anyhow::Result<()> { let mut fast_fields = Vec::new(); for fast_field_name in fast_field_names.iter() { @@ -168,7 +168,7 @@ async fn leaf_search_single_split( .reload_policy(ReloadPolicy::Manual) .try_into()?; let searcher = reader.searcher(); - warmup(&*searcher, &query, quickwit_collector.fast_field_names()).await?; + warmup(&*searcher, &query, &quickwit_collector.fast_field_names()).await?; let leaf_search_result = searcher.search(&query, &quickwit_collector)?; Ok(leaf_search_result) } diff --git a/quickwit-search/src/search_stream/collector.rs b/quickwit-search/src/search_stream/collector.rs index 7df2bf6b38e..b2e1d2830dc 100644 --- a/quickwit-search/src/search_stream/collector.rs +++ b/quickwit-search/src/search_stream/collector.rs @@ -18,6 +18,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::HashSet; use std::marker::PhantomData; use crate::filters::TimestampFilter; @@ -167,12 +168,11 @@ impl FastFieldCollectorBuilder { self.fast_field_value_type } - pub fn fast_field_to_warm(&self) -> Vec { - let mut fields = vec![self.fast_field_name.clone()]; + pub fn fast_field_to_warm(&self) -> HashSet { + let mut fields = HashSet::new(); + fields.insert(self.fast_field_name.clone()); if let Some(timestamp_field_name) = &self.timestamp_field_name { - if *timestamp_field_name != self.fast_field_name { - fields.push(timestamp_field_name.clone()); - } + fields.insert(timestamp_field_name.clone()); } fields } @@ -200,6 +200,8 @@ impl FastFieldCollectorBuilder { #[cfg(test)] mod tests { + use std::iter::FromIterator; + use super::*; #[test] @@ -212,7 +214,7 @@ mod tests { None, None, )?; - assert_eq!(builder.fast_field_to_warm(), vec!["field_name"]); + assert_eq!(builder.fast_field_to_warm(), HashSet::from_iter(["field_name".to_string()])); let builder = FastFieldCollectorBuilder::new( Type::U64, "field_name".to_string(), @@ -223,7 +225,7 @@ mod tests { )?; assert_eq!( builder.fast_field_to_warm(), - vec!["field_name", "timestamp_field_name"] + HashSet::from_iter(["field_name".to_string(), "timestamp_field_name".to_string()]) ); Ok(()) } diff --git a/quickwit-search/src/search_stream/leaf.rs b/quickwit-search/src/search_stream/leaf.rs index 451f4395fe1..64af71b6f95 100644 --- a/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit-search/src/search_stream/leaf.rs @@ -51,16 +51,14 @@ const CONCURRENT_SPLIT_SEARCH_STREAM: usize = 5; // to tonic::Status as tonic::Status is required by the stream result // signature defined by proto generated code. pub async fn leaf_search_stream( - request: &SearchStreamRequest, + request: SearchStreamRequest, storage: Arc, splits: Vec, index_config: Arc, ) -> UnboundedReceiverStream> { let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel(); - let request_clone = request.clone(); tokio::spawn(async move { - let mut stream = - leaf_search_results_stream(request_clone, storage, splits, index_config).await; + let mut stream = leaf_search_results_stream(request, storage, splits, index_config).await; while let Some(item) = stream.next().await { if let Err(error) = result_sender.send(item) { error!( @@ -82,16 +80,13 @@ async fn leaf_search_results_stream( ) -> impl futures::Stream> + Sync + Send + 'static { futures::stream::iter(splits) .map(move |split| { - ( + leaf_search_stream_single_split( split, index_config.clone(), request.clone(), storage.clone(), ) - }) - .map(|(split, index_config_clone, request_clone, storage)| { - leaf_search_stream_single_split(split, index_config_clone, request_clone, storage) - .shared() + .shared() }) .buffer_unordered(CONCURRENT_SPLIT_SEARCH_STREAM) } @@ -142,7 +137,7 @@ async fn leaf_search_stream_single_split( warmup( &*searcher, query.as_ref(), - fast_field_collector_builder.fast_field_to_warm(), + &fast_field_collector_builder.fast_field_to_warm(), ) .await?; let collect_handle = spawn_blocking(move || { @@ -266,7 +261,7 @@ mod tests { }) .collect(); let mut single_node_stream = leaf_search_stream( - &request, + request, test_sandbox .storage_uri_resolver() .resolve(&index_metadata.index_uri)?, diff --git a/quickwit-search/src/service.rs b/quickwit-search/src/service.rs index efdc8b1dd89..b76adecb945 100644 --- a/quickwit-search/src/service.rs +++ b/quickwit-search/src/service.rs @@ -188,7 +188,7 @@ impl SearchService for SearchServiceImpl { let storage = self.storage_resolver.resolve(&index_metadata.index_uri)?; let index_config = index_metadata.index_config; let leaf_receiver = leaf_search_stream( - &stream_request, + stream_request, storage.clone(), leaf_stream_request.split_metadata, index_config, From c1217c70f7e34d16266b4b21303ef3933188a7f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Massot?= Date: Wed, 1 Sep 2021 11:27:54 +0200 Subject: [PATCH 8/8] Fix fmt --- quickwit-search/src/search_stream/collector.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/quickwit-search/src/search_stream/collector.rs b/quickwit-search/src/search_stream/collector.rs index b2e1d2830dc..5b77e507344 100644 --- a/quickwit-search/src/search_stream/collector.rs +++ b/quickwit-search/src/search_stream/collector.rs @@ -214,7 +214,10 @@ mod tests { None, None, )?; - assert_eq!(builder.fast_field_to_warm(), HashSet::from_iter(["field_name".to_string()])); + assert_eq!( + builder.fast_field_to_warm(), + HashSet::from_iter(["field_name".to_string()]) + ); let builder = FastFieldCollectorBuilder::new( Type::U64, "field_name".to_string(),