Skip to content

Commit

Permalink
feat(api): memory consumption and performance improvements (#2908)
Browse files Browse the repository at this point in the history
Signed-off-by: Shahar Glazner <shaharglazner@gmail.com>
Signed-off-by: Tal <talboren2@gmail.com>
Co-authored-by: Tal <tal@keephq.dev>
  • Loading branch information
shahargl and talboren authored Dec 27, 2024
1 parent 995e8e2 commit 5c4ab0f
Show file tree
Hide file tree
Showing 14 changed files with 419 additions and 368 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,5 @@ oauth2.cfg
scripts/keep_slack_bot.py
keepnew.db
providers_cache.json

tests/provision/*
12 changes: 5 additions & 7 deletions keep/api/alert_deduplicator/alert_deduplicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
DeduplicationRuleRequestDto,
)
from keep.providers.providers_factory import ProvidersFactory
from keep.searchengine.searchengine import SearchEngine

DEFAULT_RULE_UUID = "00000000-0000-0000-0000-000000000000"

Expand All @@ -42,7 +41,6 @@ class AlertDeduplicator:
def __init__(self, tenant_id):
self.logger = logging.getLogger(__name__)
self.tenant_id = tenant_id
self.search_engine = SearchEngine(self.tenant_id)

def _apply_deduplication_rule(
self, alert: AlertDto, rule: DeduplicationRuleDto
Expand Down Expand Up @@ -264,7 +262,7 @@ def _get_default_full_deduplication_rule(
ingested=0,
dedup_ratio=0.0,
enabled=True,
is_provisioned=False
is_provisioned=False,
)

def get_deduplications(self) -> list[DeduplicationRuleDto]:
Expand Down Expand Up @@ -502,15 +500,15 @@ def update_deduplication_rule(
rule_dto = self.create_deduplication_rule(rule, updated_by)
self.logger.info("Default rule updated")
return rule_dto

rule_before_update = get_deduplication_rule_by_id(self.tenant_id, rule_id)

if not rule_before_update:
raise HTTPException(
status_code=404,
detail="Deduplication rule not found",
)

if rule_before_update.is_provisioned:
raise HTTPException(
status_code=409,
Expand Down Expand Up @@ -557,7 +555,7 @@ def delete_deduplication_rule(self, rule_id: str) -> bool:
status_code=404,
detail="Deduplication rule not found",
)

if deduplication_rule_to_be_deleted.is_provisioned:
raise HTTPException(
status_code=409,
Expand Down
7 changes: 5 additions & 2 deletions keep/api/core/demo_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,6 @@ async def simulate_alerts_async(
logger.info(
"Sleeping for {} seconds before next iteration".format(sleep_interval)
)
await asyncio.sleep(sleep_interval)


def launch_demo_mode_thread(
Expand Down Expand Up @@ -623,11 +622,14 @@ async def simulate_alerts_worker(worker_id, keep_api_key, rps=1):
url, alert = await REQUESTS_QUEUE.get()

async with session.post(url, json=alert, headers=headers) as response:
response_time = time.time() - start
total_requests += 1
if not response.ok:
logger.error("Failed to send alert: {}".format(response.text))
else:
logger.info("Alert sent successfully")
logger.info(
f"Alert sent successfully in {response_time:.3f} seconds"
)

if rps:
delay = 1 / rps - (time.time() - start)
Expand All @@ -639,6 +641,7 @@ async def simulate_alerts_worker(worker_id, keep_api_key, rps=1):
worker_id,
total_requests / (time.time() - total_start),
)
logger.info("Total requests: %d", total_requests)


if __name__ == "__main__":
Expand Down
54 changes: 53 additions & 1 deletion keep/api/core/metrics.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os

from prometheus_client import Counter, Gauge, Summary
from prometheus_client import Counter, Gauge, Histogram, Summary

PROMETHEUS_MULTIPROC_DIR = os.environ.get("PROMETHEUS_MULTIPROC_DIR", "/tmp/prometheus")
os.makedirs(PROMETHEUS_MULTIPROC_DIR, exist_ok=True)
Expand Down Expand Up @@ -37,3 +37,55 @@
labelnames=["pid"],
multiprocess_mode="livesum",
)

### WORKFLOWS
# Shared name prefix: every workflow metric below is exported as
# "keep_workflows_<suffix>".
METRIC_PREFIX = "keep_workflows_"

# Workflow execution metrics
# Counter of workflow executions, labelled by tenant, workflow and trigger type.
workflow_executions_total = Counter(
f"{METRIC_PREFIX}executions_total",
"Total number of workflow executions",
labelnames=["tenant_id", "workflow_id", "trigger_type"],
)

# Counter of workflow execution errors, labelled by tenant, workflow and error type.
workflow_execution_errors_total = Counter(
f"{METRIC_PREFIX}execution_errors_total",
"Total number of workflow execution errors",
labelnames=["tenant_id", "workflow_id", "error_type"],
)

# Counter of workflow executions broken down by a "status" label.
# NOTE(review): the set of status values is not visible here — confirm at the
# call sites that it is small and bounded.
workflow_execution_status = Counter(
f"{METRIC_PREFIX}execution_status_total",
"Total number of workflow executions by status",
labelnames=["tenant_id", "workflow_id", "status"],
)

# Workflow performance metrics
# Histogram of whole-workflow execution time; bucket bounds are in seconds
# (see the metric name and the inline comment on the buckets).
workflow_execution_duration = Histogram(
f"{METRIC_PREFIX}execution_duration_seconds",
"Time spent executing workflows",
labelnames=["tenant_id", "workflow_id"],
buckets=(1, 5, 10, 30, 60, 120, 300, 600), # 1s, 5s, 10s, 30s, 1m, 2m, 5m, 10m
)

# Histogram of per-step execution time, with finer buckets than the
# whole-workflow histogram (0.1 up to 60; seconds, per the metric name).
workflow_execution_step_duration = Histogram(
f"{METRIC_PREFIX}execution_step_duration_seconds",
"Time spent executing individual workflow steps",
labelnames=["tenant_id", "workflow_id", "step_name"],
buckets=(0.1, 0.5, 1, 2, 5, 10, 30, 60),
)

# Workflow state metrics
# Gauge of currently running workflows, per tenant.
# NOTE(review): multiprocess_mode="livesum" is documented by prometheus_client
# as summing the gauge across live processes only — confirm against the
# installed client version.
workflows_running = Gauge(
f"{METRIC_PREFIX}running",
"Number of currently running workflows",
labelnames=["tenant_id"],
multiprocess_mode="livesum",
)

# Gauge of workflows waiting to be executed, per tenant; same multiprocess
# mode as the running-workflows gauge.
workflow_queue_size = Gauge(
f"{METRIC_PREFIX}queue_size",
"Number of workflows waiting to be executed",
labelnames=["tenant_id"],
multiprocess_mode="livesum",
)
15 changes: 14 additions & 1 deletion keep/api/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def process(self, msg, kwargs):


LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
KEEP_LOG_FILE = os.environ.get("KEEP_LOG_FILE")

LOG_FORMAT_OPEN_TELEMETRY = "open_telemetry"
LOG_FORMAT_DEVELOPMENT_TERMINAL = "dev_terminal"
Expand Down Expand Up @@ -234,7 +235,7 @@ def format(self, record):
},
"dev_terminal": {
"()": DevTerminalFormatter,
"format": "%(asctime)s - %(levelname)s - %(message)s",
"format": "%(asctime)s - %(thread)s %(threadName)s %(levelname)s - %(message)s",
},
},
"handlers": {
Expand Down Expand Up @@ -369,6 +370,18 @@ def _log(


def setup_logging():
# Add file handler if KEEP_LOG_FILE is set
if KEEP_LOG_FILE:
CONFIG["handlers"]["file"] = {
"level": "DEBUG",
"formatter": ("json"),
"class": "logging.FileHandler",
"filename": KEEP_LOG_FILE,
"mode": "a",
}
# Add file handler to root logger
CONFIG["loggers"][""]["handlers"].append("file")

logging.config.dictConfig(CONFIG)
uvicorn_error_logger = logging.getLogger("uvicorn.error")
uvicorn_error_logger.__class__ = CustomizedUvicornLogger
34 changes: 20 additions & 14 deletions keep/api/observability.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,26 @@
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def get_protocol_from_endpoint(endpoint):
    """Pick the OTLP span exporter class that matches an endpoint URL.

    The choice is made from the URL scheme alone:

    * ``http`` / ``https`` -> ``HTTPOTLPSpanExporter``
    * ``grpc``             -> ``GRPCOTLPSpanExporter``

    Args:
        endpoint: Collector endpoint URL, e.g. ``http://collector:4318/v1/traces``.

    Returns:
        The exporter *class* (not an instance); the caller instantiates it
        with ``endpoint=...``.

    Raises:
        ValueError: If the URL has no scheme or a scheme other than the
            ones listed above.
    """
    # urlparse() already lower-cases the scheme, so "HTTPS://..." matches too.
    scheme = urlparse(endpoint).scheme

    # TLS endpoints are served by the same HTTP exporter; rejecting "https"
    # would make every secured collector URL fail at startup.
    if scheme in ("http", "https"):
        return HTTPOTLPSpanExporter
    if scheme == "grpc":
        return GRPCOTLPSpanExporter
    raise ValueError(f"Unsupported protocol: {scheme}")


def setup(app: FastAPI):
logger = logging.getLogger(__name__)
# Configure the OpenTelemetry SDK
service_name = os.environ.get("OTEL_SERVICE_NAME", os.environ.get("SERVICE_NAME", "keep-api"))
otlp_collector_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", os.environ.get("OTLP_ENDPOINT", False))
service_name = os.environ.get(
"OTEL_SERVICE_NAME", os.environ.get("SERVICE_NAME", "keep-api")
)
otlp_collector_endpoint = os.environ.get(
"OTEL_EXPORTER_OTLP_ENDPOINT", os.environ.get("OTLP_ENDPOINT", False)
)
otlp_traces_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT", None)
otlp_logs_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_LOGS_ENDPOINT", None)
otlp_metrics_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_METRICS_ENDPOINT", None)
Expand All @@ -45,21 +51,21 @@ def setup(app: FastAPI):

resource = Resource.create({"service.name": service_name})
provider = TracerProvider(resource=resource)

if otlp_collector_endpoint:

logger.info(f"OTLP endpoint set to {otlp_collector_endpoint}")

if otlp_traces_endpoint:
logger.info(f"OTLP Traces endpoint set to {otlp_traces_endpoint}")
SpanExporter = get_protocol_from_endpoint(otlp_traces_endpoint)
processor = BatchSpanProcessor(
SpanExporter(endpoint=otlp_traces_endpoint)
)
processor = BatchSpanProcessor(SpanExporter(endpoint=otlp_traces_endpoint))
provider.add_span_processor(processor)

if metrics_enabled.lower() == "true" and otlp_metrics_endpoint:
logger.info(f"Metrics enabled. OTLP Metrics endpoint set to {otlp_metrics_endpoint}")
logger.info(
f"Metrics enabled. OTLP Metrics endpoint set to {otlp_metrics_endpoint}"
)
reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint=otlp_metrics_endpoint)
)
Expand Down
Loading

0 comments on commit 5c4ab0f

Please sign in to comment.