Skip to content

Commit

Permalink
fix: Update file api (#2470)
Browse files Browse the repository at this point in the history
* Update file

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Update the schema

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix unit tests

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Keep file_url param

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix lint

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>

* Fix

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba authored Mar 31, 2022
1 parent 5be1cc6 commit 83a11c6
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 21 deletions.
2 changes: 1 addition & 1 deletion docs/reference/data-sources/file.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ from feast.data_format import ParquetFormat

parquet_file_source = FileSource(
file_format=ParquetFormat(),
file_url="file:///feast/customer.parquet",
path="file:///feast/customer.parquet",
)
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public static FeatureTableSpec createFeatureTableSpec(
FileFormat.newBuilder()
.setParquetFormat(ParquetFormat.newBuilder().build())
.build())
.setFileUrl("/dev/null")
.setUri("/dev/null")
.build())
.build())
.putAllLabels(labels)
Expand Down Expand Up @@ -203,10 +203,7 @@ public static DataSource createFileDataSourceSpec(
return DataSource.newBuilder()
.setType(DataSource.SourceType.BATCH_FILE)
.setFileOptions(
FileOptions.newBuilder()
.setFileFormat(createParquetFormat())
.setFileUrl(fileURL)
.build())
FileOptions.newBuilder().setFileFormat(createParquetFormat()).setUri(fileURL).build())
.setEventTimestampColumn(timestampColumn)
.setDatePartitionColumn(datePartitionColumn)
.build();
Expand Down
2 changes: 1 addition & 1 deletion protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ message DataSource {
// s3://path/to/file for AWS S3 storage
// gs://path/to/file for GCP GCS storage
// file:///path/to/file for local storage
string file_url = 2;
string uri = 2;

// override AWS S3 storage endpoint with custom S3 endpoint
string s3_endpoint_override = 3;
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,8 @@ def _to_arrow_internal(self):

def persist(self, storage: SavedDatasetStorage):
assert isinstance(storage, SavedDatasetFileStorage)

filesystem, path = FileSource.create_filesystem_and_path(
storage.file_options.file_url, storage.file_options.s3_endpoint_override,
storage.file_options.uri, storage.file_options.s3_endpoint_override,
)

if path.endswith(".parquet"):
Expand Down
51 changes: 39 additions & 12 deletions sdk/python/feast/infra/offline_stores/file_source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import warnings
from typing import Callable, Dict, Iterable, Optional, Tuple

from pyarrow._fs import FileSystem
Expand Down Expand Up @@ -61,6 +62,7 @@ def __init__(
self.file_options = FileOptions(
file_format=file_format,
file_url=path,
uri=path,
s3_endpoint_override=s3_endpoint_override,
)

Expand All @@ -85,7 +87,6 @@ def __eq__(self, other):

return (
self.name == other.name
and self.file_options.file_url == other.file_options.file_url
and self.file_options.file_format == other.file_options.file_format
and self.event_timestamp_column == other.event_timestamp_column
and self.created_timestamp_column == other.created_timestamp_column
Expand All @@ -102,15 +103,15 @@ def path(self):
"""
Returns the path of this file data source.
"""
return self.file_options.file_url
return self.file_options.uri

@staticmethod
def from_proto(data_source: DataSourceProto):
return FileSource(
name=data_source.name,
field_mapping=dict(data_source.field_mapping),
file_format=FileFormat.from_proto(data_source.file_options.file_format),
path=data_source.file_options.file_url,
path=data_source.file_options.uri,
event_timestamp_column=data_source.event_timestamp_column,
created_timestamp_column=data_source.created_timestamp_column,
date_partition_column=data_source.date_partition_column,
Expand Down Expand Up @@ -182,17 +183,28 @@ def __init__(
file_format: Optional[FileFormat],
file_url: Optional[str],
s3_endpoint_override: Optional[str],
uri: Optional[str],
):
"""
FileOptions initialization method
Args:
file_format (FileFormat, optional): file source format eg. parquet
file_url (str, optional): file source url eg. s3:// or local file
s3_endpoint_override (str, optional): custom s3 endpoint (used only with s3 file_url)
file_url (str, optional): [DEPRECATED] file source url eg. s3:// or local file
s3_endpoint_override (str, optional): custom s3 endpoint (used only with s3 uri)
uri (str, optional): file source url eg. s3:// or local file
"""
self._file_format = file_format
self._file_url = file_url
if file_url:
warnings.warn(
(
"The option to pass a file_url parameter to FileOptions is being deprecated. "
"Please pass the file url to the uri parameter instead. The parameter will be deprecated in Feast 0.23"
),
DeprecationWarning,
)
self._uri = uri or file_url
self._s3_endpoint_override = s3_endpoint_override

@property
Expand Down Expand Up @@ -223,6 +235,20 @@ def file_url(self, file_url):
"""
self._file_url = file_url

@property
def uri(self):
"""
Returns the URI stored for this file (the value held in ``_uri``),
e.g. an s3:// location or a local file path
"""
return self._uri

@uri.setter
def uri(self, uri):
"""
Sets the URI stored for this file, replacing the current ``_uri`` value
"""
self._uri = uri

@property
def s3_endpoint_override(self):
"""
Expand Down Expand Up @@ -250,7 +276,8 @@ def from_proto(cls, file_options_proto: DataSourceProto.FileOptions):
"""
file_options = cls(
file_format=FileFormat.from_proto(file_options_proto.file_format),
file_url=file_options_proto.file_url,
file_url="",
uri=file_options_proto.uri,
s3_endpoint_override=file_options_proto.s3_endpoint_override,
)
return file_options
Expand All @@ -262,12 +289,11 @@ def to_proto(self) -> DataSourceProto.FileOptions:
Returns:
FileOptionsProto protobuf
"""

file_options_proto = DataSourceProto.FileOptions(
file_format=(
None if self.file_format is None else self.file_format.to_proto()
),
file_url=self.file_url,
uri=self.uri,
s3_endpoint_override=self.s3_endpoint_override,
)

Expand All @@ -286,16 +312,17 @@ def __init__(
s3_endpoint_override: Optional[str] = None,
):
self.file_options = FileOptions(
file_url=path,
file_format=file_format,
file_url="",
s3_endpoint_override=s3_endpoint_override,
uri=path,
)

@staticmethod
def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage:
file_options = FileOptions.from_proto(storage_proto.file_storage)
return SavedDatasetFileStorage(
path=file_options.file_url,
path=file_options.uri,
file_format=file_options.file_format,
s3_endpoint_override=file_options.s3_endpoint_override,
)
Expand All @@ -305,7 +332,7 @@ def to_proto(self) -> SavedDatasetStorageProto:

def to_data_source(self) -> DataSource:
return FileSource(
path=self.file_options.file_url,
path=self.file_options.uri,
file_format=self.file_options.file_format,
s3_endpoint_override=self.file_options.s3_endpoint_override,
)

0 comments on commit 83a11c6

Please sign in to comment.