Skip to content

Commit

Permalink
Refactor OnlineStoreConfig classes into owning modules (feast-dev#1649)
Browse files Browse the repository at this point in the history
* Refactor OnlineStoreConfig classes into owning modules

Signed-off-by: Achal Shah <achals@gmail.com>

* make format

Signed-off-by: Achal Shah <achals@gmail.com>

* Move redis too

Signed-off-by: Achal Shah <achals@gmail.com>

* update test_telemetery

Signed-off-by: Achal Shah <achals@gmail.com>

* add a create_repo_config method that should be called instead of RepoConfig ctor directly

Signed-off-by: Achal Shah <achals@gmail.com>

* fix the table reference in repo_operations

Signed-off-by: Achal Shah <achals@gmail.com>

* reuse create_repo_config

Signed-off-by: Achal Shah <achals@gmail.com>

Remove redis provider reference

* CR comments

Signed-off-by: Achal Shah <achals@gmail.com>

* Remove create_repo_config in favor of __init__

Signed-off-by: Achal Shah <achals@gmail.com>

* make format

Signed-off-by: Achal Shah <achals@gmail.com>

* Remove print statement

Signed-off-by: Achal Shah <achals@gmail.com>
Signed-off-by: Mwad22 <51929507+Mwad22@users.noreply.github.com>
  • Loading branch information
achals authored and Mwad22 committed Jul 7, 2021
1 parent cef2869 commit c2e2b4d
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 163 deletions.
26 changes: 20 additions & 6 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ def __init__(self, provider_name):
super().__init__(f"Provider '{provider_name}' is not implemented")


class FeastProviderModuleImportError(Exception):
def __init__(self, module_name):
super().__init__(f"Could not import provider module '{module_name}'")
class FeastModuleImportError(Exception):
def __init__(self, module_name, module_type="provider"):
super().__init__(f"Could not import {module_type} module '{module_name}'")


class FeastProviderClassImportError(Exception):
def __init__(self, module_name, class_name):
class FeastClassImportError(Exception):
def __init__(self, module_name, class_name, class_type="provider"):
super().__init__(
f"Could not import provider '{class_name}' from module '{module_name}'"
f"Could not import {class_type} '{class_name}' from module '{module_name}'"
)


Expand All @@ -84,6 +84,20 @@ def __init__(self, feature_name_collisions: str):
f"The following feature name(s) have collisions: {feature_name_collisions}. Set 'full_feature_names' argument in the data retrieval function to True to use the full feature name which is prefixed by the feature view name."
)


class FeastOnlineStoreInvalidName(Exception):
def __init__(self, online_store_class_name: str):
super().__init__(
f"Online Store Class '{online_store_class_name}' should end with the string `OnlineStore`.'"
)


class FeastOnlineStoreConfigInvalidName(Exception):
def __init__(self, online_store_config_class_name: str):
super().__init__(
f"Online Store Config Class '{online_store_config_class_name}' should end with the string `OnlineStoreConfig`.'"
)


class FeastOnlineStoreUnsupportedDataSource(Exception):
def __init__(self, online_store_name: str, data_source_name: str):
Expand Down
23 changes: 22 additions & 1 deletion sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@
from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union

import mmh3
from pydantic import PositiveInt, StrictStr
from pydantic.typing import Literal

from feast import Entity, FeatureTable, utils
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import DatastoreOnlineStoreConfig, RepoConfig
from feast.repo_config import FeastConfigBaseModel, RepoConfig

try:
from google.auth.exceptions import DefaultCredentialsError
Expand All @@ -40,6 +42,25 @@
]


class DatastoreOnlineStoreConfig(FeastConfigBaseModel):
""" Online store config for GCP Datastore """

type: Literal["datastore"] = "datastore"
""" Online store type selector"""

project_id: Optional[StrictStr] = None
""" (optional) GCP Project Id """

namespace: Optional[StrictStr] = None
""" (optional) Datastore namespace """

write_concurrency: Optional[PositiveInt] = 40
""" (optional) Amount of threads to use when writing batches of feature rows into Datastore"""

write_batch_size: Optional[PositiveInt] = 50
""" (optional) Amount of feature rows per batch being written into Datastore"""


