-
Notifications
You must be signed in to change notification settings - Fork 133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds code for snowflake container example running the UI #1257
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
FROM python:3.13.1-slim

# Refresh the package index and upgrade in the same layer so a cached, stale
# index is never reused by a later build; drop the index afterwards to keep
# the image small.
RUN apt update \
    && apt upgrade sqlite3 -y \
    && rm -rf /var/lib/apt/lists/*
RUN pip install "sf-hamilton[ui,sdk]"
RUN pip install flask

ENV HAMILTON_ALLOWED_HOSTS=".snowflakecomputing.app"
# Read by pipeline_endpoint.py to choose the port the Flask endpoint listens on.
ENV SERVER_PORT=8001

COPY pipeline_endpoint.py /pipeline_endpoint.py
COPY my_functions.py /my_functions.py

# Exec form avoids wrapping bash in an extra `/bin/sh -c`. The Hamilton UI is
# started in the background, then the Flask endpoint runs in the foreground.
ENTRYPOINT ["/bin/bash", "-c", "(hamilton ui --base-dir /hamilton-basedir &) && python /pipeline_endpoint.py"]
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Running Hamilton & the Hamilton UI in Snowflake | ||
|
||
This example is code for the ["Observability of Python code and application logic with Hamilton UI on Snowflake Container Services" post](https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635) by | ||
[Greg Kantyka](https://medium.com/@pkantyka). | ||
|
||
Here we show the code required to be packaged up for use on Snowflake: | ||
|
||
1. Dockerfile - runs the Hamilton UI and a Flask endpoint to exercise Hamilton code | ||
2. my_functions.py - the Hamilton code that is exercised by the flask endpoint | ||
3. pipeline_endpoint.py - the flask endpoint that exercises the Hamilton code | ||
|
||
To run see: | ||
- snowflake.sql that contains all the SQL to create the necessary objects in Snowflake and exercise things. | ||
|
||
For more details see ["Observability of Python code and application logic with Hamilton UI on Snowflake Container Services" post](https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635) by | ||
[Greg Kantyka](https://medium.com/@pkantyka). |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
import pandas as pd | ||
|
||
|
||
def spend_mean(spend: pd.Series) -> float:
    """Shows function creating a scalar. In this case it computes the mean of the entire column.

    Args:
        spend: The spend column.

    Returns:
        The mean of ``spend`` as a single value.
    """
    return spend.mean()
|
||
|
||
def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
    """Shows function that takes a scalar. In this case to zero mean spend.

    Args:
        spend: The spend column.
        spend_mean: The scalar subtracted from every element of ``spend``.

    Returns:
        ``spend`` shifted down by ``spend_mean``, element by element.
    """
    return spend - spend_mean
|
||
|
||
def spend_std_dev(spend: pd.Series) -> float:
    """Function that computes the standard deviation of the spend column.

    Args:
        spend: The spend column.

    Returns:
        The result of ``spend.std()`` with pandas' default settings.
    """
    return spend.std()
|
||
|
||
def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
    """Function showing one way to make spend have zero mean and unit variance.

    Args:
        spend_zero_mean: The spend column with its mean already removed.
        spend_std_dev: The scalar every element is divided by.

    Returns:
        ``spend_zero_mean`` divided element-wise by ``spend_std_dev``.
    """
    return spend_zero_mean / spend_std_dev
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
""" | ||
This module: | ||
- Defines a flask app that listens for POST requests on /echo | ||
- the /echo command is invoked from a Snowflake SQL query | ||
- the /echo command calls a function get_response that is defined in this module | ||
- get_response uses Hamilton to execute a pipeline defined in my_functions.py | ||
- my_functions.py contains the functions that are used in the pipeline | ||
- the pipeline is executed with the input data from the Snowflake query | ||
- the output of the pipeline is returned to Snowflake | ||
- the Hamilton UI tracker is used to track the execution of the pipeline | ||
""" | ||
|
||
import logging | ||
import os | ||
import sys | ||
|
||
import pandas as pd | ||
from flask import Flask, make_response, request | ||
|
||
from hamilton import registry | ||
|
||
registry.disable_autoload() | ||
import my_functions # we import the module here! | ||
|
||
from hamilton import driver | ||
from hamilton_sdk import adapters | ||
|
||
# WRAPPER CODE FOR SNOWFLAKE FUNCTION ###### | ||
|
||
# Interface the Flask server binds to; "0.0.0.0" listens on all interfaces.
SERVICE_HOST = os.getenv("SERVER_HOST", "0.0.0.0")
# Environment values are always strings, so normalise here: the port is an int
# whether or not SERVER_PORT is set.
SERVICE_PORT = int(os.getenv("SERVER_PORT", "8080"))
CHARACTER_NAME = os.getenv("CHARACTER_NAME", "I")
|
||
|
||
def get_logger(logger_name):
    """Return a DEBUG-level logger that writes formatted records to stdout.

    Calling this more than once with the same name returns the same logger
    without attaching a second stdout handler, so records are not duplicated.

    Args:
        logger_name: Name passed to ``logging.getLogger``.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    # logging.getLogger returns the same object per name, so an earlier call may
    # already have attached the stdout handler.
    already_attached = any(
        isinstance(existing, logging.StreamHandler)
        and getattr(existing, "stream", None) is sys.stdout
        for existing in logger.handlers
    )
    if already_attached:
        return logger

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(logging.Formatter("%(name)s [%(asctime)s] [%(levelname)s] %(message)s"))
    logger.addHandler(handler)
    return logger
|
||
|
||
logger = get_logger("echo-service") | ||
|
||
app = Flask(__name__) | ||
|
||
|
||
@app.get("/healthcheck")
def readiness_probe():
    """Answer ``GET /healthcheck`` with the plain body ``OK``."""
    return "OK"
|
||
|
||
@app.post("/echo")
def echo():
    """This is the endpoint that Snowflake will call to run Hamilton code.

    The JSON body is expected to look like ``{"data": [row, ...]}``. Rows are
    read positionally: element 0 is copied to the first element of the matching
    output row, and elements 1-4 are forwarded, in order, to ``get_response``.

    Returns:
        ``{}`` when the body is missing, is not a JSON object, or has no rows;
        otherwise a JSON response ``{"data": [[row[0], result], ...]}`` with one
        output row per input row.
    """
    message = request.json

    # Only row counts are logged: the payload is caller-supplied data and may
    # contain sensitive values.
    input_rows = message.get("data") if isinstance(message, dict) else None
    if not input_rows:
        logger.info("Received empty message")
        return {}

    logger.info("Received %d rows", len(input_rows))

    output_rows = [[row[0], get_response(row[1], row[2], row[3], row[4])] for row in input_rows]
    logger.info("Produced %d rows", len(output_rows))

    response = make_response({"data": output_rows})
    response.headers["Content-type"] = "application/json"
    return response
|
||
|
||
# END OF WRAPPER CODE FOR SNOWFLAKE FUNCTION ###### | ||
|
||
|
||
def get_response(prj_id, spend, signups, output_columns):
    """The function that is called from SQL on Snowflake.

    Builds a Hamilton driver over ``my_functions`` with the Hamilton UI tracker
    attached, executes it for the requested outputs, and converts the results
    into plain values.

    NOTE: the ``/echo`` endpoint passes row values positionally, and the SQL
    function is declared as ``(prj_id, signups, spend, output_columns)``. The
    parameter named ``spend`` therefore receives the signups values and the
    parameter named ``signups`` receives the spend values. The input mapping
    below crosses them back over, so the pipeline sees the right columns. Keep
    the two in sync if either side is ever changed.

    Args:
        prj_id: Project id handed to ``HamiltonTracker`` as ``project_id``.
        spend: Values wrapped in a ``pd.Series`` and fed to the ``signups`` input.
        signups: Values wrapped in a ``pd.Series`` and fed to the ``spend`` input.
        output_columns: The outputs requested from the driver's ``execute`` call.

    Returns:
        A dict keyed by result name. ``pd.Series`` results become
        ``{str(index): value}`` dicts; any other value is passed through as is.
    """
    tracker = adapters.HamiltonTracker(
        project_id=prj_id,
        username="admin",
        dag_name="MYDAG",
        tags={"environment": "R&D", "team": "MY_TEAM", "version": "Beta"},
    )
    # Deliberately crossed -- see the NOTE in the docstring.
    input_columns = {
        "signups": pd.Series(spend),
        "spend": pd.Series(signups),
    }
    dr = (
        driver.Builder()
        .with_config({})  # we don't have any configuration or invariant data for this example.
        .with_modules(my_functions)  # we need to tell hamilton where to load function definitions from
        .with_adapters(tracker)  # we add the Hamilton UI tracker
        .build()
    )

    results = dr.execute(output_columns, inputs=input_columns)

    # Series are flattened to dicts with string keys; everything else is passed through.
    return {
        name: (
            {str(index): item for index, item in value.to_dict().items()}
            if isinstance(value, pd.Series)
            else value
        )
        for name, value in results.items()
    }
|
||
|
||
if __name__ == "__main__":
    # Start the Flask app on the host and port taken from the environment.
    app.run(host=SERVICE_HOST, port=SERVICE_PORT)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
flask | ||
sf-hamilton[ui,sdk] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
-- For more details visit: | ||
-- https://medium.com/@pkantyka/observability-of-python-code-and-application-logic-with-hamilton-ui-on-snowflake-container-services-a26693b46635 | ||
|
||
-- Creates the container service. The spec declares two endpoints:
-- `entrypoint` (8001, the Flask endpoint) and `hamilton` (8241, marked public).
-- Replace every <placeholder> before running.
CREATE SERVICE public.hamilton_ui
IN COMPUTE POOL TEST_POOL
FROM SPECIFICATION $$
spec:
  containers:
  - name: hamiltonui
    image: <account-url-registry-host>/<db-name>/<schema-name>/<repo-name>/snowflake-hamilton-ui
    volumeMounts:
    - name: hamilton-basedir
      mountPath: /hamilton-basedir
  endpoints:
  - name: entrypoint
    port: 8001
  - name: hamilton
    port: 8241
    public: true
  volumes:
  - name: hamilton-basedir
    source: "@<db-name>.<schema-name>.hamilton_base"
$$
QUERY_WAREHOUSE = <warehouse-name>
;
|
||
CALL SYSTEM$GET_SERVICE_STATUS('<db-name>.<schema-name>.hamilton_ui');

-- The third argument must match the container name in the service spec (hamiltonui).
CALL SYSTEM$GET_SERVICE_LOGS('<db-name>.<schema-name>.hamilton_ui', '0', 'hamiltonui', 1000);

SHOW ENDPOINTS IN SERVICE public.hamilton_ui;
|
||
-- Service function that posts its arguments to the /echo route of the
-- `entrypoint` endpoint. The argument order is significant: the Flask handler
-- reads the values by position, so keep it in sync with pipeline_endpoint.py.
CREATE OR REPLACE FUNCTION public.hamilton_pipeline (prj_id number, signups variant, spend variant, output_columns variant)
RETURNS VARIANT
SERVICE=public.hamilton_ui
ENDPOINT=entrypoint
AS '/echo';
|
||
|
||
-- 1) Call the pipeline and return the raw VARIANT result.
SELECT
    public.hamilton_pipeline (
        1,
        [1, 10, 50, 100, 200, 400],
        [10, 10, 20, 40, 40, 50],
        [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
    ) as data;

-- 2) Flatten the result into one row per requested output (key / value).
WITH input_data AS (
    SELECT
        public.hamilton_pipeline (
            1,
            [1, 10, 50, 100, 200, 400],
            [10, 10, 20, 40, 40, 50],
            [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
        ) as data
),
flattened AS (
    SELECT
        key AS metric_key,
        value AS metric_value
    FROM
        input_data
        left join LATERAL FLATTEN(input_data.data)
)
SELECT
    *
FROM
    flattened f;

-- 3) Flatten a second time to expand a single output
--    (spend_zero_mean_unit_variance) into one row per element.
WITH input_data AS (
    SELECT
        public.hamilton_pipeline (
            1,
            [1, 10, 50, 100, 200, 400],
            [10, 10, 20, 40, 40, 50],
            [ 'spend', 'signups', 'spend_std_dev', 'spend_zero_mean_unit_variance' ]
        ) as data
),
flattened AS (
    SELECT
        key AS metric_key,
        value AS metric_value
    FROM
        input_data
        left join LATERAL FLATTEN(input_data.data)
)
SELECT
    f2.key,
    f2.value
FROM
    flattened f
    left join lateral flatten(metric_value) f2
where
    metric_key = 'spend_zero_mean_unit_variance';
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using a stable Python version, such as 3.11.x, instead of 3.13.1, which is not a stable release.