Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre compute the timestamp range for feature views #2103

Merged
merged 5 commits into from
Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from datetime import datetime
from pathlib import Path

import pytz

from feast.feature_view import FeatureView
from feast.infra.passthrough_provider import PassthroughProvider
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
Expand All @@ -24,13 +22,6 @@ def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"


def _to_naive_utc(ts: datetime):
if ts.tzinfo is None:
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)


class LocalRegistryStore(RegistryStore):
def __init__(self, registry_config: RegistryConfig, repo_path: Path):
registry_path = Path(registry_config.path)
Expand Down
60 changes: 55 additions & 5 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import contextlib
import uuid
from datetime import date, datetime, timedelta
from typing import Callable, ContextManager, Dict, Iterator, List, Optional, Union
from typing import (
Callable,
ContextManager,
Dict,
Iterator,
List,
Optional,
Tuple,
Union,
)

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -156,9 +165,17 @@ def query_generator() -> Iterator[str]:
entity_schema, expected_join_keys, entity_df_event_timestamp_col
)

entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
entity_df, entity_df_event_timestamp_col, client, table_reference,
)

# Build a query context containing all information required to template the BigQuery SQL query
query_context = offline_utils.get_feature_view_query_context(
feature_refs, feature_views, registry, project,
feature_refs,
feature_views,
registry,
project,
entity_df_event_timestamp_range,
)

# Generate the BigQuery SQL query from the query context
Expand Down Expand Up @@ -368,7 +385,7 @@ def _upload_entity_df_and_get_entity_schema(
) -> Dict[str, np.dtype]:
"""Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table"""

if type(entity_df) is str:
if isinstance(entity_df, str):
job = client.query(f"CREATE TABLE {table_name} AS ({entity_df})")
block_until_done(client, job)

Expand All @@ -394,6 +411,39 @@ def _upload_entity_df_and_get_entity_schema(
return entity_schema


def _get_entity_df_event_timestamp_range(
entity_df: Union[pd.DataFrame, str],
entity_df_event_timestamp_col: str,
client: Client,
table_name: str,
) -> Tuple[datetime, datetime]:
if type(entity_df) is str:
judahrand marked this conversation as resolved.
Show resolved Hide resolved
job = client.query(
f"SELECT MIN({entity_df_event_timestamp_col}) AS min, MAX({entity_df_event_timestamp_col}) AS max FROM {table_name}"
)
res = next(job.result())
entity_df_event_timestamp_range = (
res.get("min"),
res.get("max"),
)
elif isinstance(entity_df, pd.DataFrame):
entity_df_event_timestamp = entity_df.loc[
:, entity_df_event_timestamp_col
].infer_objects()
if pd.api.types.is_string_dtype(entity_df_event_timestamp):
entity_df_event_timestamp = pd.to_datetime(
entity_df_event_timestamp, utc=True
)
entity_df_event_timestamp_range = (
entity_df_event_timestamp.min(),
entity_df_event_timestamp.max(),
)
else:
raise InvalidEntityType(type(entity_df))

return entity_df_event_timestamp_range


def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] = None):
try:
client = bigquery.Client(project=project, location=location)
Expand Down Expand Up @@ -484,9 +534,9 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= (SELECT MAX(entity_timestamp) FROM entity_dataframe)
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.event_timestamp_column }} >= Timestamp_sub((SELECT MIN(entity_timestamp) FROM entity_dataframe), interval {{ featureview.ttl }} second)
AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}'
{% endif %}
),

Expand Down
16 changes: 15 additions & 1 deletion sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib
import uuid
from dataclasses import asdict, dataclass
from datetime import timedelta
from datetime import datetime, timedelta
from typing import Any, Dict, KeysView, List, Optional, Set, Tuple

import numpy as np
Expand All @@ -20,6 +20,7 @@
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.provider import _get_requested_feature_views_to_features_dict
from feast.registry import Registry
from feast.utils import to_naive_utc

DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"

Expand Down Expand Up @@ -90,13 +91,16 @@ class FeatureViewQueryContext:
created_timestamp_column: Optional[str]
table_subquery: str
entity_selections: List[str]
min_event_timestamp: Optional[str]
max_event_timestamp: str


def get_feature_view_query_context(
feature_refs: List[str],
feature_views: List[FeatureView],
registry: Registry,
project: str,
entity_df_timestamp_range: Tuple[datetime, datetime],
) -> List[FeatureViewQueryContext]:
"""Build a query context containing all information required to template a BigQuery and Redshift point-in-time SQL query"""

