Skip to content

Commit

Permalink
add support delete index in es API
Browse files Browse the repository at this point in the history
closes #3841
  • Loading branch information
PSeitz committed Feb 21, 2024
1 parent 1891932 commit 26d1564
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 12 deletions.
107 changes: 103 additions & 4 deletions quickwit/quickwit-index-management/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,26 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::time::Duration;

use futures_util::StreamExt;
use itertools::Itertools;
use quickwit_common::fs::{empty_dir, get_cache_directory_path};
use quickwit_common::PrettySample;
use quickwit_config::{validate_identifier, IndexConfig, SourceConfig};
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::{
AddSourceRequestExt, CreateIndexResponseExt, IndexMetadata, IndexMetadataResponseExt,
ListSplitsQuery, ListSplitsRequestExt, MetastoreServiceStreamSplitsExt, SplitInfo,
SplitMetadata, SplitState,
ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
MetastoreServiceStreamSplitsExt, SplitInfo, SplitMetadata, SplitState,
};
use quickwit_proto::metastore::{
serde_utils, AddSourceRequest, CreateIndexRequest, DeleteIndexRequest, EntityKind,
IndexMetadataRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
MetastoreService, MetastoreServiceClient, ResetSourceCheckpointRequest,
IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest,
MarkSplitsForDeletionRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
ResetSourceCheckpointRequest,
};
use quickwit_proto::types::{IndexUid, SplitId};
use quickwit_proto::{ServiceError, ServiceErrorCode};
Expand Down Expand Up @@ -216,6 +221,100 @@ impl IndexService {
Ok(deleted_splits)
}

/// Deletes the indexes specified with `index_id_patterns`.
/// This is a wrapper of delete_index, and support index delete with index pattern
///
/// * `index_id_patterns` - The targeted index ID patterns.
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
pub async fn delete_indexes(
&self,
index_id_patterns: Vec<String>,
dry_run: bool,
) -> Result<Vec<SplitInfo>, IndexServiceError> {
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
index_id_patterns: index_id_patterns.to_owned(),
};
// disallow index_id patterns containing *
for index_id_pattern in &index_id_patterns {
if index_id_pattern.contains('*') {
return Err(IndexServiceError::Metastore(
MetastoreError::InvalidArgument {
message: format!("index_id pattern {} contains *", index_id_pattern),
},
));
}
}

let mut metastore = self.metastore.clone();
let indexes_metadata = metastore
.list_indexes_metadata(list_indexes_metadatas_request)
.await?
.deserialize_indexes_metadata()?;

if indexes_metadata.len() != index_id_patterns.len() {
let found_index_ids: HashSet<&str> = indexes_metadata
.iter()
.map(|index_metadata| index_metadata.index_id())
.collect();
let missing_index_ids: Vec<String> = index_id_patterns
.iter()
.filter(|index_id| !found_index_ids.contains(index_id.as_str()))
.map(|index_id| index_id.to_string())
.collect_vec();
return Err(IndexServiceError::Metastore(MetastoreError::NotFound(
EntityKind::Indexes {
index_ids: missing_index_ids.to_vec(),
},
)));
}
let index_ids = indexes_metadata
.iter()
.map(|index_metadata| index_metadata.index_id())
.collect_vec();
info!(index_ids = ?PrettySample::new(&index_ids, 5), "delete indexes");

// setup delete index tasks
let mut delete_index_tasks = Vec::new();
for index_id in index_ids {
let task = async move {
let result = self.clone().delete_index(index_id, dry_run).await;
(index_id, result)
};
delete_index_tasks.push(task);
}
let mut delete_responses: HashMap<String, Vec<SplitInfo>> = HashMap::new();
let mut delete_errors: HashMap<String, IndexServiceError> = HashMap::new();
let mut stream = futures::stream::iter(delete_index_tasks).buffer_unordered(100);
while let Some((index_id, delete_response)) = stream.next().await {
match delete_response {
Ok(split_infos) => {
delete_responses.insert(index_id.to_string(), split_infos);
}
Err(error) => {
delete_errors.insert(index_id.to_string(), error);
}
}
}

if delete_errors.is_empty() {
let mut concatenated_split_infos = Vec::new();
for (_, split_info_vec) in delete_responses.into_iter() {
concatenated_split_infos.extend(split_info_vec);
}
Ok(concatenated_split_infos)
} else {
Err(IndexServiceError::Metastore(MetastoreError::Internal {
message: format!(
"errors occurred when deleting indexes: {:?}",
index_id_patterns
),
cause: format!(
"errors: {:?}\ndeleted indexes: {:?}",
delete_errors, delete_responses
),
}))
}
}
/// Detect all dangling splits and associated files from the index and removes them.
///
/// * `index_id` - The target index Id.
Expand Down
24 changes: 24 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,13 @@ mod tests {

use hyper::StatusCode;
use quickwit_config::{IngestApiConfig, NodeConfig};
use quickwit_index_management::IndexService;
use quickwit_ingest::{FetchRequest, IngestServiceClient, SuggestTruncateRequest};
use quickwit_metastore::metastore_for_test;
use quickwit_proto::ingest::router::IngestRouterServiceClient;
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_search::MockSearchService;
use quickwit_storage::StorageResolver;

use crate::elasticsearch_api::bulk_v2::ElasticBulkResponse;
use crate::elasticsearch_api::elastic_api_handlers;
Expand All @@ -166,12 +169,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{ "create" : { "_index" : "my-index", "_id" : "1"} }
Expand All @@ -196,12 +202,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
Expand Down Expand Up @@ -230,12 +239,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = "
{\"create\": {\"_index\": \"my-index-1\", \"_id\": \"1674834324802805760\"}}
Expand All @@ -261,12 +273,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, _) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
Expand Down Expand Up @@ -295,12 +310,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
Expand Down Expand Up @@ -380,12 +398,15 @@ mod tests {
let (universe, _temp_dir, ingest_service, ingest_service_mailbox) =
setup_ingest_service(&["my-index-1", "my-index-2"], &IngestApiConfig::default()).await;
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{ "create" : { "_index" : "my-index-1", "_id" : "1"} }
Expand Down Expand Up @@ -463,12 +484,15 @@ mod tests {
let metastore_service = MetastoreServiceClient::mock();
let ingest_service = IngestServiceClient::from(IngestServiceClient::mock());
let ingest_router = IngestRouterServiceClient::from(IngestRouterServiceClient::mock());
let index_service =
IndexService::new(metastore_for_test(), StorageResolver::unconfigured());
let elastic_api_handlers = elastic_api_handlers(
config,
search_service,
ingest_service,
ingest_router,
metastore_service.into(),
index_service,
);
let payload = r#"
{"create": {"_index": "my-index", "_id": "1"},}
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ pub(crate) fn elastic_index_count_filter(
.and(json_or_empty())
}

#[utoipa::path(delete, tag = "Indexes", path = "/{index}")]
pub(crate) fn elastic_delete_index_filter(
) -> impl Filter<Extract = (Vec<String>,), Error = Rejection> + Clone {
warp::path!("_elastic" / String)
.and_then(extract_index_id_patterns)
.and(warp::get())
}

// No support for any query parameters for now.
#[utoipa::path(get, tag = "Search", path = "/{index}/_stats")]
pub(crate) fn elastic_index_stats_filter(
Expand Down
Loading

0 comments on commit 26d1564

Please sign in to comment.