Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use futures::stream in leaf functions #452

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 28 additions & 14 deletions quickwit-search/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

use futures::StreamExt;
use http::Uri;
use quickwit_proto::LeafSearchStreamResult;
use std::fmt;
Expand Down Expand Up @@ -126,21 +127,20 @@ impl SearchServiceClient {
pub async fn leaf_search_stream(
&mut self,
request: quickwit_proto::LeafSearchStreamRequest,
) -> crate::Result<UnboundedReceiverStream<Result<LeafSearchStreamResult, tonic::Status>>> {
) -> crate::Result<UnboundedReceiverStream<crate::Result<LeafSearchStreamResult>>> {
// It could be possible to return a stream... but the stream should be Sync + Send and tonic returns
// a stream which is only Send.
let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel();
match &mut self.client_impl {
SearchServiceClientImpl::Grpc(grpc_client) => {
let mut grpc_client_clone = grpc_client.clone();
let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel();
let tonic_request = Request::new(request);
let mut results_stream = grpc_client_clone
.leaf_search_stream(tonic_request)
.await
.map_err(|tonic_error| parse_grpc_error(&tonic_error))?
.into_inner();
tokio::spawn(async move {
let tonic_request = Request::new(request);
let mut results_stream = grpc_client_clone
.leaf_search_stream(tonic_request)
.await
.map_err(|tonic_error| parse_grpc_error(&tonic_error))?
.into_inner();

// TODO: returning stream instead of a channel may be better.
// But this seems to be difficult. Try it at your own expense.
while let Some(result) = results_stream
.message()
.await
Expand All @@ -156,11 +156,25 @@ impl SearchServiceClient {
}
Result::<_, SearchError>::Ok(())
});

Ok(UnboundedReceiverStream::new(result_receiver))
}
SearchServiceClientImpl::Local(service) => service.leaf_search_stream(request).await,
SearchServiceClientImpl::Local(service) => {
let mut results_stream = service.leaf_search_stream(request).await?;
tokio::spawn(async move {
while let Some(result) = results_stream.next().await {
// We want to stop doing unnecessary work on the leaves as soon as
// there is an issue sending the result.
// Terminating the task will drop the `result_stream` consequently
// canceling the gRPC request.
result_sender.send(result).map_err(|_| {
SearchError::InternalError("Could not send leaf result".into())
})?;
}
Result::<_, SearchError>::Ok(())
});
}
}

Ok(UnboundedReceiverStream::new(result_receiver))
}

/// Perform fetch docs.
Expand Down
2 changes: 1 addition & 1 deletion quickwit-search/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use quickwit_storage::StorageResolverError;

/// Possible SearchError
#[allow(missing_docs)]
#[derive(Error, Debug, Serialize, Deserialize)]
#[derive(Error, Debug, Serialize, Deserialize, Clone)]
pub enum SearchError {
#[error("Index `{index_id}` does not exist.")]
IndexDoesNotExist { index_id: String },
Expand Down
45 changes: 19 additions & 26 deletions quickwit-search/src/search_stream/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use super::FastFieldCollectorBuilder;
use crate::leaf::open_index;
use crate::leaf::warmup;
use crate::SearchError;
use futures::{FutureExt, StreamExt};
use quickwit_index_config::IndexConfig;
use quickwit_proto::LeafSearchStreamResult;
use quickwit_proto::OutputFormat;
Expand All @@ -31,7 +32,8 @@ use quickwit_storage::Storage;
use std::sync::Arc;
use tantivy::schema::Type;
use tantivy::ReloadPolicy;
use tokio_stream::wrappers::UnboundedReceiverStream;

const CONCURRENT_SPLIT_SEARCH_STREAM: usize = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you tested to see if it was not harming the perf too much?


/// `leaf` step of search stream.
// Note: we return a stream of a result with a tonic::Status error
Expand All @@ -42,38 +44,29 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
// signature defined by proto generated code.
pub async fn leaf_search_stream(
index_config: Arc<dyn IndexConfig>,
request: &SearchStreamRequest,
request: SearchStreamRequest,
split_ids: Vec<String>,
storage: Arc<dyn Storage>,
) -> UnboundedReceiverStream<Result<LeafSearchStreamResult, tonic::Status>> {
let (result_sender, result_receiver) = tokio::sync::mpsc::unbounded_channel();
for split_id in split_ids {
let split_storage: Arc<dyn Storage> =
quickwit_storage::add_prefix_to_storage(storage.clone(), split_id.clone());
let index_config_clone = index_config.clone();
let result_sender_clone = result_sender.clone();
let request_clone = request.clone();
tokio::spawn(async move {
let leaf_split_result =
leaf_search_stream_single_split(index_config_clone, &request_clone, split_storage)
.await
.map_err(SearchError::convert_to_tonic_status);
result_sender_clone.send(leaf_split_result).map_err(|_| {
SearchError::InternalError(format!(
"Unable to send leaf export result for split `{}`",
split_id
))
})?;
Result::<(), SearchError>::Ok(())
});
}
UnboundedReceiverStream::new(result_receiver)
) -> impl futures::Stream<Item = crate::Result<LeafSearchStreamResult>> + Sync + Send + 'static {
futures::stream::iter(split_ids)
.map(move |split_id| {
let index_config_clone = index_config.clone();
let request_clone = request.clone();
let split_storage: Arc<dyn Storage> =
quickwit_storage::add_prefix_to_storage(storage.clone(), split_id);
(index_config_clone, request_clone, split_storage)
})
.map(|(index_config_clone, request_clone, split_storage)| {
leaf_search_stream_single_split(index_config_clone, request_clone, split_storage)
.shared()
})
.buffered(CONCURRENT_SPLIT_SEARCH_STREAM)
}

/// Apply a leaf search on a single split.
async fn leaf_search_stream_single_split(
index_config: Arc<dyn IndexConfig>,
stream_request: &SearchStreamRequest,
stream_request: SearchStreamRequest,
storage: Arc<dyn Storage>,
) -> crate::Result<LeafSearchStreamResult> {
let index = open_index(storage).await?;
Expand Down
7 changes: 3 additions & 4 deletions quickwit-search/src/search_stream/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use quickwit_metastore::Metastore;
use quickwit_proto::SearchRequest;

use crate::client_pool::Job;
use crate::error::parse_grpc_error;
use crate::list_relevant_splits;
use crate::root::job_for_splits;
use crate::root::NodeSearchError;
Expand Down Expand Up @@ -88,8 +87,8 @@ pub async fn root_search_stream(

let mut leaf_bytes: Vec<Bytes> = Vec::new();
while let Some(leaf_result) = receiver.next().await {
let leaf_data = leaf_result.map_err(|status| NodeSearchError {
search_error: parse_grpc_error(&status),
let leaf_data = leaf_result.map_err(|search_error| NodeSearchError {
search_error,
split_ids: split_ids.clone(),
})?;
leaf_bytes.push(Bytes::from(leaf_data.data));
Expand Down Expand Up @@ -188,7 +187,7 @@ mod tests {
}))?;
mock_search_service.expect_leaf_search_stream().return_once(
|_leaf_search_req: quickwit_proto::LeafSearchStreamRequest| {
Ok(UnboundedReceiverStream::new(result_receiver))
Ok(Box::pin(UnboundedReceiverStream::new(result_receiver)))
},
);
// The test will hang on indefinitely if we don't drop the receiver.
Expand Down
29 changes: 24 additions & 5 deletions quickwit-search/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use quickwit_proto::{
};
use quickwit_proto::{LeafSearchStreamRequest, LeafSearchStreamResult, SearchStreamRequest};
use quickwit_storage::StorageUriResolver;
use std::pin::Pin;
use std::sync::Arc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::info;

use crate::fetch_docs;
Expand Down Expand Up @@ -89,7 +89,16 @@ pub trait SearchService: 'static + Send + Sync {
async fn leaf_search_stream(
&self,
_request: LeafSearchStreamRequest,
) -> crate::Result<UnboundedReceiverStream<Result<LeafSearchStreamResult, tonic::Status>>>;
) -> crate::Result<
Pin<
Box<
dyn futures::Stream<Item = crate::Result<LeafSearchStreamResult>>
+ Sync
+ Send
+ 'static,
>,
>,
>;
}

impl SearchServiceImpl {
Expand Down Expand Up @@ -172,7 +181,16 @@ impl SearchService for SearchServiceImpl {
async fn leaf_search_stream(
&self,
leaf_stream_request: LeafSearchStreamRequest,
) -> crate::Result<UnboundedReceiverStream<Result<LeafSearchStreamResult, tonic::Status>>> {
) -> crate::Result<
Pin<
Box<
dyn futures::Stream<Item = crate::Result<LeafSearchStreamResult>>
+ Sync
+ Send
+ 'static,
>,
>,
> {
let stream_request = leaf_stream_request
.request
.ok_or_else(|| SearchError::InternalError("No search request.".to_string()))?;
Expand All @@ -184,8 +202,9 @@ impl SearchService for SearchServiceImpl {
let storage = self.storage_resolver.resolve(&index_metadata.index_uri)?;
let split_ids = leaf_stream_request.split_ids;
let index_config = index_metadata.index_config;
let leaf_receiver =
leaf_search_stream(index_config, &stream_request, split_ids, storage.clone()).await;
let leaf_receiver = Box::pin(
leaf_search_stream(index_config, stream_request, split_ids, storage.clone()).await,
);
Ok(leaf_receiver)
}
}
19 changes: 13 additions & 6 deletions quickwit-serve/src/grpc_adapter/search_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

use std::sync::Arc;
use std::{pin::Pin, sync::Arc};

use async_trait::async_trait;
use tokio_stream::wrappers::UnboundedReceiverStream;
use futures::TryStreamExt;

use quickwit_proto::{
search_service_server as grpc, LeafSearchStreamRequest, LeafSearchStreamResult,
Expand Down Expand Up @@ -79,8 +79,14 @@ impl grpc::SearchService for GrpcSearchAdapter {
Ok(tonic::Response::new(fetch_docs_result))
}

type LeafSearchStreamStream =
UnboundedReceiverStream<Result<LeafSearchStreamResult, tonic::Status>>;
type LeafSearchStreamStream = Pin<
Box<
dyn futures::Stream<Item = Result<LeafSearchStreamResult, tonic::Status>>
+ Sync
+ Send
+ 'static,
>,
>;
async fn leaf_search_stream(
&self,
request: tonic::Request<LeafSearchStreamRequest>,
Expand All @@ -90,7 +96,8 @@ impl grpc::SearchService for GrpcSearchAdapter {
.0
.leaf_search_stream(leaf_search_request)
.await
.map_err(SearchError::convert_to_tonic_status)?;
Ok(tonic::Response::new(leaf_search_result))
.map_err(SearchError::convert_to_tonic_status)?
.map_err(SearchError::convert_to_tonic_status);
Ok(tonic::Response::new(Box::pin(leaf_search_result)))
}
}
2 changes: 1 addition & 1 deletion quickwit-storage/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub enum StorageErrorKind {

/// Generic Storage Resolver Error.
#[allow(missing_docs)]
#[derive(Error, Debug, Serialize, Deserialize)]
#[derive(Error, Debug, Serialize, Deserialize, Clone)]
pub enum StorageResolverError {
/// The input is not a valid URI.
/// A protocol is required for the URI.
Expand Down