feat: Metadata changes & making data sources top level objects to power Feast UI (feast-dev#2336)

* Squash commits for metadata changes

Signed-off-by: Danny Chiao <danny@tecton.ai>

* tests

Signed-off-by: Danny Chiao <danny@tecton.ai>

* Add more tests

Signed-off-by: Danny Chiao <danny@tecton.ai>

* lint

Signed-off-by: Danny Chiao <danny@tecton.ai>

* Add apply test

Signed-off-by: Danny Chiao <danny@tecton.ai>

* Add apply test

Signed-off-by: Danny Chiao <danny@tecton.ai>

* Add apply test

Signed-off-by: Danny Chiao <danny@tecton.ai>

* lint

Signed-off-by: Danny Chiao <danny@tecton.ai>

* fix bigquery source

Signed-off-by: Danny Chiao <danny@tecton.ai>

* fix test

Signed-off-by: Danny Chiao <danny@tecton.ai>

* fix spark source

Signed-off-by: Danny Chiao <danny@tecton.ai>

* fix spark source

Signed-off-by: Danny Chiao <danny@tecton.ai>
adchia authored Mar 4, 2022
1 parent 5481caf commit 43da230
Showing 30 changed files with 718 additions and 101 deletions.
12 changes: 9 additions & 3 deletions protos/feast/core/DataSource.proto
@@ -26,6 +26,7 @@ import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";

// Defines a Data Source that can be used to source Feature data
// Next available id: 22
message DataSource {
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
// but they are going to be reserved for backwards compatibility.
@@ -45,6 +46,13 @@ message DataSource {
REQUEST_SOURCE = 7;

}

// Unique name of data source within the project
string name = 20;

// Name of Feast project that this data source belongs to.
string project = 21;

SourceType type = 1;

// Defines mapping between fields in the sourced data
@@ -156,9 +164,7 @@ message DataSource {

// Defines options for DataSource that sources features from request data
message RequestDataOptions {
// Name of the request data source
string name = 1;

reserved 1;
// Mapping of feature name to type
map<string, feast.types.ValueType.Enum> schema = 2;
}
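
The new name and project fields (20 and 21) make each data source individually addressable in the registry, which is what lets the UI treat sources as top-level objects. A minimal sketch of the proto-level effect, assuming the generated Python protos live under feast.protos as in the Feast SDK (the source and project names here are hypothetical):

from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto

# Data sources are now identified by a (project, name) pair.
source = DataSourceProto(
    name="driver_hourly_stats",    # hypothetical source name (field 20)
    project="driver_ranking",      # hypothetical project name (field 21)
    type=DataSourceProto.BATCH_FILE,
)
print(source.name, source.project)
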
2 changes: 0 additions & 2 deletions protos/feast/core/OnDemandFeatureView.proto
@@ -48,8 +48,6 @@ message OnDemandFeatureViewSpec {
map<string, OnDemandInput> inputs = 4;

UserDefinedFunction user_defined_function = 5;


}

message OnDemandFeatureViewMeta {
3 changes: 3 additions & 0 deletions protos/feast/core/Registry.proto
@@ -28,13 +28,16 @@ import "feast/core/FeatureView.proto";
import "feast/core/InfraObject.proto";
import "feast/core/OnDemandFeatureView.proto";
import "feast/core/RequestFeatureView.proto";
import "feast/core/DataSource.proto";
import "feast/core/SavedDataset.proto";
import "google/protobuf/timestamp.proto";

// Next id: 13
message Registry {
repeated Entity entities = 1;
repeated FeatureTable feature_tables = 2;
repeated FeatureView feature_views = 6;
repeated DataSource data_sources = 12;
repeated OnDemandFeatureView on_demand_feature_views = 8;
repeated RequestFeatureView request_feature_views = 9;
repeated FeatureService feature_services = 7;
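
With field 12 in place, data sources persist in the registry alongside entities and feature views, rather than existing only inline inside feature view definitions. A rough sketch, under the same assumption about the generated module paths and with hypothetical names:

from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.Registry_pb2 import Registry

registry = Registry()
# data_sources is a repeated field, so sources accumulate like entities do.
registry.data_sources.add(name="driver_hourly_stats", project="driver_ranking")
assert len(registry.data_sources) == 1
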
2 changes: 0 additions & 2 deletions protos/feast/core/RequestFeatureView.proto
@@ -22,8 +22,6 @@ option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";
option java_outer_classname = "RequestFeatureViewProto";
option java_package = "feast.proto.core";

import "feast/core/FeatureView.proto";
import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";

message RequestFeatureView {
5 changes: 4 additions & 1 deletion protos/feast/core/SavedDataset.proto
@@ -23,8 +23,8 @@ option java_outer_classname = "SavedDatasetProto";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "google/protobuf/timestamp.proto";
import "feast/core/FeatureViewProjection.proto";
import "feast/core/DataSource.proto";
import "feast/core/FeatureService.proto";

message SavedDatasetSpec {
// Name of the dataset. Must be unique since it's possible to overwrite dataset by name
@@ -44,6 +44,9 @@ message SavedDatasetSpec {

SavedDatasetStorage storage = 6;

// Optional and only populated if generated from a feature service fetch
string feature_service_name = 8;

// User defined metadata
map<string, string> tags = 7;
}
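
The optional feature_service_name ties a saved dataset back to the feature service that produced it, so the UI can link datasets to services. A small sketch with hypothetical names, again assuming the feast.protos module layout:

from feast.protos.feast.core.SavedDataset_pb2 import SavedDatasetSpec

spec = SavedDatasetSpec(
    name="driver_training_2022_03",  # hypothetical dataset name
    # Populated only when the dataset was generated from a feature service fetch.
    feature_service_name="driver_ranking_v1",  # hypothetical service name
)
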
1 change: 0 additions & 1 deletion protos/feast/core/ValidationProfile.proto
@@ -22,7 +22,6 @@ option java_package = "feast.proto.core";
option java_outer_classname = "ValidationProfile";
option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core";

import "google/protobuf/timestamp.proto";
import "feast/core/SavedDataset.proto";

message GEValidationProfiler {
1 change: 0 additions & 1 deletion protos/feast/storage/Redis.proto
@@ -16,7 +16,6 @@

syntax = "proto3";

import "feast/types/Field.proto";
import "feast/types/Value.proto";

package feast.storage;
50 changes: 50 additions & 0 deletions sdk/python/feast/cli.py
@@ -126,6 +126,56 @@ def endpoint(ctx: click.Context):
_logger.info("There is no active feature server.")


@cli.group(name="data-sources")
def data_sources_cmd():
"""
Access data sources
"""
pass


@data_sources_cmd.command("describe")
@click.argument("name", type=click.STRING)
@click.pass_context
def data_source_describe(ctx: click.Context, name: str):
"""
Describe a data source
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))

try:
data_source = store.get_data_source(name)
except FeastObjectNotFoundException as e:
print(e)
exit(1)

print(
yaml.dump(
yaml.safe_load(str(data_source)), default_flow_style=False, sort_keys=False
)
)


@data_sources_cmd.command(name="list")
@click.pass_context
def data_source_list(ctx: click.Context):
"""
List all data sources
"""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))
table = []
for datasource in store.list_data_sources():
table.append([datasource.name, datasource.__class__])

from tabulate import tabulate

print(tabulate(table, headers=["NAME", "CLASS"], tablefmt="plain"))


@cli.group(name="entities")
def entities_cmd():
"""
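
The new data-sources CLI group is a thin wrapper over the FeatureStore API added in this commit, so the same information is available programmatically. A rough Python equivalent of the two commands, run from inside a feature repo (the source name is hypothetical):

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Equivalent of `feast data-sources list`
for data_source in store.list_data_sources():
    print(data_source.name, data_source.__class__.__name__)

# Equivalent of `feast data-sources describe <name>`
print(store.get_data_source("driver_hourly_stats"))
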
57 changes: 42 additions & 15 deletions sdk/python/feast/data_source.py
@@ -139,6 +139,7 @@ class DataSource(ABC):
DataSource that can be used to source features.
Args:
name: Name of data source, which should be unique within a project
event_timestamp_column (optional): Event timestamp column used for point in time
joins of feature values.
created_timestamp_column (optional): Timestamp column indicating when the row
Expand All @@ -149,19 +150,22 @@ class DataSource(ABC):
date_partition_column (optional): Timestamp column used for partitioning.
"""

name: str
event_timestamp_column: str
created_timestamp_column: str
field_mapping: Dict[str, str]
date_partition_column: str

def __init__(
self,
name: str,
event_timestamp_column: Optional[str] = None,
created_timestamp_column: Optional[str] = None,
field_mapping: Optional[Dict[str, str]] = None,
date_partition_column: Optional[str] = None,
):
"""Creates a DataSource object."""
self.name = name
self.event_timestamp_column = (
event_timestamp_column if event_timestamp_column else ""
)
@@ -173,12 +177,16 @@ def __init__(
date_partition_column if date_partition_column else ""
)

def __hash__(self):
return hash((id(self), self.name))

def __eq__(self, other):
if not isinstance(other, DataSource):
raise TypeError("Comparisons should only involve DataSource class objects.")

if (
self.event_timestamp_column != other.event_timestamp_column
self.name != other.name
or self.event_timestamp_column != other.event_timestamp_column
or self.created_timestamp_column != other.created_timestamp_column
or self.field_mapping != other.field_mapping
or self.date_partition_column != other.date_partition_column
@@ -206,7 +214,9 @@ def from_proto(data_source: DataSourceProto) -> Any:
cls = get_data_source_class_from_type(data_source.data_source_class_type)
return cls.from_proto(data_source)

if data_source.file_options.file_format and data_source.file_options.file_url:
if data_source.request_data_options and data_source.request_data_options.schema:
data_source_obj = RequestDataSource.from_proto(data_source)
elif data_source.file_options.file_format and data_source.file_options.file_url:
from feast.infra.offline_stores.file_source import FileSource

data_source_obj = FileSource.from_proto(data_source)
@@ -246,7 +256,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
@abstractmethod
def to_proto(self) -> DataSourceProto:
"""
Converts an DataSourceProto object to its protobuf representation.
Converts a DataSourceProto object to its protobuf representation.
"""
raise NotImplementedError

@@ -296,6 +306,7 @@ def get_table_column_names_and_types(

def __init__(
self,
name: str,
event_timestamp_column: str,
bootstrap_servers: str,
message_format: StreamFormat,
@@ -305,6 +316,7 @@ def __init__(
date_partition_column: Optional[str] = "",
):
super().__init__(
name,
event_timestamp_column,
created_timestamp_column,
field_mapping,
@@ -335,6 +347,7 @@ def __eq__(self, other):
@staticmethod
def from_proto(data_source: DataSourceProto):
return KafkaSource(
name=data_source.name,
field_mapping=dict(data_source.field_mapping),
bootstrap_servers=data_source.kafka_options.bootstrap_servers,
message_format=StreamFormat.from_proto(
@@ -348,6 +361,7 @@

def to_proto(self) -> DataSourceProto:
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.STREAM_KAFKA,
field_mapping=self.field_mapping,
kafka_options=self.kafka_options.to_proto(),
Expand All @@ -363,6 +377,9 @@ def to_proto(self) -> DataSourceProto:
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
return type_map.redshift_to_feast_value_type

def get_table_query_string(self) -> str:
raise NotImplementedError


class RequestDataSource(DataSource):
"""
@@ -373,19 +390,14 @@ class RequestDataSource(DataSource):
schema: Schema mapping from the input feature name to a ValueType
"""

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
raise NotImplementedError

name: str
schema: Dict[str, ValueType]

def __init__(
self, name: str, schema: Dict[str, ValueType],
):
"""Creates a RequestDataSource object."""
super().__init__()
self.name = name
super().__init__(name)
self.schema = schema

def validate(self, config: RepoConfig):
@@ -402,21 +414,28 @@ def from_proto(data_source: DataSourceProto):
schema = {}
for key in schema_pb.keys():
schema[key] = ValueType(schema_pb.get(key))
return RequestDataSource(
name=data_source.request_data_options.name, schema=schema
)
return RequestDataSource(name=data_source.name, schema=schema)

def to_proto(self) -> DataSourceProto:
schema_pb = {}
for key, value in self.schema.items():
schema_pb[key] = value.value
options = DataSourceProto.RequestDataOptions(name=self.name, schema=schema_pb)
options = DataSourceProto.RequestDataOptions(schema=schema_pb)
data_source_proto = DataSourceProto(
type=DataSourceProto.REQUEST_SOURCE, request_data_options=options
name=self.name,
type=DataSourceProto.REQUEST_SOURCE,
request_data_options=options,
)

return data_source_proto

def get_table_query_string(self) -> str:
raise NotImplementedError

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
raise NotImplementedError


class KinesisSource(DataSource):
def validate(self, config: RepoConfig):
@@ -430,6 +449,7 @@ def get_table_column_names_and_types(
@staticmethod
def from_proto(data_source: DataSourceProto):
return KinesisSource(
name=data_source.name,
field_mapping=dict(data_source.field_mapping),
record_format=StreamFormat.from_proto(
data_source.kinesis_options.record_format
@@ -445,8 +465,12 @@ def from_proto(data_source: DataSourceProto):
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
pass

def get_table_query_string(self) -> str:
raise NotImplementedError

def __init__(
self,
name: str,
event_timestamp_column: str,
created_timestamp_column: str,
record_format: StreamFormat,
@@ -456,6 +480,7 @@ def __init__(
date_partition_column: Optional[str] = "",
):
super().__init__(
name,
event_timestamp_column,
created_timestamp_column,
field_mapping,
@@ -475,7 +500,8 @@ def __eq__(self, other):
)

if (
self.kinesis_options.record_format != other.kinesis_options.record_format
self.name != other.name
or self.kinesis_options.record_format != other.kinesis_options.record_format
or self.kinesis_options.region != other.kinesis_options.region
or self.kinesis_options.stream_name != other.kinesis_options.stream_name
):
@@ -485,6 +511,7 @@

def to_proto(self) -> DataSourceProto:
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.STREAM_KINESIS,
field_mapping=self.field_mapping,
kinesis_options=self.kinesis_options.to_proto(),
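
Because name now lives on the DataSource base class, and on the DataSource proto itself rather than inside RequestDataOptions, a RequestDataSource round-trips through its proto by name. A minimal sketch against a Feast build containing this change (the source name and schema are hypothetical):

from feast import ValueType
from feast.data_source import RequestDataSource

request_source = RequestDataSource(
    name="conv_rate_input",                  # hypothetical request source name
    schema={"val_to_add": ValueType.INT64},
)

proto = request_source.to_proto()
assert proto.name == "conv_rate_input"  # name is a top-level DataSource field now

# from_proto reads the name back from the top-level field (field 1 is reserved
# in RequestDataOptions, where the name used to live).
round_tripped = RequestDataSource.from_proto(proto)
assert round_tripped == request_source  # __eq__ compares names as well
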