Skip to content

Commit

Permalink
AL-12056: Send metrics directly (#460)
Browse files Browse the repository at this point in the history
* AL-12056: Send metrics directly

* AL-12056: Increase sleep to 60

* AL-12056: Format jmx to anodot metric

* AL-12056: Use both cases of metrics

* AL-12056: Use own classes for process metrics

* AL-12056: add typing

* AL-12056: Use prometheus and manual types of metrics

* AL-12056: Read kafka raw json data from test_kfk

* AL-12056: Refactoring

* AL-12056: Wait for the kafka raw test for a long time

* AL-12056: Fix

* AL-12056: generate metrics

* AL-12056: Fix tests

* AL-12056: Fix tests

* AL-12056: Fix tests

* AL-12056: Fix tests

* AL-12056: /metrics return json

* AL-12056: refactoring

* AL-12056: refactoring
  • Loading branch information
AlexMuliar authored Oct 4, 2022
1 parent 700f1ad commit 0301b1f
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 25 deletions.
3 changes: 2 additions & 1 deletion agent/src/agent/api/routes/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
@monitoring_bp.route('/metrics', methods=['GET'])
def metrics():
monitoring.pull_latest()
return generate_latest(registry=monitoring.metrics.registry)
return jsonify(monitoring.get_monitoring_metrics())
# return monitoring.generate_latest()


@monitoring_bp.route('/monitoring', methods=['GET'])
Expand Down
8 changes: 4 additions & 4 deletions agent/src/agent/monitoring/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

from . import metrics, streamsets, sender
from agent.modules import constants, logger
from agent.monitoring.dataclasses import Counter
from datetime import datetime
from agent import pipeline
from agent.pipeline import Pipeline
from typing import Dict
from typing import Dict, List

logger_ = logger.get_logger(__name__)

Expand All @@ -16,12 +17,12 @@ def pull_latest():
streamsets.pull_metrics()


def get_monitoring_metrics() -> list[dict]:
def get_monitoring_metrics() -> List[Dict]:
pull_latest()
data = []
pipelines = pipeline.repository.get_all()
pipelines: Dict[str, Pipeline] = dict(zip({p.name for p in pipelines}, pipelines))
for metric in metrics.registry.collect():
for metric in metrics.collect_metrics():
target_type = anodot.TargetType.COUNTER if metric.type == 'counter' else anodot.TargetType.GAUGE
for sample in metric.samples:
if sample.name.endswith('_created'):
Expand All @@ -36,7 +37,6 @@ def get_monitoring_metrics() -> list[dict]:
data.append(
anodot.Metric20(sample.name, sample.value, target_type, datetime.utcnow(), dimensions=dims).to_dict()
)

return data


Expand Down
97 changes: 97 additions & 0 deletions agent/src/agent/monitoring/dataclasses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
class Sample:
    """A single observation of a metric series.

    Lightweight stand-in with the same attribute names as
    prometheus_client's Sample (``name``, ``value``, ``labels``,
    ``timestamp``), used by the agent's own metric classes.
    """

    def __init__(self, name, value, labels, timestamp=None):
        # name: sample name as exposed to collectors
        # value: numeric value of the observation
        # labels: mapping of label name -> label value
        # timestamp: optional; stays None when not supplied
        self.name, self.value, self.labels, self.timestamp = name, value, labels, timestamp


class MetricValue:
    """Mutable numeric holder backing one labeled metric series.

    Starts at 0.0 and supports increment, decrement and overwrite.
    Exposes the current value both via ``get()`` (prometheus_client-style
    accessor) and the read-only ``value`` property.
    """

    def __init__(self):
        # Every series starts at zero, matching prometheus_client behavior.
        self._value = 0.0

    def inc(self, value=1):
        """Add *value* (default 1) to the current value."""
        self._value = self._value + value

    def dec(self, value=1):
        """Subtract *value* (default 1) from the current value."""
        self._value = self._value - value

    def set(self, value):
        """Replace the current value with *value*."""
        self._value = value

    def get(self):
        """Return the current value (accessor kept for prometheus_client parity)."""
        return self._value

    @property
    def value(self):
        """Read-only view of the current value."""
        return self._value


class Metric:
    """Minimal in-process metric: a named family of series keyed by label values.

    Mirrors the subset of the prometheus_client API used by the agent:
    ``labels(...)`` returns the value holder for one label combination,
    and ``samples`` exposes a snapshot of all known series.
    """

    def __init__(self, name, documentation, labelnames=()):
        self.name = name
        self.documentation = documentation
        self._labelnames = labelnames
        # One MetricValue per tuple of label values seen so far.
        self._metrics = {}

    def labels(self, *labelvalues, **labelkwargs):
        """Return the MetricValue for the given labels, creating it on first use.

        Labels may be supplied positionally or by keyword, but not both.

        Raises:
            ValueError: on mixed positional/keyword labels, unknown label
                names, or a label count that does not match ``labelnames``.
        """
        if labelvalues and labelkwargs:
            raise ValueError("Can't pass both *args and **kwargs")

        if labelkwargs:
            if sorted(labelkwargs) != sorted(self._labelnames):
                raise ValueError('Incorrect label names')
            key = tuple(labelkwargs[label] for label in self._labelnames)
        else:
            if len(labelvalues) != len(self._labelnames):
                raise ValueError('Incorrect label count')
            key = tuple(labelvalues)

        if key not in self._metrics:
            self._metrics[key] = MetricValue()
        return self._metrics[key]

    @property
    def samples(self):
        """Snapshot of every known series as Sample objects."""
        return [
            Sample(
                name=self.name,
                value=series.value,
                labels=dict(zip(self._labelnames, key)),
            )
            for key, series in self._metrics.items()
        ]


class Gauge(Metric):
    """A metric whose value may go up or down.

    Extra keyword arguments (e.g. prometheus_client's ``multiprocess_mode``)
    are accepted only for signature compatibility and are ignored.
    """

    def __init__(self, name, documentation, labelnames=(), **kwargs):
        super().__init__(name=name, documentation=documentation, labelnames=labelnames)
        # Type tag consumed by the exporter to pick the anodot target type.
        self.type = 'gauge'


class Counter(Metric):
    """A monotonically increasing metric.

    Sample names carry the conventional ``_total`` suffix, matching the
    prometheus exposition format for counters. Extra keyword arguments are
    accepted only for signature compatibility and are ignored.
    """

    def __init__(self, name, documentation, labelnames=(), **kwargs):
        super().__init__(name=name, documentation=documentation, labelnames=labelnames)
        # Type tag consumed by the exporter to pick the anodot target type.
        self.type = 'counter'

    @property
    def samples(self):
        """Snapshot of every series, each named ``<name>_total``."""
        suffixed_name = self.name + '_total'
        return [
            Sample(
                name=suffixed_name,
                value=series.value,
                labels=dict(zip(self._labelnames, key)),
            )
            for key, series in self._metrics.items()
        ]
33 changes: 24 additions & 9 deletions agent/src/agent/monitoring/metrics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from prometheus_client import Info, Gauge, Counter, CollectorRegistry
from prometheus_client import (Info,
Counter as PrometheusCounter,
Gauge as PrometheusGauge,
CollectorRegistry,
)
from agent import version
from agent.monitoring.dataclasses import Counter, Gauge

registry = CollectorRegistry()

Expand Down Expand Up @@ -27,7 +32,7 @@
PIPELINE_ERROR_RECORDS = Counter(
'pipeline_error_records', 'Pipeline error records', ['streamsets_url', 'pipeline_id', 'pipeline_type']
)
PIPELINE_AVG_LAG = Gauge(
PIPELINE_AVG_LAG = PrometheusGauge(
'pipeline_avg_lag_seconds',
'Pipeline average lag metrics', ['streamsets_url', 'pipeline_id', 'pipeline_type'],
multiprocess_mode='max'
Expand Down Expand Up @@ -69,27 +74,37 @@

KAFKA_CONSUMER_LAG = Gauge('kafka_consumer_lag', 'Kafka consumer lag', ['topic'], multiprocess_mode='max')

SOURCE_HTTP_ERRORS = Counter('source_http_errors', 'Source HTTP errors', ['pipeline_id', 'pipeline_type', 'code'])
SOURCE_MYSQL_ERRORS = Counter('source_mysql_errors', 'Source MySQL errors', ['pipeline_id'])
SCHEDULED_SCRIPTS_ERRORS = Counter('scheduled_scripts_errors', 'Scheduled scripts errors', ['script_name'])
SCHEDULED_SCRIPT_EXECUTION_TIME = Gauge(
SOURCE_HTTP_ERRORS = PrometheusCounter('source_http_errors', 'Source HTTP errors', ['pipeline_id', 'pipeline_type', 'code'])
SOURCE_MYSQL_ERRORS = PrometheusCounter('source_mysql_errors', 'Source MySQL errors', ['pipeline_id'])
SCHEDULED_SCRIPTS_ERRORS = PrometheusCounter('scheduled_scripts_errors', 'Scheduled scripts errors', ['script_name'])
SCHEDULED_SCRIPT_EXECUTION_TIME = PrometheusGauge(
'scheduled_script_execution_time', 'Time to execute a scheduled script', ['script_name'], multiprocess_mode='max'
)

DIRECTORY_FILE_PROCESSED = Counter(
DIRECTORY_FILE_PROCESSED = PrometheusCounter(
'directory_file_processed', 'Finished processing one file', ['streamsets_url', 'pipeline_id']
)

WATERMARK_DELTA = Gauge(
WATERMARK_DELTA = PrometheusGauge(
'watermark_delta',
'Difference between time.now() and watermark timestamp', ['streamsets_url', 'pipeline_id', 'pipeline_type'],
multiprocess_mode='max'
)

WATERMARK_SENT = Counter(
WATERMARK_SENT = PrometheusCounter(
'watermark_sent', 'Number of sent watermarks', ['streamsets_url', 'pipeline_id', 'pipeline_type']
)


def collect_metrics():
    """Return all metrics to expose: registry collectors plus the
    explicitly listed module-level metrics.

    NOTE(review): the listed constants are presumed to live outside the
    prometheus registry (so ``registry.collect()`` does not already include
    them) — confirm against their definitions.
    """
    manual_metrics = [
        STREAMSETS_CPU, STREAMSETS_HEAP_MEMORY, STREAMSETS_NON_HEAP_MEMORY,
        PIPELINE_INCOMING_RECORDS, PIPELINE_OUTGOING_RECORDS, PIPELINE_ERROR_RECORDS,
        PIPELINE_DESTINATION_LATENCY, PIPELINE_SOURCE_LATENCY,
        PIPELINE_STAGE_BATCH_PROCESSING_TIME_AVG, PIPELINE_STAGE_BATCH_PROCESSING_TIME_50th,
        PIPELINE_STAGE_BATCH_PROCESSING_TIME_999th, PIPELINE_STATUS, KAFKA_CONSUMER_LAG,
    ]
    return list(registry.collect()) + manual_metrics
# # Not for every endpoint
# AGENT_API_REQUESTS_LATENCY = Gauge('agent_api_requests_latency_seconds', 'Agent API requests time in seconds',
# ['endpoint'], registry=registry)
Expand Down
24 changes: 19 additions & 5 deletions agent/src/agent/monitoring/streamsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import sdc_client
import re

from typing import List
from typing import List, Dict, Union
from agent import streamsets, pipeline, source
from agent.monitoring import metrics
from agent.monitoring.dataclasses import Counter
from agent.modules import constants
from agent.modules import logger

Expand All @@ -21,9 +22,12 @@ def _pull_system_metrics(streamsets_: streamsets.StreamSets, jmx: dict):
metrics.STREAMSETS_CPU.labels(streamsets_.url).set(bean['ProcessCpuLoad'])


def _increase_counter(total: int, metric: prometheus_client.Counter):
def _increase_counter(total: int, metric: Union[prometheus_client.Counter, Counter]):
# TODO: do not access private property
val = total - metric._value.get()
if isinstance(metric, prometheus_client.Counter):
val = total - metric._value.get()
else:
val = total - metric.value
if val > 0:
metric.inc(val)

Expand Down Expand Up @@ -79,20 +83,25 @@ def pull_metrics():
)


def _process_pipeline_metrics(pipelines: List[pipeline.Pipeline], asynchronous: bool = False) -> None:
def _get_pipeline_metrics(pipelines: List[pipeline.Pipeline], asynchronous: bool = False) -> List[Dict]:
if not asynchronous:
jmxes = [sdc_client.get_jmx(pipeline_.streamsets, f'metrics:name=sdc.pipeline.{pipeline_.name}.*')
for pipeline_ in pipelines]
else:
jmxes = sdc_client.get_jmxes_async([
(pipeline_.streamsets, f'metrics:name=sdc.pipeline.{pipeline_.name}.*',)
for pipeline_ in pipelines], return_exceptions=True)
return jmxes


def _process_pipeline_metrics(pipelines: List[pipeline.Pipeline], asynchronous: bool = False) -> None:
jmxes = _get_pipeline_metrics(pipelines, asynchronous)
for pipeline_, jmx in zip(pipelines, jmxes):
_pull_pipeline_metrics(pipeline_, jmx) if not isinstance(jmx, Exception) \
else logger_.error(f"Error: {jmx} for pipeline {pipeline_.name}")


def _process_streamsets_metrics(streamsets_: List[streamsets.StreamSets], asynchronous: bool = False) -> None:
def _get_streamsets_metrics(streamsets_: List[streamsets.StreamSets], asynchronous: bool = False) -> List[Dict]:
sys_queries = [(streamset_, 'java.lang:type=*',) for streamset_ in streamsets_]
kafka_queries = [
(streamset_, 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=*,partition=*',)
Expand All @@ -101,6 +110,11 @@ def _process_streamsets_metrics(streamsets_: List[streamsets.StreamSets], asynch
jmxes = [sdc_client.get_jmx(streamset_, query) for streamset_, query in sys_queries + kafka_queries]
else:
jmxes = sdc_client.get_jmxes_async(sys_queries + kafka_queries, return_exceptions=True)
return jmxes


def _process_streamsets_metrics(streamsets_: List[streamsets.StreamSets], asynchronous: bool = False) -> None:
jmxes = _get_streamsets_metrics(streamsets_, asynchronous)
for index, streamset_ in enumerate(streamsets_):
_pull_system_metrics(streamset_, jmxes[index]) if not isinstance(jmxes[index], Exception) \
else logger_.error(f"Error: {jmxes[index]} for streamset {streamset_.url}")
Expand Down
2 changes: 1 addition & 1 deletion agent/tests/input_files/raw/sources/kafka.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"name": "test_kafka_raw_json",
"config": {
"conf.brokerURI": "kafka:29092",
"conf.topicList": ["test_raw_json"],
"conf.topicList": ["test_kfk"],
"conf.dataFormat": "JSON"
}
}
Expand Down
10 changes: 6 additions & 4 deletions agent/tests/test_monitoring.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import pytest
import requests

from .test_input.test_zpipeline_base import TestInputBase
from .test_pipelines.test_zpipeline_base import TestPipelineBase

Expand Down Expand Up @@ -58,9 +60,9 @@ def test_output(self, name=None, pipeline_type=None, output=None):
def test_output_schema(self, name=None, pipeline_type=None, output=None):
pytest.skip()

def test_monitoring_metrics(self, api_client, name, metric_type):
response = api_client.get('/metrics')
def test_monitoring_metrics(self, name, metric_type):
response = requests.get('http://localhost/metrics')
assert response.status_code == 200
metrics = response.data.decode('utf-8').split('\n')
metric_found = any(i.startswith(metric_type) and i.find(name) != -1 for i in metrics)
metrics = response.json()
metric_found = any(i['properties'].get('pipeline_id') == name and i['properties']['what'] == metric_type for i in metrics)
assert metric_found
1 change: 1 addition & 0 deletions agent/tests/test_raw/test_pipelines/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

class TestRawKafka(TestRawPipelineBase):
__test__ = True
MAX_TIMES_TO_WAIT = 600
params = {
'test_start': [
{'name': 'kafka_raw_csv'},
Expand Down
2 changes: 1 addition & 1 deletion scripts/upload-test-data-to-kafka.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --topic test_csv < /home/test.csv"
docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --topic test_raw_csv < /home/test.csv"
docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --topic test_raw_json < /home/test_json_items_for_kafka"
#docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --topic test_raw_json < /home/test_json_items_for_kafka"
docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --topic test_kfk < /home/test_json_items_for_kafka"
docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --compression-codec zstd --topic test_running_counters < /home/test_running_counter.txt"
docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --compression-codec zstd --topic test_json_arrays < /home/test_with_arrays.json"

0 comments on commit 0301b1f

Please sign in to comment.