Skip to content

Commit

Permalink
remove unused code (#836)
Browse files Browse the repository at this point in the history
Remove get_ns() rpc.

Remove get_namespace(), used for test only.
  • Loading branch information
maxkozlovsky authored Aug 15, 2024
1 parent 0b72ec8 commit 9cf0730
Show file tree
Hide file tree
Showing 10 changed files with 21 additions and 234 deletions.
21 changes: 0 additions & 21 deletions crates/indexify_internal_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1194,27 +1194,6 @@ impl TaskResult {
}

pub type NamespaceName = String;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Namespace {
pub name: NamespaceName,
pub extraction_graphs: Vec<ExtractionGraph>,
}

impl TryFrom<Namespace> for indexify_coordinator::Namespace {
type Error = anyhow::Error;

fn try_from(value: Namespace) -> Result<Self> {
let mut extraction_graphs = vec![];
for graph in value.extraction_graphs {
extraction_graphs.push(graph.try_into()?)
}

Ok(indexify_coordinator::Namespace {
name: value.name,
extraction_graphs,
})
}
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum ChangeType {
Expand Down
92 changes: 0 additions & 92 deletions crates/indexify_proto/src/indexify_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,18 +390,6 @@ pub struct Extractor {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetNamespaceRequest {
#[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetNamespaceResponse {
#[prost(message, optional, tag = "1")]
pub namespace: ::core::option::Option<Namespace>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Empty {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -1401,33 +1389,6 @@ pub mod coordinator_service_client {
);
self.inner.unary(req, path, codec).await
}
pub async fn get_ns(
&mut self,
request: impl tonic::IntoRequest<super::GetNamespaceRequest>,
) -> std::result::Result<
tonic::Response<super::GetNamespaceResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/indexify_coordinator.CoordinatorService/GetNS",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new("indexify_coordinator.CoordinatorService", "GetNS"),
);
self.inner.unary(req, path, codec).await
}
pub async fn list_extractors(
&mut self,
request: impl tonic::IntoRequest<super::ListExtractorsRequest>,
Expand Down Expand Up @@ -2396,13 +2357,6 @@ pub mod coordinator_service_server {
tonic::Response<super::ListNamespaceResponse>,
tonic::Status,
>;
async fn get_ns(
&self,
request: tonic::Request<super::GetNamespaceRequest>,
) -> std::result::Result<
tonic::Response<super::GetNamespaceResponse>,
tonic::Status,
>;
async fn list_extractors(
&self,
request: tonic::Request<super::ListExtractorsRequest>,
Expand Down Expand Up @@ -3135,52 +3089,6 @@ pub mod coordinator_service_server {
};
Box::pin(fut)
}
"/indexify_coordinator.CoordinatorService/GetNS" => {
#[allow(non_camel_case_types)]
struct GetNSSvc<T: CoordinatorService>(pub Arc<T>);
impl<
T: CoordinatorService,
> tonic::server::UnaryService<super::GetNamespaceRequest>
for GetNSSvc<T> {
type Response = super::GetNamespaceResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GetNamespaceRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as CoordinatorService>::get_ns(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = GetNSSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/indexify_coordinator.CoordinatorService/ListExtractors" => {
#[allow(non_camel_case_types)]
struct ListExtractorsSvc<T: CoordinatorService>(pub Arc<T>);
Expand Down
10 changes: 0 additions & 10 deletions protos/coordinator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ service CoordinatorService {

rpc ListNS(ListNamespaceRequest) returns (ListNamespaceResponse) {}

rpc GetNS(GetNamespaceRequest) returns (GetNamespaceResponse) {}

rpc ListExtractors(ListExtractorsRequest) returns (ListExtractorsResponse) {}

rpc RegisterExecutor(RegisterExecutorRequest) returns (RegisterExecutorResponse) {}
Expand Down Expand Up @@ -349,14 +347,6 @@ message Extractor {
repeated string input_mime_types = 6;
}

message GetNamespaceRequest {
string name = 1;
}

message GetNamespaceResponse {
Namespace namespace = 1;
}

message Empty {
}

Expand Down
15 changes: 3 additions & 12 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,10 @@ impl Coordinator {
}

pub async fn create_namespace(&self, namespace: &str) -> Result<()> {
match self.shared_state.namespace(namespace).await {
Result::Ok(Some(_)) => {
return Ok(());
}
Result::Ok(None) => {}
Result::Err(_) => {}
if self.shared_state.namespace_exists(namespace).await? {
return Ok(());
}
self.shared_state.create_namespace(namespace).await?;
Ok(())
self.shared_state.create_namespace(namespace).await
}

pub async fn list_namespaces(&self) -> Result<Vec<String>> {
Expand All @@ -346,10 +341,6 @@ impl Coordinator {
self.shared_state.list_extraction_graphs(namespace).await
}

pub async fn get_namespace(&self, namespace: &str) -> Result<Option<internal_api::Namespace>> {
self.shared_state.namespace(namespace).await
}

pub async fn list_extractors(&self) -> Result<Vec<internal_api::ExtractorDescription>> {
self.shared_state.list_extractors().await
}
Expand Down
23 changes: 0 additions & 23 deletions src/coordinator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,29 +599,6 @@ impl CoordinatorService for CoordinatorServiceServer {
))
}

async fn get_ns(
&self,
request: tonic::Request<indexify_coordinator::GetNamespaceRequest>,
) -> Result<tonic::Response<indexify_coordinator::GetNamespaceResponse>, tonic::Status> {
let namespace = request.into_inner().name;
let namespace = self
.coordinator
.get_namespace(&namespace)
.await
.map_err(|e| tonic::Status::aborted(e.to_string()))?
.ok_or_else(|| tonic::Status::not_found("namespace not found"))?;

Ok(tonic::Response::new(
indexify_coordinator::GetNamespaceResponse {
namespace: Some(
namespace
.try_into()
.map_err(|e: anyhow::Error| tonic::Status::aborted(e.to_string()))?,
),
},
))
}

async fn list_extractors(
&self,
_request: tonic::Request<ListExtractorsRequest>,
Expand Down
15 changes: 0 additions & 15 deletions src/data_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,6 @@ impl DataManager {
Ok(())
}

#[tracing::instrument]
pub async fn get(&self, name: &str) -> Result<api::DataNamespace> {
let req = indexify_coordinator::GetNamespaceRequest {
name: name.to_string(),
};
let response = self
.get_coordinator_client()
.await?
.get_ns(req)
.await?
.into_inner();
let namespace = response.namespace.ok_or(anyhow!("namespace not found"))?;
namespace.try_into()
}

pub async fn link_extraction_graphs(
&self,
namespace: String,
Expand Down
26 changes: 10 additions & 16 deletions src/ingest_extracted_content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,11 +824,9 @@ mod tests {

let ep = coordinator
.coordinator
.get_namespace(DEFAULT_TEST_NAMESPACE)
.list_extraction_graphs(DEFAULT_TEST_NAMESPACE)
.await
.unwrap()
.unwrap()
.extraction_graphs[0]
.unwrap()[0]
.extraction_policies[0]
.clone();
let content_metadata = coordinator
Expand Down Expand Up @@ -957,11 +955,9 @@ mod tests {

let ep = coordinator
.coordinator
.get_namespace(DEFAULT_TEST_NAMESPACE)
.list_extraction_graphs(DEFAULT_TEST_NAMESPACE)
.await
.unwrap()
.unwrap()
.extraction_graphs[0]
.unwrap()[0]
.extraction_policies[0]
.clone();
coordinator
Expand Down Expand Up @@ -1322,10 +1318,9 @@ mod tests {

let res = test_coordinator
.coordinator
.get_namespace(DEFAULT_TEST_NAMESPACE)
.await?
.unwrap();
assert_eq!(res.extraction_graphs.len(), 2);
.list_extraction_graphs(DEFAULT_TEST_NAMESPACE)
.await?;
assert_eq!(res.len(), 2);

test_coordinator
.coordinator
Expand All @@ -1346,10 +1341,9 @@ mod tests {

let res = test_coordinator
.coordinator
.get_namespace(DEFAULT_TEST_NAMESPACE)
.await?
.unwrap();
assert_eq!(res.extraction_graphs.len(), 1);
.list_extraction_graphs(DEFAULT_TEST_NAMESPACE)
.await?;
assert_eq!(res.len(), 1);

let indexes = test_coordinator
.coordinator
Expand Down
19 changes: 4 additions & 15 deletions src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -927,8 +927,8 @@ impl App {
Ok(namespaces)
}

pub async fn namespace(&self, namespace: &str) -> Result<Option<internal_api::Namespace>> {
self.state_machine.get_namespace(namespace).await
pub async fn namespace_exists(&self, namespace: &str) -> Result<bool> {
self.state_machine.namespace_exists(namespace).await
}

pub async fn register_executor(
Expand Down Expand Up @@ -1999,19 +1999,8 @@ mod tests {

// Read the namespace back and expect to get the extraction policies as well
// which will be asserted
let retrieved_namespace = node.namespace(namespace).await?;
assert_eq!(retrieved_namespace.clone().unwrap().name, namespace);
assert_eq!(
retrieved_namespace
.clone()
.unwrap()
.extraction_graphs
.first()
.unwrap()
.extraction_policies
.len(),
3
);
let graphs = node.list_extraction_graphs(namespace).await?;
assert_eq!(graphs.first().unwrap().extraction_policies.len(), 3);

// Read all namespaces back and assert that only the created namespace is
// present along with the extraction policies
Expand Down
7 changes: 2 additions & 5 deletions src/state/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,13 +931,10 @@ impl StateMachineStore {
.map_err(|e| anyhow::anyhow!(e))
}

pub async fn get_namespace(
&self,
namespace: &str,
) -> Result<Option<indexify_internal_api::Namespace>> {
pub async fn namespace_exists(&self, namespace: &str) -> Result<bool> {
self.data
.indexify_state
.get_namespace(namespace, &self.db.read().unwrap())
.namespace_exists(namespace, &self.db.read().unwrap())
}

pub async fn get_schemas(&self, ids: HashSet<String>) -> Result<Vec<StructuredDataSchema>> {
Expand Down
27 changes: 2 additions & 25 deletions src/state/store/state_machine_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2175,31 +2175,8 @@ impl IndexifyState {
}

/// This method will get the namespace based on the key provided
pub fn get_namespace(
&self,
namespace: &str,
db: &OptimisticTransactionDB,
) -> Result<Option<indexify_internal_api::Namespace>> {
let ns_name = match get_from_cf(db, StateMachineColumns::Namespaces, namespace)? {
Some(name) => name,
None => return Ok(None),
};
let extraction_graphs_ids = self
.extraction_graphs_by_ns
.get(&namespace.to_string())
.into_iter()
.collect_vec();
let txn = db.transaction();
let extraction_graphs = self
.get_extraction_graphs(&extraction_graphs_ids, db, &txn)?
.into_iter()
.flatten()
.collect();

Ok(Some(indexify_internal_api::Namespace {
name: ns_name,
extraction_graphs,
}))
pub fn namespace_exists(&self, namespace: &str, db: &OptimisticTransactionDB) -> Result<bool> {
Ok(get_from_cf::<String, &str>(db, StateMachineColumns::Namespaces, namespace)?.is_some())
}

pub fn get_schemas(
Expand Down

0 comments on commit 9cf0730

Please sign in to comment.