Skip to content

Commit

Permalink
Pre compute the timestamp range for feature views (#2103)
Browse files Browse the repository at this point in the history
* Pre compute the timestamp range for feature views

This allows BigQuery to take advantage of Partitions when querying
historical features.

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Correct comparison typo

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Use `isinstance` rather than `type`

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Convert min and max `event_timestamp` to UTC

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>

* Move `_to_naive_utc` to avoid circular import

Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com>
  • Loading branch information
judahrand committed Dec 21, 2021
1 parent a710cb9 commit 2e41d03
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 29 deletions.
9 changes: 0 additions & 9 deletions sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
from datetime import datetime
from pathlib import Path

import pytz

from feast.feature_view import FeatureView
from feast.infra.passthrough_provider import PassthroughProvider
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
Expand All @@ -24,13 +22,6 @@ def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"


def _to_naive_utc(ts: datetime):
if ts.tzinfo is None:
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)


class LocalRegistryStore(RegistryStore):
def __init__(self, registry_config: RegistryConfig, repo_path: Path):
registry_path = Path(registry_config.path)
Expand Down
60 changes: 55 additions & 5 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import contextlib
import uuid
from datetime import date, datetime, timedelta
from typing import Callable, ContextManager, Dict, Iterator, List, Optional, Union
from typing import (
Callable,
ContextManager,
Dict,
Iterator,
List,
Optional,
Tuple,
Union,
)

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -156,9 +165,17 @@ def query_generator() -> Iterator[str]:
entity_schema, expected_join_keys, entity_df_event_timestamp_col
)

entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
entity_df, entity_df_event_timestamp_col, client, table_reference,
)

# Build a query context containing all information required to template the BigQuery SQL query
query_context = offline_utils.get_feature_view_query_context(
feature_refs, feature_views, registry, project,
feature_refs,
feature_views,
registry,
project,
entity_df_event_timestamp_range,
)

# Generate the BigQuery SQL query from the query context
Expand Down Expand Up @@ -368,7 +385,7 @@ def _upload_entity_df_and_get_entity_schema(
) -> Dict[str, np.dtype]:
"""Uploads a Pandas entity dataframe into a BigQuery table and returns the resulting table"""

if type(entity_df) is str:
if isinstance(entity_df, str):
job = client.query(f"CREATE TABLE {table_name} AS ({entity_df})")
block_until_done(client, job)

Expand All @@ -394,6 +411,39 @@ def _upload_entity_df_and_get_entity_schema(
return entity_schema


def _get_entity_df_event_timestamp_range(
entity_df: Union[pd.DataFrame, str],
entity_df_event_timestamp_col: str,
client: Client,
table_name: str,
) -> Tuple[datetime, datetime]:
if type(entity_df) is str:
job = client.query(
f"SELECT MIN({entity_df_event_timestamp_col}) AS min, MAX({entity_df_event_timestamp_col}) AS max FROM {table_name}"
)
res = next(job.result())
entity_df_event_timestamp_range = (
res.get("min"),
res.get("max"),
)
elif isinstance(entity_df, pd.DataFrame):
entity_df_event_timestamp = entity_df.loc[
:, entity_df_event_timestamp_col
].infer_objects()
if pd.api.types.is_string_dtype(entity_df_event_timestamp):
entity_df_event_timestamp = pd.to_datetime(
entity_df_event_timestamp, utc=True
)
entity_df_event_timestamp_range = (
entity_df_event_timestamp.min(),
entity_df_event_timestamp.max(),
)
else:
raise InvalidEntityType(type(entity_df))

return entity_df_event_timestamp_range


def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] = None):
try:
client = bigquery.Client(project=project, location=location)
Expand Down Expand Up @@ -484,9 +534,9 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str]
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= (SELECT MAX(entity_timestamp) FROM entity_dataframe)
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.event_timestamp_column }} >= Timestamp_sub((SELECT MIN(entity_timestamp) FROM entity_dataframe), interval {{ featureview.ttl }} second)
AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}'
{% endif %}
),
Expand Down
16 changes: 15 additions & 1 deletion sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib
import uuid
from dataclasses import asdict, dataclass
from datetime import timedelta
from datetime import datetime, timedelta
from typing import Any, Dict, KeysView, List, Optional, Set, Tuple

import numpy as np
Expand All @@ -20,6 +20,7 @@
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.provider import _get_requested_feature_views_to_features_dict
from feast.registry import Registry
from feast.utils import to_naive_utc

DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"

Expand Down Expand Up @@ -90,13 +91,16 @@ class FeatureViewQueryContext:
created_timestamp_column: Optional[str]
table_subquery: str
entity_selections: List[str]
min_event_timestamp: Optional[str]
max_event_timestamp: str


def get_feature_view_query_context(
feature_refs: List[str],
feature_views: List[FeatureView],
registry: Registry,
project: str,
entity_df_timestamp_range: Tuple[datetime, datetime],
) -> List[FeatureViewQueryContext]:
"""Build a query context containing all information required to template a BigQuery and Redshift point-in-time SQL query"""

