Skip to content

Commit

Permalink
Fix #483, #484 and above all handle errors happening in search stream…
Browse files Browse the repository at this point in the history
… leaves. (#487)

* Fix #483, #484 and above all handle errors happening in leaves.

* Add test on search stream with a leaf error.

* Add test on leaf search stream result.

* Add cli serve tests on a local file index.

* Fix typo in tests.

* Fix typo in test variable name.

Co-authored-by: Adrien Guillo <adrien@quickwit.io>

* Improve code syntax/readability.

* Fix fmt

Co-authored-by: Adrien Guillo <adrien@quickwit.io>
Co-authored-by: Paul Masurel <paul@quickwit.io>
  • Loading branch information
3 people authored Sep 1, 2021
1 parent ac7001b commit b76a468
Show file tree
Hide file tree
Showing 11 changed files with 382 additions and 82 deletions.
92 changes: 92 additions & 0 deletions quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,98 @@ async fn test_cmd_dry_run_delete_on_s3_localstack() -> Result<()> {
Ok(())
}

/// testing the api via cli commands
#[tokio::test]
#[serial]
async fn test_all_local_index() -> Result<()> {
// Implicit index_id defined in test env struct.
// TODO: change that after the metastore uri refactoring.
let index_id = "data";
let test_env = create_test_env(TestStorageType::LocalFileSystem)?;
make_command(
format!(
"new --index-uri {} --metastore-uri {} --index-config-path {}",
test_env.index_uri(index_id),
test_env.metastore_uri,
test_env.resource_files["config"].display()
)
.as_str(),
)
.assert()
.success();

let metadata_file_exists = test_env
.storage
.exists(&Path::new(index_id).join("quickwit.json"))
.await?;
assert_eq!(metadata_file_exists, true);

index_data(
index_id,
test_env.resource_files["logs"].as_path(),
&test_env.metastore_uri,
);

// serve & api-search
let mut server_process = spawn_command(
format!(
"serve --metastore-uri {} --host 127.0.0.1 --port 8182",
test_env.metastore_uri,
)
.as_str(),
)
.unwrap();
sleep(Duration::from_secs(2)).await;
let mut data = vec![0; 512];
server_process
.stdout
.as_mut()
.expect("Failed to get server process output")
.read_exact(&mut data)
.expect("Cannot read output");
let process_output_str = String::from_utf8(data).unwrap();
let query_response = reqwest::get(format!(
"http://127.0.0.1:8182/api/v1/{}/search?query=level:info",
index_id
))
.await?
.text()
.await?;

assert!(process_output_str.contains("http://127.0.0.1:8182"));
let result: Value =
serde_json::from_str(&query_response).expect("Couldn't deserialize response.");
assert_eq!(result["numHits"], Value::Number(Number::from(2i64)));

let search_stream_response = reqwest::get(format!(
"http://127.0.0.1:8182/api/v1/{}/search/stream?query=level:info&outputFormat=csv&fastField=ts",
index_id
))
.await?
.text()
.await?;
assert_eq!(search_stream_response, "2\n13\n");

server_process.kill().unwrap();

make_command(
format!(
"delete --index-id {} --metastore-uri {}",
index_id, test_env.metastore_uri
)
.as_str(),
)
.assert()
.success();
let metadata_file_exists = test_env
.storage
.exists(&Path::new(index_id).join("quickwit.json"))
.await?;
assert_eq!(metadata_file_exists, false);

Ok(())
}

/// testing the api via cli commands
#[tokio::test]
#[serial]
Expand Down
30 changes: 13 additions & 17 deletions quickwit-search/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

use futures::StreamExt;
use futures::TryStreamExt;
use http::Uri;
use quickwit_proto::LeafSearchStreamResult;
use std::fmt;
Expand Down Expand Up @@ -126,7 +128,7 @@ impl SearchServiceClient {
pub async fn leaf_search_stream(
&mut self,
request: quickwit_proto::LeafSearchStreamRequest,
) -> crate::Result<UnboundedReceiverStream<Result<LeafSearchStreamResult, tonic::Status>>> {
) -> crate::Result<UnboundedReceiverStream<crate::Result<LeafSearchStreamResult>>> {
match &mut self.client_impl {
SearchServiceClientImpl::Grpc(grpc_client) => {
let mut grpc_client_clone = grpc_client.clone();
Expand All @@ -137,26 +139,20 @@ impl SearchServiceClient {
.leaf_search_stream(tonic_request)
.await
.map_err(|tonic_error| parse_grpc_error(&tonic_error))?
.into_inner();

// TODO: returning stream instead of a channel may be better.
// But this seems to be difficult. Try it at your own expense.
while let Some(result) = results_stream
.message()
.await
.map_err(|status| parse_grpc_error(&status))?
{
// We want to stop doing unnecessary work on the leaves as soon as
// there is an issue sending the result.
// Terminating the task will drop the `result_stream` consequently
// canceling the gRPC request.
result_sender.send(Ok(result)).map_err(|_| {
SearchError::InternalError("Could not send leaf result".into())
.into_inner()
.take_while(|grpc_result| futures::future::ready(grpc_result.is_ok()))
.map_err(|tonic_error| parse_grpc_error(&tonic_error));

while let Some(search_result) = results_stream.next().await {
result_sender.send(search_result).map_err(|_| {
SearchError::InternalError(
"Sender closed, could not send leaf result.".into(),
)
})?;
}

Result::<_, SearchError>::Ok(())
});

Ok(UnboundedReceiverStream::new(result_receiver))
}
SearchServiceClientImpl::Local(service) => service.leaf_search_stream(request).await,
Expand Down
21 changes: 9 additions & 12 deletions quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use itertools::Itertools;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::collections::HashSet;
use tantivy::schema::Schema;

use quickwit_index_config::IndexConfig;
Expand Down Expand Up @@ -222,7 +223,7 @@ impl SegmentCollector for QuickwitSegmentCollector {

// TODO: seems not very useful, remove it and refactor it.
pub trait GenericQuickwitCollector: Collector {
fn fast_field_names(&self) -> Vec<String>;
fn fast_field_names(&self) -> HashSet<String>;
}

/// The quickwit collector is the tantivy Collector used in Quickwit.
Expand All @@ -235,14 +236,14 @@ pub struct QuickwitCollector {
pub start_offset: usize,
pub max_hits: usize,
pub sort_by: SortBy,
pub fast_field_names: Vec<String>,
pub fast_field_names: HashSet<String>,
pub timestamp_field_opt: Option<Field>,
pub start_timestamp_opt: Option<i64>,
pub end_timestamp_opt: Option<i64>,
}

impl GenericQuickwitCollector for QuickwitCollector {
fn fast_field_names(&self) -> Vec<String> {
fn fast_field_names(&self) -> HashSet<String> {
self.fast_field_names.clone()
}
}
Expand Down Expand Up @@ -352,18 +353,14 @@ fn top_k_partial_hits(mut partial_hits: Vec<PartialHit>, num_hits: usize) -> Vec
}

/// Extracts all fast field names.
fn extract_fast_field_names(index_config: &dyn IndexConfig) -> Vec<String> {
let mut fast_fields = vec![];
fn extract_fast_field_names(index_config: &dyn IndexConfig) -> HashSet<String> {
let mut fast_fields = HashSet::new();
if let Some(timestamp_field) = index_config.timestamp_field_name() {
fast_fields.push(timestamp_field);
fast_fields.insert(timestamp_field);
}

if let SortBy::SortByFastField { field_name, .. } = index_config.default_sort_by() {
if !fast_fields.contains(&field_name) {
fast_fields.push(field_name);
}
fast_fields.insert(field_name);
}

fast_fields
}

Expand Down Expand Up @@ -396,7 +393,7 @@ pub fn make_merge_collector(search_request: &SearchRequest) -> QuickwitCollector
start_offset: search_request.start_offset as usize,
max_hits: search_request.max_hits as usize,
sort_by: SortBy::DocId,
fast_field_names: vec![],
fast_field_names: HashSet::new(),
timestamp_field_opt: None,
start_timestamp_opt: search_request.start_timestamp,
end_timestamp_opt: search_request.end_timestamp,
Expand Down
2 changes: 1 addition & 1 deletion quickwit-search/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use quickwit_storage::StorageResolverError;

/// Possible SearchError
#[allow(missing_docs)]
#[derive(Error, Debug, Serialize, Deserialize)]
#[derive(Error, Debug, Serialize, Deserialize, Clone)]
pub enum SearchError {
#[error("Index `{index_id}` does not exist.")]
IndexDoesNotExist { index_id: String },
Expand Down
8 changes: 4 additions & 4 deletions quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
use quickwit_index_config::IndexConfig;
use quickwit_proto::{LeafSearchResult, SearchRequest, SplitIdAndFooterOffsets, SplitSearchError};
use quickwit_storage::{BundleStorage, Storage};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};
use std::convert::TryInto;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -81,7 +81,7 @@ pub(crate) async fn open_index(
pub(crate) async fn warmup(
searcher: &Searcher,
query: &dyn Query,
fast_field_names: Vec<String>,
fast_field_names: &HashSet<String>,
) -> anyhow::Result<()> {
warm_up_terms(searcher, query).await?;
warm_up_fastfields(searcher, fast_field_names).await?;
Expand All @@ -90,7 +90,7 @@ pub(crate) async fn warmup(

async fn warm_up_fastfields(
searcher: &Searcher,
fast_field_names: Vec<String>,
fast_field_names: &HashSet<String>,
) -> anyhow::Result<()> {
let mut fast_fields = Vec::new();
for fast_field_name in fast_field_names.iter() {
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn leaf_search_single_split(
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
let searcher = reader.searcher();
warmup(&*searcher, &query, quickwit_collector.fast_field_names()).await?;
warmup(&*searcher, &query, &quickwit_collector.fast_field_names()).await?;
let leaf_search_result = searcher.search(&query, &quickwit_collector)?;
Ok(leaf_search_result)
}
Expand Down
46 changes: 42 additions & 4 deletions quickwit-search/src/search_stream/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;
use std::marker::PhantomData;

use crate::filters::TimestampFilter;
Expand Down Expand Up @@ -167,12 +168,13 @@ impl FastFieldCollectorBuilder {
self.fast_field_value_type
}

pub fn fast_field_to_warm(&self) -> Vec<String> {
pub fn fast_field_to_warm(&self) -> HashSet<String> {
let mut fields = HashSet::new();
fields.insert(self.fast_field_name.clone());
if let Some(timestamp_field_name) = &self.timestamp_field_name {
vec![timestamp_field_name.clone(), self.fast_field_name.clone()]
} else {
vec![self.fast_field_name.clone()]
fields.insert(timestamp_field_name.clone());
}
fields
}

pub fn typed_build<TFastValue: FastValue>(&self) -> FastFieldCollector<TFastValue> {
Expand All @@ -195,3 +197,39 @@ impl FastFieldCollectorBuilder {
self.typed_build::<u64>()
}
}

#[cfg(test)]
mod tests {
use std::iter::FromIterator;

use super::*;

#[test]
fn test_fast_field_collector_builder() -> anyhow::Result<()> {
let builder = FastFieldCollectorBuilder::new(
Type::U64,
"field_name".to_string(),
Some("field_name".to_string()),
None,
None,
None,
)?;
assert_eq!(
builder.fast_field_to_warm(),
HashSet::from_iter(["field_name".to_string()])
);
let builder = FastFieldCollectorBuilder::new(
Type::U64,
"field_name".to_string(),
Some("timestamp_field_name".to_string()),
None,
None,
None,
)?;
assert_eq!(
builder.fast_field_to_warm(),
HashSet::from_iter(["field_name".to_string(), "timestamp_field_name".to_string()])
);
Ok(())
}
}
Loading

0 comments on commit b76a468

Please sign in to comment.