Skip to content

Commit

Permalink
refactor: Delete properties (#2338)
Browse files Browse the repository at this point in the history
* Delete properties from FeatureService

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Delete properties from Entity

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Delete properties from BaseFeatureView

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Delete properties from DataSource

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Delete properties from RequestDataSource, KafkaSource, KafkaOptions, KinesisSource, KinesisOptions

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Delete unused properties for FileSource, BigQuerySource, RedshiftSource, SnowflakeSource

Signed-off-by: Felix Wang <wangfelix98@gmail.com>

* Fix FileOptions error

Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 committed Feb 25, 2022
1 parent fece99a commit 57d1343
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 455 deletions.
26 changes: 3 additions & 23 deletions sdk/python/feast/base_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,32 +33,12 @@ class BaseFeatureView(ABC):

@abstractmethod
def __init__(self, name: str, features: List[Feature]):
    """Creates a BaseFeatureView object.

    Args:
        name: The unique name of the feature view.
        features: The list of features defined as part of this feature view.
    """
    # The diff residue contained both the old private (`self._name`) and new
    # public assignments, which initialized everything twice (including a
    # second FeatureViewProjection.from_definition call); keep only the
    # post-refactor public attributes.
    self.name = name
    self.features = features
    self.projection = FeatureViewProjection.from_definition(self)
    # Not set at construction time; populated later (presumably when the
    # object round-trips through the registry — TODO confirm).
    self.created_timestamp: Optional[datetime] = None
    self.last_updated_timestamp: Optional[datetime] = None

# NOTE(review): diff-deletion residue — these accessors are the lines removed
# by this commit ("refactor: Delete properties"); `name`, `features`, and
# `projection` are plain public attributes set directly in __init__ after the
# refactor.
@property
def name(self) -> str:
    # Read-only accessor over the pre-refactor private attribute.
    return self._name

@property
def features(self) -> List[Feature]:
    return self._features

@features.setter
def features(self, value):
    self._features = value

@property
def projection(self) -> FeatureViewProjection:
    return self._projection

@projection.setter
def projection(self, value):
    self._projection = value

@property
@abstractmethod
def proto_class(self) -> Type[Message]:
Expand Down
226 changes: 22 additions & 204 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,51 +44,9 @@ class KafkaOptions:
def __init__(
    self, bootstrap_servers: str, message_format: StreamFormat, topic: str,
):
    """Creates a KafkaOptions object.

    Args:
        bootstrap_servers: Comma-separated list of Kafka bootstrap servers.
        message_format: Data format used to encode the feature data in Kafka
            messages.
        topic: Kafka topic to collect feature data from.
    """
    # Post-refactor state: plain public attributes replace the private
    # `_bootstrap_servers`/`_message_format`/`_topic` fields and their
    # pass-through @property getter/setter boilerplate that the diff removed.
    self.bootstrap_servers = bootstrap_servers
    self.message_format = message_format
    self.topic = topic

@classmethod
def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions):
Expand Down Expand Up @@ -135,51 +93,9 @@ class KinesisOptions:
def __init__(
    self, record_format: StreamFormat, region: str, stream_name: str,
):
    """Creates a KinesisOptions object.

    Args:
        record_format: Data format used to encode the feature data in the
            Kinesis records.
        region: AWS region of the Kinesis stream.
        stream_name: Name of the Kinesis stream to obtain feature data from.
    """
    # Post-refactor state: plain public attributes replace the private
    # fields plus the pass-through @property accessors removed by this diff.
    self.record_format = record_format
    self.region = region
    self.stream_name = stream_name

