This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Commit

Adds telemetry capture to Hamilton
After this change, Hamilton will by default collect anonymous usage data to help us improve Hamilton and know where to apply development effort.

We capture two events: one when a driver object is instantiated, and one when the `execute()` call on the driver completes.
No user data or potentially sensitive information is or ever will be collected. The captured data is limited to:

* Operating System and Python version
* A persistent UUID to identify the session, stored in ~/.hamilton.conf.
* Error stack trace limited to Hamilton code, if one occurs.
* Information on what features you're using from Hamilton: decorators, adapters, result builders.
* How Hamilton is being used: number of final nodes in DAG, number of modules, size of objects passed to `execute()`.
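For illustration, a start event built from the fields above might look like the following. The field names here are hypothetical, chosen only to mirror the list above; see telemetry.py for the actual schema.

```python
# Hypothetical example of the kind of anonymous payload described above.
# Field names are illustrative, not Hamilton's actual schema.
import platform
import uuid

payload = {
    "os": platform.system(),                      # operating system
    "python_version": platform.python_version(),  # Python version
    "session_id": str(uuid.uuid4()),              # persistent UUID from ~/.hamilton.conf
    "number_of_final_nodes": 5,                   # final nodes in the DAG
    "number_of_modules": 2,                       # modules passed to the driver
    "decorators_used": {"custom_decorator": 1},   # feature usage counts
    "error": None,                                # sanitized stack trace, if one occurred
}
```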

If you do not wish to participate, you can opt out using one of the following methods:
1. Set it to false programmatically in your code before creating a Hamilton driver:
   ```python
   from hamilton import telemetry
   telemetry.disable_telemetry()
   ```
2. Set the key `telemetry_enabled` to `false` in ~/.hamilton.conf under the `DEFAULT` section:
   ```
   [DEFAULT]
   telemetry_enabled = false
   ```
3. Set HAMILTON_TELEMETRY_ENABLED=false as an environment variable, either by exporting it for your shell session:
   ```bash
   export HAMILTON_TELEMETRY_ENABLED=false
   ```
   or by passing it as part of the run command:
   ```bash
   HAMILTON_TELEMETRY_ENABLED=false python NAME_OF_MY_DRIVER.py
   ```

Otherwise, this commit is a large one. It:

* adds a telemetry.py module that handles the schema, sending logic, and related capture logic for Hamilton usage telemetry. Note: we stop capturing after 1,000 checks of is_telemetry_enabled to handle the case where someone is doing something in bulk; we likely don't care much past 1,000 invocations. It also spawns a thread to send the telemetry, which should work in all contexts. We did not want to pull in any other Python dependencies, which is why we use urllib.
* makes the two drivers (regular and async) orchestrate the logic to capture telemetry, so we only capture telemetry when people use the standard drivers. Rather than instrumenting the graph, I think the driver is the better place for this, since that's where all the context is.
* adds some global state to capture decorator usage and exposes it via the graph object. This felt like the most natural way to do it.
* adds tests and adjusts things to ensure telemetry is disabled for unit tests/CircleCI. Note: the sanitize-error test depends on paths, so CircleCI is the best place to ensure it works. We should fix this if it becomes an issue.
* adds documentation on how to opt-out.
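The capture mechanism described above can be sketched roughly as follows. This is a simplified illustration, not Hamilton's actual telemetry.py: the function names mirror ones mentioned in this commit, but the endpoint URL and internals are assumptions.

```python
import json
import threading
import urllib.request

MAX_COUNT_CHECK = 1000  # stop capturing after this many enablement checks
call_counter = 0
g_telemetry_enabled = True


def is_telemetry_enabled() -> bool:
    """Returns True while telemetry is on and we're under the invocation cap,
    so bulk usage doesn't generate unbounded events."""
    global call_counter
    call_counter += 1
    return g_telemetry_enabled and call_counter <= MAX_COUNT_CHECK


def send_event_json(event_json: dict, url: str = "https://example.com/capture") -> None:
    """Posts the event on a daemon thread so the caller is never blocked.

    Uses only urllib from the standard library -- no extra dependencies.
    """
    def _send():
        data = json.dumps(event_json).encode("utf-8")
        req = urllib.request.Request(
            url, data=data, headers={"Content-Type": "application/json"}
        )
        try:
            urllib.request.urlopen(req, timeout=2)
        except Exception:
            pass  # telemetry must never break user code

    threading.Thread(target=_send, daemon=True).start()
```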

—— Former commits that are being squashed:
Adds async adapter telemetry unit test

To ensure that the changes to the async driver work
as expected. (+12 squashed commits)
Squashed commits:
[4f25e41] Adds unit tests for telemetry addition

This fixes up a few functions and refactors them to be more easily
unit testable. It also ensures that by default, telemetry is disabled
for unit tests and circleci.
[36e5a7e] Fixing doc strings
[b0d4c4d] Refactors decorator counter methodology

Now it's a decorator on the __call__ function.

That way we decouple the telemetry logic from the NodeTransformLifecycle class itself. It's still coupled, but we can now change that functionality more cleanly.
[57e209b] Adjust telemetry documentation and functions

In response to PR comments.

Adds some helper functions to make them easier to unit test.
I put them in `telemetry.py` because they're static and only
relevant for telemetry, so it didn't seem too bad to put them there.
[1bda6a6] Fixes up imports to enable running driver.py as a script

Legacy requirement. Just propagating it.
[6f0c7b0] Adds telemetry tracking ability to async driver

The async driver needs to have special casing
to ensure it can also emit telemetry in an async
friendly way.

So added it to handle sending constructor and execute
tracking that should not impact, for example, running
within a fastapi webserver.
[0bac34a] Wraps sending telemetry request in own thread

For performance reasons we should spawn a thread to ensure
we don't slow down an app's performance.
[cba7bc3] Simplifies sanitize_error logic

Removes unnecessary code, and makes the
variable names a little easier to follow.
[34e574e] Wraps sanitize_error in try except

We don't want this code to cause
a cryptic error message for the end user,
so we wrap it in a try/except.
[f1d44b9] Adds usage and data privacy section to main README

So that people know what we're doing and how to opt-out of it.
[5ac73a9] Fixes to adjust pending changes to main
[2a84121] Refactors and adds functionality

This commit will be squashed into the final one, but it does the following:

1. Hooks up PostHog to capture telemetry. They have a free tier that should
be sufficient for our needs.
2. Refactors code into functions to enable better testing (TODO).
3. Adds logic to sanitize an error. We don't pull the name, just where in the
hamilton code it runs from. This should suffice in helping us understand where
people are encountering errors.
4. Adds logic to not capture custom code with respect to decorators and adapters.
5. Adds three ways to disable telemetry and documents it in the module. (+1 squashed commit)
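The sanitization described in (3) can be sketched roughly like this. It is a simplified illustration under stated assumptions, not the actual telemetry.py code: only frame locations within Hamilton's code are kept, and nothing from user code or error messages is captured.

```python
import traceback


def sanitize_error(exc_type, exc_value, exc_tb) -> str:
    """Keeps only where in Hamilton's code the error occurred.

    User file paths and the error message itself are dropped so
    nothing potentially sensitive is captured.
    """
    try:
        frames = []
        for frame in traceback.extract_tb(exc_tb):
            if "hamilton" in frame.filename:
                # record module location and line only -- no variable values
                frames.append(f"{frame.filename.split('hamilton')[-1]}:{frame.lineno}")
            else:
                frames.append("<user-code>")
        return "\n".join(frames)
    except Exception:
        # never let sanitization itself surface a cryptic error to the user
        return "error-sanitization-failed"
```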
Squashed commits:
[bb7376a] WIP sketch of telemetry

This is just a rough sketch. It shows one way we might implement things.
I.e. have it all be in the driver. So if someone is using their own custom driver,
we would not get telemetry. AFAIK most people use the current driver.

TODO:
 - actually check whether telemetry gathering is enabled
 - hook it up to something like posthog
 - test, test, test
skrawcz committed Dec 27, 2022
1 parent 468fc65 commit 4316e01
Showing 12 changed files with 923 additions and 9 deletions.
3 changes: 3 additions & 0 deletions .ci/setup.sh
@@ -41,3 +41,6 @@ python --version
echo "----- pip version -----"
pip --version
echo "-----------------------"

# disable telemetry!
export HAMILTON_TELEMETRY_ENABLED=false
35 changes: 35 additions & 0 deletions README.md
@@ -257,6 +257,41 @@ If you are doing a lot of repetitive work, one might consider multiple cursors.

To use it hit `option + mouse click` to create multiple cursors. `Esc` to revert back to a normal mode.

# Usage analytics & data privacy
By default, Hamilton collects anonymous usage data to help improve Hamilton and know where to apply development
effort.

We capture two events: one when the `Driver` object is instantiated, and one when the `execute()` call on the `Driver` object completes.
No user data or potentially sensitive information is or ever will be collected. The captured data is limited to:

* Operating System and Python version
* A persistent UUID to identify the session, stored in ~/.hamilton.conf.
* Error stack trace limited to Hamilton code, if one occurs.
* Information on what features you're using from Hamilton: decorators, adapters, result builders.
* How Hamilton is being used: number of final nodes in DAG, number of modules, size of objects passed to `execute()`.

If you're worried, see telemetry.py for details.

If you do not wish to participate, you can opt out using one of the following methods:
1. Set it to false programmatically in your code before creating a Hamilton driver:
```python
from hamilton import telemetry
telemetry.disable_telemetry()
```
2. Set the key `telemetry_enabled` to `false` in ~/.hamilton.conf under the `DEFAULT` section:
```
[DEFAULT]
telemetry_enabled = false
```
3. Set HAMILTON_TELEMETRY_ENABLED=false as an environment variable, either by exporting it for your shell session:
```bash
export HAMILTON_TELEMETRY_ENABLED=false
```
or by passing it as part of the run command:
```bash
HAMILTON_TELEMETRY_ENABLED=false python NAME_OF_MY_DRIVER.py
```

# Contributors

## Code Contributors
24 changes: 24 additions & 0 deletions graph_adapter_tests/h_async/test_h_async.py
@@ -1,7 +1,9 @@
import asyncio
from unittest import mock

import pytest

from hamilton import base
from hamilton.experimental import h_async

from .resources import simple_async_module
@@ -57,3 +59,25 @@ async def test_driver_end_to_end():
"simple_async_func": 2,
"simple_non_async_func": 7,
}


@pytest.mark.asyncio
@mock.patch("hamilton.telemetry.send_event_json")
@mock.patch("hamilton.telemetry.g_telemetry_enabled", True)
async def test_driver_end_to_end_telemetry(send_event_json):
dr = h_async.AsyncDriver({}, simple_async_module, result_builder=base.DictResult())
all_vars = [var.name for var in dr.list_available_variables()]
result = await dr.execute(final_vars=all_vars, inputs={"external_input": 1})
assert result == {
"another_async_func": 8,
"async_func_with_param": 4,
"external_input": 1,
"non_async_func_with_decorator": {"result_1": 9, "result_2": 5},
"result_1": 9,
"result_2": 5,
"simple_async_func": 2,
"simple_non_async_func": 7,
}
await asyncio.sleep(1) # to ensure the last telemetry invocation finishes executing
assert send_event_json.called
assert len(send_event_json.call_args_list) == 2
5 changes: 4 additions & 1 deletion hamilton/base.py
@@ -13,7 +13,10 @@
import typing_inspect
from pandas.core.indexes import extension as pd_extension

from . import node
try:
from . import node
except ImportError:
import node

logger = logging.getLogger(__name__)

109 changes: 104 additions & 5 deletions hamilton/driver.py
@@ -1,11 +1,14 @@
import logging
import sys
import time

# required if we want to run this code stand alone.
import typing
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from types import ModuleType
from typing import Any, Collection, Dict, List
from typing import Any, Collection, Dict, List, Optional, Tuple

import pandas as pd

@@ -17,10 +20,12 @@
)

if __name__ == "__main__":
import base
import graph
import node
import telemetry
else:
from . import base, graph, node
from . import base, graph, node, telemetry

logger = logging.getLogger(__name__)

@@ -53,14 +58,59 @@ def __init__(
:param adapter: Optional. A way to wire in another way of "executing" a hamilton graph.
Defaults to using original Hamilton adapter which is single threaded in memory python.
"""
self.driver_run_id = uuid.uuid4()
if adapter is None:
adapter = base.SimplePythonDataFrameGraphAdapter()
error = None
try:
self.graph = graph.FunctionGraph(*modules, config=config, adapter=adapter)
self.adapter = adapter
except Exception as e:
error = telemetry.sanitize_error(*sys.exc_info())
logger.error(SLACK_ERROR_MESSAGE)
raise e
self.adapter = adapter
finally:
self.capture_constructor_telemetry(error, modules, config, adapter)

def capture_constructor_telemetry(
self,
error: Optional[str],
modules: Tuple[ModuleType],
config: Dict[str, Any],
adapter: base.HamiltonGraphAdapter,
):
"""Captures constructor telemetry.
Notes:
(1) we want to do this in a way that does not break.
(2) we need to account for all possible states, e.g. someone passing in None, or assuming that
the entire constructor code ran without issue, e.g. `adapter` was assigned to `self`.
:param error: the sanitized error string to send.
:param modules: the list of modules, could be None.
:param config: the config dict passed, could be None.
:param adapter: the adapter passed in, might not be attached to `self` yet.
"""
if telemetry.is_telemetry_enabled():
try:
adapter_name = telemetry.get_adapter_name(adapter)
result_builder = telemetry.get_result_builder_name(adapter)
# being defensive here with ensuring values exist
payload = telemetry.create_start_event_json(
len(self.graph.nodes) if hasattr(self, "graph") else 0,
len(modules) if modules else 0,
len(config) if config else 0,
dict(self.graph.decorator_counter) if hasattr(self, "graph") else {},
adapter_name,
result_builder,
self.driver_run_id,
error,
)
telemetry.send_event_json(payload)
except Exception as e:
# we don't want this to fail at all!
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Error caught in processing telemetry: {e}")

def _node_is_required_by_anything(self, node_: node.Node) -> bool:
"""Checks dependencies on this node and determines if at least one requires it.
@@ -133,12 +183,62 @@ def execute(
"display_graph=True is deprecated. It will be removed in the 2.0.0 release. "
"Please use visualize_execution()."
)
start_time = time.time()
run_successful = True
error = None
try:
outputs = self.raw_execute(final_vars, overrides, display_graph, inputs=inputs)
return self.adapter.build_result(**outputs)
result = self.adapter.build_result(**outputs)
return result
except Exception as e:
run_successful = False
logger.error(SLACK_ERROR_MESSAGE)
error = telemetry.sanitize_error(*sys.exc_info())
raise e
finally:
duration = time.time() - start_time
self.capture_execute_telemetry(
error, final_vars, inputs, overrides, run_successful, duration
)

def capture_execute_telemetry(
self,
error: Optional[str],
final_vars: List[str],
inputs: Dict[str, Any],
overrides: Dict[str, Any],
run_successful: bool,
duration: float,
):
"""Captures telemetry after execute has run.
Notes:
(1) we want to be quite defensive in not breaking anyone's code with things we do here.
(2) thus we want to double-check that values exist before doing something with them.
:param error: the sanitized error string to capture, if any.
:param final_vars: the list of final variables to get.
:param inputs: the inputs to the execute function.
:param overrides: any overrides to the execute function.
:param run_successful: whether this run was successful.
:param duration: time it took to run execute.
"""
if telemetry.is_telemetry_enabled():
try:
payload = telemetry.create_end_event_json(
run_successful,
duration,
len(final_vars) if final_vars else 0,
len(overrides) if isinstance(overrides, Dict) else 0,
len(inputs) if isinstance(inputs, Dict) else 0,
self.driver_run_id,
error,
)
telemetry.send_event_json(payload)
except Exception as e:
# we don't want this to fail at all!
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Error caught in processing telemetry:\n{e}")

def raw_execute(
self,
@@ -302,7 +402,6 @@ def what_is_upstream_of(self, *node_names: str) -> List[Variable]:
if __name__ == "__main__":
"""some example test code"""
import importlib
import sys

formatter = logging.Formatter("[%(levelname)s] %(asctime)s %(name)s(%(lineno)s): %(message)s")
stream_handler = logging.StreamHandler(sys.stdout)
72 changes: 69 additions & 3 deletions hamilton/experimental/h_async.py
@@ -1,11 +1,14 @@
import asyncio
import inspect
import logging
import sys
import time
import types
import typing
from typing import Any, Dict, Optional
from types import ModuleType
from typing import Any, Dict, Optional, Tuple

from hamilton import base, driver, node
from hamilton import base, driver, node, telemetry

logger = logging.getLogger(__name__)

@@ -141,9 +144,72 @@ async def execute(
"display_graph=True is not supported for the async graph adapter. "
"Instead you should be using visualize_execution."
)
start_time = time.time()
run_successful = True
error = None
try:
outputs = await self.raw_execute(final_vars, overrides, display_graph, inputs=inputs)
return self.adapter.build_result(**outputs)
result = self.adapter.build_result(**outputs)
return result
except Exception as e:
run_successful = False
logger.error(driver.SLACK_ERROR_MESSAGE)
error = telemetry.sanitize_error(*sys.exc_info())
raise e
finally:
duration = time.time() - start_time
# ensure we can capture telemetry in async friendly way.
if telemetry.is_telemetry_enabled():

async def make_coroutine():
self.capture_execute_telemetry(
error, final_vars, inputs, overrides, run_successful, duration
)

try:
# we don't have to await because we are running within the event loop.
asyncio.create_task(make_coroutine())
except Exception as e:
if logger.isEnabledFor(logging.DEBUG):
logger.error(f"Encountered error submitting async telemetry:\n{e}")

def capture_constructor_telemetry(
self,
error: Optional[str],
modules: Tuple[ModuleType],
config: Dict[str, Any],
adapter: base.HamiltonGraphAdapter,
):
"""Ensures we capture constructor telemetry the right way in an async context.
This is a simpler wrapper around what's in the driver class.
:param error: sanitized error string, if any.
:param modules: tuple of modules to build DAG from.
:param config: config to create the driver.
:param adapter: adapter class object.
"""
if telemetry.is_telemetry_enabled():
try:
# check whether the event loop has been started yet or not
loop = asyncio.get_event_loop()
if loop.is_running():
loop.run_in_executor(
None,
super(AsyncDriver, self).capture_constructor_telemetry,
error,
modules,
config,
adapter,
)
else:

async def make_coroutine():
super(AsyncDriver, self).capture_constructor_telemetry(
error, modules, config, adapter
)

loop.run_until_complete(make_coroutine())
except Exception as e:
if logger.isEnabledFor(logging.DEBUG):
logger.error(f"Encountered error submitting async telemetry:\n{e}")
26 changes: 26 additions & 0 deletions hamilton/function_modifiers/base.py
@@ -1,4 +1,6 @@
import abc
import collections
import functools
from typing import Any, Callable, Collection, Dict, List, Tuple

from hamilton import node
@@ -18,6 +20,29 @@ def sanitize_function_name(name: str) -> str:
return name[:last_dunder_index] if last_dunder_index != -1 else name


COUNTER = collections.defaultdict(int)


def track_decorator_usage(call_fn: Callable) -> Callable:
"""Decorator to wrap the __call__ to count decorator usage.
:param call_fn: the `__call__` function.
:return: the wrapped call function.
"""

@functools.wraps(call_fn)
def replace__call__(self, fn):
global COUNTER
if self.__module__.startswith("hamilton.function_modifiers"):
# only capture counts for hamilton decorators
COUNTER[self.__class__.__name__] = COUNTER[self.__class__.__name__] + 1
else:
COUNTER["custom_decorator"] = COUNTER["custom_decorator"] + 1
return call_fn(self, fn)

return replace__call__


class NodeTransformLifecycle(abc.ABC):
"""Base class to represent the decorator lifecycle. Common among all node decorators."""

@@ -47,6 +72,7 @@ def validate(self, fn: Callable):
"""
pass

@track_decorator_usage
def __call__(self, fn: Callable):
"""Calls the decorator by adding attributes using the get_lifecycle_name string.
These attributes are the pointer to the decorator object itself, and used later in resolve_nodes below.
