Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add a source field in the feature view API #2525

Merged
merged 7 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 46 additions & 19 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from feast import utils
from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource, PushSource
from feast.data_source import DataSource, KafkaSource, KinesisSource, PushSource
Copy link
Collaborator

@felixwang9817 felixwang9817 Apr 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also should we update docs and tests here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'll update docs (and tests)

from feast.entity import Entity
from feast.feature import Feature
from feast.feature_view_projection import FeatureViewProjection
Expand Down Expand Up @@ -61,9 +61,9 @@ class FeatureView(BaseFeatureView):
can result in extremely computationally intensive queries.
batch_source (optional): The batch source of data where this group of features
is stored. This is optional ONLY if a push source is specified as the
stream_source, since push sources contain their own batch sources.
stream_source, since push sources contain their own batch sources. This is deprecated in favor of `source`.
stream_source (optional): The stream source of data where this group of features
is stored.
is stored. This is deprecated in favor of `source`.
schema: The schema of the feature view, including feature, timestamp, and entity
columns.
features: The list of features defined as part of this feature view. Each
Expand All @@ -74,6 +74,8 @@ class FeatureView(BaseFeatureView):
tags: A dictionary of key-value pairs to store arbitrary metadata.
owner: The owner of the feature view, typically the email of the primary
maintainer.
source (optional): The source of data for this group of features. May be a stream source, or a batch source.
If a stream source, the source should contain a batch_source for backfills & batch materialization.
"""

name: str
Expand All @@ -88,6 +90,7 @@ class FeatureView(BaseFeatureView):
tags: Dict[str, str]
owner: str
materialization_intervals: List[Tuple[datetime, datetime]]
source: Optional[DataSource]
adchia marked this conversation as resolved.
Show resolved Hide resolved

@log_exceptions
def __init__(
Expand All @@ -104,6 +107,7 @@ def __init__(
description: str = "",
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
):
"""
Creates a FeatureView object.
Expand All @@ -126,6 +130,8 @@ def __init__(
primary maintainer.
schema (optional): The schema of the feature view, including feature, timestamp,
and entity columns.
source (optional): The source of data for this group of features. May be a stream source, or a batch source.
If a stream source, the source should contain a batch_source for backfills & batch materialization.

Raises:
ValueError: A field mapping conflicts with an Entity or a Feature.
Expand Down Expand Up @@ -163,6 +169,8 @@ def __init__(
self.name = _name
self.entities = _entities if _entities else [DUMMY_ENTITY_NAME]

self._initialize_sources(_name, batch_source, stream_source, source)

if isinstance(_ttl, Duration):
self.ttl = timedelta(seconds=int(_ttl.seconds))
warnings.warn(
Expand Down Expand Up @@ -199,21 +207,6 @@ def __init__(
# current `features` parameter only accepts feature columns.
_features = _schema

if stream_source is not None and isinstance(stream_source, PushSource):
if stream_source.batch_source is None or not isinstance(
stream_source.batch_source, DataSource
):
raise ValueError(
f"A batch_source needs to be specified for feature view `{name}`"
)
self.batch_source = stream_source.batch_source
else:
if batch_source is None:
raise ValueError(
f"A batch_source needs to be specified for feature view `{name}`"
)
self.batch_source = batch_source

cols = [entity for entity in self.entities] + [
field.name for field in _features
]
Expand All @@ -236,9 +229,43 @@ def __init__(
owner=owner,
)
self.online = online
self.stream_source = stream_source
self.materialization_intervals = []

def _initialize_sources(self, name, batch_source, stream_source, source):
if source:
if (
isinstance(source, PushSource)
or isinstance(source, KafkaSource)
or isinstance(source, KinesisSource)
):
self.stream_source = source
if not source.batch_source:
raise ValueError(
f"A batch_source needs to be specified for stream source `{source.name}`"
)
else:
self.batch_source = source.batch_source
else:
self.stream_source = stream_source
self.batch_source = source
else:
warnings.warn(
"batch_source and stream_source have been deprecated in favor or `source`."
"The deprecated fields will be removed in Feast 0.23.",
DeprecationWarning,
)
if stream_source is not None and isinstance(stream_source, PushSource):
self.stream_source = stream_source
self.batch_source = stream_source.batch_source
else:
if batch_source is None:
raise ValueError(
f"A batch_source needs to be specified for feature view `{name}`"
)
self.stream_source = stream_source
self.batch_source = batch_source
self.source = source

# Note: Python requires redefining hash in child classes that override __eq__
def __hash__(self):
return super().__hash__()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def driver_feature_view(
entities=["driver"],
schema=None if infer_features else [Field(name="value", dtype=dtype)],
ttl=timedelta(days=5),
batch_source=data_source,
source=data_source,
)


Expand All @@ -49,7 +49,7 @@ def global_feature_view(
if infer_features
else [Feature(name="entityless_value", dtype=value_type)],
ttl=timedelta(days=5),
batch_source=data_source,
source=data_source,
)


Expand Down Expand Up @@ -162,7 +162,7 @@ def create_driver_hourly_stats_feature_view(source, infer_features: bool = False
Field(name="acc_rate", dtype=Float32),
Field(name="avg_daily_trips", dtype=Int32),
],
batch_source=source,
source=source,
ttl=timedelta(hours=2),
)
return driver_stats_feature_view
Expand All @@ -179,7 +179,7 @@ def create_customer_daily_profile_feature_view(source, infer_features: bool = Fa
Field(name="avg_passenger_count", dtype=Float32),
Field(name="lifetime_trip_count", dtype=Int32),
],
batch_source=source,
source=source,
ttl=timedelta(days=2),
)
return customer_profile_feature_view
Expand All @@ -196,7 +196,7 @@ def create_global_stats_feature_view(source, infer_features: bool = False):
Feature(name="num_rides", dtype=ValueType.INT32),
Feature(name="avg_ride_length", dtype=ValueType.FLOAT),
],
batch_source=source,
source=source,
ttl=timedelta(days=2),
)
return global_stats_feature_view
Expand All @@ -209,7 +209,7 @@ def create_order_feature_view(source, infer_features: bool = False):
schema=None
if infer_features
else [Field(name="order_is_success", dtype=Int32)],
batch_source=source,
source=source,
ttl=timedelta(days=2),
)

Expand All @@ -219,7 +219,7 @@ def create_location_stats_feature_view(source, infer_features: bool = False):
name="location_stats",
entities=["location_id"],
schema=None if infer_features else [Field(name="temperature", dtype=Int32)],
batch_source=source,
source=source,
ttl=timedelta(days=2),
)
return location_stats_feature_view
Expand All @@ -231,7 +231,7 @@ def create_field_mapping_feature_view(source):
entities=[],
# Test that Features still work for FeatureViews.
features=[Feature(name="feature_name", dtype=ValueType.INT32)],
batch_source=source,
source=source,
ttl=timedelta(days=2),
)

Expand All @@ -252,5 +252,5 @@ def create_pushable_feature_view(batch_source: DataSource):
# Test that Features still work for FeatureViews.
features=[Feature(name="temperature", dtype=ValueType.INT32)],
ttl=timedelta(days=2),
stream_source=push_source,
source=push_source,
)