Enhance HexRunProjectOperator and HexHook #21

Open. Wants to merge 2 commits into base: main
14 changes: 7 additions & 7 deletions .pre-commit-config.yaml
@@ -1,23 +1,23 @@
repos:
- repo: https://github.com/psf/black
rev: 22.6.0
rev: 24.8.0
hooks:
- id: black
args: ["--target-version=py38", "--line-length=88"]

- repo: https://github.com/pycqa/isort
rev: 5.10.1
rev: 5.13.2
hooks:
- id: isort
args: ["--profile=black"]

- repo: https://gitlab.com/pycqa/flake8
rev: 3.9.2
- repo: https://github.com/pycqa/flake8
rev: 7.1.1
hooks:
- id: flake8

- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.1.0
rev: v4.6.0
hooks:
- id: check-merge-conflict
- id: check-toml
@@ -28,14 +28,14 @@ repos:
- id: trailing-whitespace

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.931
rev: v1.11.2
hooks:
- id: mypy
exclude: ^tests/
additional_dependencies: [ types-requests ]

- repo: https://github.com/codespell-project/codespell
rev: v2.1.0
rev: v2.3.0
hooks:
- id: codespell
name: Run codespell to check for common misspellings in files
12 changes: 11 additions & 1 deletion CHANGELOG.md
@@ -9,7 +9,17 @@
-

### Fixed
-

## [0.1.10] - 2024-09-09

### Added
- Enhanced retry mechanism for polling project status
- New `max_poll_retries` and `poll_retry_delay` parameters for `HexRunProjectOperator`
- New `run_status_with_retries` method in `HexHook`
- New `poll_project_status` method in `HexHook` with improved error handling

### Changed
- Improved error handling for API calls and status checks


## [0.1.9] - 2023-05-16
2 changes: 1 addition & 1 deletion Makefile
@@ -19,4 +19,4 @@ clean:
docker-compose -f dev/docker-compose.yaml down --volumes --remove-orphans

init:
docker-compose up -f dev/docker-compose.yaml airflow-init
docker-compose -f dev/docker-compose.yaml up airflow-init
98 changes: 66 additions & 32 deletions README.md
@@ -1,25 +1,36 @@
# Hex Airflow Provider
# Airflow Provider for Hex

Provides an Airflow Operator and Hook to trigger Hex project runs.
[![PyPI version](https://badge.fury.io/py/airflow-provider-hex.svg)](https://badge.fury.io/py/airflow-provider-hex)

This [Airflow Provider Package](https://airflow.apache.org/docs/apache-airflow-providers/)
provides Hooks and Operators for interacting with the Hex API.
This [Airflow Provider Package](https://airflow.apache.org/docs/apache-airflow-providers/) provides Hooks and Operators for interacting with the Hex API, allowing you to trigger and manage Hex project runs in your Apache Airflow DAGs.

## Table of Contents
- [Requirements](#requirements)
- [Installation](#installation)
- [Initial Setup](#initial-setup)
- [Operators](#operators)
- [Hooks](#hooks)
- [Examples](#examples)
- [Development](#development)
- [Changelog](#changelog)

## Requirements

* Airflow >=2.2
* Apache Airflow >= 2.2.0
* Python >= 3.7
* Hex API Token

## Initial Setup
## Installation

Install the package.
Install the package using pip:

```
```bash
pip install airflow-provider-hex
```

After creating a Hex API token, set up your Airflow Connection Credentials in the Airflow
UI.
## Initial Setup

After creating a Hex API token, set up your Airflow Connection Credentials in the Airflow UI:

![Connection Setup](https://raw.githubusercontent.com/hex-inc/airflow-provider-hex/main/docs/hex-connection-setup.png)

@@ -30,41 +41,38 @@ UI.

## Operators

The [`airflow_provider_hex.operators.hex.HexRunProjectOperator`](/airflow_provider_hex/operators/hex.py)
Operator runs Hex Projects, either synchronously or asynchronously.

In the synchronous mode, the Operator will start a Hex Project run and then
poll the run until either an error or success status is returned, or until
the poll timeout. If the timeout occurs, the default behaviour is to attempt to
cancel the run.
The [`HexRunProjectOperator`](/airflow_provider_hex/operators/hex.py) runs Hex Projects either synchronously or asynchronously.

In the asynchronous mode, the Operator will request that a Hex Project is run,
but will not poll for completion. This can be useful for long-running projects.
- In synchronous mode, the Operator starts a Hex Project run and polls until completion or timeout.
- In asynchronous mode, the Operator requests a Hex Project run without waiting for completion.

The operator accepts inputs in the form of a dictionary. These can be used to
override existing input elements in your Hex project.
The operator accepts inputs as a dictionary to override existing input elements in your Hex project. You can also include optional notifications for a run.

You may also optionally include notifications for a particular run. See
the [Hex API documentation](https://learn.hex.tech/docs/develop-logic/hex-api/api-reference#operation/RunProject) for details.
For more details, see the [Hex API documentation](https://learn.hex.tech/docs/develop-logic/hex-api/api-reference#operation/RunProject).

## Hooks

The [`airflow_provider_hex.hooks.hex.HexHook`](/airflow_provider_hex/hooks/hex.py)
provides a low-level interface to the Hex API.

These can be useful for testing and development, as they provide both a generic
`run` method which sends an authenticated request to the Hex API, as well as
implementations of the `run` method that provide access to specific endpoints.

The [`HexHook`](/airflow_provider_hex/hooks/hex.py) provides a low-level interface to the Hex API. It's useful for testing and development, offering both a generic `run` method for authenticated requests and specific endpoint implementations.

## Examples

A simplified example DAG demonstrates how to use the [Airflow Operator](/example_dags/example_hex.py)
Here's a simplified example DAG demonstrating how to use the HexRunProjectOperator:

```python
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_provider_hex.operators.hex import HexRunProjectOperator
from airflow_provider_hex.types import NotificationDetails

PROJ_ID = 'abcdef-ghijkl-mnopq'

default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
}

dag = DAG('hex_example', default_args=default_args, schedule_interval=None)

notifications: list[NotificationDetails] = [
{
"type": "SUCCESS",
@@ -74,7 +82,7 @@ notifications: list[NotificationDetails] = [
"groupIds": [],
}
]
...

sync_run = HexRunProjectOperator(
task_id="run",
hex_conn_id="hex_default",
@@ -83,3 +91,29 @@ sync_run = HexRunProjectOperator(
notifications=notifications
)
```

For more examples, check the [example_dags](/example_dags) directory.

## Development

To set up the development environment:

1. Clone the repository
2. Install development dependencies: `pip install -e .[dev]`
3. Install pre-commit hooks: `pre-commit install`

To run tests:

```bash
make tests
```

To run linters:

```bash
make lint
```

## Changelog

See the [CHANGELOG.md](CHANGELOG.md) file for details on all changes and past releases.
2 changes: 1 addition & 1 deletion VERSION.txt
@@ -1 +1 @@
0.1.9
0.1.10
1 change: 1 addition & 0 deletions airflow_provider_hex/__init__.py
@@ -1,4 +1,5 @@
"""Version information for the package."""

import os
import sys

88 changes: 68 additions & 20 deletions airflow_provider_hex/hooks/hex.py
@@ -7,6 +7,8 @@
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from importlib_metadata import PackageNotFoundError, version
from requests.exceptions import RequestException
from tenacity import retry, stop_after_attempt, wait_fixed

from airflow_provider_hex.types import NotificationDetails, RunResponse, StatusResponse

@@ -151,52 +153,74 @@ def run_project(
),
)

def run_status(self, project_id, run_id) -> StatusResponse:
@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
def run_status(self, project_id: str, run_id: str) -> StatusResponse:
endpoint = f"api/v1/project/{project_id}/run/{run_id}"
method = "GET"
try:
response = self.run(method=method, endpoint=endpoint, data=None)
return cast(StatusResponse, response)
except RequestException as e:
self.log.error(f"API call failed: {str(e)}")
raise

return cast(
StatusResponse, self.run(method=method, endpoint=endpoint, data=None)
)

def cancel_run(self, project_id, run_id) -> str:
def cancel_run(self, project_id: str, run_id: str) -> str:
endpoint = f"api/v1/project/{project_id}/run/{run_id}"
method = "DELETE"

self.run(method=method, endpoint=endpoint)
return run_id

def run_and_poll(
def run_status_with_retries(
self, project_id: str, run_id: str, max_retries: int = 3, retry_delay: int = 1

Review comment:

A retry limit should be enforced server-side to prevent the caller from specifying an excessive number of retries for project runs, which could lead to resource exhaustion or unintended behavior.

Author:

Is implementing a server-side retry limit currently prioritized? As this PR focuses on client-side Airflow hooks, I can hardcode a conservative number of retries and delay to prevent configurability. This would address immediate concerns while keeping the implementation on the client side. Let me know if you'd like me to proceed with this approach.

Collaborator @clrcrl, Oct 8, 2024:

Sorry for the delay here! (Also this is a product-manager translation from an engineer, so apologies if I lose something in translation!)

We rate limit requests, so at this time don't intend to implement a server-side retry limit. Also, our API is fairly non-flakey, so we don't think users will need to retry that often.

That being said, it would still be great to hardcode a conservative number of retries juuuust in case — 3 seems like a good spot!

) -> StatusResponse:
@retry(stop=stop_after_attempt(max_retries), wait=wait_fixed(retry_delay))
def _run_status():
return self.run_status(project_id, run_id)

return _run_status()
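As rendered in this diff, `run_status` already carries `@retry(stop=stop_after_attempt(3), wait=wait_fixed(1))`, and `run_status_with_retries` wraps it in a second `@retry`, so the attempts appear to multiply: up to 3 × `max_retries` API calls per status check. The sketch below illustrates that effect with a hypothetical `retry_fixed` decorator, written in plain Python as a stand-in for tenacity's `stop_after_attempt`/`wait_fixed`. One difference to keep in mind: tenacity by default raises `tenacity.RetryError` once attempts are exhausted (unless `reraise=True` is passed), whereas this stand-in re-raises the last exception.

```python
import time
from typing import Any, Callable


def retry_fixed(attempts: int, delay: float) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Hypothetical stand-in for tenacity's stop_after_attempt + wait_fixed."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, attempts + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    time.sleep(delay)  # fixed wait between attempts

        return wrapper

    return decorator


calls = {"n": 0}


@retry_fixed(attempts=3, delay=0)  # inner retry, like the decorator on run_status
def flaky_run_status() -> dict:
    calls["n"] += 1
    raise ConnectionError("status endpoint unavailable")


@retry_fixed(attempts=3, delay=0)  # outer retry, like run_status_with_retries
def run_status_with_retries() -> dict:
    return flaky_run_status()


try:
    run_status_with_retries()
except ConnectionError:
    pass

print(calls["n"])  # 9: three outer attempts, each making three inner calls
```

Keeping the retry on a single layer with one hardcoded, conservative count (the review suggests 3) keeps the worst case at exactly that many calls.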

def poll_project_status(
self,
project_id: str,
inputs: Optional[dict],
update_cache: bool = False,
run_id: str,
poll_interval: int = 3,
poll_timeout: int = 600,
kill_on_timeout: bool = True,
notifications: List[NotificationDetails] = [],
):
run_response = self.run_project(project_id, inputs, update_cache, notifications)
run_id = run_response["runId"]

max_poll_retries: int = 3,
poll_retry_delay: int = 5,
) -> StatusResponse:
poll_start = datetime.datetime.now()
while True:
run_status = self.run_status(project_id, run_id)
try:
run_status = self.run_status_with_retries(
project_id, run_id, max_poll_retries, poll_retry_delay
)
except Exception as e:
self.log.error(
f"Failed to get run status after {max_poll_retries} "
f"attempts: {str(e)}"
)
if kill_on_timeout:
self.cancel_run(project_id, run_id)
raise AirflowException(
"Failed to get run status for project "
f"{project_id} with run: {run_id}"
)

project_status = run_status["status"]

self.log.info(
f"Polling Hex Project {project_id}. Status: {project_status}."
)
if project_status not in VALID_STATUSES:
raise AirflowException(f"Unhandled status: {project_status}")
Comment on lines -190 to -191

I don't think we want to remove this exception

Author:

I will add this back


if project_status == COMPLETE:
break
return run_status

if project_status in TERMINAL_STATUSES:
raise AirflowException(
f"Project Run failed with status {project_status}. "
f"See Run URL for more info {run_response['runUrl']}"
f"See Run URL for more info {run_status['runUrl']}"
)

if (
@@ -217,4 +241,28 @@ def run_and_poll(
)

time.sleep(poll_interval)
return run_status

def run_and_poll(
self,
project_id: str,
inputs: Optional[dict],
update_cache: bool = False,
poll_interval: int = 3,
poll_timeout: int = 600,
kill_on_timeout: bool = True,
notifications: List[NotificationDetails] = [],
max_poll_retries: int = 3,
poll_retry_delay: int = 5,
):
run_response = self.run_project(project_id, inputs, update_cache, notifications)
run_id = run_response["runId"]

return self.poll_project_status(
project_id,
run_id,
poll_interval,
poll_timeout,
kill_on_timeout,
max_poll_retries,
poll_retry_delay,
)
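Taken together, `run_and_poll` now starts a run with `run_project` and hands the returned `runId` to `poll_project_status`, which polls until the run completes, fails, or the poll timeout expires, attempting to cancel the run on timeout when `kill_on_timeout` is set. The plain-Python sketch below mirrors that loop so it can be exercised without Airflow or the Hex API. The status strings and the `PollError` class are assumptions standing in for the hook's `COMPLETE`, `TERMINAL_STATUSES` and `VALID_STATUSES` constants and `AirflowException`, whose definitions are not shown in this diff; the timeout branch is likewise a sketch, since that part of the loop is collapsed above.

```python
import time
from typing import Callable, Dict

# Assumed status values for illustration; the hook defines its own constants.
COMPLETE = "COMPLETED"
TERMINAL_STATUSES = {"ERRORED", "KILLED", "UNABLE_TO_ALLOCATE_KERNEL"}
VALID_STATUSES = {"PENDING", "RUNNING", COMPLETE} | TERMINAL_STATUSES


class PollError(Exception):
    """Stand-in for AirflowException in this sketch."""


def poll_until_done(
    get_status: Callable[[], Dict[str, str]],
    cancel: Callable[[], None],
    poll_interval: float = 3,
    poll_timeout: float = 600,
    kill_on_timeout: bool = True,
) -> Dict[str, str]:
    """Poll get_status until the run completes, fails, or times out."""
    start = time.monotonic()
    while True:
        run_status = get_status()  # the hook wraps this call in retries
        status = run_status["status"]
        if status not in VALID_STATUSES:
            raise PollError(f"Unhandled status: {status}")
        if status == COMPLETE:
            return run_status
        if status in TERMINAL_STATUSES:
            raise PollError(f"Project Run failed with status {status}")
        if time.monotonic() - start > poll_timeout:
            if kill_on_timeout:
                cancel()  # attempt to cancel the run before giving up
            raise PollError(f"Polling timed out after {poll_timeout}s")
        time.sleep(poll_interval)


# Usage: a fake run that completes on the third poll.
statuses = iter(["PENDING", "RUNNING", "COMPLETED"])
result = poll_until_done(lambda: {"status": next(statuses)}, cancel=lambda: None, poll_interval=0)
print(result["status"])  # COMPLETED
```

The sketch measures elapsed time with `time.monotonic()`, which is unaffected by system clock adjustments; the hook itself uses `datetime.datetime.now()`.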