Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced service time metrics to OpenSearch-Py client. #716

Merged
merged 5 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
## [Unreleased]
### Added
- Added support for Python 3.12 ([#717](https://github.com/opensearch-project/opensearch-py/pull/717))
- Added service time metrics ([#716](https://github.com/opensearch-project/opensearch-py/pull/716))
### Changed
### Deprecated
### Removed
Expand Down
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ sphinx_rtd_theme
jinja2
pytz
deepmerge
Events
setuptools

# No wheels for Python 3.10 yet!
Expand Down
14 changes: 14 additions & 0 deletions docs/source/api-ref/metrics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# metrics

```{eval-rst}
.. autoclass:: opensearchpy.Metrics
```

```{eval-rst}
.. autoclass:: opensearchpy.MetricsEvents
```

```{eval-rst}
.. autoclass:: opensearchpy.MetricsNone
```

4 changes: 4 additions & 0 deletions opensearchpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
from .helpers.update_by_query import UpdateByQuery
from .helpers.utils import AttrDict, AttrList, DslBase
from .helpers.wrappers import Range
from .metrics import Metrics, MetricsEvents, MetricsNone
from .serializer import JSONSerializer
from .transport import Transport

Expand Down Expand Up @@ -240,6 +241,9 @@
"token_filter",
"tokenizer",
"__versionstr__",
"Metrics",
"MetricsEvents",
"MetricsNone",
]

try:
Expand Down
10 changes: 10 additions & 0 deletions opensearchpy/connection/http_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
except ImportError:
REQUESTS_AVAILABLE = False

from opensearchpy.metrics import Metrics, MetricsNone

from ..compat import reraise_exceptions, string_types, urlencode
from ..exceptions import (
ConnectionError,
Expand Down Expand Up @@ -69,6 +71,9 @@ class RequestsHttpConnection(Connection):
For tracing all requests made by this transport.
:arg pool_maxsize: Maximum connection pool size used by pool-manager
For custom connection-pooling on current session
:arg metrics: metrics is an instance of a subclass of the
:class:`~opensearchpy.Metrics` class, used for collecting
and reporting metrics related to the client's operations;
"""

def __init__(
Expand All @@ -86,8 +91,10 @@ def __init__(
http_compress: Any = None,
opaque_id: Any = None,
pool_maxsize: Any = None,
metrics: Metrics = MetricsNone(),
**kwargs: Any
) -> None:
self.metrics = metrics
if not REQUESTS_AVAILABLE:
raise ImproperlyConfigured(
"Please install requests to use RequestsHttpConnection."
Expand Down Expand Up @@ -188,6 +195,7 @@ def perform_request( # type: ignore
}
send_kwargs.update(settings)
try:
self.metrics.request_start()
response = self.session.send(prepared_request, **send_kwargs)
duration = time.time() - start
raw_data = response.content.decode("utf-8", "surrogatepass")
Expand All @@ -207,6 +215,8 @@ def perform_request( # type: ignore
if isinstance(e, requests.Timeout):
raise ConnectionTimeout("TIMEOUT", str(e), e)
raise ConnectionError("N/A", str(e), e)
finally:
self.metrics.request_end()

# raise warnings if any from the 'Warnings' header.
warnings_headers = (
Expand Down
11 changes: 11 additions & 0 deletions opensearchpy/connection/http_urllib3.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
from urllib3.exceptions import SSLError as UrllibSSLError
from urllib3.util.retry import Retry

from opensearchpy.metrics import Metrics, MetricsNone

from ..compat import reraise_exceptions, urlencode
from ..exceptions import (
ConnectionError,
Expand Down Expand Up @@ -94,6 +96,9 @@ class Urllib3HttpConnection(Connection):
:arg http_compress: Use gzip compression
:arg opaque_id: Send this value in the 'X-Opaque-Id' HTTP header
For tracing all requests made by this transport.
:arg metrics: metrics is an instance of a subclass of the
:class:`~opensearchpy.Metrics` class, used for collecting
and reporting metrics related to the client's operations;
"""

def __init__(
Expand All @@ -115,8 +120,10 @@ def __init__(
ssl_context: Any = None,
http_compress: Any = None,
opaque_id: Any = None,
metrics: Metrics = MetricsNone(),
**kwargs: Any
) -> None:
self.metrics = metrics
saimedhi marked this conversation as resolved.
Show resolved Hide resolved
# Initialize headers before calling super().__init__().
self.headers = urllib3.make_headers(keep_alive=True)

Expand Down Expand Up @@ -268,6 +275,8 @@ def perform_request(
if isinstance(self.http_auth, Callable): # type: ignore
request_headers.update(self.http_auth(method, full_url, body))

self.metrics.request_start()

response = self.pool.urlopen(
method, url, body, retries=Retry(False), headers=request_headers, **kw
)
Expand All @@ -284,6 +293,8 @@ def perform_request(
if isinstance(e, ReadTimeoutError):
raise ConnectionTimeout("TIMEOUT", str(e), e)
raise ConnectionError("N/A", str(e), e)
finally:
self.metrics.request_end()

# raise warnings if any from the 'Warnings' header.
warning_headers = response.headers.get_all("warning", ())
Expand Down
18 changes: 18 additions & 0 deletions opensearchpy/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

from .metrics import Metrics
from .metrics_events import MetricsEvents
from .metrics_none import MetricsNone

# Names re-exported as the public interface of the metrics package.
__all__ = [
"Metrics",
"MetricsEvents",
"MetricsNone",
]
42 changes: 42 additions & 0 deletions opensearchpy/metrics/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

from abc import ABC, abstractmethod
from typing import Optional


class Metrics(ABC):
    """
    Abstract interface for request metrics.

    Implementations must supply the ``request_start`` / ``request_end``
    hooks and expose ``start_time``, ``end_time`` and ``service_time``
    as read-only properties. Every member is abstract, so this class
    cannot be instantiated directly.
    """

    # -- hooks -----------------------------------------------------------

    @abstractmethod
    def request_start(self) -> None:
        """Hook to be invoked when a request starts."""

    @abstractmethod
    def request_end(self) -> None:
        """Hook to be invoked when a request ends."""

    # -- read-only values ------------------------------------------------

    @property
    @abstractmethod
    def start_time(self) -> Optional[float]:
        """Start time of the request, or ``None`` when not available."""

    @property
    @abstractmethod
    def end_time(self) -> Optional[float]:
        """End time of the request, or ``None`` when not available."""

    @property
    @abstractmethod
    def service_time(self) -> Optional[float]:
        """Service time of the request, or ``None`` when not available."""
61 changes: 61 additions & 0 deletions opensearchpy/metrics/metrics_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

import time
from typing import Optional

from events import Events

from opensearchpy.metrics.metrics import Metrics


class MetricsEvents(Metrics):
    """
    Event-driven implementation of :class:`Metrics`.

    The public ``request_start`` / ``request_end`` hooks fire the
    matching event on ``self.events``; the private handlers subscribed
    to those events record ``time.perf_counter()`` readings and derive
    the service time from them.
    """

    def __init__(self) -> None:
        # No request has been observed yet, so every value starts as None.
        self._start_time: Optional[float] = None
        self._end_time: Optional[float] = None
        self._service_time: Optional[float] = None

        # Wire the private handlers to the request_start and
        # request_end events.
        self.events = Events()
        self.events.request_start += self._on_request_start
        self.events.request_end += self._on_request_end

    # -- public hooks ----------------------------------------------------

    def request_start(self) -> None:
        """Fire the ``request_start`` event."""
        self.events.request_start()

    def request_end(self) -> None:
        """Fire the ``request_end`` event."""
        self.events.request_end()

    # -- event handlers --------------------------------------------------

    def _on_request_start(self) -> None:
        """Record the start reading and clear the previous end/service values."""
        self._start_time = time.perf_counter()
        self._end_time = self._service_time = None

    def _on_request_end(self) -> None:
        """Record the end reading and, if a start was recorded, the elapsed time."""
        finished = time.perf_counter()
        self._end_time = finished
        if self._start_time is None:
            # No start reading to subtract from; leave service time as is.
            return
        self._service_time = finished - self._start_time

    # -- read-only values ------------------------------------------------

    @property
    def start_time(self) -> Optional[float]:
        """``perf_counter`` reading taken at request start, or ``None``."""
        return self._start_time

    @property
    def end_time(self) -> Optional[float]:
        """``perf_counter`` reading taken at request end, or ``None``."""
        return self._end_time

    @property
    def service_time(self) -> Optional[float]:
        """Difference between end and start readings, or ``None``."""
        return self._service_time
47 changes: 47 additions & 0 deletions opensearchpy/metrics/metrics_none.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

from typing import Optional

from opensearchpy.metrics.metrics import Metrics


class MetricsNone(Metrics):
    """
    Default metrics class that collects nothing.

    The start time, end time, and service time are always ``None``.
    """

    def __init__(self) -> None:
        self._start_time: Optional[float] = None
        self._end_time: Optional[float] = None
        self._service_time: Optional[float] = None

    # request_start and request_end are placeholders: they only reset
    # the stored values to None and perform no measurement.

    def request_start(self) -> None:
        """Reset start time, end time, and service time to ``None``."""
        self._start_time = self._end_time = self._service_time = None

    def request_end(self) -> None:
        """Reset end time and service time to ``None``."""
        self._end_time = self._service_time = None

    @property
    def start_time(self) -> Optional[float]:
        """Stored start time (``None`` in this implementation)."""
        return self._start_time

    @property
    def end_time(self) -> Optional[float]:
        """Stored end time (``None`` in this implementation)."""
        return self._end_time

    @property
    def service_time(self) -> Optional[float]:
        """Stored service time (``None`` in this implementation)."""
        return self._service_time
10 changes: 9 additions & 1 deletion opensearchpy/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from itertools import chain
from typing import Any, Callable, Collection, Dict, List, Mapping, Optional, Type, Union

from opensearchpy.metrics import Metrics, MetricsNone

from .connection import Connection, Urllib3HttpConnection
from .connection_pool import ConnectionPool, DummyConnectionPool, EmptyConnectionPool
from .exceptions import (
Expand Down Expand Up @@ -91,6 +93,7 @@ class Transport(object):
last_sniff: float
sniff_timeout: Optional[float]
host_info_callback: Any
metrics: Metrics

def __init__(
self,
Expand All @@ -112,6 +115,7 @@ def __init__(
retry_on_status: Collection[int] = (502, 503, 504),
retry_on_timeout: bool = False,
send_get_body_as: str = "GET",
metrics: Metrics = MetricsNone(),
**kwargs: Any
) -> None:
"""
Expand Down Expand Up @@ -148,11 +152,15 @@ def __init__(
will be serialized and passed as a query parameter `source`.
:arg pool_maxsize: Maximum connection pool size used by pool-manager
For custom connection-pooling on current session
:arg metrics: metrics is an instance of a subclass of the
:class:`~opensearchpy.Metrics` class, used for collecting
and reporting metrics related to the client's operations;

Any extra keyword arguments will be passed to the `connection_class`
when creating an instance unless overridden by that connection's
options provided as part of the hosts parameter.
"""
self.metrics = metrics
if connection_class is None:
connection_class = self.DEFAULT_CONNECTION_CLASS

Expand Down Expand Up @@ -242,7 +250,7 @@ def _create_connection(host: Any) -> Any:
kwargs.update(host)
if self.pool_maxsize and isinstance(self.pool_maxsize, int):
kwargs["pool_maxsize"] = self.pool_maxsize
return self.connection_class(**kwargs)
return self.connection_class(metrics=self.metrics, **kwargs)

connections = list(zip(map(_create_connection, hosts), hosts))
if len(connections) == 1:
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"six",
"python-dateutil",
"certifi>=2022.12.07",
"Events",
]
tests_require = [
"requests>=2.0.0, <3.0.0",
Expand Down
Loading
Loading