Skip to content

Commit

Permalink
Direct exporter - AWS S3 (#42)
Browse files Browse the repository at this point in the history
* S3 trace exporter support
Signed-off-by: sachintendulkar576123 <sachin.tendulkar@beehyv.com>
  • Loading branch information
sachintendulkar576123 authored Oct 14, 2024
1 parent 9c17ad0 commit 044be3c
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 35 deletions.
17 changes: 13 additions & 4 deletions Monocle_User_Guide.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#Monocle User Guide
# Monocle User Guide

## Monocle Concepts
### Traces
Expand All @@ -13,15 +13,21 @@ It’s typically the workflow code components of an application that generate th
```
> pip install monocle_apptrace
```
- For AWS support (to upload traces to AWS), install with the aws extra:
```
> pip install monocle_apptrace[aws]
```

- You can locally build and install Monocle library from source
```
> pip install .
> pip install .
```
- Install the optional test dependencies listed against dev in pyproject.toml in editable mode
```
> pip install -e ".[dev]"
> pip install -e ".[dev]"
```


## Using Monocle with your application to generate traces
### Enable Monocle tracing
You need to import monocle package and invoke the API ``setup_monocle_telemetry(workflow=<workflow-name>)`` to enable the tracing. The 'workflow-name' is what you define to identify the give application workflow, for example "customer-chatbot". Monocle trace will include this name in every trace. The trace output will include a list of spans in the traces. You can print the output on the console or send it to an HTTP endpoint.
Expand All @@ -48,7 +54,7 @@ chain.invoke({"number":2})
# Request callbacks: Finally, let's use the request `callbacks` to achieve the same result
chain = LLMChain(llm=llm, prompt=prompt)
chain.invoke({"number":2}, {"callbacks":[handler]})

```

### Accessing monocle trace
Expand All @@ -63,6 +69,9 @@ setup_monocle_telemetry(workflow_name = "simple_math_app",
```
To print the trace on the console, use ```ConsoleSpanExporter()``` instead of ```FileSpanExporter()```

For AWS:
Install the AWS support as shown in the setup section, then use ```S3SpanExporter()``` to upload the traces to an S3 bucket.

### Leveraging Monocle's extensibility to handle customization
When the out of box features from app frameworks are not sufficent, the app developers have to add custom code. For example, if you are extending a LLM class in LlamaIndex to use a model hosted in NVIDIA Triton. This new class is not know to Monocle. You can specify this new class method part of Monocle enabling API and it will be able to trace it.

Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ dev = [
'llama-index-vector-stores-chroma==0.1.9',
'parameterized==0.9.0'
]
aws = [
'boto3==1.35.19',
]

[project.urls]
Homepage = "https://github.com/monocle2ai/monocle"
Expand Down
151 changes: 151 additions & 0 deletions src/monocle_apptrace/exporters/aws/s3_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import os
import time
import random
import datetime
import logging
import asyncio
import boto3
from botocore.exceptions import ClientError
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from monocle_apptrace.exporters.base_exporter import SpanExporterBase
from typing import Sequence

logger = logging.getLogger(__name__)

class S3SpanExporter(SpanExporterBase):
def __init__(self, bucket_name=None, region_name="us-east-1"):
super().__init__()
# Use environment variables if credentials are not provided
DEFAULT_FILE_PREFIX = "monocle_trace__"
DEFAULT_TIME_FORMAT = "%Y-%m-%d__%H.%M.%S"
self.max_batch_size = 500
self.export_interval = 1
self.s3_client = boto3.client(
's3',
aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
region_name=region_name,
)
self.bucket_name = bucket_name or os.getenv('AWS_S3_BUCKET_NAME','default-bucket')
self.file_prefix = DEFAULT_FILE_PREFIX
self.time_format = DEFAULT_TIME_FORMAT
self.export_queue = []
self.last_export_time = time.time()

# Check if bucket exists or create it
if not self.__bucket_exists(self.bucket_name):
try:
if region_name == "us-east-1":
self.s3_client.create_bucket(Bucket=self.bucket_name)
else:
self.s3_client.create_bucket(
Bucket=self.bucket_name,
CreateBucketConfiguration={'LocationConstraint': region_name}
)
logger.info(f"Bucket {self.bucket_name} created successfully.")
except ClientError as e:
logger.error(f"Error creating bucket {self.bucket_name}: {e}")
raise e

def __bucket_exists(self, bucket_name):
try:
# Check if the bucket exists by calling head_bucket
self.s3_client.head_bucket(Bucket=bucket_name)
return True
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == '404':
# Bucket not found
logger.error(f"Bucket {bucket_name} does not exist (404).")
return False
elif error_code == '403':
# Permission denied
logger.error(f"Access to bucket {bucket_name} is forbidden (403).")
raise PermissionError(f"Access to bucket {bucket_name} is forbidden.")
elif error_code == '400':
# Bad request or malformed input
logger.error(f"Bad request for bucket {bucket_name} (400).")
raise ValueError(f"Bad request for bucket {bucket_name}.")
else:
# Other client errors
logger.error(f"Unexpected error when accessing bucket {bucket_name}: {e}")
raise e
except TypeError as e:
# Handle TypeError separately
logger.error(f"Type error while checking bucket existence: {e}")
raise e

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
"""Synchronous export method that internally handles async logic."""
try:
# Run the asynchronous export logic in an event loop
asyncio.run(self.__export_async(spans))
return SpanExportResult.SUCCESS
except Exception as e:
logger.error(f"Error exporting spans: {e}")
return SpanExportResult.FAILURE

async def __export_async(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
try:
# Add spans to the export queue
for span in spans:
self.export_queue.append(span)
# If the queue reaches MAX_BATCH_SIZE, export the spans
if len(self.export_queue) >= self.max_batch_size:
await self.__export_spans()

# Check if it's time to force a flush
current_time = time.time()
if current_time - self.last_export_time >= self.export_interval:
await self.__export_spans() # Export spans if time interval has passed
self.last_export_time = current_time # Reset the last export time

return SpanExportResult.SUCCESS
except Exception as e:
logger.error(f"Error exporting spans: {e}")
return SpanExportResult.FAILURE

def __serialize_spans(self, spans: Sequence[ReadableSpan]) -> str:
try:
# Serialize spans to JSON or any other format you prefer
span_data_list = [span.to_json() for span in spans]
return "[" + ", ".join(span_data_list) + "]"
except Exception as e:
logger.error(f"Error serializing spans: {e}")
raise

async def __export_spans(self):
if len(self.export_queue) == 0:
return

# Take a batch of spans from the queue
batch_to_export = self.export_queue[:self.max_batch_size]
serialized_data = self.__serialize_spans(batch_to_export)
self.export_queue = self.export_queue[self.max_batch_size:]
try:
if asyncio.get_event_loop().is_running():
task = asyncio.create_task(self._retry_with_backoff(self.__upload_to_s3, serialized_data))
await task
else:
await self._retry_with_backoff(self.__upload_to_s3, serialized_data)

except Exception as e:
logger.error(f"Failed to upload span batch: {e}")

def __upload_to_s3(self, span_data_batch: str):
current_time = datetime.datetime.now().strftime(self.time_format)
file_name = f"{self.file_prefix}{current_time}.json"
self.s3_client.put_object(
Bucket=self.bucket_name,
Key=file_name,
Body=span_data_batch
)
logger.info(f"Span batch uploaded to AWS S3 as {file_name}.")

async def force_flush(self, timeout_millis: int = 30000) -> bool:
await self.__export_spans() # Export any remaining spans in the queue
return True

def shutdown(self) -> None:
logger.info("S3SpanExporter has been shut down.")
47 changes: 47 additions & 0 deletions src/monocle_apptrace/exporters/base_exporter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import time
import random
import logging
from abc import ABC, abstractmethod
from azure.core.exceptions import ServiceRequestError, ClientAuthenticationError
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from typing import Sequence
import asyncio

logger = logging.getLogger(__name__)

class SpanExporterBase(ABC):
def __init__(self):
self.backoff_factor = 2
self.max_retries = 10
self.export_queue = []
self.last_export_time = time.time()

@abstractmethod
async def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
pass

@abstractmethod
async def force_flush(self, timeout_millis: int = 30000) -> bool:
pass

def shutdown(self) -> None:
pass

async def _retry_with_backoff(self, func, *args, **kwargs):
"""Handle retries with exponential backoff."""
attempt = 0
while attempt < self.max_retries:
try:
return func(*args, **kwargs)
except ServiceRequestError as e:
logger.warning(f"Network connectivity error: {e}. Retrying in {self.backoff_factor ** attempt} seconds...")
sleep_time = self.backoff_factor * (2 ** attempt) + random.uniform(0, 1)
await asyncio.sleep(sleep_time)
attempt += 1
except ClientAuthenticationError as e:
logger.error(f"Failed to authenticate: {str(e)}")
break

logger.error("Max retries exceeded.")
raise ServiceRequestError(message="Max retries exceeded.")
Loading

0 comments on commit 044be3c

Please sign in to comment.