Skip to content

Commit

Permalink
build okahu exporter and added test cases for okahu exporter in monoc…
Browse files Browse the repository at this point in the history
…le code

Signed-off-by: hansrajr <hansraj.rose@beehyv.com>
  • Loading branch information
Hansrajr committed Oct 18, 2024
1 parent 086b223 commit a1f1645
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 0 deletions.
117 changes: 117 additions & 0 deletions src/monocle_apptrace/exporters/okahu/exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import json
import logging
import os
from typing import Callable, Optional, Sequence
import requests
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult, ConsoleSpanExporter
from requests.exceptions import ReadTimeout

from monocle_apptrace.exporters.okahu.processor import ExportTaskProcessor

REQUESTS_SUCCESS_STATUS_CODES = (200, 202)
OKAHU_PROD_INGEST_ENDPOINT = "https://ingest.okahu.co/api/v1/trace/ingest"

logger = logging.getLogger(__name__)


class OkahuSpanExporter(SpanExporter):
def __init__(
self,
endpoint: Optional[str] = None,
timeout: Optional[int] = None,
session: Optional[requests.Session] = None,
task_processor: ExportTaskProcessor = None
):
"""Okahu exporter."""
okahu_endpoint: str = os.environ.get("OKAHU_INGESTION_ENDPOINT", OKAHU_PROD_INGEST_ENDPOINT)
self.endpoint = endpoint or okahu_endpoint
api_key: str = os.environ.get("OKAHU_API_KEY")
self._closed = False
if not api_key:
logger.warning("OKAHU_API_KEY not set. Using ConsoleSpanExporter instead.")
self.exporter = ConsoleSpanExporter()
return
self.timeout = timeout or 15
self.session = session or requests.Session()
self.session.headers.update(
{"Content-Type": "application/json", "x-api-key": api_key}
)

self.task_processor = task_processor or None
if task_processor is not None:
task_processor.start()

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
# After the call to Shutdown subsequent calls to Export are
# not allowed and should return a Failure result
if not hasattr(self, 'session'):
return self.exporter.export(spans)

if self._closed:
logger.warning("Exporter already shutdown, ignoring batch")
return SpanExportResult.FAILUREencoder
if len(spans) == 0:
return

span_list = {
"batch": []
}

# append the batch object with all the spans object
for span in spans:
# create a object from serialized span
obj = json.loads(span.to_json())
if obj["parent_id"] is None:
obj["parent_id"] = "None"
else:
obj["parent_id"] = remove_0x_from_start(obj["parent_id"])
if obj["context"] is not None:
obj["context"]["trace_id"] = remove_0x_from_start(obj["context"]["trace_id"])
obj["context"]["span_id"] = remove_0x_from_start(obj["context"]["span_id"])
span_list["batch"].append(obj)

def send_spans_to_okahu(span_list_local=None):
try:
result = self.session.post(
url=self.endpoint,
data=json.dumps(span_list_local),
timeout=self.timeout,
)
if result.status_code not in REQUESTS_SUCCESS_STATUS_CODES:
logger.error(
"Traces cannot be uploaded; status code: %s, message %s",
result.status_code,
result.text,
)
return SpanExportResult.FAILURE
logger.warning("spans successfully exported to okahu")
return SpanExportResult.SUCCESS
except ReadTimeout as e:
logger.warning("Trace export timed out: %s", str(e))
return SpanExportResult.FAILURE

# if async task function is present, then push the request to asnc task

if self.task_processor is not None and callable(self.task_processor.queue_task):
self.task_processor.queue_task(send_spans_to_okahu, span_list)
return SpanExportResult.SUCCESS
return send_spans_to_okahu(span_list)

def shutdown(self) -> None:
if self._closed:
logger.warning("Exporter already shutdown, ignoring call")
return
if hasattr(self, 'session'):
self.session.close()
self._closed = True

def force_flush(self, timeout_millis: int = 30000) -> bool:
return True


# only removes the first occurrence of 0x from the string
def remove_0x_from_start(my_str: str):
if my_str.startswith("0x"):
return my_str.replace("0x", "", 1)
return my_str
19 changes: 19 additions & 0 deletions src/monocle_apptrace/exporters/okahu/processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from abc import ABC, abstractmethod
import logging
from typing import Callable

