Skip to content

Commit

Permalink
Add a StorageUriResolver singleton and a specific resolver for tests. (#492)
Browse files Browse the repository at this point in the history

* Add a StorageUriResolver singleton and a specific resolver for tests.

* Make S3 region sniffing lazy.

* Fix name in single node test.

* Use different index ids in test to avoid random failures.

Co-authored-by: Paul Masurel <paul@quickwit.io>
  • Loading branch information
fmassot and fulmicoton authored Sep 1, 2021
1 parent 8ce60c1 commit ac7001b
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 89 deletions.
11 changes: 5 additions & 6 deletions quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use quickwit_proto::SearchRequest;
use quickwit_proto::SearchResult;
use quickwit_search::single_node_search;
use quickwit_search::SearchResultJson;
use quickwit_storage::StorageUriResolver;
use quickwit_storage::quickwit_storage_uri_resolver;
use quickwit_telemetry::payload::TelemetryEvent;
use std::collections::VecDeque;
use std::env;
Expand Down Expand Up @@ -183,7 +183,7 @@ pub async fn index_data_cli(args: IndexDataArgs) -> anyhow::Result<()> {
ScratchDirectory::try_new_temp()
.with_context(|| "Failed to create a tempdir for the indexer")?
};
let storage_uri_resolver = StorageUriResolver::default();
let storage_uri_resolver = quickwit_storage_uri_resolver();
let metastore_uri_resolver = MetastoreUriResolver::default();
let metastore = metastore_uri_resolver.resolve(&args.metastore_uri).await?;

Expand All @@ -202,7 +202,7 @@ pub async fn index_data_cli(args: IndexDataArgs) -> anyhow::Result<()> {
source_config,
indexer_params,
metastore,
storage_uri_resolver,
storage_uri_resolver: storage_uri_resolver.clone(),
};

let indexing_supervisor = IndexingPipelineSupervisor::new(indexing_pipeline_params);
Expand Down Expand Up @@ -241,8 +241,7 @@ fn create_source_config_from_args(input_path_opt: Option<PathBuf>) -> SourceConf

pub async fn search_index(args: SearchIndexArgs) -> anyhow::Result<SearchResult> {
debug!(args = ?args, "search-index");

let storage_uri_resolver = StorageUriResolver::default();
let storage_uri_resolver = quickwit_storage_uri_resolver();
let metastore_uri_resolver = MetastoreUriResolver::default();
let metastore = metastore_uri_resolver.resolve(&args.metastore_uri).await?;
let search_request = SearchRequest {
Expand All @@ -256,7 +255,7 @@ pub async fn search_index(args: SearchIndexArgs) -> anyhow::Result<SearchResult>
tags: args.tags.unwrap_or_default(),
};
let search_result: SearchResult =
single_node_search(&search_request, &*metastore, storage_uri_resolver).await?;
single_node_search(&search_request, &*metastore, storage_uri_resolver.clone()).await?;
Ok(search_result)
}

Expand Down
4 changes: 2 additions & 2 deletions quickwit-cli/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use assert_cmd::cargo::cargo_bin;
use assert_cmd::Command;
use predicates::str;
use quickwit_metastore::SingleFileMetastore;
use quickwit_storage::localstack_region;
use quickwit_storage::LocalFileStorage;
use quickwit_storage::RegionProvider;
use quickwit_storage::S3CompatibleObjectStorage;
use quickwit_storage::Storage;
use std::collections::HashMap;
Expand Down Expand Up @@ -159,7 +159,7 @@ pub fn create_test_env(storage_type: TestStorageType) -> anyhow::Result<TestEnv>
TestStorageType::S3 => {
let metastore_uri = "s3+localstack://quickwit-integration-tests/indices";
let storage: Arc<dyn Storage> = Arc::new(S3CompatibleObjectStorage::from_uri(
localstack_region(),
RegionProvider::Localstack.get_region(),
metastore_uri,
)?);
(metastore_uri.to_string(), storage)
Expand Down
12 changes: 7 additions & 5 deletions quickwit-core/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::FileEntry;
use futures::StreamExt;
use quickwit_metastore::{IndexMetadata, Metastore, MetastoreUriResolver};
use quickwit_metastore::{SplitMetadataAndFooterOffsets, SplitState};
use quickwit_storage::StorageUriResolver;
use quickwit_storage::{quickwit_storage_uri_resolver, StorageUriResolver};
use std::path::Path;
use std::time::Duration;
use tantivy::chrono::Utc;
Expand Down Expand Up @@ -64,7 +64,7 @@ pub async fn delete_index(
let metastore = MetastoreUriResolver::default()
.resolve(metastore_uri)
.await?;
let storage_resolver = StorageUriResolver::default();
let storage_resolver = quickwit_storage_uri_resolver();

if dry_run {
let all_splits = metastore.list_all_splits(index_id).await?;
Expand All @@ -87,7 +87,8 @@ pub async fn delete_index(
.mark_splits_as_deleted(index_id, &split_ids)
.await?;

let file_entries = delete_garbage_files(metastore.as_ref(), index_id, storage_resolver).await?;
let file_entries =
delete_garbage_files(metastore.as_ref(), index_id, storage_resolver.clone()).await?;
metastore.delete_index(index_id).await?;
Ok(file_entries)
}
Expand All @@ -108,7 +109,7 @@ pub async fn garbage_collect_index(
let metastore = MetastoreUriResolver::default()
.resolve(metastore_uri)
.await?;
let storage_resolver = StorageUriResolver::default();
let storage_resolver = quickwit_storage_uri_resolver();

// Prune staged splits that are not older than the `grace_period`
let grace_period_timestamp = Utc::now().timestamp() - grace_period.as_secs() as i64;
Expand Down Expand Up @@ -138,7 +139,8 @@ pub async fn garbage_collect_index(
.mark_splits_as_deleted(index_id, &split_ids)
.await?;

let file_entries = delete_garbage_files(metastore.as_ref(), index_id, storage_resolver).await?;
let file_entries =
delete_garbage_files(metastore.as_ref(), index_id, storage_resolver.clone()).await?;
Ok(file_entries)
}

Expand Down
2 changes: 1 addition & 1 deletion quickwit-core/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl TestSandbox {
index_config,
checkpoint: Checkpoint::default(),
};
let storage_uri_resolver = StorageUriResolver::default();
let storage_uri_resolver = StorageUriResolver::for_test();
let metastore_uri_resolver = MetastoreUriResolver::default();
let metastore = metastore_uri_resolver.resolve(metastore_uri).await?;
metastore.create_index(index_metadata).await?;
Expand Down
3 changes: 2 additions & 1 deletion quickwit-indexing/failpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use quickwit_metastore::IndexMetadata;
use quickwit_metastore::Metastore;
use quickwit_metastore::SingleFileMetastore;
use quickwit_metastore::SplitState;
use quickwit_storage::quickwit_storage_uri_resolver;
use quickwit_storage::StorageUriResolver;
use serde_json::json;
use std::sync::Arc;
Expand Down Expand Up @@ -191,7 +192,7 @@ async fn aux_test_failpoints() -> anyhow::Result<()> {
"batch_num_docs": 1
}),
};
let storage_uri_resolver = StorageUriResolver::default();
let storage_uri_resolver = quickwit_storage_uri_resolver();
index_data(
"test-index".to_string(),
metastore.clone(),
Expand Down
3 changes: 2 additions & 1 deletion quickwit-indexing/src/actors/pipeline_supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ mod tests {
use super::IndexingPipelineSupervisor;
use crate::actors::IndexerParams;
use crate::source::SourceConfig;
use quickwit_storage::StorageUriResolver;
use serde_json::json;

use quickwit_actors::Universe;
Expand Down Expand Up @@ -349,7 +350,7 @@ mod tests {
source_config,
indexer_params,
metastore: Arc::new(metastore),
storage_uri_resolver: Default::default(),
storage_uri_resolver: StorageUriResolver::for_test(),
};
let indexing_supervisor = IndexingPipelineSupervisor::new(indexing_pipeline_params);
let (_pipeline_mailbox, pipeline_handler) = universe.spawn_async_actor(indexing_supervisor);
Expand Down
4 changes: 2 additions & 2 deletions quickwit-metastore/src/metastore/single_file_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use async_trait::async_trait;
use chrono::Utc;
use tokio::sync::RwLock;

use quickwit_storage::StorageResolverError;
use quickwit_storage::StorageUriResolver;
use quickwit_storage::{quickwit_storage_uri_resolver, StorageResolverError};
use quickwit_storage::{PutPayload, Storage, StorageErrorKind};

use crate::checkpoint::CheckpointDelta;
Expand Down Expand Up @@ -486,7 +486,7 @@ pub struct SingleFileMetastoreFactory {
impl Default for SingleFileMetastoreFactory {
fn default() -> Self {
SingleFileMetastoreFactory {
storage_uri_resolver: StorageUriResolver::default(),
storage_uri_resolver: quickwit_storage_uri_resolver().clone(),
}
}
}
Expand Down
25 changes: 11 additions & 14 deletions quickwit-search/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,16 @@ mod tests {

#[tokio::test]
async fn test_single_node_simple() -> anyhow::Result<()> {
let index_name = "single-node-simple";
let index_id = "single-node-simple-1";
let test_sandbox =
TestSandbox::create("single-node-simple", Arc::new(WikipediaIndexConfig::new()))
.await?;
TestSandbox::create(index_id, Arc::new(WikipediaIndexConfig::new())).await?;
let docs = vec![
json!({"title": "snoopy", "body": "Snoopy is an anthropomorphic beagle[5] in the comic strip...", "url": "http://snoopy"}),
json!({"title": "beagle", "body": "The beagle is a breed of small scent hound, similar in appearance to the much larger foxhound.", "url": "http://beagle"}),
];
test_sandbox.add_documents(docs.clone()).await?;
let search_request = SearchRequest {
index_id: index_name.to_string(),
index_id: index_id.to_string(),
query: "anthropomorphic".to_string(),
search_fields: vec!["body".to_string()],
start_timestamp: None,
Expand Down Expand Up @@ -264,18 +263,17 @@ mod tests {

#[tokio::test]
async fn test_single_node_several_splits() -> anyhow::Result<()> {
let index_name = "single-node-simple";
let index_id = "single-node-several-splits";
let test_sandbox =
TestSandbox::create("single-node-simple", Arc::new(WikipediaIndexConfig::new()))
.await?;
TestSandbox::create(index_id, Arc::new(WikipediaIndexConfig::new())).await?;
for _ in 0..10u32 {
test_sandbox.add_documents(vec![
json!({"title": "snoopy", "body": "Snoopy is an anthropomorphic beagle[5] in the comic strip...", "url": "http://snoopy"}),
json!({"title": "beagle", "body": "The beagle is a breed of small scent hound, similar in appearance to the much larger foxhound.", "url": "http://beagle"}),
]).await?;
}
let search_request = SearchRequest {
index_id: index_name.to_string(),
index_id: index_id.to_string(),
query: "beagle".to_string(),
search_fields: vec![],
start_timestamp: None,
Expand Down Expand Up @@ -322,9 +320,8 @@ mod tests {
}"#;
let index_config =
serde_json::from_str::<DefaultIndexConfigBuilder>(index_config)?.build()?;
let index_name = "single-node-simple";
let test_sandbox =
TestSandbox::create("single-node-simple", Arc::new(index_config)).await?;
let index_id = "single-node-filtering";
let test_sandbox = TestSandbox::create(index_id, Arc::new(index_config)).await?;

let mut docs = vec![];
for i in 0..30 {
Expand All @@ -334,7 +331,7 @@ mod tests {
test_sandbox.add_documents(docs).await?;

let search_request = SearchRequest {
index_id: index_name.to_string(),
index_id: index_id.to_string(),
query: "info".to_string(),
search_fields: vec![],
start_timestamp: Some(10),
Expand All @@ -356,7 +353,7 @@ mod tests {

// filter on time range [i64::MIN 20[ should only hit first 19 docs because of filtering
let search_request = SearchRequest {
index_id: index_name.to_string(),
index_id: index_id.to_string(),
query: "info".to_string(),
search_fields: vec![],
start_timestamp: None,
Expand All @@ -378,7 +375,7 @@ mod tests {

// filter on tag, should not return any hit since no split is tagged
let search_request = SearchRequest {
index_id: index_name.to_string(),
index_id: index_id.to_string(),
query: "info".to_string(),
search_fields: vec![],
start_timestamp: None,
Expand Down
6 changes: 3 additions & 3 deletions quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use quickwit_search::{
http_addr_to_grpc_addr, http_addr_to_swim_addr, SearchClientPool, SearchServiceImpl,
};
use quickwit_storage::{
localstack_region, LocalFileStorageFactory, S3CompatibleObjectStorageFactory,
StorageUriResolver, StorageWithCacheFactory,
LocalFileStorageFactory, RegionProvider, S3CompatibleObjectStorageFactory, StorageUriResolver,
StorageWithCacheFactory,
};
use quickwit_telemetry::payload::{ServeEvent, TelemetryEvent};

Expand Down Expand Up @@ -93,7 +93,7 @@ fn storage_uri_resolver() -> StorageUriResolver {
.register(LocalFileStorageFactory::default())
.register(s3_storage)
.register(S3CompatibleObjectStorageFactory::new(
localstack_region(),
RegionProvider::Localstack,
"s3+localstack",
))
.build()
Expand Down
6 changes: 4 additions & 2 deletions quickwit-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ pub use self::bundle_storage::{
};
pub use self::local_file_storage::{LocalFileStorage, LocalFileStorageFactory};
pub use self::object_storage::{
MultiPartPolicy, S3CompatibleObjectStorage, S3CompatibleObjectStorageFactory,
MultiPartPolicy, RegionProvider, S3CompatibleObjectStorage, S3CompatibleObjectStorageFactory,
};
pub use self::prefix_storage::add_prefix_to_storage;
pub use self::ram_storage::{RamStorage, RamStorageBuilder};
pub use self::storage_resolver::{localstack_region, StorageFactory, StorageUriResolver};
pub use self::storage_resolver::{
quickwit_storage_uri_resolver, StorageFactory, StorageUriResolver,
};
pub use crate::cache::{Cache, SliceCache, StorageWithCacheFactory};
pub use crate::error::{StorageError, StorageErrorKind, StorageResolverError, StorageResult};

Expand Down
4 changes: 3 additions & 1 deletion quickwit-storage/src/object_storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ mod file_slice_stream;

mod s3_compatible_storage;
pub use self::s3_compatible_storage::S3CompatibleObjectStorage;
pub use self::s3_compatible_storage_uri_resolver::{
RegionProvider, S3CompatibleObjectStorageFactory,
};

mod policy;
pub use crate::object_storage::policy::MultiPartPolicy;

mod s3_compatible_storage_uri_resolver;
pub use self::s3_compatible_storage_uri_resolver::S3CompatibleObjectStorageFactory;
Loading

0 comments on commit ac7001b

Please sign in to comment.