class DatastoreOnlineStore(OnlineStore):
"""
OnlineStore is an object used for all interaction between Feast and the service used for offline storage of
Expand Down
78 changes: 27 additions & 51 deletions sdk/python/feast/infra/online_stores/helpers.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,41 @@
import importlib
import struct
from typing import Any, Dict, Set
from typing import Any

import mmh3

from feast.data_source import BigQuerySource, DataSource, FileSource
from feast.errors import FeastOnlineStoreUnsupportedDataSource
from feast import errors
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.storage.Redis_pb2 import RedisKeyV2 as RedisKeyProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.repo_config import (
DatastoreOnlineStoreConfig,
OnlineStoreConfig,
RedisOnlineStoreConfig,
SqliteOnlineStoreConfig,
)


def get_online_store_from_config(
online_store_config: OnlineStoreConfig,
) -> OnlineStore:
def get_online_store_from_config(online_store_config: Any,) -> OnlineStore:
"""Get the offline store from offline store config"""

if isinstance(online_store_config, SqliteOnlineStoreConfig):
from feast.infra.online_stores.sqlite import SqliteOnlineStore

return SqliteOnlineStore()
elif isinstance(online_store_config, DatastoreOnlineStoreConfig):
from feast.infra.online_stores.datastore import DatastoreOnlineStore

return DatastoreOnlineStore()
elif isinstance(online_store_config, RedisOnlineStoreConfig):
from feast.infra.online_stores.redis import RedisOnlineStore

return RedisOnlineStore()
raise ValueError(f"Unsupported offline store config '{online_store_config}'")


SUPPORTED_SOURCES: Dict[Any, Set[Any]] = {
SqliteOnlineStoreConfig: {FileSource},
DatastoreOnlineStoreConfig: {BigQuerySource},
RedisOnlineStoreConfig: {FileSource, BigQuerySource},
}


def assert_online_store_supports_data_source(
online_store_config: OnlineStoreConfig, data_source: DataSource
):
supported_sources: Set[Any] = SUPPORTED_SOURCES.get(
online_store_config.__class__, set()
)
# This is needed because checking for `in` with Union types breaks mypy.
# https://github.com/python/mypy/issues/4954
# We can replace this with `data_source.__class__ in SUPPORTED_SOURCES[online_store_config.__class__]`
# Once ^ is resolved.
if supported_sources:
for source in supported_sources:
if source == data_source.__class__:
return
raise FeastOnlineStoreUnsupportedDataSource(
online_store_config.type, data_source.__class__.__name__
)
module_name = online_store_config.__module__
qualified_name = type(online_store_config).__name__
store_class_name = qualified_name.replace("Config", "")
try:
module = importlib.import_module(module_name)
except Exception as e:
# The original exception can be anything - either module not found,
# or any other kind of error happening during the module import time.
# So we should include the original error as well in the stack trace.
raise errors.FeastModuleImportError(
module_name, module_type="OnlineStore"
) from e

# Try getting the provider class definition
try:
online_store_class = getattr(module, store_class_name)
except AttributeError:
# This can only be one type of error, when class_name attribute does not exist in the module
# So we don't have to include the original exception here
raise errors.FeastClassImportError(
module_name, store_class_name, class_type="OnlineStore"
) from None
return online_store_class()


def _redis_key(project: str, entity_key: EntityKeyProto):
Expand Down
25 changes: 23 additions & 2 deletions sdk/python/feast/infra/online_stores/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@
# limitations under the License.
import json
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

from google.protobuf.timestamp_pb2 import Timestamp
from pydantic import StrictStr
from pydantic.typing import Literal

from feast import Entity, FeatureTable, FeatureView, RepoConfig, utils
from feast.infra.online_stores.helpers import _mmh3, _redis_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RedisOnlineStoreConfig, RedisType
from feast.repo_config import FeastConfigBaseModel

try:
from redis import Redis
Expand All @@ -35,6 +38,25 @@
EX_SECONDS = 253402300799


class RedisType(str, Enum):
redis = "redis"
redis_cluster = "redis_cluster"


class RedisOnlineStoreConfig(FeastConfigBaseModel):
"""Online store config for Redis store"""

type: Literal["redis"] = "redis"
"""Online store type selector"""

redis_type: RedisType = RedisType.redis
"""Redis type: redis or redis_cluster"""

connection_string: StrictStr = "localhost:6379"
"""Connection string containing the host, port, and configuration parameters for Redis
format: host:port,parameter1,parameter2 eg. redis:6379,db=0 """


class RedisOnlineStore(OnlineStore):
_client: Optional[Union[Redis, RedisCluster]] = None

Expand Down Expand Up @@ -99,7 +121,6 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig):
startup_nodes, kwargs = self._parse_connection_string(
online_store_config.connection_string
)
print(f"Startup nodes: {startup_nodes}, {kwargs}")
if online_store_config.type == RedisType.redis_cluster:
kwargs["startup_nodes"] = startup_nodes
self._client = RedisCluster(**kwargs)
Expand Down
15 changes: 14 additions & 1 deletion sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,26 @@
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union

import pytz
from pydantic import StrictStr
from pydantic.schema import Literal

from feast import Entity, FeatureTable
from feast.feature_view import FeatureView
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import RepoConfig
from feast.repo_config import FeastConfigBaseModel, RepoConfig


class SqliteOnlineStoreConfig(FeastConfigBaseModel):
""" Online store config for local (SQLite-based) store """

type: Literal["sqlite"] = "sqlite"
""" Online store type selector"""

path: StrictStr = "data/online.db"
""" (optional) Path to sqlite db """


class SqliteOnlineStore(OnlineStore):
Expand Down Expand Up @@ -65,6 +77,7 @@ def online_write_batch(
],
progress: Optional[Callable[[int], Any]],
) -> None:

conn = self._get_conn(config)

project = config.project
Expand Down
6 changes: 2 additions & 4 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,15 @@ def get_provider(config: RepoConfig, repo_path: Path) -> Provider:
# The original exception can be anything - either module not found,
# or any other kind of error happening during the module import time.
# So we should include the original error as well in the stack trace.
raise errors.FeastProviderModuleImportError(module_name) from e
raise errors.FeastModuleImportError(module_name) from e

# Try getting the provider class definition
try:
ProviderCls = getattr(module, class_name)
except AttributeError:
# This can only be one type of error, when class_name attribute does not exist in the module
# So we don't have to include the original exception here
raise errors.FeastProviderClassImportError(
module_name, class_name
) from None
raise errors.FeastClassImportError(module_name, class_name) from None

return ProviderCls(config, repo_path)

Expand Down
Loading

0 comments on commit c2e2b4d

Please sign in to comment.