Skip to content

Commit

Permalink
Initial scaffolding for on demand feature view (#1803)
Browse files Browse the repository at this point in the history
* Initial scaffolding for on demand feature view, with initial support for transforms on online fetches

Signed-off-by: Danny Chiao <danny@tecton.ai>

* Fixing comments

Signed-off-by: Danny Chiao <danny@tecton.ai>

* Comments

Signed-off-by: Danny Chiao <danny@tecton.ai>

* Added basic test

Signed-off-by: Danny Chiao <danny@tecton.ai>

* Simplifying function serialization

Signed-off-by: Danny Chiao <danny@tecton.ai>

* Refactor logic into odfv

Signed-off-by: Danny Chiao <danny@tecton.ai>
  • Loading branch information
adchia authored Sep 2, 2021
1 parent 021daf0 commit 9dc9e60
Show file tree
Hide file tree
Showing 12 changed files with 434 additions and 11 deletions.
1 change: 1 addition & 0 deletions protos/feast/core/FeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ message FeatureView {
FeatureViewMeta meta = 2;
}

// TODO(adchia): refactor common fields from this and ODFV into separate metadata proto
message FeatureViewSpec {
// Name of the feature view. Must be unique. Not updated.
string name = 1;
Expand Down
57 changes: 57 additions & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//
// Copyright 2020 The Feast Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//


syntax = "proto3";
package feast.core;

option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
option java_outer_classname = "OnDemandFeatureViewProto";
option java_package = "feast.proto.core";

import "feast/core/FeatureView.proto";
import "feast/core/Feature.proto";

message OnDemandFeatureView {
// User-specified specifications of this feature view.
OnDemandFeatureViewSpec spec = 1;
}

message OnDemandFeatureViewSpec {
// Name of the feature view. Must be unique. Not updated.
string name = 1;

// Name of Feast project that this feature view belongs to.
string project = 2;

// List of features specifications for each feature defined with this feature view.
repeated FeatureSpecV2 features = 3;

// List of features specifications for each feature defined with this feature view.
// TODO(adchia): add support for request data
map<string, FeatureView> inputs = 4;

UserDefinedFunction user_defined_function = 5;
}

// Serialized representation of python function.
message UserDefinedFunction {
// The function name
string name = 1;

// The python-syntax function body (serialized by dill)
bytes body = 2;
}
2 changes: 2 additions & 0 deletions protos/feast/core/Registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import "feast/core/Entity.proto";
import "feast/core/FeatureService.proto";
import "feast/core/FeatureTable.proto";
import "feast/core/FeatureView.proto";
import "feast/core/OnDemandFeatureView.proto";
import "google/protobuf/timestamp.proto";

message Registry {
repeated Entity entities = 1;
repeated FeatureTable feature_tables = 2;
repeated FeatureView feature_views = 6;
repeated OnDemandFeatureView on_demand_feature_views = 8;
repeated FeatureService feature_services = 7;

string registry_schema_version = 3; // to support migrations; incremented when schema is changed
Expand Down
2 changes: 2 additions & 0 deletions sdk/python/feast/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .feature_store import FeatureStore
from .feature_table import FeatureTable
from .feature_view import FeatureView
from .on_demand_feature_view import OnDemandFeatureView
from .repo_config import RepoConfig
from .value_type import ValueType

Expand All @@ -37,6 +38,7 @@
"FeatureStore",
"FeatureTable",
"FeatureView",
"OnDemandFeatureView",
"RepoConfig",
"SourceType",
"ValueType",
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,10 @@ def __init__(self, entity_type: type):
f"The entity dataframe you have provided must be a Pandas DataFrame or a SQL query, "
f"but we found: {entity_type} "
)


class ConflictingFeatureViewNames(Exception):
def __init__(self, feature_view_name: str):
super().__init__(
f"The feature view name: {feature_view_name} refers to both an on-demand feature view and a feature view"
)
61 changes: 57 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
update_entities_with_inferred_types_from_feature_views,
)
from feast.infra.provider import Provider, RetrievalJob, get_provider
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse, _infer_online_entity_rows
from feast.protos.feast.serving.ServingService_pb2 import (
GetOnlineFeaturesRequestV2,
Expand All @@ -45,6 +46,7 @@
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.registry import Registry
from feast.repo_config import RepoConfig, load_repo_config
from feast.type_map import python_value_to_proto_value
from feast.usage import log_exceptions, log_exceptions_and_usage
from feast.version import get_version

Expand Down Expand Up @@ -267,8 +269,9 @@ def apply(
objects: Union[
Entity,
FeatureView,
OnDemandFeatureView,
FeatureService,
List[Union[FeatureView, Entity, FeatureService]],
List[Union[FeatureView, OnDemandFeatureView, Entity, FeatureService]],
],
commit: bool = True,
):
Expand Down Expand Up @@ -314,6 +317,7 @@ def apply(
assert isinstance(objects, list)

views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)]
_validate_feature_views(views_to_update)
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)]
Expand All @@ -332,11 +336,15 @@ def apply(

if len(views_to_update) + len(entities_to_update) + len(
services_to_update
) != len(objects):
) + len(odfvs_to_update) != len(objects):
raise ValueError("Unknown object type provided as part of apply() call")

