diff --git a/agent/src/agent/api/routes/monitoring.py b/agent/src/agent/api/routes/monitoring.py index f656b6307..e380dbb6e 100644 --- a/agent/src/agent/api/routes/monitoring.py +++ b/agent/src/agent/api/routes/monitoring.py @@ -9,7 +9,8 @@ @monitoring_bp.route('/metrics', methods=['GET']) def metrics(): monitoring.pull_latest() - return generate_latest(registry=monitoring.metrics.registry) + return jsonify(monitoring.get_monitoring_metrics()) + # return monitoring.generate_latest() @monitoring_bp.route('/monitoring', methods=['GET']) diff --git a/agent/src/agent/monitoring/__init__.py b/agent/src/agent/monitoring/__init__.py index 456685dba..cc01a0355 100644 --- a/agent/src/agent/monitoring/__init__.py +++ b/agent/src/agent/monitoring/__init__.py @@ -4,10 +4,11 @@ from . import metrics, streamsets, sender from agent.modules import constants, logger +from agent.monitoring.dataclasses import Counter from datetime import datetime from agent import pipeline from agent.pipeline import Pipeline -from typing import Dict +from typing import Dict, List logger_ = logger.get_logger(__name__) @@ -16,12 +17,12 @@ def pull_latest(): streamsets.pull_metrics() -def get_monitoring_metrics() -> list[dict]: +def get_monitoring_metrics() -> List[Dict]: pull_latest() data = [] pipelines = pipeline.repository.get_all() pipelines: Dict[str, Pipeline] = dict(zip({p.name for p in pipelines}, pipelines)) - for metric in metrics.registry.collect(): + for metric in metrics.collect_metrics(): target_type = anodot.TargetType.COUNTER if metric.type == 'counter' else anodot.TargetType.GAUGE for sample in metric.samples: if sample.name.endswith('_created'): @@ -36,7 +37,6 @@ def get_monitoring_metrics() -> list[dict]: data.append( anodot.Metric20(sample.name, sample.value, target_type, datetime.utcnow(), dimensions=dims).to_dict() ) - return data diff --git a/agent/src/agent/monitoring/dataclasses.py b/agent/src/agent/monitoring/dataclasses.py new file mode 100644 index 000000000..b28564b4c --- /dev/null +++ b/agent/src/agent/monitoring/dataclasses.py @@ -0,0 +1,97 @@ +class Sample: + def __init__(self, name, value, labels, timestamp=None): + self.name = name + self.value = value + self.labels = labels + self.timestamp = timestamp + + +class MetricValue: + def __init__(self): + self._value = 0.0 + + def inc(self, value=1): + self._value += value + + def dec(self, value=1): + self._value -= value + + def set(self, value): + self._value = value + + def get(self): + return self._value + + @property + def value(self): + return self._value + + +class Metric: + def __init__(self, + name, + documentation, + labelnames=(),): + self.name = name + self.documentation = documentation + self._labelnames = labelnames + self._metrics = {} + + def labels(self, *labelvalues, **labelkwargs): + if labelvalues and labelkwargs: + raise ValueError("Can't pass both *args and **kwargs") + if labelkwargs: + if sorted(labelkwargs) != sorted(self._labelnames): + raise ValueError('Incorrect label names') + labelvalues = tuple(labelkwargs[l] for l in self._labelnames) + else: + if len(labelvalues) != len(self._labelnames): + raise ValueError('Incorrect label count') + labelvalues = tuple(l for l in labelvalues) + + if labelvalues not in self._metrics: + self._metrics[labelvalues] = MetricValue() + + return self._metrics[labelvalues] + + @property + def samples(self): + return [Sample( + name=self.name, + value=self._metrics[labelvalues].value, + labels=dict(zip(self._labelnames, labelvalues)), + ) for labelvalues in self._metrics] + + +class 
Gauge(Metric): + def __init__(self, + name, + documentation, + labelnames=(), **kwargs): + super().__init__( + name=name, + documentation=documentation, + labelnames=labelnames, + ) + self.type = 'gauge' + + +class Counter(Metric): + def __init__(self, + name, + documentation, + labelnames=(), **kwargs): + super().__init__( + name=name, + documentation=documentation, + labelnames=labelnames, + ) + self.type = 'counter' + + @property + def samples(self): + return [Sample( + name=self.name + '_total', + value=self._metrics[labelvalues].value, + labels=dict(zip(self._labelnames, labelvalues)), + ) for labelvalues in self._metrics] diff --git a/agent/src/agent/monitoring/metrics.py b/agent/src/agent/monitoring/metrics.py index cb771fede..e2243afb1 100644 --- a/agent/src/agent/monitoring/metrics.py +++ b/agent/src/agent/monitoring/metrics.py @@ -1,5 +1,10 @@ -from prometheus_client import Info, Gauge, Counter, CollectorRegistry +from prometheus_client import (Info, + Counter as PrometheusCounter, + Gauge as PrometheusGauge, + CollectorRegistry, +) from agent import version +from agent.monitoring.dataclasses import Counter, Gauge registry = CollectorRegistry() @@ -27,7 +32,7 @@ PIPELINE_ERROR_RECORDS = Counter( 'pipeline_error_records', 'Pipeline error records', ['streamsets_url', 'pipeline_id', 'pipeline_type'] ) -PIPELINE_AVG_LAG = Gauge( +PIPELINE_AVG_LAG = PrometheusGauge( 'pipeline_avg_lag_seconds', 'Pipeline average lag metrics', ['streamsets_url', 'pipeline_id', 'pipeline_type'], multiprocess_mode='max' @@ -69,27 +74,37 @@ KAFKA_CONSUMER_LAG = Gauge('kafka_consumer_lag', 'Kafka consumer lag', ['topic'], multiprocess_mode='max') -SOURCE_HTTP_ERRORS = Counter('source_http_errors', 'Source HTTP errors', ['pipeline_id', 'pipeline_type', 'code']) -SOURCE_MYSQL_ERRORS = Counter('source_mysql_errors', 'Source MySQL errors', ['pipeline_id']) -SCHEDULED_SCRIPTS_ERRORS = Counter('scheduled_scripts_errors', 'Scheduled scripts errors', ['script_name']) -SCHEDULED_SCRIPT_EXECUTION_TIME = Gauge( +SOURCE_HTTP_ERRORS = PrometheusCounter('source_http_errors', 'Source HTTP errors', ['pipeline_id', 'pipeline_type', 'code']) +SOURCE_MYSQL_ERRORS = PrometheusCounter('source_mysql_errors', 'Source MySQL errors', ['pipeline_id']) +SCHEDULED_SCRIPTS_ERRORS = PrometheusCounter('scheduled_scripts_errors', 'Scheduled scripts errors', ['script_name']) +SCHEDULED_SCRIPT_EXECUTION_TIME = PrometheusGauge( 'scheduled_script_execution_time', 'Time to execute a scheduled script', ['script_name'], multiprocess_mode='max' ) -DIRECTORY_FILE_PROCESSED = Counter( +DIRECTORY_FILE_PROCESSED = PrometheusCounter( 'directory_file_processed', 'Finished processing one file', ['streamsets_url', 'pipeline_id'] ) -WATERMARK_DELTA = Gauge( +WATERMARK_DELTA = PrometheusGauge( 'watermark_delta', 'Difference between time.now() and watermark timestamp', ['streamsets_url', 'pipeline_id', 'pipeline_type'], multiprocess_mode='max' ) -WATERMARK_SENT = Counter( +WATERMARK_SENT = PrometheusCounter( 'watermark_sent', 'Number of sent watermarks', ['streamsets_url', 'pipeline_id', 'pipeline_type'] ) + +def collect_metrics(): + return [ + *registry.collect(), + STREAMSETS_CPU, STREAMSETS_HEAP_MEMORY, STREAMSETS_NON_HEAP_MEMORY, + PIPELINE_INCOMING_RECORDS, PIPELINE_OUTGOING_RECORDS, PIPELINE_ERROR_RECORDS, + PIPELINE_DESTINATION_LATENCY, PIPELINE_SOURCE_LATENCY, + PIPELINE_STAGE_BATCH_PROCESSING_TIME_AVG, PIPELINE_STAGE_BATCH_PROCESSING_TIME_50th, + PIPELINE_STAGE_BATCH_PROCESSING_TIME_999th, PIPELINE_STATUS, KAFKA_CONSUMER_LAG, + ] # # Not for every 
endpoint # AGENT_API_REQUESTS_LATENCY = Gauge('agent_api_requests_latency_seconds', 'Agent API requests time in seconds', # ['endpoint'], registry=registry) diff --git a/agent/src/agent/monitoring/streamsets.py b/agent/src/agent/monitoring/streamsets.py index f803232d2..b3f07c2ee 100644 --- a/agent/src/agent/monitoring/streamsets.py +++ b/agent/src/agent/monitoring/streamsets.py @@ -2,9 +2,10 @@ import sdc_client import re -from typing import List +from typing import List, Dict, Union from agent import streamsets, pipeline, source from agent.monitoring import metrics +from agent.monitoring.dataclasses import Counter from agent.modules import constants from agent.modules import logger @@ -21,9 +22,12 @@ def _pull_system_metrics(streamsets_: streamsets.StreamSets, jmx: dict): metrics.STREAMSETS_CPU.labels(streamsets_.url).set(bean['ProcessCpuLoad']) -def _increase_counter(total: int, metric: prometheus_client.Counter): +def _increase_counter(total: int, metric: Union[prometheus_client.Counter, Counter]): # TODO: do not access private property - val = total - metric._value.get() + if isinstance(metric, prometheus_client.Counter): + val = total - metric._value.get() + else: + val = total - metric.value if val > 0: metric.inc(val) @@ -79,7 +83,7 @@ def pull_metrics(): ) -def _process_pipeline_metrics(pipelines: List[pipeline.Pipeline], asynchronous: bool = False) -> None: +def _get_pipeline_metrics(pipelines: List[pipeline.Pipeline], asynchronous: bool = False) -> List[Dict]: if not asynchronous: jmxes = [sdc_client.get_jmx(pipeline_.streamsets, f'metrics:name=sdc.pipeline.{pipeline_.name}.*') for pipeline_ in pipelines] @@ -87,12 +91,17 @@ def _process_pipeline_metrics(pipelines: List[pipeline.Pipeline], asynchronous: jmxes = sdc_client.get_jmxes_async([ (pipeline_.streamsets, f'metrics:name=sdc.pipeline.{pipeline_.name}.*',) for pipeline_ in pipelines], return_exceptions=True) + return jmxes + + +def _process_pipeline_metrics(pipelines: List[pipeline.Pipeline], asynchronous: bool = False) -> None: + jmxes = _get_pipeline_metrics(pipelines, asynchronous) for pipeline_, jmx in zip(pipelines, jmxes): _pull_pipeline_metrics(pipeline_, jmx) if not isinstance(jmx, Exception) \ else logger_.error(f"Error: {jmx} for pipeline {pipeline_.name}") -def _process_streamsets_metrics(streamsets_: List[streamsets.StreamSets], asynchronous: bool = False) -> None: +def _get_streamsets_metrics(streamsets_: List[streamsets.StreamSets], asynchronous: bool = False) -> List[Dict]: sys_queries = [(streamset_, 'java.lang:type=*',) for streamset_ in streamsets_] kafka_queries = [ (streamset_, 'kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=*,partition=*',) @@ -101,6 +110,11 @@ def _process_streamsets_metrics(streamsets_: List[streamsets.StreamSets], asynch jmxes = [sdc_client.get_jmx(streamset_, query) for streamset_, query in sys_queries + kafka_queries] else: jmxes = sdc_client.get_jmxes_async(sys_queries + kafka_queries, return_exceptions=True) + return jmxes + + +def _process_streamsets_metrics(streamsets_: List[streamsets.StreamSets], asynchronous: bool = False) -> None: + jmxes = _get_streamsets_metrics(streamsets_, asynchronous) for index, streamset_ in enumerate(streamsets_): _pull_system_metrics(streamset_, jmxes[index]) if not isinstance(jmxes[index], Exception) \ else logger_.error(f"Error: {jmxes[index]} for streamset {streamset_.url}") diff --git a/agent/tests/input_files/raw/sources/kafka.json b/agent/tests/input_files/raw/sources/kafka.json index dea3fdb3a..ec5ed95db 100644 --- 
a/agent/tests/input_files/raw/sources/kafka.json +++ b/agent/tests/input_files/raw/sources/kafka.json @@ -13,7 +13,7 @@ "name": "test_kafka_raw_json", "config": { "conf.brokerURI": "kafka:29092", - "conf.topicList": ["test_raw_json"], + "conf.topicList": ["test_kfk"], "conf.dataFormat": "JSON" } } diff --git a/agent/tests/test_monitoring.py b/agent/tests/test_monitoring.py index 02cac4244..ef8506fcb 100644 --- a/agent/tests/test_monitoring.py +++ b/agent/tests/test_monitoring.py @@ -1,4 +1,6 @@ import pytest +import requests + from .test_input.test_zpipeline_base import TestInputBase from .test_pipelines.test_zpipeline_base import TestPipelineBase @@ -58,9 +60,9 @@ def test_output(self, name=None, pipeline_type=None, output=None): def test_output_schema(self, name=None, pipeline_type=None, output=None): pytest.skip() - def test_monitoring_metrics(self, api_client, name, metric_type): - response = api_client.get('/metrics') + def test_monitoring_metrics(self, name, metric_type): + response = requests.get('http://localhost/metrics') assert response.status_code == 200 - metrics = response.data.decode('utf-8').split('\n') - metric_found = any(i.startswith(metric_type) and i.find(name) != -1 for i in metrics) + metrics = response.json() + metric_found = any(i['properties'].get('pipeline_id') == name and i['properties']['what'] == metric_type for i in metrics) assert metric_found diff --git a/agent/tests/test_raw/test_pipelines/test_kafka.py b/agent/tests/test_raw/test_pipelines/test_kafka.py index 490f2c6b5..48a2efe7e 100644 --- a/agent/tests/test_raw/test_pipelines/test_kafka.py +++ b/agent/tests/test_raw/test_pipelines/test_kafka.py @@ -3,6 +3,7 @@ class TestRawKafka(TestRawPipelineBase): __test__ = True + MAX_TIMES_TO_WAIT = 600 params = { 'test_start': [ {'name': 'kafka_raw_csv'}, diff --git a/scripts/upload-test-data-to-kafka.sh b/scripts/upload-test-data-to-kafka.sh index dc9754770..7f153e31d 100755 --- a/scripts/upload-test-data-to-kafka.sh +++ b/scripts/upload-test-data-to-kafka.sh @@ -2,7 +2,7 @@ docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --topic test_csv < /home/test.csv" docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --topic test_raw_csv < /home/test.csv" -docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --topic test_raw_json < /home/test_json_items_for_kafka" +#docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --topic test_raw_json < /home/test_json_items_for_kafka" docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --topic test_kfk < /home/test_json_items_for_kafka" docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --compression-codec zstd --topic test_running_counters < /home/test_running_counter.txt" docker exec agent-kafka bash -c "kafka-console-producer.sh --broker-list localhost:9092 --compression-codec zstd --topic test_json_arrays < /home/test_with_arrays.json"
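
For reference, a minimal sketch (not part of the patch) of how the Counter and Gauge classes introduced in agent/src/agent/monitoring/dataclasses.py behave; the label values and metric names below are illustrative only:

# Sketch only: exercises the new dataclasses, mirroring how
# metrics.collect_metrics() / get_monitoring_metrics() consume them.
from agent.monitoring.dataclasses import Counter, Gauge

# labelnames mirror the prometheus_client API; extra kwargs such as
# multiprocess_mode are accepted and ignored by these classes.
errors = Counter('pipeline_error_records', 'Pipeline error records',
                 ['streamsets_url', 'pipeline_id', 'pipeline_type'])
lag = Gauge('kafka_consumer_lag', 'Kafka consumer lag', ['topic'], multiprocess_mode='max')

# labels() accepts either positional or keyword label values, not both.
errors.labels('http://sdc:18630', 'my_pipeline', 'http').inc(3)
lag.labels(topic='test_kfk').set(42)

# Counter samples carry a '_total' suffix, Gauge samples keep the bare name.
for sample in errors.samples + lag.samples:
    print(sample.name, sample.labels, sample.value)
# pipeline_error_records_total {'streamsets_url': 'http://sdc:18630', ...} 3.0
# kafka_consumer_lag {'topic': 'test_kfk'} 42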