From ab9d4b1aed0f3610e7fab56138d2a945c2ae006a Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 19 Mar 2024 14:31:16 +0100 Subject: [PATCH] elasticsearch: don't set body as db statement for bulk requests bulk requests can be too big and diverse to make sense as db statement. Other than that the sanitizer currently only handles dicts so it's crashing. Closes #2150 Co-authored-by: Jason Mobarak --- .../instrumentation/elasticsearch/__init__.py | 8 +++++--- .../tests/test_elasticsearch.py | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index dd72a5235e..8f3842deb2 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -245,9 +245,11 @@ def wrapper(wrapped, _, args, kwargs): if method: attributes["elasticsearch.method"] = method if body: - attributes[SpanAttributes.DB_STATEMENT] = sanitize_body( - body - ) + # Don't set db.statement for bulk requests, as it can be very large + if isinstance(body, dict): + attributes[SpanAttributes.DB_STATEMENT] = sanitize_body( + body + ) if params: attributes["elasticsearch.params"] = str(params) if doc_id: diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py index 0c84cf5cd6..603282d0b2 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py @@ -486,3 +486,19 @@ def test_body_sanitization(self, _): sanitize_body(json.dumps(sanitization_queries.interval_query)), str(sanitization_queries.interval_query_sanitized), ) + + def test_bulk(self, request_mock): + request_mock.return_value = (1, {}, "") + + es = Elasticsearch() + es.bulk([dict(_op_type="index", _index="sw", _doc_type="_doc", _id=1, doc={"name": "adam"})] * 2) + + spans_list = self.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + # Check version and name in span's instrumentation info + # self.assertEqualSpanInstrumentationInfo(span, opentelemetry.instrumentation.elasticsearch) + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.elasticsearch + )