Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Support passing batch source to streaming sources for backfills #2523

Merged
merged 3 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go/embedded/online_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[stri

joinKeyTypes := make(map[string]int32)

for viewName, _ := range viewNames {
for viewName := range viewNames {
view, err := s.fs.GetFeatureView(viewName, true)
if err != nil {
// skip on demand feature views
continue
}
for entityName, _ := range view.Entities {
for entityName := range view.Entities {
entity := entitiesByName[entityName]
joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number())
}
Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *OnlineFeatureService) GetEntityTypesMapByFeatureService(featureServiceN
// skip on demand feature views
continue
}
for entityName, _ := range view.Entities {
for entityName := range view.Entities {
entity := entitiesByName[entityName]
joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number())
}
Expand Down
9 changes: 6 additions & 3 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import "feast/core/DataFormat.proto";
import "feast/types/Value.proto";

// Defines a Data Source that can be used to source Feature data
// Next available id: 26
// Next available id: 27
message DataSource {
// Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
// but they are going to be reserved for backwards compatibility.
Expand Down Expand Up @@ -82,6 +82,10 @@ message DataSource {
// first party sources as well.
string data_source_class_type = 17;

// Optional batch source for streaming sources for historical features and materialization.
DataSource batch_source = 26;


// Defines options for DataSource that sources features from a file
message FileOptions {
FileFormat file_format = 1;
Expand Down Expand Up @@ -128,6 +132,7 @@ message DataSource {

// Defines the stream data format encoding feature/entity data in Kafka messages.
StreamFormat message_format = 3;

}

// Defines options for DataSource that sources features from Kinesis records.
Expand Down Expand Up @@ -199,8 +204,6 @@ message DataSource {
message PushOptions {
// Mapping of feature name to type
map<string, feast.types.ValueType.Enum> schema = 1;
// Optional batch source for the push source for historical features and materialization.
DataSource batch_source = 2;
}


Expand Down
20 changes: 15 additions & 5 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ def __init__(
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
timestamp_field: Optional[str] = "",
batch_source: Optional[DataSource] = None,
):
super().__init__(
event_timestamp_column=event_timestamp_column,
Expand All @@ -372,6 +373,7 @@ def __init__(
name=name,
timestamp_field=timestamp_field,
)
self.batch_source = batch_source
self.kafka_options = KafkaOptions(
bootstrap_servers=bootstrap_servers,
message_format=message_format,
Expand Down Expand Up @@ -411,6 +413,7 @@ def from_proto(data_source: DataSourceProto):
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
batch_source=DataSource.from_proto(data_source.batch_source),
)

def to_proto(self) -> DataSourceProto:
Expand All @@ -427,6 +430,8 @@ def to_proto(self) -> DataSourceProto:
data_source_proto.timestamp_field = self.timestamp_field
data_source_proto.created_timestamp_column = self.created_timestamp_column
data_source_proto.date_partition_column = self.date_partition_column
if self.batch_source:
data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto())
return data_source_proto

@staticmethod
Expand Down Expand Up @@ -546,6 +551,7 @@ def from_proto(data_source: DataSourceProto):
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
batch_source=DataSource.from_proto(data_source.batch_source),
)

@staticmethod
Expand All @@ -569,6 +575,7 @@ def __init__(
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
timestamp_field: Optional[str] = "",
batch_source: Optional[DataSource] = None,
):
super().__init__(
name=name,
Expand All @@ -581,6 +588,7 @@ def __init__(
owner=owner,
timestamp_field=timestamp_field,
)
self.batch_source = batch_source
self.kinesis_options = KinesisOptions(
record_format=record_format, region=region, stream_name=stream_name
)
Expand Down Expand Up @@ -618,6 +626,8 @@ def to_proto(self) -> DataSourceProto:
data_source_proto.timestamp_field = self.timestamp_field
data_source_proto.created_timestamp_column = self.created_timestamp_column
data_source_proto.date_partition_column = self.date_partition_column
if self.batch_source:
data_source_proto.batch_source.MergeFrom(self.batch_source.to_proto())

return data_source_proto

Expand All @@ -634,6 +644,7 @@ class PushSource(DataSource):

def __init__(
self,
*,
name: str,
schema: Dict[str, ValueType],
batch_source: DataSource,
achals marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -693,8 +704,8 @@ def from_proto(data_source: DataSourceProto):
for key, val in schema_pb.items():
schema[key] = ValueType(val)

assert data_source.push_options.HasField("batch_source")
batch_source = DataSource.from_proto(data_source.push_options.batch_source)
assert data_source.HasField("batch_source")
batch_source = DataSource.from_proto(data_source.batch_source)

return PushSource(
name=data_source.name,
Expand All @@ -714,9 +725,7 @@ def to_proto(self) -> DataSourceProto:
if self.batch_source:
batch_source_proto = self.batch_source.to_proto()

options = DataSourceProto.PushOptions(
schema=schema_pb, batch_source=batch_source_proto
)
options = DataSourceProto.PushOptions(schema=schema_pb,)
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.PUSH_SOURCE,
Expand All @@ -725,6 +734,7 @@ def to_proto(self) -> DataSourceProto:
description=self.description,
tags=self.tags,
owner=self.owner,
batch_source=batch_source_proto,
)

return data_source_proto
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/tests/unit/test_data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ def test_push_with_batch():
batch_source=BigQuerySource(table="test.test"),
)
push_source_proto = push_source.to_proto()
assert push_source_proto.HasField("batch_source")
assert push_source_proto.push_options is not None
assert push_source_proto.push_options.HasField("batch_source")

push_source_unproto = PushSource.from_proto(push_source_proto)

Expand Down