diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index 25dafe96022..6670288a8b2 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -42,7 +42,8 @@ use quickwit_common::rand::append_random_suffix; use quickwit_common::uri::Uri; use quickwit_config::{SourceInputFormat, CLI_INGEST_SOURCE_ID}; use quickwit_metastore::{ - ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, SplitState, StageSplitsRequestExt, + ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt, + SplitMetadata, SplitState, StageSplitsRequestExt, }; use quickwit_proto::metastore::{ DeleteSplitsRequest, EntityKind, IndexMetadataRequest, ListSplitsRequest, @@ -248,15 +249,18 @@ async fn test_ingest_docs_cli() { local_ingest_docs_cli(args).await.unwrap(); - let splits: Vec<_> = test_env + let splits_metadata: Vec = test_env .metastore() .await .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await + .unwrap() + .collect_splits_metadata() + .await .unwrap(); - assert_eq!(splits.len(), 1); - assert_eq!(splits[0].split_metadata.num_docs, 5); + assert_eq!(splits_metadata.len(), 1); + assert_eq!(splits_metadata[0].num_docs, 5); // Ensure cache directory is empty. let cache_directory_path = get_cache_directory_path(&test_env.data_dir_path); @@ -662,11 +666,14 @@ async fn test_garbage_collect_cli_no_grace() { dry_run, }; - let splits = metastore + let splits_metadata = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits_metadata() + .await .unwrap(); - assert_eq!(splits.len(), 1); + assert_eq!(splits_metadata.len(), 1); let args = create_gc_args(false); @@ -676,7 +683,7 @@ async fn test_garbage_collect_cli_no_grace() { let index_path = test_env.indexes_dir_path.join(&test_env.index_id); assert_eq!(index_path.try_exists().unwrap(), true); - let split_ids = vec![splits[0].split_id().to_string()]; + let split_ids = vec![splits_metadata[0].split_id().to_string()]; let mut metastore = refresh_metastore(metastore).await.unwrap(); let mark_for_deletion_request = MarkSplitsForDeletionRequest::new(index_uid.clone(), split_ids.clone()); @@ -713,6 +720,9 @@ async fn test_garbage_collect_cli_no_grace() { .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await .unwrap() + .collect_splits_metadata() + .await + .unwrap() .len(), 0 ); @@ -769,14 +779,17 @@ async fn test_garbage_collect_index_cli() { .await .unwrap(); - let splits = metastore + let splits_metadata = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits_metadata() + .await .unwrap(); - assert_eq!(splits.len(), 1); + assert_eq!(splits_metadata.len(), 1); let index_path = test_env.indexes_dir_path.join(&test_env.index_id); - let split_filename = quickwit_common::split_file(splits[0].split_metadata.split_id.as_str()); + let split_filename = quickwit_common::split_file(splits_metadata[0].split_id.as_str()); let split_path = index_path.join(&split_filename); assert_eq!(split_path.try_exists().unwrap(), true); @@ -786,39 +799,39 @@ async fn test_garbage_collect_index_cli() { // Split should still exists within grace period. let mut metastore = refresh_metastore(metastore).await.unwrap(); - let splits = metastore + let splits_metadata = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits_metadata() + .await .unwrap(); - assert_eq!(splits.len(), 1); + assert_eq!(splits_metadata.len(), 1); // The following steps help turn an existing published split into a staged one // without deleting the files. - let split = splits[0].clone(); + let split_metadata = splits_metadata[0].clone(); metastore .mark_splits_for_deletion(MarkSplitsForDeletionRequest::new( index_uid.clone(), - vec![split.split_metadata.split_id.to_string()], + vec![split_metadata.clone().split_id.to_string()], )) .await .unwrap(); metastore .delete_splits(DeleteSplitsRequest { index_uid: index_uid.to_string(), - split_ids: splits + split_ids: splits_metadata .into_iter() - .map(|split| split.split_metadata.split_id) + .map(|split_metadata| split_metadata.split_id) .collect(), }) .await .unwrap(); metastore .stage_splits( - StageSplitsRequest::try_from_split_metadata( - index_uid.clone(), - split.split_metadata.clone(), - ) - .unwrap(), + StageSplitsRequest::try_from_split_metadata(index_uid.clone(), split_metadata.clone()) + .unwrap(), ) .await .unwrap(); @@ -828,6 +841,9 @@ async fn test_garbage_collect_index_cli() { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert_eq!(splits[0].split_state, SplitState::Staged); @@ -841,6 +857,9 @@ async fn test_garbage_collect_index_cli() { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert_eq!(splits.len(), 1); assert_eq!(splits[0].split_state, SplitState::Staged); @@ -857,6 +876,9 @@ async fn test_garbage_collect_index_cli() { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); // Splits should be deleted from both metastore and file system. assert_eq!(splits.len(), 0); diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index c16ed9a16e1..e2d54b70e8a 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -47,16 +47,6 @@ where T: Send + 'static } } -impl From> for ServiceStream -where T: Send + 'static -{ - fn from(values: Vec) -> Self { - Self { - inner: Box::pin(stream::iter(values)), - } - } -} - impl fmt::Debug for ServiceStream where T: 'static { @@ -183,3 +173,14 @@ where T: Send + 'static } } } + +#[cfg(any(test, feature = "testsuite"))] +impl From> for ServiceStream +where T: Send + 'static +{ + fn from(values: Vec) -> Self { + Self { + inner: Box::pin(stream::iter(values)), + } + } +} diff --git a/quickwit/quickwit-index-management/src/garbage_collection.rs b/quickwit/quickwit-index-management/src/garbage_collection.rs index 6768f9c041c..67594422087 100644 --- a/quickwit/quickwit-index-management/src/garbage_collection.rs +++ b/quickwit/quickwit-index-management/src/garbage_collection.rs @@ -23,14 +23,14 @@ use std::sync::Arc; use std::time::Duration; use futures::Future; -use quickwit_common::{PrettySample, Progress}; +use quickwit_common::{PrettySample, Progress, ServiceStream}; use quickwit_metastore::{ - ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitInfo, SplitMetadata, - SplitState, + ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitInfo, + SplitMetadata, SplitState, }; use quickwit_proto::metastore::{ - DeleteSplitsRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, - MetastoreService, MetastoreServiceClient, + DeleteSplitsRequest, ListSplitsRequest, ListSplitsResponse, MarkSplitsForDeletionRequest, + MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, }; use quickwit_proto::types::{IndexUid, SplitId}; use quickwit_storage::{BulkDeleteError, Storage}; @@ -106,9 +106,8 @@ pub async fn run_garbage_collect( metastore.list_splits(list_deletable_staged_request), ) .await? - .into_iter() - .map(|split| split.split_metadata) - .collect(); + .collect_splits_metadata() + .await?; if dry_run { let marked_for_deletion_query = ListSplitsQuery::for_index(index_uid.clone()) @@ -120,9 +119,8 @@ pub async fn run_garbage_collect( metastore.list_splits(marked_for_deletion_request), ) .await? - .into_iter() - .map(|split| split.split_metadata) - .collect(); + .collect_splits_metadata() + .await?; splits_marked_for_deletion.extend(deletable_staged_splits); let candidate_entries: Vec = splits_marked_for_deletion @@ -195,19 +193,25 @@ async fn delete_splits_marked_for_deletion( break; } }; - let list_splits_result = + let splits_stream_result = protect_future(progress_opt, metastore.list_splits(list_splits_request)).await; + let splits_to_delete_stream: ServiceStream> = + match splits_stream_result { + Ok(splits_stream) => splits_stream, + Err(error) => { + error!(error = ?error, "failed to fetch stream splits"); + break; + } + }; - let splits_to_delete: Vec = match list_splits_result { - Ok(splits) => splits - .into_iter() - .map(|split| split.split_metadata) - .collect(), - Err(error) => { - error!(error = ?error, "failed to fetch deletable splits"); - break; - } - }; + let splits_to_delete: Vec = + match splits_to_delete_stream.collect_splits_metadata().await { + Ok(splits) => splits, + Err(error) => { + error!(error = ?error, "failed to collect splits"); + break; + } + }; let num_splits_to_delete = splits_to_delete.len(); @@ -350,8 +354,8 @@ mod tests { use quickwit_common::ServiceStream; use quickwit_config::IndexConfig; use quickwit_metastore::{ - metastore_for_test, CreateIndexRequestExt, ListSplitsQuery, SplitMetadata, SplitState, - StageSplitsRequestExt, + metastore_for_test, CreateIndexRequestExt, ListSplitsQuery, + MetastoreServiceStreamSplitsExt, SplitMetadata, SplitState, StageSplitsRequestExt, }; use quickwit_proto::metastore::{CreateIndexRequest, EntityKind, StageSplitsRequest}; use quickwit_proto::types::IndexUid; @@ -396,6 +400,9 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() + .collect_splits() + .await + .unwrap() .len(), 1 ); @@ -421,6 +428,9 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() + .collect_splits() + .await + .unwrap() .len(), 1 ); @@ -446,6 +456,9 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() + .collect_splits() + .await + .unwrap() .len(), 1 ); @@ -491,6 +504,9 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() + .collect_splits() + .await + .unwrap() .len(), 1 ); @@ -516,6 +532,9 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() + .collect_splits() + .await + .unwrap() .len(), 1 ); @@ -540,6 +559,9 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() + .collect_splits() + .await + .unwrap() .len(), 0 ); @@ -551,7 +573,7 @@ mod tests { let storage = storage_for_test(); let mut metastore = MetastoreServiceClient::mock(); metastore - .expect_stream_splits() + .expect_list_splits() .times(2) .returning(|_| Ok(ServiceStream::empty())); run_garbage_collect( @@ -609,6 +631,9 @@ mod tests { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert_eq!(splits.len(), 1); @@ -633,6 +658,9 @@ mod tests { .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await .unwrap() + .collect_splits() + .await + .unwrap() .is_empty()); } @@ -723,6 +751,9 @@ mod tests { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert_eq!(splits.len(), 1); assert_eq!(splits[0].split_id(), split_id_1); diff --git a/quickwit/quickwit-index-management/src/index.rs b/quickwit/quickwit-index-management/src/index.rs index 6d7cc22609d..e40118c6c5f 100644 --- a/quickwit/quickwit-index-management/src/index.rs +++ b/quickwit/quickwit-index-management/src/index.rs @@ -25,8 +25,8 @@ use quickwit_config::{validate_identifier, IndexConfig, SourceConfig}; use quickwit_indexing::check_source_connectivity; use quickwit_metastore::{ AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt, - ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitInfo, SplitMetadata, - SplitState, + ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitInfo, + SplitMetadata, SplitState, }; use quickwit_proto::metastore::{ AddSourceRequest, CreateIndexRequest, DeleteIndexRequest, EntityKind, IndexMetadataRequest, @@ -178,6 +178,8 @@ impl IndexService { .metastore .list_splits(list_splits_request) .await? + .collect_splits() + .await? .into_iter() .map(|split| split.split_metadata.as_split_info()) .collect(); @@ -191,9 +193,8 @@ impl IndexService { .metastore .list_splits(list_splits_request) .await? - .into_iter() - .map(|split| split.split_metadata.split_id) - .collect(); + .collect_split_ids() + .await?; let mark_splits_for_deletion_request = MarkSplitsForDeletionRequest::new(index_uid.clone(), split_ids); self.metastore @@ -204,19 +205,18 @@ impl IndexService { let query = ListSplitsQuery::for_index(index_uid.clone()) .with_split_state(SplitState::MarkedForDeletion); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?; - let splits_to_delete: Vec = self + let splits_metadata_to_delete: Vec = self .metastore .list_splits(list_splits_request) .await? - .into_iter() - .map(|split| split.split_metadata) - .collect(); + .collect_splits_metadata() + .await?; let deleted_splits = delete_splits_from_storage_and_metastore( index_uid.clone(), storage, self.metastore.clone(), - splits_to_delete, + splits_metadata_to_delete, None, ) .await?; @@ -294,9 +294,8 @@ impl IndexService { .metastore .list_splits(list_splits_request) .await? - .into_iter() - .map(|split| split.split_metadata) - .collect(); + .collect_splits_metadata() + .await?; let split_ids: Vec = splits_metadata .iter() .map(|split| split.split_id.to_string()) @@ -422,7 +421,9 @@ mod tests { use quickwit_common::uri::Uri; use quickwit_config::IndexConfig; - use quickwit_metastore::{metastore_for_test, SplitMetadata, StageSplitsRequestExt}; + use quickwit_metastore::{ + metastore_for_test, MetastoreServiceExt, SplitMetadata, StageSplitsRequestExt, + }; use quickwit_proto::metastore::StageSplitsRequest; use quickwit_storage::PutPayload; @@ -501,6 +502,9 @@ mod tests { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert_eq!(splits.len(), 1); @@ -517,6 +521,9 @@ mod tests { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert!(splits.is_empty()); assert!(!storage.exists(split_path).await.unwrap()); diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 4e7cce6bc27..c4ae50e2a07 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -831,7 +831,7 @@ mod tests { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata).unwrap()) }); mock_metastore - .expect_stream_splits() + .expect_list_splits() .returning(|_| Ok(ServiceStream::empty())); let universe = Universe::with_accelerated_time(); let node_id = "test-node"; diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 95076e7d163..7e21f4cf96a 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -1455,7 +1455,7 @@ mod tests { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); metastore - .expect_stream_splits() + .expect_list_splits() .returning(|_| Ok(ServiceStream::empty())); let universe = Universe::new(); let temp_dir = tempfile::tempdir().unwrap(); diff --git a/quickwit/quickwit-indexing/src/actors/merge_executor.rs b/quickwit/quickwit-indexing/src/actors/merge_executor.rs index b5b6287bf6f..04a54f3d5fd 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_executor.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_executor.rs @@ -560,7 +560,7 @@ mod tests { use quickwit_actors::Universe; use quickwit_common::split_file; use quickwit_metastore::{ - ListSplitsRequestExt, MetastoreServiceExt, SplitMetadata, StageSplitsRequestExt, + ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata, StageSplitsRequestExt, }; use quickwit_proto::metastore::{ DeleteQuery, ListSplitsRequest, PublishSplitsRequest, StageSplitsRequest, @@ -606,9 +606,9 @@ mod tests { .list_splits(list_splits_request) .await .unwrap() - .into_iter() - .map(|split| split.split_metadata) - .collect(); + .collect_splits_metadata() + .await + .unwrap(); assert_eq!(split_metas.len(), 4); let merge_scratch_directory = TempDirectory::for_test(); let downloaded_splits_directory = @@ -730,15 +730,16 @@ mod tests { query_ast: quickwit_query::query_ast::qast_json_helper(delete_query, &["body"]), }) .await?; - let split = metastore + let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await .unwrap() - .into_iter() - .next() + .collect_splits() + .await .unwrap(); + // We want to test a delete on a split with num_merge_ops > 0. - let mut new_split_metadata = split.split_metadata.clone(); + let mut new_split_metadata = splits[0].split_metadata.clone(); new_split_metadata.split_id = new_split_id(); new_split_metadata.num_merge_ops = 1; let stage_splits_request = StageSplitsRequest::try_from_split_metadata( @@ -750,7 +751,7 @@ mod tests { let publish_splits_request = PublishSplitsRequest { index_uid: index_uid.to_string(), staged_split_ids: vec![new_split_metadata.split_id.to_string()], - replaced_split_ids: vec![split.split_metadata.split_id.to_string()], + replaced_split_ids: vec![splits[0].split_metadata.split_id.to_string()], index_checkpoint_delta_json_opt: None, publish_token_opt: None, }; @@ -763,7 +764,7 @@ mod tests { let merge_scratch_directory = TempDirectory::for_test(); let downloaded_splits_directory = merge_scratch_directory.named_temp_child("downloaded-splits-")?; - let split_filename = split_file(split.split_metadata.split_id()); + let split_filename = split_file(splits[0].split_metadata.split_id()); let new_split_filename = split_file(new_split_metadata.split_id()); let dest_filepath = downloaded_splits_directory.path().join(&new_split_filename); test_sandbox @@ -845,6 +846,9 @@ mod tests { .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await .unwrap() + .collect_splits() + .await + .unwrap() .into_iter() .all( |split| split.split_state == quickwit_metastore::SplitState::MarkedForDeletion diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs index 516a404edf6..42ee16976c1 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs @@ -32,10 +32,12 @@ use quickwit_common::temp_dir::TempDirectory; use quickwit_common::KillSwitch; use quickwit_doc_mapper::DocMapper; use quickwit_metastore::{ - ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitMetadata, SplitState, + ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitState, }; use quickwit_proto::indexing::IndexingPipelineId; -use quickwit_proto::metastore::{ListSplitsRequest, MetastoreError, MetastoreServiceClient}; +use quickwit_proto::metastore::{ + ListSplitsRequest, MetastoreError, MetastoreService, MetastoreServiceClient, +}; use time::OffsetDateTime; use tracing::{debug, error, info, instrument}; @@ -221,12 +223,12 @@ impl MergePipeline { .with_split_state(SplitState::Published) .retain_immature(OffsetDateTime::now_utc()); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?; - let published_splits_metadata: Vec = ctx + let published_splits_stream = ctx .protect_future(self.params.metastore.list_splits(list_splits_request)) - .await? - .into_iter() - .map(|split| split.split_metadata) - .collect(); + .await?; + let published_splits_metadata = ctx + .protect_future(published_splits_stream.collect_splits_metadata()) + .await?; info!( num_splits = published_splits_metadata.len(), @@ -508,7 +510,7 @@ mod tests { pipeline_uid: PipelineUid::default(), }; metastore - .expect_stream_splits() + .expect_list_splits() .times(1) .withf(move |list_splits_request| { let list_split_query = list_splits_request.deserialize_list_splits_query().unwrap(); diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index c70f6d36863..0dfa69448e5 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -282,8 +282,8 @@ pub fn mock_split_meta(split_id: &str, index_uid: &IndexUid) -> SplitMetadata { #[cfg(test)] mod tests { - use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceExt}; - use quickwit_proto::metastore::ListSplitsRequest; + use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceStreamSplitsExt}; + use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService}; use super::TestSandbox; @@ -312,6 +312,8 @@ mod tests { .list_splits( ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap(), ) + .await? + .collect_splits() .await?; assert_eq!(splits.len(), 1); test_sandbox.add_documents(vec![ @@ -323,6 +325,8 @@ mod tests { .list_splits( ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap(), ) + .await? + .collect_splits() .await?; assert_eq!(splits.len(), 2); } diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs index c902fc37e29..b9dde8d7fde 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs @@ -291,7 +291,7 @@ mod tests { use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_indexing::TestSandbox; - use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceExt, SplitState}; + use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitState}; use quickwit_proto::metastore::{DeleteQuery, ListSplitsRequest, MetastoreService}; use quickwit_proto::search::{LeafSearchRequest, LeafSearchResponse}; use quickwit_search::{ @@ -419,6 +419,9 @@ mod tests { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert_eq!(splits.len(), 2); let published_split = splits diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs index a4c5541eb92..0c6b519ada4 100644 --- a/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs +++ b/quickwit/quickwit-janitor/src/actors/delete_task_planner.rs @@ -426,7 +426,8 @@ mod tests { use quickwit_indexing::merge_policy::MergeOperation; use quickwit_indexing::TestSandbox; use quickwit_metastore::{ - IndexMetadataResponseExt, ListSplitsRequestExt, MetastoreServiceExt, SplitMetadata, + IndexMetadataResponseExt, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, + SplitMetadata, }; use quickwit_proto::metastore::{DeleteQuery, IndexMetadataRequest, ListSplitsRequest}; use quickwit_proto::search::{LeafSearchRequest, LeafSearchResponse}; @@ -481,9 +482,9 @@ mod tests { .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await .unwrap() - .into_iter() - .map(|split| split.split_metadata) - .collect(); + .collect_splits_metadata() + .await + .unwrap(); assert_eq!(split_metas.len(), 3); let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings)?; @@ -597,11 +598,14 @@ mod tests { let all_splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await + .unwrap() + .collect_splits_metadata() + .await .unwrap(); - assert_eq!(all_splits[0].split_metadata.delete_opstamp, 2); - assert_eq!(all_splits[1].split_metadata.delete_opstamp, 2); + assert_eq!(all_splits[0].delete_opstamp, 2); + assert_eq!(all_splits[1].delete_opstamp, 2); // The last split has not yet its delete opstamp updated. - assert_eq!(all_splits[2].split_metadata.delete_opstamp, 0); + assert_eq!(all_splits[2].delete_opstamp, 0); test_sandbox.assert_quit().await; Ok(()) } diff --git a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs index fb016961453..4c082c36f1d 100644 --- a/quickwit/quickwit-janitor/src/actors/garbage_collector.rs +++ b/quickwit/quickwit-janitor/src/actors/garbage_collector.rs @@ -263,7 +263,7 @@ mod tests { let mut mock_metastore = MetastoreServiceClient::mock(); mock_metastore - .expect_stream_splits() + .expect_list_splits() .times(2) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -358,7 +358,7 @@ mod tests { ) }); mock_metastore - .expect_stream_splits() + .expect_list_splits() .times(2) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -434,7 +434,7 @@ mod tests { ) }); mock_metastore - .expect_stream_splits() + .expect_list_splits() .times(6) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -604,7 +604,7 @@ mod tests { ) }); mock_metastore - .expect_stream_splits() + .expect_list_splits() .times(3) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); @@ -686,7 +686,7 @@ mod tests { ) }); mock_metastore - .expect_stream_splits() + .expect_list_splits() .times(4) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); diff --git a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs index 71d85d43b89..40ec1a9c1ea 100644 --- a/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs +++ b/quickwit/quickwit-janitor/src/actors/retention_policy_executor.rs @@ -348,7 +348,7 @@ mod tests { let mut sequence = Sequence::new(); mock_metastore - .expect_stream_splits() + .expect_list_splits() .times(..) .returning(|_| Ok(ServiceStream::empty())); mock_metastore @@ -457,7 +457,7 @@ mod tests { }); mock_metastore - .expect_stream_splits() + .expect_list_splits() .times(2..=4) .returning(|list_splits_request| { let query = list_splits_request.deserialize_list_splits_query().unwrap(); diff --git a/quickwit/quickwit-janitor/src/retention_policy_execution.rs b/quickwit/quickwit-janitor/src/retention_policy_execution.rs index 255e5f05d0f..ce90cfa1695 100644 --- a/quickwit/quickwit-janitor/src/retention_policy_execution.rs +++ b/quickwit/quickwit-janitor/src/retention_policy_execution.rs @@ -21,7 +21,8 @@ use quickwit_actors::ActorContext; use quickwit_common::PrettySample; use quickwit_config::RetentionPolicy; use quickwit_metastore::{ - ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitMetadata, SplitState, + ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata, + SplitState, }; use quickwit_proto::metastore::{ ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreService, MetastoreServiceClient, @@ -58,8 +59,9 @@ pub async fn run_execute_retention_policy( let (expired_splits, ignored_splits): (Vec, Vec) = ctx .protect_future(metastore.list_splits(list_splits_request)) .await? + .collect_splits_metadata() + .await? .into_iter() - .map(|split| split.split_metadata) .partition(|split_metadata| split_metadata.time_range.is_some()); if !ignored_splits.is_empty() { diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs index 5ac68335cd5..017d2c6bead 100644 --- a/quickwit/quickwit-metastore/src/lib.rs +++ b/quickwit/quickwit-metastore/src/lib.rs @@ -50,8 +50,8 @@ pub use metastore::postgresql_metastore::PostgresqlMetastore; pub use metastore::{ file_backed_metastore, AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery, - ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt, PublishSplitsRequestExt, - StageSplitsRequestExt, + ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt, + MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt, }; pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore}; pub use metastore_resolver::MetastoreResolver; diff --git a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs index a54ac5ef995..9cb25df928f 100644 --- a/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs @@ -142,11 +142,11 @@ impl MetastoreService for ControlPlaneMetastore { self.metastore.publish_splits(request).await } - async fn stream_splits( + async fn list_splits( &mut self, request: ListSplitsRequest, ) -> MetastoreResult> { - self.metastore.stream_splits(request).await + self.metastore.list_splits(request).await } async fn list_stale_splits( diff --git a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs index 2f32ff315dc..131c606bbc7 100644 --- a/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/file_backed_metastore/mod.rs @@ -637,7 +637,7 @@ impl MetastoreService for FileBackedMetastore { /// Streams of splits for the given request. /// No error is returned if one of the requested `index_uid` does not exist. - async fn stream_splits( + async fn list_splits( &mut self, request: ListSplitsRequest, ) -> MetastoreResult> { @@ -646,7 +646,8 @@ impl MetastoreService for FileBackedMetastore { .chunks(STREAM_SPLITS_CHUNK_SIZE) .map(|chunk| ListSplitsResponse::try_from_splits(chunk.to_vec())) .collect(); - Ok(ServiceStream::from(splits_responses)) + let splits_responses_stream = Box::pin(futures::stream::iter(splits_responses)); + Ok(ServiceStream::new(splits_responses_stream)) } async fn list_stale_splits( @@ -991,6 +992,7 @@ mod tests { fetch_or_init_indexes_states, meta_path, put_index_given_index_id, put_indexes_states, }; use super::*; + use crate::metastore::MetastoreServiceStreamSplitsExt; use crate::tests::DefaultForTest; use crate::{metastore_test_suite, IndexMetadata, ListSplitsQuery, SplitMetadata, SplitState}; @@ -1156,14 +1158,26 @@ mod tests { let query = ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Published); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query).unwrap(); - let splits = metastore.list_splits(list_splits_request).await.unwrap(); + let splits = metastore + .list_splits(list_splits_request) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); assert!(splits.is_empty()); let list_splits_query = ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Staged); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(list_splits_query).unwrap(); - let splits = metastore.list_splits(list_splits_request).await.unwrap(); + let splits = metastore + .list_splits(list_splits_request) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); assert!(!splits.is_empty()); } @@ -1218,6 +1232,9 @@ mod tests { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert!(splits.is_empty()); @@ -1236,6 +1253,9 @@ mod tests { let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert_eq!(splits.len(), 1); Ok(()) @@ -1264,12 +1284,18 @@ mod tests { let splits = metastore_write .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert!(splits.is_empty()); let splits = metastore_read .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert!(splits.is_empty()); @@ -1288,6 +1314,9 @@ mod tests { let splits = metastore_read .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert!(splits.is_empty()); @@ -1297,6 +1326,9 @@ mod tests { let splits = metastore_read .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); if !splits.is_empty() { return Ok(()); @@ -1365,7 +1397,13 @@ mod tests { ListSplitsQuery::for_index(index_uid.clone()).with_split_state(SplitState::Published); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(list_splits_query).unwrap(); - let splits = metastore.list_splits(list_splits_request).await.unwrap(); + let splits = metastore + .list_splits(list_splits_request) + .await + .unwrap() + .collect_splits() + .await + .unwrap(); // Make sure that all 20 splits are in `Published` state. assert_eq!(splits.len(), 20); diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 6d428c2a536..888162e0ed0 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -40,7 +40,7 @@ use quickwit_proto::metastore::{ serde_utils, AddSourceRequest, CreateIndexRequest, DeleteTask, IndexMetadataRequest, IndexMetadataResponse, ListIndexesMetadataResponse, ListSplitsRequest, ListSplitsResponse, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient, - PublishSplitsRequest, StageSplitsRequest, + MetastoreServiceStream, PublishSplitsRequest, StageSplitsRequest, }; use quickwit_proto::types::{IndexUid, SplitId}; use time::OffsetDateTime; @@ -49,7 +49,7 @@ use crate::checkpoint::IndexCheckpointDelta; use crate::{Split, SplitMetadata, SplitState}; /// Splits batch size returned by the stream splits API -const STREAM_SPLITS_CHUNK_SIZE: usize = 1000; +const STREAM_SPLITS_CHUNK_SIZE: usize = 1_000; static METASTORE_METRICS_LAYER: Lazy> = Lazy::new(|| PrometheusMetricsLayer::new("metastore", ["request"])); @@ -74,23 +74,52 @@ pub trait MetastoreServiceExt: MetastoreService { Err(error) => Err(error), } } +} + +impl MetastoreServiceExt for MetastoreServiceClient {} + +/// Helper trait to collect splits from a [`MetastoreServiceStream`]. +#[async_trait] +pub trait MetastoreServiceStreamSplitsExt { + /// Collects all splits from a [`MetastoreServiceStream`]. + async fn collect_splits(mut self) -> MetastoreResult>; - /// Lists all splits matching the given query. - /// It calls `stream_splits` and collects the splits into a single vector. - /// This method is added for convenience and may be removed in the future to avoid - /// loading all splits in memory. - async fn list_splits(&mut self, query: ListSplitsRequest) -> MetastoreResult> { - let mut stream = self.stream_splits(query).await?; + /// Collects all splits metadata from a [`MetastoreServiceStream`]. + async fn collect_splits_metadata(mut self) -> MetastoreResult>; + + /// Collects all splits IDs from a [`MetastoreServiceStream`]. + async fn collect_split_ids(mut self) -> MetastoreResult>; +} + +#[async_trait] +impl MetastoreServiceStreamSplitsExt for MetastoreServiceStream { + async fn collect_splits(mut self) -> MetastoreResult> { let mut all_splits = Vec::new(); - while let Some(list_splits_response) = stream.try_next().await? { + while let Some(list_splits_response) = self.try_next().await? { let splits = list_splits_response.deserialize_splits()?; all_splits.extend(splits); } Ok(all_splits) } -} -impl MetastoreServiceExt for MetastoreServiceClient {} + async fn collect_splits_metadata(mut self) -> MetastoreResult> { + let mut all_splits_metadata = Vec::new(); + while let Some(list_splits_response) = self.try_next().await? { + let splits_metadata = list_splits_response.deserialize_splits_metadata()?; + all_splits_metadata.extend(splits_metadata); + } + Ok(all_splits_metadata) + } + + async fn collect_split_ids(mut self) -> MetastoreResult> { + let mut all_splits = Vec::new(); + while let Some(list_splits_response) = self.try_next().await? { + let splits = list_splits_response.deserialize_split_ids()?; + all_splits.extend(splits); + } + Ok(all_splits) + } +} /// Helper trait to build a [`CreateIndexRequest`] and deserialize its payload. pub trait CreateIndexRequestExt { diff --git a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs index 3012b237656..d145cb2ea0b 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs @@ -805,7 +805,7 @@ impl MetastoreService for PostgresqlMetastore { } #[instrument(skip(self))] - async fn stream_splits( + async fn list_splits( &mut self, request: ListSplitsRequest, ) -> MetastoreResult> { diff --git a/quickwit/quickwit-metastore/src/tests/list_splits.rs b/quickwit/quickwit-metastore/src/tests/list_splits.rs index 593a424c768..9d4ac809be0 100644 --- a/quickwit/quickwit-metastore/src/tests/list_splits.rs +++ b/quickwit/quickwit-metastore/src/tests/list_splits.rs @@ -34,6 +34,7 @@ use tokio::time::sleep; use tracing::info; use super::{to_btree_set, DefaultForTest}; +use crate::metastore::MetastoreServiceStreamSplitsExt; use crate::tests::{cleanup_index, collect_split_ids}; use crate::{ CreateIndexRequestExt, ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, @@ -95,6 +96,9 @@ pub async fn test_metastore_list_all_splits< .unwrap(), ) .await + .unwrap() + .collect_splits() + .await .unwrap(); assert!(no_splits.is_empty()); @@ -142,6 +146,9 @@ pub async fn test_metastore_list_all_splits< let splits = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); let split_ids = collect_split_ids(&splits); assert_eq!( @@ -209,10 +216,7 @@ pub async fn test_metastore_stream_splits = splits .iter() @@ -383,6 +399,9 @@ pub async fn test_metastore_list_splits = splits .iter() @@ -743,6 +834,9 @@ pub async fn test_metastore_list_splits current_timestamp); @@ -1504,6 +1526,9 @@ pub async fn test_metastore_split_update_timestamp< let split_meta = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap()[0] .clone(); assert!(split_meta.update_timestamp > current_timestamp); @@ -1524,6 +1549,9 @@ pub async fn test_metastore_split_update_timestamp< let split_meta = metastore .list_splits(ListSplitsRequest::try_from_index_uid(index_uid.clone()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap()[0] .clone(); assert!(split_meta.update_timestamp > current_timestamp); @@ -1593,6 +1621,9 @@ pub async fn test_metastore_stage_splits crate::metastore::MetastoreResult; /// Streams splits from index. - async fn stream_splits( + async fn list_splits( &mut self, request: ListSplitsRequest, ) -> crate::metastore::MetastoreResult>; @@ -811,11 +811,11 @@ impl MetastoreService for MetastoreServiceClient { ) -> crate::metastore::MetastoreResult { self.inner.delete_index(request).await } - async fn stream_splits( + async fn list_splits( &mut self, request: ListSplitsRequest, ) -> crate::metastore::MetastoreResult> { - self.inner.stream_splits(request).await + self.inner.list_splits(request).await } async fn stage_splits( &mut self, @@ -959,13 +959,13 @@ pub mod metastore_service_mock { ) -> crate::metastore::MetastoreResult { self.inner.lock().await.delete_index(request).await } - async fn stream_splits( + async fn list_splits( &mut self, request: super::ListSplitsRequest, ) -> crate::metastore::MetastoreResult< MetastoreServiceStream, > { - self.inner.lock().await.stream_splits(request).await + self.inner.lock().await.list_splits(request).await } async fn stage_splits( &mut self, @@ -1166,7 +1166,7 @@ impl tower::Service for Box { } fn call(&mut self, request: ListSplitsRequest) -> Self::Future { let mut svc = self.clone(); - let fut = async move { svc.stream_splits(request).await }; + let fut = async move { svc.list_splits(request).await }; Box::pin(fut) } } @@ -1466,7 +1466,7 @@ struct MetastoreServiceTowerBlock { EmptyResponse, crate::metastore::MetastoreError, >, - stream_splits_svc: quickwit_common::tower::BoxService< + list_splits_svc: quickwit_common::tower::BoxService< ListSplitsRequest, MetastoreServiceStream, crate::metastore::MetastoreError, @@ -1565,7 +1565,7 @@ impl Clone for MetastoreServiceTowerBlock { index_metadata_svc: self.index_metadata_svc.clone(), list_indexes_metadata_svc: self.list_indexes_metadata_svc.clone(), delete_index_svc: self.delete_index_svc.clone(), - stream_splits_svc: self.stream_splits_svc.clone(), + list_splits_svc: self.list_splits_svc.clone(), stage_splits_svc: self.stage_splits_svc.clone(), publish_splits_svc: self.publish_splits_svc.clone(), mark_splits_for_deletion_svc: self.mark_splits_for_deletion_svc.clone(), @@ -1614,11 +1614,11 @@ impl MetastoreService for MetastoreServiceTowerBlock { ) -> crate::metastore::MetastoreResult { self.delete_index_svc.ready().await?.call(request).await } - async fn stream_splits( + async fn list_splits( &mut self, request: ListSplitsRequest, ) -> crate::metastore::MetastoreResult> { - self.stream_splits_svc.ready().await?.call(request).await + self.list_splits_svc.ready().await?.call(request).await } async fn stage_splits( &mut self, @@ -1768,7 +1768,7 @@ pub struct MetastoreServiceTowerBlockBuilder { >, >, #[allow(clippy::type_complexity)] - stream_splits_layer: Option< + list_splits_layer: Option< quickwit_common::tower::BoxLayer< Box, ListSplitsRequest, @@ -2092,7 +2092,7 @@ impl MetastoreServiceTowerBlockBuilder { quickwit_common::tower::BoxLayer::new(layer.clone()), ); self - .stream_splits_layer = Some( + .list_splits_layer = Some( quickwit_common::tower::BoxLayer::new(layer.clone()), ); self @@ -2219,7 +2219,7 @@ impl MetastoreServiceTowerBlockBuilder { self.delete_index_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } - pub fn stream_splits_layer(mut self, layer: L) -> Self + pub fn list_splits_layer(mut self, layer: L) -> Self where L: tower::Layer> + Send + Sync + 'static, L::Service: tower::Service< @@ -2229,7 +2229,7 @@ impl MetastoreServiceTowerBlockBuilder { > + Clone + Send + Sync + 'static, >::Future: Send + 'static, { - self.stream_splits_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); + self.list_splits_layer = Some(quickwit_common::tower::BoxLayer::new(layer)); self } pub fn stage_splits_layer(mut self, layer: L) -> Self @@ -2539,7 +2539,7 @@ impl MetastoreServiceTowerBlockBuilder { } else { quickwit_common::tower::BoxService::new(boxed_instance.clone()) }; - let stream_splits_svc = if let Some(layer) = self.stream_splits_layer { + let list_splits_svc = if let Some(layer) = self.list_splits_layer { layer.layer(boxed_instance.clone()) } else { quickwit_common::tower::BoxService::new(boxed_instance.clone()) @@ -2642,7 +2642,7 @@ impl MetastoreServiceTowerBlockBuilder { index_metadata_svc, list_indexes_metadata_svc, delete_index_svc, - stream_splits_svc, + list_splits_svc, stage_splits_svc, publish_splits_svc, mark_splits_for_deletion_svc, @@ -2905,7 +2905,7 @@ where ) -> crate::metastore::MetastoreResult { self.call(request).await } - async fn stream_splits( + async fn list_splits( &mut self, request: ListSplitsRequest, ) -> crate::metastore::MetastoreResult> { @@ -3100,12 +3100,12 @@ where .map(|response| response.into_inner()) .map_err(|error| error.into()) } - async fn stream_splits( + async fn list_splits( &mut self, request: ListSplitsRequest, ) -> crate::metastore::MetastoreResult> { self.inner - .stream_splits(request) + .list_splits(request) .await .map(|response| { let streaming: tonic::Streaming<_> = response.into_inner(); @@ -3359,16 +3359,16 @@ for MetastoreServiceGrpcServerAdapter { .map(tonic::Response::new) .map_err(|error| error.into()) } - type StreamSplitsStream = quickwit_common::ServiceStream< + type ListSplitsStream = quickwit_common::ServiceStream< tonic::Result, >; - async fn stream_splits( + async fn list_splits( &self, request: tonic::Request, - ) -> Result, tonic::Status> { + ) -> Result, tonic::Status> { self.inner .clone() - .stream_splits(request.into_inner()) + .list_splits(request.into_inner()) .await .map(|stream| tonic::Response::new(stream.map_err(|error| error.into()))) .map_err(|error| error.into()) @@ -3811,7 +3811,7 @@ pub mod metastore_service_grpc_client { self.inner.unary(req, path, codec).await } /// Streams splits from index. - pub async fn stream_splits( + pub async fn list_splits( &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result< @@ -3829,15 +3829,12 @@ pub mod metastore_service_grpc_client { })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( - "/quickwit.metastore.MetastoreService/StreamSplits", + "/quickwit.metastore.MetastoreService/ListSplits", ); let mut req = request.into_request(); req.extensions_mut() .insert( - GrpcMethod::new( - "quickwit.metastore.MetastoreService", - "StreamSplits", - ), + GrpcMethod::new("quickwit.metastore.MetastoreService", "ListSplits"), ); self.inner.server_streaming(req, path, codec).await } @@ -4377,20 +4374,17 @@ pub mod metastore_service_grpc_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; - /// Server streaming response type for the StreamSplits method. - type StreamSplitsStream: futures_core::Stream< + /// Server streaming response type for the ListSplits method. + type ListSplitsStream: futures_core::Stream< Item = std::result::Result, > + Send + 'static; /// Streams splits from index. - async fn stream_splits( + async fn list_splits( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; /// Stages several splits. async fn stage_splits( &self, @@ -4818,15 +4812,15 @@ pub mod metastore_service_grpc_server { }; Box::pin(fut) } - "/quickwit.metastore.MetastoreService/StreamSplits" => { + "/quickwit.metastore.MetastoreService/ListSplits" => { #[allow(non_camel_case_types)] - struct StreamSplitsSvc(pub Arc); + struct ListSplitsSvc(pub Arc); impl< T: MetastoreServiceGrpc, > tonic::server::ServerStreamingService - for StreamSplitsSvc { + for ListSplitsSvc { type Response = super::ListSplitsResponse; - type ResponseStream = T::StreamSplitsStream; + type ResponseStream = T::ListSplitsStream; type Future = BoxFuture< tonic::Response, tonic::Status, @@ -4836,9 +4830,7 @@ pub mod metastore_service_grpc_server { request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - (*inner).stream_splits(request).await - }; + let fut = async move { (*inner).list_splits(request).await }; Box::pin(fut) } } @@ -4849,7 +4841,7 @@ pub mod metastore_service_grpc_server { let inner = self.inner.clone(); let fut = async move { let inner = inner.0; - let method = StreamSplitsSvc(inner); + let method = ListSplitsSvc(inner); let codec = tonic::codec::ProstCodec::default(); let mut grpc = tonic::server::Grpc::new(codec) .apply_compression_config( diff --git a/quickwit/quickwit-search/src/lib.rs b/quickwit/quickwit-search/src/lib.rs index 86e1fab35af..65fc51ff295 100644 --- a/quickwit/quickwit-search/src/lib.rs +++ b/quickwit/quickwit-search/src/lib.rs @@ -49,7 +49,7 @@ pub use collector::QuickwitAggregations; use metrics::SEARCH_METRICS; use quickwit_common::tower::Pool; use quickwit_doc_mapper::DocMapper; -use quickwit_proto::metastore::{ListSplitsRequest, MetastoreServiceClient}; +use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient}; use tantivy::schema::NamedFieldDocument; /// Refer to this as `crate::Result`. @@ -62,7 +62,8 @@ pub use find_trace_ids_collector::FindTraceIdsCollector; use quickwit_config::SearcherConfig; use quickwit_doc_mapper::tag_pruning::TagFilterAst; use quickwit_metastore::{ - ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceExt, SplitMetadata, SplitState, + ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitMetadata, + SplitState, }; use quickwit_proto::search::{PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets}; use quickwit_proto::types::IndexUid; @@ -189,9 +190,8 @@ async fn list_relevant_splits( let splits_metadata: Vec = metastore .list_splits(list_splits_request) .await? - .into_iter() - .map(|split| split.split_metadata) - .collect(); + .collect_splits_metadata() + .await?; Ok(splits_metadata) } diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index c35e476e735..05252a65969 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -31,7 +31,7 @@ use quickwit_doc_mapper::tag_pruning::extract_tags_from_query; use quickwit_doc_mapper::{DocMapper, DYNAMIC_FIELD_NAME}; use quickwit_metastore::{ IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt, - MetastoreServiceExt, SplitMetadata, + MetastoreServiceStreamSplitsExt, SplitMetadata, }; use quickwit_proto::metastore::{ IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService, @@ -1016,9 +1016,8 @@ pub async fn root_list_terms( .clone() .list_splits(list_splits_request) .await? - .into_iter() - .map(|split| split.split_metadata) - .collect(); + .collect_splits_metadata() + .await?; let index_uri = &index_config.index_uri; @@ -1467,7 +1466,7 @@ mod tests { .unwrap()) }); mock_metastore - .expect_stream_splits() + .expect_list_splits() .returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") @@ -1565,7 +1564,7 @@ mod tests { .unwrap()) }); metastore - .expect_stream_splits() + .expect_list_splits() .returning(move |_list_splits_request| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -1633,7 +1632,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_stream_splits().returning(move |_filter| { + metastore.expect_list_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -1729,7 +1728,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_stream_splits().returning(move |_filter| { + metastore.expect_list_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -1909,7 +1908,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_stream_splits().returning(move |_filter| { + metastore.expect_list_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -2084,7 +2083,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_stream_splits().returning(move |_filter| { + metastore.expect_list_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -2205,7 +2204,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_stream_splits().returning(move |_filter| { + metastore.expect_list_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -2337,7 +2336,7 @@ mod tests { .unwrap()) }); metastore - .expect_stream_splits() + .expect_list_splits() .returning(move |_list_splits_request| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -2416,7 +2415,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_stream_splits().returning(move |_filter| { + metastore.expect_list_splits().returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; @@ -2479,7 +2478,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_stream_splits().returning(move |_filter| { + metastore.expect_list_splits().returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; @@ -2568,7 +2567,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_stream_splits().returning(move |_filter| { + metastore.expect_list_splits().returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; @@ -2641,7 +2640,7 @@ mod tests { .unwrap()) }); mock_metastore - .expect_stream_splits() + .expect_list_splits() .returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split") .with_index_uid(&index_uid) @@ -2726,7 +2725,7 @@ mod tests { ]) .unwrap()) }); - metastore.expect_stream_splits().returning(move |_filter| { + metastore.expect_list_splits().returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; @@ -2774,7 +2773,7 @@ mod tests { .unwrap()) }); mock_metastore - .expect_stream_splits() + .expect_list_splits() .returning(move |_filter| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -2975,7 +2974,7 @@ mod tests { .unwrap(), ) }); - metastore.expect_stream_splits().returning(move |_filter| { + metastore.expect_list_splits().returning(move |_filter| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -3167,7 +3166,7 @@ mod tests { }, ); metastore - .expect_stream_splits() + .expect_list_splits() .return_once(move |list_splits_request| { let list_splits_query = list_splits_request.deserialize_list_splits_query().unwrap(); diff --git a/quickwit/quickwit-search/src/search_stream/leaf.rs b/quickwit/quickwit-search/src/search_stream/leaf.rs index f6377d079fc..60b32b861c8 100644 --- a/quickwit/quickwit-search/src/search_stream/leaf.rs +++ b/quickwit/quickwit-search/src/search_stream/leaf.rs @@ -451,8 +451,8 @@ mod tests { use itertools::Itertools; use quickwit_indexing::TestSandbox; - use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceExt}; - use quickwit_proto::metastore::ListSplitsRequest; + use quickwit_metastore::{ListSplitsRequestExt, MetastoreServiceStreamSplitsExt}; + use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService}; use quickwit_query::query_ast::qast_json_helper; use serde_json::json; use tantivy::time::{Duration, OffsetDateTime}; @@ -501,7 +501,10 @@ mod tests { let splits = test_sandbox .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) - .await?; + .await? + .collect_splits() + .await + .unwrap(); let splits_offsets = splits .into_iter() .map(|split| extract_split_and_footer_offsets(&split.split_metadata)) @@ -577,6 +580,8 @@ mod tests { let splits = test_sandbox .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) + .await? + .collect_splits() .await?; let splits_offsets = splits .into_iter() @@ -632,6 +637,8 @@ mod tests { let splits = test_sandbox .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) + .await? + .collect_splits() .await?; let splits_offsets = splits .into_iter() @@ -720,6 +727,8 @@ mod tests { let splits = test_sandbox .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) + .await? + .collect_splits() .await?; let splits_offsets = splits .into_iter() diff --git a/quickwit/quickwit-search/src/search_stream/root.rs b/quickwit/quickwit-search/src/search_stream/root.rs index 767481c646f..2ba006720e1 100644 --- a/quickwit/quickwit-search/src/search_stream/root.rs +++ b/quickwit/quickwit-search/src/search_stream/root.rs @@ -158,7 +158,7 @@ mod tests { mock_metastore.expect_index_metadata().returning(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); - mock_metastore.expect_stream_splits().returning(move |_| { + mock_metastore.expect_list_splits().returning(move |_| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; @@ -216,7 +216,7 @@ mod tests { mock_metastore.expect_index_metadata().returning(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); - mock_metastore.expect_stream_splits().returning(move |_| { + mock_metastore.expect_list_splits().returning(move |_| { let splits = vec![MockSplitBuilder::new("split1") .with_index_uid(&index_uid) .build()]; @@ -272,7 +272,7 @@ mod tests { mock_metastore.expect_index_metadata().returning(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); - mock_metastore.expect_stream_splits().returning(move |_| { + mock_metastore.expect_list_splits().returning(move |_| { let splits = vec![ MockSplitBuilder::new("split1") .with_index_uid(&index_uid) @@ -333,7 +333,7 @@ mod tests { mock_metastore.expect_index_metadata().returning(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); - mock_metastore.expect_stream_splits().returning(move |_| { + mock_metastore.expect_list_splits().returning(move |_| { let splits = vec![MockSplitBuilder::new("split") .with_index_uid(&index_uid) .build()]; diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index 21c9a2eef9d..d84b730f1e7 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -1028,6 +1028,9 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec { .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) .await + .unwrap() + .collect_splits() + .await .unwrap(); let splits_offsets: Vec<_> = splits .into_iter() @@ -1664,7 +1667,10 @@ async fn test_single_node_list_terms() -> anyhow::Result<()> { let splits = test_sandbox .metastore() .list_splits(ListSplitsRequest::try_from_index_uid(test_sandbox.index_uid()).unwrap()) - .await?; + .await? + .collect_splits() + .await + .unwrap(); let splits_offsets: Vec<_> = splits .into_iter() .map(|split| extract_split_and_footer_offsets(&split.split_metadata)) diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs index bcad6721a16..4a8b6fc71bd 100644 --- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs @@ -30,7 +30,7 @@ use quickwit_doc_mapper::{analyze_text, TokenizerConfig}; use quickwit_index_management::{IndexService, IndexServiceError}; use quickwit_metastore::{ IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery, - ListSplitsRequestExt, MetastoreServiceExt, Split, SplitInfo, SplitState, + ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, Split, SplitInfo, SplitState, }; use quickwit_proto::metastore::{ DeleteSourceRequest, EntityKind, IndexMetadataRequest, ListIndexesMetadataRequest, @@ -198,7 +198,11 @@ async fn describe_index( .deserialize_index_metadata()?; let query = ListSplitsQuery::for_index(index_metadata.index_uid.clone()); let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?; - let splits = metastore.list_splits(list_splits_request).await?; + let splits = metastore + .list_splits(list_splits_request) + .await? + .collect_splits() + .await?; let published_splits: Vec = splits .into_iter() .filter(|split| split.split_state == SplitState::Published) @@ -344,7 +348,11 @@ async fn list_splits( query = query.with_create_timestamp_lt(end_created_timestamp); } let list_splits_request = ListSplitsRequest::try_from_list_splits_query(query)?; - let splits = metastore.list_splits(list_splits_request).await?; + let splits = metastore + .list_splits(list_splits_request) + .await? + .collect_splits() + .await?; Ok(ListSplitsResponse { offset, size: splits.len(), @@ -940,7 +948,7 @@ mod tests { }) .times(2); metastore - .expect_stream_splits() + .expect_list_splits() .returning(move |list_splits_request: ListSplitsRequest| { let list_split_query = list_splits_request.deserialize_list_splits_query().unwrap(); if list_split_query.index_uids.contains(&index_uid) @@ -1027,7 +1035,7 @@ mod tests { split_1_time_range.end() + 10, )); mock_metastore - .expect_stream_splits() + .expect_list_splits() .withf(move |list_split_request| -> bool { let list_split_query = list_split_request.deserialize_list_splits_query().unwrap(); list_split_query.index_uids.contains(&index_uid) @@ -1079,7 +1087,7 @@ mod tests { .return_once(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata).unwrap()) }); - mock_metastore.expect_stream_splits().return_once( + mock_metastore.expect_list_splits().return_once( move |list_split_request: ListSplitsRequest| { let list_split_query = list_split_request.deserialize_list_splits_query().unwrap(); if list_split_query.index_uids.contains(&index_uid) @@ -1220,7 +1228,7 @@ mod tests { .unwrap(), ) }); - mock_metastore.expect_stream_splits().return_once(|_| { + mock_metastore.expect_list_splits().return_once(|_| { let splits = ListSplitsResponse::try_from_splits(vec![mock_split("split_1")]).unwrap(); Ok(ServiceStream::from(vec![Ok(splits)])) }); @@ -1265,7 +1273,7 @@ mod tests { }) .times(2); mock_metastore - .expect_stream_splits() + .expect_list_splits() .returning(|_| { let splits = ListSplitsResponse::try_from_splits(vec![mock_split("split_1")]).unwrap(); diff --git a/quickwit/quickwit-serve/src/search_api/mod.rs b/quickwit/quickwit-serve/src/search_api/mod.rs index dba69e5e183..0bc69b44065 100644 --- a/quickwit/quickwit-serve/src/search_api/mod.rs +++ b/quickwit/quickwit-serve/src/search_api/mod.rs @@ -86,7 +86,7 @@ mod tests { metastore.expect_index_metadata().returning(move |_| { Ok(IndexMetadataResponse::try_from_index_metadata(index_metadata.clone()).unwrap()) }); - metastore.expect_stream_splits().returning(move |_| { + metastore.expect_list_splits().returning(move |_| { let splits = vec![ MockSplitBuilder::new("split_1") .with_index_uid(&index_uid)