Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove unused code #836

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions crates/indexify_internal_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1194,27 +1194,6 @@ impl TaskResult {
}

pub type NamespaceName = String;
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Namespace {
pub name: NamespaceName,
pub extraction_graphs: Vec<ExtractionGraph>,
}

impl TryFrom<Namespace> for indexify_coordinator::Namespace {
type Error = anyhow::Error;

fn try_from(value: Namespace) -> Result<Self> {
let mut extraction_graphs = vec![];
for graph in value.extraction_graphs {
extraction_graphs.push(graph.try_into()?)
}

Ok(indexify_coordinator::Namespace {
name: value.name,
extraction_graphs,
})
}
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
pub enum ChangeType {
Expand Down
92 changes: 0 additions & 92 deletions crates/indexify_proto/src/indexify_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,18 +390,6 @@ pub struct Extractor {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetNamespaceRequest {
#[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetNamespaceResponse {
#[prost(message, optional, tag = "1")]
pub namespace: ::core::option::Option<Namespace>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Empty {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down Expand Up @@ -1401,33 +1389,6 @@ pub mod coordinator_service_client {
);
self.inner.unary(req, path, codec).await
}
pub async fn get_ns(
&mut self,
request: impl tonic::IntoRequest<super::GetNamespaceRequest>,
) -> std::result::Result<
tonic::Response<super::GetNamespaceResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/indexify_coordinator.CoordinatorService/GetNS",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new("indexify_coordinator.CoordinatorService", "GetNS"),
);
self.inner.unary(req, path, codec).await
}
pub async fn list_extractors(
&mut self,
request: impl tonic::IntoRequest<super::ListExtractorsRequest>,
Expand Down Expand Up @@ -2396,13 +2357,6 @@ pub mod coordinator_service_server {
tonic::Response<super::ListNamespaceResponse>,
tonic::Status,
>;
async fn get_ns(
&self,
request: tonic::Request<super::GetNamespaceRequest>,
) -> std::result::Result<
tonic::Response<super::GetNamespaceResponse>,
tonic::Status,
>;
async fn list_extractors(
&self,
request: tonic::Request<super::ListExtractorsRequest>,
Expand Down Expand Up @@ -3135,52 +3089,6 @@ pub mod coordinator_service_server {
};
Box::pin(fut)
}
"/indexify_coordinator.CoordinatorService/GetNS" => {
#[allow(non_camel_case_types)]
struct GetNSSvc<T: CoordinatorService>(pub Arc<T>);
impl<
T: CoordinatorService,
> tonic::server::UnaryService<super::GetNamespaceRequest>
for GetNSSvc<T> {
type Response = super::GetNamespaceResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GetNamespaceRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as CoordinatorService>::get_ns(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = GetNSSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/indexify_coordinator.CoordinatorService/ListExtractors" => {
#[allow(non_camel_case_types)]
struct ListExtractorsSvc<T: CoordinatorService>(pub Arc<T>);
Expand Down
10 changes: 0 additions & 10 deletions protos/coordinator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ service CoordinatorService {

rpc ListNS(ListNamespaceRequest) returns (ListNamespaceResponse) {}

rpc GetNS(GetNamespaceRequest) returns (GetNamespaceResponse) {}

rpc ListExtractors(ListExtractorsRequest) returns (ListExtractorsResponse) {}

rpc RegisterExecutor(RegisterExecutorRequest) returns (RegisterExecutorResponse) {}
Expand Down Expand Up @@ -349,14 +347,6 @@ message Extractor {
repeated string input_mime_types = 6;
}

message GetNamespaceRequest {
string name = 1;
}

message GetNamespaceResponse {
Namespace namespace = 1;
}

message Empty {
}

Expand Down
15 changes: 3 additions & 12 deletions src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,10 @@ impl Coordinator {
}

pub async fn create_namespace(&self, namespace: &str) -> Result<()> {
match self.shared_state.namespace(namespace).await {
Result::Ok(Some(_)) => {
return Ok(());
}
Result::Ok(None) => {}
Result::Err(_) => {}
if self.shared_state.namespace_exists(namespace).await? {
return Ok(());
}
self.shared_state.create_namespace(namespace).await?;
Ok(())
self.shared_state.create_namespace(namespace).await
}

pub async fn list_namespaces(&self) -> Result<Vec<String>> {
Expand All @@ -346,10 +341,6 @@ impl Coordinator {
self.shared_state.list_extraction_graphs(namespace).await
}

pub async fn get_namespace(&self, namespace: &str) -> Result<Option<internal_api::Namespace>> {
self.shared_state.namespace(namespace).await
}

pub async fn list_extractors(&self) -> Result<Vec<internal_api::ExtractorDescription>> {
self.shared_state.list_extractors().await
}
Expand Down
23 changes: 0 additions & 23 deletions src/coordinator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,29 +599,6 @@ impl CoordinatorService for CoordinatorServiceServer {
))
}

async fn get_ns(
&self,
request: tonic::Request<indexify_coordinator::GetNamespaceRequest>,
) -> Result<tonic::Response<indexify_coordinator::GetNamespaceResponse>, tonic::Status> {
let namespace = request.into_inner().name;
let namespace = self
.coordinator
.get_namespace(&namespace)
.await
.map_err(|e| tonic::Status::aborted(e.to_string()))?
.ok_or_else(|| tonic::Status::not_found("namespace not found"))?;

Ok(tonic::Response::new(
indexify_coordinator::GetNamespaceResponse {
namespace: Some(
namespace
.try_into()
.map_err(|e: anyhow::Error| tonic::Status::aborted(e.to_string()))?,
),
},
))
}

async fn list_extractors(
&self,
_request: tonic::Request<ListExtractorsRequest>,
Expand Down
15 changes: 0 additions & 15 deletions src/data_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,6 @@ impl DataManager {
Ok(())
}

#[tracing::instrument]
pub async fn get(&self, name: &str) -> Result<api::DataNamespace> {
let req = indexify_coordinator::GetNamespaceRequest {
name: name.to_string(),
};
let response = self
.get_coordinator_client()
.await?
.get_ns(req)
.await?
.into_inner();
let namespace = response.namespace.ok_or(anyhow!("namespace not found"))?;
namespace.try_into()
}

pub async fn link_extraction_graphs(
&self,
namespace: String,
Expand Down
26 changes: 10 additions & 16 deletions src/ingest_extracted_content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,11 +824,9 @@ mod tests {

let ep = coordinator
.coordinator
.get_namespace(DEFAULT_TEST_NAMESPACE)
.list_extraction_graphs(DEFAULT_TEST_NAMESPACE)
.await
.unwrap()
.unwrap()
.extraction_graphs[0]
.unwrap()[0]
.extraction_policies[0]
.clone();
let content_metadata = coordinator
Expand Down Expand Up @@ -957,11 +955,9 @@ mod tests {

let ep = coordinator
.coordinator
.get_namespace(DEFAULT_TEST_NAMESPACE)
.list_extraction_graphs(DEFAULT_TEST_NAMESPACE)
.await
.unwrap()
.unwrap()
.extraction_graphs[0]
.unwrap()[0]
.extraction_policies[0]
.clone();
coordinator
Expand Down Expand Up @@ -1322,10 +1318,9 @@ mod tests {

let res = test_coordinator
.coordinator
.get_namespace(DEFAULT_TEST_NAMESPACE)
.await?
.unwrap();
assert_eq!(res.extraction_graphs.len(), 2);
.list_extraction_graphs(DEFAULT_TEST_NAMESPACE)
.await?;
assert_eq!(res.len(), 2);

test_coordinator
.coordinator
Expand All @@ -1346,10 +1341,9 @@ mod tests {

let res = test_coordinator
.coordinator
.get_namespace(DEFAULT_TEST_NAMESPACE)
.await?
.unwrap();
assert_eq!(res.extraction_graphs.len(), 1);
.list_extraction_graphs(DEFAULT_TEST_NAMESPACE)
.await?;
assert_eq!(res.len(), 1);

let indexes = test_coordinator
.coordinator
Expand Down
19 changes: 4 additions & 15 deletions src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -927,8 +927,8 @@ impl App {
Ok(namespaces)
}

pub async fn namespace(&self, namespace: &str) -> Result<Option<internal_api::Namespace>> {
self.state_machine.get_namespace(namespace).await
pub async fn namespace_exists(&self, namespace: &str) -> Result<bool> {
self.state_machine.namespace_exists(namespace).await
}

pub async fn register_executor(
Expand Down Expand Up @@ -1999,19 +1999,8 @@ mod tests {

// Read the namespace back and expect to get the extraction policies as well
// which will be asserted
let retrieved_namespace = node.namespace(namespace).await?;
assert_eq!(retrieved_namespace.clone().unwrap().name, namespace);
assert_eq!(
retrieved_namespace
.clone()
.unwrap()
.extraction_graphs
.first()
.unwrap()
.extraction_policies
.len(),
3
);
let graphs = node.list_extraction_graphs(namespace).await?;
assert_eq!(graphs.first().unwrap().extraction_policies.len(), 3);

// Read all namespaces back and assert that only the created namespace is
// present along with the extraction policies
Expand Down
7 changes: 2 additions & 5 deletions src/state/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,13 +931,10 @@ impl StateMachineStore {
.map_err(|e| anyhow::anyhow!(e))
}

pub async fn get_namespace(
&self,
namespace: &str,
) -> Result<Option<indexify_internal_api::Namespace>> {
pub async fn namespace_exists(&self, namespace: &str) -> Result<bool> {
self.data
.indexify_state
.get_namespace(namespace, &self.db.read().unwrap())
.namespace_exists(namespace, &self.db.read().unwrap())
}

pub async fn get_schemas(&self, ids: HashSet<String>) -> Result<Vec<StructuredDataSchema>> {
Expand Down
27 changes: 2 additions & 25 deletions src/state/store/state_machine_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2175,31 +2175,8 @@ impl IndexifyState {
}

/// This method will get the namespace based on the key provided
pub fn get_namespace(
&self,
namespace: &str,
db: &OptimisticTransactionDB,
) -> Result<Option<indexify_internal_api::Namespace>> {
let ns_name = match get_from_cf(db, StateMachineColumns::Namespaces, namespace)? {
Some(name) => name,
None => return Ok(None),
};
let extraction_graphs_ids = self
.extraction_graphs_by_ns
.get(&namespace.to_string())
.into_iter()
.collect_vec();
let txn = db.transaction();
let extraction_graphs = self
.get_extraction_graphs(&extraction_graphs_ids, db, &txn)?
.into_iter()
.flatten()
.collect();

Ok(Some(indexify_internal_api::Namespace {
name: ns_name,
extraction_graphs,
}))
pub fn namespace_exists(&self, namespace: &str, db: &OptimisticTransactionDB) -> Result<bool> {
Ok(get_from_cf::<String, &str>(db, StateMachineColumns::Namespaces, namespace)?.is_some())
}

pub fn get_schemas(
Expand Down
Loading