Skip to content

Commit

Permalink
feat: Add interfaces for batch materialization engine (#2901)
Browse files Browse the repository at this point in the history
* feat: Add scaffolding for batch materialization engine

Signed-off-by: Achal Shah <achals@gmail.com>

* fix tests

Signed-off-by: Achal Shah <achals@gmail.com>

* fix tests

Signed-off-by: Achal Shah <achals@gmail.com>

* a little better

Signed-off-by: Achal Shah <achals@gmail.com>

* a little better

Signed-off-by: Achal Shah <achals@gmail.com>

* docs

Signed-off-by: Achal Shah <achals@gmail.com>

* more api updates'

Signed-off-by: Achal Shah <achals@gmail.com>

* fix typos

Signed-off-by: Achal Shah <achals@gmail.com>

* make engine importable

Signed-off-by: Achal Shah <achals@gmail.com>

* style stuff

Signed-off-by: Achal Shah <achals@gmail.com>

* style stuff

Signed-off-by: Achal Shah <achals@gmail.com>
  • Loading branch information
achals authored Jul 6, 2022
1 parent eaf4022 commit 38b28ca
Show file tree
Hide file tree
Showing 11 changed files with 635 additions and 264 deletions.
13 changes: 13 additions & 0 deletions sdk/python/feast/infra/materialization/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJob,
MaterializationTask,
)
from .local_engine import LocalMaterializationEngine

__all__ = [
"MaterializationJob",
"MaterializationTask",
"BatchMaterializationEngine",
"LocalMaterializationEngine",
]
122 changes: 122 additions & 0 deletions sdk/python/feast/infra/materialization/batch_materialization_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import enum
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Optional, Sequence, Union

from tqdm import tqdm

from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.stream_feature_view import StreamFeatureView


@dataclass
class MaterializationTask:
"""
A MaterializationTask represents a unit of data that needs to be materialized from an
offline store to an online store.
"""

project: str
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
start_time: datetime
end_time: datetime
tqdm_builder: Callable[[int], tqdm]


class MaterializationJobStatus(enum.Enum):
WAITING = 1
RUNNING = 2
AVAILABLE = 3
ERROR = 4
CANCELLING = 5
CANCELLED = 6
SUCCEEDED = 7


class MaterializationJob(ABC):
"""
MaterializationJob represents an ongoing or executed process that materializes data as per the
definition of a materialization task.
"""

task: MaterializationTask

@abstractmethod
def status(self) -> MaterializationJobStatus:
...

@abstractmethod
def error(self) -> Optional[BaseException]:
...

@abstractmethod
def should_be_retried(self) -> bool:
...

@abstractmethod
def job_id(self) -> str:
...

@abstractmethod
def url(self) -> Optional[str]:
...


class BatchMaterializationEngine(ABC):
def __init__(
self,
*,
repo_config: RepoConfig,
offline_store: OfflineStore,
online_store: OnlineStore,
**kwargs,
):
self.repo_config = repo_config
self.offline_store = offline_store
self.online_store = online_store

@abstractmethod
def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
):
"""This method ensures that any necessary infrastructure or resources needed by the
engine are set up ahead of materialization."""

@abstractmethod
def materialize(
self, registry: BaseRegistry, tasks: List[MaterializationTask]
) -> List[MaterializationJob]:
"""
Materialize data from the offline store to the online store for this feature repo.
Args:
registry: The feast registry containing the applied feature views.
tasks: A list of individual materialization tasks.
Returns:
A list of materialization jobs representing each task.
"""
...

@abstractmethod
def teardown_infra(
self,
project: str,
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
entities: Sequence[Entity],
):
"""This method ensures that any infrastructure or resources set up by ``update()``are torn down."""
185 changes: 185 additions & 0 deletions sdk/python/feast/infra/materialization/local_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Literal, Optional, Sequence, Union

from tqdm import tqdm

from feast.batch_feature_view import BatchFeatureView
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.online_stores.online_store import OnlineStore
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.stream_feature_view import StreamFeatureView

from ...registry import BaseRegistry
from ...utils import (
_convert_arrow_to_proto,
_get_column_names,
_run_pyarrow_field_mapping,
)
from .batch_materialization_engine import (
BatchMaterializationEngine,
MaterializationJob,
MaterializationJobStatus,
MaterializationTask,
)

