Skip to content

Commit

Permalink
feat: Add Substrait-based ODFV transformation (#3969)
Browse files Browse the repository at this point in the history
  • Loading branch information
tokoko committed Feb 24, 2024
1 parent d3c68fb commit 9e58bd4
Show file tree
Hide file tree
Showing 11 changed files with 504 additions and 164 deletions.
5 changes: 5 additions & 0 deletions protos/feast/core/OnDemandFeatureView.proto
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ message OnDemandFeatureViewSpec {

oneof transformation {
UserDefinedFunction user_defined_function = 5;
OnDemandSubstraitTransformation on_demand_substrait_transformation = 9;
}

// Description of the on demand feature view.
Expand Down Expand Up @@ -89,3 +90,7 @@ message UserDefinedFunction {
// The string representation of the udf
string body_text = 3;
}

// Transformation of an on demand feature view expressed as a Substrait plan.
message OnDemandSubstraitTransformation {
// Serialized Substrait plan, carried as opaque bytes.
bytes substrait_plan = 1;
}
55 changes: 52 additions & 3 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import functools
import inspect
import warnings
from datetime import datetime
from types import FunctionType
Expand All @@ -17,6 +18,7 @@
from feast.feature_view_projection import FeatureViewProjection
from feast.field import Field, from_value_type
from feast.on_demand_pandas_transformation import OnDemandPandasTransformation
from feast.on_demand_substrait_transformation import OnDemandSubstraitTransformation
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
Expand Down Expand Up @@ -210,6 +212,9 @@ def to_proto(self) -> OnDemandFeatureViewProto:
user_defined_function=self.transformation.to_proto()
if type(self.transformation) == OnDemandPandasTransformation
else None,
on_demand_substrait_transformation=self.transformation.to_proto() # type: ignore
if type(self.transformation) == OnDemandSubstraitTransformation
else None,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand Down Expand Up @@ -255,6 +260,13 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
transformation = OnDemandPandasTransformation.from_proto(
on_demand_feature_view_proto.spec.user_defined_function
)
elif (
on_demand_feature_view_proto.spec.WhichOneof("transformation")
== "on_demand_substrait_transformation"
):
transformation = OnDemandSubstraitTransformation.from_proto(
on_demand_feature_view_proto.spec.on_demand_substrait_transformation
)
else:
raise Exception("At least one transformation type needs to be provided")

Expand Down Expand Up @@ -460,10 +472,47 @@ def mainify(obj) -> None:
obj.__module__ = "__main__"

def decorator(user_function):
udf_string = dill.source.getsource(user_function)
mainify(user_function)
return_annotation = inspect.signature(user_function).return_annotation
if (
return_annotation
and return_annotation.__module__ == "ibis.expr.types.relations"
and return_annotation.__name__ == "Table"
):
import ibis
import ibis.expr.datatypes as dt
from ibis_substrait.compiler.core import SubstraitCompiler

compiler = SubstraitCompiler()

input_fields: Field = []

for s in sources:
if type(s) == FeatureView:
fields = s.projection.features
else:
fields = s.features

input_fields.extend(
[
(
f.name,
dt.dtype(
feast_value_type_to_pandas_type(f.dtype.to_value_type())
),
)
for f in fields
]
)

expr = user_function(ibis.table(input_fields, "t"))

transformation = OnDemandPandasTransformation(user_function, udf_string)
transformation = OnDemandSubstraitTransformation(
substrait_plan=compiler.compile(expr).SerializeToString()
)
else:
udf_string = dill.source.getsource(user_function)
mainify(user_function)
transformation = OnDemandPandasTransformation(user_function, udf_string)

on_demand_feature_view_obj = OnDemandFeatureView(
name=user_function.__name__,
Expand Down
50 changes: 50 additions & 0 deletions sdk/python/feast/on_demand_substrait_transformation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import pandas as pd
import pyarrow
import pyarrow.substrait as substrait # type: ignore # noqa

from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandSubstraitTransformation as OnDemandSubstraitTransformationProto,
)


class OnDemandSubstraitTransformation:
    """On demand transformation defined by a serialized Substrait plan.

    The plan is held as opaque bytes and is only interpreted when
    ``transform`` hands it to ``pyarrow.substrait.run_query``.
    """

    def __init__(self, substrait_plan: bytes):
        """
        Creates an OnDemandSubstraitTransformation object.

        Args:
            substrait_plan: The user-provided substrait plan.
        """
        self.substrait_plan = substrait_plan

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Runs the stored substrait plan against a pandas DataFrame.

        Args:
            df: Input data; it must contain every column named in the schema
                that the plan requests from the table provider.

        Returns:
            The result of the query converted back to a pandas DataFrame.
        """

        def table_provider(names, schema: pyarrow.Schema):
            # The requested table names are ignored: every read in the plan is
            # served from `df`, narrowed to the columns listed in `schema`.
            return pyarrow.Table.from_pandas(df[schema.names])

        table: pyarrow.Table = pyarrow.substrait.run_query(
            self.substrait_plan, table_provider=table_provider
        ).read_all()
        return table.to_pandas()

    def __eq__(self, other):
        """
        Compares two transformations by their serialized plans.

        Raises:
            TypeError: If `other` is not an OnDemandSubstraitTransformation.
        """
        if not isinstance(other, OnDemandSubstraitTransformation):
            raise TypeError(
                "Comparisons should only involve OnDemandSubstraitTransformation class objects."
            )

        # Deliberately no `super().__eq__(other)` check: object.__eq__ returns
        # NotImplemented for distinct objects, and truth-testing NotImplemented
        # is deprecated (and an error on recent Python versions), so that check
        # could never legitimately reject a comparison.
        return self.substrait_plan == other.substrait_plan

    def to_proto(self) -> "OnDemandSubstraitTransformationProto":
        """Converts this transformation to its protobuf representation."""
        return OnDemandSubstraitTransformationProto(substrait_plan=self.substrait_plan)

    @classmethod
    def from_proto(
        cls,
        on_demand_substrait_transformation_proto: "OnDemandSubstraitTransformationProto",
    ):
        """
        Creates a transformation from its protobuf representation.

        Args:
            on_demand_substrait_transformation_proto: The proto message whose
                `substrait_plan` bytes are copied into the new object.

        Returns:
            An instance of `cls`, so subclasses get an object of their own type.
        """
        return cls(
            substrait_plan=on_demand_substrait_transformation_proto.substrait_plan
        )
Loading

0 comments on commit 9e58bd4

Please sign in to comment.