for view in views_to_update:
self._registry.apply_feature_view(view, project=self.project, commit=False)
for odfv in odfvs_to_update:
self._registry.apply_on_demand_feature_view(
odfv, project=self.project, commit=False
)
for ent in entities_to_update:
self._registry.apply_entity(ent, project=self.project, commit=False)
for feature_service in services_to_update:
Expand Down Expand Up @@ -717,7 +725,6 @@ def get_online_features(
all_feature_views = self._registry.list_feature_views(
project=self.project, allow_cache=True
)

_validate_feature_refs(_feature_refs, full_feature_names)
grouped_refs = _group_feature_refs(_feature_refs, all_feature_views)
for table, requested_features in grouped_refs:
Expand Down Expand Up @@ -759,6 +766,47 @@ def get_online_features(
feature_ref
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT

initial_response = OnlineResponse(
GetOnlineFeaturesResponse(field_values=result_rows)
)
return self._augment_response_with_on_demand_transforms(
_feature_refs, full_feature_names, initial_response, result_rows
)

def _augment_response_with_on_demand_transforms(
self,
feature_refs: List[str],
full_feature_names: bool,
initial_response: OnlineResponse,
result_rows: List[GetOnlineFeaturesResponse.FieldValues],
) -> OnlineResponse:
all_on_demand_feature_views = self._registry.list_on_demand_feature_views(
project=self.project, allow_cache=True
)
if len(all_on_demand_feature_views) == 0:
return initial_response
initial_response_df = initial_response.to_df()
# Apply on demand transformations
for odfv in all_on_demand_feature_views:
feature_ref = odfv.name
if feature_ref in feature_refs:
transformed_features_df = odfv.get_transformed_features_df(
full_feature_names, initial_response_df
)
for row_idx in range(len(result_rows)):
result_row = result_rows[row_idx]
# TODO(adchia): support multiple output features in an ODFV, which requires different naming
# conventions
result_row.fields[odfv.name].CopyFrom(
python_value_to_proto_value(
transformed_features_df[odfv.features[0].name].values[
row_idx
]
)
)
result_row.statuses[
feature_ref
] = GetOnlineFeaturesResponse.FieldStatus.PRESENT
return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))

