diff --git a/agenta-cli/agenta/__init__.py b/agenta-cli/agenta/__init__.py index 5a03fec167..53c65db70f 100644 --- a/agenta-cli/agenta/__init__.py +++ b/agenta-cli/agenta/__init__.py @@ -1,3 +1,5 @@ +from typing import Any, Callable, Optional + from .sdk.utils.preinit import PreInitObject import agenta.client.backend.types as client_types # pylint: disable=wrong-import-order @@ -18,7 +20,7 @@ ) from .sdk.utils.logging import log as logging -from .sdk.tracing import Tracing +from .sdk.tracing import Tracing, get_tracer from .sdk.decorators.tracing import instrument from .sdk.tracing.conventions import Reference from .sdk.decorators.routing import entrypoint, app, route @@ -36,15 +38,36 @@ DEFAULT_AGENTA_SINGLETON_INSTANCE = AgentaSingleton() types = client_types -tracing = None + api = None async_api = None +tracing = DEFAULT_AGENTA_SINGLETON_INSTANCE.tracing # type: ignore +tracer = get_tracer(tracing) -def init(*args, **kwargs): - global api, async_api, tracing, config - _init(*args, **kwargs) - tracing = DEFAULT_AGENTA_SINGLETON_INSTANCE.tracing # type: ignore +def init( + host: Optional[str] = None, + api_key: Optional[str] = None, + config_fname: Optional[str] = None, + redact: Optional[Callable[..., Any]] = None, + redact_on_error: Optional[bool] = True, + # DEPRECATING + app_id: Optional[str] = None, +): + global api, async_api, tracing, tracer # pylint: disable=global-statement + + _init( + host=host, + api_key=api_key, + config_fname=config_fname, + redact=redact, + redact_on_error=redact_on_error, + app_id=app_id, + ) + api = DEFAULT_AGENTA_SINGLETON_INSTANCE.api # type: ignore async_api = DEFAULT_AGENTA_SINGLETON_INSTANCE.async_api # type: ignore + + tracing = DEFAULT_AGENTA_SINGLETON_INSTANCE.tracing # type: ignore + tracer = get_tracer(tracing) diff --git a/agenta-cli/agenta/sdk/__init__.py b/agenta-cli/agenta/sdk/__init__.py index 65886105db..ff0093a9ab 100644 --- a/agenta-cli/agenta/sdk/__init__.py +++ b/agenta-cli/agenta/sdk/__init__.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Callable, Any from .utils.preinit import PreInitObject # always the first import! @@ -43,17 +43,21 @@ def init( host: Optional[str] = None, - app_id: Optional[str] = None, api_key: Optional[str] = None, config_fname: Optional[str] = None, + redact: Optional[Callable[..., Any]] = None, + redact_on_error: Optional[bool] = True, + # DEPRECATING + app_id: Optional[str] = None, ): - global api, async_api, tracing, tracer + global api, async_api, tracing, tracer # pylint: disable=global-statement _init( host=host, api_key=api_key, config_fname=config_fname, - # DEPRECATING + redact=redact, + redact_on_error=redact_on_error, app_id=app_id, ) diff --git a/agenta-cli/agenta/sdk/agenta_init.py b/agenta-cli/agenta/sdk/agenta_init.py index 7c1f77ff87..55795e47fa 100644 --- a/agenta-cli/agenta/sdk/agenta_init.py +++ b/agenta-cli/agenta/sdk/agenta_init.py @@ -1,7 +1,7 @@ import logging import toml from os import getenv -from typing import Optional +from typing import Optional, Callable, Any from importlib.metadata import version from agenta.sdk.utils.logging import log @@ -36,6 +36,8 @@ def init( host: Optional[str] = None, api_key: Optional[str] = None, config_fname: Optional[str] = None, + redact: Optional[Callable[..., Any]] = None, + redact_on_error: Optional[bool] = True, # DEPRECATING app_id: Optional[str] = None, ) -> None: @@ -91,6 +93,8 @@ def init( self.tracing = Tracing( url=f"{self.host}/api/observability/v1/otlp/traces", # type: ignore + redact=redact, + redact_on_error=redact_on_error, ) self.tracing.configure( @@ -258,7 +262,9 @@ def init( host: Optional[str] = None, api_key: Optional[str] = None, config_fname: Optional[str] = None, - # DEPRECATED + redact: Optional[Callable[..., Any]] = None, + redact_on_error: Optional[bool] = True, + # DEPRECATING app_id: Optional[str] = None, ): """Main function to initialize the agenta sdk. @@ -289,7 +295,8 @@ def init( host=host, api_key=api_key, config_fname=config_fname, - # DEPRECATED + redact=redact, + redact_on_error=redact_on_error, app_id=app_id, ) diff --git a/agenta-cli/agenta/sdk/decorators/tracing.py b/agenta-cli/agenta/sdk/decorators/tracing.py index 2f82728743..68f707b694 100644 --- a/agenta-cli/agenta/sdk/decorators/tracing.py +++ b/agenta-cli/agenta/sdk/decorators/tracing.py @@ -19,6 +19,8 @@ def __init__( config: Optional[Dict[str, Any]] = None, ignore_inputs: Optional[bool] = None, ignore_outputs: Optional[bool] = None, + redact: Optional[Callable[..., Any]] = None, + redact_on_error: Optional[bool] = True, max_depth: Optional[int] = 2, # DEPRECATING kind: str = "task", @@ -29,6 +31,8 @@ def __init__( self.config = config self.ignore_inputs = ignore_inputs self.ignore_outputs = ignore_outputs + self.redact = redact + self.redact_on_error = redact_on_error self.max_depth = max_depth def __call__(self, func: Callable[..., Any]): @@ -109,12 +113,10 @@ def _pre_instrument( ) _inputs = self._redact( - self._parse( - func, - *args, - **kwargs, - ), - self.ignore_inputs, + name=span.name, + field="inputs", + io=self._parse(func, *args, **kwargs), + ignore=self.ignore_inputs, ) span.set_attributes( attributes={"inputs": _inputs}, @@ -153,7 +155,12 @@ def _post_instrument( namespace="metrics.unit.tokens", ) - _outputs = self._redact(self._patch(result), self.ignore_outputs) + _outputs = self._redact( + name=span.name, + field="outputs", + io=self._patch(result), + ignore=self.ignore_outputs, + ) span.set_attributes( attributes={"outputs": _outputs}, namespace="data", @@ -192,6 +199,9 @@ def _parse( def _redact( self, + *, + name: str, + field: str, io: Dict[str, Any], ignore: Union[List[str], bool] = False, ) -> Dict[str, Any]: @@ -220,6 +230,20 @@ def _redact( ) } + if self.redact is not None: + try: + io = self.redact(name, field, io) + except: # pylint: disable=bare-except + if self.redact_on_error: + io = {} + + if ag.tracing.redact is not None: + try: + io = ag.tracing.redact(name, field, io) + except: # pylint: disable=bare-except + if ag.tracing.redact_on_error: + io = {} + return io def _patch( diff --git a/agenta-cli/agenta/sdk/tracing/tracing.py b/agenta-cli/agenta/sdk/tracing/tracing.py index bc083e86e3..f483868a8e 100644 --- a/agenta-cli/agenta/sdk/tracing/tracing.py +++ b/agenta-cli/agenta/sdk/tracing/tracing.py @@ -1,4 +1,4 @@ -from typing import Optional, Any, Dict +from typing import Optional, Any, Dict, Callable from enum import Enum from httpx import get as check @@ -32,6 +32,8 @@ class Tracing(metaclass=Singleton): def __init__( self, url: str, + redact: Optional[Callable[..., Any]] = None, + redact_on_error: Optional[bool] = True, ) -> None: # ENDPOINT (OTLP) self.otlp_url = url @@ -49,6 +51,10 @@ def __init__( # INLINE SPANS for INLINE TRACES (INLINE PROCESSOR) self.inline_spans: Dict[str, Any] = dict() + # REDACT + self.redact = redact + self.redact_on_error = redact_on_error + # PUBLIC def configure( diff --git a/agenta-cli/tests/redact/01_ignore_all.py b/agenta-cli/tests/redact/01_ignore_all.py new file mode 100644 index 0000000000..56d4a9b6c3 --- /dev/null +++ b/agenta-cli/tests/redact/01_ignore_all.py @@ -0,0 +1,18 @@ +import agenta as ag + +ag.init() + + +@ag.entrypoint +@ag.instrument( + spankind="WORKFLOW", + ignore_inputs=True, + ignore_outputs=True, +) +def embed(description: str): + return { + "embedding": "somedata", + "ignored": "ignored", + "cost": 15, + "usage": 20, + } diff --git a/agenta-cli/tests/redact/02_ignore_all_inputs.py b/agenta-cli/tests/redact/02_ignore_all_inputs.py new file mode 100644 index 0000000000..507cb34d44 --- /dev/null +++ b/agenta-cli/tests/redact/02_ignore_all_inputs.py @@ -0,0 +1,17 @@ +import agenta as ag + +ag.init() + + +@ag.entrypoint +@ag.instrument( + spankind="WORKFLOW", + ignore_inputs=True, +) +def embed(description: str): + return { + "embedding": "somedata", + "ignored": "ignored", + "cost": 15, + "usage": 20, + } diff --git a/agenta-cli/tests/redact/03_ignore_all_outputs.py b/agenta-cli/tests/redact/03_ignore_all_outputs.py new file mode 100644 index 0000000000..507cb34d44 --- /dev/null +++ b/agenta-cli/tests/redact/03_ignore_all_outputs.py @@ -0,0 +1,17 @@ +import agenta as ag + +ag.init() + + +@ag.entrypoint +@ag.instrument( + spankind="WORKFLOW", + ignore_inputs=True, +) +def embed(description: str): + return { + "embedding": "somedata", + "ignored": "ignored", + "cost": 15, + "usage": 20, + } diff --git a/agenta-cli/tests/redact/04_ignore_some.py b/agenta-cli/tests/redact/04_ignore_some.py new file mode 100644 index 0000000000..860b7dc0a6 --- /dev/null +++ b/agenta-cli/tests/redact/04_ignore_some.py @@ -0,0 +1,18 @@ +import agenta as ag + +ag.init() + + +@ag.entrypoint +@ag.instrument( + spankind="WORKFLOW", + ignore_inputs=["description"], + ignore_outputs=["embedding", "ignored"], +) +def embed(description: str, theme: str): + return { + "embedding": "somedata", + "ignored": "ignored", + "cost": 15, + "usage": 20, + } diff --git a/agenta-cli/tests/redact/05_redact_instrument.py b/agenta-cli/tests/redact/05_redact_instrument.py new file mode 100644 index 0000000000..a66771c9e7 --- /dev/null +++ b/agenta-cli/tests/redact/05_redact_instrument.py @@ -0,0 +1,30 @@ +import agenta as ag + +ag.init() + + +def redact(name, field, io): + print(">", name, field, io) + + if name == "embed" and field == "inputs": + io = {key: value for key, value in io.items() if key not in ("description",)} + + if name == "embed" and field == "outputs": + io = {key: value for key, value in io.items() if key not in ("embedding",)} + + print("<", io) + return io + + +@ag.entrypoint +@ag.instrument( + spankind="WORKFLOW", + redact=redact, +) +def embed(description: str, theme: str): + return { + "embedding": "somedata", + "ignored": "ignored", + "cost": 15, + "usage": 20, + } diff --git a/agenta-cli/tests/redact/06_redact_init.py b/agenta-cli/tests/redact/06_redact_init.py new file mode 100644 index 0000000000..90b1f59f9c --- /dev/null +++ b/agenta-cli/tests/redact/06_redact_init.py @@ -0,0 +1,31 @@ +import agenta as ag + + +def redact(name, field, io): + print(">", name, field, io) + + if name == "embed" and field == "inputs": + io = {key: value for key, value in io.items() if key not in ("description",)} + + if name == "embed" and field == "outputs": + io = {key: value for key, value in io.items() if key not in ("embedding",)} + + print("<", io) + + return io + + +ag.init(redact=redact) + + +@ag.entrypoint +@ag.instrument( + spankind="WORKFLOW", +) +def embed(description: str, theme: str): + return { + "embedding": "somedata", + "ignored": "ignored", + "cost": 15, + "usage": 20, + } diff --git a/agenta-cli/tests/redact/07_redact_combined.py b/agenta-cli/tests/redact/07_redact_combined.py new file mode 100644 index 0000000000..91469127df --- /dev/null +++ b/agenta-cli/tests/redact/07_redact_combined.py @@ -0,0 +1,43 @@ +import agenta as ag + + +def init_redact(name, field, io): + print(">", name, field, io) + + if name == "embed" and field == "inputs": + io = {key: value for key, value in io.items() if key not in ("description",)} + + if name == "embed" and field == "outputs": + io = {key: value for key, value in io.items() if key not in ("embedding",)} + + print("<", io) + + return io + + +def instrument_redact(name, field, io): + print(">", name, field, io) + + if name == "embed" and field == "outputs": + io = {key: value for key, value in io.items() if key not in ("ignored",)} + + print("<", io) + + return io + + +ag.init(redact=init_redact) + + +@ag.entrypoint +@ag.instrument( + spankind="WORKFLOW", + redact=instrument_redact, +) +def embed(description: str, theme: str): + return { + "embedding": "somedata", + "ignored": "ignored", + "cost": 15, + "usage": 20, + } diff --git a/agenta-cli/tests/redact/08_redect_or_error_false.py b/agenta-cli/tests/redact/08_redect_or_error_false.py new file mode 100644 index 0000000000..245804e859 --- /dev/null +++ b/agenta-cli/tests/redact/08_redect_or_error_false.py @@ -0,0 +1,24 @@ +import agenta as ag + + +def redact(name, field, io): + raise Exception("error") + + +ag.init( + redact=redact, + redact_on_error=False, +) + + +@ag.entrypoint +@ag.instrument( + spankind="WORKFLOW", +) +def embed(description: str, theme: str): + return { + "embedding": "somedata", + "ignored": "ignored", + "cost": 15, + "usage": 20, + } diff --git a/agenta-cli/tests/redact/08_redect_or_error_true.py b/agenta-cli/tests/redact/08_redect_or_error_true.py new file mode 100644 index 0000000000..ffcfd28a43 --- /dev/null +++ b/agenta-cli/tests/redact/08_redect_or_error_true.py @@ -0,0 +1,24 @@ +import agenta as ag + + +def redact(name, field, io): + raise Exception("error") + + +ag.init( + redact=redact, + # redact_on_error=True, +) + + +@ag.entrypoint +@ag.instrument( + spankind="WORKFLOW", +) +def embed(description: str, theme: str): + return { + "embedding": "somedata", + "ignored": "ignored", + "cost": 15, + "usage": 20, + } diff --git a/agenta-cli/tests/redact/requirements.txt b/agenta-cli/tests/redact/requirements.txt new file mode 100644 index 0000000000..7bac5206d6 --- /dev/null +++ b/agenta-cli/tests/redact/requirements.txt @@ -0,0 +1 @@ +agenta==0.27.3 \ No newline at end of file diff --git a/docs/docs/observability/03-observability-sdk.mdx b/docs/docs/observability/03-observability-sdk.mdx index 2ebe100ff7..cfeb563a79 100644 --- a/docs/docs/observability/03-observability-sdk.mdx +++ b/docs/docs/observability/03-observability-sdk.mdx @@ -176,14 +176,104 @@ def rag_workflow(query:str): ``` -## Excluding Inputs/Outputs from Capture +## Redacting sensitive data: how to exclude data from capture In some cases, you may want to exclude parts of the inputs or outputs due to privacy concerns or because the data is too large to be stored in the span. -You can achieve this by setting the ignore_inputs and ignore_outputs arguments to True in the instrument decorator. +You can do this by setting the `ignore_inputs` and/or `ignore_outputs` arguments to `True` in the instrument decorator. ```python -@ag.instrument(spankind="workflow", ignore_inputs=True, ignore_outputs=True) +@ag.instrument( + spankind="workflow", + ignore_inputs=True, + ignore_outputs=True +) def rag_workflow(query:str): ... ``` + +If you want more control, you can specify which parts of the inputs and outputs to exclude: + +```python +@ag.instrument( + spankind="workflow", + ignore_inputs=["user_id"], + ignore_outputs=["pii"], +) +def rag_workflow(query:str, user_id:str): + ... + return { + "result": ..., + "pii": ... + } +``` + +For even finer control, you can use a custom `redact()` callback, along with instructions in the case of errors. + +```python +def my_redact(name, field, data): + if name == "rag_workflow": + if field == "inputs": + del data["user_id"] + if field == "outputs": + del data["pii"] + + return data + + +@ag.instrument( + spankind="workflow", + redact=my_redact, + redact_on_error=False, +) +def rag_workflow(query:str, user_id:str): + ... + return { + "result": ..., + "pii": ... + } +``` + +Finally, if you want to set up global rules for redaction, you can provide a global `redact()` callback that applies everywhere. + +```python +def global_redact( + name:str, + field:str, + data: Dict[str, Any] +): + if "pii" in data: + del data["pii"] + + return data + + +ag.init( + redact=global_redact, + redact_on_error=True, +) + +def local_redact( + name:str, + field:str, + data: Dict[str, Any] +): + if name == "rag_workflow": + if field == "inputs": + del data["user_id"] + + return data + + +@ag.instrument( + spankind="workflow", + redact=local_redact, + redact_on_error=False, +) +def rag_workflow(query:str, user_id:str): + ... + return { + "result": ..., + "pii": ... + } +```