diff --git a/CHANGELOG.md b/CHANGELOG.md index e1acb29..efe0b78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog --- + +## [0.1.3] - 2023-06-20 + +### Added + +- Support for notifications + ## [0.1.2] - 2022-11-07 ### Fixed diff --git a/VERSION.txt b/VERSION.txt index d917d3e..b1e80bb 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1 +1 @@ -0.1.2 +0.1.3 diff --git a/dagster_hex/ops.py b/dagster_hex/ops.py index c8a2fb6..0226ebf 100644 --- a/dagster_hex/ops.py +++ b/dagster_hex/ops.py @@ -32,6 +32,13 @@ default_value=None, description="Additional inputs for the Hex Project", ), + "notifications": Field( + Noneable(list), + is_required=False, + default_value=None, + description="A list of notification details that will be delivered " + + "once a project run completes", + ), "update_cache": Field( bool, is_required=False, @@ -71,6 +78,7 @@ def hex_project_op(context): hex_output: HexOutput = context.resources.hex.run_and_poll( project_id=context.op_config["project_id"], inputs=context.op_config["inputs"], + notifications=context.op_config["notifications"], update_cache=context.op_config["update_cache"], kill_on_timeout=context.op_config["kill_on_timeout"], poll_interval=context.op_config["poll_interval"], diff --git a/dagster_hex/resources.py b/dagster_hex/resources.py index 0c65ad4..c694c80 100644 --- a/dagster_hex/resources.py +++ b/dagster_hex/resources.py @@ -2,13 +2,18 @@ import logging import time from importlib.metadata import PackageNotFoundError, version -from typing import Any, Dict, Optional, cast +from typing import Any, Dict, List, Optional, cast from urllib.parse import urljoin import requests from dagster import Failure, Field, StringSource, get_dagster_logger, resource -from dagster_hex.types import HexOutput, RunResponse, StatusResponse +from dagster_hex.types import ( + HexOutput, + NotificationDetails, + RunResponse, + StatusResponse, +) from .consts import ( COMPLETE, @@ -39,7 +44,6 @@ def __init__( request_max_retries: int = 3, request_retry_delay: float = 0.25, ): - self._log = log self._api_key = api_key self._request_max_retries = request_max_retries @@ -129,6 +133,7 @@ def run_project( project_id: str, inputs: Optional[Dict[str, Any]] = None, update_cache: bool = False, + notifications: List[NotificationDetails] = [], ) -> RunResponse: """Trigger a sync and initiate a sync run @@ -151,6 +156,9 @@ def run_project( if inputs: data["inputParams"] = inputs + if notifications: + data["notifications"] = notifications + response = cast( RunResponse, self.make_request( @@ -203,6 +211,7 @@ def run_and_poll( self, project_id: str, inputs: Optional[dict], + notifications: List[NotificationDetails] = [], update_cache: bool = False, kill_on_timeout: bool = True, poll_interval: float = DEFAULT_POLL_INTERVAL, @@ -228,7 +237,12 @@ def run_and_poll( Returns: Dict[str, Any]: Parsed json output from the API """ - run_response = self.run_project(project_id, inputs, update_cache) + run_response = self.run_project( + project_id, + inputs=inputs, + notifications=notifications, + update_cache=update_cache, + ) run_id = run_response["runId"] poll_start = datetime.datetime.now() while True: diff --git a/dagster_hex/types.py b/dagster_hex/types.py index 4724445..546ab39 100644 --- a/dagster_hex/types.py +++ b/dagster_hex/types.py @@ -1,4 +1,4 @@ -from typing import NamedTuple, TypedDict +from typing import List, NamedTuple, TypedDict RunResponse = TypedDict( "RunResponse", @@ -10,6 +10,16 @@ "traceId": str, }, ) +NotificationResponse = TypedDict( + "NotificationResponse", + { + "type": str, + "recipientType": str, + "includeSuccessScreenshot": bool, + "recipients": List[dict], + }, +) + StatusResponse = TypedDict( "StatusResponse", @@ -22,6 +32,19 @@ "endTime": str, "elapsedTime": int, "traceId": str, + "notifications": List[NotificationResponse], + }, +) + + +NotificationDetails = TypedDict( + "NotificationDetails", + { + "type": str, + "includeSuccessScreenshot": bool, + "slackChannelIds": List[str], + "userIds": List[str], + "groupIds": List[str], }, ) diff --git a/example_job/example_hex.py b/example_job/example_hex.py new file mode 100644 index 0000000..099c537 --- /dev/null +++ b/example_job/example_hex.py @@ -0,0 +1,31 @@ +# resources.py +import os + +from dagster import job + +from dagster_hex.ops import hex_project_op +from dagster_hex.resources import hex_resource + +API_KEY = os.getenv("DAGSTER_HEX_API") +PROJ_ID = "db753fd1-2b7c-4dbe-8cbb-30742214cd9b" + +notifications = [ + { + "type": "SUCCESS", + "includeSuccessScreenshot": True, + "slackChannelIds": ["C059NUK6SQG"], + "userIds": [], + "groupIds": [], + } +] + + +my_resource = hex_resource.configured({"api_key": API_KEY}) +run_hex_op = hex_project_op.configured( + {"project_id": PROJ_ID, "notifications": notifications}, name="run_job" +) + + +@job(resource_defs={"hex": my_resource}) +def hex_job(): + run_hex_op() diff --git a/setup.cfg b/setup.cfg index a11d96f..8bce2d4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -23,7 +23,7 @@ python_requires = >=3.8 packages = find_namespace: include_package_data = true install_requires = - dagster>=1.0 + dagster>=1.3 dagit requests>=2 zip_safe = false