Expand Down Expand Up @@ -130,6 +134,14 @@ def get_feature_view_query_context(
event_timestamp_column = feature_view.input.event_timestamp_column
created_timestamp_column = feature_view.input.created_timestamp_column

min_event_timestamp = None
if feature_view.ttl:
min_event_timestamp = to_naive_utc(
entity_df_timestamp_range[0] - feature_view.ttl
).isoformat()
judahrand marked this conversation as resolved.
Show resolved Hide resolved

max_event_timestamp = to_naive_utc(entity_df_timestamp_range[1]).isoformat()

context = FeatureViewQueryContext(
name=feature_view.projection.name_to_use(),
ttl=ttl_seconds,
Expand All @@ -144,6 +156,8 @@ def get_feature_view_query_context(
# TODO: Make created column optional and not hardcoded
table_subquery=feature_view.input.get_table_query_string(),
entity_selections=entity_selections,
min_event_timestamp=min_event_timestamp,
max_event_timestamp=max_event_timestamp,
)
query_context.append(context)
return query_context
Expand Down
72 changes: 68 additions & 4 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import contextlib
import uuid
from datetime import datetime
from typing import Callable, ContextManager, Dict, Iterator, List, Optional, Union
from typing import (
Callable,
ContextManager,
Dict,
Iterator,
List,
Optional,
Tuple,
Union,
)

import numpy as np
import pandas as pd
import pyarrow as pa
from dateutil import parser
from pydantic import StrictStr
from pydantic.typing import Literal
from pytz import utc
Expand Down Expand Up @@ -145,9 +155,21 @@ def query_generator() -> Iterator[str]:
entity_schema, expected_join_keys, entity_df_event_timestamp_col
)

entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
entity_df,
entity_df_event_timestamp_col,
redshift_client,
config,
table_name,
)

# Build a query context containing all information required to template the Redshift SQL query
query_context = offline_utils.get_feature_view_query_context(
feature_refs, feature_views, registry, project,
feature_refs,
feature_views,
registry,
project,
entity_df_event_timestamp_range,
)

# Generate the Redshift SQL query from the query context
Expand Down Expand Up @@ -357,6 +379,48 @@ def _upload_entity_df_and_get_entity_schema(
raise InvalidEntityType(type(entity_df))


def _get_entity_df_event_timestamp_range(
entity_df: Union[pd.DataFrame, str],
entity_df_event_timestamp_col: str,
redshift_client,
config: RepoConfig,
table_name: str,
) -> Tuple[datetime, datetime]:
if isinstance(entity_df, pd.DataFrame):
entity_df_event_timestamp = entity_df.loc[
:, entity_df_event_timestamp_col
].infer_objects()
if pd.api.types.is_string_dtype(entity_df_event_timestamp):
entity_df_event_timestamp = pd.to_datetime(
entity_df_event_timestamp, utc=True
)
entity_df_event_timestamp_range = (
entity_df_event_timestamp.min(),
entity_df_event_timestamp.max(),
)
elif isinstance(entity_df, str):
# If the entity_df is a string (SQL query), determine range
# from table
statement_id = aws_utils.execute_redshift_statement(
redshift_client,
config.offline_store.cluster_id,
config.offline_store.database,
config.offline_store.user,
f"SELECT MIN({entity_df_event_timestamp_col}) AS min, MAX({entity_df_event_timestamp_col}) AS max FROM {table_name}",
)
res = aws_utils.get_redshift_statement_result(redshift_client, statement_id)[
"Records"
][0]
entity_df_event_timestamp_range = (
parser.parse(res[0]["stringValue"]),
parser.parse(res[1]["stringValue"]),
)
else:
raise InvalidEntityType(type(entity_df))

return entity_df_event_timestamp_range


# This query is based on sdk/python/feast/infra/offline_stores/bigquery.py:MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN
# There are couple of changes from BigQuery:
# 1. Use VARCHAR instead of STRING type
Expand Down Expand Up @@ -428,9 +492,9 @@ def _upload_entity_df_and_get_entity_schema(
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= (SELECT MAX(entity_timestamp) FROM entity_dataframe)
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.event_timestamp_column }} >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - {{ featureview.ttl }} * interval '1' second
AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}'
{% endif %}
),

Expand Down
13 changes: 3 additions & 10 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

import pytz
from pydantic import StrictStr
from pydantic.schema import Literal

Expand All @@ -33,6 +32,7 @@
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage, tracing_span
from feast.utils import to_naive_utc


class SqliteOnlineStoreConfig(FeastConfigBaseModel):
Expand Down Expand Up @@ -95,9 +95,9 @@ def online_write_batch(
with conn:
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(entity_key)
timestamp = _to_naive_utc(timestamp)
timestamp = to_naive_utc(timestamp)
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)
created_ts = to_naive_utc(created_ts)

for feature_name, val in values.items():
conn.execute(
Expand Down Expand Up @@ -222,13 +222,6 @@ def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"


def _to_naive_utc(ts: datetime):
if ts.tzinfo is None:
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)


class SqliteTable(InfraObject):
"""
A Sqlite table managed by Feast.
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,10 @@ def make_tzaware(t: datetime) -> datetime:
return t.replace(tzinfo=utc)
else:
return t


def to_naive_utc(ts: datetime) -> datetime:
if ts.tzinfo is None:
return ts
else:
return ts.astimezone(utc).replace(tzinfo=None)