This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Adds telemetry #255

Merged (6 commits) on Jan 2, 2023
Changes from 4 commits
3 changes: 3 additions & 0 deletions .ci/setup.sh
@@ -41,3 +41,6 @@ python --version
echo "----- pip version -----"
pip --version
echo "-----------------------"

# disable telemetry!
export HAMILTON_TELEMETRY_ENABLED=false
35 changes: 35 additions & 0 deletions README.md
@@ -257,6 +257,41 @@ If you are doing a lot of repetitive work, one might consider multiple cursors.

To use it, hit `option + mouse click` to create multiple cursors, and `Esc` to return to normal mode.

# Usage analytics & data privacy
By default, Hamilton collects anonymous usage data to help improve the library and determine where to focus development
efforts.

We capture two events: one when the `Driver` object is instantiated, and one when the `execute()` call on the `Driver` object completes (see the sketch after the list below).
No user data or potentially sensitive information is or ever will be collected. The captured data is limited to:

* Operating System and Python version
* A persistent UUID to identify the session, stored in `~/.hamilton.conf`.
* An error stack trace limited to Hamilton code, if one occurs.
* Information on what features you're using from Hamilton: decorators, adapters, result builders.
* How Hamilton is being used: number of final nodes in DAG, number of modules, size of objects passed to `execute()`.
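
For illustration, a minimal sketch of where those two events fire, assuming a hypothetical `my_functions` module; the telemetry calls happen inside Hamilton itself, so this is just ordinary driver usage:
```python
from hamilton import driver

import my_functions  # hypothetical module containing your Hamilton functions

# Event 1 fires when the Driver is instantiated.
dr = driver.Driver({}, my_functions)

# Event 2 fires when execute() completes, whether it succeeds or fails.
df = dr.execute(["some_output_node"])  # "some_output_node" is illustrative
```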

If you're worried, see `telemetry.py` for details.

If you do not wish to participate, you can opt out with one of the following methods:
1. Disable telemetry programmatically in your code before creating a Hamilton driver:
```python
from hamilton import telemetry
telemetry.disable_telemetry()
```
2. Set the key `telemetry_enabled` to `false` in `~/.hamilton.conf` under the `DEFAULT` section:
```
[DEFAULT]
telemetry_enabled = false
```
3. Set `HAMILTON_TELEMETRY_ENABLED=false` as an environment variable, either by setting it for your shell session:
```bash
export HAMILTON_TELEMETRY_ENABLED=false
```
or passing it as part of the run command:
```bash
HAMILTON_TELEMETRY_ENABLED=false python NAME_OF_MY_DRIVER.py
```
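
Whichever method you use, you can confirm the opt-out took effect via the same `is_telemetry_enabled()` helper the driver consults internally:
```python
from hamilton import telemetry

# Prints False once any of the opt-out methods above is in place.
print(telemetry.is_telemetry_enabled())
```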

# Contributors

## Code Contributors
Expand Down
10 changes: 8 additions & 2 deletions examples/async/fastapi_example.py
@@ -1,17 +1,23 @@
import async_module
import fastapi

from hamilton import base
from hamilton.experimental import h_async

app = fastapi.FastAPI()

# can instantiate a driver once for the life of the app:
dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())


@app.post("/execute")
async def call(request: fastapi.Request) -> dict:
"""Handler for pipeline call"""
    input_data = {"request": request}
    # Can instantiate a driver within a request as well:
    # dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())
    result = await dr.execute(["pipeline"], inputs=input_data)
return result


if __name__ == "__main__":
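For context, a minimal client sketch for exercising this endpoint once the app is running; the host, port, and empty request body are assumptions, and `aiohttp` comes from this example's requirements (served, e.g., via uvicorn):
```python
import asyncio

import aiohttp


async def main():
    # Assumes the FastAPI app above is served at localhost:8000.
    async with aiohttp.ClientSession() as session:
        async with session.post("http://localhost:8000/execute") as resp:
            print(await resp.json())


asyncio.run(main())
```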
1 change: 0 additions & 1 deletion examples/async/requirements.txt
@@ -1,5 +1,4 @@
aiohttp
async
fastapi
sf-hamilton
uvicorn
33 changes: 33 additions & 0 deletions graph_adapter_tests/h_async/test_h_async.py
@@ -1,7 +1,9 @@
import asyncio
from unittest import mock

import pytest

from hamilton import base
from hamilton.experimental import h_async

from .resources import simple_async_module
@@ -57,3 +59,34 @@ async def test_driver_end_to_end():
"simple_async_func": 2,
"simple_non_async_func": 7,
}


@pytest.mark.asyncio
@mock.patch("hamilton.telemetry.send_event_json")
@mock.patch("hamilton.telemetry.g_telemetry_enabled", True)
async def test_driver_end_to_end_telemetry(send_event_json):
dr = h_async.AsyncDriver({}, simple_async_module, result_builder=base.DictResult())
all_vars = [var.name for var in dr.list_available_variables()]
result = await dr.execute(final_vars=all_vars, inputs={"external_input": 1})
assert result == {
"another_async_func": 8,
"async_func_with_param": 4,
"external_input": 1,
"non_async_func_with_decorator": {"result_1": 9, "result_2": 5},
"result_1": 9,
"result_2": 5,
"simple_async_func": 2,
"simple_non_async_func": 7,
}
    # To ensure the last telemetry invocation finishes executing,
    # get all tasks (including the current task) and await all the others.
try:
# only works for 3.7+
tasks = asyncio.all_tasks()
current_task = asyncio.current_task()
await asyncio.gather(*[t for t in tasks if t != current_task])
except AttributeError:
# required for 3.6
await asyncio.sleep(1)
assert send_event_json.called
assert len(send_event_json.call_args_list) == 2
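
The version-handling dance above can be factored into a reusable helper; a sketch assuming Python 3.7+, where `asyncio.all_tasks()` and `asyncio.current_task()` exist:
```python
import asyncio


async def drain_pending_tasks() -> None:
    """Await every scheduled task except the one currently running."""
    current = asyncio.current_task()
    pending = [t for t in asyncio.all_tasks() if t is not current]
    await asyncio.gather(*pending)
```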
5 changes: 4 additions & 1 deletion hamilton/base.py
@@ -13,7 +13,10 @@
import typing_inspect
from pandas.core.indexes import extension as pd_extension

try:
    from . import node
except ImportError:
import node

logger = logging.getLogger(__name__)

109 changes: 104 additions & 5 deletions hamilton/driver.py
@@ -1,11 +1,14 @@
import logging
import sys
import time

# required if we want to run this code stand alone.
import typing
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from types import ModuleType
from typing import Any, Collection, Dict, List, Optional, Tuple

import pandas as pd

@@ -17,10 +20,12 @@
)

if __name__ == "__main__":
import base
import graph
import node
import telemetry
else:
    from . import base, graph, node, telemetry

logger = logging.getLogger(__name__)

@@ -53,14 +58,59 @@ def __init__(
:param adapter: Optional. A way to wire in another way of "executing" a hamilton graph.
Defaults to using original Hamilton adapter which is single threaded in memory python.
"""
self.driver_run_id = uuid.uuid4()
if adapter is None:
adapter = base.SimplePythonDataFrameGraphAdapter()
error = None
try:
self.graph = graph.FunctionGraph(*modules, config=config, adapter=adapter)
self.adapter = adapter
except Exception as e:
error = telemetry.sanitize_error(*sys.exc_info())
logger.error(SLACK_ERROR_MESSAGE)
raise e
finally:
self.capture_constructor_telemetry(error, modules, config, adapter)

def capture_constructor_telemetry(
self,
error: Optional[str],
modules: Tuple[ModuleType],
config: Dict[str, Any],
adapter: base.HamiltonGraphAdapter,
):
"""Captures constructor telemetry.

Notes:
(1) we want to do this in a way that does not break.
        (2) we need to account for all possible states, e.g. someone passing in None, and we should not assume
            that the entire constructor code ran without issue, e.g. that `adapter` was assigned to `self`.

:param error: the sanitized error string to send.
:param modules: the list of modules, could be None.
:param config: the config dict passed, could be None.
:param adapter: the adapter passed in, might not be attached to `self` yet.
"""
if telemetry.is_telemetry_enabled():
try:
adapter_name = telemetry.get_adapter_name(adapter)
result_builder = telemetry.get_result_builder_name(adapter)
# being defensive here with ensuring values exist
payload = telemetry.create_start_event_json(
len(self.graph.nodes) if hasattr(self, "graph") else 0,
len(modules) if modules else 0,
len(config) if config else 0,
dict(self.graph.decorator_counter) if hasattr(self, "graph") else {},
adapter_name,
result_builder,
self.driver_run_id,
error,
)
telemetry.send_event_json(payload)
except Exception as e:
# we don't want this to fail at all!
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Error caught in processing telemetry: {e}")

def _node_is_required_by_anything(self, node_: node.Node) -> bool:
"""Checks dependencies on this node and determines if at least one requires it.
@@ -133,12 +183,62 @@ def execute(
"display_graph=True is deprecated. It will be removed in the 2.0.0 release. "
"Please use visualize_execution()."
)
start_time = time.time()
run_successful = True
error = None
try:
outputs = self.raw_execute(final_vars, overrides, display_graph, inputs=inputs)
            result = self.adapter.build_result(**outputs)
return result
except Exception as e:
run_successful = False
logger.error(SLACK_ERROR_MESSAGE)
error = telemetry.sanitize_error(*sys.exc_info())
raise e
finally:
duration = time.time() - start_time
self.capture_execute_telemetry(
error, final_vars, inputs, overrides, run_successful, duration
)

def capture_execute_telemetry(
self,
error: Optional[str],
final_vars: List[str],
inputs: Dict[str, Any],
overrides: Dict[str, Any],
run_successful: bool,
duration: float,
):
"""Captures telemetry after execute has run.

Notes:
(1) we want to be quite defensive in not breaking anyone's code with things we do here.
(2) thus we want to double-check that values exist before doing something with them.

:param error: the sanitized error string to capture, if any.
:param final_vars: the list of final variables to get.
:param inputs: the inputs to the execute function.
:param overrides: any overrides to the execute function.
:param run_successful: whether this run was successful.
:param duration: time it took to run execute.
"""
if telemetry.is_telemetry_enabled():
try:
payload = telemetry.create_end_event_json(
run_successful,
duration,
len(final_vars) if final_vars else 0,
len(overrides) if isinstance(overrides, Dict) else 0,
                    len(inputs) if isinstance(inputs, Dict) else 0,
self.driver_run_id,
error,
)
telemetry.send_event_json(payload)
except Exception as e:
# we don't want this to fail at all!
if logger.isEnabledFor(logging.DEBUG):
logger.debug(f"Error caught in processing telemetry:\n{e}")

def raw_execute(
self,
@@ -302,7 +402,6 @@ def what_is_upstream_of(self, *node_names: str) -> List[Variable]:
if __name__ == "__main__":
"""some example test code"""
import importlib
import sys

formatter = logging.Formatter("[%(levelname)s] %(asctime)s %(name)s(%(lineno)s): %(message)s")
stream_handler = logging.StreamHandler(sys.stdout)
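To see the telemetry pattern from `Driver.execute` in isolation, here is a distilled sketch (the names are illustrative, not Hamilton's API): the user's work runs in `try`, the sanitized error is recorded in `except` without being swallowed, and the capture call in `finally` runs on both paths but is itself guarded so it can never break user code:
```python
import time


def run_with_telemetry(fn, capture_telemetry):
    """Run fn(), then report duration and any error, without ever masking fn's outcome."""
    start_time = time.time()
    error = None
    try:
        return fn()
    except Exception:
        error = "sanitized stack trace"  # cf. telemetry.sanitize_error(*sys.exc_info())
        raise  # re-raise: telemetry never swallows the user's exception
    finally:
        try:
            capture_telemetry(error, time.time() - start_time)
        except Exception:
            pass  # telemetry failures must never surface to the user
```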
72 changes: 69 additions & 3 deletions hamilton/experimental/h_async.py
@@ -1,11 +1,14 @@
import asyncio
import inspect
import logging
import sys
import time
import types
import typing
from types import ModuleType
from typing import Any, Dict, Optional, Tuple

from hamilton import base, driver, node, telemetry

logger = logging.getLogger(__name__)

@@ -141,9 +144,72 @@ async def execute(
"display_graph=True is not supported for the async graph adapter. "
"Instead you should be using visualize_execution."
)
start_time = time.time()
run_successful = True
error = None
try:
outputs = await self.raw_execute(final_vars, overrides, display_graph, inputs=inputs)
return self.adapter.build_result(**outputs)
result = self.adapter.build_result(**outputs)
return result
except Exception as e:
run_successful = False
logger.error(driver.SLACK_ERROR_MESSAGE)
error = telemetry.sanitize_error(*sys.exc_info())
raise e
finally:
duration = time.time() - start_time
            # ensure we capture telemetry in an async-friendly way.
if telemetry.is_telemetry_enabled():

async def make_coroutine():
self.capture_execute_telemetry(
error, final_vars, inputs, overrides, run_successful, duration
)

try:
# we don't have to await because we are running within the event loop.
asyncio.create_task(make_coroutine())
except Exception as e:
if logger.isEnabledFor(logging.DEBUG):
logger.error(f"Encountered error submitting async telemetry:\n{e}")

def capture_constructor_telemetry(
self,
error: Optional[str],
modules: Tuple[ModuleType],
config: Dict[str, Any],
adapter: base.HamiltonGraphAdapter,
):
"""Ensures we capture constructor telemetry the right way in an async context.

This is a simpler wrapper around what's in the driver class.

:param error: sanitized error string, if any.
:param modules: tuple of modules to build DAG from.
:param config: config to create the driver.
:param adapter: adapter class object.
"""
if telemetry.is_telemetry_enabled():
try:
# check whether the event loop has been started yet or not
loop = asyncio.get_event_loop()
if loop.is_running():
loop.run_in_executor(
None,
super(AsyncDriver, self).capture_constructor_telemetry,
error,
modules,
config,
adapter,
)
else:

async def make_coroutine():
super(AsyncDriver, self).capture_constructor_telemetry(
error, modules, config, adapter
)

loop.run_until_complete(make_coroutine())
except Exception as e:
if logger.isEnabledFor(logging.DEBUG):
logger.error(f"Encountered error submitting async telemetry:\n{e}")
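
The fire-and-forget submission above relies on `asyncio.create_task`, which requires a running event loop; a minimal sketch of that pattern, with a hypothetical `emit` coroutine standing in for the real send:
```python
import asyncio


async def emit(payload: dict) -> None:
    await asyncio.sleep(0)  # stand-in for an actual telemetry send


async def handler() -> int:
    result = 42
    # Schedule telemetry without awaiting it, so the caller is not blocked.
    asyncio.create_task(emit({"ok": True}))
    return result


asyncio.run(handler())
```
Note that such tasks are not guaranteed to complete before the loop shuts down, which is exactly why the test above explicitly drains pending tasks.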