logger = logging.getLogger(__name__)

class ExportTaskProcessor(ABC):

@abstractmethod
def start(self):
return

@abstractmethod
def stop(self):
return

@abstractmethod
def queue_task(self, async_task: Callable[[Callable, any], any] = None, args: any = None):
return
90 changes: 90 additions & 0 deletions tests/okahu_exporter_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import unittest
from unittest.mock import patch, MagicMock, call
from monocle_apptrace.exporters.okahu.exporter import OkahuSpanExporter, remove_0x_from_start
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SpanExportResult
from requests.exceptions import ReadTimeout
from opentelemetry.sdk.trace import ReadableSpan
import json
class TestOkahuSpanExporter(unittest.TestCase):

@patch.dict('os.environ', {}, clear=True) # Simulate environment without API key
def test_default_to_console_exporter(self):
"""Test that it defaults to ConsoleSpanExporter when no API key is set."""
exporter = OkahuSpanExporter()
self.assertIsInstance(exporter.exporter, ConsoleSpanExporter)
self.assertEqual(exporter.endpoint, "https://ingest.okahu.co/api/v1/trace/ingest")

@patch.dict('os.environ', {'OKAHU_API_KEY': 'test-api-key'})
@patch('monocle_apptrace.exporters.okahu.exporter.requests.Session')
def test_okahu_exporter_with_api_key(self, mock_session):
"""Test that OkahuSpanExporter is used when an API key is set."""
mock_session_instance = MagicMock()
mock_session.return_value = mock_session_instance
mock_post = mock_session_instance.post
mock_post.return_value.status_code = 200

mock_span = MagicMock(spec=ReadableSpan)
mock_span.to_json.return_value = json.dumps({
"parent_id": "0x123456",
"context": {
"trace_id": "0xabcdef",
"span_id": "0x654321"
}
})
spans = [mock_span]
exporter = OkahuSpanExporter()
exporter.export(spans)
mock_post.assert_called_once()

@patch.dict('os.environ', {'OKAHU_API_KEY': 'test-api-key'})
@patch('monocle_apptrace.exporters.okahu.exporter.requests.Session')
def test_export_success(self, mock_session):
mock_session_instance = MagicMock()
mock_session.return_value = mock_session_instance
mock_session_instance.post.return_value.status_code = 200
exporter = OkahuSpanExporter()
mock_span = MagicMock()
mock_span.to_json.return_value = '{"parent_id": null, "context": {"trace_id": "0x123", "span_id": "0x456"}}'

result = exporter.export([mock_span])
self.assertEqual(result, SpanExportResult.SUCCESS)

mock_session_instance.post.assert_called_once_with(
url=exporter.endpoint,
data='{"batch": [{"parent_id": "None", "context": {"trace_id": "123", "span_id": "456"}}]}',
timeout=15
)

@patch.dict('os.environ', {'OKAHU_API_KEY': 'test-api-key'})
@patch('monocle_apptrace.exporters.okahu.exporter.requests.Session')
def test_export_failure(self, mock_session):
"""Test exporting spans with an error response from Okahu."""
mock_session_instance = MagicMock()
mock_session.return_value = mock_session_instance
mock_session_instance.post.return_value.status_code = 500

exporter = OkahuSpanExporter()
mock_span = MagicMock()
mock_span.to_json.return_value = '{"parent_id": null, "context": {"trace_id": "0x123", "span_id": "0x456"}}'

result = exporter.export([mock_span])
self.assertEqual(result, SpanExportResult.FAILURE)

@patch.dict('os.environ', {'OKAHU_API_KEY': 'test-api-key'})
@patch('monocle_apptrace.exporters.okahu.exporter.requests.Session')
def test_export_timeout(self, mock_session):
"""Test exporting spans with a timeout."""
mock_session_instance = MagicMock()
mock_session.return_value = mock_session_instance
mock_session_instance.post.side_effect = ReadTimeout

exporter = OkahuSpanExporter()
mock_span = MagicMock()
mock_span.to_json.return_value = '{"parent_id": null, "context": {"trace_id": "0x123", "span_id": "0x456"}}'

result = exporter.export([mock_span])
self.assertEqual(result, SpanExportResult.FAILURE)


if __name__ == '__main__':
unittest.main()

0 comments on commit a1f1645

Please sign in to comment.