diff --git a/CHANGELOG.md b/CHANGELOG.md index 62efe35e..06fd1386 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## [Unreleased] ### Added - Added support for Python 3.12 ([#717](https://github.com/opensearch-project/opensearch-py/pull/717)) +- Added service time metrics ([#716](https://github.com/opensearch-project/opensearch-py/pull/716)) ### Changed ### Deprecated ### Removed diff --git a/dev-requirements.txt b/dev-requirements.txt index 7c260032..c3dbc31e 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -8,6 +8,7 @@ sphinx_rtd_theme jinja2 pytz deepmerge +Events setuptools # No wheels for Python 3.10 yet! diff --git a/docs/source/api-ref/metrics.md b/docs/source/api-ref/metrics.md new file mode 100644 index 00000000..dd40d986 --- /dev/null +++ b/docs/source/api-ref/metrics.md @@ -0,0 +1,14 @@ +# metrics + +```{eval-rst} +.. autoclass:: opensearchpy.Metrics +``` + +```{eval-rst} +.. autoclass:: opensearchpy.MetricsEvents +``` + +```{eval-rst} +.. autoclass:: opensearchpy.MetricsNone +``` + diff --git a/opensearchpy/__init__.py b/opensearchpy/__init__.py index b852272b..4cf251db 100644 --- a/opensearchpy/__init__.py +++ b/opensearchpy/__init__.py @@ -133,6 +133,7 @@ from .helpers.update_by_query import UpdateByQuery from .helpers.utils import AttrDict, AttrList, DslBase from .helpers.wrappers import Range +from .metrics import Metrics, MetricsEvents, MetricsNone from .serializer import JSONSerializer from .transport import Transport @@ -240,6 +241,9 @@ "token_filter", "tokenizer", "__versionstr__", + "Metrics", + "MetricsEvents", + "MetricsNone", ] try: diff --git a/opensearchpy/connection/http_requests.py b/opensearchpy/connection/http_requests.py index 9bf83004..2c173725 100644 --- a/opensearchpy/connection/http_requests.py +++ b/opensearchpy/connection/http_requests.py @@ -36,6 +36,8 @@ except ImportError: REQUESTS_AVAILABLE = False +from opensearchpy.metrics import Metrics, MetricsNone + from ..compat import reraise_exceptions, string_types, urlencode from ..exceptions import ( ConnectionError, @@ -69,6 +71,9 @@ class RequestsHttpConnection(Connection): For tracing all requests made by this transport. :arg pool_maxsize: Maximum connection pool size used by pool-manager For custom connection-pooling on current session + :arg metrics: metrics is an instance of a subclass of the + :class:`~opensearchpy.Metrics` class, used for collecting + and reporting metrics related to the client's operations; """ def __init__( @@ -86,8 +91,10 @@ def __init__( http_compress: Any = None, opaque_id: Any = None, pool_maxsize: Any = None, + metrics: Metrics = MetricsNone(), **kwargs: Any ) -> None: + self.metrics = metrics if not REQUESTS_AVAILABLE: raise ImproperlyConfigured( "Please install requests to use RequestsHttpConnection." @@ -188,6 +195,7 @@ def perform_request( # type: ignore } send_kwargs.update(settings) try: + self.metrics.request_start() response = self.session.send(prepared_request, **send_kwargs) duration = time.time() - start raw_data = response.content.decode("utf-8", "surrogatepass") @@ -207,6 +215,8 @@ def perform_request( # type: ignore if isinstance(e, requests.Timeout): raise ConnectionTimeout("TIMEOUT", str(e), e) raise ConnectionError("N/A", str(e), e) + finally: + self.metrics.request_end() # raise warnings if any from the 'Warnings' header. warnings_headers = ( diff --git a/opensearchpy/connection/http_urllib3.py b/opensearchpy/connection/http_urllib3.py index ab9a1a78..e3b60cf3 100644 --- a/opensearchpy/connection/http_urllib3.py +++ b/opensearchpy/connection/http_urllib3.py @@ -34,6 +34,8 @@ from urllib3.exceptions import SSLError as UrllibSSLError from urllib3.util.retry import Retry +from opensearchpy.metrics import Metrics, MetricsNone + from ..compat import reraise_exceptions, urlencode from ..exceptions import ( ConnectionError, @@ -94,6 +96,9 @@ class Urllib3HttpConnection(Connection): :arg http_compress: Use gzip compression :arg opaque_id: Send this value in the 'X-Opaque-Id' HTTP header For tracing all requests made by this transport. + :arg metrics: metrics is an instance of a subclass of the + :class:`~opensearchpy.Metrics` class, used for collecting + and reporting metrics related to the client's operations; """ def __init__( @@ -115,8 +120,10 @@ def __init__( ssl_context: Any = None, http_compress: Any = None, opaque_id: Any = None, + metrics: Metrics = MetricsNone(), **kwargs: Any ) -> None: + self.metrics = metrics # Initialize headers before calling super().__init__(). self.headers = urllib3.make_headers(keep_alive=True) @@ -268,6 +275,8 @@ def perform_request( if isinstance(self.http_auth, Callable): # type: ignore request_headers.update(self.http_auth(method, full_url, body)) + self.metrics.request_start() + response = self.pool.urlopen( method, url, body, retries=Retry(False), headers=request_headers, **kw ) @@ -284,6 +293,8 @@ def perform_request( if isinstance(e, ReadTimeoutError): raise ConnectionTimeout("TIMEOUT", str(e), e) raise ConnectionError("N/A", str(e), e) + finally: + self.metrics.request_end() # raise warnings if any from the 'Warnings' header. warning_headers = response.headers.get_all("warning", ()) diff --git a/opensearchpy/metrics/__init__.py b/opensearchpy/metrics/__init__.py new file mode 100644 index 00000000..2d0e0d11 --- /dev/null +++ b/opensearchpy/metrics/__init__.py @@ -0,0 +1,18 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. + +from .metrics import Metrics +from .metrics_events import MetricsEvents +from .metrics_none import MetricsNone + +__all__ = [ + "Metrics", + "MetricsEvents", + "MetricsNone", +] diff --git a/opensearchpy/metrics/metrics.py b/opensearchpy/metrics/metrics.py new file mode 100644 index 00000000..8764976c --- /dev/null +++ b/opensearchpy/metrics/metrics.py @@ -0,0 +1,42 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. + +from abc import ABC, abstractmethod +from typing import Optional + + +class Metrics(ABC): + """ + The Metrics class defines methods and properties for managing + request metrics, including start time, end time, and service time, + serving as a blueprint for concrete implementations. + """ + + @abstractmethod + def request_start(self) -> None: + pass + + @abstractmethod + def request_end(self) -> None: + pass + + @property + @abstractmethod + def start_time(self) -> Optional[float]: + pass + + @property + @abstractmethod + def end_time(self) -> Optional[float]: + pass + + @property + @abstractmethod + def service_time(self) -> Optional[float]: + pass diff --git a/opensearchpy/metrics/metrics_events.py b/opensearchpy/metrics/metrics_events.py new file mode 100644 index 00000000..994d5b10 --- /dev/null +++ b/opensearchpy/metrics/metrics_events.py @@ -0,0 +1,61 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. + +import time +from typing import Optional + +from events import Events + +from opensearchpy.metrics.metrics import Metrics + + +class MetricsEvents(Metrics): + """ + The MetricsEvents class implements the Metrics abstract base class + and tracks metrics such as start time, end time, and service time + during request processing. + """ + + @property + def start_time(self) -> Optional[float]: + return self._start_time + + @property + def end_time(self) -> Optional[float]: + return self._end_time + + @property + def service_time(self) -> Optional[float]: + return self._service_time + + def __init__(self) -> None: + self.events = Events() + self._start_time: Optional[float] = None + self._end_time: Optional[float] = None + self._service_time: Optional[float] = None + + # Subscribe to the request_start and request_end events + self.events.request_start += self._on_request_start + self.events.request_end += self._on_request_end + + def request_start(self) -> None: + self.events.request_start() + + def _on_request_start(self) -> None: + self._start_time = time.perf_counter() + self._end_time = None + self._service_time = None + + def request_end(self) -> None: + self.events.request_end() + + def _on_request_end(self) -> None: + self._end_time = time.perf_counter() + if self._start_time is not None: + self._service_time = self._end_time - self._start_time diff --git a/opensearchpy/metrics/metrics_none.py b/opensearchpy/metrics/metrics_none.py new file mode 100644 index 00000000..bbc7b335 --- /dev/null +++ b/opensearchpy/metrics/metrics_none.py @@ -0,0 +1,47 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. + +from typing import Optional + +from opensearchpy.metrics.metrics import Metrics + + +class MetricsNone(Metrics): + """ + Default metrics class. It sets the start time, end time, and service time to None. + """ + + @property + def start_time(self) -> Optional[float]: + return self._start_time + + @property + def end_time(self) -> Optional[float]: + return self._end_time + + @property + def service_time(self) -> Optional[float]: + return self._service_time + + def __init__(self) -> None: + self._start_time: Optional[float] = None + self._end_time: Optional[float] = None + self._service_time: Optional[float] = None + + # request_start and request_end are placeholders, + # not implementing actual metrics collection in this subclass. + + def request_start(self) -> None: + self._start_time = None + self._end_time = None + self._service_time = None + + def request_end(self) -> None: + self._end_time = None + self._service_time = None diff --git a/opensearchpy/transport.py b/opensearchpy/transport.py index f582a3be..5c7e6297 100644 --- a/opensearchpy/transport.py +++ b/opensearchpy/transport.py @@ -29,6 +29,8 @@ from itertools import chain from typing import Any, Callable, Collection, Dict, List, Mapping, Optional, Type, Union +from opensearchpy.metrics import Metrics, MetricsNone + from .connection import Connection, Urllib3HttpConnection from .connection_pool import ConnectionPool, DummyConnectionPool, EmptyConnectionPool from .exceptions import ( @@ -91,6 +93,7 @@ class Transport(object): last_sniff: float sniff_timeout: Optional[float] host_info_callback: Any + metrics: Metrics def __init__( self, @@ -112,6 +115,7 @@ def __init__( retry_on_status: Collection[int] = (502, 503, 504), retry_on_timeout: bool = False, send_get_body_as: str = "GET", + metrics: Metrics = MetricsNone(), **kwargs: Any ) -> None: """ @@ -148,11 +152,15 @@ def __init__( will be serialized and passed as a query parameter `source`. :arg pool_maxsize: Maximum connection pool size used by pool-manager For custom connection-pooling on current session + :arg metrics: metrics is an instance of a subclass of the + :class:`~opensearchpy.Metrics` class, used for collecting + and reporting metrics related to the client's operations; Any extra keyword arguments will be passed to the `connection_class` when creating and instance unless overridden by that connection's options provided as part of the hosts parameter. """ + self.metrics = metrics if connection_class is None: connection_class = self.DEFAULT_CONNECTION_CLASS @@ -242,7 +250,7 @@ def _create_connection(host: Any) -> Any: kwargs.update(host) if self.pool_maxsize and isinstance(self.pool_maxsize, int): kwargs["pool_maxsize"] = self.pool_maxsize - return self.connection_class(**kwargs) + return self.connection_class(metrics=self.metrics, **kwargs) connections = list(zip(map(_create_connection, hosts), hosts)) if len(connections) == 1: diff --git a/setup.py b/setup.py index ab85bc05..8198b364 100644 --- a/setup.py +++ b/setup.py @@ -59,6 +59,7 @@ "six", "python-dateutil", "certifi>=2022.12.07", + "Events", ] tests_require = [ "requests>=2.0.0, <3.0.0", diff --git a/test_opensearchpy/test_server/test_metrics.py b/test_opensearchpy/test_server/test_metrics.py new file mode 100644 index 00000000..189fc739 --- /dev/null +++ b/test_opensearchpy/test_server/test_metrics.py @@ -0,0 +1,117 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# +# Modifications Copyright OpenSearch Contributors. See +# GitHub history for details. + +from __future__ import unicode_literals + +import time + +import pytest + +from opensearchpy import RequestsHttpConnection +from opensearchpy.metrics.metrics_events import MetricsEvents +from opensearchpy.metrics.metrics_none import MetricsNone + +from . import OpenSearchTestCase, get_client + + +class TestMetrics(OpenSearchTestCase): + def tearDown(self) -> None: + client = get_client() + client.indices.delete(index=["test-index"], ignore_unavailable=True) + + def test_metrics_default_behavior(self) -> None: + # Test default behavior when metrics is not passed to the client + client = get_client() + index_name = "test-index" + index_body = {"settings": {"index": {"number_of_shards": 4}}} + try: + client.indices.create(index=index_name, body=index_body) + except Exception as e: + assert False, f"Error creating index: {e}" + + def test_metrics_raises_error_when_value_is_none(self) -> None: + # Test behavior when metrics is given None. + metrics = None + with pytest.raises(AttributeError): + get_client(metrics=metrics) + + def test_metrics_none_behavior(self) -> None: + # Test behavior when metrics is an instance of MetricsNone + metrics = MetricsNone() + client = get_client(metrics=metrics) + index_name = "test-index" + index_body = {"settings": {"index": {"number_of_shards": 4}}} + client.indices.create(index=index_name, body=index_body) + assert metrics.service_time is None + + +class TestMetricsEvents(OpenSearchTestCase): + def tearDown(self) -> None: + client = get_client() + client.indices.delete(index=["test-index"], ignore_unavailable=True) + + def test_metrics_events_with_urllib3_connection(self) -> None: + # Test MetricsEvents behavior with urllib3 connection + metrics = MetricsEvents() + client = get_client(metrics=metrics) + + # Calculate service time for create index operation + index_name = "test-index" + index_body = {"settings": {"index": {"number_of_shards": 4}}} + start1 = time.perf_counter() + client.indices.create(index=index_name, body=index_body) + duration1 = time.perf_counter() - start1 + create_index_service_time = metrics.service_time + assert ( + isinstance(create_index_service_time, float) + and create_index_service_time < duration1 + ) + + # Calculate service time for adding document operation + document = {"title": "Moneyball", "director": "Bennett Miller", "year": "2011"} + id = "1" + start2 = time.perf_counter() + client.index(index=index_name, body=document, id=id, refresh=True) + duration2 = time.perf_counter() - start2 + assert ( + isinstance(metrics.service_time, float) + and metrics.service_time < duration2 + and metrics.service_time != create_index_service_time + # Above check is to confirm service time differs from the previous API call. + ) + + def test_metrics_events_with_requests_http_connection(self) -> None: + # Test MetricsEvents behavior with requests HTTP connection + metrics = MetricsEvents() + client = get_client(metrics=metrics, connection_class=RequestsHttpConnection) + + # Calculate service time for create index operation + index_name = "test-index" + index_body = {"settings": {"index": {"number_of_shards": 4}}} + start1 = time.perf_counter() + client.indices.create(index_name, body=index_body) + duration1 = time.perf_counter() - start1 + create_index_service_time = metrics.service_time + assert ( + isinstance(create_index_service_time, float) + and create_index_service_time < duration1 + ) + + # Calculate service time for adding document operation + document = {"title": "Moneyball", "director": "Bennett Miller", "year": "2011"} + id = "1" + start2 = time.perf_counter() + client.index(index=index_name, body=document, id=id, refresh=True) + duration2 = time.perf_counter() - start2 + assert ( + isinstance(metrics.service_time, float) + and metrics.service_time < duration2 + and metrics.service_time != create_index_service_time + # Above check is to confirm service time differs from the previous API call. + )