@log_exceptions_and_usage
Expand Down Expand Up @@ -791,7 +839,9 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F
ref for ref, occurrences in Counter(feature_refs).items() if occurrences > 1
]
else:
feature_names = [ref.split(":")[1] for ref in feature_refs]
feature_names = [
ref.split(":")[1] if ":" in ref else ref for ref in feature_refs
]
collided_feature_names = [
ref
for ref, occurrences in Counter(feature_names).items()
Expand Down Expand Up @@ -820,6 +870,9 @@ def _group_feature_refs(

if isinstance(features, list) and isinstance(features[0], str):
for ref in features:
if ":" not in ref:
# This is an on demand feature view ref
continue
view_name, feat_name = ref.split(":")
if view_name not in view_index:
raise FeatureViewNotFoundException(view_name)
Expand Down
150 changes: 150 additions & 0 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import functools
from types import MethodType
from typing import Dict, List

import dill
import pandas as pd

from feast.feature import Feature
from feast.feature_view import FeatureView
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import OnDemandFeatureViewSpec
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
UserDefinedFunction as UserDefinedFunctionProto,
)
from feast.usage import log_exceptions
from feast.value_type import ValueType


class OnDemandFeatureView:
"""
An OnDemandFeatureView defines on demand transformations on existing feature view values and request data.
Args:
name: Name of the group of features.
features: Output schema of transformation with feature names
inputs: The input feature views passed into the transform.
udf: User defined transformation function that takes as input pandas dataframes
"""

name: str
features: List[Feature]
inputs: Dict[str, FeatureView]
udf: MethodType

@log_exceptions
def __init__(
self,
name: str,
features: List[Feature],
inputs: Dict[str, FeatureView],
udf: MethodType,
):
"""
Creates an OnDemandFeatureView object.
"""

self.name = name
self.features = features
self.inputs = inputs
self.udf = udf

def to_proto(self) -> OnDemandFeatureViewProto:
"""
Converts an on demand feature view object to its protobuf representation.
Returns:
A OnDemandFeatureViewProto protobuf.
"""
spec = OnDemandFeatureViewSpec(
name=self.name,
features=[feature.to_proto() for feature in self.features],
inputs={k: fv.to_proto() for k, fv in self.inputs.items()},
user_defined_function=UserDefinedFunctionProto(
name=self.udf.__name__, body=dill.dumps(self.udf, recurse=True),
),
)

return OnDemandFeatureViewProto(spec=spec)

@classmethod
def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
"""
Creates an on demand feature view from a protobuf representation.
Args:
on_demand_feature_view_proto: A protobuf representation of an on-demand feature view.
Returns:
A OnDemandFeatureView object based on the on-demand feature view protobuf.
"""
on_demand_feature_view_obj = cls(
name=on_demand_feature_view_proto.spec.name,
features=[
Feature(
name=feature.name,
dtype=ValueType(feature.value_type),
labels=dict(feature.labels),
)
for feature in on_demand_feature_view_proto.spec.features
],
inputs={
feature_view_name: FeatureView.from_proto(feature_view_proto)
for feature_view_name, feature_view_proto in on_demand_feature_view_proto.spec.inputs.items()
},
udf=dill.loads(
on_demand_feature_view_proto.spec.user_defined_function.body
),
)

return on_demand_feature_view_obj

def get_transformed_features_df(
self, full_feature_names: bool, df_with_features: pd.DataFrame
) -> pd.DataFrame:
# Apply on demand transformations
# TODO(adchia): Include only the feature values from the specified input FVs in the ODFV.
# Copy over un-prefixed features even if not requested since transform may need it
columns_to_cleanup = []
if full_feature_names:
for input_fv in self.inputs.values():
for feature in input_fv.features:
full_feature_ref = f"{input_fv.name}__{feature.name}"
if full_feature_ref in df_with_features.keys():
df_with_features[feature.name] = df_with_features[
full_feature_ref
]
columns_to_cleanup.append(feature.name)

# Compute transformed values and apply to each result row
df_with_transformed_features = self.udf.__call__(df_with_features)

# Cleanup extra columns used for transformation
df_with_features.drop(columns=columns_to_cleanup, inplace=True)
return df_with_transformed_features


def on_demand_feature_view(features: List[Feature], inputs: Dict[str, FeatureView]):
"""
Declare an on-demand feature view
:param features: Output schema with feature names
:param inputs: The inputs passed into the transform.
:return: An On Demand Feature View.
"""

def decorator(user_function):
on_demand_feature_view_obj = OnDemandFeatureView(
name=user_function.__name__,
inputs=inputs,
features=features,
udf=user_function,
)
functools.update_wrapper(
wrapper=on_demand_feature_view_obj, wrapped=user_function
)
return on_demand_feature_view_obj

return decorator
Loading

0 comments on commit 9dc9e60

Please sign in to comment.