@classmethod
def from_proto(cls, kinesis_options_proto: DataSourceProto.KinesisOptions):
Expand Down Expand Up @@ -233,10 +149,10 @@ class DataSource(ABC):
date_partition_column (optional): Timestamp column used for partitioning.
"""

# Post-refactor class-level attribute declarations. The diff residue carried
# both the old underscore-prefixed names and the new public ones; only the
# public attributes remain, assigned directly in __init__.
event_timestamp_column: str
created_timestamp_column: str
field_mapping: Dict[str, str]
date_partition_column: str

def __init__(
    self,
    # NOTE(review): two parameter lines were collapsed in the diff view
    # ("Expand All"); names reconstructed from the body's assignments —
    # confirm against the full file.
    event_timestamp_column: Optional[str] = None,
    created_timestamp_column: Optional[str] = None,
    field_mapping: Optional[Dict[str, str]] = None,
    date_partition_column: Optional[str] = None,
):
    """Creates a DataSource object.

    Args:
        event_timestamp_column (optional): Event timestamp column of this
            data source.
        created_timestamp_column (optional): Created timestamp column of this
            data source.
        field_mapping (optional): Field mapping of this data source.
        date_partition_column (optional): Timestamp column used for
            partitioning.
    """
    # Normalize unset arguments to empty values ("" / {}) rather than None,
    # assigning directly to the public attributes (the private `_x` fields
    # and their property accessors were removed in this refactor).
    self.event_timestamp_column = (
        event_timestamp_column if event_timestamp_column else ""
    )
    self.created_timestamp_column = (
        created_timestamp_column if created_timestamp_column else ""
    )
    self.field_mapping = field_mapping if field_mapping else {}
    self.date_partition_column = (
        date_partition_column if date_partition_column else ""
    )

Expand All @@ -271,62 +187,6 @@ def __eq__(self, other):

return True

# NOTE(review): diff-deletion residue — the pre-refactor property accessors
# for DataSource, removed by this commit in favor of plain public attributes
# assigned in __init__.
@property
def field_mapping(self) -> Dict[str, str]:
    """
    Returns the field mapping of this data source.
    """
    return self._field_mapping

@field_mapping.setter
def field_mapping(self, field_mapping):
    """
    Sets the field mapping of this data source.
    """
    self._field_mapping = field_mapping

@property
def event_timestamp_column(self) -> str:
    """
    Returns the event timestamp column of this data source.
    """
    return self._event_timestamp_column

@event_timestamp_column.setter
def event_timestamp_column(self, event_timestamp_column):
    """
    Sets the event timestamp column of this data source.
    """
    self._event_timestamp_column = event_timestamp_column

@property
def created_timestamp_column(self) -> str:
    """
    Returns the created timestamp column of this data source.
    """
    return self._created_timestamp_column

@created_timestamp_column.setter
def created_timestamp_column(self, created_timestamp_column):
    """
    Sets the created timestamp column of this data source.
    """
    self._created_timestamp_column = created_timestamp_column

@property
def date_partition_column(self) -> str:
    """
    Returns the date partition column of this data source.
    """
    return self._date_partition_column

@date_partition_column.setter
def date_partition_column(self, date_partition_column):
    """
    Sets the date partition column of this data source.
    """
    self._date_partition_column = date_partition_column

@staticmethod
@abstractmethod
def from_proto(data_source: DataSourceProto) -> Any:
Expand Down Expand Up @@ -450,7 +310,7 @@ def __init__(
field_mapping,
date_partition_column,
)
self._kafka_options = KafkaOptions(
self.kafka_options = KafkaOptions(
bootstrap_servers=bootstrap_servers,
message_format=message_format,
topic=topic,
Expand All @@ -472,20 +332,6 @@ def __eq__(self, other):

return True

# NOTE(review): diff-deletion residue — pre-refactor accessors removed by this
# commit; `kafka_options` is now a plain public attribute set in __init__.
@property
def kafka_options(self):
    """
    Returns the kafka options of this data source
    """
    return self._kafka_options

@kafka_options.setter
def kafka_options(self, kafka_options):
    """
    Sets the kafka options of this data source
    """
    self._kafka_options = kafka_options

@staticmethod
def from_proto(data_source: DataSourceProto):
return KafkaSource(
Expand Down Expand Up @@ -531,30 +377,16 @@ class RequestDataSource(DataSource):
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
    """
    Returns a converter from a source-specific type string to a Feast
    ValueType. Not supported for request data sources: always raises
    NotImplementedError.
    """
    raise NotImplementedError

# Post-refactor public attribute declarations (the old `_name`/`_schema`
# fields and their read-only properties were removed by this commit).
name: str
schema: Dict[str, ValueType]

def __init__(
    self, name: str, schema: Dict[str, ValueType],
):
    """Creates a RequestDataSource object.

    Args:
        name: Name of this data source.
        schema: Schema mapping from the names of this request data source's
            fields to their Feast value types.
    """
    super().__init__()
    self.name = name
    self.schema = schema

def validate(self, config: RepoConfig):
    """No-op: performs no validation for request data sources."""
    pass
Expand All @@ -576,9 +408,9 @@ def from_proto(data_source: DataSourceProto):

def to_proto(self) -> DataSourceProto:
schema_pb = {}
for key, value in self._schema.items():
for key, value in self.schema.items():
schema_pb[key] = value.value
options = DataSourceProto.RequestDataOptions(name=self._name, schema=schema_pb)
options = DataSourceProto.RequestDataOptions(name=self.name, schema=schema_pb)
data_source_proto = DataSourceProto(
type=DataSourceProto.REQUEST_SOURCE, request_data_options=options
)
Expand Down Expand Up @@ -629,7 +461,7 @@ def __init__(
field_mapping,
date_partition_column,
)
self._kinesis_options = KinesisOptions(
self.kinesis_options = KinesisOptions(
record_format=record_format, region=region, stream_name=stream_name
)

Expand All @@ -651,20 +483,6 @@ def __eq__(self, other):

return True

# NOTE(review): diff-deletion residue — pre-refactor accessors removed by this
# commit; `kinesis_options` is now a plain public attribute set in __init__.
@property
def kinesis_options(self):
    """
    Returns the kinesis options of this data source
    """
    return self._kinesis_options

@kinesis_options.setter
def kinesis_options(self, kinesis_options):
    """
    Sets the kinesis options of this data source
    """
    self._kinesis_options = kinesis_options

def to_proto(self) -> DataSourceProto:
data_source_proto = DataSourceProto(
type=DataSourceProto.STREAM_KINESIS,
Expand Down
Loading

0 comments on commit 57d1343

Please sign in to comment.