Metrics (#3216)
* Add Prometheus instrumentation

Closes #3214

* Fix missing bind argument

* Run Prometheus exporter as a separate service

* Expose number of streaming requests and number of streamed entities as metrics

* Expose number of auth attempts as Prometheus metrics

* Update Helm chart to expose metrics endpoints, setup ServiceMonitors

* Handle requests without Authz object gracefully

* Rename Prometheus label to "api_endpoint" to prevent naming clashes

Prometheus Operator also uses the "endpoint" label and automatically renames clashing "endpoint" labels exposed by the metrics endpoint to "exported_endpoint", which is ugly.
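
For illustration, a counter labeled this way might look as follows (a sketch; the metric and label values are made up, not taken verbatim from this change):

    from prometheus_client import Counter

    # Using "api_endpoint" instead of "endpoint" avoids clashing with the
    # target label that the Prometheus Operator attaches to scraped metrics.
    API_REQUESTS = Counter(
        "aleph_api_requests_total",
        "Total number of API requests",
        ["api_endpoint", "status"],
    )

    API_REQUESTS.labels(api_endpoint="collections_index", status="200").inc()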

* Add xref metrics

* Use common prefix for all metric names

Even though it is considered an anti-pattern to add a prefix with the name of the software or component to metrics (according to the official Prometheus documentation), I have decided to add a prefix. I’ve found that this makes it much easier to find relevant metrics. The main disadvantage of per-component prefixes is that queries become slightly more complex if you want to query the same metric (e.g. HTTP request duration) across multiple components. This isn’t super important in our case though, so I think the trade-off is acceptable.

* Expose Python platform information as Prometheus metrics

* Remove unused port, network policy from K8s specs

Although I'm not 100% sure, the exposed port 3000 is probably a leftover from the past, possibly from when convert-document was still part of ingest-file. The network policy prevented Prometheus from scraping ingest-file metrics (and as the metrics port is now the only port exposed by ingest-file, it should be otherwise unnecessary).

* Use keyword args to set Prometheus metric labels

As suggested by @stchris

* Bump servicelayer from 1.22.0 to 1.22.1

* Simplify entity streaming metrics code

There’s no need to do batched metric increments until this becomes a performance bottleneck.
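
A sketch of the unbatched version (the counter name and helper are illustrative, not taken verbatim from this change):

    from prometheus_client import Counter

    STREAMED_ENTITIES = Counter(
        "aleph_streamed_entities_total",
        "Total number of streamed entities",
    )

    def stream_entities(entities):
        for entity in entities:
            yield entity
            # One increment per entity; batching increments would only be
            # worth the extra bookkeeping if this became a bottleneck.
            STREAMED_ENTITIES.inc()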

* Limit maximum size of Prometheus multiprocessing directory
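
In multiprocess mode, the Prometheus client writes one metrics file per process to the multiprocessing directory, so it grows unless exited workers are cleaned up. The usual gunicorn hook for this, per the prometheus_client documentation (a sketch, not necessarily the exact contents of the gunicorn.conf.py referenced in the Dockerfile below):

    from prometheus_client import multiprocess

    def child_exit(server, worker):
        # Mark the exited worker's metric files as dead so the
        # multiprocessing directory does not grow without bounds.
        multiprocess.mark_process_dead(worker.pid)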

* Do not let collector classes inherit from `object`

I copied the boilerplate for custom collectors from the docs without thinking about it too much, but inheriting from `object` really isn’t necessary anymore in Python 3.

The Prometheus client also exports an abstract `Collector` class -- it doesn’t do anything except provide type hints for the `collect` method, which is nice.
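
For reference, a minimal collector in this style (a sketch, not code from this change):

    from prometheus_client.core import GaugeMetricFamily
    from prometheus_client.registry import Collector

    class ExampleCollector(Collector):
        # `Collector` adds no behavior; subclassing it just provides type
        # hints for the `collect` method.
        def collect(self):
            yield GaugeMetricFamily("example_value", "An example gauge", value=42)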

* Add `aleph_` prefix to Prometheus API metrics

* Fix metrics name (singular -> plural)

* Add documentation on how to test Prometheus instrumentation in local Kubernetes cluster
tillprochaska authored Jan 16, 2024
1 parent 3827d97 commit e5eba0d
Showing 24 changed files with 906 additions and 28 deletions.
4 changes: 3 additions & 1 deletion Dockerfile
@@ -46,5 +46,7 @@ ENV ALEPH_ELASTICSEARCH_URI=http://elasticsearch:9200/ \
FTM_COMPARE_FREQUENCIES_DIR=/opt/ftm-compare/word-frequencies/ \
FTM_COMPARE_MODEL=/opt/ftm-compare/model.pkl

RUN mkdir /run/prometheus

# Run the green unicorn
CMD gunicorn -w 5 -b 0.0.0.0:8000 --log-level info --log-file - aleph.manage:app
CMD gunicorn --config /aleph/gunicorn.conf.py --workers 5 --log-level info --log-file - aleph.manage:app
3 changes: 3 additions & 0 deletions aleph/core.py
@@ -26,6 +26,7 @@
from aleph.cache import Cache
from aleph.oauth import configure_oauth
from aleph.util import LoggingTransport
from aleph.metrics.flask import PrometheusExtension

import sentry_sdk
from sentry_sdk.integrations.flask import FlaskIntegration
@@ -39,6 +40,7 @@
mail = Mail()
babel = Babel()
talisman = Talisman()
prometheus = PrometheusExtension()


def determine_locale():
@@ -94,6 +96,7 @@ def create_app(config=None):
mail.init_app(app)
db.init_app(app)
babel.init_app(app, locale_selector=determine_locale)
prometheus.init_app(app)
CORS(
app,
resources=r"/api/*",
53 changes: 53 additions & 0 deletions aleph/logic/xref.py
@@ -4,6 +4,7 @@
from pprint import pformat, pprint # noqa
from tempfile import mkdtemp
from dataclasses import dataclass
from timeit import default_timer

import followthemoney
from followthemoney import model
@@ -15,6 +16,7 @@
from followthemoney_compare.models import GLMBernoulli2EEvaluator
from followthemoney.proxy import EntityProxy
from servicelayer.archive.util import ensure_path
from prometheus_client import Counter, Histogram

from aleph.core import es, db
from aleph.settings import SETTINGS
@@ -40,6 +42,36 @@
ORIGIN = "xref"
MODEL = None
FTM_VERSION_STR = f"ftm-{followthemoney.__version__}"
SCORE_CUTOFF = 0.5

XREF_ENTITIES = Counter(
"aleph_xref_entities_total",
"Total number of entities and mentions that have been xref'ed",
)

XREF_MATCHES = Histogram(
"aleph_xref_matches",
"Number of matches per xref'ed entitiy or mention",
buckets=[
# Listing 0 as a separate bucket size because it's interesting to know
# what percentage of entities result in no matches at all
0,
5,
10,
25,
50,
],
)

XREF_CANDIDATES_QUERY_DURATION = Histogram(
"aleph_xref_candidates_query_duration_seconds",
"Processing duration of the candidates query (excl. network, serialization etc.)",
)

XREF_CANDIDATES_QUERY_ROUNDTRIP_DURATION = Histogram(
"aleph_xref_candidates_query_roundtrip_duration_seconds",
"Roundtrip duration of the candidates query (incl. network, serialization etc.)",
)


@dataclass
@@ -102,7 +134,15 @@ def _query_item(entity, entitysets=True):
query = {"query": query, "size": 50, "_source": ENTITY_SOURCE}
schemata = list(entity.schema.matchable_schemata)
index = entities_read_index(schema=schemata, expand=False)

start_time = default_timer()
result = es.search(index=index, body=query)
roundtrip_duration = max(0, default_timer() - start_time)
query_duration = result.get("took")
if query_duration is not None:
# ES returns milliseconds, but we track query time in seconds
query_duration = query_duration / 1000

candidates = []
for result in result.get("hits").get("hits"):
result = unpack_result(result)
@@ -116,7 +156,9 @@
entity.caption,
len(candidates),
)

results = _bulk_compare([(entity, c) for c in candidates])
match_count = 0
for match, (score, doubt, method) in zip(candidates, results):
log.debug(
"Match: %s: %s <[%.2f]@%0.2f> %s",
@@ -136,6 +178,17 @@
match=match,
entityset_ids=entityset_ids,
)
if score > SCORE_CUTOFF:
# While we store all xref matches with a score > 0, we only count matches
# with a score above a threshold. This is in line with the user-facing behavior
# which also only shows matches above the threshold.
match_count += 1

XREF_ENTITIES.inc()
XREF_MATCHES.observe(match_count)
XREF_CANDIDATES_QUERY_ROUNDTRIP_DURATION.observe(roundtrip_duration)
if query_duration:
XREF_CANDIDATES_QUERY_DURATION.observe(query_duration)


def _iter_mentions(collection):
Empty file added aleph/metrics/__init__.py
Empty file.
178 changes: 178 additions & 0 deletions aleph/metrics/collectors.py
@@ -0,0 +1,178 @@
from sqlalchemy import func
from prometheus_client.core import GaugeMetricFamily, InfoMetricFamily
from prometheus_client.registry import Collector
from followthemoney import __version__ as ftm_version

from aleph import __version__ as aleph_version
from aleph.core import create_app as create_flask_app
from aleph.queues import get_active_dataset_status
from aleph.model import Role, Collection, EntitySet, Bookmark


class InfoCollector(Collector):
def collect(self):
yield InfoMetricFamily(
"aleph_system",
"Aleph system information",
value={
"aleph_version": aleph_version,
"ftm_version": ftm_version,
},
)


class DatabaseCollector(Collector):
def __init__(self):
self._flask_app = create_flask_app()

def collect(self):
with self._flask_app.app_context():
yield self._users()
yield self._collections()
yield self._collection_users()
yield self._entitysets()
yield self._entityset_users()
yield self._bookmarks()
yield self._bookmark_users()

def _users(self):
return GaugeMetricFamily(
"aleph_users",
"Total number of users",
value=Role.all_users().count(),
)

def _collections(self):
gauge = GaugeMetricFamily(
"aleph_collections",
"Total number of collections by category",
labels=["category"],
)

query = (
Collection.all()
.with_entities(Collection.category, func.count())
.group_by(Collection.category)
)

for category, count in query:
gauge.add_metric([category], count)

return gauge

def _collection_users(self):
gauge = GaugeMetricFamily(
"aleph_collection_users",
"Total number of users that have created at least one collection",
labels=["category"],
)

query = (
Collection.all()
.with_entities(
Collection.category,
func.count(func.distinct(Collection.creator_id)),
)
.group_by(Collection.category)
)

for category, count in query:
gauge.add_metric([category], count)

return gauge

def _entitysets(self):
gauge = GaugeMetricFamily(
"aleph_entitysets",
"Total number of entity set by type",
labels=["type"],
)

query = (
EntitySet.all()
.with_entities(EntitySet.type, func.count())
.group_by(EntitySet.type)
)

for entityset_type, count in query:
gauge.add_metric([entityset_type], count)

return gauge

def _entityset_users(self):
gauge = GaugeMetricFamily(
"aleph_entityset_users",
"Number of users that have created at least on entity set of the given type",
labels=["type"],
)

query = (
EntitySet.all()
.with_entities(
EntitySet.type,
func.count(func.distinct(EntitySet.role_id)),
)
.group_by(EntitySet.type)
)

for entityset_type, count in query:
gauge.add_metric([entityset_type], count)

return gauge

def _bookmarks(self):
return GaugeMetricFamily(
"aleph_bookmarks",
"Total number of bookmarks",
value=Bookmark.query.count(),
)

def _bookmark_users(self):
return GaugeMetricFamily(
"aleph_bookmark_users",
"Number of users that have created at least one bookmark",
value=Bookmark.query.distinct(Bookmark.role_id).count(),
)


class QueuesCollector(Collector):
def collect(self):
status = get_active_dataset_status()

yield GaugeMetricFamily(
"aleph_active_datasets",
"Total number of active datasets",
value=status["total"],
)

stages = {}

for collection_status in status["datasets"].values():
for job_status in collection_status["jobs"]:
for stage_status in job_status["stages"]:
stage = stage_status["stage"]
pending = stage_status["pending"]
running = stage_status["running"]

if stage not in stages:
stages[stage] = {
"pending": 0,
"running": 0,
}

stages[stage]["pending"] += pending
stages[stage]["running"] += running

tasks_gauge = GaugeMetricFamily(
"aleph_tasks",
"Total number of pending or running tasks in a given stage",
labels=["stage", "status"],
)

for stage, tasks in stages.items():
tasks_gauge.add_metric([stage, "pending"], tasks["pending"])
tasks_gauge.add_metric([stage, "running"], tasks["running"])

yield tasks_gauge
17 changes: 17 additions & 0 deletions aleph/metrics/exporter.py
@@ -0,0 +1,17 @@
from prometheus_client import make_wsgi_app, PLATFORM_COLLECTOR
from prometheus_client.core import CollectorRegistry

from aleph.metrics.collectors import InfoCollector, DatabaseCollector, QueuesCollector


def create_app():
registry = CollectorRegistry()
registry.register(PLATFORM_COLLECTOR)
registry.register(InfoCollector())
registry.register(DatabaseCollector())
registry.register(QueuesCollector())

return make_wsgi_app(registry=registry)


app = create_app()
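
For local testing, this WSGI app can be served with Python's built-in server (a sketch; the port is illustrative, and in the Helm chart the exporter runs as a separate service instead):

    from wsgiref.simple_server import make_server

    from aleph.metrics.exporter import app

    # Serve metrics on port 9100; Prometheus can then scrape this endpoint.
    make_server("0.0.0.0", 9100, app).serve_forever()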