From 20437676785df1f9146641a95bd17bd20fc0663d Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 19 Apr 2022 13:22:30 -0700 Subject: [PATCH 01/16] write logs to offline store Signed-off-by: pyalex --- protos/feast/core/FeatureService.proto | 42 +++++ sdk/python/feast/embedded_go/types.py | 85 ++++++++++ sdk/python/feast/feature_logging.py | 145 ++++++++++++++++++ sdk/python/feast/feature_service.py | 10 ++ sdk/python/feast/feature_store.py | 16 ++ .../feast/infra/offline_stores/bigquery.py | 70 ++++++++- .../infra/offline_stores/bigquery_source.py | 29 ++++ sdk/python/feast/infra/offline_stores/file.py | 29 +++- .../feast/infra/offline_stores/file_source.py | 36 +++++ .../infra/offline_stores/offline_store.py | 17 ++ .../feast/infra/offline_stores/redshift.py | 37 ++++- .../infra/offline_stores/redshift_source.py | 29 ++++ .../feast/infra/offline_stores/snowflake.py | 24 +++ .../infra/offline_stores/snowflake_source.py | 39 +++++ .../feast/infra/passthrough_provider.py | 51 ++++++ sdk/python/feast/infra/provider.py | 26 +++- sdk/python/feast/infra/utils/aws_utils.py | 107 +++++++++---- .../universal/data_source_creator.py | 5 + .../universal/data_sources/bigquery.py | 12 +- .../universal/data_sources/file.py | 18 ++- .../universal/data_sources/redshift.py | 14 +- .../universal/data_sources/snowflake.py | 14 +- .../offline_store/test_feature_logging.py | 109 +++++++++++++ 23 files changed, 924 insertions(+), 40 deletions(-) create mode 100644 sdk/python/feast/embedded_go/types.py create mode 100644 sdk/python/feast/feature_logging.py create mode 100644 sdk/python/tests/integration/offline_store/test_feature_logging.py diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto index 2295677583..a4091bc22e 100644 --- a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -5,6 +5,7 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core"; option java_outer_classname = "FeatureServiceProto"; option java_package = "feast.proto.core"; +import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; import "feast/core/FeatureViewProjection.proto"; @@ -35,6 +36,9 @@ message FeatureServiceSpec { // Owner of the feature service. string owner = 6; + + // (optional) if provided logging will be enabled for this feature service. 
+ LoggingConfig logging_config = 7; } @@ -46,3 +50,41 @@ message FeatureServiceMeta { google.protobuf.Timestamp last_updated_timestamp = 2; } + + +message LoggingConfig { + float sample_rate = 1; + google.protobuf.Duration partition_interval = 2; + + oneof destination { + FileDestination file_destination = 3; + BigQueryDestination bigquery_destination = 4; + RedshiftDestination redshift_destination = 5; + SnowflakeDestination snowflake_destination = 6; + } + + message FileDestination { + string path = 1; + string s3_endpoint_override = 2; + } + + message BigQueryDestination { + // Full table reference in the form of [project:dataset.table] + string table_ref = 1; + } + + message RedshiftDestination { + string table_ref = 1; + } + + message SnowflakeDestination { + // Snowflake table name + string table = 1; + + // Snowflake schema name + string schema = 2; + + // Snowflake schema name + string database = 3; + } +} \ No newline at end of file diff --git a/sdk/python/feast/embedded_go/types.py b/sdk/python/feast/embedded_go/types.py new file mode 100644 index 0000000000..a5b43c151b --- /dev/null +++ b/sdk/python/feast/embedded_go/types.py @@ -0,0 +1,85 @@ +from typing import List + +import pyarrow as pa + +from feast.protos.feast.types import Value_pb2 +from feast.types import Array, PrimitiveFeastType + +ARROW_TYPE_TO_PROTO_FIELD = { + pa.int32(): "int32_val", + pa.int64(): "int64_val", + pa.float32(): "float_val", + pa.float64(): "double_val", + pa.bool_(): "bool_val", + pa.string(): "string_val", + pa.binary(): "bytes_val", + pa.time64("ns"): "unix_timestamp_val", +} + +ARROW_LIST_TYPE_TO_PROTO_FIELD = { + pa.int32(): "int32_list_val", + pa.int64(): "int64_list_val", + pa.float32(): "float_list_val", + pa.float64(): "double_list_val", + pa.bool_(): "bool_list_val", + pa.string(): "string_list_val", + pa.binary(): "bytes_list_val", + pa.time64("ns"): "unix_timestamp_list_val", +} + +ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS = { + pa.int32(): Value_pb2.Int32List, + pa.int64(): Value_pb2.Int64List, + pa.float32(): Value_pb2.FloatList, + pa.float64(): Value_pb2.DoubleList, + pa.bool_(): Value_pb2.BoolList, + pa.string(): Value_pb2.StringList, + pa.binary(): Value_pb2.BytesList, + pa.time64("ns"): Value_pb2.Int64List, +} + +FEAST_TYPE_TO_ARROW_TYPE = { + PrimitiveFeastType.INT32: pa.int32(), + PrimitiveFeastType.INT64: pa.int64(), + PrimitiveFeastType.FLOAT32: pa.float32(), + PrimitiveFeastType.FLOAT64: pa.float64(), + PrimitiveFeastType.STRING: pa.string(), + PrimitiveFeastType.BYTES: pa.binary(), + PrimitiveFeastType.BOOL: pa.bool_(), + PrimitiveFeastType.UNIX_TIMESTAMP: pa.time64("ns"), + Array(PrimitiveFeastType.INT32): pa.list_(pa.int32()), + Array(PrimitiveFeastType.INT64): pa.list_(pa.int64()), + Array(PrimitiveFeastType.FLOAT32): pa.list_(pa.float32()), + Array(PrimitiveFeastType.FLOAT64): pa.list_(pa.float64()), + Array(PrimitiveFeastType.STRING): pa.list_(pa.string()), + Array(PrimitiveFeastType.BYTES): pa.list_(pa.binary()), + Array(PrimitiveFeastType.BOOL): pa.list_(pa.bool_()), + Array(PrimitiveFeastType.UNIX_TIMESTAMP): pa.list_(pa.time64("ns")), +} + + +def arrow_array_to_array_of_proto( + arrow_type: pa.DataType, arrow_array: pa.Array +) -> List[Value_pb2.Value]: + values = [] + if isinstance(arrow_type, pa.ListType): + proto_list_class = ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS[arrow_type.value_type] + proto_field_name = ARROW_LIST_TYPE_TO_PROTO_FIELD[arrow_type.value_type] + + if arrow_type.value_type == pa.time64("ns"): + arrow_array = arrow_array.cast(pa.list_(pa.int64())) + + for v 
in arrow_array.tolist(): + values.append( + Value_pb2.Value(**{proto_field_name: proto_list_class(val=v)}) + ) + else: + proto_field_name = ARROW_TYPE_TO_PROTO_FIELD[arrow_type] + + if arrow_type == pa.time64("ns"): + arrow_array = arrow_array.cast(pa.int64()) + + for v in arrow_array.tolist(): + values.append(Value_pb2.Value(**{proto_field_name: v})) + + return values diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py new file mode 100644 index 0000000000..90e8b6616e --- /dev/null +++ b/sdk/python/feast/feature_logging.py @@ -0,0 +1,145 @@ +import abc +from typing import TYPE_CHECKING, Dict, Type, cast + +import pyarrow as pa +from pytz import UTC + +from feast.data_source import DataSource +from feast.embedded_go.types import FEAST_TYPE_TO_ARROW_TYPE +from feast.errors import ( + FeastObjectNotFoundException, + FeatureViewNotFoundException, + OnDemandFeatureViewNotFoundException, +) +from feast.protos.feast.core.FeatureService_pb2 import ( + LoggingConfig as LoggingConfigProto, +) +from feast.types import from_value_type + +if TYPE_CHECKING: + from feast import FeatureService + from feast.registry import Registry + + +class LoggingSource: + @abc.abstractmethod + def get_schema(self, registry: "Registry") -> pa.Schema: + raise NotImplementedError + + @abc.abstractmethod + def get_partition_column(self, registry: "Registry") -> str: + raise NotImplementedError + + @abc.abstractmethod + def get_log_timestamp_column(self) -> str: + raise NotImplementedError + + +class FeatureServiceLoggingSource(LoggingSource): + def __init__(self, feature_service: "FeatureService", project: str): + self._feature_service = feature_service + self._project = project + + def get_schema(self, registry: "Registry") -> pa.Schema: + fields: Dict[str, pa.DataType] = {} + + for projection in self._feature_service.feature_view_projections: + for feature in projection.features: + fields[ + f"{projection.name_to_use()}__{feature.name}" + ] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype] + fields[ + f"{projection.name_to_use()}__{feature.name}__timestamp" + ] = pa.timestamp("ns", tz=UTC) + fields[ + f"{projection.name_to_use()}__{feature.name}__status" + ] = pa.int32() + + try: + feature_view = registry.get_feature_view(projection.name, self._project) + except FeatureViewNotFoundException: + try: + on_demand_feature_view = registry.get_on_demand_feature_view( + projection.name, self._project + ) + except OnDemandFeatureViewNotFoundException: + raise FeastObjectNotFoundException( + f"Can't recognize feature view with a name {projection.name}" + ) + + for ( + request_source + ) in on_demand_feature_view.source_request_sources.values(): + for name, dtype in request_source.schema.items(): + fields[name] = FEAST_TYPE_TO_ARROW_TYPE[from_value_type(dtype)] + + else: + for entity_name in feature_view.entities: + entity = registry.get_entity(entity_name, self._project) + join_key = projection.join_key_map.get( + entity.join_key, entity.join_key + ) + fields[join_key] = FEAST_TYPE_TO_ARROW_TYPE[ + from_value_type(entity.value_type) + ] + + # system columns + fields["request_id"] = pa.string() + fields["log_timestamp"] = pa.timestamp("ns", tz=UTC) + fields["log_date"] = pa.date32() + + return pa.schema( + [pa.field(name, data_type) for name, data_type in fields.items()] + ) + + def get_partition_column(self, registry: "Registry") -> str: + return "log_date" + + def get_log_timestamp_column(self) -> str: + return "log_timestamp" + + +class _DestinationRegistry(type): + classes_by_proto_attr_name: 
Dict[str, Type["LoggingDestination"]] = {} + + def __new__(cls, name, bases, dct): + kls = type.__new__(cls, name, bases, dct) + if dct.get("_proto_attr_name"): + cls.classes_by_proto_attr_name[dct["_proto_attr_name"]] = kls + return kls + + +class LoggingDestination: + _proto_attr_name: str + + @classmethod + @abc.abstractmethod + def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": + raise NotImplementedError + + @abc.abstractmethod + def to_proto(self) -> LoggingConfigProto: + raise NotImplementedError + + @abc.abstractmethod + def to_data_source(self) -> DataSource: + raise NotImplementedError + + +class LoggingConfig: + destination: LoggingDestination + + def __init__(self, destination: LoggingDestination): + self.destination = destination + + @classmethod + def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingConfig": + proto_attr_name = cast(str, config_proto.WhichOneof("destination")) + destination_class = _DestinationRegistry.classes_by_proto_attr_name[ + proto_attr_name + ] + return LoggingConfig(destination=destination_class.from_proto(config_proto)) + + def to_proto(self) -> LoggingConfigProto: + proto = self.destination.to_proto() + return proto diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 492d31a809..bfa48b3bf4 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -5,6 +5,7 @@ from google.protobuf.json_format import MessageToJson from feast.base_feature_view import BaseFeatureView +from feast.feature_logging import LoggingConfig from feast.feature_view import FeatureView from feast.feature_view_projection import FeatureViewProjection from feast.on_demand_feature_view import OnDemandFeatureView @@ -44,6 +45,7 @@ class FeatureService: owner: str created_timestamp: Optional[datetime] = None last_updated_timestamp: Optional[datetime] = None + logging_config: Optional[LoggingConfig] = None @log_exceptions def __init__( @@ -54,6 +56,7 @@ def __init__( tags: Dict[str, str] = None, description: str = "", owner: str = "", + logging_config: Optional[LoggingConfig] = None, ): """ Creates a FeatureService object. 
@@ -106,6 +109,7 @@ def __init__( self.owner = owner self.created_timestamp = None self.last_updated_timestamp = None + self.logging_config = logging_config def __repr__(self): items = (f"{k} = {v}" for k, v in self.__dict__.items()) @@ -152,6 +156,9 @@ def from_proto(cls, feature_service_proto: FeatureServiceProto): tags=dict(feature_service_proto.spec.tags), description=feature_service_proto.spec.description, owner=feature_service_proto.spec.owner, + logging_config=LoggingConfig.from_proto( + feature_service_proto.spec.logging_config + ), ) fs.feature_view_projections.extend( [ @@ -192,6 +199,9 @@ def to_proto(self) -> FeatureServiceProto: tags=self.tags, description=self.description, owner=self.owner, + logging_config=self.logging_config.to_proto() + if self.logging_config + else None, ) return FeatureServiceProto(spec=spec, meta=meta) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 4f456be384..36f80e6fc9 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -34,6 +34,7 @@ ) import pandas as pd +import pyarrow as pa from colorama import Fore, Style from google.protobuf.timestamp_pb2 import Timestamp from tqdm import tqdm @@ -1976,6 +1977,21 @@ def serve_transformations(self, port: int) -> None: def _teardown_go_server(self): self._go_server = None + def write_logged_features(self, logs: pa.Table, source: Union[FeatureService]): + if not isinstance(source, FeatureService): + raise ValueError("Only feature service is currently supported as a source") + + assert ( + source.logging_config is not None + ), "Feature service must have logging config attached" + + self._get_provider().write_feature_service_logs( + feature_service=source, + logs=logs, + config=self.config, + registry=self._registry, + ) + def _validate_entity_values(join_key_values: Dict[str, List[Value]]): set_of_row_lengths = {len(v) for v in join_key_values.values()} diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 29d0e029d9..0d4106a36d 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -1,4 +1,5 @@ import contextlib +import tempfile import uuid from datetime import date, datetime, timedelta from typing import ( @@ -28,6 +29,7 @@ FeastProviderLoginError, InvalidEntityType, ) +from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView from feast.infra.offline_stores import offline_utils from feast.infra.offline_stores.offline_store import ( @@ -41,13 +43,18 @@ from ...saved_dataset import SavedDatasetStorage from ...usage import log_exceptions_and_usage -from .bigquery_source import BigQuerySource, SavedDatasetBigQueryStorage +from .bigquery_source import ( + BigqueryLoggingDestination, + BigQuerySource, + SavedDatasetBigQueryStorage, +) try: from google.api_core.exceptions import NotFound from google.auth.exceptions import DefaultCredentialsError from google.cloud import bigquery - from google.cloud.bigquery import Client, Table + from google.cloud.bigquery import Client, SchemaField, Table + from google.cloud.bigquery._pandas_helpers import ARROW_SCALAR_IDS_TO_BQ except ImportError as e: from feast.errors import FeastExtrasDependencyImportError @@ -248,6 +255,43 @@ def query_generator() -> Iterator[str]: ), ) + @staticmethod + def write_logged_features( + config: RepoConfig, + data: pyarrow.Table, + source: LoggingSource, + 
logging_config: LoggingConfig, + registry: Registry, + ): + destination = logging_config.destination + assert isinstance(destination, BigqueryLoggingDestination) + + client = _get_bigquery_client( + project=config.offline_store.project_id, + location=config.offline_store.location, + ) + + job_config = bigquery.LoadJobConfig( + source_format=bigquery.SourceFormat.PARQUET, + schema=arrow_schema_to_bq_schema(source.get_schema(registry)), + ) + partition_col = source.get_partition_column(registry) + if partition_col: + job_config.time_partitioning = bigquery.TimePartitioning( + type_=bigquery.TimePartitioningType.DAY, field=partition_col + ) + + with tempfile.TemporaryFile() as parquet_temp_file: + pyarrow.parquet.write_table(table=data, where=parquet_temp_file) + + parquet_temp_file.seek(0) + + client.load_table_from_file( + file_obj=parquet_temp_file, + destination=destination.table_ref, + job_config=job_config, + ) + class BigQueryRetrievalJob(RetrievalJob): def __init__( @@ -513,7 +557,9 @@ def _get_entity_df_event_timestamp_range( return entity_df_event_timestamp_range -def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] = None): +def _get_bigquery_client( + project: Optional[str] = None, location: Optional[str] = None +) -> bigquery.Client: try: client = bigquery.Client(project=project, location=location) except DefaultCredentialsError as e: @@ -533,6 +579,24 @@ def _get_bigquery_client(project: Optional[str] = None, location: Optional[str] return client +def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]: + bq_schema = [] + + for field in arrow_schema: + if pyarrow.types.is_list(field.type): + detected_mode = "REPEATED" + detected_type = ARROW_SCALAR_IDS_TO_BQ[field.type.value_type.id] + else: + detected_mode = "NULLABLE" + detected_type = ARROW_SCALAR_IDS_TO_BQ[field.type.id] + + bq_schema.append( + SchemaField(name=field.name, field_type=detected_type, mode=detected_mode) + ) + + return bq_schema + + # TODO: Optimizations # * Use GENERATE_UUID() instead of ROW_NUMBER(), or join on entity columns directly # * Precompute ROW_NUMBER() so that it doesn't have to be recomputed for every query on entity_dataframe diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 001576c98f..1e5ffec3a6 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -4,7 +4,11 @@ from feast import type_map from feast.data_source import DataSource from feast.errors import DataSourceNotFoundException +from feast.feature_logging import LoggingDestination from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.FeatureService_pb2 import ( + LoggingConfig as LoggingConfigProto, +) from feast.protos.feast.core.SavedDataset_pb2 import ( SavedDatasetStorage as SavedDatasetStorageProto, ) @@ -253,3 +257,28 @@ def to_proto(self) -> SavedDatasetStorageProto: def to_data_source(self) -> DataSource: return BigQuerySource(table=self.bigquery_options.table) + + +class BigqueryLoggingDestination(LoggingDestination): + _proto_attr_name = "bigquery_destination" + + table_ref: str + + def __init__(self, table_ref): + self.table_ref = table_ref + + @classmethod + def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": + return BigqueryLoggingDestination( + table_ref=config_proto.bigquery_destination.table_ref, + ) + + def to_data_source(self) 
-> DataSource: + return BigQuerySource(table=self.table_ref) + + def to_proto(self) -> LoggingConfigProto: + return LoggingConfigProto( + bigquery_destination=LoggingConfigProto.BigQueryDestination( + table_ref=self.table_ref + ) + ) diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 052d546748..31a2123ad8 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -4,14 +4,19 @@ import dask.dataframe as dd import pandas as pd import pyarrow +import pyarrow.parquet import pytz from pydantic.typing import Literal from feast import FileSource, OnDemandFeatureView from feast.data_source import DataSource from feast.errors import FeastJoinKeysDuringMaterialization +from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView -from feast.infra.offline_stores.file_source import SavedDatasetFileStorage +from feast.infra.offline_stores.file_source import ( + FileLoggingDestination, + SavedDatasetFileStorage, +) from feast.infra.offline_stores.offline_store import ( OfflineStore, RetrievalJob, @@ -367,6 +372,28 @@ def pull_all_from_table_or_query( end_date=end_date, ) + @staticmethod + def write_logged_features( + config: RepoConfig, + data: pyarrow.Table, + source: LoggingSource, + logging_config: LoggingConfig, + registry: Registry, + ): + destination = logging_config.destination + assert isinstance(destination, FileLoggingDestination) + + filesystem, path = FileSource.create_filesystem_and_path( + destination.path, destination.s3_endpoint_override, + ) + + pyarrow.parquet.write_to_dataset( + data, + root_path=path, + partition_cols=[source.get_partition_column(registry)], + filesystem=filesystem, + ) + def _get_entity_df_event_timestamp_range( entity_df: Union[pd.DataFrame, str], entity_df_event_timestamp_col: str, diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index a6fc7a1600..08c91a8eda 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -8,7 +8,11 @@ from feast import type_map from feast.data_format import FileFormat, ParquetFormat from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.FeatureService_pb2 import ( + LoggingConfig as LoggingConfigProto, +) from feast.protos.feast.core.SavedDataset_pb2 import ( SavedDatasetStorage as SavedDatasetStorageProto, ) @@ -290,3 +294,35 @@ def to_data_source(self) -> DataSource: file_format=self.file_options.file_format, s3_endpoint_override=self.file_options.s3_endpoint_override, ) + + +class FileLoggingDestination(LoggingDestination): + _proto_attr_name = "file_destination" + + path: str + s3_endpoint_override: str + + def __init__(self, *, path: str, s3_endpoint_override=""): + self.path = path + self.s3_endpoint_override = s3_endpoint_override + + @classmethod + def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": + return FileLoggingDestination( + path=config_proto.file_destination.path, + s3_endpoint_override=config_proto.file_destination.s3_endpoint_override, + ) + + def to_proto(self) -> LoggingConfigProto: + return LoggingConfigProto( + file_destination=LoggingConfigProto.FileDestination( + path=self.path, 
s3_endpoint_override=self.s3_endpoint_override, + ) + ) + + def to_data_source(self) -> DataSource: + return FileSource( + path=self.path, + file_format=ParquetFormat(), + s3_endpoint_override=self.s3_endpoint_override, + ) diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 83f20bb3e5..8b9d7ed41e 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -21,6 +21,7 @@ from feast.data_source import DataSource from feast.dqm.errors import ValidationFailed +from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import FeatureView from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import Registry @@ -241,3 +242,19 @@ def pull_all_from_table_or_query( end_date: Ending date of query """ pass + + @staticmethod + def write_logged_features( + config: RepoConfig, + data: pyarrow.Table, + source: LoggingSource, + logging_config: LoggingConfig, + registry: Registry, + ): + """ + Flush data with logged features to specified destination in offline store. + Data should be appended to existing table / destination or create a new one if it doesn't exist. + + This is optional method that could be supported only be some stores. + """ + raise NotImplementedError() diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index cd309c92b2..885f65bc30 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -14,6 +14,7 @@ import numpy as np import pandas as pd +import pyarrow import pyarrow as pa from dateutil import parser from pydantic import StrictStr @@ -23,6 +24,7 @@ from feast import OnDemandFeatureView, RedshiftSource from feast.data_source import DataSource from feast.errors import InvalidEntityType +from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView from feast.infra.offline_stores import offline_utils from feast.infra.offline_stores.offline_store import ( @@ -30,7 +32,10 @@ RetrievalJob, RetrievalMetadata, ) -from feast.infra.offline_stores.redshift_source import SavedDatasetRedshiftStorage +from feast.infra.offline_stores.redshift_source import ( + RedshiftLoggingDestination, + SavedDatasetRedshiftStorage, +) from feast.infra.utils import aws_utils from feast.registry import Registry from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -257,6 +262,36 @@ def query_generator() -> Iterator[str]: ), ) + @staticmethod + def write_logged_features( + config: RepoConfig, + data: pyarrow.Table, + source: LoggingSource, + logging_config: LoggingConfig, + registry: Registry, + ): + destination = logging_config.destination + assert isinstance(destination, RedshiftLoggingDestination) + + redshift_client = aws_utils.get_redshift_data_client( + config.offline_store.region + ) + s3_resource = aws_utils.get_s3_resource(config.offline_store.region) + s3_path = f"{config.offline_store.s3_staging_location}/logged_features/{uuid.uuid4()}.parquet" + + aws_utils.upload_arrow_table_to_redshift( + table=data, + redshift_data_client=redshift_client, + cluster_id=config.offline_store.cluster_id, + database=config.offline_store.database, + user=config.offline_store.user, + s3_resource=s3_resource, + s3_path=s3_path, + iam_role=config.offline_store.iam_role, + table_name=destination.table_ref, + 
fail_if_exists=False, + ) + class RedshiftRetrievalJob(RetrievalJob): def __init__( diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 00af8c1abf..107954fa63 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -4,7 +4,11 @@ from feast import type_map from feast.data_source import DataSource from feast.errors import DataSourceNotFoundException, RedshiftCredentialsError +from feast.feature_logging import LoggingDestination from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.FeatureService_pb2 import ( + LoggingConfig as LoggingConfigProto, +) from feast.protos.feast.core.SavedDataset_pb2 import ( SavedDatasetStorage as SavedDatasetStorageProto, ) @@ -327,3 +331,28 @@ def to_proto(self) -> SavedDatasetStorageProto: def to_data_source(self) -> DataSource: return RedshiftSource(table=self.redshift_options.table) + + +class RedshiftLoggingDestination(LoggingDestination): + _proto_attr_name = "redshift_destination" + + table_ref: str + + def __init__(self, table_ref: str): + self.table_ref = table_ref + + @classmethod + def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": + return RedshiftLoggingDestination( + table_ref=config_proto.redshift_destination.table_ref, + ) + + def to_proto(self) -> LoggingConfigProto: + return LoggingConfigProto( + redshift_destination=LoggingConfigProto.RedshiftDestination( + table_ref=self.table_ref + ) + ) + + def to_data_source(self) -> DataSource: + return RedshiftSource(table=self.table_ref) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index a07f7a57c6..4b91f0234a 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -16,6 +16,7 @@ import numpy as np import pandas as pd +import pyarrow import pyarrow as pa from pydantic import Field from pydantic.typing import Literal @@ -24,6 +25,7 @@ from feast import OnDemandFeatureView from feast.data_source import DataSource from feast.errors import InvalidEntityType +from feast.feature_logging import LoggingConfig, LoggingSource from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView from feast.infra.offline_stores import offline_utils from feast.infra.offline_stores.offline_store import ( @@ -33,6 +35,7 @@ ) from feast.infra.offline_stores.snowflake_source import ( SavedDatasetSnowflakeStorage, + SnowflakeLoggingDestination, SnowflakeSource, ) from feast.infra.utils.snowflake_utils import ( @@ -274,6 +277,27 @@ def query_generator() -> Iterator[str]: ), ) + @staticmethod + def write_logged_features( + config: RepoConfig, + data: pyarrow.Table, + source: LoggingSource, + logging_config: LoggingConfig, + registry: Registry, + ): + assert isinstance(logging_config.destination, SnowflakeLoggingDestination) + + snowflake_conn = get_snowflake_conn(config.offline_store) + + write_pandas( + snowflake_conn, + data.to_pandas(), + table_name=logging_config.destination.table, + schema=logging_config.destination.schema, + database=logging_config.destination.database, + auto_create_table=True, + ) + class SnowflakeRetrievalJob(RetrievalJob): def __init__( diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 904fc48043..155c1821de 100644 --- 
a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -3,7 +3,11 @@ from feast import type_map from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.FeatureService_pb2 import ( + LoggingConfig as LoggingConfigProto, +) from feast.protos.feast.core.SavedDataset_pb2 import ( SavedDatasetStorage as SavedDatasetStorageProto, ) @@ -329,3 +333,38 @@ def to_proto(self) -> SavedDatasetStorageProto: def to_data_source(self) -> DataSource: return SnowflakeSource(table=self.snowflake_options.table) + + +class SnowflakeLoggingDestination(LoggingDestination): + table: str + schema: Optional[str] + database: Optional[str] + + def __init__( + self, table: str, schema: Optional[str] = None, database: Optional[str] = None + ): + self.table = table + self.schema = schema + self.database = database + + @classmethod + def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": + return SnowflakeLoggingDestination( + table=config_proto.snowflake_destination.table, + schema=config_proto.snowflake_destination.schema or None, + database=config_proto.snowflake_destination.database or None, + ) + + def to_proto(self) -> LoggingConfigProto: + return LoggingConfigProto( + snowflake_destination=LoggingConfigProto.SnowflakeDestination( + table=self.table, + schema=self.schema or "", + database=self.database or "", + ) + ) + + def to_data_source(self) -> DataSource: + return SnowflakeSource( + table=self.table, schema=self.schema, database=self.database, + ) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 09ca98d86d..d7a60ef3cc 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -2,10 +2,13 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import pandas +import pyarrow import pyarrow as pa from tqdm import tqdm +from feast import FeatureService from feast.entity import Entity +from feast.feature_logging import FeatureServiceLoggingSource from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.offline_stores.offline_utils import get_offline_store_from_config @@ -214,3 +217,51 @@ def retrieve_saved_dataset( start_date=make_tzaware(dataset.min_event_timestamp), # type: ignore end_date=make_tzaware(dataset.max_event_timestamp + timedelta(seconds=1)), # type: ignore ) + + def write_feature_service_logs( + self, + feature_service: FeatureService, + logs: pyarrow.Table, + config: RepoConfig, + registry: Registry, + ): + assert ( + feature_service.logging_config is not None + ), "Logging should be configured for a feature service before calling this function" + + self.offline_store.write_logged_features( + config=config, + data=logs, + source=FeatureServiceLoggingSource(feature_service, config.project), + logging_config=feature_service.logging_config, + registry=registry, + ) + + def retrieve_feature_service_logs( + self, + feature_service: FeatureService, + from_: datetime, + to: datetime, + config: RepoConfig, + registry: Registry, + ) -> RetrievalJob: + assert ( + feature_service.logging_config is not None + ), "Logging should be configured for a feature service before calling this function" + + logging_source = FeatureServiceLoggingSource(feature_service, 
config.project) + schema = logging_source.get_schema(registry) + logging_config = feature_service.logging_config + ts_column = logging_source.get_log_timestamp_column() + partition_column = logging_source.get_partition_column(registry) + columns = list(set(schema.names) - {ts_column, partition_column}) + + return self.offline_store.pull_all_from_table_or_query( + config=config, + data_source=logging_config.destination.to_data_source(), + join_key_columns=[], + feature_name_columns=columns, + event_timestamp_column=ts_column, + start_date=from_, + end_date=to, + ) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index a71bd6d2d0..12c7547767 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -9,7 +9,7 @@ import pyarrow from tqdm import tqdm -from feast import errors +from feast import FeatureService, errors from feast.entity import Entity from feast.feature_view import DUMMY_ENTITY_ID, FeatureView from feast.importer import import_class @@ -186,6 +186,30 @@ def retrieve_saved_dataset( """ ... + @abc.abstractmethod + def write_feature_service_logs( + self, + feature_service: FeatureService, + logs: pyarrow.Table, + config: RepoConfig, + registry: Registry, + ): + """ + + """ + ... + + @abc.abstractmethod + def retrieve_feature_service_logs( + self, + feature_service: FeatureService, + from_: datetime, + to: datetime, + config: RepoConfig, + registry: Registry, + ) -> RetrievalJob: + ... + def get_feature_server_endpoint(self) -> Optional[str]: """Returns endpoint for the feature server, if it exists.""" return None diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index fe5eed774e..9147c79486 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -5,6 +5,7 @@ from typing import Any, Dict, Iterator, Optional, Tuple import pandas as pd +import pyarrow import pyarrow as pa import pyarrow.parquet as pq from tenacity import ( @@ -194,13 +195,6 @@ def upload_df_to_redshift( The caller is responsible for deleting the table when no longer necessary. - Here's how the upload process works: - 1. Pandas DataFrame is converted to PyArrow Table - 2. PyArrow Table is serialized into a Parquet format on local disk - 3. The Parquet file is uploaded to S3 - 4. The S3 file is uploaded to Redshift as a new table through COPY command - 5. The local disk & s3 paths are cleaned up - Args: redshift_data_client: Redshift Data API Service client cluster_id: Redshift Cluster Identifier @@ -216,10 +210,6 @@ def upload_df_to_redshift( Raises: RedshiftTableNameTooLong: The specified table name is too long. 
""" - if len(table_name) > REDSHIFT_TABLE_NAME_MAX_LENGTH: - raise RedshiftTableNameTooLong(table_name) - - bucket, key = get_bucket_and_key(s3_path) # Drop the index so that we dont have unnecessary columns df.reset_index(drop=True, inplace=True) @@ -231,35 +221,92 @@ def upload_df_to_redshift( # More details at: # https://pandas.pydata.org/pandas-docs/stable/user_guide/missing_data.html#values-considered-missing table = pa.Table.from_pandas(df) - column_names, column_types = [], [] - for field in table.schema: - column_names.append(field.name) - column_types.append(pa_to_redshift_value_type(field.type)) + upload_arrow_table_to_redshift( + table, + redshift_data_client, + cluster_id=cluster_id, + database=database, + user=user, + s3_resource=s3_resource, + iam_role=iam_role, + s3_path=s3_path, + table_name=table_name, + ) + + +def upload_arrow_table_to_redshift( + table: pyarrow.Table, + redshift_data_client, + cluster_id: str, + database: str, + user: str, + s3_resource, + iam_role: str, + s3_path: str, + table_name: str, + fail_if_exists: bool = True, +): + """Uploads an Arrow Table to Redshift to a new or existing table. + + Here's how the upload process works: + 1. PyArrow Table is serialized into a Parquet format on local disk + 2. The Parquet file is uploaded to S3 + 3. The S3 file is uploaded to Redshift as a new table through COPY command + 4. The local disk & s3 paths are cleaned up + + Args: + redshift_data_client: Redshift Data API Service client + cluster_id: Redshift Cluster Identifier + database: Redshift Database Name + user: Redshift username + s3_resource: S3 Resource object + s3_path: S3 path where the Parquet file is temporarily uploaded + iam_role: IAM Role for Redshift to assume during the COPY command. + The role must grant permission to read the S3 location. + table_name: The name of the new Redshift table where we copy the dataframe + table: The Arrow Table to upload + fail_if_exists: fail if table with such name exists or append data to existing table + + Raises: + RedshiftTableNameTooLong: The specified table name is too long. 
+ """ + if len(table_name) > REDSHIFT_TABLE_NAME_MAX_LENGTH: + raise RedshiftTableNameTooLong(table_name) + + bucket, key = get_bucket_and_key(s3_path) + column_query_list = ", ".join( [ - f"{column_name} {column_type}" - for column_name, column_type in zip(column_names, column_types) + f"{field.name} {pa_to_redshift_value_type(field.type)}" + for field in table.schema ] ) # Write the PyArrow Table on disk in Parquet format and upload it to S3 - with tempfile.TemporaryDirectory() as temp_dir: - file_path = f"{temp_dir}/{uuid.uuid4()}.parquet" - pq.write_table(table, file_path) - s3_resource.Object(bucket, key).put(Body=open(file_path, "rb")) + with tempfile.TemporaryFile(suffix=".parquet") as parquet_temp_file: + pq.write_table(table, parquet_temp_file) + parquet_temp_file.seek(0) + s3_resource.Object(bucket, key).put(Body=parquet_temp_file) - # Create the table with the desired schema and - # copy the Parquet file contents to the Redshift table - create_and_copy_query = ( - f"CREATE TABLE {table_name}({column_query_list}); " - + f"COPY {table_name} FROM '{s3_path}' IAM_ROLE '{iam_role}' FORMAT AS PARQUET" + copy_query = ( + f"COPY {table_name} FROM '{s3_path}' IAM_ROLE '{iam_role}' FORMAT AS PARQUET" ) - execute_redshift_statement( - redshift_data_client, cluster_id, database, user, create_and_copy_query + create_query = ( + f"CREATE TABLE {'IF NOT EXISTS' if not fail_if_exists else ''}" + f" {table_name}({column_query_list})" ) - # Clean up S3 temporary data - s3_resource.Object(bucket, key).delete() + try: + execute_redshift_statement( + redshift_data_client, + cluster_id, + database, + user, + f"{create_query}; {copy_query}", + ) + finally: + # Clean up S3 temporary data + s3_resource.Object(bucket, key).delete() @contextlib.contextmanager diff --git a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py index ba36f8e89b..4c6a693d2e 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py @@ -4,6 +4,7 @@ import pandas as pd from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.repo_config import FeastConfigBaseModel from feast.saved_dataset import SavedDatasetStorage @@ -51,6 +52,10 @@ def create_offline_store_config(self) -> FeastConfigBaseModel: def create_saved_dataset_destination(self) -> SavedDatasetStorage: ... + @abstractmethod + def create_logged_features_destination(self) -> LoggingDestination: + ... + @abstractmethod def teardown(self): ... 
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py index 881f547617..5149334e30 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py @@ -7,8 +7,12 @@ from feast import BigQuerySource from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig -from feast.infra.offline_stores.bigquery_source import SavedDatasetBigQueryStorage +from feast.infra.offline_stores.bigquery_source import ( + BigqueryLoggingDestination, + SavedDatasetBigQueryStorage, +) from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, ) @@ -86,5 +90,11 @@ def create_saved_dataset_destination(self) -> SavedDatasetBigQueryStorage: ) return SavedDatasetBigQueryStorage(table=table) + def create_logged_features_destination(self) -> LoggingDestination: + table = self.get_prefixed_table_name( + f"logged_features_{str(uuid.uuid4()).replace('-', '_')}" + ) + return BigqueryLoggingDestination(table_ref=table) + def get_prefixed_table_name(self, suffix: str) -> str: return f"{self.client.project}.{self.project_name}.{suffix}" diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index 64c3aeacf3..76a4bbc171 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -1,3 +1,4 @@ +import shutil import tempfile import uuid from typing import Any, Dict, List, Optional @@ -10,8 +11,12 @@ from feast import FileSource from feast.data_format import ParquetFormat from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.infra.offline_stores.file import FileOfflineStoreConfig -from feast.infra.offline_stores.file_source import SavedDatasetFileStorage +from feast.infra.offline_stores.file_source import ( + FileLoggingDestination, + SavedDatasetFileStorage, +) from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, @@ -20,10 +25,12 @@ class FileDataSourceCreator(DataSourceCreator): files: List[Any] + dirs: List[Any] def __init__(self, project_name: str, *args, **kwargs): super().__init__(project_name) self.files = [] + self.dirs = [] def create_data_source( self, @@ -53,6 +60,7 @@ def create_data_source( def create_saved_dataset_destination(self) -> SavedDatasetFileStorage: d = tempfile.mkdtemp(prefix=self.project_name) + self.dirs.append(d) return SavedDatasetFileStorage( path=d, file_format=ParquetFormat(), s3_endpoint_override=None ) @@ -63,10 +71,18 @@ def get_prefixed_table_name(self, suffix: str) -> str: def create_offline_store_config(self) -> FeastConfigBaseModel: return FileOfflineStoreConfig() + def create_logged_features_destination(self) -> LoggingDestination: + d = tempfile.mkdtemp(prefix=self.project_name) + self.dirs.append(d) + return FileLoggingDestination(path=d) + def teardown(self): for f in self.files: f.close() + for d in self.dirs: + shutil.rmtree(d) + class S3FileDataSourceCreator(DataSourceCreator): f: Any diff --git 
a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py index 7e305fee80..a5c50a5773 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py @@ -5,8 +5,12 @@ from feast import RedshiftSource from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig -from feast.infra.offline_stores.redshift_source import SavedDatasetRedshiftStorage +from feast.infra.offline_stores.redshift_source import ( + RedshiftLoggingDestination, + SavedDatasetRedshiftStorage, +) from feast.infra.utils import aws_utils from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.data_source_creator import ( @@ -74,6 +78,14 @@ def create_saved_dataset_destination(self) -> SavedDatasetRedshiftStorage: return SavedDatasetRedshiftStorage(table_ref=table) + def create_logged_features_destination(self) -> LoggingDestination: + table = self.get_prefixed_table_name( + f"persisted_ds_{str(uuid.uuid4()).replace('-', '_')}" + ) + self.tables.append(table) + + return RedshiftLoggingDestination(table_ref=table) + def create_offline_store_config(self) -> FeastConfigBaseModel: return self.offline_store_config diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py index 3942444f32..8db2d56996 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py @@ -6,8 +6,12 @@ from feast import SnowflakeSource from feast.data_source import DataSource +from feast.feature_logging import LoggingDestination from feast.infra.offline_stores.snowflake import SnowflakeOfflineStoreConfig -from feast.infra.offline_stores.snowflake_source import SavedDatasetSnowflakeStorage +from feast.infra.offline_stores.snowflake_source import ( + SavedDatasetSnowflakeStorage, + SnowflakeLoggingDestination, +) from feast.infra.utils.snowflake_utils import get_snowflake_conn, write_pandas from feast.repo_config import FeastConfigBaseModel from tests.integration.feature_repos.universal.data_source_creator import ( @@ -66,6 +70,14 @@ def create_saved_dataset_destination(self) -> SavedDatasetSnowflakeStorage: return SavedDatasetSnowflakeStorage(table_ref=table) + def create_logged_features_destination(self) -> LoggingDestination: + table = self.get_prefixed_table_name( + f"logged_features_{str(uuid.uuid4()).replace('-', '_')}" + ) + self.tables.append(table) + + return SnowflakeLoggingDestination(table=table) + def create_offline_store_config(self) -> FeastConfigBaseModel: return self.offline_store_config diff --git a/sdk/python/tests/integration/offline_store/test_feature_logging.py b/sdk/python/tests/integration/offline_store/test_feature_logging.py new file mode 100644 index 0000000000..e0ab1e6264 --- /dev/null +++ b/sdk/python/tests/integration/offline_store/test_feature_logging.py @@ -0,0 +1,109 @@ +import datetime +import uuid + +import numpy as np +import pandas as pd +import pyarrow as pa +import pytest + +from feast.feature_logging import LoggingConfig +from feast.feature_service import FeatureService +from feast.protos.feast.serving.ServingService_pb2 import 
FieldStatus +from feast.wait import wait_retry_backoff +from tests.integration.feature_repos.repo_configuration import ( + UniversalDatasets, + construct_universal_feature_views, +) +from tests.integration.feature_repos.universal.entities import driver +from tests.integration.feature_repos.universal.feature_views import conv_rate_plus_100 + + +@pytest.mark.integration +@pytest.mark.universal +def test_feature_service_logging(environment, universal_data_sources): + store = environment.feature_store + + (_, datasets, data_sources) = universal_data_sources + + feature_views = construct_universal_feature_views(data_sources) + store.apply([driver(), *feature_views.values()]) + + logs_df = prepare_logs(datasets) + + feature_service = FeatureService( + name="test_service", + features=[ + feature_views.driver[["conv_rate", "avg_daily_trips"]], + feature_views.driver_odfv[ + ["conv_rate_plus_val_to_add", "conv_rate_plus_100_rounded"] + ], + ], + logging_config=LoggingConfig( + destination=environment.data_source_creator.create_logged_features_destination() + ), + ) + + num_rows = logs_df.shape[0] + first_batch = logs_df.iloc[: num_rows // 2, :] + second_batch = logs_df.iloc[num_rows // 2 :, :] + + store.write_logged_features( + source=feature_service, logs=pa.Table.from_pandas(first_batch), + ) + + store.write_logged_features( + source=feature_service, logs=pa.Table.from_pandas(second_batch), + ) + expected_columns = list(set(logs_df.columns) - {"log_date"}) + + def retrieve(): + retrieval_job = store._get_provider().retrieve_feature_service_logs( + feature_service=feature_service, + from_=logs_df["log_timestamp"].min(), + to=logs_df["log_timestamp"].max() + datetime.timedelta(seconds=1), + config=store.config, + registry=store._registry, + ) + + df = retrieval_job.to_df() + return df, df.shape[0] == logs_df.shape[0] + + persisted_logs = wait_retry_backoff( + retrieve, timeout_secs=60, timeout_msg="Logs retrieval failed" + ) + + persisted_logs = persisted_logs[expected_columns] + logs_df = logs_df[expected_columns] + pd.testing.assert_frame_equal( + logs_df.sort_values("request_id").reset_index(drop=True), + persisted_logs.sort_values("request_id").reset_index(drop=True), + check_dtype=False, + ) + + +def prepare_logs(datasets: UniversalDatasets) -> pd.DataFrame: + driver_df = datasets.driver_df + driver_df["val_to_add"] = 50 + driver_df = driver_df.join(conv_rate_plus_100(driver_df)) + num_rows = driver_df.shape[0] + + logs_df = driver_df[["driver_id", "val_to_add"]] + logs_df["request_id"] = [str(uuid.uuid4()) for _ in range(num_rows)] + logs_df["log_timestamp"] = pd.Series( + np.random.randint(0, 7 * 24 * 3600, num_rows) + ).map(lambda secs: pd.Timestamp.utcnow() - datetime.timedelta(seconds=secs)) + logs_df["log_date"] = logs_df["log_timestamp"].dt.date + + for view, features in ( + ("driver_stats", ("conv_rate", "avg_daily_trips")), + ( + "conv_rate_plus_100", + ("conv_rate_plus_val_to_add", "conv_rate_plus_100_rounded"), + ), + ): + for feature in features: + logs_df[f"{view}__{feature}"] = driver_df[feature] + logs_df[f"{view}__{feature}__timestamp"] = driver_df["event_timestamp"] + logs_df[f"{view}__{feature}__status"] = FieldStatus.PRESENT + + return logs_df From f3f9998bd74cb4474a42c526e9073096fc459cee Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 19 Apr 2022 13:31:05 -0700 Subject: [PATCH 02/16] format Signed-off-by: pyalex --- protos/feast/core/FeatureService.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protos/feast/core/FeatureService.proto 
b/protos/feast/core/FeatureService.proto index a4091bc22e..4827278811 100644 --- a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -87,4 +87,4 @@ message LoggingConfig { // Snowflake schema name string database = 3; } -} \ No newline at end of file +} From af58966edddce21cba6f7f5f4a807b5ee0f81159 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 19 Apr 2022 13:38:54 -0700 Subject: [PATCH 03/16] fix after rebase Signed-off-by: pyalex --- sdk/python/feast/feature_logging.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index 90e8b6616e..c0363b5a8a 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -70,8 +70,8 @@ def get_schema(self, registry: "Registry") -> pa.Schema: for ( request_source ) in on_demand_feature_view.source_request_sources.values(): - for name, dtype in request_source.schema.items(): - fields[name] = FEAST_TYPE_TO_ARROW_TYPE[from_value_type(dtype)] + for field in request_source.schema: + fields[field.name] = FEAST_TYPE_TO_ARROW_TYPE[field.dtype] else: for entity_name in feature_view.entities: From 2681b3c6cffbe4470bb324e824a3ada886f53355 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 19 Apr 2022 14:21:10 -0700 Subject: [PATCH 04/16] fix tests Signed-off-by: pyalex --- sdk/python/feast/feature_logging.py | 7 +++++-- sdk/python/tests/foo_provider.py | 11 ++++++++++- .../feature_repos/universal/data_sources/file.py | 12 ++++++++++++ 3 files changed, 27 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index c0363b5a8a..fd648624c3 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -1,5 +1,5 @@ import abc -from typing import TYPE_CHECKING, Dict, Type, cast +from typing import TYPE_CHECKING, Dict, Optional, Type, cast import pyarrow as pa from pytz import UTC @@ -133,8 +133,11 @@ def __init__(self, destination: LoggingDestination): self.destination = destination @classmethod - def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingConfig": + def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig"]: proto_attr_name = cast(str, config_proto.WhichOneof("destination")) + if proto_attr_name is None: + return + destination_class = _DestinationRegistry.classes_by_proto_attr_name[ proto_attr_name ] diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 1d4ce7d6cb..f03719c46a 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -2,9 +2,10 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union import pandas +import pyarrow from tqdm import tqdm -from feast import Entity, FeatureView, RepoConfig +from feast import Entity, FeatureView, RepoConfig, FeatureService from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.provider import Provider from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -79,3 +80,11 @@ def online_read( def retrieve_saved_dataset(self, config: RepoConfig, dataset: SavedDataset): pass + + def write_feature_service_logs(self, feature_service: FeatureService, logs: pyarrow.Table, config: RepoConfig, + registry: Registry): + pass + + def retrieve_feature_service_logs(self, feature_service: FeatureService, from_: datetime, to: datetime, + config: RepoConfig, registry: Registry) -> RetrievalJob: + pass diff 
--git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index 76a4bbc171..ccc1544bb8 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -1,3 +1,4 @@ +import os.path import shutil import tempfile import uuid @@ -81,6 +82,8 @@ def teardown(self): f.close() for d in self.dirs: + if not os.path.exists(d): + continue shutil.rmtree(d) @@ -159,6 +162,15 @@ def create_saved_dataset_destination(self) -> SavedDatasetFileStorage: s3_endpoint_override=f"http://{host}:{port}", ) + def create_logged_features_destination(self) -> LoggingDestination: + port = self.minio.get_exposed_port("9000") + host = self.minio.get_container_host_ip() + + return FileLoggingDestination( + path=f"s3://{self.bucket}/logged_features/{str(uuid.uuid4())}", + s3_endpoint_override=f"http://{host}:{port}", + ) + def get_prefixed_table_name(self, suffix: str) -> str: return f"{suffix}" From 5637da69c3d05a511631a84ddf03b92b309a59c0 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 19 Apr 2022 14:47:27 -0700 Subject: [PATCH 05/16] handle table not found in tests Signed-off-by: pyalex --- sdk/python/tests/foo_provider.py | 21 ++++++++++++++----- .../offline_store/test_feature_logging.py | 7 ++++++- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index f03719c46a..bccf7931b5 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -5,7 +5,7 @@ import pyarrow from tqdm import tqdm -from feast import Entity, FeatureView, RepoConfig, FeatureService +from feast import Entity, FeatureService, FeatureView, RepoConfig from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.provider import Provider from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -81,10 +81,21 @@ def online_read( def retrieve_saved_dataset(self, config: RepoConfig, dataset: SavedDataset): pass - def write_feature_service_logs(self, feature_service: FeatureService, logs: pyarrow.Table, config: RepoConfig, - registry: Registry): + def write_feature_service_logs( + self, + feature_service: FeatureService, + logs: pyarrow.Table, + config: RepoConfig, + registry: Registry, + ): pass - def retrieve_feature_service_logs(self, feature_service: FeatureService, from_: datetime, to: datetime, - config: RepoConfig, registry: Registry) -> RetrievalJob: + def retrieve_feature_service_logs( + self, + feature_service: FeatureService, + from_: datetime, + to: datetime, + config: RepoConfig, + registry: Registry, + ) -> RetrievalJob: pass diff --git a/sdk/python/tests/integration/offline_store/test_feature_logging.py b/sdk/python/tests/integration/offline_store/test_feature_logging.py index e0ab1e6264..2bf1384a75 100644 --- a/sdk/python/tests/integration/offline_store/test_feature_logging.py +++ b/sdk/python/tests/integration/offline_store/test_feature_logging.py @@ -5,6 +5,7 @@ import pandas as pd import pyarrow as pa import pytest +from google.api_core.exceptions import NotFound from feast.feature_logging import LoggingConfig from feast.feature_service import FeatureService @@ -64,8 +65,12 @@ def retrieve(): config=store.config, registry=store._registry, ) + try: + df = retrieval_job.to_df() + except NotFound: + # Table was not created yet + return None, False - df = retrieval_job.to_df() return df, 
df.shape[0] == logs_df.shape[0] persisted_logs = wait_retry_backoff( From 43c97c2b89a2a550191ec81e3f1a0e7bc2533404 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 19 Apr 2022 17:24:00 -0700 Subject: [PATCH 06/16] some api docs Signed-off-by: pyalex --- sdk/python/feast/feature_logging.py | 3 +++ sdk/python/feast/feature_store.py | 6 +++++- .../feast/infra/offline_stores/offline_store.py | 15 ++++++++++++--- sdk/python/feast/infra/passthrough_provider.py | 4 ++-- sdk/python/feast/infra/provider.py | 11 +++++++++++ 5 files changed, 33 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index fd648624c3..35787f97d3 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -24,14 +24,17 @@ class LoggingSource: @abc.abstractmethod def get_schema(self, registry: "Registry") -> pa.Schema: + """ Generate schema for logs destination. """ raise NotImplementedError @abc.abstractmethod def get_partition_column(self, registry: "Registry") -> str: + """ Return partition column that must exist in generated schema. """ raise NotImplementedError @abc.abstractmethod def get_log_timestamp_column(self) -> str: + """ Return timestamp column that must exist in generated schema. """ raise NotImplementedError diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 36f80e6fc9..95b1458849 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1978,12 +1978,16 @@ def _teardown_go_server(self): self._go_server = None def write_logged_features(self, logs: pa.Table, source: Union[FeatureService]): + """ + Write logs produced by a source (currently only feature service is supported as a source) + to an offline store. + """ if not isinstance(source, FeatureService): raise ValueError("Only feature service is currently supported as a source") assert ( source.logging_config is not None - ), "Feature service must have logging config attached" + ), "Feature service must be configured with logging config in order to use this functionality" self._get_provider().write_feature_service_logs( feature_service=source, diff --git a/sdk/python/feast/infra/offline_stores/offline_store.py b/sdk/python/feast/infra/offline_stores/offline_store.py index 8b9d7ed41e..ed545ed5ad 100644 --- a/sdk/python/feast/infra/offline_stores/offline_store.py +++ b/sdk/python/feast/infra/offline_stores/offline_store.py @@ -252,9 +252,18 @@ def write_logged_features( registry: Registry, ): """ - Flush data with logged features to specified destination in offline store. - Data should be appended to existing table / destination or create a new one if it doesn't exist. + Write logged features to a specified destination (taken from logging_config) in the offline store. + Data can be appended to an existing table (destination) or a new one will be created automatically + (if it doesn't exist). + Hence, this function can be called repeatedly with the same destination to flush logs in chunks. - This is optional method that could be supported only be some stores. + Args: + config: Repo configuration object + data: Arrow table produced by logging source. + source: Logging source that provides schema and some additional metadata. + logging_config: used to determine destination + registry: Feast registry + + This is an optional method that could be supported only be some stores. 
""" raise NotImplementedError() diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index d7a60ef3cc..271fcbc5ff 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -227,7 +227,7 @@ def write_feature_service_logs( ): assert ( feature_service.logging_config is not None - ), "Logging should be configured for a feature service before calling this function" + ), "Logging should be configured for the feature service before calling this function" self.offline_store.write_logged_features( config=config, @@ -247,7 +247,7 @@ def retrieve_feature_service_logs( ) -> RetrievalJob: assert ( feature_service.logging_config is not None - ), "Logging should be configured for a feature service before calling this function" + ), "Logging should be configured for the feature service before calling this function" logging_source = FeatureServiceLoggingSource(feature_service, config.project) schema = logging_source.get_schema(registry) diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 12c7547767..f8c2a4482f 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -195,7 +195,10 @@ def write_feature_service_logs( registry: Registry, ): """ + Write features and entities logged by a feature server to an offline store. + Schema of logs table is being inferred from the provided feature service. + Only feature services with configured logging are accepted. """ ... @@ -208,6 +211,14 @@ def retrieve_feature_service_logs( config: RepoConfig, registry: Registry, ) -> RetrievalJob: + """ + Read logged features from an offline store for a given time window [from, to). + Target table is determined based on logging configuration from the feature service. + + Returns: + RetrievalJob object, which wraps the query to the offline store. + + """ ... 
def get_feature_server_endpoint(self) -> Optional[str]: From c5c7fbebceb9310b76f71835db5dc4d10a8c1033 Mon Sep 17 00:00:00 2001 From: pyalex Date: Wed, 20 Apr 2022 20:27:29 -0700 Subject: [PATCH 07/16] fix import Signed-off-by: pyalex --- sdk/python/feast/embedded_go/types.py | 85 --------------------------- sdk/python/feast/feature_logging.py | 2 +- 2 files changed, 1 insertion(+), 86 deletions(-) delete mode 100644 sdk/python/feast/embedded_go/types.py diff --git a/sdk/python/feast/embedded_go/types.py b/sdk/python/feast/embedded_go/types.py deleted file mode 100644 index a5b43c151b..0000000000 --- a/sdk/python/feast/embedded_go/types.py +++ /dev/null @@ -1,85 +0,0 @@ -from typing import List - -import pyarrow as pa - -from feast.protos.feast.types import Value_pb2 -from feast.types import Array, PrimitiveFeastType - -ARROW_TYPE_TO_PROTO_FIELD = { - pa.int32(): "int32_val", - pa.int64(): "int64_val", - pa.float32(): "float_val", - pa.float64(): "double_val", - pa.bool_(): "bool_val", - pa.string(): "string_val", - pa.binary(): "bytes_val", - pa.time64("ns"): "unix_timestamp_val", -} - -ARROW_LIST_TYPE_TO_PROTO_FIELD = { - pa.int32(): "int32_list_val", - pa.int64(): "int64_list_val", - pa.float32(): "float_list_val", - pa.float64(): "double_list_val", - pa.bool_(): "bool_list_val", - pa.string(): "string_list_val", - pa.binary(): "bytes_list_val", - pa.time64("ns"): "unix_timestamp_list_val", -} - -ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS = { - pa.int32(): Value_pb2.Int32List, - pa.int64(): Value_pb2.Int64List, - pa.float32(): Value_pb2.FloatList, - pa.float64(): Value_pb2.DoubleList, - pa.bool_(): Value_pb2.BoolList, - pa.string(): Value_pb2.StringList, - pa.binary(): Value_pb2.BytesList, - pa.time64("ns"): Value_pb2.Int64List, -} - -FEAST_TYPE_TO_ARROW_TYPE = { - PrimitiveFeastType.INT32: pa.int32(), - PrimitiveFeastType.INT64: pa.int64(), - PrimitiveFeastType.FLOAT32: pa.float32(), - PrimitiveFeastType.FLOAT64: pa.float64(), - PrimitiveFeastType.STRING: pa.string(), - PrimitiveFeastType.BYTES: pa.binary(), - PrimitiveFeastType.BOOL: pa.bool_(), - PrimitiveFeastType.UNIX_TIMESTAMP: pa.time64("ns"), - Array(PrimitiveFeastType.INT32): pa.list_(pa.int32()), - Array(PrimitiveFeastType.INT64): pa.list_(pa.int64()), - Array(PrimitiveFeastType.FLOAT32): pa.list_(pa.float32()), - Array(PrimitiveFeastType.FLOAT64): pa.list_(pa.float64()), - Array(PrimitiveFeastType.STRING): pa.list_(pa.string()), - Array(PrimitiveFeastType.BYTES): pa.list_(pa.binary()), - Array(PrimitiveFeastType.BOOL): pa.list_(pa.bool_()), - Array(PrimitiveFeastType.UNIX_TIMESTAMP): pa.list_(pa.time64("ns")), -} - - -def arrow_array_to_array_of_proto( - arrow_type: pa.DataType, arrow_array: pa.Array -) -> List[Value_pb2.Value]: - values = [] - if isinstance(arrow_type, pa.ListType): - proto_list_class = ARROW_LIST_TYPE_TO_PROTO_LIST_CLASS[arrow_type.value_type] - proto_field_name = ARROW_LIST_TYPE_TO_PROTO_FIELD[arrow_type.value_type] - - if arrow_type.value_type == pa.time64("ns"): - arrow_array = arrow_array.cast(pa.list_(pa.int64())) - - for v in arrow_array.tolist(): - values.append( - Value_pb2.Value(**{proto_field_name: proto_list_class(val=v)}) - ) - else: - proto_field_name = ARROW_TYPE_TO_PROTO_FIELD[arrow_type] - - if arrow_type == pa.time64("ns"): - arrow_array = arrow_array.cast(pa.int64()) - - for v in arrow_array.tolist(): - values.append(Value_pb2.Value(**{proto_field_name: v})) - - return values diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index 35787f97d3..1e962db61b 100644 
--- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -5,7 +5,7 @@ from pytz import UTC from feast.data_source import DataSource -from feast.embedded_go.types import FEAST_TYPE_TO_ARROW_TYPE +from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE from feast.errors import ( FeastObjectNotFoundException, FeatureViewNotFoundException, From eaea5ddb12a28ba5da5e96c4e495e7142f143ead Mon Sep 17 00:00:00 2001 From: pyalex Date: Thu, 21 Apr 2022 15:16:55 -0700 Subject: [PATCH 08/16] use predefined schema in tests Signed-off-by: pyalex --- sdk/python/feast/feature_logging.py | 6 +++--- .../integration/offline_store/test_feature_logging.py | 10 +++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index 1e962db61b..ce12b9134c 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -5,7 +5,7 @@ from pytz import UTC from feast.data_source import DataSource -from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE +from feast.embedded_go.type_map import FEAST_TYPE_TO_ARROW_TYPE, PA_TIMESTAMP_TYPE from feast.errors import ( FeastObjectNotFoundException, FeatureViewNotFoundException, @@ -53,7 +53,7 @@ def get_schema(self, registry: "Registry") -> pa.Schema: ] = FEAST_TYPE_TO_ARROW_TYPE[feature.dtype] fields[ f"{projection.name_to_use()}__{feature.name}__timestamp" - ] = pa.timestamp("ns", tz=UTC) + ] = PA_TIMESTAMP_TYPE fields[ f"{projection.name_to_use()}__{feature.name}__status" ] = pa.int32() @@ -88,7 +88,7 @@ def get_schema(self, registry: "Registry") -> pa.Schema: # system columns fields["request_id"] = pa.string() - fields["log_timestamp"] = pa.timestamp("ns", tz=UTC) + fields["log_timestamp"] = pa.timestamp("us", tz=UTC) fields["log_date"] = pa.date32() return pa.schema( diff --git a/sdk/python/tests/integration/offline_store/test_feature_logging.py b/sdk/python/tests/integration/offline_store/test_feature_logging.py index 2bf1384a75..f96f38d507 100644 --- a/sdk/python/tests/integration/offline_store/test_feature_logging.py +++ b/sdk/python/tests/integration/offline_store/test_feature_logging.py @@ -7,7 +7,7 @@ import pytest from google.api_core.exceptions import NotFound -from feast.feature_logging import LoggingConfig +from feast.feature_logging import FeatureServiceLoggingSource, LoggingConfig from feast.feature_service import FeatureService from feast.protos.feast.serving.ServingService_pb2 import FieldStatus from feast.wait import wait_retry_backoff @@ -48,12 +48,16 @@ def test_feature_service_logging(environment, universal_data_sources): first_batch = logs_df.iloc[: num_rows // 2, :] second_batch = logs_df.iloc[num_rows // 2 :, :] + schema = FeatureServiceLoggingSource( + feature_service=feature_service, project=store.project + ).get_schema(store._registry) + store.write_logged_features( - source=feature_service, logs=pa.Table.from_pandas(first_batch), + source=feature_service, logs=pa.Table.from_pandas(first_batch, schema=schema), ) store.write_logged_features( - source=feature_service, logs=pa.Table.from_pandas(second_batch), + source=feature_service, logs=pa.Table.from_pandas(second_batch, schema=schema), ) expected_columns = list(set(logs_df.columns) - {"log_date"}) From 7ac0b83cce9b849b2a25b7f437727174b1094ad3 Mon Sep 17 00:00:00 2001 From: pyalex Date: Thu, 21 Apr 2022 15:25:31 -0700 Subject: [PATCH 09/16] address pr comments Signed-off-by: pyalex --- protos/feast/core/FeatureService.proto | 7 ------- 
sdk/python/feast/feature_logging.py | 5 +++++ .../feast/infra/offline_stores/bigquery.py | 6 +++--- .../infra/offline_stores/bigquery_source.py | 12 ++++++------ .../feast/infra/offline_stores/redshift.py | 2 +- .../infra/offline_stores/redshift_source.py | 8 ++++---- .../feast/infra/offline_stores/snowflake.py | 2 -- .../infra/offline_stores/snowflake_source.py | 16 ++-------------- sdk/python/feast/infra/passthrough_provider.py | 8 ++++---- .../universal/data_sources/bigquery.py | 4 ++-- .../offline_store/test_feature_logging.py | 4 ++-- 11 files changed, 29 insertions(+), 45 deletions(-) diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto index 4827278811..0ddc209079 100644 --- a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -78,13 +78,6 @@ message LoggingConfig { } message SnowflakeDestination { - // Snowflake table name string table = 1; - - // Snowflake schema name - string schema = 2; - - // Snowflake schema name - string database = 3; } } diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index ce12b9134c..d1856b2e98 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -22,6 +22,11 @@ class LoggingSource: + """ + Logging source describes object that produces logs (eg, feature service produces logs of served features). + It should be able to provide schema of produced logs table and additional metadata that describes logs data. + """ + @abc.abstractmethod def get_schema(self, registry: "Registry") -> pa.Schema: """ Generate schema for logs destination. """ diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 0d4106a36d..70c93e06e8 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -44,7 +44,7 @@ from ...saved_dataset import SavedDatasetStorage from ...usage import log_exceptions_and_usage from .bigquery_source import ( - BigqueryLoggingDestination, + BigQueryLoggingDestination, BigQuerySource, SavedDatasetBigQueryStorage, ) @@ -264,7 +264,7 @@ def write_logged_features( registry: Registry, ): destination = logging_config.destination - assert isinstance(destination, BigqueryLoggingDestination) + assert isinstance(destination, BigQueryLoggingDestination) client = _get_bigquery_client( project=config.offline_store.project_id, @@ -288,7 +288,7 @@ def write_logged_features( client.load_table_from_file( file_obj=parquet_temp_file, - destination=destination.table_ref, + destination=destination.table, job_config=job_config, ) diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 1e5ffec3a6..a4653592e4 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -259,26 +259,26 @@ def to_data_source(self) -> DataSource: return BigQuerySource(table=self.bigquery_options.table) -class BigqueryLoggingDestination(LoggingDestination): +class BigQueryLoggingDestination(LoggingDestination): _proto_attr_name = "bigquery_destination" - table_ref: str + table: str def __init__(self, table_ref): - self.table_ref = table_ref + self.table = table_ref @classmethod def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": - return BigqueryLoggingDestination( + return BigQueryLoggingDestination( 
table_ref=config_proto.bigquery_destination.table_ref, ) def to_data_source(self) -> DataSource: - return BigQuerySource(table=self.table_ref) + return BigQuerySource(table=self.table) def to_proto(self) -> LoggingConfigProto: return LoggingConfigProto( bigquery_destination=LoggingConfigProto.BigQueryDestination( - table_ref=self.table_ref + table_ref=self.table ) ) diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 885f65bc30..25dc344806 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -288,7 +288,7 @@ def write_logged_features( s3_resource=s3_resource, s3_path=s3_path, iam_role=config.offline_store.iam_role, - table_name=destination.table_ref, + table_name=destination.table, fail_if_exists=False, ) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 107954fa63..59bc565942 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -336,10 +336,10 @@ def to_data_source(self) -> DataSource: class RedshiftLoggingDestination(LoggingDestination): _proto_attr_name = "redshift_destination" - table_ref: str + table: str def __init__(self, table_ref: str): - self.table_ref = table_ref + self.table = table_ref @classmethod def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": @@ -350,9 +350,9 @@ def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": def to_proto(self) -> LoggingConfigProto: return LoggingConfigProto( redshift_destination=LoggingConfigProto.RedshiftDestination( - table_ref=self.table_ref + table_ref=self.table ) ) def to_data_source(self) -> DataSource: - return RedshiftSource(table=self.table_ref) + return RedshiftSource(table=self.table) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index 4b91f0234a..d9e598c544 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -293,8 +293,6 @@ def write_logged_features( snowflake_conn, data.to_pandas(), table_name=logging_config.destination.table, - schema=logging_config.destination.schema, - database=logging_config.destination.database, auto_create_table=True, ) diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 155c1821de..1eafad553e 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -337,34 +337,22 @@ def to_data_source(self) -> DataSource: class SnowflakeLoggingDestination(LoggingDestination): table: str - schema: Optional[str] - database: Optional[str] - def __init__( - self, table: str, schema: Optional[str] = None, database: Optional[str] = None - ): + def __init__(self, table: str): self.table = table - self.schema = schema - self.database = database @classmethod def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": return SnowflakeLoggingDestination( table=config_proto.snowflake_destination.table, - schema=config_proto.snowflake_destination.schema or None, - database=config_proto.snowflake_destination.database or None, ) def to_proto(self) -> LoggingConfigProto: return LoggingConfigProto( snowflake_destination=LoggingConfigProto.SnowflakeDestination( 
table=self.table, - schema=self.schema or "", - database=self.database or "", ) ) def to_data_source(self) -> DataSource: - return SnowflakeSource( - table=self.table, schema=self.schema, database=self.database, - ) + return SnowflakeSource(table=self.table,) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 271fcbc5ff..1b8741d669 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -240,8 +240,8 @@ def write_feature_service_logs( def retrieve_feature_service_logs( self, feature_service: FeatureService, - from_: datetime, - to: datetime, + start_date: datetime, + end_date: datetime, config: RepoConfig, registry: Registry, ) -> RetrievalJob: @@ -262,6 +262,6 @@ def retrieve_feature_service_logs( join_key_columns=[], feature_name_columns=columns, event_timestamp_column=ts_column, - start_date=from_, - end_date=to, + start_date=start_date, + end_date=end_date, ) diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py index 5149334e30..620f444159 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/bigquery.py @@ -10,7 +10,7 @@ from feast.feature_logging import LoggingDestination from feast.infra.offline_stores.bigquery import BigQueryOfflineStoreConfig from feast.infra.offline_stores.bigquery_source import ( - BigqueryLoggingDestination, + BigQueryLoggingDestination, SavedDatasetBigQueryStorage, ) from tests.integration.feature_repos.universal.data_source_creator import ( @@ -94,7 +94,7 @@ def create_logged_features_destination(self) -> LoggingDestination: table = self.get_prefixed_table_name( f"logged_features_{str(uuid.uuid4()).replace('-', '_')}" ) - return BigqueryLoggingDestination(table_ref=table) + return BigQueryLoggingDestination(table_ref=table) def get_prefixed_table_name(self, suffix: str) -> str: return f"{self.client.project}.{self.project_name}.{suffix}" diff --git a/sdk/python/tests/integration/offline_store/test_feature_logging.py b/sdk/python/tests/integration/offline_store/test_feature_logging.py index f96f38d507..d41e8e6c9e 100644 --- a/sdk/python/tests/integration/offline_store/test_feature_logging.py +++ b/sdk/python/tests/integration/offline_store/test_feature_logging.py @@ -64,8 +64,8 @@ def test_feature_service_logging(environment, universal_data_sources): def retrieve(): retrieval_job = store._get_provider().retrieve_feature_service_logs( feature_service=feature_service, - from_=logs_df["log_timestamp"].min(), - to=logs_df["log_timestamp"].max() + datetime.timedelta(seconds=1), + start_date=logs_df["log_timestamp"].min(), + end_date=logs_df["log_timestamp"].max() + datetime.timedelta(seconds=1), config=store.config, registry=store._registry, ) From 40fe0289c7eae1ba249d5d26bcae285e9ee8a7fe Mon Sep 17 00:00:00 2001 From: pyalex Date: Thu, 21 Apr 2022 15:40:07 -0700 Subject: [PATCH 10/16] more api docs Signed-off-by: pyalex --- sdk/python/feast/feature_logging.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index d1856b2e98..4f3146b7bc 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -118,6 +118,14 @@ def __new__(cls, name, bases, dct): class LoggingDestination: + """ + Logging destination 
contains details about where exactly logs should be written inside an offline store. + It is implementation specific - each offline store must implement LoggingDestination subclass. + + Kind of logging destination will be determined by matching attribute name in LoggingConfig protobuf message + and "_proto_attr_name" property of each subclass. + """ + _proto_attr_name: str @classmethod @@ -131,6 +139,9 @@ def to_proto(self) -> LoggingConfigProto: @abc.abstractmethod def to_data_source(self) -> DataSource: + """ + Convert this object into a data source to read logs from an offline store. + """ raise NotImplementedError From 65a1a7797c30017130cf056cfbb3bc1f14a6ce72 Mon Sep 17 00:00:00 2001 From: pyalex Date: Thu, 21 Apr 2022 16:13:20 -0700 Subject: [PATCH 11/16] add proto attr to snowflake dest Signed-off-by: pyalex --- sdk/python/feast/infra/offline_stores/redshift.py | 1 + sdk/python/feast/infra/offline_stores/snowflake_source.py | 2 ++ sdk/python/feast/infra/utils/aws_utils.py | 8 ++++---- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 25dc344806..1ed34a10c8 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -289,6 +289,7 @@ def write_logged_features( s3_path=s3_path, iam_role=config.offline_store.iam_role, table_name=destination.table, + schema=source.get_schema(registry), fail_if_exists=False, ) diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 1eafad553e..14beafc2e4 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -338,6 +338,8 @@ def to_data_source(self) -> DataSource: class SnowflakeLoggingDestination(LoggingDestination): table: str + _proto_attr_name = "snowflake_destination" + def __init__(self, table: str): self.table = table diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index 9147c79486..d73c484b29 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -244,6 +244,7 @@ def upload_arrow_table_to_redshift( iam_role: str, s3_path: str, table_name: str, + schema: Optional[pyarrow.Schema] = None, fail_if_exists: bool = True, ): """Uploads an Arrow Table to Redshift to a new or existing table. @@ -265,6 +266,7 @@ def upload_arrow_table_to_redshift( The role must grant permission to read the S3 location. 
table_name: The name of the new Redshift table where we copy the dataframe table: The Arrow Table to upload + schema: (Optionally) client may provide arrow Schema which will be converted into redshift table schema fail_if_exists: fail if table with such name exists or append data to existing table Raises: @@ -275,11 +277,9 @@ def upload_arrow_table_to_redshift( bucket, key = get_bucket_and_key(s3_path) + schema = schema or table.schema column_query_list = ", ".join( - [ - f"{field.name} {pa_to_redshift_value_type(field.type)}" - for field in table.schema - ] + [f"{field.name} {pa_to_redshift_value_type(field.type)}" for field in schema] ) # Write the PyArrow Table on disk in Parquet format and upload it to S3 From 4b983d6ae58bab81ebeff766016eb3b8e54cdd67 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 22 Apr 2022 10:59:17 -0700 Subject: [PATCH 12/16] add prefixes to system fields Signed-off-by: pyalex --- sdk/python/feast/feature_logging.py | 15 ++++++++---- .../offline_store/test_feature_logging.py | 24 ++++++++++++------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index 4f3146b7bc..22aa8018e0 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -21,6 +21,11 @@ from feast.registry import Registry +REQUEST_ID_FIELD = "__request_id" +LOG_TIMESTAMP_FIELD = "__log_timestamp" +LOG_DATE_FIELD = "__log_date" + + class LoggingSource: """ Logging source describes object that produces logs (eg, feature service produces logs of served features). @@ -92,19 +97,19 @@ def get_schema(self, registry: "Registry") -> pa.Schema: ] # system columns - fields["request_id"] = pa.string() - fields["log_timestamp"] = pa.timestamp("us", tz=UTC) - fields["log_date"] = pa.date32() + fields[REQUEST_ID_FIELD] = pa.string() + fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC) + fields[LOG_DATE_FIELD] = pa.date32() return pa.schema( [pa.field(name, data_type) for name, data_type in fields.items()] ) def get_partition_column(self, registry: "Registry") -> str: - return "log_date" + return LOG_DATE_FIELD def get_log_timestamp_column(self) -> str: - return "log_timestamp" + return LOG_TIMESTAMP_FIELD class _DestinationRegistry(type): diff --git a/sdk/python/tests/integration/offline_store/test_feature_logging.py b/sdk/python/tests/integration/offline_store/test_feature_logging.py index d41e8e6c9e..f15eb8a849 100644 --- a/sdk/python/tests/integration/offline_store/test_feature_logging.py +++ b/sdk/python/tests/integration/offline_store/test_feature_logging.py @@ -7,7 +7,13 @@ import pytest from google.api_core.exceptions import NotFound -from feast.feature_logging import FeatureServiceLoggingSource, LoggingConfig +from feast.feature_logging import ( + LOG_DATE_FIELD, + LOG_TIMESTAMP_FIELD, + REQUEST_ID_FIELD, + FeatureServiceLoggingSource, + LoggingConfig, +) from feast.feature_service import FeatureService from feast.protos.feast.serving.ServingService_pb2 import FieldStatus from feast.wait import wait_retry_backoff @@ -59,13 +65,13 @@ def test_feature_service_logging(environment, universal_data_sources): store.write_logged_features( source=feature_service, logs=pa.Table.from_pandas(second_batch, schema=schema), ) - expected_columns = list(set(logs_df.columns) - {"log_date"}) + expected_columns = list(set(logs_df.columns) - {LOG_DATE_FIELD}) def retrieve(): retrieval_job = store._get_provider().retrieve_feature_service_logs( feature_service=feature_service, - 
start_date=logs_df["log_timestamp"].min(), - end_date=logs_df["log_timestamp"].max() + datetime.timedelta(seconds=1), + start_date=logs_df[LOG_TIMESTAMP_FIELD].min(), + end_date=logs_df[LOG_TIMESTAMP_FIELD].max() + datetime.timedelta(seconds=1), config=store.config, registry=store._registry, ) @@ -84,8 +90,8 @@ def retrieve(): persisted_logs = persisted_logs[expected_columns] logs_df = logs_df[expected_columns] pd.testing.assert_frame_equal( - logs_df.sort_values("request_id").reset_index(drop=True), - persisted_logs.sort_values("request_id").reset_index(drop=True), + logs_df.sort_values(REQUEST_ID_FIELD).reset_index(drop=True), + persisted_logs.sort_values(REQUEST_ID_FIELD).reset_index(drop=True), check_dtype=False, ) @@ -97,11 +103,11 @@ def prepare_logs(datasets: UniversalDatasets) -> pd.DataFrame: num_rows = driver_df.shape[0] logs_df = driver_df[["driver_id", "val_to_add"]] - logs_df["request_id"] = [str(uuid.uuid4()) for _ in range(num_rows)] - logs_df["log_timestamp"] = pd.Series( + logs_df[REQUEST_ID_FIELD] = [str(uuid.uuid4()) for _ in range(num_rows)] + logs_df[LOG_TIMESTAMP_FIELD] = pd.Series( np.random.randint(0, 7 * 24 * 3600, num_rows) ).map(lambda secs: pd.Timestamp.utcnow() - datetime.timedelta(seconds=secs)) - logs_df["log_date"] = logs_df["log_timestamp"].dt.date + logs_df[LOG_DATE_FIELD] = logs_df[LOG_TIMESTAMP_FIELD].dt.date for view, features in ( ("driver_stats", ("conv_rate", "avg_daily_trips")), From 0fc3e5a316134e814be6f2fb99bc7dbd14e33d60 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 22 Apr 2022 13:43:36 -0700 Subject: [PATCH 13/16] add custom destination Signed-off-by: pyalex --- protos/feast/core/FeatureService.proto | 6 ++++++ sdk/python/feast/feature_logging.py | 15 ++++++++------- .../feast/infra/offline_stores/bigquery_source.py | 2 +- .../feast/infra/offline_stores/file_source.py | 2 +- .../feast/infra/offline_stores/redshift_source.py | 2 +- .../infra/offline_stores/snowflake_source.py | 2 +- 6 files changed, 18 insertions(+), 11 deletions(-) diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto index 0ddc209079..a839f6a6a1 100644 --- a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -61,6 +61,7 @@ message LoggingConfig { BigQueryDestination bigquery_destination = 4; RedshiftDestination redshift_destination = 5; SnowflakeDestination snowflake_destination = 6; + CustomDestination custom_destination = 7; } message FileDestination { @@ -80,4 +81,9 @@ message LoggingConfig { message SnowflakeDestination { string table = 1; } + + message CustomDestination { + string kind = 1; + map config = 2; + } } diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index 22aa8018e0..e39521f645 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -128,10 +128,10 @@ class LoggingDestination: It is implementation specific - each offline store must implement LoggingDestination subclass. Kind of logging destination will be determined by matching attribute name in LoggingConfig protobuf message - and "_proto_attr_name" property of each subclass. + and "_proto_kind" property of each subclass. 
""" - _proto_attr_name: str + _proto_kind: str @classmethod @abc.abstractmethod @@ -158,13 +158,14 @@ def __init__(self, destination: LoggingDestination): @classmethod def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig"]: - proto_attr_name = cast(str, config_proto.WhichOneof("destination")) - if proto_attr_name is None: + proto_kind = cast(str, config_proto.WhichOneof("destination")) + if proto_kind is None: return - destination_class = _DestinationRegistry.classes_by_proto_attr_name[ - proto_attr_name - ] + if proto_kind == "custom_destination": + proto_kind = config_proto.custom_destination.kind + + destination_class = _DestinationRegistry.classes_by_proto_attr_name[proto_kind] return LoggingConfig(destination=destination_class.from_proto(config_proto)) def to_proto(self) -> LoggingConfigProto: diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index a4653592e4..02bdb506e4 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -260,7 +260,7 @@ def to_data_source(self) -> DataSource: class BigQueryLoggingDestination(LoggingDestination): - _proto_attr_name = "bigquery_destination" + _proto_kind = "bigquery_destination" table: str diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index 08c91a8eda..137bb25686 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -297,7 +297,7 @@ def to_data_source(self) -> DataSource: class FileLoggingDestination(LoggingDestination): - _proto_attr_name = "file_destination" + _proto_kind = "file_destination" path: str s3_endpoint_override: str diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 59bc565942..69108dc98d 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -334,7 +334,7 @@ def to_data_source(self) -> DataSource: class RedshiftLoggingDestination(LoggingDestination): - _proto_attr_name = "redshift_destination" + _proto_kind = "redshift_destination" table: str diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 14beafc2e4..5de8374d5a 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -338,7 +338,7 @@ def to_data_source(self) -> DataSource: class SnowflakeLoggingDestination(LoggingDestination): table: str - _proto_attr_name = "snowflake_destination" + _proto_kind = "snowflake_destination" def __init__(self, table: str): self.table = table From 9d1eb1a0c930431d295f7bb17a628f58fcba796b Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 26 Apr 2022 14:29:38 -0700 Subject: [PATCH 14/16] move partition columns to destination config Signed-off-by: pyalex --- protos/feast/core/FeatureService.proto | 9 +++++++-- sdk/python/feast/feature_logging.py | 9 --------- .../feast/infra/offline_stores/bigquery.py | 9 ++++----- .../infra/offline_stores/bigquery_source.py | 2 +- sdk/python/feast/infra/offline_stores/file.py | 2 +- .../feast/infra/offline_stores/file_source.py | 19 ++++++++++++++++--- .../feast/infra/offline_stores/redshift.py | 2 +- .../infra/offline_stores/redshift_source.py | 12 ++++++------ 
.../feast/infra/offline_stores/snowflake.py | 2 +- .../infra/offline_stores/snowflake_source.py | 12 ++++++------ .../feast/infra/passthrough_provider.py | 3 +-- .../universal/data_sources/redshift.py | 2 +- .../universal/data_sources/snowflake.py | 2 +- 13 files changed, 46 insertions(+), 39 deletions(-) diff --git a/protos/feast/core/FeatureService.proto b/protos/feast/core/FeatureService.proto index a839f6a6a1..c04fa97507 100644 --- a/protos/feast/core/FeatureService.proto +++ b/protos/feast/core/FeatureService.proto @@ -67,6 +67,9 @@ message LoggingConfig { message FileDestination { string path = 1; string s3_endpoint_override = 2; + + // column names to use for partitioning + repeated string partition_by = 3; } message BigQueryDestination { @@ -75,11 +78,13 @@ message LoggingConfig { } message RedshiftDestination { - string table_ref = 1; + // Destination table name. ClusterId and database will be taken from an offline store config + string table_name = 1; } message SnowflakeDestination { - string table = 1; + // Destination table name. Schema and database will be taken from an offline store config + string table_name = 1; } message CustomDestination { diff --git a/sdk/python/feast/feature_logging.py b/sdk/python/feast/feature_logging.py index e39521f645..e35dbdd9c7 100644 --- a/sdk/python/feast/feature_logging.py +++ b/sdk/python/feast/feature_logging.py @@ -37,11 +37,6 @@ def get_schema(self, registry: "Registry") -> pa.Schema: """ Generate schema for logs destination. """ raise NotImplementedError - @abc.abstractmethod - def get_partition_column(self, registry: "Registry") -> str: - """ Return partition column that must exist in generated schema. """ - raise NotImplementedError - @abc.abstractmethod def get_log_timestamp_column(self) -> str: """ Return timestamp column that must exist in generated schema. 
""" @@ -99,15 +94,11 @@ def get_schema(self, registry: "Registry") -> pa.Schema: # system columns fields[REQUEST_ID_FIELD] = pa.string() fields[LOG_TIMESTAMP_FIELD] = pa.timestamp("us", tz=UTC) - fields[LOG_DATE_FIELD] = pa.date32() return pa.schema( [pa.field(name, data_type) for name, data_type in fields.items()] ) - def get_partition_column(self, registry: "Registry") -> str: - return LOG_DATE_FIELD - def get_log_timestamp_column(self) -> str: return LOG_TIMESTAMP_FIELD diff --git a/sdk/python/feast/infra/offline_stores/bigquery.py b/sdk/python/feast/infra/offline_stores/bigquery.py index 70c93e06e8..ee024d4d40 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery.py +++ b/sdk/python/feast/infra/offline_stores/bigquery.py @@ -274,12 +274,11 @@ def write_logged_features( job_config = bigquery.LoadJobConfig( source_format=bigquery.SourceFormat.PARQUET, schema=arrow_schema_to_bq_schema(source.get_schema(registry)), + time_partitioning=bigquery.TimePartitioning( + type_=bigquery.TimePartitioningType.DAY, + field=source.get_log_timestamp_column(), + ), ) - partition_col = source.get_partition_column(registry) - if partition_col: - job_config.time_partitioning = bigquery.TimePartitioning( - type_=bigquery.TimePartitioningType.DAY, field=partition_col - ) with tempfile.TemporaryFile() as parquet_temp_file: pyarrow.parquet.write_table(table=data, where=parquet_temp_file) diff --git a/sdk/python/feast/infra/offline_stores/bigquery_source.py b/sdk/python/feast/infra/offline_stores/bigquery_source.py index 02bdb506e4..f66b2066bd 100644 --- a/sdk/python/feast/infra/offline_stores/bigquery_source.py +++ b/sdk/python/feast/infra/offline_stores/bigquery_source.py @@ -264,7 +264,7 @@ class BigQueryLoggingDestination(LoggingDestination): table: str - def __init__(self, table_ref): + def __init__(self, *, table_ref): self.table = table_ref @classmethod diff --git a/sdk/python/feast/infra/offline_stores/file.py b/sdk/python/feast/infra/offline_stores/file.py index 31a2123ad8..a85cd880b1 100644 --- a/sdk/python/feast/infra/offline_stores/file.py +++ b/sdk/python/feast/infra/offline_stores/file.py @@ -390,7 +390,7 @@ def write_logged_features( pyarrow.parquet.write_to_dataset( data, root_path=path, - partition_cols=[source.get_partition_column(registry)], + partition_cols=destination.partition_by, filesystem=filesystem, ) diff --git a/sdk/python/feast/infra/offline_stores/file_source.py b/sdk/python/feast/infra/offline_stores/file_source.py index 137bb25686..68c1835cb0 100644 --- a/sdk/python/feast/infra/offline_stores/file_source.py +++ b/sdk/python/feast/infra/offline_stores/file_source.py @@ -1,5 +1,5 @@ import warnings -from typing import Callable, Dict, Iterable, Optional, Tuple +from typing import Callable, Dict, Iterable, List, Optional, Tuple from pyarrow._fs import FileSystem from pyarrow._s3fs import S3FileSystem @@ -301,22 +301,35 @@ class FileLoggingDestination(LoggingDestination): path: str s3_endpoint_override: str + partition_by: Optional[List[str]] - def __init__(self, *, path: str, s3_endpoint_override=""): + def __init__( + self, + *, + path: str, + s3_endpoint_override="", + partition_by: Optional[List[str]] = None, + ): self.path = path self.s3_endpoint_override = s3_endpoint_override + self.partition_by = partition_by @classmethod def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": return FileLoggingDestination( path=config_proto.file_destination.path, s3_endpoint_override=config_proto.file_destination.s3_endpoint_override, + 
partition_by=list(config_proto.file_destination.partition_by) + if config_proto.file_destination.partition_by + else None, ) def to_proto(self) -> LoggingConfigProto: return LoggingConfigProto( file_destination=LoggingConfigProto.FileDestination( - path=self.path, s3_endpoint_override=self.s3_endpoint_override, + path=self.path, + s3_endpoint_override=self.s3_endpoint_override, + partition_by=self.partition_by, ) ) diff --git a/sdk/python/feast/infra/offline_stores/redshift.py b/sdk/python/feast/infra/offline_stores/redshift.py index 1ed34a10c8..fb5759cbbb 100644 --- a/sdk/python/feast/infra/offline_stores/redshift.py +++ b/sdk/python/feast/infra/offline_stores/redshift.py @@ -288,7 +288,7 @@ def write_logged_features( s3_resource=s3_resource, s3_path=s3_path, iam_role=config.offline_store.iam_role, - table_name=destination.table, + table_name=destination.table_name, schema=source.get_schema(registry), fail_if_exists=False, ) diff --git a/sdk/python/feast/infra/offline_stores/redshift_source.py b/sdk/python/feast/infra/offline_stores/redshift_source.py index 69108dc98d..93d811998a 100644 --- a/sdk/python/feast/infra/offline_stores/redshift_source.py +++ b/sdk/python/feast/infra/offline_stores/redshift_source.py @@ -336,23 +336,23 @@ def to_data_source(self) -> DataSource: class RedshiftLoggingDestination(LoggingDestination): _proto_kind = "redshift_destination" - table: str + table_name: str - def __init__(self, table_ref: str): - self.table = table_ref + def __init__(self, *, table_name: str): + self.table_name = table_name @classmethod def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": return RedshiftLoggingDestination( - table_ref=config_proto.redshift_destination.table_ref, + table_name=config_proto.redshift_destination.table_name, ) def to_proto(self) -> LoggingConfigProto: return LoggingConfigProto( redshift_destination=LoggingConfigProto.RedshiftDestination( - table_ref=self.table + table_name=self.table_name ) ) def to_data_source(self) -> DataSource: - return RedshiftSource(table=self.table) + return RedshiftSource(table=self.table_name) diff --git a/sdk/python/feast/infra/offline_stores/snowflake.py b/sdk/python/feast/infra/offline_stores/snowflake.py index d9e598c544..4cf6716c5e 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake.py +++ b/sdk/python/feast/infra/offline_stores/snowflake.py @@ -292,7 +292,7 @@ def write_logged_features( write_pandas( snowflake_conn, data.to_pandas(), - table_name=logging_config.destination.table, + table_name=logging_config.destination.table_name, auto_create_table=True, ) diff --git a/sdk/python/feast/infra/offline_stores/snowflake_source.py b/sdk/python/feast/infra/offline_stores/snowflake_source.py index 5de8374d5a..ef3ae52d28 100644 --- a/sdk/python/feast/infra/offline_stores/snowflake_source.py +++ b/sdk/python/feast/infra/offline_stores/snowflake_source.py @@ -336,25 +336,25 @@ def to_data_source(self) -> DataSource: class SnowflakeLoggingDestination(LoggingDestination): - table: str + table_name: str _proto_kind = "snowflake_destination" - def __init__(self, table: str): - self.table = table + def __init__(self, *, table_name: str): + self.table_name = table_name @classmethod def from_proto(cls, config_proto: LoggingConfigProto) -> "LoggingDestination": return SnowflakeLoggingDestination( - table=config_proto.snowflake_destination.table, + table_name=config_proto.snowflake_destination.table_name, ) def to_proto(self) -> LoggingConfigProto: return LoggingConfigProto( 
snowflake_destination=LoggingConfigProto.SnowflakeDestination( - table=self.table, + table_name=self.table_name, ) ) def to_data_source(self) -> DataSource: - return SnowflakeSource(table=self.table,) + return SnowflakeSource(table=self.table_name,) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 1b8741d669..616a323f5d 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -253,8 +253,7 @@ def retrieve_feature_service_logs( schema = logging_source.get_schema(registry) logging_config = feature_service.logging_config ts_column = logging_source.get_log_timestamp_column() - partition_column = logging_source.get_partition_column(registry) - columns = list(set(schema.names) - {ts_column, partition_column}) + columns = list(set(schema.names) - {ts_column}) return self.offline_store.pull_all_from_table_or_query( config=config, diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py index a5c50a5773..3b2794393f 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/redshift.py @@ -84,7 +84,7 @@ def create_logged_features_destination(self) -> LoggingDestination: ) self.tables.append(table) - return RedshiftLoggingDestination(table_ref=table) + return RedshiftLoggingDestination(table_name=table) def create_offline_store_config(self) -> FeastConfigBaseModel: return self.offline_store_config diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py index 8db2d56996..23466bc00c 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/snowflake.py @@ -76,7 +76,7 @@ def create_logged_features_destination(self) -> LoggingDestination: ) self.tables.append(table) - return SnowflakeLoggingDestination(table=table) + return SnowflakeLoggingDestination(table_name=table) def create_offline_store_config(self) -> FeastConfigBaseModel: return self.offline_store_config From ede26b2ad76a1579df14dde0cf298a13e0c094f0 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 26 Apr 2022 15:01:15 -0700 Subject: [PATCH 15/16] after rebase Signed-off-by: pyalex --- sdk/python/feast/infra/passthrough_provider.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index 616a323f5d..6364297b1e 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -260,7 +260,7 @@ def retrieve_feature_service_logs( data_source=logging_config.destination.to_data_source(), join_key_columns=[], feature_name_columns=columns, - event_timestamp_column=ts_column, + timestamp_field=ts_column, start_date=start_date, end_date=end_date, ) From b2ef41f63fa1af71801dd2af6803beeb93201e29 Mon Sep 17 00:00:00 2001 From: pyalex Date: Tue, 26 Apr 2022 15:06:24 -0700 Subject: [PATCH 16/16] allow data source creator implementations w/o logging destination Signed-off-by: pyalex --- .../integration/feature_repos/universal/data_source_creator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py index 4c6a693d2e..b36af0db47 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_source_creator.py @@ -52,9 +52,8 @@ def create_offline_store_config(self) -> FeastConfigBaseModel: def create_saved_dataset_destination(self) -> SavedDatasetStorage: ... - @abstractmethod def create_logged_features_destination(self) -> LoggingDestination: - ... + pass @abstractmethod def teardown(self):
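
A minimal usage sketch of the logging API introduced in this series, mirroring the calls exercised in test_feature_logging.py. It assumes a FeatureStore whose registered feature service already carries a LoggingConfig (for example with a FileLoggingDestination); the helper name and the shape of logs_df are illustrative and not part of the patches.

import datetime

import pandas as pd
import pyarrow as pa

from feast import FeatureService, FeatureStore
from feast.feature_logging import LOG_TIMESTAMP_FIELD, FeatureServiceLoggingSource


def log_and_read_back(
    store: FeatureStore, feature_service: FeatureService, logs_df: pd.DataFrame
) -> pd.DataFrame:
    # logs_df is expected to contain the served feature columns plus the
    # __request_id / __log_timestamp system columns defined in feature_logging.py.
    schema = FeatureServiceLoggingSource(
        feature_service=feature_service, project=store.project
    ).get_schema(store._registry)

    # Append this batch to the destination configured on the feature service;
    # the call can be repeated with further batches to flush logs in chunks.
    store.write_logged_features(
        logs=pa.Table.from_pandas(logs_df, schema=schema), source=feature_service
    )

    # Read the logs back from the offline store for the window covered by this batch.
    job = store._get_provider().retrieve_feature_service_logs(
        feature_service=feature_service,
        start_date=logs_df[LOG_TIMESTAMP_FIELD].min(),
        end_date=logs_df[LOG_TIMESTAMP_FIELD].max() + datetime.timedelta(seconds=1),
        config=store.config,
        registry=store._registry,
    )
    return job.to_df()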