Skip to content

Commit

Permalink
Merge pull request feast-dev#1 from redhatHameed/remote-offline
Browse files Browse the repository at this point in the history
[WIP] feat: Added offline store Arrow Flight server/client
  • Loading branch information
dmartinol authored May 9, 2024
2 parents e88f1e3 + 47faa21 commit e7cd32f
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 0 deletions.
29 changes: 29 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from feast import utils
from feast.constants import (
DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT,
DEFAULT_OFFLINE_SERVER_PORT,
DEFAULT_REGISTRY_SERVER_PORT,
)
from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
Expand Down Expand Up @@ -773,6 +774,34 @@ def serve_registry_command(ctx: click.Context, port: int):
store.serve_registry(port)


@cli.command("serve_offline")
@click.option(
    "--host",
    "-h",
    type=click.STRING,
    default="127.0.0.1",
    show_default=True,
    help="Specify a host for the server",
)
@click.option(
    "--port",
    "-p",
    type=click.INT,
    default=DEFAULT_OFFLINE_SERVER_PORT,
    # Mirror the --host option so `--help` also shows the default port.
    show_default=True,
    help="Specify a port for the server",
)
@click.pass_context
def serve_offline_command(
    ctx: click.Context,
    host: str,
    port: int,
):
    """Start an offline server locally on a given host, port."""
    # NOTE: the docstring above is rendered by click as the command's help
    # text, so it is kept to a single user-facing sentence.
    #
    # ctx:  click context, forwarded to create_feature_store() to build the store.
    # host: interface the server binds to (defaults to 127.0.0.1).
    # port: TCP port the server listens on (defaults to DEFAULT_OFFLINE_SERVER_PORT).
    store = create_feature_store(ctx)

    store.serve_offline(host, port)