Expand Down Expand Up @@ -130,6 +134,14 @@ def get_feature_view_query_context(
event_timestamp_column = feature_view.input.event_timestamp_column
created_timestamp_column = feature_view.input.created_timestamp_column

min_event_timestamp = None
if feature_view.ttl:
min_event_timestamp = to_naive_utc(
entity_df_timestamp_range[0] - feature_view.ttl
).isoformat()

max_event_timestamp = to_naive_utc(entity_df_timestamp_range[1]).isoformat()

context = FeatureViewQueryContext(
name=feature_view.projection.name_to_use(),
ttl=ttl_seconds,
Expand All @@ -144,6 +156,8 @@ def get_feature_view_query_context(
# TODO: Make created column optional and not hardcoded
table_subquery=feature_view.input.get_table_query_string(),
entity_selections=entity_selections,
min_event_timestamp=min_event_timestamp,
max_event_timestamp=max_event_timestamp,
)
query_context.append(context)
return query_context
Expand Down
72 changes: 68 additions & 4 deletions sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import contextlib
import uuid
from datetime import datetime
from typing import Callable, ContextManager, Dict, Iterator, List, Optional, Union
from typing import (
Callable,
ContextManager,
Dict,
Iterator,
List,
Optional,
Tuple,
Union,
)

import numpy as np
import pandas as pd
import pyarrow as pa
from dateutil import parser
from pydantic import StrictStr
from pydantic.typing import Literal
from pytz import utc
Expand Down Expand Up @@ -145,9 +155,21 @@ def query_generator() -> Iterator[str]:
entity_schema, expected_join_keys, entity_df_event_timestamp_col
)

entity_df_event_timestamp_range = _get_entity_df_event_timestamp_range(
entity_df,
entity_df_event_timestamp_col,
redshift_client,
config,
table_name,
)

# Build a query context containing all information required to template the Redshift SQL query
query_context = offline_utils.get_feature_view_query_context(
feature_refs, feature_views, registry, project,
feature_refs,
feature_views,
registry,
project,
entity_df_event_timestamp_range,
)

# Generate the Redshift SQL query from the query context
Expand Down Expand Up @@ -357,6 +379,48 @@ def _upload_entity_df_and_get_entity_schema(
raise InvalidEntityType(type(entity_df))


def _get_entity_df_event_timestamp_range(
entity_df: Union[pd.DataFrame, str],
entity_df_event_timestamp_col: str,
redshift_client,
config: RepoConfig,
table_name: str,
) -> Tuple[datetime, datetime]:
if isinstance(entity_df, pd.DataFrame):
entity_df_event_timestamp = entity_df.loc[
:, entity_df_event_timestamp_col
].infer_objects()
if pd.api.types.is_string_dtype(entity_df_event_timestamp):
entity_df_event_timestamp = pd.to_datetime(
entity_df_event_timestamp, utc=True
)
entity_df_event_timestamp_range = (
entity_df_event_timestamp.min(),
entity_df_event_timestamp.max(),
)
elif isinstance(entity_df, str):
# If the entity_df is a string (SQL query), determine range
# from table
statement_id = aws_utils.execute_redshift_statement(
redshift_client,
config.offline_store.cluster_id,
config.offline_store.database,
config.offline_store.user,
f"SELECT MIN({entity_df_event_timestamp_col}) AS min, MAX({entity_df_event_timestamp_col}) AS max FROM {table_name}",
)
res = aws_utils.get_redshift_statement_result(redshift_client, statement_id)[
"Records"
][0]
entity_df_event_timestamp_range = (
parser.parse(res[0]["stringValue"]),
parser.parse(res[1]["stringValue"]),
)
else:
raise InvalidEntityType(type(entity_df))

return entity_df_event_timestamp_range


# This query is based on sdk/python/feast/infra/offline_stores/bigquery.py:MULTIPLE_FEATURE_VIEW_POINT_IN_TIME_JOIN
# There are couple of changes from BigQuery:
# 1. Use VARCHAR instead of STRING type
Expand Down Expand Up @@ -428,9 +492,9 @@ def _upload_entity_df_and_get_entity_schema(
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.event_timestamp_column }} <= (SELECT MAX(entity_timestamp) FROM entity_dataframe)
WHERE {{ featureview.event_timestamp_column }} <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.event_timestamp_column }} >= (SELECT MIN(entity_timestamp) FROM entity_dataframe) - {{ featureview.ttl }} * interval '1' second
AND {{ featureview.event_timestamp_column }} >= '{{ featureview.min_event_timestamp }}'
{% endif %}
),
Expand Down
13 changes: 3 additions & 10 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

import pytz
from pydantic import StrictStr
from pydantic.schema import Literal

Expand All @@ -33,6 +32,7 @@
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage, tracing_span
from feast.utils import to_naive_utc


class SqliteOnlineStoreConfig(FeastConfigBaseModel):
Expand Down Expand Up @@ -95,9 +95,9 @@ def online_write_batch(
with conn:
for entity_key, values, timestamp, created_ts in data:
entity_key_bin = serialize_entity_key(entity_key)
timestamp = _to_naive_utc(timestamp)
timestamp = to_naive_utc(timestamp)
if created_ts is not None:
created_ts = _to_naive_utc(created_ts)
created_ts = to_naive_utc(created_ts)

for feature_name, val in values.items():
conn.execute(
Expand Down Expand Up @@ -222,13 +222,6 @@ def _table_id(project: str, table: FeatureView) -> str:
return f"{project}_{table.name}"


def _to_naive_utc(ts: datetime):
if ts.tzinfo is None:
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)


class SqliteTable(InfraObject):
"""
A Sqlite table managed by Feast.
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,10 @@ def make_tzaware(t: datetime) -> datetime:
return t.replace(tzinfo=utc)
else:
return t


def to_naive_utc(ts: datetime) -> datetime:
if ts.tzinfo is None:
return ts
else:
return ts.astimezone(utc).replace(tzinfo=None)

0 comments on commit 2e41d03

Please sign in to comment.