diff --git a/crates/indexify_internal_api/src/lib.rs b/crates/indexify_internal_api/src/lib.rs index 604e03e2f..d6c27482c 100644 --- a/crates/indexify_internal_api/src/lib.rs +++ b/crates/indexify_internal_api/src/lib.rs @@ -1194,27 +1194,6 @@ impl TaskResult { } pub type NamespaceName = String; -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Namespace { - pub name: NamespaceName, - pub extraction_graphs: Vec, -} - -impl TryFrom for indexify_coordinator::Namespace { - type Error = anyhow::Error; - - fn try_from(value: Namespace) -> Result { - let mut extraction_graphs = vec![]; - for graph in value.extraction_graphs { - extraction_graphs.push(graph.try_into()?) - } - - Ok(indexify_coordinator::Namespace { - name: value.name, - extraction_graphs, - }) - } -} #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] pub enum ChangeType { diff --git a/crates/indexify_proto/src/indexify_coordinator.rs b/crates/indexify_proto/src/indexify_coordinator.rs index 2426a20bc..3ac4cb8cb 100644 --- a/crates/indexify_proto/src/indexify_coordinator.rs +++ b/crates/indexify_proto/src/indexify_coordinator.rs @@ -390,18 +390,6 @@ pub struct Extractor { } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] -pub struct GetNamespaceRequest { - #[prost(string, tag = "1")] - pub name: ::prost::alloc::string::String, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct GetNamespaceResponse { - #[prost(message, optional, tag = "1")] - pub namespace: ::core::option::Option, -} -#[allow(clippy::derive_partial_eq_without_eq)] -#[derive(Clone, PartialEq, ::prost::Message)] pub struct Empty {} #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1401,33 +1389,6 @@ pub mod coordinator_service_client { ); self.inner.unary(req, path, codec).await } - pub async fn get_ns( - &mut self, - request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/indexify_coordinator.CoordinatorService/GetNS", - ); - let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new("indexify_coordinator.CoordinatorService", "GetNS"), - ); - self.inner.unary(req, path, codec).await - } pub async fn list_extractors( &mut self, request: impl tonic::IntoRequest, @@ -2396,13 +2357,6 @@ pub mod coordinator_service_server { tonic::Response, tonic::Status, >; - async fn get_ns( - &self, - request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; async fn list_extractors( &self, request: tonic::Request, @@ -3135,52 +3089,6 @@ pub mod coordinator_service_server { }; Box::pin(fut) } - "/indexify_coordinator.CoordinatorService/GetNS" => { - #[allow(non_camel_case_types)] - struct GetNSSvc(pub Arc); - impl< - T: CoordinatorService, - > tonic::server::UnaryService - for GetNSSvc { - type Response = super::GetNamespaceResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = Arc::clone(&self.0); - let fut = async move { - ::get_ns(&inner, request).await - }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let max_decoding_message_size = self.max_decoding_message_size; - let max_encoding_message_size = self.max_encoding_message_size; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = GetNSSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ) - .apply_max_message_size_config( - max_decoding_message_size, - max_encoding_message_size, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } "/indexify_coordinator.CoordinatorService/ListExtractors" => { #[allow(non_camel_case_types)] struct ListExtractorsSvc(pub Arc); diff --git a/protos/coordinator_service.proto b/protos/coordinator_service.proto index 72fec5d95..8f4231fc7 100644 --- a/protos/coordinator_service.proto +++ b/protos/coordinator_service.proto @@ -23,8 +23,6 @@ service CoordinatorService { rpc ListNS(ListNamespaceRequest) returns (ListNamespaceResponse) {} - rpc GetNS(GetNamespaceRequest) returns (GetNamespaceResponse) {} - rpc ListExtractors(ListExtractorsRequest) returns (ListExtractorsResponse) {} rpc RegisterExecutor(RegisterExecutorRequest) returns (RegisterExecutorResponse) {} @@ -349,14 +347,6 @@ message Extractor { repeated string input_mime_types = 6; } -message GetNamespaceRequest { - string name = 1; -} - -message GetNamespaceResponse { - Namespace namespace = 1; -} - message Empty { } diff --git a/src/coordinator.rs b/src/coordinator.rs index 076d5c885..0259fedbc 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -327,15 +327,10 @@ impl Coordinator { } pub async fn create_namespace(&self, namespace: &str) -> Result<()> { - match self.shared_state.namespace(namespace).await { - Result::Ok(Some(_)) => { - return Ok(()); - } - Result::Ok(None) => {} - Result::Err(_) => {} + if self.shared_state.namespace_exists(namespace).await? { + return Ok(()); } - self.shared_state.create_namespace(namespace).await?; - Ok(()) + self.shared_state.create_namespace(namespace).await } pub async fn list_namespaces(&self) -> Result> { @@ -346,10 +341,6 @@ impl Coordinator { self.shared_state.list_extraction_graphs(namespace).await } - pub async fn get_namespace(&self, namespace: &str) -> Result> { - self.shared_state.namespace(namespace).await - } - pub async fn list_extractors(&self) -> Result> { self.shared_state.list_extractors().await } diff --git a/src/coordinator_service.rs b/src/coordinator_service.rs index 91f838d77..1f6c582cb 100644 --- a/src/coordinator_service.rs +++ b/src/coordinator_service.rs @@ -599,29 +599,6 @@ impl CoordinatorService for CoordinatorServiceServer { )) } - async fn get_ns( - &self, - request: tonic::Request, - ) -> Result, tonic::Status> { - let namespace = request.into_inner().name; - let namespace = self - .coordinator - .get_namespace(&namespace) - .await - .map_err(|e| tonic::Status::aborted(e.to_string()))? - .ok_or_else(|| tonic::Status::not_found("namespace not found"))?; - - Ok(tonic::Response::new( - indexify_coordinator::GetNamespaceResponse { - namespace: Some( - namespace - .try_into() - .map_err(|e: anyhow::Error| tonic::Status::aborted(e.to_string()))?, - ), - }, - )) - } - async fn list_extractors( &self, _request: tonic::Request, diff --git a/src/data_manager.rs b/src/data_manager.rs index f8273cf00..6fb4d9b52 100644 --- a/src/data_manager.rs +++ b/src/data_manager.rs @@ -165,21 +165,6 @@ impl DataManager { Ok(()) } - #[tracing::instrument] - pub async fn get(&self, name: &str) -> Result { - let req = indexify_coordinator::GetNamespaceRequest { - name: name.to_string(), - }; - let response = self - .get_coordinator_client() - .await? - .get_ns(req) - .await? - .into_inner(); - let namespace = response.namespace.ok_or(anyhow!("namespace not found"))?; - namespace.try_into() - } - pub async fn link_extraction_graphs( &self, namespace: String, diff --git a/src/ingest_extracted_content.rs b/src/ingest_extracted_content.rs index 44f44818c..86705390a 100644 --- a/src/ingest_extracted_content.rs +++ b/src/ingest_extracted_content.rs @@ -824,11 +824,9 @@ mod tests { let ep = coordinator .coordinator - .get_namespace(DEFAULT_TEST_NAMESPACE) + .list_extraction_graphs(DEFAULT_TEST_NAMESPACE) .await - .unwrap() - .unwrap() - .extraction_graphs[0] + .unwrap()[0] .extraction_policies[0] .clone(); let content_metadata = coordinator @@ -957,11 +955,9 @@ mod tests { let ep = coordinator .coordinator - .get_namespace(DEFAULT_TEST_NAMESPACE) + .list_extraction_graphs(DEFAULT_TEST_NAMESPACE) .await - .unwrap() - .unwrap() - .extraction_graphs[0] + .unwrap()[0] .extraction_policies[0] .clone(); coordinator @@ -1322,10 +1318,9 @@ mod tests { let res = test_coordinator .coordinator - .get_namespace(DEFAULT_TEST_NAMESPACE) - .await? - .unwrap(); - assert_eq!(res.extraction_graphs.len(), 2); + .list_extraction_graphs(DEFAULT_TEST_NAMESPACE) + .await?; + assert_eq!(res.len(), 2); test_coordinator .coordinator @@ -1346,10 +1341,9 @@ mod tests { let res = test_coordinator .coordinator - .get_namespace(DEFAULT_TEST_NAMESPACE) - .await? - .unwrap(); - assert_eq!(res.extraction_graphs.len(), 1); + .list_extraction_graphs(DEFAULT_TEST_NAMESPACE) + .await?; + assert_eq!(res.len(), 1); let indexes = test_coordinator .coordinator diff --git a/src/state/mod.rs b/src/state/mod.rs index 5959d728d..40a765b1d 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -927,8 +927,8 @@ impl App { Ok(namespaces) } - pub async fn namespace(&self, namespace: &str) -> Result> { - self.state_machine.get_namespace(namespace).await + pub async fn namespace_exists(&self, namespace: &str) -> Result { + self.state_machine.namespace_exists(namespace).await } pub async fn register_executor( @@ -1999,19 +1999,8 @@ mod tests { // Read the namespace back and expect to get the extraction policies as well // which will be asserted - let retrieved_namespace = node.namespace(namespace).await?; - assert_eq!(retrieved_namespace.clone().unwrap().name, namespace); - assert_eq!( - retrieved_namespace - .clone() - .unwrap() - .extraction_graphs - .first() - .unwrap() - .extraction_policies - .len(), - 3 - ); + let graphs = node.list_extraction_graphs(namespace).await?; + assert_eq!(graphs.first().unwrap().extraction_policies.len(), 3); // Read all namespaces back and assert that only the created namespace is // present along with the extraction policies diff --git a/src/state/store/mod.rs b/src/state/store/mod.rs index 51eb9b45a..fb8c5cdd2 100644 --- a/src/state/store/mod.rs +++ b/src/state/store/mod.rs @@ -931,13 +931,10 @@ impl StateMachineStore { .map_err(|e| anyhow::anyhow!(e)) } - pub async fn get_namespace( - &self, - namespace: &str, - ) -> Result> { + pub async fn namespace_exists(&self, namespace: &str) -> Result { self.data .indexify_state - .get_namespace(namespace, &self.db.read().unwrap()) + .namespace_exists(namespace, &self.db.read().unwrap()) } pub async fn get_schemas(&self, ids: HashSet) -> Result> { diff --git a/src/state/store/state_machine_objects.rs b/src/state/store/state_machine_objects.rs index d109648b9..ffbc91dae 100644 --- a/src/state/store/state_machine_objects.rs +++ b/src/state/store/state_machine_objects.rs @@ -2175,31 +2175,8 @@ impl IndexifyState { } /// This method will get the namespace based on the key provided - pub fn get_namespace( - &self, - namespace: &str, - db: &OptimisticTransactionDB, - ) -> Result> { - let ns_name = match get_from_cf(db, StateMachineColumns::Namespaces, namespace)? { - Some(name) => name, - None => return Ok(None), - }; - let extraction_graphs_ids = self - .extraction_graphs_by_ns - .get(&namespace.to_string()) - .into_iter() - .collect_vec(); - let txn = db.transaction(); - let extraction_graphs = self - .get_extraction_graphs(&extraction_graphs_ids, db, &txn)? - .into_iter() - .flatten() - .collect(); - - Ok(Some(indexify_internal_api::Namespace { - name: ns_name, - extraction_graphs, - })) + pub fn namespace_exists(&self, namespace: &str, db: &OptimisticTransactionDB) -> Result { + Ok(get_from_cf::(db, StateMachineColumns::Namespaces, namespace)?.is_some()) } pub fn get_schemas(