fix: Tests and fixes for IncidentBl (#2868)
VladimirFilonov authored Dec 19, 2024
1 parent 54f3916 commit 1a51b20
Showing 6 changed files with 479 additions and 46 deletions.
75 changes: 41 additions & 34 deletions keep/api/bl/incidents_bl.py
@@ -14,11 +14,12 @@
add_alerts_to_incident_by_incident_id,
create_incident_from_dto,
delete_incident_by_id,
get_incident_alerts_by_incident_id,
get_incident_by_id,
get_incident_unique_fingerprint_count,
remove_alerts_to_incident_by_incident_id,
update_incident_from_dto_by_id,
enrich_alerts_with_incidents,
get_all_alerts_by_fingerprints,
)
from keep.api.core.elastic import ElasticClient
from keep.api.models.alert import IncidentDto, IncidentDtoIn
@@ -108,43 +109,30 @@ async def add_alerts_to_incident(
"Alerts added to incident",
extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints},
)
self.__update_elastic(incident_id, alert_fingerprints)
self.logger.info(
"Alerts pushed to elastic",
extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints},
)
self.__update_client_on_incident_change(incident_id)
self.logger.info(
"Client updated on incident change",
extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints},
)
incident_dto = IncidentDto.from_db_incident(incident)
self.__run_workflows(incident_dto, "updated")
self.logger.info(
"Workflows run on incident",
extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints},
)
self.__postprocess_alerts_change(incident, alert_fingerprints)
await self.__generate_summary(incident_id, incident)
self.logger.info(
"Summary generated",
extra={"incident_id": incident_id, "alert_fingerprints": alert_fingerprints},
)

def __update_elastic(self, incident_id: UUID, alert_fingerprints: List[str]):
def __update_elastic(self, alert_fingerprints: List[str]):
try:
elastic_client = ElasticClient(self.tenant_id)
if elastic_client.enabled:
db_alerts, _ = get_incident_alerts_by_incident_id(
db_alerts = get_all_alerts_by_fingerprints(
tenant_id=self.tenant_id,
incident_id=incident_id,
limit=len(alert_fingerprints),
fingerprints=alert_fingerprints,
session=self.session,
)
db_alerts = enrich_alerts_with_incidents(self.tenant_id, db_alerts, session=self.session)
enriched_alerts_dto = convert_db_alerts_to_dto_alerts(
db_alerts, with_incidents=True
)
elastic_client.index_alerts(alerts=enriched_alerts_dto)
except Exception:
self.logger.exception("Failed to push alert to elasticsearch")
raise

def __update_client_on_incident_change(self, incident_id: Optional[UUID] = None):
if self.pusher_client is not None:
@@ -217,6 +205,7 @@ def delete_alerts_from_incident(
raise HTTPException(status_code=404, detail="Incident not found")

remove_alerts_to_incident_by_incident_id(self.tenant_id, incident_id, alert_fingerprints)
self.__postprocess_alerts_change(incident, alert_fingerprints)

def delete_incident(self, incident_id: UUID) -> None:
self.logger.info(
@@ -255,7 +244,7 @@ def update_incident(
incident_id: UUID,
updated_incident_dto: IncidentDtoIn,
generated_by_ai: bool,
) -> None:
) -> IncidentDto:
self.logger.info(
"Fetching incident",
extra={
@@ -270,16 +259,34 @@ def update_incident(
raise HTTPException(status_code=404, detail="Incident not found")

new_incident_dto = IncidentDto.from_db_incident(incident)
try:
workflow_manager = WorkflowManager.get_instance()
self.logger.info("Adding incident to the workflow manager queue")
workflow_manager.insert_incident(
self.tenant_id, new_incident_dto, "updated"
)
self.logger.info("Added incident to the workflow manager queue")
except Exception:
self.logger.exception(
"Failed to run workflows based on incident",
extra={"incident_id": new_incident_dto.id, "tenant_id": self.tenant_id},
)

self.__update_client_on_incident_change(incident.id)
self.logger.info(
"Client updated on incident change",
extra={"incident_id": incident.id},
)
self.__run_workflows(new_incident_dto, "updated")
self.logger.info(
"Workflows run on incident",
extra={"incident_id": incident.id},
)
return new_incident_dto

    def __postprocess_alerts_change(self, incident, alert_fingerprints):

        self.__update_elastic(alert_fingerprints)
        self.logger.info(
            "Alerts pushed to elastic",
            extra={"incident_id": incident.id, "alert_fingerprints": alert_fingerprints},
        )
        self.__update_client_on_incident_change(incident.id)
        self.logger.info(
            "Client updated on incident change",
            extra={"incident_id": incident.id, "alert_fingerprints": alert_fingerprints},
        )
        incident_dto = IncidentDto.from_db_incident(incident)
        self.__run_workflows(incident_dto, "updated")
        self.logger.info(
            "Workflows run on incident",
            extra={"incident_id": incident.id, "alert_fingerprints": alert_fingerprints},
        )
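
Note on the signature change above: update_incident now returns the refreshed IncidentDto instead of None, so a caller can hand the updated incident straight back to the client. A minimal caller-side sketch; the handler name and the way the business-logic object is obtained are illustrative, only the update_incident signature comes from this diff:

from uuid import UUID

from keep.api.models.alert import IncidentDto, IncidentDtoIn


def update_incident_handler(
    incidents_bl,  # an instance of the business-logic class from incidents_bl.py
    incident_id: UUID,
    body: IncidentDtoIn,
    generated_by_ai: bool = False,
) -> IncidentDto:
    # the business-logic layer raises HTTPException(404) itself when the
    # incident does not exist, so the handler only forwards the result
    return incidents_bl.update_incident(incident_id, body, generated_by_ai)
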
18 changes: 15 additions & 3 deletions keep/api/core/db.py
@@ -1428,6 +1428,18 @@ def get_alerts_by_fingerprint(

return alerts

def get_all_alerts_by_fingerprints(
    tenant_id: str, fingerprints: List[str], session: Optional[Session] = None
) -> List[Alert]:
    with existed_or_new_session(session) as session:
        query = (
            select(Alert)
            .filter(Alert.tenant_id == tenant_id)
            .filter(Alert.fingerprint.in_(fingerprints))
            .order_by(Alert.timestamp.desc())
        )
        return session.exec(query).all()


def get_alert_by_fingerprint_and_event_id(
tenant_id: str, fingerprint: str, event_id: str
@@ -3215,11 +3227,11 @@ def enrich_alerts_with_incidents(
).all()

incidents_per_alert = defaultdict(list)
for alert_id, incident in alert_incidents:
incidents_per_alert[alert_id].append(incident)
for fingerprint, incident in alert_incidents:
incidents_per_alert[fingerprint].append(incident)

for alert in alerts:
alert._incidents = incidents_per_alert[incident.id]
alert._incidents = incidents_per_alert[alert.fingerprint]

return alerts

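The new get_all_alerts_by_fingerprints helper returns every stored event for the given fingerprints (newest first), and the fixed enrich_alerts_with_incidents now keys the related incidents by fingerprint instead of by a stale incident.id. A rough sketch of how the two combine, mirroring the __update_elastic path above; the wrapper name is illustrative and the sqlmodel Session import is assumed from the session.exec(...) usage:

from typing import List, Optional

from sqlmodel import Session

from keep.api.core.db import (
    enrich_alerts_with_incidents,
    get_all_alerts_by_fingerprints,
)


def fetch_alerts_for_reindex(
    tenant_id: str, fingerprints: List[str], session: Optional[Session] = None
):
    # every event for the affected fingerprints, newest first
    alerts = get_all_alerts_by_fingerprints(
        tenant_id=tenant_id, fingerprints=fingerprints, session=session
    )
    # attach the related incidents onto each alert (alert._incidents),
    # keyed by fingerprint after the fix above
    return enrich_alerts_with_incidents(tenant_id, alerts, session=session)
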
12 changes: 6 additions & 6 deletions keep/api/utils/enrichment_helpers.py
@@ -80,10 +80,10 @@ def calculated_start_firing_time(


def convert_db_alerts_to_dto_alerts(
alerts: list[Alert | tuple[Alert, LastAlertToIncident]],
with_incidents: bool = False,
session: Optional[Session] = None,
) -> list[AlertDto | AlertWithIncidentLinkMetadataDto]:
alerts: list[Alert | tuple[Alert, LastAlertToIncident]],
with_incidents: bool = False,
session: Optional[Session] = None,
) -> list[AlertDto | AlertWithIncidentLinkMetadataDto]:
"""
Enriches the alerts with the enrichment data.
@@ -109,8 +109,8 @@ def convert_db_alerts_to_dto_alerts(
if alert.alert_enrichment:
alert.event.update(alert.alert_enrichment.enrichments)
if with_incidents:
if alert.incidents:
alert.event["incident"] = ",".join(str(incident.id) for incident in alert.incidents)
if alert._incidents:
alert.event["incident"] = ",".join(str(incident.id) for incident in alert._incidents)
try:
if alert_to_incident is not None:
alert_dto = AlertWithIncidentLinkMetadataDto.from_db_instance(alert, alert_to_incident)
20 changes: 19 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -90,6 +90,7 @@ psycopg-binary = "^3.2.3"
psycopg = "^3.2.3"
prometheus-client = "^0.21.1"
psycopg2-binary = "^2.9.10"
pytest-asyncio = "0.21.0"
[tool.poetry.group.dev.dependencies]
pre-commit = "^3.0.4"
pre-commit-hooks = "^4.4.0"
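
pytest-asyncio is added because methods such as IncidentBl.add_alerts_to_incident are coroutines, so the new tests need an async-aware runner. A minimal illustration of the pattern the pin enables; the fixture names and the exact call signature are assumptions, not copied from the added test file:

import pytest


@pytest.mark.asyncio
async def test_add_alerts_to_incident(incident_bl, incident, alert_fingerprints):
    # pytest-asyncio runs this coroutine on an event loop, so the async
    # business-logic method can simply be awaited; a real test would go on
    # to assert the Elastic/Pusher/workflow side effects
    await incident_bl.add_alerts_to_incident(incident.id, alert_fingerprints)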