Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement shutdown procedure for OTLP grpc exporters #3138

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@

"""OTLP Exporter"""

from logging import getLogger
import threading
from abc import ABC, abstractmethod
from collections.abc import Sequence
from logging import getLogger
from os import environ
from time import sleep
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Union
from typing import Sequence as TypingSequence
from typing import TypeVar
from urllib.parse import urlparse
from opentelemetry.sdk.trace import ReadableSpan

import backoff
from google.rpc.error_details_pb2 import RetryInfo
Expand All @@ -37,6 +37,9 @@
ssl_channel_credentials,
)

from opentelemetry.exporter.otlp.proto.grpc import (
_OTLP_GRPC_HEADERS,
)
from opentelemetry.proto.common.v1.common_pb2 import (
AnyValue,
ArrayValue,
Expand All @@ -51,12 +54,10 @@
OTEL_EXPORTER_OTLP_INSECURE,
OTEL_EXPORTER_OTLP_TIMEOUT,
)
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.sdk.metrics.export import MetricsData
from opentelemetry.sdk.resources import Resource as SDKResource
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.util.re import parse_env_headers
from opentelemetry.exporter.otlp.proto.grpc import (
_OTLP_GRPC_HEADERS,
)

logger = getLogger(__name__)
SDKDataT = TypeVar("SDKDataT")
Expand Down Expand Up @@ -92,7 +93,6 @@ def environ_to_compression(environ_key: str) -> Optional[Compression]:


def _translate_value(value: Any) -> KeyValue:

if isinstance(value, bool):
any_value = AnyValue(bool_value=value)

Expand Down Expand Up @@ -135,7 +135,6 @@ def get_resource_data(
resource_class: Callable[..., TypingResourceT],
name: str,
) -> List[TypingResourceT]:

resource_data = []

for (
Expand Down Expand Up @@ -282,6 +281,9 @@ def __init__(
secure_channel(endpoint, credentials, compression=compression)
)

self._export_lock = threading.Lock()
self._shutdown = False

@abstractmethod
def _translate_data(
self, data: TypingSequence[SDKDataT]
Expand All @@ -302,6 +304,11 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]:
def _export(
self, data: Union[TypingSequence[ReadableSpan], MetricsData]
) -> ExportResultT:
# After the call to shutdown, subsequent calls to Export are
# not allowed and should return a Failure result.
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring batch")
return self._result.FAILURE

# FIXME remove this check if the export type for traces
# gets updated to a class that represents the proto
Expand All @@ -317,69 +324,75 @@ def _export(
# exponentially. Once delay is greater than max_value, the yielded
# value will remain constant.
for delay in _expo(max_value=max_value):

if delay == max_value:
if delay == max_value or self._shutdown:
return self._result.FAILURE

try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)
with self._export_lock:
try:
self._client.Export(
request=self._translate_data(data),
metadata=self._headers,
timeout=self._timeout,
)

return self._result.SUCCESS
return self._result.SUCCESS

except RpcError as error:
except RpcError as error:

if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:
if error.code() in [
StatusCode.CANCELLED,
StatusCode.DEADLINE_EXCEEDED,
StatusCode.RESOURCE_EXHAUSTED,
StatusCode.ABORTED,
StatusCode.OUT_OF_RANGE,
StatusCode.UNAVAILABLE,
StatusCode.DATA_LOSS,
]:

retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
retry_info_bin = dict(error.trailing_metadata()).get(
"google.rpc.retryinfo-bin"
)
if retry_info_bin is not None:
retry_info = RetryInfo()
retry_info.ParseFromString(retry_info_bin)
delay = (
retry_info.retry_delay.seconds
+ retry_info.retry_delay.nanos / 1.0e9
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s, retrying in %ss."
),
error.code(),
self._exporting,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s, error code: %s",
self._exporting,
error.code(),
)

logger.warning(
(
"Transient error %s encountered while exporting "
"%s, retrying in %ss."
),
error.code(),
self._exporting,
delay,
)
sleep(delay)
continue
else:
logger.error(
"Failed to export %s, error code: %s",
self._exporting,
error.code(),
)

if error.code() == StatusCode.OK:
return self._result.SUCCESS
if error.code() == StatusCode.OK:
return self._result.SUCCESS

return self._result.FAILURE
return self._result.FAILURE

return self._result.FAILURE

def shutdown(self) -> None:
pass
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
if self._shutdown:
logger.warning("Exporter already shutdown, ignoring call")
return
# wait for the last export if any
self._export_lock.acquire(timeout=timeout_millis)
girishc13 marked this conversation as resolved.
Show resolved Hide resolved
self._shutdown = True
self._export_lock.release()

