Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add tag kwarg to set Snowflake online store table path #3176

Merged
merged 1 commit into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions docs/reference/online-stores/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ The data model for using a Snowflake Transient Table as an online store follows
(This model may be subject to change when Snowflake Hybrid Tables are released)

## Example

{% code title="feature_store.yaml" %}
```yaml
project: my_feature_repo
Expand All @@ -34,14 +33,27 @@ online_store:
```
{% endcode %}

## Tags KWARGs Actions:

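"ONLINE_PATH": Adding the "snowflake-online-store/online_path" key to a FeatureView's `tags` parameter lets you choose the path (database and schema) under which that feature view's online serving table is created (e.g. `"{database}"."{schema}"`). When the tag is absent, the `database` and `schema` from the online store configuration are used.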

{% code title="example_config.py" %}
```python
driver_stats_fv = FeatureView(
...
tags={"snowflake-online-store/online_path": '"FEAST"."ONLINE"'},
)
```
{% endcode %}

The full set of configuration options is available in [SnowflakeOnlineStoreConfig](https://rtd.feast.dev/en/latest/#feast.infra.online_stores.snowflake.SnowflakeOnlineStoreConfig).

## Functionality Matrix

The set of functionality supported by online stores is described in detail [here](overview.md#functionality).
Below is a matrix indicating which functionality is supported by the Snowflake online store.

| | Snowflake |
| | Snowflake |
| :-------------------------------------------------------- | :-- |
| write feature values to the online store | yes |
| read feature values from the online store | yes |
Expand Down
10 changes: 7 additions & 3 deletions sdk/python/feast/infra/materialization/snowflake_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
assert_snowflake_feature_names,
execute_snowflake_statement,
get_snowflake_conn,
get_snowflake_online_store_path,
package_snowpark_zip,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
Expand Down Expand Up @@ -373,8 +374,6 @@ def materialize_to_snowflake_online_store(
) -> None:
assert_snowflake_feature_names(feature_view)

online_table = f"""{repo_config .online_store.database}"."{repo_config.online_store.schema_}"."[online-transient] {project}_{feature_view.name}"""

feature_names_str = '", "'.join(
[feature.name for feature in feature_view.features]
)
Expand All @@ -384,8 +383,13 @@ def materialize_to_snowflake_online_store(
else:
fv_created_str = None

online_path = get_snowflake_online_store_path(repo_config, feature_view)
online_table = (
f'{online_path}."[online-transient] {project}_{feature_view.name}"'
)

query = f"""
MERGE INTO "{online_table}" online_table
MERGE INTO {online_table} online_table
USING (
SELECT
"entity_key" || TO_BINARY("feature_name", 'UTF-8') AS "entity_feature_key",
Expand Down
24 changes: 12 additions & 12 deletions sdk/python/feast/infra/online_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from feast.infra.utils.snowflake.snowflake_utils import (
execute_snowflake_statement,
get_snowflake_conn,
get_snowflake_online_store_path,
write_pandas_binary,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
Expand Down Expand Up @@ -111,9 +112,7 @@ def online_write_batch(
agg_df = pd.concat(dfs)

# This combines both the data upload plus the overwrite in the same transaction
table_path = (
f'"{config.online_store.database}"."{config.online_store.schema_}"'
)
online_path = get_snowflake_online_store_path(config, table)
with get_snowflake_conn(config.online_store, autocommit=False) as conn:
write_pandas_binary(
conn,
Expand All @@ -124,7 +123,7 @@ def online_write_batch(
) # special function for writing binary to snowflake

query = f"""
INSERT OVERWRITE INTO {table_path}."[online-transient] {config.project}_{table.name}"
INSERT OVERWRITE INTO {online_path}."[online-transient] {config.project}_{table.name}"
SELECT
"entity_feature_key",
"entity_key",
Expand All @@ -137,7 +136,7 @@ def online_write_batch(
*,
ROW_NUMBER() OVER(PARTITION BY "entity_key","feature_name" ORDER BY "event_ts" DESC, "created_ts" DESC) AS "_feast_row"
FROM
{table_path}."[online-transient] {config.project}_{table.name}")
{online_path}."[online-transient] {config.project}_{table.name}")
WHERE
"_feast_row" = 1;
"""
Expand Down Expand Up @@ -174,13 +173,13 @@ def online_read(
]
)

table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"'
online_path = get_snowflake_online_store_path(config, table)
with get_snowflake_conn(config.online_store) as conn:
query = f"""
SELECT
"entity_key", "feature_name", "value", "event_ts"
FROM
{table_path}."[online-transient] {config.project}_{table.name}"
{online_path}."[online-transient] {config.project}_{table.name}"
WHERE
"entity_feature_key" IN ({entity_fetch_str})
"""
Expand Down Expand Up @@ -214,11 +213,11 @@ def update(
):
assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)

table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"'
with get_snowflake_conn(config.online_store) as conn:
for table in tables_to_keep:
online_path = get_snowflake_online_store_path(config, table)
query = f"""
CREATE TRANSIENT TABLE IF NOT EXISTS {table_path}."[online-transient] {config.project}_{table.name}" (
CREATE TRANSIENT TABLE IF NOT EXISTS {online_path}."[online-transient] {config.project}_{table.name}" (
"entity_feature_key" BINARY,
"entity_key" BINARY,
"feature_name" VARCHAR,
Expand All @@ -230,7 +229,8 @@ def update(
execute_snowflake_statement(conn, query)

for table in tables_to_delete:
query = f'DROP TABLE IF EXISTS {table_path}."[online-transient] {config.project}_{table.name}"'
online_path = get_snowflake_online_store_path(config, table)
query = f'DROP TABLE IF EXISTS {online_path}."[online-transient] {config.project}_{table.name}"'
execute_snowflake_statement(conn, query)

def teardown(
Expand All @@ -241,8 +241,8 @@ def teardown(
):
assert isinstance(config.online_store, SnowflakeOnlineStoreConfig)

table_path = f'"{config.online_store.database}"."{config.online_store.schema_}"'
with get_snowflake_conn(config.online_store) as conn:
for table in tables:
query = f'DROP TABLE IF EXISTS {table_path}."[online-transient] {config.project}_{table.name}"'
online_path = get_snowflake_online_store_path(config, table)
query = f'DROP TABLE IF EXISTS {online_path}."[online-transient] {config.project}_{table.name}"'
execute_snowflake_statement(conn, query)
16 changes: 16 additions & 0 deletions sdk/python/feast/infra/utils/snowflake/snowflake_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import feast
from feast.errors import SnowflakeIncompleteConfig, SnowflakeQueryUnknownError
from feast.feature_view import FeatureView
from feast.repo_config import RepoConfig

try:
import snowflake.connector
Expand Down Expand Up @@ -104,6 +105,21 @@ def get_snowflake_conn(config, autocommit=True) -> SnowflakeConnection:
raise SnowflakeIncompleteConfig(e)


def get_snowflake_online_store_path(
    config: RepoConfig,
    feature_view: FeatureView,
) -> str:
    """Return the ``"database"."schema"`` prefix for a feature view's online table.

    The prefix is taken from the feature view's
    ``snowflake-online-store/online_path`` tag when that tag holds a usable
    value. The tag value is returned verbatim, so it must already be a quoted
    path such as ``'"FEAST"."ONLINE"'``. Otherwise the prefix is built from the
    ``database`` and ``schema_`` fields of ``config.online_store``.

    Args:
        config: Repo configuration; only ``online_store.database`` and
            ``online_store.schema_`` are read, and only when no usable tag
            is set.
        feature_view: Feature view whose ``tags`` mapping may carry the
            path override.

    Returns:
        The path prefix to which callers append ``."<table name>"``.
    """
    path_tag = "snowflake-online-store/online_path"
    tagged_path = feature_view.tags.get(path_tag)

    # A blank override would make callers render `."<table name>"`, which is
    # not a valid identifier, so treat it the same as a missing tag.
    if tagged_path and tagged_path.strip():
        return tagged_path

    return f'"{config.online_store.database}"."{config.online_store.schema_}"'


def package_snowpark_zip(project_name) -> Tuple[str, str]:
path = os.path.dirname(feast.__file__)
copy_path = path + f"/snowflake_feast_{project_name}"
Expand Down