
Commit

parent 7b9f52a
author Dimitry Foures <dimitry.foures@gmail.com> 1729160952 +0100
committer sikehish <hisham0502@gmail.com> 1730182901 +0530

Update materialization.rst

Fix mutate docstring

The docstring raised a pytest escape error due to the ``\*`` escape sequence, so we wrap the parameter name in backticks.

Improve pipe_output first node naming

The current naming convention is very prone to clashes with user-defined names.

We now apply the same namespacing convention, ``<node>.raw``, to indicate the
first node.
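
For illustration, a minimal sketch of the resulting node names (the functions
below are made up for this example, not taken from the commit):

    from hamilton.function_modifiers import pipe_output, step

    def _add_one(x: int) -> int:
        return x + 1

    @pipe_output(step(_add_one))
    def b(a: int) -> int:
        return a * 2

    # The intermediate node holding the raw output of ``b`` is now named
    # ``b.raw`` rather than ``b_raw``, so a user-defined function named
    # ``b_raw`` no longer collides with the generated node.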

Provide docker buildx setup for docker builds (#1194)

This change adds a script for multi-platform Docker builds, as well as a
GitHub workflow that builds them automatically when a new sf-hamilton-ui
version is published.
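
The core check the script and workflow perform can be sketched in Python as
follows; this is a simplified illustration rather than the shipped shell
script (the endpoints mirror the ones used in the workflow, and error
handling is omitted):

    import json
    import re
    from typing import Optional
    from urllib.request import urlopen

    def latest_pypi_version() -> str:
        # latest sf-hamilton-ui release published on PyPI
        with urlopen("https://pypi.org/pypi/sf-hamilton-ui/json") as resp:
            return json.load(resp)["info"]["version"]

    def highest_dockerhub_version() -> Optional[str]:
        # highest semver tag already pushed to Docker Hub
        url = (
            "https://registry.hub.docker.com/v2/repositories/"
            "dagworks/ui-frontend/tags?page_size=100"
        )
        with urlopen(url) as resp:
            tags = [t["name"] for t in json.load(resp)["results"]]
        semver = [t for t in tags if re.fullmatch(r"\d+\.\d+\.\d+", t)]
        # numeric comparison, equivalent to `sort -V` in the shell script
        return max(semver, key=lambda t: tuple(map(int, t.split(".")))) if semver else None

    if latest_pypi_version() != highest_dockerhub_version():
        print("new version detected: a build would be triggered")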

Squashed commits:

* Created buildx_and_push.sh script in the ui directory to create multi-platform Docker builds

* Fixed a content-related issue in buildx_and_push.sh

* buildx_and_push.sh: Added functionality to fetch the latest version from PyPI

* buildx_and_push.sh: Added check_buildx_installed

* buildx_and_push.sh: Added check_buildx_installed

* buildx_and_push.sh: Enhanced error handling (checking that jq exists, handling curl responses, and handling docker buildx errors)

* Adding build args to buildx_and_push.sh

* buildx_and_push.sh: Changes made to test a new workflow

* Created a new workflow: hamilton-ui-build-and-push

* buildx_and_push.sh: echo statement added to debug

* buildx_and_push.sh: cd'ing to the directory (ui) where the shell script is located, to prevent context-related errors in the workflow.

* hamilton-ui-build-and-push.yml: Added Docker Hub login step

* buildx_and_push.sh: added the dagworks Docker Hub username; the workflow worked on the fork.

* hamilton-ui-build-and-push.yml: Replaced the previously cached version with the version tag from the Docker Hub image.

* buildx_and_push.sh: Changed dockerhub username for testing

* hamilton-ui-build-and-push.yml: Minor change in the Docker registry URL (version)

* hamilton-ui-build-and-push.yml: Minor change in the "Fetch highest version from Docker Hub" step's shell script

* hamilton-ui-build-and-push.yml: Replacing deprecated set-output with GITHUB_OUTPUT

* hamilton-ui-build-and-push.yml: Conditional execution of steps implemented

* Undid change in dockerhub username

* Update ui/buildx_and_push.sh

* Update ui/buildx_and_push.sh

* chore: fix pre-commit whitespace issues

---------

Co-authored-by: Stefan Krawczyk <stefan@dagworks.io>

Fix `keep_dot` propagation in `Driver` display functions
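
A usage sketch of the fixed behavior (the module and node names below are
illustrative, and assume ``keep_dot`` is a parameter these display functions
already accept, as the fix implies):

    from hamilton import driver
    import my_dataflow  # hypothetical module defining the DAG

    dr = driver.Builder().with_modules(my_dataflow).build()
    # ``keep_dot=True`` is now forwarded to the underlying rendering call by
    # ``display_downstream_of`` / ``display_upstream_of`` instead of being dropped
    dr.display_downstream_of(
        "normalized_df",
        output_file_path="./downstream.dot",
        keep_dot=True,
    )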

Bumps hamilton version from 1.81.0 to 1.81.1

fix: caching `SQLiteMetadataStore.get_run_ids()` (#1205)

* fixed .get_run_ids() and standardized .get_run() + tests

* fixed docstring formatting errors

---------

Co-authored-by: zilto <tjean@DESKTOP-V6JDCS2>
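
A minimal sketch of the standardized API (the ``path`` constructor argument is
an assumption based on the diff below):

    from hamilton.caching.stores.sqlite import SQLiteMetadataStore

    store = SQLiteMetadataStore(path="./.hamilton_cache")
    run_ids = store.get_run_ids()  # now a list of run ids, oldest to newest
    for node in store.get_run(run_ids[-1]):
        # each entry carries cache_key, data_version, node_name, code_version
        print(node["node_name"], node["data_version"])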

Bumps hamilton version from 1.81.1 to 1.81.2

Created a new script, update_blogs_in_learning_resources.py, which updates the External Blogs section in learning_resources.md with the latest blogs (with a date cutoff)

Added docstring in update_blogs_in_learning_resources.py

chore: fix pre-commit whitespace issues

update_blogs_in_learning_resources.py: Added a "print to standard out" option (using the --print flag)

update_blogs_in_learning_resources.py: Argument parser with expanded help text

update_blogs_in_learning_resources.py: Expanded docstring
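
A hedged sketch of the CLI surface described above (only the --print flag is
named in these messages; the rest is illustrative):

    import argparse

    parser = argparse.ArgumentParser(
        description=(
            "Update the External Blogs section of learning_resources.md "
            "with the latest blogs, subject to a date cutoff."
        )
    )
    parser.add_argument(
        "--print",
        action="store_true",
        help="print the updated section to standard out instead of writing the file",
    )
    args = parser.parse_args()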

Fixes whitespace and small docs typo

thedimlebowski authored and sikehish committed Oct 29, 2024
1 parent 7b9f52a commit 8a2870e
Showing 12 changed files with 528 additions and 39 deletions.
83 changes: 83 additions & 0 deletions .github/workflows/hamilton-ui-build-and-push.yml
@@ -0,0 +1,83 @@
name: Building and pushing UI frontend and backend images

on:
  schedule:
    - cron: '0 0 * * *'
  workflow_dispatch:

jobs:
  check_and_build:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Log in to Docker Hub
        uses: docker/login-action@v2
        with:
          username: ${{ secrets.DOCKER_USERNAME }}
          password: ${{ secrets.DOCKER_TOKEN }}

      - name: Install jq
        run: sudo apt-get update && sudo apt-get install -y jq

      - name: Check latest version of sf-hamilton-ui
        id: check_version
        run: |
          response=$(curl -s https://pypi.org/pypi/sf-hamilton-ui/json)
          version=$(echo "$response" | jq -r '.info.version')
          echo "latest_version=$version" >> $GITHUB_ENV

      - name: Fetch highest version from Docker Hub
        id: fetch_tags
        run: |
          # get the list of tags for the frontend image
          tags=$(curl -s https://registry.hub.docker.com/v2/repositories/dagworks/ui-frontend/tags?page_size=100 | jq -r '.results[].name')
          # find the highest version tag
          highest_version="none"
          for tag in $tags; do
            if [[ "$tag" =~ ^[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
              if [ "$highest_version" == "none" ] || [ "$(printf '%s\n' "$tag" "$highest_version" | sort -V | tail -n1)" == "$tag" ]; then
                highest_version="$tag"
              fi
            fi
          done
          echo "Highest version on Docker Hub: $highest_version"
          echo "highest_version=$highest_version" >> $GITHUB_OUTPUT

      - name: Compare versions
        id: compare_versions
        run: |
          echo "Current version: $latest_version"
          highest_version=${{ steps.fetch_tags.outputs.highest_version }}
          echo "Highest version found: $highest_version"
          if [[ "$latest_version" == "$highest_version" ]]; then
            echo "No new version found. Exiting."
            echo "skip_build=true" >> $GITHUB_ENV
          else
            echo "New version detected: $latest_version"
            echo "skip_build=false" >> $GITHUB_ENV
          fi

      - name: Run build and push script
        if: env.skip_build != 'true'
        run: |
          chmod +x ./ui/buildx_and_push.sh
          ./ui/buildx_and_push.sh $latest_version

      - name: Save the latest version to a file
        if: env.skip_build != 'true'
        run: |
          mkdir -p version_cache
          echo "$latest_version" > version_cache/previous_version.txt

      - name: Cache the latest version
        if: env.skip_build != 'true'
        uses: actions/cache@v4
        with:
          path: ./version_cache
          key: hamilton-ui-version-cache
2 changes: 1 addition & 1 deletion docs/concepts/materialization.rst
@@ -6,7 +6,7 @@ So far, we executed our dataflow using the ``Driver.execute()`` method, which ca

 On this page, you'll learn:

-- How to load and data in Hamilton
+- How to load and save data in Hamilton
 - Why use materialization
 - What are ``DataSaver`` and ``DataLoader`` objects
 - The difference between ``.execute()`` and ``.materialize()``
18 changes: 11 additions & 7 deletions hamilton/caching/stores/base.py
@@ -95,7 +95,7 @@ def set(self, cache_key: str, data_version: str, **kwargs) -> Optional[Any]:
         """

     @abc.abstractmethod
-    def get(self, cache_key: str) -> Optional[str]:
+    def get(self, cache_key: str, **kwargs) -> Optional[str]:
         """Try to retrieve ``data_version`` keyed by ``cache_key``.
         If retrieval misses return ``None``.
         """
@@ -118,15 +118,19 @@ def exists(self, cache_key: str) -> bool:
     def get_run_ids(self) -> Sequence[str]:
         """Return a list of run ids, sorted from oldest to newest start time.
         A ``run_id`` is registered when the metadata_store ``.initialize()`` is called.
+        NOTE because of race conditions, the order could theoretically differ from the
+        order stored on the SmartCacheAdapter `._run_ids` attribute.
         """

     @abc.abstractmethod
-    def get_run(self, run_id: str) -> Any:
-        """Return all the metadata associated with a run.
-        The metadata content may differ across MetadataStore implementations
+    def get_run(self, run_id: str) -> Sequence[dict]:
+        """Return a list of node metadata associated with a run.
+        For each node, the metadata should include ``cache_key`` (created or used)
+        and ``data_version``. These values allow to manually query the MetadataStore
+        or ResultStore.
+        Decoding the ``cache_key`` gives the ``node_name``, ``code_version``, and
+        ``dependencies_data_versions``. Individual implementations may add more
+        information or decode the ``cache_key`` before returning metadata.
         """

     @property
82 changes: 61 additions & 21 deletions hamilton/caching/stores/sqlite.py
@@ -3,6 +3,7 @@
 import threading
 from typing import List, Optional

+from hamilton.caching.cache_key import decode_key
 from hamilton.caching.stores.base import MetadataStore

@@ -19,14 +20,14 @@ def __init__(

         self._thread_local = threading.local()

-    def _get_connection(self):
+    def _get_connection(self) -> sqlite3.Connection:
         if not hasattr(self._thread_local, "connection"):
             self._thread_local.connection = sqlite3.connect(
                 str(self._path), check_same_thread=False, **self.connection_kwargs
             )
         return self._thread_local.connection

-    def _close_connection(self):
+    def _close_connection(self) -> None:
         if hasattr(self._thread_local, "connection"):
             self._thread_local.connection.close()
             del self._thread_local.connection
@@ -76,9 +77,9 @@ def _create_tables_if_not_exists(self):
             """\
             CREATE TABLE IF NOT EXISTS cache_metadata (
                 cache_key TEXT PRIMARY KEY,
+                data_version TEXT NOT NULL,
                 node_name TEXT NOT NULL,
                 code_version TEXT NOT NULL,
-                data_version TEXT NOT NULL,
                 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                 FOREIGN KEY (cache_key) REFERENCES history(cache_key)
@@ -106,13 +107,21 @@ def set(
         self,
         *,
         cache_key: str,
-        node_name: str,
-        code_version: str,
         data_version: str,
         run_id: str,
+        node_name: str = None,
+        code_version: str = None,
         **kwargs,
     ) -> None:
         cur = self.connection.cursor()

+        # if the caller of ``.set()`` directly provides the ``node_name`` and ``code_version``,
+        # we can skip the decoding step.
+        if (node_name is None) or (code_version is None):
+            decoded_key = decode_key(cache_key)
+            node_name = decoded_key["node_name"]
+            code_version = decoded_key["code_version"]
+
         cur.execute("INSERT INTO history (cache_key, run_id) VALUES (?, ?)", (cache_key, run_id))
         cur.execute(
             """\
@@ -150,7 +159,7 @@ def delete(self, cache_key: str) -> None:
         cur.execute("DELETE FROM cache_metadata WHERE cache_key = ?", (cache_key,))
         self.connection.commit()

-    def delete_all(self):
+    def delete_all(self) -> None:
         """Delete all existing tables from the database"""
         cur = self.connection.cursor()

@@ -170,35 +179,66 @@ def exists(self, cache_key: str) -> bool:
         return result is not None

     def get_run_ids(self) -> List[str]:
+        """Return a list of run ids, sorted from oldest to newest start time."""
         cur = self.connection.cursor()
-        cur.execute("SELECT run_id FROM history ORDER BY id")
+        cur.execute("SELECT run_id FROM run_ids ORDER BY id")
         result = cur.fetchall()

         if result is None:
             raise IndexError("No `run_id` found. Table `history` is empty.")
+        return [r[0] for r in result]

-        return result[0]
+    def _run_exists(self, run_id: str) -> bool:
+        """Returns True if a run was initialized with ``run_id``, even
+        if the run recorded no node executions.
+        """
+        cur = self.connection.cursor()
+        cur.execute(
+            """\
+            SELECT EXISTS(
+                SELECT 1
+                FROM run_ids
+                WHERE run_id = ?
+            )
+            """,
+            (run_id,),
+        )
+        result = cur.fetchone()
+        # SELECT EXISTS returns 1 for True, i.e., `run_id` is found
+        return result[0] == 1

     def get_run(self, run_id: str) -> List[dict]:
-        """Return all the metadata associated with a run."""
+        """Return a list of node metadata associated with a run.
+        :param run_id: ID of the run to retrieve
+        :return: List of node metadata which includes ``cache_key``, ``data_version``,
+        ``node_name``, and ``code_version``. The list can be empty if a run was initialized
+        but no nodes were executed.
+        :raises IndexError: if the ``run_id`` is not found in metadata store.
+        """
         cur = self.connection.cursor()
+        if self._run_exists(run_id) is False:
+            raise IndexError(f"`run_id` not found in table `run_ids`: {run_id}")
+
         cur.execute(
             """\
             SELECT
+                cache_metadata.cache_key,
+                cache_metadata.data_version,
                 cache_metadata.node_name,
-                cache_metadata.code_version,
-                cache_metadata.data_version
-            FROM (SELECT * FROM history WHERE history.run_id = ?) AS run_history
-            JOIN cache_metadata ON run_history.cache_key = cache_metadata.cache_key
+                cache_metadata.code_version
+            FROM history
+            JOIN cache_metadata ON history.cache_key = cache_metadata.cache_key
+            WHERE history.run_id = ?
             """,
             (run_id,),
         )
         results = cur.fetchall()

-        if results is None:
-            raise IndexError(f"`run_id` not found in table `history`: {run_id}")
-
         return [
-            dict(node_name=node_name, code_version=code_version, data_version=data_version)
-            for node_name, code_version, data_version in results
+            dict(
+                cache_key=cache_key,
+                data_version=data_version,
+                node_name=node_name,
+                code_version=code_version,
+            )
+            for cache_key, data_version, node_name, code_version in results
         ]
2 changes: 2 additions & 0 deletions hamilton/driver.py
@@ -1200,6 +1200,7 @@ def display_downstream_of(
                 display_fields=show_schema,
                 custom_style_function=custom_style_function,
                 config=self.graph._config,
+                keep_dot=keep_dot,
             )
         except ImportError as e:
             logger.warning(f"Unable to import {e}", exc_info=True)
@@ -1263,6 +1264,7 @@ def display_upstream_of(
                 display_fields=show_schema,
                 custom_style_function=custom_style_function,
                 config=self.graph._config,
+                keep_dot=keep_dot,
             )
         except ImportError as e:
             logger.warning(f"Unable to import {e}", exc_info=True)
11 changes: 6 additions & 5 deletions hamilton/function_modifiers/macros.py
@@ -1158,7 +1158,7 @@ class pipe_output(base.NodeTransformer):
         def B(...):
             return ...

-    we obtain the new DAG **A --> B_raw --> B1 --> B2 --> B --> C**, where we can think of the **B_raw --> B1 --> B2 --> B** as a "pipe" that takes the raw output of **B**, applies to it
+    we obtain the new DAG **A --> B.raw --> B1 --> B2 --> B --> C**, where we can think of the **B.raw --> B1 --> B2 --> B** as a "pipe" that takes the raw output of **B**, applies to it
     **B1**, takes the output of **B1** applies to it **B2** and then gets renamed to **B** to re-connect to the rest of the DAG.
     The rules for chaining nodes are the same as for ``pipe_input``.
@@ -1282,7 +1282,7 @@ def transform_node(
     ) -> Collection[node.Node]:
         """Injects nodes into the graph.

-        We create a copy of the original function and rename it to `function_name_raw` to be the
+        We create a copy of the original function and rename it to `function_name.raw` to be the
         initial node. Then we create a node for each step in `post-pipe` and chain them together.
         The last node is an identity to the previous one with the original name `function_name` to
         represent an exit point of `pipe_output`.
@@ -1299,7 +1299,8 @@ def transform_node(
         else:
             _namespace = self.namespace

-        original_node = node_.copy_with(name=f"{node_.name}_raw")
+        # We pick a reserved prefix that avoids clashes with user-defined functions / nodes
+        original_node = node_.copy_with(name=f"{node_.name}.raw")

         def __identity(foo: Any) -> Any:
             return foo
@@ -1455,7 +1456,7 @@ def _transform1(...):
         def _transform2(...):
             return ...

-    we obtain the new pipe-like subDAGs **A_raw --> _transform1 --> A** and **B_raw --> _transform1 --> _transform2 --> B**,
+    we obtain the new pipe-like subDAGs **A.raw --> _transform1 --> A** and **B.raw --> _transform1 --> _transform2 --> B**,
     where the behavior is the same as ``pipe_output``.
     While it is generally reasonable to use ``pipe_output``, you should consider ``mutate`` in the following scenarios:
@@ -1526,7 +1527,7 @@ def __init__(
         :param target_functions: functions we wish to mutate the output of
         :param collapse: Whether to collapse this into a single node. This is not currently supported.
         :param _chain: Whether to chain the first parameter. This is the only mode that is supported. Furthermore, this is not externally exposed. ``@flow`` will make use of this.
-        :param \*\*mutating_function_kwargs: other kwargs that the decorated function has. Must be validly called as ``f(**kwargs)``, and have a 1-to-1 mapping of kwargs to parameters. This will be applied for all ``target_functions``, unless ``apply_to`` already has the mutator function kwargs, in which case it takes those.
+        :param `**mutating_function_kwargs`: other kwargs that the decorated function has. Must be validly called as ``f(**kwargs)``, and have a 1-to-1 mapping of kwargs to parameters. This will be applied for all ``target_functions``, unless ``apply_to`` already has the mutator function kwargs, in which case it takes those.
         """
         self.collapse = collapse
         self.chain = _chain
2 changes: 1 addition & 1 deletion hamilton/version.py
@@ -1 +1 @@
-VERSION = (1, 81, 0)
+VERSION = (1, 81, 2)