@cli.command("validate")
@click.option(
"--feature-service",
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
# Default registry server port
DEFAULT_REGISTRY_SERVER_PORT = 6570

# Default offline server port
DEFAULT_OFFLINE_SERVER_PORT = 8815

# Environment variable for feature server docker image tag
DOCKER_IMAGE_TAG_ENV_NAME: str = "FEAST_SERVER_DOCKER_IMAGE_TAG"

Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2369,6 +2369,13 @@ def serve_registry(self, port: int) -> None:

registry_server.start_server(self, port)

@log_exceptions_and_usage
def serve_offline(self, host: str, port: int) -> None:
    """Run the offline server for this feature store on ``host``:``port``.

    Args:
        host: Host name or address handed to the offline server.
        port: Port number handed to the offline server.
    """
    # Imported at call time rather than at module level; presumably this
    # avoids a circular import, since feast.offline_server itself imports
    # FeatureStore from feast.
    from feast.offline_server import start_server

    start_server(self, host, port)

@log_exceptions_and_usage
def serve_transformations(self, port: int) -> None:
"""Start the feature transformation server locally on a given port."""
Expand Down
160 changes: 160 additions & 0 deletions sdk/python/feast/infra/offline_stores/remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, List, Literal, Optional, Union

import pandas as pd
import pyarrow as pa
import pyarrow.flight
import pyarrow.parquet
from pydantic import StrictStr

from feast import OnDemandFeatureView
from feast.data_source import DataSource
from feast.feature_logging import LoggingConfig, LoggingSource
from feast.feature_view import FeatureView
from feast.infra.offline_stores.offline_store import (
    OfflineStore,
    RetrievalJob,
)
from feast.infra.registry.base_registry import BaseRegistry
from feast.infra.registry.registry import Registry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.usage import log_exceptions_and_usage


class RemoteOfflineStoreConfig(FeastConfigBaseModel):
    """Configuration of the offline store backed by a remote Arrow Flight server."""

    type: Literal["remote"] = "remote"
    """ str: Offline store type selector; "remote" is the key this store is
    registered under in the repo config's offline-store type map."""

    offline_type: StrictStr = "remote"
    """ str: Provider name or a class name that implements Offline store."""

    path: StrictStr = ""
    """ str: Path to metadata store.
    If offline_type is 'remote', then this is a URL for offline server """

    host: StrictStr = ""
    """ str: host to offline store.
    If offline_type is 'remote', then this is a host URL for offline store of arrow flight server """

    # NOTE(review): the port is declared as a string although the server side
    # uses an integer port; the declared type is kept for compatibility.
    port: StrictStr = ""
    """ str: port of the offline store's arrow flight server."""


class RemoteRetrievalJob(RetrievalJob):
    """Retrieval job that delegates historical retrieval to an Arrow Flight server.

    On construction the job connects to the server configured in
    ``config.offline_store`` and uploads its inputs with two ``do_put`` calls
    that share a single command descriptor (a fresh UUID). Each upload tags
    its schema metadata with ``command``, ``api`` and ``param`` so the two
    payloads can be told apart. The result is only fetched (``do_get``) when
    the job is realized as an Arrow table or a pandas DataFrame.

    NOTE(review): RetrievalJob may declare further abstract members (for
    example ``full_feature_names`` / ``metadata``) that are not implemented
    here -- confirm against the base class.
    """

    def __init__(
        self,
        config: RepoConfig,
        feature_refs: List[str],
        entity_df: Union[pd.DataFrame, str],
        # TODO add missing parameters from the OfflineStore API
    ):
        """Connect to the offline server and upload the request parameters.

        Args:
            config: Repo configuration; ``config.offline_store.host`` and
                ``config.offline_store.port`` locate the Flight server.
            feature_refs: Feature references to retrieve.
            entity_df: Entity dataframe. Only a pandas DataFrame can be
                uploaded; see ``_put_parameters``.

        Raises:
            ValueError: If ``entity_df`` is not a pandas DataFrame.
        """
        # Generate unique command identifier
        self.command = str(uuid.uuid4())
        # Initialize the client connection
        self.client = pa.flight.connect(
            f"grpc://{config.offline_store.host}:{config.offline_store.port}"
        )
        # Put API parameters
        self._put_parameters(feature_refs, entity_df)

    def _put_parameters(self, feature_refs, entity_df):
        """Upload the entity dataframe and the feature references to the server.

        Raises:
            ValueError: If ``entity_df`` is not a pandas DataFrame (a SQL
                string cannot be converted to an Arrow table here).
        """
        if not isinstance(entity_df, pd.DataFrame):
            raise ValueError(
                "The remote offline store only supports a pandas DataFrame as "
                f"entity_df, got {type(entity_df).__name__}"
            )

        historical_flight_descriptor = pa.flight.FlightDescriptor.for_command(
            self.command
        )

        entity_df_table = pa.Table.from_pandas(entity_df)
        writer, _ = self.client.do_put(
            historical_flight_descriptor,
            entity_df_table.schema.with_metadata(
                self._parameter_metadata("entity_df")
            ),
        )
        # Always close the stream, even if the write fails half-way.
        try:
            writer.write_table(entity_df_table)
        finally:
            writer.close()

        # An explicit string type keeps the schema stable for an empty list,
        # which would otherwise be inferred as a null-typed array.
        features_array = pa.array(feature_refs, type=pa.string())
        features_batch = pa.RecordBatch.from_arrays([features_array], ["features"])
        writer, _ = self.client.do_put(
            historical_flight_descriptor,
            features_batch.schema.with_metadata(
                self._parameter_metadata("features")
            ),
        )
        try:
            writer.write_batch(features_batch)
        finally:
            writer.close()

    def _parameter_metadata(self, param):
        """Build the schema metadata identifying one uploaded parameter."""
        return {
            "command": self.command,
            "api": "get_historical_features",
            "param": param,
        }

    # Invoked to realize the Pandas DataFrame
    def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
        """Return the result as a pandas DataFrame (``timeout`` is not used)."""
        # We use arrow format because it gives better control of the table schema
        return self._to_arrow_internal().to_pandas()

    # Invoked to synchronously execute the underlying query and return the result as an arrow table
    # This is where do_get service is invoked
    def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
        """Fetch the result from the server as an Arrow table (``timeout`` is not used)."""
        descriptor = pa.flight.FlightDescriptor.for_command(self.command)
        flight_info = self.client.get_flight_info(descriptor)
        # The first endpoint's ticket is redeemed for the result stream.
        ticket = flight_info.endpoints[0].ticket

        reader = self.client.do_get(ticket)
        return reader.read_all()

    @property
    def on_demand_feature_views(self) -> List[OnDemandFeatureView]:
        """On-demand feature views to apply client-side; always empty here."""
        return []


class RemoteOfflineStore(OfflineStore):
    """Offline store that forwards requests to a remote Arrow Flight server.

    Only ``get_historical_features`` is implemented; the remaining methods
    are placeholders that do nothing yet.
    """

    def __init__(self, arrow_host=None, arrow_port=None):
        """Remember an optional server host and port.

        Both arguments default to ``None`` so the store can also be created
        without arguments; ``get_historical_features`` does not read them and
        takes the connection details from ``config.offline_store`` instead.
        """
        self.arrow_host = arrow_host
        self.arrow_port = arrow_port

    @log_exceptions_and_usage(offline_store="remote")
    def get_historical_features(
        self,
        config: RepoConfig,
        feature_views: List[FeatureView],
        feature_refs: List[str],
        entity_df: Union[pd.DataFrame, str],
        registry: Optional[Registry] = None,
        project: str = "",
        full_feature_names: bool = False,
    ) -> RemoteRetrievalJob:
        """Create a retrieval job that runs the query on the remote server.

        Args:
            config: Repo configuration whose ``offline_store`` section must be
                a ``RemoteOfflineStoreConfig``.
            feature_views: Not used yet.
            feature_refs: Feature references forwarded to the server.
            entity_df: Entity dataframe forwarded to the server.
            registry: Not used yet.
            project: Not used yet.
            full_feature_names: Not used yet.

        Returns:
            A ``RemoteRetrievalJob`` built from ``config``, ``feature_refs``
            and ``entity_df``.
        """
        offline_store_config = config.offline_store
        assert isinstance(offline_store_config, RemoteOfflineStoreConfig)

        # Pass the config *instance*: the job reads host/port from
        # ``config.offline_store``.
        return RemoteRetrievalJob(config, feature_refs, entity_df)

    @log_exceptions_and_usage(offline_store="remote")
    def pull_latest_from_table_or_query(
        self,
        config: RepoConfig,
        data_source: DataSource,
        join_key_columns: List[str],
        feature_name_columns: List[str],
        timestamp_field: str,
        created_timestamp_column: Optional[str],
        start_date: datetime,
        end_date: datetime,
    ) -> RetrievalJob:
        """ Pulls data from the offline store for use in materialization."""
        # NOTE(review): placeholder -- it only prints a message and returns
        # None, although the signature promises a RetrievalJob.
        print("Pulling latest features from my offline store")
        # Implementation here.
        pass

    # Declared static because the signature has no ``self``; without the
    # decorator a call on an instance would bind the instance to ``config``.
    @staticmethod
    def write_logged_features(
        config: RepoConfig,
        data: Union[pyarrow.Table, Path],
        source: LoggingSource,
        logging_config: LoggingConfig,
        registry: BaseRegistry,
    ):
        """ Optional method to have Feast support logging your online features."""
        # Implementation here. Placeholder: currently a no-op.
        pass

    # Static for the same reason as write_logged_features.
    @staticmethod
    def offline_write_batch(
        config: RepoConfig,
        feature_view: FeatureView,
        table: pyarrow.Table,
        progress: Optional[Callable[[int], Any]],
    ):
        """Write a batch of feature rows to the offline store (placeholder, no-op)."""
        # Implementation here.
        pass
86 changes: 86 additions & 0 deletions sdk/python/feast/offline_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import ast

import pyarrow as pa
import pyarrow.flight

from feast import FeatureStore


class OfflineServer(pa.flight.FlightServerBase):
    """Arrow Flight server that answers historical-feature requests.

    Clients upload tables with ``do_put``; a table whose schema metadata
    carries ``param == "features"`` is kept as the feature list for its
    descriptor, any other table is kept as the flight (entity dataframe) for
    that descriptor. ``do_get`` then runs ``store.get_historical_features``
    on the uploaded inputs and streams the result back.

    NOTE(review): uploaded tables are never evicted from memory.
    """

    # Key under which earlier clients were expected to upload the feature list.
    _LEGACY_FEATURES_KEY = (2, b"features_descriptor", ())

    def __init__(self, location=None, store=None):
        """Create the server.

        Args:
            location: Location passed to ``FlightServerBase`` and advertised
                in the endpoints of returned flight infos.
            store: FeatureStore instance used to serve ``do_get``. May also be
                assigned to the ``store`` attribute after construction.
        """
        super().__init__(location)
        self._location = location
        # Uploaded tables, keyed by descriptor_to_key(descriptor).
        self.flights = {}
        # Uploaded feature-reference tables, keyed like ``flights``. Kept
        # apart so that two uploads under one descriptor do not overwrite
        # each other.
        self._features = {}
        self.store = store

    @classmethod
    def descriptor_to_key(cls, descriptor):
        """Turn a flight descriptor into a hashable ``(type, command, path)`` key."""
        return (
            descriptor.descriptor_type.value,
            descriptor.command,
            tuple(descriptor.path or tuple()),
        )

    def _make_flight_info(self, key, descriptor, table):
        """Build the FlightInfo describing ``table``; the ticket is ``repr(key)``."""
        endpoints = [pa.flight.FlightEndpoint(repr(key), [self._location])]
        # Serialize into a mock sink only to measure the stream size in bytes.
        mock_sink = pa.MockOutputStream()
        stream_writer = pa.RecordBatchStreamWriter(mock_sink, table.schema)
        stream_writer.write_table(table)
        stream_writer.close()
        data_size = mock_sink.size()

        return pa.flight.FlightInfo(
            table.schema, descriptor, endpoints, table.num_rows, data_size
        )

    def get_flight_info(self, context, descriptor):
        """Return the FlightInfo for ``descriptor``.

        Raises:
            KeyError: If nothing was uploaded for this descriptor.
        """
        key = OfflineServer.descriptor_to_key(descriptor)
        try:
            table = self.flights[key]
        except KeyError:
            raise KeyError("Flight not found.") from None
        return self._make_flight_info(key, descriptor, table)

    def list_flights(self, context, criteria):
        """Yield a FlightInfo for every uploaded flight (``criteria`` is ignored)."""
        for key, table in self.flights.items():
            if key[1] is not None:
                descriptor = pa.flight.FlightDescriptor.for_command(key[1])
            else:
                descriptor = pa.flight.FlightDescriptor.for_path(*key[2])

            yield self._make_flight_info(key, descriptor, table)

    def do_put(self, context, descriptor, reader, writer):
        """Store an uploaded table, routed by the ``param`` schema metadata."""
        key = OfflineServer.descriptor_to_key(descriptor)
        table = reader.read_all()
        # Arrow exposes schema metadata with bytes keys and values.
        metadata = table.schema.metadata or {}
        if metadata.get(b"param") == b"features":
            self._features[key] = table
        else:
            self.flights[key] = table

    def do_get(self, context, ticket):
        """Run the historical retrieval for ``ticket`` and stream the result.

        Raises:
            KeyError: If no flight was uploaded for the ticket's key.
            RuntimeError: If no feature store was attached to the server.
        """
        # The ticket is the repr() of the flight key; literal_eval only
        # parses Python literals and does not execute code.
        key = ast.literal_eval(ticket.ticket.decode())
        if key not in self.flights:
            # A do_get handler has to return a data stream; returning None
            # would surface as an unrelated error, so fail explicitly.
            raise KeyError("Flight not found.")
        if self.store is None:
            raise RuntimeError("OfflineServer has no feature store attached.")

        entity_df = self.flights[key].to_pandas()
        # Get feature data: prefer the list uploaded with this descriptor,
        # then fall back to the legacy fixed key.
        features_table = self._features.get(key)
        if features_table is None:
            features_table = self.flights.get(self._LEGACY_FEATURES_KEY)
        if features_table is not None:
            features = features_table.column("features").to_pylist()
        else:
            features = None

        training_df = self.store.get_historical_features(entity_df, features).to_df()
        table = pa.Table.from_pandas(training_df)

        return pa.flight.RecordBatchStream(table)


def start_server(
    store: FeatureStore,
    host: str,
    port: int,
):
    """Start an OfflineServer for ``store`` on ``host``:``port`` and serve requests.

    Args:
        store: Feature store the server uses to answer requests.
        host: Host name or address to listen on.
        port: TCP port to listen on.
    """
    location = f"grpc+tcp://{host}:{port}"
    server = OfflineServer(location)
    # Attach the store instance to the server; it was previously accepted by
    # this function but never handed over.
    server.store = store
    print("Serving on", location)
    server.serve()
1 change: 1 addition & 0 deletions sdk/python/feast/repo_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
"athena": "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore",
"mssql": "feast.infra.offline_stores.contrib.mssql_offline_store.mssql.MsSqlServerOfflineStore",
"duckdb": "feast.infra.offline_stores.duckdb.DuckDBOfflineStore",
"remote": "feast.infra.offline_stores.remote.RemoteOfflineStore",
}

FEATURE_SERVER_CONFIG_CLASS_FOR_TYPE = {
Expand Down

0 comments on commit e7cd32f

Please sign in to comment.