DEFAULT_BATCH_SIZE = 10_000


class LocalMaterializationEngineConfig(FeastConfigBaseModel):
"""Batch Materialization Engine config for local in-process engine"""

type: Literal["local"] = "local"
""" Type selector"""


@dataclass
class LocalMaterializationJob(MaterializationJob):
def __init__(
self,
job_id: str,
status: MaterializationJobStatus,
error: Optional[BaseException] = None,
) -> None:
super().__init__()
self._job_id: str = job_id
self._status: MaterializationJobStatus = status
self._error: Optional[BaseException] = error

def status(self) -> MaterializationJobStatus:
return self._status

def error(self) -> Optional[BaseException]:
return self._error

def should_be_retried(self) -> bool:
return False

def job_id(self) -> str:
return self._job_id

def url(self) -> Optional[str]:
return None


class LocalMaterializationEngine(BatchMaterializationEngine):
def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
):
# Nothing to set up.
pass

def teardown_infra(
self,
project: str,
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
entities: Sequence[Entity],
):
# Nothing to tear down.
pass

def __init__(
self,
*,
repo_config: RepoConfig,
offline_store: OfflineStore,
online_store: OnlineStore,
**kwargs,
):
super().__init__(
repo_config=repo_config,
offline_store=offline_store,
online_store=online_store,
**kwargs,
)

def materialize(
self, registry, tasks: List[MaterializationTask]
) -> List[MaterializationJob]:
return [
self._materialize_one(
registry,
task.feature_view,
task.start_time,
task.end_time,
task.project,
task.tqdm_builder,
)
for task in tasks
]

def _materialize_one(
self,
registry: BaseRegistry,
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
start_date: datetime,
end_date: datetime,
project: str,
tqdm_builder: Callable[[int], tqdm],
):
entities = []
for entity_name in feature_view.entities:
entities.append(registry.get_entity(entity_name, project))

(
join_key_columns,
feature_name_columns,
timestamp_field,
created_timestamp_column,
) = _get_column_names(feature_view, entities)

job_id = f"{feature_view.name}-{start_date}-{end_date}"

try:
offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)

table = offline_job.to_arrow()

if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
table, feature_view.batch_source.field_mapping
)

join_key_to_value_type = {
entity.name: entity.dtype.to_value_type()
for entity in feature_view.entity_columns
}

with tqdm_builder(table.num_rows) as pbar:
for batch in table.to_batches(DEFAULT_BATCH_SIZE):
rows_to_write = _convert_arrow_to_proto(
batch, feature_view, join_key_to_value_type
)
self.online_store.online_write_batch(
self.repo_config,
feature_view,
rows_to_write,
lambda x: pbar.update(x),
)
return LocalMaterializationJob(
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
)
except BaseException as e:
return LocalMaterializationJob(
job_id=job_id, status=MaterializationJobStatus.ERROR, error=e
)
8 changes: 4 additions & 4 deletions sdk/python/feast/infra/offline_stores/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL,
get_pyarrow_schema_from_batch_source,
)
from feast.infra.provider import (
_get_requested_feature_views_to_features_dict,
_run_dask_field_mapping,
)
from feast.registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.saved_dataset import SavedDatasetStorage
from feast.usage import log_exceptions_and_usage
from feast.utils import (
_get_requested_feature_views_to_features_dict,
_run_dask_field_mapping,
)


class FileOfflineStoreConfig(FeastConfigBaseModel):
Expand Down
3 changes: 1 addition & 2 deletions sdk/python/feast/infra/offline_stores/offline_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
from feast.feature_view import FeatureView
from feast.importer import import_class
from feast.infra.offline_stores.offline_store import OfflineStore
from feast.infra.provider import _get_requested_feature_views_to_features_dict
from feast.registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.type_map import feast_value_type_to_pa
from feast.utils import to_naive_utc
from feast.utils import _get_requested_feature_views_to_features_dict, to_naive_utc

DEFAULT_ENTITY_DF_EVENT_TIMESTAMP_COL = "event_timestamp"

Expand Down
Loading

0 comments on commit 38b28ca

Please sign in to comment.