Skip to content

Commit

Permalink
Delete services nodes (#1424)
Browse files Browse the repository at this point in the history
* delete nodes and services

* removed try, catch to enforce cache reset on exception

* safer delete

* changing to initial flow

* small CR fixes

---------

Co-authored-by: Arik Alon <alon.arik@gmail.com>
  • Loading branch information
Avi-Robusta and arikalon1 authored May 16, 2024
1 parent 3f45a60 commit 8e38b2c
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 40 deletions.
38 changes: 37 additions & 1 deletion src/robusta/core/sinks/robusta/dal/supabase_dal.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def persist_services(self, services: List[ServiceInfo]):
try:
self.client.table(SERVICES_TABLE).upsert(db_services, returning=ReturnMethod.minimal).execute()
except Exception as e:
logging.error(f"Failed to persist services {services} error: {e}")
logging.exception(f"Failed to persist services {services} error: {e}")
self.handle_supabase_error()
raise

Expand Down Expand Up @@ -384,6 +384,42 @@ def publish_jobs(self, jobs: List[JobInfo]):
self.handle_supabase_error()
raise

def remove_deleted_node(self, node_name: str):
if not node_name:
return

try:
(
self.client.table(NODES_TABLE)
.delete(returning=ReturnMethod.minimal)
.eq("account_id", self.account_id)
.eq("cluster_id", self.cluster)
.eq("name", node_name)
.execute()
)
except Exception as e:
logging.exception(f"Failed to delete node {node_name} error: {e}")
self.handle_supabase_error()
raise

def remove_deleted_service(self, service_key: str):
if not service_key:
return

try:
(
self.client.table(SERVICES_TABLE)
.delete(returning=ReturnMethod.minimal)
.eq("account_id", self.account_id)
.eq("cluster", self.cluster)
.eq("service_key", service_key)
.execute()
)
except Exception as e:
logging.exception(f"Failed to delete service {service_key} error: {e}")
self.handle_supabase_error()
raise

def remove_deleted_job(self, job: JobInfo):
if not job:
return
Expand Down
75 changes: 36 additions & 39 deletions src/robusta/core/sinks/robusta/robusta_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,31 +216,23 @@ def write_finding(self, finding: Finding, platform_enabled: bool):
self.dal.persist_finding(finding)

def __publish_single_service(self, new_service: ServiceInfo, operation: K8sOperationType):
try:
with self.services_publish_lock:
service_key = new_service.get_service_key()
cached_service = self.__services_cache.get(service_key, None)

# prevent service updates if the resource version in the cache is lower than the new service
if cached_service and cached_service.resource_version > new_service.resource_version:
return
with self.services_publish_lock:
service_key = new_service.get_service_key()
cached_service = self.__services_cache.get(service_key, None)

if operation == K8sOperationType.CREATE or operation == K8sOperationType.UPDATE:
# handle created/updated services
self.__services_cache[service_key] = new_service
self.dal.persist_services([new_service])
# prevent service updates if the resource version in the cache is lower than the new service
if cached_service and cached_service.resource_version > new_service.resource_version:
return

elif operation == K8sOperationType.DELETE:
if cached_service:
del self.__services_cache[service_key]
if operation == K8sOperationType.CREATE or operation == K8sOperationType.UPDATE:
# handle created/updated services
self.__services_cache[service_key] = new_service
self.dal.persist_services([new_service])

new_service.deleted = True
self.dal.persist_services([new_service])
elif operation == K8sOperationType.DELETE:
self.__safe_delete_service(service_key)

except Exception as e:
logging.error(
f"An error occurred while publishing single service: name - {new_service.name}, namespace - {new_service.namespace} service type: {new_service.service_type} | {e}"
)
self.__discovery_metrics.on_services_updated(1)

def __publish_new_services(self, active_services: List[ServiceInfo]):
with self.services_publish_lock:
Expand All @@ -254,9 +246,7 @@ def __publish_new_services(self, active_services: List[ServiceInfo]):
updated_services: List[ServiceInfo] = []
for service_key in cache_keys:
if not curr_services.get(service_key): # service doesn't exist any more, delete it
self.__services_cache[service_key].deleted = True
updated_services.append(self.__services_cache[service_key])
del self.__services_cache[service_key]
self.__safe_delete_service(service_key)

# new or changed services
for service_key in curr_services.keys():
Expand Down Expand Up @@ -418,10 +408,8 @@ def __publish_new_nodes(self, current_nodes: V1NodeList, node_requests: Dict[str
updated_nodes: List[NodeInfo] = []
cache_keys = list(self.__nodes_cache.keys())
for node_name in cache_keys:
if not curr_nodes.get(node_name): # node doesn't exist any more, delete it
self.__nodes_cache[node_name].deleted = True
updated_nodes.append(self.__nodes_cache[node_name])
del self.__nodes_cache[node_name]
if not curr_nodes.get(node_name): # node doesn't exist anymore, delete it
self.__safe_delete_node(node_name)

# new or changed nodes
for node_name in curr_nodes.keys():
Expand All @@ -435,16 +423,24 @@ def __publish_new_nodes(self, current_nodes: V1NodeList, node_requests: Dict[str

self.dal.publish_nodes(updated_nodes)

def __safe_delete_node(self, node_name):
self.__nodes_cache.pop(node_name, None)

# could be case where it is not in cache but is in db, i.e. after cache reset
if node_name:
self.dal.remove_deleted_node(node_name)

def __safe_delete_service(self, service_key):
self.__services_cache.pop(service_key, None)

# could be case where it is not in cache but is in db, i.e. after cache reset
if service_key:
self.dal.remove_deleted_service(service_key)

def __safe_delete_job(self, job_key):
try:
# incase remove_deleted_job fails we mark it deleted in cache so our DB atleast has it saved as deleted instead of active
job_info = self.__jobs_cache.get(job_key, None)
if job_info:
job_info.deleted = True
self.dal.remove_deleted_job(job_info)
del self.__jobs_cache[job_key]
except Exception:
logging.error(f"Failed to delete job with service key {job_key}", exc_info=True)
job_info = self.__jobs_cache.pop(job_key, None)
if job_info:
self.dal.remove_deleted_job(job_info)

def __publish_new_jobs(self, active_jobs: List[JobInfo]):
# convert to map
Expand Down Expand Up @@ -643,8 +639,9 @@ def __update_node(self, new_node: Node, operation: K8sOperationType):
self.__nodes_cache[name] = new_info
elif operation == K8sOperationType.DELETE:
name = new_node.metadata.name
self.__nodes_cache.pop(name, None)
new_info.deleted = True
self.__safe_delete_node(name)
self.__discovery_metrics.on_nodes_updated(1)
return

self.dal.publish_nodes([new_info])
self.__discovery_metrics.on_nodes_updated(1)
Expand Down

0 comments on commit 8e38b2c

Please sign in to comment.