From 3a9137833aeb5316b1033150886ac9809d5ddea5 Mon Sep 17 00:00:00 2001 From: Girish Chandrashekar Date: Mon, 23 Jan 2023 14:32:36 +0100 Subject: [PATCH 1/4] Implement shutdown procedure for OTLP grpc exporters - Add `_shutdown` variable for checking if the exporter has been shutdown. - Prevent export if the `_shutdown` flag has been set. Log a warning message is exporter has been shutdown. - Use thread lock to synchronize the last export call before shutdown timeout. The `shutdown` method will wait until the `timeout_millis` if there is an ongoing export. If there is no ongiong export, set the `_shutdown` flag to prevent further exports and return. - Add unit tests for the `OTLPExporterMixIn` and the sub classes for traces and metrics. --- .../exporter/otlp/proto/grpc/exporter.py | 171 ++++++++++-------- .../proto/grpc/metric_exporter/__init__.py | 2 +- .../proto/grpc/trace_exporter/__init__.py | 3 + .../tests/test_otlp_exporter_mixin.py | 96 +++++++++- .../tests/test_otlp_metrics_exporter.py | 56 +++++- .../tests/test_otlp_trace_exporter.py | 62 +++++-- 6 files changed, 283 insertions(+), 107 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index c068f87d783..4483e15986d 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -14,16 +14,16 @@ """OTLP Exporter""" -from logging import getLogger +import threading from abc import ABC, abstractmethod from collections.abc import Sequence +from logging import getLogger from os import environ from time import sleep from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Union from typing import Sequence as TypingSequence from typing import TypeVar from urllib.parse import urlparse -from opentelemetry.sdk.trace import ReadableSpan import backoff from google.rpc.error_details_pb2 import RetryInfo @@ -37,6 +37,9 @@ ssl_channel_credentials, ) +from opentelemetry.exporter.otlp.proto.grpc import ( + _OTLP_GRPC_HEADERS, +) from opentelemetry.proto.common.v1.common_pb2 import ( AnyValue, ArrayValue, @@ -51,12 +54,10 @@ OTEL_EXPORTER_OTLP_INSECURE, OTEL_EXPORTER_OTLP_TIMEOUT, ) -from opentelemetry.sdk.resources import Resource as SDKResource from opentelemetry.sdk.metrics.export import MetricsData +from opentelemetry.sdk.resources import Resource as SDKResource +from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.util.re import parse_env_headers -from opentelemetry.exporter.otlp.proto.grpc import ( - _OTLP_GRPC_HEADERS, -) logger = getLogger(__name__) SDKDataT = TypeVar("SDKDataT") @@ -92,7 +93,6 @@ def environ_to_compression(environ_key: str) -> Optional[Compression]: def _translate_value(value: Any) -> KeyValue: - if isinstance(value, bool): any_value = AnyValue(bool_value=value) @@ -131,16 +131,15 @@ def _translate_key_values(key: str, value: Any) -> KeyValue: def get_resource_data( - sdk_resource_scope_data: Dict[SDKResource, ResourceDataT], - resource_class: Callable[..., TypingResourceT], - name: str, + sdk_resource_scope_data: Dict[SDKResource, ResourceDataT], + resource_class: Callable[..., TypingResourceT], + name: str, ) -> List[TypingResourceT]: - resource_data = [] for ( - sdk_resource, - scope_data, + sdk_resource, + scope_data, ) in sdk_resource_scope_data.items(): collector_resource = Resource() @@ -215,15 +214,15 @@ class OTLPExporterMixin( """ def __init__( - self, - endpoint: Optional[str] = None, - insecure: Optional[bool] = None, - credentials: Optional[ChannelCredentials] = None, - headers: Optional[ - Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] - ] = None, - timeout: Optional[int] = None, - compression: Optional[Compression] = None, + self, + endpoint: Optional[str] = None, + insecure: Optional[bool] = None, + credentials: Optional[ChannelCredentials] = None, + headers: Optional[ + Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] + ] = None, + timeout: Optional[int] = None, + compression: Optional[Compression] = None, ): super().__init__() @@ -265,10 +264,10 @@ def __init__( self._collector_kwargs = None compression = ( - environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION) - if compression is None - else compression - ) or Compression.NoCompression + environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION) + if compression is None + else compression + ) or Compression.NoCompression if insecure: self._client = self._stub( @@ -282,9 +281,12 @@ def __init__( secure_channel(endpoint, credentials, compression=compression) ) + self._export_lock = threading.Lock() + self._shutdown = False + @abstractmethod def _translate_data( - self, data: TypingSequence[SDKDataT] + self, data: TypingSequence[SDKDataT] ) -> ExportServiceRequestT: pass @@ -300,8 +302,13 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]: return output def _export( - self, data: Union[TypingSequence[ReadableSpan], MetricsData] + self, data: Union[TypingSequence[ReadableSpan], MetricsData] ) -> ExportResultT: + # After the call to shutdown, subsequent calls to Export are + # not allowed and should return a Failure result. + if self._shutdown: + logger.warning("Exporter already shutdown, ignoring batch") + return self._result.FAILURE # FIXME remove this check if the export type for traces # gets updated to a class that represents the proto @@ -317,69 +324,75 @@ def _export( # exponentially. Once delay is greater than max_value, the yielded # value will remain constant. for delay in _expo(max_value=max_value): - - if delay == max_value: + if delay == max_value or self._shutdown: return self._result.FAILURE - try: - self._client.Export( - request=self._translate_data(data), - metadata=self._headers, - timeout=self._timeout, - ) + with self._export_lock: + try: + self._client.Export( + request=self._translate_data(data), + metadata=self._headers, + timeout=self._timeout, + ) - return self._result.SUCCESS + return self._result.SUCCESS - except RpcError as error: + except RpcError as error: - if error.code() in [ - StatusCode.CANCELLED, - StatusCode.DEADLINE_EXCEEDED, - StatusCode.RESOURCE_EXHAUSTED, - StatusCode.ABORTED, - StatusCode.OUT_OF_RANGE, - StatusCode.UNAVAILABLE, - StatusCode.DATA_LOSS, - ]: + if error.code() in [ + StatusCode.CANCELLED, + StatusCode.DEADLINE_EXCEEDED, + StatusCode.RESOURCE_EXHAUSTED, + StatusCode.ABORTED, + StatusCode.OUT_OF_RANGE, + StatusCode.UNAVAILABLE, + StatusCode.DATA_LOSS, + ]: - retry_info_bin = dict(error.trailing_metadata()).get( - "google.rpc.retryinfo-bin" - ) - if retry_info_bin is not None: - retry_info = RetryInfo() - retry_info.ParseFromString(retry_info_bin) - delay = ( - retry_info.retry_delay.seconds - + retry_info.retry_delay.nanos / 1.0e9 + retry_info_bin = dict(error.trailing_metadata()).get( + "google.rpc.retryinfo-bin" + ) + if retry_info_bin is not None: + retry_info = RetryInfo() + retry_info.ParseFromString(retry_info_bin) + delay = ( + retry_info.retry_delay.seconds + + retry_info.retry_delay.nanos / 1.0e9 + ) + + logger.warning( + ( + "Transient error %s encountered while exporting " + "%s, retrying in %ss." + ), + error.code(), + self._exporting, + delay, + ) + sleep(delay) + continue + else: + logger.error( + "Failed to export %s, error code: %s", + self._exporting, + error.code(), ) - logger.warning( - ( - "Transient error %s encountered while exporting " - "%s, retrying in %ss." - ), - error.code(), - self._exporting, - delay, - ) - sleep(delay) - continue - else: - logger.error( - "Failed to export %s, error code: %s", - self._exporting, - error.code(), - ) - - if error.code() == StatusCode.OK: - return self._result.SUCCESS + if error.code() == StatusCode.OK: + return self._result.SUCCESS - return self._result.FAILURE + return self._result.FAILURE return self._result.FAILURE - def shutdown(self) -> None: - pass + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + if self._shutdown: + logger.warning("Exporter already shutdown, ignoring call") + return + # wait for the last export if any + self._export_lock.acquire(timeout=timeout_millis) + self._shutdown = True + self._export_lock.release() @property @abstractmethod diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py index 3c6f59c35f3..08f7860cbc7 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/metric_exporter/__init__.py @@ -387,7 +387,7 @@ def _split_metrics_data( yield MetricsData(resource_metrics=split_resource_metrics) def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: - pass + OTLPExporterMixin.shutdown(self, timeout_millis=timeout_millis) @property def _exporting(self) -> str: diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py index 555c9031560..0203c00ec36 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/trace_exporter/__init__.py @@ -290,6 +290,9 @@ def _translate_data( def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: return self._export(spans) + def shutdown(self) -> None: + OTLPExporterMixin.shutdown(self) + def force_flush(self, timeout_millis: int = 30000) -> bool: return True diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 81a874af705..8f63f93d582 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -18,6 +18,8 @@ from unittest import TestCase from unittest.mock import Mock, patch +from google.protobuf.duration_pb2 import Duration +from google.rpc.error_details_pb2 import RetryInfo from grpc import Compression from opentelemetry.exporter.otlp.proto.grpc.exporter import ( @@ -34,12 +36,12 @@ class TestOTLPExporterMixin(TestCase): def test_environ_to_compression(self): with patch.dict( - "os.environ", - { - "test_gzip": "gzip", - "test_gzip_caseinsensitive_with_whitespace": " GzIp ", - "test_invalid": "some invalid compression", - }, + "os.environ", + { + "test_gzip": "gzip", + "test_gzip_caseinsensitive_with_whitespace": " GzIp ", + "test_invalid": "some invalid compression", + }, ): self.assertEqual( environ_to_compression("test_gzip"), Compression.Gzip @@ -58,7 +60,6 @@ def test_environ_to_compression(self): @patch("opentelemetry.exporter.otlp.proto.grpc.exporter._expo") def test_export_warning(self, mock_expo): - mock_expo.configure_mock(**{"return_value": [0]}) rpc_error = RpcError() @@ -69,14 +70,13 @@ def code(self): rpc_error.code = MethodType(code, rpc_error) class OTLPMockExporter(OTLPExporterMixin): - _result = Mock() _stub = Mock( **{"return_value": Mock(**{"Export.side_effect": rpc_error})} ) def _translate_data( - self, data: Sequence[SDKDataT] + self, data: Sequence[SDKDataT] ) -> ExportServiceRequestT: pass @@ -113,3 +113,81 @@ def trailing_metadata(self): "while exporting mock, retrying in 0s." ), ) + + def test_shutdown(self): + result_mock = Mock() + + class OTLPMockExporter(OTLPExporterMixin): + _result = result_mock + _stub = Mock( + **{"return_value": Mock()} + ) + + def _translate_data( + self, data: Sequence[SDKDataT] + ) -> ExportServiceRequestT: + pass + + @property + def _exporting(self) -> str: + return "mock" + + otlp_mock_exporter = OTLPMockExporter() + + with self.assertLogs(level=WARNING) as warning: + # pylint: disable=protected-access + self.assertEqual(otlp_mock_exporter._export(data={}), result_mock.SUCCESS) + otlp_mock_exporter.shutdown() + self.assertEqual(otlp_mock_exporter._export(data={}), result_mock.FAILURE) + self.assertEqual( + warning.records[0].message, + "Exporter already shutdown, ignoring batch", + ) + + def test_shutdown_wait_last_export(self): + import threading + import time + result_mock = Mock() + rpc_error = RpcError() + + def code(self): + return StatusCode.UNAVAILABLE + + def trailing_metadata(self): + return { + "google.rpc.retryinfo-bin": RetryInfo(retry_delay=Duration(seconds=1)).SerializeToString() + } + + rpc_error.code = MethodType(code, rpc_error) + rpc_error.trailing_metadata = MethodType(trailing_metadata, rpc_error) + + class OTLPMockExporter(OTLPExporterMixin): + _result = result_mock + _stub = Mock( + **{"return_value": Mock(**{"Export.side_effect": rpc_error})} + ) + + def _translate_data( + self, data: Sequence[SDKDataT] + ) -> ExportServiceRequestT: + pass + + @property + def _exporting(self) -> str: + return "mock" + + otlp_mock_exporter = OTLPMockExporter() + + export_thread = threading.Thread(target=otlp_mock_exporter._export, args=({},)) + export_thread.start() + try: + self.assertTrue(otlp_mock_exporter._export_lock.locked()) + # delay is 1 second while the default shutdown timeout is 30_000 milliseconds + start_time = time.time() + otlp_mock_exporter.shutdown() + now = time.time() + self.assertGreaterEqual(now, (start_time + 30 / 1000)) + self.assertTrue(otlp_mock_exporter._shutdown) + self.assertFalse(otlp_mock_exporter._export_lock.locked()) + finally: + export_thread.join() diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index 6436661a985..c516f9825b1 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -# pylint: disable=too-many-lines from concurrent.futures import ThreadPoolExecutor +# pylint: disable=too-many-lines +from logging import WARNING from os.path import dirname from typing import List from unittest import TestCase @@ -22,6 +23,7 @@ from google.protobuf.duration_pb2 import Duration from google.rpc.error_details_pb2 import RetryInfo from grpc import ChannelCredentials, Compression, StatusCode, server +from opentelemetry.test.metrictestutil import _generate_gauge, _generate_sum from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( OTLPMetricExporter, @@ -77,7 +79,6 @@ from opentelemetry.sdk.util.instrumentation import ( InstrumentationScope as SDKInstrumentationScope, ) -from opentelemetry.test.metrictestutil import _generate_gauge, _generate_sum THIS_DIR = dirname(__file__) @@ -365,7 +366,7 @@ def test_preferred_temporality(self): { OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "collector:4317", OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE: THIS_DIR - + "/fixtures/test.cert", + + "/fixtures/test.cert", OTEL_EXPORTER_OTLP_METRICS_HEADERS: " key1=value1,KEY2 = value=2", OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: "10", OTEL_EXPORTER_OTLP_METRICS_COMPRESSION: "gzip", @@ -396,7 +397,7 @@ def test_env_variables(self, mock_exporter_mixin): ) # pylint: disable=unused-argument def test_no_credentials_error( - self, mock_ssl_channel, mock_secure, mock_stub + self, mock_ssl_channel, mock_secure, mock_stub ): OTLPMetricExporter(insecure=False) self.assertTrue(mock_ssl_channel.called) @@ -526,7 +527,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}) def test_otlp_exporter_otlp_compression_envvar( - self, mock_insecure_channel, mock_expo + self, mock_insecure_channel, mock_expo ): """Just OTEL_EXPORTER_OTLP_COMPRESSION should work""" OTLPMetricExporter(insecure=True) @@ -550,7 +551,7 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel): @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {}) def test_otlp_exporter_otlp_compression_unspecified( - self, mock_insecure_channel + self, mock_insecure_channel ): """No env or kwarg should be NoCompression""" OTLPMetricExporter(insecure=True) @@ -1357,9 +1358,50 @@ def test_insecure_https_endpoint(self, mock_secure_channel): OTLPMetricExporter(endpoint="https://ab.c:123", insecure=True) mock_secure_channel.assert_called() + def test_shutdown(self): + add_MetricsServiceServicer_to_server( + MetricsServiceServicerSUCCESS(), self.server + ) + self.assertEqual( + self.exporter.export(self.metrics["sum_int"]), + MetricExportResult.SUCCESS, + ) + self.exporter.shutdown() + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + self.exporter.export(self.metrics["sum_int"]), + MetricExportResult.FAILURE, + ) + self.assertEqual( + warning.records[0].message, + "Exporter already shutdown, ignoring batch", + ) + + def test_shutdown_wait_last_export(self): + import threading + import time + + add_MetricsServiceServicer_to_server( + MetricsServiceServicerUNAVAILABLEDelay(), self.server + ) + + export_thread = threading.Thread(target=self.exporter.export, args=(self.metrics["sum_int"],)) + export_thread.start() + try: + self.assertTrue(self.exporter._export_lock.locked()) + # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds + start_time = time.time() + self.exporter.shutdown() + now = time.time() + self.assertGreaterEqual(now, (start_time + 30 / 1000)) + self.assertTrue(self.exporter._shutdown) + self.assertFalse(self.exporter._export_lock.locked()) + finally: + export_thread.join() + def _resource_metrics( - index: int, scope_metrics: List[ScopeMetrics] + index: int, scope_metrics: List[ScopeMetrics] ) -> ResourceMetrics: return ResourceMetrics( resource=Resource( diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index 5c8f0407254..602d2f05ab2 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -15,12 +15,16 @@ import os from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor +from logging import WARNING from unittest import TestCase from unittest.mock import Mock, PropertyMock, patch from google.protobuf.duration_pb2 import Duration from google.rpc.error_details_pb2 import RetryInfo from grpc import ChannelCredentials, Compression, StatusCode, server +from opentelemetry.test.spantestutil import ( + get_span_with_dropped_attributes_events_links, +) from opentelemetry.attributes import BoundedAttributes from opentelemetry.exporter.otlp.proto.grpc.exporter import ( @@ -68,9 +72,6 @@ SpanExportResult, ) from opentelemetry.sdk.util.instrumentation import InstrumentationScope -from opentelemetry.test.spantestutil import ( - get_span_with_dropped_attributes_events_links, -) THIS_DIR = os.path.dirname(__file__) @@ -228,7 +229,7 @@ def test_exporting(self): { OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "collector:4317", OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE: THIS_DIR - + "/fixtures/test.cert", + + "/fixtures/test.cert", OTEL_EXPORTER_OTLP_TRACES_HEADERS: " key1=value1,KEY2 = value=2", OTEL_EXPORTER_OTLP_TRACES_TIMEOUT: "10", OTEL_EXPORTER_OTLP_TRACES_COMPRESSION: "gzip", @@ -259,7 +260,7 @@ def test_env_variables(self, mock_exporter_mixin): ) # pylint: disable=unused-argument def test_no_credentials_error( - self, mock_ssl_channel, mock_secure, mock_stub + self, mock_ssl_channel, mock_secure, mock_stub ): OTLPSpanExporter(insecure=False) self.assertTrue(mock_ssl_channel.called) @@ -396,7 +397,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}) def test_otlp_exporter_otlp_compression_envvar( - self, mock_insecure_channel + self, mock_insecure_channel ): """Just OTEL_EXPORTER_OTLP_COMPRESSION should work""" OTLPSpanExporter(insecure=True) @@ -418,7 +419,7 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel): @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {}) def test_otlp_exporter_otlp_compression_unspecified( - self, mock_insecure_channel + self, mock_insecure_channel ): """No env or kwarg should be NoCompression""" OTLPSpanExporter(insecure=True) @@ -433,7 +434,7 @@ def test_otlp_exporter_otlp_compression_unspecified( {OTEL_EXPORTER_OTLP_TRACES_COMPRESSION: "gzip"}, ) def test_otlp_exporter_otlp_compression_precendence( - self, mock_insecure_channel + self, mock_insecure_channel ): """OTEL_EXPORTER_OTLP_TRACES_COMPRESSION as higher priority than OTEL_EXPORTER_OTLP_COMPRESSION @@ -797,9 +798,9 @@ def test_translate_spans_multi(self): ) def _check_translated_status( - self, - translated: ExportTraceServiceRequest, - code_expected: Status, + self, + translated: ExportTraceServiceRequest, + code_expected: Status, ): status = translated.resource_spans[0].scope_spans[0].spans[0].status @@ -929,6 +930,45 @@ def test_dropped_values(self): .dropped_attributes_count, ) + def test_shutdown(self): + add_TraceServiceServicer_to_server( + TraceServiceServicerSUCCESS(), self.server + ) + self.assertEqual( + self.exporter.export([self.span]), SpanExportResult.SUCCESS + ) + self.exporter.shutdown() + with self.assertLogs(level=WARNING) as warning: + self.assertEqual( + self.exporter.export([self.span]), SpanExportResult.FAILURE + ) + self.assertEqual( + warning.records[0].message, + "Exporter already shutdown, ignoring batch", + ) + + def test_shutdown_wait_last_export(self): + import threading + import time + + add_TraceServiceServicer_to_server( + TraceServiceServicerUNAVAILABLEDelay(), self.server + ) + + export_thread = threading.Thread(target=self.exporter.export, args=([self.span],)) + export_thread.start() + try: + self.assertTrue(self.exporter._export_lock.locked()) + # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds + start_time = time.time() + self.exporter.shutdown() + now = time.time() + self.assertGreaterEqual(now, (start_time + 30 / 1000)) + self.assertTrue(self.exporter._shutdown) + self.assertFalse(self.exporter._export_lock.locked()) + finally: + export_thread.join() + def _create_span_with_status(status: SDKStatus): span = _Span( From 8bca278cc9be38de60ce92c42b38560dd7f7867b Mon Sep 17 00:00:00 2001 From: Girish Chandrashekar Date: Mon, 23 Jan 2023 15:00:45 +0100 Subject: [PATCH 2/4] lint files --- .../exporter/otlp/proto/grpc/exporter.py | 44 +++++++++---------- .../tests/test_otlp_exporter_mixin.py | 39 +++++++++------- .../tests/test_otlp_metrics_exporter.py | 17 ++++--- .../tests/test_otlp_trace_exporter.py | 26 ++++++----- 4 files changed, 69 insertions(+), 57 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py index 4483e15986d..7a56120014c 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/exporter.py @@ -131,15 +131,15 @@ def _translate_key_values(key: str, value: Any) -> KeyValue: def get_resource_data( - sdk_resource_scope_data: Dict[SDKResource, ResourceDataT], - resource_class: Callable[..., TypingResourceT], - name: str, + sdk_resource_scope_data: Dict[SDKResource, ResourceDataT], + resource_class: Callable[..., TypingResourceT], + name: str, ) -> List[TypingResourceT]: resource_data = [] for ( - sdk_resource, - scope_data, + sdk_resource, + scope_data, ) in sdk_resource_scope_data.items(): collector_resource = Resource() @@ -214,15 +214,15 @@ class OTLPExporterMixin( """ def __init__( - self, - endpoint: Optional[str] = None, - insecure: Optional[bool] = None, - credentials: Optional[ChannelCredentials] = None, - headers: Optional[ - Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] - ] = None, - timeout: Optional[int] = None, - compression: Optional[Compression] = None, + self, + endpoint: Optional[str] = None, + insecure: Optional[bool] = None, + credentials: Optional[ChannelCredentials] = None, + headers: Optional[ + Union[TypingSequence[Tuple[str, str]], Dict[str, str], str] + ] = None, + timeout: Optional[int] = None, + compression: Optional[Compression] = None, ): super().__init__() @@ -264,10 +264,10 @@ def __init__( self._collector_kwargs = None compression = ( - environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION) - if compression is None - else compression - ) or Compression.NoCompression + environ_to_compression(OTEL_EXPORTER_OTLP_COMPRESSION) + if compression is None + else compression + ) or Compression.NoCompression if insecure: self._client = self._stub( @@ -286,7 +286,7 @@ def __init__( @abstractmethod def _translate_data( - self, data: TypingSequence[SDKDataT] + self, data: TypingSequence[SDKDataT] ) -> ExportServiceRequestT: pass @@ -302,7 +302,7 @@ def _translate_attributes(self, attributes) -> TypingSequence[KeyValue]: return output def _export( - self, data: Union[TypingSequence[ReadableSpan], MetricsData] + self, data: Union[TypingSequence[ReadableSpan], MetricsData] ) -> ExportResultT: # After the call to shutdown, subsequent calls to Export are # not allowed and should return a Failure result. @@ -356,8 +356,8 @@ def _export( retry_info = RetryInfo() retry_info.ParseFromString(retry_info_bin) delay = ( - retry_info.retry_delay.seconds - + retry_info.retry_delay.nanos / 1.0e9 + retry_info.retry_delay.seconds + + retry_info.retry_delay.nanos / 1.0e9 ) logger.warning( diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 8f63f93d582..748b255352a 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -36,12 +36,12 @@ class TestOTLPExporterMixin(TestCase): def test_environ_to_compression(self): with patch.dict( - "os.environ", - { - "test_gzip": "gzip", - "test_gzip_caseinsensitive_with_whitespace": " GzIp ", - "test_invalid": "some invalid compression", - }, + "os.environ", + { + "test_gzip": "gzip", + "test_gzip_caseinsensitive_with_whitespace": " GzIp ", + "test_invalid": "some invalid compression", + }, ): self.assertEqual( environ_to_compression("test_gzip"), Compression.Gzip @@ -76,7 +76,7 @@ class OTLPMockExporter(OTLPExporterMixin): ) def _translate_data( - self, data: Sequence[SDKDataT] + self, data: Sequence[SDKDataT] ) -> ExportServiceRequestT: pass @@ -119,12 +119,10 @@ def test_shutdown(self): class OTLPMockExporter(OTLPExporterMixin): _result = result_mock - _stub = Mock( - **{"return_value": Mock()} - ) + _stub = Mock(**{"return_value": Mock()}) def _translate_data( - self, data: Sequence[SDKDataT] + self, data: Sequence[SDKDataT] ) -> ExportServiceRequestT: pass @@ -136,9 +134,13 @@ def _exporting(self) -> str: with self.assertLogs(level=WARNING) as warning: # pylint: disable=protected-access - self.assertEqual(otlp_mock_exporter._export(data={}), result_mock.SUCCESS) + self.assertEqual( + otlp_mock_exporter._export(data={}), result_mock.SUCCESS + ) otlp_mock_exporter.shutdown() - self.assertEqual(otlp_mock_exporter._export(data={}), result_mock.FAILURE) + self.assertEqual( + otlp_mock_exporter._export(data={}), result_mock.FAILURE + ) self.assertEqual( warning.records[0].message, "Exporter already shutdown, ignoring batch", @@ -147,6 +149,7 @@ def _exporting(self) -> str: def test_shutdown_wait_last_export(self): import threading import time + result_mock = Mock() rpc_error = RpcError() @@ -155,7 +158,9 @@ def code(self): def trailing_metadata(self): return { - "google.rpc.retryinfo-bin": RetryInfo(retry_delay=Duration(seconds=1)).SerializeToString() + "google.rpc.retryinfo-bin": RetryInfo( + retry_delay=Duration(seconds=1) + ).SerializeToString() } rpc_error.code = MethodType(code, rpc_error) @@ -168,7 +173,7 @@ class OTLPMockExporter(OTLPExporterMixin): ) def _translate_data( - self, data: Sequence[SDKDataT] + self, data: Sequence[SDKDataT] ) -> ExportServiceRequestT: pass @@ -178,7 +183,9 @@ def _exporting(self) -> str: otlp_mock_exporter = OTLPMockExporter() - export_thread = threading.Thread(target=otlp_mock_exporter._export, args=({},)) + export_thread = threading.Thread( + target=otlp_mock_exporter._export, args=({},) + ) export_thread.start() try: self.assertTrue(otlp_mock_exporter._export_lock.locked()) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index c516f9825b1..ba7f7101653 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -13,6 +13,7 @@ # limitations under the License. from concurrent.futures import ThreadPoolExecutor + # pylint: disable=too-many-lines from logging import WARNING from os.path import dirname @@ -23,7 +24,6 @@ from google.protobuf.duration_pb2 import Duration from google.rpc.error_details_pb2 import RetryInfo from grpc import ChannelCredentials, Compression, StatusCode, server -from opentelemetry.test.metrictestutil import _generate_gauge, _generate_sum from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( OTLPMetricExporter, @@ -79,6 +79,7 @@ from opentelemetry.sdk.util.instrumentation import ( InstrumentationScope as SDKInstrumentationScope, ) +from opentelemetry.test.metrictestutil import _generate_gauge, _generate_sum THIS_DIR = dirname(__file__) @@ -366,7 +367,7 @@ def test_preferred_temporality(self): { OTEL_EXPORTER_OTLP_METRICS_ENDPOINT: "collector:4317", OTEL_EXPORTER_OTLP_METRICS_CERTIFICATE: THIS_DIR - + "/fixtures/test.cert", + + "/fixtures/test.cert", OTEL_EXPORTER_OTLP_METRICS_HEADERS: " key1=value1,KEY2 = value=2", OTEL_EXPORTER_OTLP_METRICS_TIMEOUT: "10", OTEL_EXPORTER_OTLP_METRICS_COMPRESSION: "gzip", @@ -397,7 +398,7 @@ def test_env_variables(self, mock_exporter_mixin): ) # pylint: disable=unused-argument def test_no_credentials_error( - self, mock_ssl_channel, mock_secure, mock_stub + self, mock_ssl_channel, mock_secure, mock_stub ): OTLPMetricExporter(insecure=False) self.assertTrue(mock_ssl_channel.called) @@ -527,7 +528,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}) def test_otlp_exporter_otlp_compression_envvar( - self, mock_insecure_channel, mock_expo + self, mock_insecure_channel, mock_expo ): """Just OTEL_EXPORTER_OTLP_COMPRESSION should work""" OTLPMetricExporter(insecure=True) @@ -551,7 +552,7 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel): @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {}) def test_otlp_exporter_otlp_compression_unspecified( - self, mock_insecure_channel + self, mock_insecure_channel ): """No env or kwarg should be NoCompression""" OTLPMetricExporter(insecure=True) @@ -1385,7 +1386,9 @@ def test_shutdown_wait_last_export(self): MetricsServiceServicerUNAVAILABLEDelay(), self.server ) - export_thread = threading.Thread(target=self.exporter.export, args=(self.metrics["sum_int"],)) + export_thread = threading.Thread( + target=self.exporter.export, args=(self.metrics["sum_int"],) + ) export_thread.start() try: self.assertTrue(self.exporter._export_lock.locked()) @@ -1401,7 +1404,7 @@ def test_shutdown_wait_last_export(self): def _resource_metrics( - index: int, scope_metrics: List[ScopeMetrics] + index: int, scope_metrics: List[ScopeMetrics] ) -> ResourceMetrics: return ResourceMetrics( resource=Resource( diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index 602d2f05ab2..dd03894a378 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -22,9 +22,6 @@ from google.protobuf.duration_pb2 import Duration from google.rpc.error_details_pb2 import RetryInfo from grpc import ChannelCredentials, Compression, StatusCode, server -from opentelemetry.test.spantestutil import ( - get_span_with_dropped_attributes_events_links, -) from opentelemetry.attributes import BoundedAttributes from opentelemetry.exporter.otlp.proto.grpc.exporter import ( @@ -72,6 +69,9 @@ SpanExportResult, ) from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.test.spantestutil import ( + get_span_with_dropped_attributes_events_links, +) THIS_DIR = os.path.dirname(__file__) @@ -229,7 +229,7 @@ def test_exporting(self): { OTEL_EXPORTER_OTLP_TRACES_ENDPOINT: "collector:4317", OTEL_EXPORTER_OTLP_TRACES_CERTIFICATE: THIS_DIR - + "/fixtures/test.cert", + + "/fixtures/test.cert", OTEL_EXPORTER_OTLP_TRACES_HEADERS: " key1=value1,KEY2 = value=2", OTEL_EXPORTER_OTLP_TRACES_TIMEOUT: "10", OTEL_EXPORTER_OTLP_TRACES_COMPRESSION: "gzip", @@ -260,7 +260,7 @@ def test_env_variables(self, mock_exporter_mixin): ) # pylint: disable=unused-argument def test_no_credentials_error( - self, mock_ssl_channel, mock_secure, mock_stub + self, mock_ssl_channel, mock_secure, mock_stub ): OTLPSpanExporter(insecure=False) self.assertTrue(mock_ssl_channel.called) @@ -397,7 +397,7 @@ def test_otlp_exporter_endpoint(self, mock_secure, mock_insecure): @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {OTEL_EXPORTER_OTLP_COMPRESSION: "gzip"}) def test_otlp_exporter_otlp_compression_envvar( - self, mock_insecure_channel + self, mock_insecure_channel ): """Just OTEL_EXPORTER_OTLP_COMPRESSION should work""" OTLPSpanExporter(insecure=True) @@ -419,7 +419,7 @@ def test_otlp_exporter_otlp_compression_kwarg(self, mock_insecure_channel): @patch("opentelemetry.exporter.otlp.proto.grpc.exporter.insecure_channel") @patch.dict("os.environ", {}) def test_otlp_exporter_otlp_compression_unspecified( - self, mock_insecure_channel + self, mock_insecure_channel ): """No env or kwarg should be NoCompression""" OTLPSpanExporter(insecure=True) @@ -434,7 +434,7 @@ def test_otlp_exporter_otlp_compression_unspecified( {OTEL_EXPORTER_OTLP_TRACES_COMPRESSION: "gzip"}, ) def test_otlp_exporter_otlp_compression_precendence( - self, mock_insecure_channel + self, mock_insecure_channel ): """OTEL_EXPORTER_OTLP_TRACES_COMPRESSION as higher priority than OTEL_EXPORTER_OTLP_COMPRESSION @@ -798,9 +798,9 @@ def test_translate_spans_multi(self): ) def _check_translated_status( - self, - translated: ExportTraceServiceRequest, - code_expected: Status, + self, + translated: ExportTraceServiceRequest, + code_expected: Status, ): status = translated.resource_spans[0].scope_spans[0].spans[0].status @@ -955,7 +955,9 @@ def test_shutdown_wait_last_export(self): TraceServiceServicerUNAVAILABLEDelay(), self.server ) - export_thread = threading.Thread(target=self.exporter.export, args=([self.span],)) + export_thread = threading.Thread( + target=self.exporter.export, args=([self.span],) + ) export_thread.start() try: self.assertTrue(self.exporter._export_lock.locked()) From a229f6ab921472381a14d5a4a1acc5d21b3883ff Mon Sep 17 00:00:00 2001 From: Girish Chandrashekar Date: Tue, 24 Jan 2023 13:50:27 +0100 Subject: [PATCH 3/4] add changelog entry for fix --- CHANGELOG.md | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bb6b0c7016..c46be0e41ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## Unreleased + - Adds environment variables for log exporter ([#3037](https://github.com/open-telemetry/opentelemetry-python/pull/3037)) - Add attribute name to type warning message. @@ -16,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3128](https://github.com/open-telemetry/opentelemetry-python/pull/3128)) - Fix validation of baggage values ([#3058](https://github.com/open-telemetry/opentelemetry-python/pull/3058)) +- Implement shutdown procedure forOTLP grpc exporters + ([#3138](https://github.com/open-telemetry/opentelemetry-python/pull/3138)) ## Version 1.15.0/0.36b0 (2022-12-09) @@ -372,7 +375,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-distro` & `opentelemetry-sdk` Moved Auto Instrumentation Configurator code to SDK to let distros use its default implementation ([#1937](https://github.com/open-telemetry/opentelemetry-python/pull/1937)) -- Add Trace ID validation to meet [TraceID spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/overview.md#spancontext) ([#1992](https://github.com/open-telemetry/opentelemetry-python/pull/1992)) +- Add Trace ID validation to + meet [TraceID spec](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/overview.md#spancontext) ([#1992](https://github.com/open-telemetry/opentelemetry-python/pull/1992)) - Fixed Python 3.10 incompatibility in `opentelemetry-opentracing-shim` tests ([#2018](https://github.com/open-telemetry/opentelemetry-python/pull/2018)) - `opentelemetry-sdk` added support for `OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT` @@ -703,7 +707,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1471](https://github.com/open-telemetry/opentelemetry-python/pull/1471)) - Add support for Python 3.9 ([#1441](https://github.com/open-telemetry/opentelemetry-python/pull/1441)) -- Added the ability to disable instrumenting libraries specified by OTEL_PYTHON_DISABLED_INSTRUMENTATIONS env variable, when using opentelemetry-instrument command. +- Added the ability to disable instrumenting libraries specified by OTEL_PYTHON_DISABLED_INSTRUMENTATIONS env variable, + when using opentelemetry-instrument command. ([#1461](https://github.com/open-telemetry/opentelemetry-python/pull/1461)) - Add `fields` to propagators ([#1374](https://github.com/open-telemetry/opentelemetry-python/pull/1374)) @@ -752,7 +757,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1533](https://github.com/open-telemetry/opentelemetry-python/pull/1533)) - `opentelemetry-sdk` The JaegerPropagator has been moved into its own package: `opentelemetry-propagator-jaeger` ([#1525](https://github.com/open-telemetry/opentelemetry-python/pull/1525)) -- `opentelemetry-exporter-jaeger`, `opentelemetry-exporter-zipkin` Update InstrumentationInfo tag keys for Jaeger and Zipkin exporters +- `opentelemetry-exporter-jaeger`, `opentelemetry-exporter-zipkin` Update InstrumentationInfo tag keys for Jaeger and + Zipkin exporters ([#1535](https://github.com/open-telemetry/opentelemetry-python/pull/1535)) - `opentelemetry-sdk` Remove rate property setter from TraceIdRatioBasedSampler ([#1536](https://github.com/open-telemetry/opentelemetry-python/pull/1536)) @@ -862,7 +868,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1199](https://github.com/open-telemetry/opentelemetry-python/pull/1199)) - Add Global Error Handler ([#1080](https://github.com/open-telemetry/opentelemetry-python/pull/1080)) -- Add support for `OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_SCHEDULE_DELAY_MILLIS`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` and `OTEL_BSP_EXPORT_TIMEOUT_MILLIS` environment variables +- Add support for `OTEL_BSP_MAX_QUEUE_SIZE`, `OTEL_BSP_SCHEDULE_DELAY_MILLIS`, `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` + and `OTEL_BSP_EXPORT_TIMEOUT_MILLIS` environment variables ([#1105](https://github.com/open-telemetry/opentelemetry-python/pull/1120)) - Adding Resource to MeterRecord ([#1209](https://github.com/open-telemetry/opentelemetry-python/pull/1209)) @@ -887,7 +894,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1151](https://github.com/open-telemetry/opentelemetry-python/pull/1151)) - Fixed OTLP events to Zipkin annotations translation. ([#1161](https://github.com/open-telemetry/opentelemetry-python/pull/1161)) -- Fixed bootstrap command to correctly install opentelemetry-instrumentation-falcon instead of opentelemetry-instrumentation-flask. +- Fixed bootstrap command to correctly install opentelemetry-instrumentation-falcon instead of + opentelemetry-instrumentation-flask. ([#1138](https://github.com/open-telemetry/opentelemetry-python/pull/1138)) - Update sampling result names ([#1128](https://github.com/open-telemetry/opentelemetry-python/pull/1128)) @@ -897,7 +905,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1203](https://github.com/open-telemetry/opentelemetry-python/pull/1203)) - Protect access to Span implementation ([#1188](https://github.com/open-telemetry/opentelemetry-python/pull/1188)) -- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context manager. +- `start_as_current_span` and `use_span` can now optionally auto-record any exceptions raised inside the context + manager. ([#1162](https://github.com/open-telemetry/opentelemetry-python/pull/1162)) ## Version 0.13b0 (2020-09-17) @@ -974,7 +983,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#959](https://github.com/open-telemetry/opentelemetry-python/pull/959)) - Update default port to 55680 ([#977](https://github.com/open-telemetry/opentelemetry-python/pull/977)) -- Add proper length zero padding to hex strings of traceId, spanId, parentId sent on the wire, for compatibility with jaeger-collector +- Add proper length zero padding to hex strings of traceId, spanId, parentId sent on the wire, for compatibility with + jaeger-collector ([#908](https://github.com/open-telemetry/opentelemetry-python/pull/908)) - Send start_timestamp and convert labels to strings ([#937](https://github.com/open-telemetry/opentelemetry-python/pull/937)) From 6b9d10b7e642e029b6727471dd2a6f8ccd336216 Mon Sep 17 00:00:00 2001 From: Girish Chandrashekar Date: Sun, 12 Mar 2023 13:13:19 +0100 Subject: [PATCH 4/4] lint test files --- .../tests/test_otlp_exporter_mixin.py | 10 +++++++--- .../tests/test_otlp_metrics_exporter.py | 8 +++++--- .../tests/test_otlp_trace_exporter.py | 8 +++++--- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py index 748b255352a..c7577557405 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_exporter_mixin.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading +import time from logging import WARNING from types import MethodType from typing import Sequence @@ -138,6 +140,7 @@ def _exporting(self) -> str: otlp_mock_exporter._export(data={}), result_mock.SUCCESS ) otlp_mock_exporter.shutdown() + # pylint: disable=protected-access self.assertEqual( otlp_mock_exporter._export(data={}), result_mock.FAILURE ) @@ -147,9 +150,6 @@ def _exporting(self) -> str: ) def test_shutdown_wait_last_export(self): - import threading - import time - result_mock = Mock() rpc_error = RpcError() @@ -183,18 +183,22 @@ def _exporting(self) -> str: otlp_mock_exporter = OTLPMockExporter() + # pylint: disable=protected-access export_thread = threading.Thread( target=otlp_mock_exporter._export, args=({},) ) export_thread.start() try: + # pylint: disable=protected-access self.assertTrue(otlp_mock_exporter._export_lock.locked()) # delay is 1 second while the default shutdown timeout is 30_000 milliseconds start_time = time.time() otlp_mock_exporter.shutdown() now = time.time() self.assertGreaterEqual(now, (start_time + 30 / 1000)) + # pylint: disable=protected-access self.assertTrue(otlp_mock_exporter._shutdown) + # pylint: disable=protected-access self.assertFalse(otlp_mock_exporter._export_lock.locked()) finally: export_thread.join() diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py index ea419223a14..2390b973012 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_metrics_exporter.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading +import time from concurrent.futures import ThreadPoolExecutor # pylint: disable=too-many-lines @@ -1379,9 +1381,6 @@ def test_shutdown(self): ) def test_shutdown_wait_last_export(self): - import threading - import time - add_MetricsServiceServicer_to_server( MetricsServiceServicerUNAVAILABLEDelay(), self.server ) @@ -1391,13 +1390,16 @@ def test_shutdown_wait_last_export(self): ) export_thread.start() try: + # pylint: disable=protected-access self.assertTrue(self.exporter._export_lock.locked()) # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds start_time = time.time() self.exporter.shutdown() now = time.time() self.assertGreaterEqual(now, (start_time + 30 / 1000)) + # pylint: disable=protected-access self.assertTrue(self.exporter._shutdown) + # pylint: disable=protected-access self.assertFalse(self.exporter._export_lock.locked()) finally: export_thread.join() diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py index fc3e6c419c4..2498da74b81 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/test_otlp_trace_exporter.py @@ -13,6 +13,8 @@ # limitations under the License. import os +import threading +import time from collections import OrderedDict from concurrent.futures import ThreadPoolExecutor from logging import WARNING @@ -948,9 +950,6 @@ def test_shutdown(self): ) def test_shutdown_wait_last_export(self): - import threading - import time - add_TraceServiceServicer_to_server( TraceServiceServicerUNAVAILABLEDelay(), self.server ) @@ -960,13 +959,16 @@ def test_shutdown_wait_last_export(self): ) export_thread.start() try: + # pylint: disable=protected-access self.assertTrue(self.exporter._export_lock.locked()) # delay is 4 seconds while the default shutdown timeout is 30_000 milliseconds start_time = time.time() self.exporter.shutdown() now = time.time() self.assertGreaterEqual(now, (start_time + 30 / 1000)) + # pylint: disable=protected-access self.assertTrue(self.exporter._shutdown) + # pylint: disable=protected-access self.assertFalse(self.exporter._export_lock.locked()) finally: export_thread.join()