Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #483, #484 and above all handle errors happening in search stream leaves. #487

Merged
merged 10 commits into from
Sep 1, 2021
92 changes: 92 additions & 0 deletions quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,98 @@ async fn test_cmd_dry_run_delete_on_s3_localstack() -> Result<()> {
Ok(())
}

/// End-to-end test of the CLI lifecycle against a local file-system index:
/// `new` → `index` → `serve` (REST search + search stream) → `delete`.
#[tokio::test]
#[serial]
async fn test_all_local_index() -> Result<()> {
    // Implicit index_id defined in test env struct.
    // TODO: change that after the metastore uri refactoring.
    let index_id = "data";
    let test_env = create_test_env(TestStorageType::LocalFileSystem)?;
    make_command(
        format!(
            "new --index-uri {} --metastore-uri {} --index-config-path {}",
            test_env.index_uri(index_id),
            test_env.metastore_uri,
            test_env.resource_files["config"].display()
        )
        .as_str(),
    )
    .assert()
    .success();

    // `new` must have written the index metadata file.
    let metadata_file_exists = test_env
        .storage
        .exists(&Path::new(index_id).join("quickwit.json"))
        .await?;
    assert!(metadata_file_exists);

    index_data(
        index_id,
        test_env.resource_files["logs"].as_path(),
        &test_env.metastore_uri,
    );

    // serve & api-search
    let mut server_process = spawn_command(
        format!(
            "serve --metastore-uri {} --host 127.0.0.1 --port 8182",
            test_env.metastore_uri,
        )
        .as_str(),
    )
    .unwrap();
    // Give the server time to bind and print its startup banner.
    // NOTE(review): a fixed sleep is inherently racy; polling a health
    // endpoint until ready would be more robust.
    sleep(Duration::from_secs(2)).await;
    let mut data = vec![0; 512];
    // Read exactly 512 bytes of server stdout.
    // NOTE(review): this blocks until 512 bytes are available — assumes the
    // startup output is at least that long; confirm against the serve banner.
    server_process
        .stdout
        .as_mut()
        .expect("Failed to get server process output")
        .read_exact(&mut data)
        .expect("Cannot read output");
    let process_output_str = String::from_utf8(data).unwrap();
    let query_response = reqwest::get(format!(
        "http://127.0.0.1:8182/api/v1/{}/search?query=level:info",
        index_id
    ))
    .await?
    .text()
    .await?;

    // The banner must advertise the address we asked it to bind to.
    assert!(process_output_str.contains("http://127.0.0.1:8182"));
    let result: Value =
        serde_json::from_str(&query_response).expect("Couldn't deserialize response.");
    assert_eq!(result["numHits"], Value::Number(Number::from(2i64)));

    // CSV search stream over the `ts` fast field should yield both hits.
    let search_stream_response = reqwest::get(format!(
        "http://127.0.0.1:8182/api/v1/{}/search/stream?query=level:info&outputFormat=csv&fastField=ts",
        index_id
    ))
    .await?
    .text()
    .await?;
    assert_eq!(search_stream_response, "2\n13\n");

    server_process.kill().unwrap();

    make_command(
        format!(
            "delete --index-id {} --metastore-uri {}",
            index_id, test_env.metastore_uri
        )
        .as_str(),
    )
    .assert()
    .success();
    // After `delete`, the metadata file must be gone.
    let metadata_file_exists = test_env
        .storage
        .exists(&Path::new(index_id).join("quickwit.json"))
        .await?;
    assert!(!metadata_file_exists);

    Ok(())
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also test that the split file no longer exists.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think SearchError gets serialized/deserialized, whereas an anyhow error cannot be serialized — hence the error message string.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like anyhow because it tends to do a better job at tracking the underlying error while we do a poor job at it. We could serialize the anyhow error as a string using the debug format, which displays the cause and a backtrace if one is present.


/// testing the api via cli commands
#[tokio::test]
#[serial]
Expand Down
30 changes: 13 additions & 17 deletions quickwit-search/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

use futures::StreamExt;
use futures::TryStreamExt;
use http::Uri;
use quickwit_proto::LeafSearchStreamResult;
use std::fmt;
Expand Down Expand Up @@ -126,7 +128,7 @@ impl SearchServiceClient {
pub async fn leaf_search_stream(
&mut self,
request: quickwit_proto::LeafSearchStreamRequest,
) -> crate::Result<UnboundedReceiverStream<Result<LeafSearchStreamResult, tonic::Status>>> {
) -> crate::Result<UnboundedReceiverStream<crate::Result<LeafSearchStreamResult>>> {
match &mut self.client_impl {
SearchServiceClientImpl::Grpc(grpc_client) => {
let mut grpc_client_clone = grpc_client.clone();
Expand All @@ -137,26 +139,20 @@ impl SearchServiceClient {
.leaf_search_stream(tonic_request)
.await
.map_err(|tonic_error| parse_grpc_error(&tonic_error))?
.into_inner();

// TODO: returning stream instead of a channel may be better.
// But this seems to be difficult. Try it at your own expense.
while let Some(result) = results_stream
.message()
.await
.map_err(|status| parse_grpc_error(&status))?
{
// We want to stop doing unnecessary work on the leaves as soon as
// there is an issue sending the result.
// Terminating the task will drop the `result_stream` consequently
// canceling the gRPC request.
result_sender.send(Ok(result)).map_err(|_| {
SearchError::InternalError("Could not send leaf result".into())
.into_inner()
.take_while(|grpc_result| futures::future::ready(grpc_result.is_ok()))
.map_err(|tonic_error| parse_grpc_error(&tonic_error));

while let Some(search_result) = results_stream.next().await {
result_sender.send(search_result).map_err(|_| {
SearchError::InternalError(
"Sender closed, could not send leaf result.".into(),
)
})?;
}

Result::<_, SearchError>::Ok(())
});

Ok(UnboundedReceiverStream::new(result_receiver))
}
SearchServiceClientImpl::Local(service) => service.leaf_search_stream(request).await,
Expand Down
21 changes: 9 additions & 12 deletions quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use itertools::Itertools;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::collections::HashSet;
use tantivy::schema::Schema;

use quickwit_index_config::IndexConfig;
Expand Down Expand Up @@ -222,7 +223,7 @@ impl SegmentCollector for QuickwitSegmentCollector {

// TODO: seems not very useful, remove it and refactor it.
pub trait GenericQuickwitCollector: Collector {
fn fast_field_names(&self) -> Vec<String>;
fn fast_field_names(&self) -> HashSet<String>;
}

/// The quickwit collector is the tantivy Collector used in Quickwit.
Expand All @@ -235,14 +236,14 @@ pub struct QuickwitCollector {
pub start_offset: usize,
pub max_hits: usize,
pub sort_by: SortBy,
pub fast_field_names: Vec<String>,
pub fast_field_names: HashSet<String>,
pub timestamp_field_opt: Option<Field>,
pub start_timestamp_opt: Option<i64>,
pub end_timestamp_opt: Option<i64>,
}

impl GenericQuickwitCollector for QuickwitCollector {
fn fast_field_names(&self) -> Vec<String> {
fn fast_field_names(&self) -> HashSet<String> {
self.fast_field_names.clone()
}
}
Expand Down Expand Up @@ -352,18 +353,14 @@ fn top_k_partial_hits(mut partial_hits: Vec<PartialHit>, num_hits: usize) -> Vec
}

/// Extracts all fast field names.
fn extract_fast_field_names(index_config: &dyn IndexConfig) -> Vec<String> {
let mut fast_fields = vec![];
fn extract_fast_field_names(index_config: &dyn IndexConfig) -> HashSet<String> {
let mut fast_fields = HashSet::new();
if let Some(timestamp_field) = index_config.timestamp_field_name() {
fast_fields.push(timestamp_field);
fast_fields.insert(timestamp_field);
}

if let SortBy::SortByFastField { field_name, .. } = index_config.default_sort_by() {
if !fast_fields.contains(&field_name) {
fast_fields.push(field_name);
}
fast_fields.insert(field_name);
}

fast_fields
}

Expand Down Expand Up @@ -396,7 +393,7 @@ pub fn make_merge_collector(search_request: &SearchRequest) -> QuickwitCollector
start_offset: search_request.start_offset as usize,
max_hits: search_request.max_hits as usize,
sort_by: SortBy::DocId,
fast_field_names: vec![],
fast_field_names: HashSet::new(),
timestamp_field_opt: None,
start_timestamp_opt: search_request.start_timestamp,
end_timestamp_opt: search_request.end_timestamp,
Expand Down
2 changes: 1 addition & 1 deletion quickwit-search/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use quickwit_storage::StorageResolverError;

/// Possible SearchError
#[allow(missing_docs)]
#[derive(Error, Debug, Serialize, Deserialize)]
#[derive(Error, Debug, Serialize, Deserialize, Clone)]
pub enum SearchError {
#[error("Index `{index_id}` does not exist.")]
IndexDoesNotExist { index_id: String },
Expand Down
8 changes: 4 additions & 4 deletions quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use quickwit_directories::{CachingDirectory, HotDirectory, StorageDirectory};
use quickwit_index_config::IndexConfig;
use quickwit_proto::{LeafSearchResult, SearchRequest, SplitIdAndFooterOffsets, SplitSearchError};
use quickwit_storage::{BundleStorage, Storage};
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};
use std::convert::TryInto;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -81,7 +81,7 @@ pub(crate) async fn open_index(
pub(crate) async fn warmup(
searcher: &Searcher,
query: &dyn Query,
fast_field_names: Vec<String>,
fast_field_names: &HashSet<String>,
) -> anyhow::Result<()> {
warm_up_terms(searcher, query).await?;
warm_up_fastfields(searcher, fast_field_names).await?;
Expand All @@ -90,7 +90,7 @@ pub(crate) async fn warmup(

async fn warm_up_fastfields(
searcher: &Searcher,
fast_field_names: Vec<String>,
fast_field_names: &HashSet<String>,
) -> anyhow::Result<()> {
let mut fast_fields = Vec::new();
for fast_field_name in fast_field_names.iter() {
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn leaf_search_single_split(
.reload_policy(ReloadPolicy::Manual)
.try_into()?;
let searcher = reader.searcher();
warmup(&*searcher, &query, quickwit_collector.fast_field_names()).await?;
warmup(&*searcher, &query, &quickwit_collector.fast_field_names()).await?;
let leaf_search_result = searcher.search(&query, &quickwit_collector)?;
Ok(leaf_search_result)
}
Expand Down
46 changes: 42 additions & 4 deletions quickwit-search/src/search_stream/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;
use std::marker::PhantomData;

use crate::filters::TimestampFilter;
Expand Down Expand Up @@ -167,12 +168,13 @@ impl FastFieldCollectorBuilder {
self.fast_field_value_type
}

pub fn fast_field_to_warm(&self) -> Vec<String> {
pub fn fast_field_to_warm(&self) -> HashSet<String> {
let mut fields = HashSet::new();
fields.insert(self.fast_field_name.clone());
if let Some(timestamp_field_name) = &self.timestamp_field_name {
vec![timestamp_field_name.clone(), self.fast_field_name.clone()]
} else {
vec![self.fast_field_name.clone()]
fields.insert(timestamp_field_name.clone());
}
fields
}

pub fn typed_build<TFastValue: FastValue>(&self) -> FastFieldCollector<TFastValue> {
Expand All @@ -195,3 +197,39 @@ impl FastFieldCollectorBuilder {
self.typed_build::<u64>()
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `fast_field_to_warm` must return the fast field itself, merged with
    /// the timestamp field; a `HashSet` deduplicates them when they share
    /// the same name.
    #[test]
    fn test_fast_field_collector_builder() -> anyhow::Result<()> {
        // Timestamp field name identical to the fast field:
        // expect a single-element set.
        let same_name_builder = FastFieldCollectorBuilder::new(
            Type::U64,
            "field_name".to_string(),
            Some("field_name".to_string()),
            None,
            None,
            None,
        )?;
        let expected_single: HashSet<String> =
            vec!["field_name".to_string()].into_iter().collect();
        assert_eq!(same_name_builder.fast_field_to_warm(), expected_single);

        // Distinct timestamp field: both names must be warmed.
        let distinct_builder = FastFieldCollectorBuilder::new(
            Type::U64,
            "field_name".to_string(),
            Some("timestamp_field_name".to_string()),
            None,
            None,
            None,
        )?;
        let expected_pair: HashSet<String> = vec![
            "field_name".to_string(),
            "timestamp_field_name".to_string(),
        ]
        .into_iter()
        .collect();
        assert_eq!(distinct_builder.fast_field_to_warm(), expected_pair);
        Ok(())
    }
}
Loading