Skip to content

Commit

Permalink
feat: Create stream and batch feature view abstractions (#2559)
Browse files Browse the repository at this point in the history
* feat: Create stream and batch feature view abstractions

Signed-off-by: Achal Shah <achals@gmail.com>

* CR

Signed-off-by: Achal Shah <achals@gmail.com>

* CR

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals committed Apr 18, 2022
1 parent 54ad3b6 commit d1f76e5
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 0 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ The services with containerized replacements currently implemented are:
- Datastore
- DynamoDB
- Redis
- Trino

You can run `make test-python-integration-container` to run tests against the containerized versions of dependencies.

Expand Down
58 changes: 58 additions & 0 deletions sdk/python/feast/batch_feature_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from datetime import timedelta
from typing import Dict, List, Optional, Union

from feast.data_source import DataSource
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.field import Field
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

SUPPORTED_BATCH_SOURCES = {
"BigQuerySource",
"FileSource",
"RedshiftSource",
"SnowflakeSource",
"SparkSource",
"TrinoSource",
}


class BatchFeatureView(FeatureView):
def __init__(
self,
*,
name: Optional[str] = None,
entities: Optional[Union[List[Entity], List[str]]] = None,
ttl: Optional[timedelta] = None,
tags: Optional[Dict[str, str]] = None,
online: bool = True,
description: str = "",
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
):

if source is None:
raise ValueError("Feature views need a source specified")
if (
type(source).__name__ not in SUPPORTED_BATCH_SOURCES
and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE
):
raise ValueError(
f"Batch feature views need a batch source, expected one of {SUPPORTED_BATCH_SOURCES} "
f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead "
)

super().__init__(
name=name,
entities=entities,
ttl=ttl,
batch_source=None,
stream_source=None,
tags=tags,
online=online,
description=description,
owner=owner,
schema=schema,
source=source,
)
54 changes: 54 additions & 0 deletions sdk/python/feast/stream_feature_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from datetime import timedelta
from typing import Dict, List, Optional, Union

from feast.data_source import DataSource
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.field import Field
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

SUPPORTED_STREAM_SOURCES = {
"KafkaSource",
"KinesisSource",
}


class StreamFeatureView(FeatureView):
def __init__(
self,
*,
name: Optional[str] = None,
entities: Optional[Union[List[Entity], List[str]]] = None,
ttl: Optional[timedelta] = None,
tags: Optional[Dict[str, str]] = None,
online: bool = True,
description: str = "",
owner: str = "",
schema: Optional[List[Field]] = None,
source: Optional[DataSource] = None,
):

if source is None:
raise ValueError("Feature views need a source specified")
if (
type(source).__name__ not in SUPPORTED_STREAM_SOURCES
and source.to_proto().type != DataSourceProto.SourceType.CUSTOM_SOURCE
):
raise ValueError(
f"Stream feature views need a stream source, expected one of {SUPPORTED_STREAM_SOURCES} "
f"or CUSTOM_SOURCE, got {type(source).__name__}: {source.name} instead "
)

super().__init__(
name=name,
entities=entities,
ttl=ttl,
batch_source=None,
stream_source=None,
tags=tags,
online=online,
description=description,
owner=owner,
schema=schema,
source=source,
)
70 changes: 70 additions & 0 deletions sdk/python/tests/unit/test_feature_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from datetime import timedelta

import pytest

from feast.batch_feature_view import BatchFeatureView
from feast.data_format import AvroFormat
from feast.data_source import KafkaSource
from feast.infra.offline_stores.file_source import FileSource
from feast.stream_feature_view import StreamFeatureView


def test_create_batch_feature_view():
batch_source = FileSource(path="some path")
BatchFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=batch_source,
)

with pytest.raises(ValueError):
BatchFeatureView(
name="test batch feature view", entities=[], ttl=timedelta(days=30)
)

stream_source = KafkaSource(
name="kafka",
event_timestamp_column="",
bootstrap_servers="",
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(path="some path"),
)
with pytest.raises(ValueError):
BatchFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=stream_source,
)


def test_create_stream_feature_view():
stream_source = KafkaSource(
name="kafka",
event_timestamp_column="",
bootstrap_servers="",
message_format=AvroFormat(""),
topic="topic",
batch_source=FileSource(path="some path"),
)
StreamFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=stream_source,
)

with pytest.raises(ValueError):
StreamFeatureView(
name="test batch feature view", entities=[], ttl=timedelta(days=30)
)

with pytest.raises(ValueError):
StreamFeatureView(
name="test batch feature view",
entities=[],
ttl=timedelta(days=30),
source=FileSource(path="some path"),
)

0 comments on commit d1f76e5

Please sign in to comment.