@property
@abstractmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ def _split_metrics_data(
yield MetricsData(resource_metrics=split_resource_metrics)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass
OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis)

@property
def _exporting(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ def _translate_data(
def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
return self._export(spans)

def shutdown(self) -> None:
OTLPExporterMixin.shutdown(self)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@girishc13 Can you please help me understand why you are calling shutdown on mixin class directly instead of using super()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't remember the exact details but the shutdown method is implemented by both the OTLPExporterMixin and the SpanExporter interfaces. The exporter.shutdown is handled by different logic and this pr was targeting the backend client that needs to be shutdown. You need to trace the calls for shutdown from the usage of the OTLPSpanExporter.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the issue maybe due to mixin inheritance being applied incorrectly, currently we have

class OTLPSpanExporter(SpanExporter, OTLPExporterMixin[...]):

but usually in python mixin should come before the base class, e.g.

class OTLPSpanExporter(OTLPExporterMixin[...], SpanExporter):

this way, super().shutdown() call will correctly use shutdown method from mixin


def force_flush(self, timeout_millis: int = 30000) -> bool:
return True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from unittest import TestCase
from unittest.mock import Mock, patch

from google.protobuf.duration_pb2 import Duration
from google.rpc.error_details_pb2 import RetryInfo
from grpc import Compression

from opentelemetry.exporter.otlp.proto.grpc.exporter import (
Expand Down Expand Up @@ -58,7 +60,6 @@ def test_environ_to_compression(self):

@patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo")
def test_export_warning(self, mock_expo):

mock_expo.configure_mock(**{"return_value": [0]})

rpc_error = RpcError()
Expand All @@ -69,7 +70,6 @@ def code(self):
rpc_error.code = MethodType(code, rpc_error)

class OTLPMockExporter(OTLPExporterMixin):

_result = Mock()
_stub = Mock(
**{"return_value": Mock(**{"Export.side_effect": rpc_error})}
Expand Down Expand Up @@ -113,3 +113,88 @@ def trailing_metadata(self):
"while exporting mock, retrying in 0s."
),
)

def test_shutdown(self):
result_mock = Mock()

class OTLPMockExporter(OTLPExporterMixin):
_result = result_mock
_stub = Mock(**{"return_value": Mock()})

def _translate_data(
self, data: Sequence[SDKDataT]
) -> ExportServiceRequestT:
pass

@property
def _exporting(self) -> str:
return "mock"

otlp_mock_exporter = OTLPMockExporter()

with self.assertLogs(level=WARNING) as warning:
# pylint: disable=protected-access
self.assertEqual(
otlp_mock_exporter._export(data={}), result_mock.SUCCESS
)
otlp_mock_exporter.shutdown()
self.assertEqual(
otlp_mock_exporter._export(data={}), result_mock.FAILURE
)
self.assertEqual(
warning.records[0].message,
"Exporter already shutdown, ignoring batch",
)

def test_shutdown_wait_last_export(self):
import threading
import time

result_mock = Mock()
rpc_error = RpcError()

def code(self):
return StatusCode.UNAVAILABLE

def trailing_metadata(self):
return {
"google.rpc.retryinfo-bin": RetryInfo(
retry_delay=Duration(seconds=1)
).SerializeToString()
}

rpc_error.code = MethodType(code, rpc_error)
rpc_error.trailing_metadata = MethodType(trailing_metadata, rpc_error)

class OTLPMockExporter(OTLPExporterMixin):
_result = result_mock
_stub = Mock(
**{"return_value": Mock(**{"Export.side_effect": rpc_error})}
)

def _translate_data(
self, data: Sequence[SDKDataT]
) -> ExportServiceRequestT:
pass

@property
def _exporting(self) -> str:
return "mock"

otlp_mock_exporter = OTLPMockExporter()

export_thread = threading.Thread(
target=otlp_mock_exporter._export, args=({},)
)
export_thread.start()
try:
self.assertTrue(otlp_mock_exporter._export_lock.locked())
# delay is 1 second while the default shutdown timeout is 30_000 milliseconds
start_time = time.time()
otlp_mock_exporter.shutdown()
now = time.time()
self.assertGreaterEqual(now, (start_time + 30 / 1000))
self.assertTrue(otlp_mock_exporter._shutdown)
self.assertFalse(otlp_mock_exporter._export_lock.locked())
finally:
export_thread.join()
Loading