feat(api): Add a new API for exporting data as a CSV (#33)
* feature(api): Add a new API for generating CSV files

Signed-off-by: hayk96 <hayko5999@gmail.com>

* chore(api): Bump app version #minor

Signed-off-by: hayk96 <hayko5999@gmail.com>

---------

Signed-off-by: hayk96 <hayko5999@gmail.com>
hayk96 authored Jun 23, 2024
1 parent 8e30efb commit 78c1909
Showing 7 changed files with 286 additions and 2 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
 # Changelog
 
+## 0.4.0 / 2024-06-23
+
+* [ENHANCEMENT] Added a new API endpoint: `/export` for exporting data from Prometheus as a CSV file. It supports both instant and range queries.
+More details can be found in the [API documentation](https://hayk96.github.io/prometheus-api/). #33
+
 ## 0.3.3 / 2024-06-16
 
 * [ENHANCEMENT] Added a new endpoint: `/metrics-lifecycle-policies/trigger` for force-triggering all Metrics Lifecycle Policies. #29
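For orientation, here is a minimal client-side sketch of the new endpoint using the requests library (the base URL is an assumption; point it at wherever prometheus-api listens):

import requests

BASE_URL = "http://localhost:5000"  # hypothetical host and port

# Instant query: only "expr" is sent, so the service runs /api/v1/query.
resp = requests.post(f"{BASE_URL}/api/v1/export", json={"expr": "up"})
if resp.status_code == 200:
    with open("up.csv", "wb") as f:
        f.write(resp.content)  # e.g. "__name__,instance,job,timestamp,value\n..."
else:
    print(resp.json())  # {"status": "error", "query": "...", "message": "..."}

# Range query: start, end, and step switch the service to /api/v1/query_range.
resp = requests.post(f"{BASE_URL}/api/v1/export", json={
    "expr": "users_login_count{status='success'}",
    "start": "2024-01-30T00:00:00Z",
    "end": "2024-01-31T23:59:59Z",
    "step": "1h",
})

The range-query body mirrors the request-body example shipped in src/models/export.py below.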
3 changes: 2 additions & 1 deletion src/api/v1/api.py
@@ -1,8 +1,9 @@
-from .. v1.endpoints import reverse_proxy, rules, policies, web, health
+from .. v1.endpoints import reverse_proxy, rules, policies, web, health, export
 from fastapi import APIRouter
 
 api_router = APIRouter()
 api_router.include_router(rules.router, prefix="/api/v1")
+api_router.include_router(export.router, prefix="/api/v1")
 api_router.include_router(policies.router, prefix="/api/v1")
 api_router.include_router(web.router, prefix="")
 api_router.include_router(health.router, prefix="")
98 changes: 98 additions & 0 deletions src/api/v1/endpoints/export.py
@@ -0,0 +1,98 @@
from fastapi import APIRouter, Response, Request, Body, status
from starlette.background import BackgroundTask
from fastapi.responses import FileResponse
from src.models.export import ExportData
from src.core import export as exp
from src.utils.log import logger
from typing import Annotated

router = APIRouter()


@router.post("/export",
name="Export data from Prometheus",
description="Exports data from Prometheus based on the provided PromQL",
status_code=status.HTTP_200_OK,
tags=["export"],
responses={
200: {
"description": "OK",
"content": {
"text/csv; charset=utf-8": {
"example": "__name__,instance,job,timestamp,value\n"
"up,prometheus-api:5000,prometheus-api,1719131438.585,1\n"
"up,localhost:9090,prometheus,1719131438.585,1"
}
}
},
400: {
"description": "Bad Request",
"content": {
"application/json": {
"example": [
{
"status": "error",
"query": "sum by (instance) (prometheus_build_info",
"message": "invalid parameter 'query': 1:41: parse error: unclosed left parenthesis"
}
]
}
}
},
500: {
"description": "Internal Server Error",
"content": {
"application/json": {
"example": [
{
"status": "error",
"query": "sum by (instance) (prometheus_build_info)",
"message": "Prometheus query has failed. HTTPConnectionPool(host='localhost', port=9090)"
}
]
}
}
}
}
)
async def export(
request: Request,
response: FileResponse or Response,
data: Annotated[
ExportData,
Body(
openapi_examples=ExportData._request_body_examples,
)
]
):
data = data.dict()
filename = "data.csv"
expr, start = data.get("expr"), data.get("start")
end, step = data.get("end"), data.get("step")
validation_status, response.status_code, sts, msg = exp.validate_request(
"export.json", data)
if validation_status:
range_query = True if all([start, end, step]) else False
resp_status, response.status_code, resp_data = exp.prom_query(
range_query=range_query,
query=expr, start=start,
end=end, step=step)
if resp_status:
labels, data_processed = exp.data_processor(source_data=resp_data)
csv_generator_status, sts, msg = exp.csv_generator(
data=data_processed, fields=labels, filename=filename)
else:
sts, msg = resp_data.get("status"), resp_data.get("error")

logger.info(
msg=msg,
extra={
"status": response.status_code,
"query": expr,
"method": request.method,
"request_path": request.url.path})
if sts == "success":
return FileResponse(path=filename,
background=BackgroundTask(exp.cleanup_files, filename))
return {"status": sts, "query": expr, "message": msg}
119 changes: 119 additions & 0 deletions src/core/export.py
@@ -0,0 +1,119 @@
from jsonschema import validate, exceptions
from src.utils.arguments import arg_parser
import requests
import json
import copy
import csv
import os

prom_addr = arg_parser().get("prom.addr")


def prom_query(query, range_query=False, start="0", end="0",
               step="0", url=prom_addr) -> tuple[bool, int, dict]:
    """
    This function queries data from Prometheus
    based on the information provided by the
    user and returns the data as a dictionary.
    """
    try:
        r = requests.post(f"{url}/api/v1/{'query_range' if range_query else 'query'}",
                          data={
                              "query": query,
                              "start": start,
                              "end": end,
                              "step": step},
                          headers={"Content-Type": "application/x-www-form-urlencoded"})
    except requests.exceptions.RequestException as e:
        return False, 500, {"status": "error",
                            "error": f"Prometheus query has failed. {e}"}
    else:
        return r.status_code == 200, r.status_code, r.json()
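
# Usage sketch (hypothetical values): callers branch on the first element
# of the returned (success, http_status, payload) triple:
#   ok, code, payload = prom_query(query="up")                # instant query
#   ok, code, payload = prom_query(query="up", range_query=True,
#                                  start="2024-06-23T00:00:00Z",
#                                  end="2024-06-23T01:00:00Z",
#                                  step="60s")                # range query
#   if ok:
#       rows = payload["data"]["result"]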


def data_processor(source_data: dict) -> tuple[list, list]:
    """
    This function preprocesses the results
    of the Prometheus query for future formatting.
    It returns all labels of the query result
    and the data of each time series.
    """
    data_raw = copy.deepcopy(source_data)
    data_processed, unique_labels = [], set()
    data_result = data_raw["data"]["result"]

    def vector_processor():
        # Instant-query results: one (timestamp, value) pair per series
        for ts in data_result:
            unique_labels.update(ts["metric"].keys())
            series = ts["metric"]
            series["timestamp"] = ts["value"][0]
            series["value"] = ts["value"][1]
            data_processed.append(series)

    def matrix_processor():
        # Range-query results: one output row per (timestamp, value) sample
        for ts in data_result:
            unique_labels.update(ts["metric"].keys())
            series = ts["metric"]
            for timestamp, value in ts["values"]:
                series_nested = copy.deepcopy(series)
                series_nested["timestamp"] = timestamp
                series_nested["value"] = value
                data_processed.append(series_nested)

    if data_raw["data"]["resultType"] == "vector":
        vector_processor()
    elif data_raw["data"]["resultType"] == "matrix":
        matrix_processor()

    unique_labels = sorted(unique_labels)
    unique_labels.extend(["timestamp", "value"])
    return unique_labels, data_processed
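
# Illustrative example (sample data): an instant-query (vector) result such as
#   {"data": {"resultType": "vector", "result": [
#       {"metric": {"__name__": "up", "job": "prometheus"},
#        "value": [1719131438.585, "1"]}]}}
# is returned as
#   (["__name__", "job", "timestamp", "value"],
#    [{"__name__": "up", "job": "prometheus",
#      "timestamp": 1719131438.585, "value": "1"}])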


def validate_request(schema_file, data) -> tuple[bool, int, str, str]:
    """
    This function validates the request object
    provided by the user against the required schema.
    It will be moved into the utils package in the future.
    """
    schema_file = f"src/schemas/{schema_file}"
    with open(schema_file) as f:
        schema = json.load(f)
    try:
        validate(instance=data, schema=schema)
    except exceptions.ValidationError as e:
        return False, 400, "error", e.args[0]
    return True, 200, "success", "Request is valid"


def cleanup_files(file) -> tuple[bool, str]:
    """
    This function removes the generated file
    once a response has been sent to the user.
    """
    try:
        os.remove(file)
    except OSError as e:
        return False, str(e)
    else:
        return True, "File has been removed successfully"


def csv_generator(data, fields, filename) -> tuple[bool, str, str]:
    """
    This function generates a CSV file
    based on the provided objects.
    """
    try:
        # newline='' is recommended by the csv module to avoid blank lines on Windows
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.DictWriter(
                csvfile, fieldnames=fields, extrasaction='ignore')
            writer.writeheader()
            writer.writerows(data)
    except (OSError, csv.Error) as e:
        return False, "error", str(e)
    else:
        return True, "success", "CSV file has been generated successfully"
20 changes: 20 additions & 0 deletions src/models/export.py
@@ -0,0 +1,20 @@
from pydantic import BaseModel, Extra
from typing import Optional


class ExportData(BaseModel, extra=Extra.allow):
    expr: str
    start: Optional[str] = None
    end: Optional[str] = None
    step: Optional[str] = None
    _request_body_examples = {
        "Count of successful logins by users per hour in a day": {
            "description": "Count of successful logins by users per hour in a day",
            "value": {
                "expr": "users_login_count{status='success'}",
                "start": "2024-01-30T00:00:00Z",
                "end": "2024-01-31T23:59:59Z",
                "step": "1h"
            }
        }
    }
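
Because the model is declared with extra=Extra.allow, pydantic keeps unknown keys; they are rejected only later, by the additionalProperties: false rule in the JSON schema below. A small sketch of that behavior, assuming the package layout above:

from src.models.export import ExportData

d = ExportData(expr="up")  # instant query: the optional fields default to None
print(d.dict())  # {'expr': 'up', 'start': None, 'end': None, 'step': None}

d = ExportData(expr="up", bogus="x")  # extra keys survive pydantic...
print(d.dict())  # ...and fail only at schema validation, yielding a 400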
41 changes: 41 additions & 0 deletions src/schemas/export.json
@@ -0,0 +1,41 @@
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "expr": {
      "type": "string"
    },
    "start": {
      "type": ["string", "null"],
      "format": "date-time"
    },
    "end": {
      "type": ["string", "null"],
      "format": "date-time"
    },
    "step": {
      "type": ["string", "null"],
      "pattern": "^((([0-9]+)y)?(([0-9]+)w)?(([0-9]+)d)?(([0-9]+)h)?(([0-9]+)m)?(([0-9]+)s)?(([0-9]+)ms)?|0)$"
    }
  },
  "required": ["expr"],
  "additionalProperties": false,
  "oneOf": [
    {
      "properties": {
        "start": { "type": "string" },
        "end": { "type": "string" },
        "step": { "type": "string" }
      },
      "required": ["start", "end", "step"]
    },
    {
      "properties": {
        "start": { "type": "null" },
        "end": { "type": "null" },
        "step": { "type": "null" }
      }
    }
  ],
  "title": "Export data from Prometheus"
}
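
The oneOf makes the range parameters all-or-nothing: a valid body either carries start, end, and step as strings (a range query) or leaves all three null or absent (an instant query). A quick check with the jsonschema package (sketch):

import json
from jsonschema import validate, exceptions

with open("src/schemas/export.json") as f:
    schema = json.load(f)

validate({"expr": "up"}, schema)  # instant query: valid
validate({"expr": "up", "start": "2024-01-30T00:00:00Z",
          "end": "2024-01-31T23:59:59Z", "step": "1h"}, schema)  # range query: valid
try:
    # start without end/step matches neither branch of the oneOf
    validate({"expr": "up", "start": "2024-01-30T00:00:00Z"}, schema)
except exceptions.ValidationError as e:
    print(e.message)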
2 changes: 1 addition & 1 deletion src/utils/openapi.py
@@ -16,7 +16,7 @@ def openapi(app: FastAPI):
         "providing additional features and addressing its limitations. "
         "Running as a sidecar alongside the Prometheus server enables "
         "users to extend the capabilities of the API.",
-        version="0.3.3",
+        version="0.4.0",
         contact={
             "name": "Hayk Davtyan",
             "url": "https://hayk96.github.io",
