Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Registry store plugin #1812

Merged
merged 11 commits into from
Sep 14, 2021
14 changes: 3 additions & 11 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import os
import warnings
from collections import Counter, OrderedDict, defaultdict
from datetime import datetime, timedelta
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

Expand Down Expand Up @@ -95,11 +95,7 @@ def __init__(
raise ValueError("Please specify one of repo_path or config.")

registry_config = self.config.get_registry_config()
self._registry = Registry(
registry_path=registry_config.path,
repo_path=self.repo_path,
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
)
self._registry = Registry(registry_config, repo_path=self.repo_path)

@log_exceptions
def version(self) -> str:
Expand Down Expand Up @@ -136,11 +132,7 @@ def refresh_registry(self):
downloaded synchronously, which may increase latencies if the triggering method is get_online_features()
"""
registry_config = self.config.get_registry_config()
self._registry = Registry(
registry_path=registry_config.path,
repo_path=self.repo_path,
cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds),
)
self._registry = Registry(registry_config, repo_path=self.repo_path)
self._registry.refresh()

@log_exceptions_and_usage
Expand Down
75 changes: 74 additions & 1 deletion sdk/python/feast/infra/aws.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import os
import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from urllib.parse import urlparse

import pandas
from tqdm import tqdm

from feast import FeatureTable
from feast.entity import Entity
from feast.errors import S3RegistryBucketForbiddenAccess, S3RegistryBucketNotExist
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
from feast.infra.online_stores.helpers import get_online_store_from_config
Expand All @@ -16,10 +22,12 @@
_get_column_names,
_run_field_mapping,
)
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig, RepoConfig


class AwsProvider(Provider):
Expand Down Expand Up @@ -141,3 +149,68 @@ def get_historical_features(
full_feature_names=full_feature_names,
)
return job


class S3RegistryStore(RegistryStore):
    """Registry store that keeps the serialized registry proto in an S3 object.

    The bucket and object key are taken from ``registry_config.path``, which is
    parsed as a URI: the host part is the bucket and the path (without its
    leading slash) is the key.
    """

    def __init__(self, registry_config: RegistryConfig, repo_path: Path):
        """Parse the registry URI and create the boto3 S3 resource.

        Args:
            registry_config: Registry configuration; only ``path`` is read here.
            repo_path: Accepted for interface compatibility; not used by this store.

        Raises:
            FeastExtrasDependencyImportError: If ``boto3`` cannot be imported.
        """
        uri = registry_config.path
        try:
            import boto3
        except ImportError as e:
            from feast.errors import FeastExtrasDependencyImportError

            raise FeastExtrasDependencyImportError("aws", str(e))
        self._uri = urlparse(uri)
        self._bucket = self._uri.hostname
        self._key = self._uri.path.lstrip("/")

        # FEAST_S3_ENDPOINT_URL lets the endpoint be overridden; when it is
        # unset, None is passed and boto3 picks its default endpoint.
        self.s3_client = boto3.resource(
            "s3", endpoint_url=os.environ.get("FEAST_S3_ENDPOINT_URL")
        )

    def get_registry_proto(self):
        """Download and parse the registry proto from S3.

        Returns:
            The parsed ``RegistryProto``.

        Raises:
            FeastExtrasDependencyImportError: If ``botocore`` cannot be imported.
            S3RegistryBucketNotExist: If the bucket check reports "not found".
            S3RegistryBucketForbiddenAccess: If the bucket check fails for any
                other client error.
            FileNotFoundError: If downloading the registry object fails with a
                client error.
        """
        try:
            from botocore.exceptions import ClientError
        except ImportError as e:
            from feast.errors import FeastExtrasDependencyImportError

            raise FeastExtrasDependencyImportError("aws", str(e))

        try:
            bucket = self.s3_client.Bucket(self._bucket)
            self.s3_client.meta.client.head_bucket(Bucket=bucket.name)
        except ClientError as e:
            # Error codes are strings and are not guaranteed to be numeric
            # (e.g. "NoSuchBucket", "AccessDenied"), so compare as text instead
            # of calling int(), which would raise ValueError and mask the error.
            error_code = str(e.response["Error"]["Code"])
            if error_code in ("404", "NoSuchBucket"):
                raise S3RegistryBucketNotExist(self._bucket) from e
            raise S3RegistryBucketForbiddenAccess(self._bucket) from e

        registry_proto = RegistryProto()
        try:
            # The context manager guarantees the temporary file is closed (and
            # removed) even if the download or the parse fails.
            with TemporaryFile() as file_obj:
                bucket.Object(self._key).download_fileobj(file_obj)
                file_obj.seek(0)
                registry_proto.ParseFromString(file_obj.read())
            return registry_proto
        except ClientError as e:
            raise FileNotFoundError(
                f"Error while trying to locate Registry at path {self._uri.geturl()}"
            ) from e

    def update_registry_proto(self, registry_proto: RegistryProto):
        """Persist ``registry_proto`` to S3, replacing the stored object."""
        self._write_registry(registry_proto)

    def teardown(self):
        """Delete the registry object from the bucket."""
        self.s3_client.Object(self._bucket, self._key).delete()

    def _write_registry(self, registry_proto: RegistryProto):
        """Stamp the proto with a new version id and timestamp, then upload it.

        Note that ``registry_proto`` is mutated in place: ``version_id`` and
        ``last_updated`` are overwritten before serialization.
        """
        registry_proto.version_id = str(uuid.uuid4())
        registry_proto.last_updated.FromDatetime(datetime.utcnow())
        # No bucket-existence check is made here; the original code assumed it
        # had already been done earlier — confirm against callers.
        with TemporaryFile() as file_obj:
            file_obj.write(registry_proto.SerializeToString())
            file_obj.seek(0)
            self.s3_client.Bucket(self._bucket).put_object(
                Body=file_obj, Key=self._key
            )
71 changes: 70 additions & 1 deletion sdk/python/feast/infra/gcp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import uuid
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryFile
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from urllib.parse import urlparse

import pandas
from tqdm import tqdm
Expand All @@ -16,10 +20,12 @@
_get_column_names,
_run_field_mapping,
)
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig, RepoConfig


class GcpProvider(Provider):
Expand Down Expand Up @@ -143,3 +149,66 @@ def get_historical_features(
full_feature_names=full_feature_names,
)
return job


class GCSRegistryStore(RegistryStore):
    """Registry store that keeps the serialized registry proto in a GCS blob.

    The bucket and blob name are taken from ``registry_config.path``, which is
    parsed as a URI: the host part is the bucket and the path (without its
    leading slash) is the blob name.
    """

    def __init__(self, registry_config: RegistryConfig, repo_path: Path):
        """Create the GCS client and parse the registry URI.

        Args:
            registry_config: Registry configuration; only ``path`` is read here.
            repo_path: Accepted for interface compatibility; not used by this store.

        Raises:
            FeastExtrasDependencyImportError: If ``google.cloud.storage`` cannot
                be imported.
        """
        uri = registry_config.path
        try:
            from google.cloud import storage
        except ImportError as e:
            from feast.errors import FeastExtrasDependencyImportError

            raise FeastExtrasDependencyImportError("gcp", str(e))

        self.gcs_client = storage.Client()
        self._uri = urlparse(uri)
        self._bucket = self._uri.hostname
        self._blob = self._uri.path.lstrip("/")

    def get_registry_proto(self):
        """Download and parse the registry proto from GCS.

        Returns:
            The parsed ``RegistryProto``.

        Raises:
            Exception: If the configured bucket does not exist.
            FileNotFoundError: If the registry blob does not exist in the bucket.
        """
        from google.cloud import storage
        from google.cloud.exceptions import NotFound

        try:
            bucket = self.gcs_client.get_bucket(self._bucket)
        except NotFound as e:
            raise Exception(
                f"No bucket named {self._bucket} exists; please create it first."
            ) from e

        if not storage.Blob(bucket=bucket, name=self._blob).exists(self.gcs_client):
            raise FileNotFoundError(
                f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?'
            )

        registry_proto = RegistryProto()
        # The temporary file is only created once the blob is known to exist,
        # and the context manager closes it even if download or parsing fails.
        with TemporaryFile() as file_obj:
            self.gcs_client.download_blob_to_file(
                self._uri.geturl(), file_obj, timeout=30
            )
            file_obj.seek(0)
            registry_proto.ParseFromString(file_obj.read())
        return registry_proto

    def update_registry_proto(self, registry_proto: RegistryProto):
        """Persist ``registry_proto`` to GCS, replacing the stored blob."""
        self._write_registry(registry_proto)

    def teardown(self):
        """Delete the registry blob; a blob that is already gone is not an error."""
        from google.cloud.exceptions import NotFound

        gs_bucket = self.gcs_client.get_bucket(self._bucket)
        try:
            gs_bucket.delete_blob(self._blob)
        except NotFound:
            # If the blob deletion fails with NotFound, it has already been deleted.
            pass

    def _write_registry(self, registry_proto: RegistryProto):
        """Stamp the proto with a new version id and timestamp, then upload it.

        Note that ``registry_proto`` is mutated in place: ``version_id`` and
        ``last_updated`` are overwritten before serialization.
        """
        registry_proto.version_id = str(uuid.uuid4())
        registry_proto.last_updated.FromDatetime(datetime.utcnow())
        gs_bucket = self.gcs_client.get_bucket(self._bucket)
        blob = gs_bucket.blob(self._blob)
        # Close the temporary file deterministically once the upload finishes
        # (or fails) instead of leaving it to garbage collection.
        with TemporaryFile() as file_obj:
            file_obj.write(registry_proto.SerializeToString())
            file_obj.seek(0)
            blob.upload_from_file(file_obj)
42 changes: 41 additions & 1 deletion sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pandas as pd
Expand All @@ -17,10 +19,12 @@
_get_column_names,
_run_field_mapping,
)
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.registry import Registry
from feast.repo_config import RepoConfig
from feast.registry_store import RegistryStore
from feast.repo_config import RegistryConfig, RepoConfig


class LocalProvider(Provider):
Expand Down Expand Up @@ -152,3 +156,39 @@ def _to_naive_utc(ts: datetime):
return ts
else:
return ts.astimezone(pytz.utc).replace(tzinfo=None)


class LocalRegistryStore(RegistryStore):
    """Registry store that keeps the serialized registry proto in a local file."""

    def __init__(self, registry_config: RegistryConfig, repo_path: Path):
        """Resolve the registry file location.

        Args:
            registry_config: Registry configuration; only ``path`` is read here.
            repo_path: Base directory used to resolve a relative registry path.
                An absolute ``registry_config.path`` is used as-is.
        """
        registry_path = Path(registry_config.path)
        if registry_path.is_absolute():
            self._filepath = registry_path
        else:
            self._filepath = repo_path.joinpath(registry_path)

    def get_registry_proto(self):
        """Read and parse the registry proto from disk.

        Returns:
            The parsed ``RegistryProto``.

        Raises:
            FileNotFoundError: If the registry file does not exist.
        """
        if not self._filepath.exists():
            raise FileNotFoundError(
                f'Registry not found at path "{self._filepath}". Have you run "feast apply"?'
            )
        registry_proto = RegistryProto()
        registry_proto.ParseFromString(self._filepath.read_bytes())
        return registry_proto

    def update_registry_proto(self, registry_proto: RegistryProto):
        """Persist ``registry_proto`` to disk, replacing the stored file."""
        self._write_registry(registry_proto)

    def teardown(self):
        """Delete the registry file; a file that is already gone is not an error."""
        try:
            self._filepath.unlink()
        except FileNotFoundError:
            # If the file deletion fails with FileNotFoundError, the file has already
            # been deleted.
            pass

    def _write_registry(self, registry_proto: RegistryProto):
        """Stamp the proto with a new version id and timestamp, then write it.

        Note that ``registry_proto`` is mutated in place: ``version_id`` and
        ``last_updated`` are overwritten before serialization.
        """
        registry_proto.version_id = str(uuid.uuid4())
        registry_proto.last_updated.FromDatetime(datetime.utcnow())
        # parents=True so a registry path nested under several directories that
        # do not exist yet (e.g. "data/registry/registry.db") can be created;
        # without it mkdir raises FileNotFoundError for the missing ancestors.
        self._filepath.parent.mkdir(parents=True, exist_ok=True)
        self._filepath.write_bytes(registry_proto.SerializeToString())
Loading