Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Environment class and DataSourceCreator API, and use fixtures for datasets and data sources #1790

Merged
merged 53 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
789d7a1
Fix API cruft from DataSourceCreator
achals Aug 18, 2021
8b7eab5
Remove the need for get_prefixed_table_name
achals Aug 18, 2021
0cc21ab
major refactor
achals Aug 19, 2021
6ddd5b0
move start time
achals Aug 19, 2021
a700745
Remove one dimension of variation to be added in later
achals Aug 19, 2021
c25b6cb
Fix default
achals Aug 19, 2021
a3e4473
Fixups
achals Aug 19, 2021
3f632fd
Fixups
achals Aug 19, 2021
9aeea86
Fix up tests
achals Aug 19, 2021
84cbefd
Add retries to execute_redshift_statement_async
achals Aug 19, 2021
fe180eb
Add retries to execute_redshift_statement_async
achals Aug 19, 2021
0cd08d2
refactoooor
achals Aug 19, 2021
4701183
remove retries
achals Aug 19, 2021
4cde284
Remove provider variation since they don't really play a big role
achals Aug 20, 2021
974bb0b
Session scoped cache for test datasets and skipping older tests whose…
achals Aug 23, 2021
dbbd6fb
make format
achals Aug 23, 2021
473b630
make format
achals Aug 23, 2021
95982e2
remove import
achals Aug 23, 2021
fc1b4ee
merge from master
achals Aug 25, 2021
df9596b
fix merge
achals Aug 25, 2021
fc46edb
Use an enum for the stopping procedure instead of the bools
achals Aug 25, 2021
0daaff0
Fix refs
achals Aug 26, 2021
ef6a6b3
fix step
achals Aug 26, 2021
d63691d
WIP fixes
achals Aug 26, 2021
3d75977
Fix for feature inferencing
achals Aug 26, 2021
1abbaa8
C901 '_python_value_to_proto_value' is too complex :(
achals Aug 26, 2021
21996b7
Split out construct_test_repo and construct_universal_test_repo
achals Aug 27, 2021
4541561
remove import
achals Aug 27, 2021
2f20b5c
add unsafe_hash
achals Aug 27, 2021
6cc32b5
Update testrepoconfig
achals Aug 27, 2021
68f2997
Update testrepoconfig
achals Aug 27, 2021
a73868d
Remove kwargs from construct_universal_test_environment
achals Aug 27, 2021
ff9d49d
Remove unneeded method
achals Aug 27, 2021
b95c3ee
Docs
achals Aug 27, 2021
3e5aada
Kill skipped tests
achals Aug 27, 2021
b6abcaa
reorder
achals Aug 27, 2021
9580654
add todo
achals Aug 27, 2021
171a1ef
Split universal vs non data_source_cache
achals Aug 27, 2021
de9d8aa
make format
achals Aug 27, 2021
6454775
WIP fixtures
achals Aug 31, 2021
3fbb2e4
WIP Trying fixtures more effectively
achals Sep 1, 2021
2961d58
fix refs
achals Sep 1, 2021
1739815
Fix refs
achals Sep 1, 2021
fe14ba3
Fix refs
achals Sep 1, 2021
43d8e27
Fix refs
achals Sep 1, 2021
58c75b0
fix historical tests
achals Sep 1, 2021
8b5ba5a
renames
achals Sep 1, 2021
c445f53
CR updates
achals Sep 1, 2021
e0ccd9b
use the actual ref to data source creators
achals Sep 1, 2021
4d4efad
merge from master
achals Sep 1, 2021
a510ca6
format
achals Sep 1, 2021
6c10502
unused imports'
achals Sep 1, 2021
ff1fb81
Add ids for pytest params
achals Sep 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,28 @@ def infer_features_from_batch_source(self, config: RepoConfig):
self.batch_source.created_timestamp_column,
} | set(self.entities)

if (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to not exclude the original names of the columns then?

Copy link
Member Author

@achals achals Sep 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are already excluded in a section above this one.

self.batch_source.event_timestamp_column
in self.batch_source.field_mapping
):
columns_to_exclude.add(
self.batch_source.field_mapping[
self.batch_source.event_timestamp_column
]
)
if (
self.batch_source.created_timestamp_column
in self.batch_source.field_mapping
):
columns_to_exclude.add(
self.batch_source.field_mapping[
self.batch_source.created_timestamp_column
]
)
for e in self.entities:
if e in self.batch_source.field_mapping:
columns_to_exclude.add(self.batch_source.field_mapping[e])

for (
col_name,
col_datatype,
Expand All @@ -335,7 +357,7 @@ def infer_features_from_batch_source(self, config: RepoConfig):
):
feature_name = (
self.batch_source.field_mapping[col_name]
if col_name in self.batch_source.field_mapping.keys()
if col_name in self.batch_source.field_mapping
else col_name
)
self.features.append(
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/infra/offline_stores/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def pull_latest_from_table_or_query(
)
WHERE _feast_row = 1
"""

return BigQueryRetrievalJob(query=query, client=client, config=config)

@staticmethod
Expand Down
6 changes: 6 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,12 @@ def _get_column_names(
reverse_field_mapping[col] if col in reverse_field_mapping.keys() else col
for col in feature_names
]

# We need to exclude join keys and timestamp columns from the list of features, after they are mapped to
# their final column names via the `field_mapping` field of the source.
_feature_names = set(feature_names) - set(join_keys)
achals marked this conversation as resolved.
Show resolved Hide resolved
_feature_names = _feature_names - {event_timestamp_column, created_timestamp_column}
feature_names = list(_feature_names)
return (
join_keys,
feature_names,
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/utils/aws_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class RedshiftStatementNotFinishedError(Exception):


@retry(
wait=wait_exponential(multiplier=0.1, max=30),
wait=wait_exponential(multiplier=1, max=30),
retry=retry_if_exception_type(RedshiftStatementNotFinishedError),
)
def wait_for_redshift_statement(redshift_data_client, statement: dict) -> None:
Expand Down
11 changes: 10 additions & 1 deletion sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
# limitations under the License.

import re
from datetime import datetime
from typing import Any, Dict, Union

import numpy as np
import pandas as pd
from google.protobuf.json_format import MessageToDict
from google.protobuf.timestamp_pb2 import Timestamp

from feast.protos.feast.types.Value_pb2 import (
BoolList,
Expand Down Expand Up @@ -104,6 +106,8 @@ def python_type_to_feast_value_type(
"int8": ValueType.INT32,
"bool": ValueType.BOOL,
"timedelta": ValueType.UNIX_TIMESTAMP,
"Timestamp": ValueType.UNIX_TIMESTAMP,
"datetime": ValueType.UNIX_TIMESTAMP,
"datetime64[ns]": ValueType.UNIX_TIMESTAMP,
"datetime64[ns, tz]": ValueType.UNIX_TIMESTAMP,
"category": ValueType.STRING,
Expand Down Expand Up @@ -160,7 +164,8 @@ def _type_err(item, dtype):
raise ValueError(f'Value "{item}" is of type {type(item)} not of type {dtype}')


def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
# TODO(achals): Simplify this method and remove the noqa.
def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue: # noqa: C901
achals marked this conversation as resolved.
Show resolved Hide resolved
"""
Converts a Python (native, pandas) value to a Feast Proto Value based
on a provided value type
Expand Down Expand Up @@ -281,6 +286,10 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
elif feast_value_type == ValueType.INT64:
return ProtoValue(int64_val=int(value))
elif feast_value_type == ValueType.UNIX_TIMESTAMP:
if isinstance(value, datetime):
return ProtoValue(int64_val=int(value.timestamp()))
elif isinstance(value, Timestamp):
return ProtoValue(int64_val=int(value.ToSeconds()))
return ProtoValue(int64_val=int(value))
elif feast_value_type == ValueType.FLOAT:
return ProtoValue(float_val=float(value))
Expand Down
51 changes: 51 additions & 0 deletions sdk/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
import pandas as pd
import pytest

from tests.data.data_creator import create_dataset
from tests.integration.feature_repos.repo_configuration import (
FULL_REPO_CONFIGS,
Environment,
construct_test_environment,
construct_universal_data_sources,
construct_universal_datasets,
construct_universal_entities,
)


def pytest_configure(config):
if platform in ["darwin", "windows"]:
Expand Down Expand Up @@ -87,3 +97,44 @@ def simple_dataset_2() -> pd.DataFrame:
],
}
return pd.DataFrame.from_dict(data)


@pytest.fixture(
    params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS]
)
def environment(request):
    """Session-scoped feature store environment, one per entry in FULL_REPO_CONFIGS.

    The `ids` give each parametrized session a human-readable name in pytest
    output. Teardown of the feature store happens inside the context manager.
    """
    with construct_test_environment(request.param) as test_env:
        yield test_env


@pytest.fixture(scope="session")
def universal_data_sources(environment):
    """Entities, dataframes, and data sources shared by the universal test suite.

    Yields a tuple ``(entities, datasets, datasources)`` built from the
    environment's time window. The created (possibly cloud-hosted) data sources
    are torn down when the session ends; try/finally guarantees the teardown
    runs even if an exception is thrown into the generator during finalization.
    """
    entities = construct_universal_entities()
    datasets = construct_universal_datasets(
        entities, environment.start_date, environment.end_date
    )
    datasources = construct_universal_data_sources(
        datasets, environment.data_source_creator
    )

    try:
        yield entities, datasets, datasources
    finally:
        environment.data_source_creator.teardown()


@pytest.fixture(scope="session")
def e2e_data_sources(environment: Environment):
    """Single dataset and data source used by the end-to-end consistency tests.

    Yields ``(df, data_source)``. The data source is torn down at session end;
    try/finally guarantees cleanup even if finalization is interrupted by an
    exception thrown into the generator.
    """
    df = create_dataset()
    data_source = environment.data_source_creator.create_data_source(
        df, environment.feature_store.project, field_mapping={"ts_1": "ts"},
    )

    try:
        yield df, data_source
    finally:
        environment.data_source_creator.teardown()


@pytest.fixture(
    params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS]
)
def type_test_environment(request):
    """Session-scoped environment for type-mapping tests.

    Mirrors the `environment` fixture, including the readable `ids` for each
    parametrized configuration (previously missing here, making failures hard
    to attribute to a specific provider/store combination).
    """
    with construct_test_environment(request.param) as e:
        yield e
18 changes: 8 additions & 10 deletions sdk/python/tests/integration/e2e/test_universal_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,21 @@
from typing import Optional

import pandas as pd
import pytest
from pytz import utc

from feast import FeatureStore, FeatureView
from tests.integration.feature_repos.test_repo_configuration import (
Environment,
parametrize_e2e_test,
)
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.feature_views import driver_feature_view


@parametrize_e2e_test
def test_e2e_consistency(test_environment: Environment):
fs, fv = (
test_environment.feature_store,
driver_feature_view(test_environment.data_source),
)
@pytest.mark.integration
@pytest.mark.parametrize("infer_features", [True, False])
def test_e2e_consistency(environment, e2e_data_sources, infer_features):
fs = environment.feature_store
df, data_source = e2e_data_sources
fv = driver_feature_view(data_source=data_source, infer_features=infer_features)

entity = driver()
fs.apply([fv, entity])

Expand Down
192 changes: 192 additions & 0 deletions sdk/python/tests/integration/feature_repos/repo_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import tempfile
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Type, Union

import pandas as pd

from feast import FeatureStore, FeatureView, RepoConfig, driver_test_data
from feast.data_source import DataSource
from tests.integration.feature_repos.universal.data_source_creator import (
    DataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.bigquery import (
    BigQueryDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.file import (
    FileDataSourceCreator,
)
from tests.integration.feature_repos.universal.data_sources.redshift import (
    RedshiftDataSourceCreator,
)
from tests.integration.feature_repos.universal.feature_views import (
    create_customer_daily_profile_feature_view,
    create_driver_hourly_stats_feature_view,
)


@dataclass(frozen=True, repr=True)
class IntegrationTestRepoConfig:
    """
    This class should hold all possible parameters that may need to be varied by individual tests.

    Frozen so instances are hashable and safe to reuse as session-scoped
    pytest params; `repr` supplies the string used for pytest ids.
    """

    # Feast provider to run against ("local", "gcp", or "aws").
    provider: str = "local"
    # Online store: a store-type name or a full config dict (see DYNAMO_CONFIG/REDIS_CONFIG).
    online_store: Union[str, Dict] = "sqlite"

    # Factory class that creates offline-store data sources for test datasets.
    offline_store_creator: Type[DataSourceCreator] = FileDataSourceCreator

    # Test-behavior knobs consumed by individual tests, not by the store itself.
    full_feature_names: bool = True
    infer_event_timestamp_col: bool = True
    infer_features: bool = False


# Online-store config dicts for stores that need more than a type name.
DYNAMO_CONFIG = {"type": "dynamodb", "region": "us-west-2"}
REDIS_CONFIG = {"type": "redis", "connection_string": "localhost:6379,db=0"}
# Every provider/offline-store/online-store combination exercised by the
# integration tests. Each entry becomes one session-scoped `environment`
# fixture instantiation; order here is the pytest parametrization order.
FULL_REPO_CONFIGS: List[IntegrationTestRepoConfig] = [
    # Local configurations
    IntegrationTestRepoConfig(),
    IntegrationTestRepoConfig(online_store=REDIS_CONFIG),
    # GCP configurations
    IntegrationTestRepoConfig(
        provider="gcp",
        offline_store_creator=BigQueryDataSourceCreator,
        online_store="datastore",
    ),
    IntegrationTestRepoConfig(
        provider="gcp",
        offline_store_creator=BigQueryDataSourceCreator,
        online_store=REDIS_CONFIG,
    ),
    # AWS configurations
    IntegrationTestRepoConfig(
        provider="aws",
        offline_store_creator=RedshiftDataSourceCreator,
        online_store=DYNAMO_CONFIG,
    ),
    IntegrationTestRepoConfig(
        provider="aws",
        offline_store_creator=RedshiftDataSourceCreator,
        online_store=REDIS_CONFIG,
    ),
]


def construct_universal_entities() -> Dict[str, List[Any]]:
    """Return the fixed entity-key values used by the universal test suite.

    Maps entity name to a list of 109 key values per entity type.
    """
    key_ranges = {"customer": (1001, 1110), "driver": (5001, 5110)}
    return {name: list(range(lo, hi)) for name, (lo, hi) in key_ranges.items()}


def construct_universal_datasets(
    entities: Dict[str, List[Any]], start_time: datetime, end_time: datetime
) -> Dict[str, pd.DataFrame]:
    """Build the customer, driver, and orders dataframes for the test window.

    Customer/driver frames cover [start_time, end_time]; the orders frame
    deliberately spans a year on either side of end_time so join tests see
    events outside the feature window.
    """
    return {
        "customer": driver_test_data.create_customer_daily_profile_df(
            entities["customer"], start_time, end_time
        ),
        "driver": driver_test_data.create_driver_hourly_stats_df(
            entities["driver"], start_time, end_time
        ),
        "orders": driver_test_data.create_orders_df(
            customers=entities["customer"],
            drivers=entities["driver"],
            start_date=end_time - timedelta(days=365),
            end_date=end_time + timedelta(days=365),
            order_count=1000,
        ),
    }


def construct_universal_data_sources(
    datasets: Dict[str, pd.DataFrame], data_source_creator: DataSourceCreator
) -> Dict[str, DataSource]:
    """Materialize each universal test dataframe as a data source.

    All sources share the same event/created timestamp columns; only the
    destination name differs per dataset.
    """
    destination_names = {
        "customer": "customer_profile",
        "driver": "driver_hourly",
        "orders": "orders",
    }
    return {
        key: data_source_creator.create_data_source(
            datasets[key],
            destination_name=destination,
            event_timestamp_column="event_timestamp",
            created_timestamp_column="created",
        )
        for key, destination in destination_names.items()
    }


def construct_universal_feature_views(
    data_sources: Dict[str, DataSource],
) -> Dict[str, FeatureView]:
    """Create the customer and driver feature views over the given sources."""
    customer_view = create_customer_daily_profile_feature_view(
        data_sources["customer"]
    )
    driver_view = create_driver_hourly_stats_feature_view(data_sources["driver"])
    return {"customer": customer_view, "driver": driver_view}


@dataclass
class Environment:
    """State for one integration-test session: a project name, its config,
    the feature store, and the creator that provisions its data sources.
    """

    name: str
    test_repo_config: IntegrationTestRepoConfig
    feature_store: FeatureStore
    data_source_creator: DataSourceCreator

    # Use default_factory, not default: a plain `default=datetime.now()...`
    # is evaluated once at module import, so every Environment created later
    # in the session would share that single stale timestamp.
    end_date: datetime = field(
        default_factory=lambda: datetime.now().replace(microsecond=0, second=0, minute=0)
    )

    def __post_init__(self):
        # Tests operate over a one-week window ending at end_date.
        self.start_date: datetime = self.end_date - timedelta(days=7)


def table_name_from_data_source(ds: DataSource) -> Optional[str]:
    """Return the table name a data source points at, or None if it has none.

    Checks ``table_ref`` first, then ``table`` — the two attribute names the
    concrete source types use for their destination table.
    """
    for attr in ("table_ref", "table"):
        if hasattr(ds, attr):
            return getattr(ds, attr)
    return None


@contextmanager
def construct_test_environment(
    test_repo_config: IntegrationTestRepoConfig,
    test_suite_name: str = "integration_test",
) -> Iterator[Environment]:
    """Create a feature store and data-source creator for one test session.

    Yields an Environment wired to a uniquely named project with a temporary
    registry; the feature store is always torn down on exit. Tearing down the
    data source creator is the caller's responsibility (the conftest fixtures
    do this), since created datasets may be shared across tests.

    Note: a ``@contextmanager``-decorated generator returns an iterator, so
    the annotation is ``Iterator[Environment]``, not ``Environment``.
    """
    # Unique project name so concurrent or repeated runs never collide.
    project = f"{test_suite_name}_{str(uuid.uuid4()).replace('-', '')[:8]}"

    offline_creator: DataSourceCreator = test_repo_config.offline_store_creator(project)

    offline_store_config = offline_creator.create_offline_store_config()
    online_store = test_repo_config.online_store

    with tempfile.TemporaryDirectory() as repo_dir_name:
        config = RepoConfig(
            registry=str(Path(repo_dir_name) / "registry.db"),
            project=project,
            provider=test_repo_config.provider,
            offline_store=offline_store_config,
            online_store=online_store,
            repo_path=repo_dir_name,
        )
        fs = FeatureStore(config=config)
        environment = Environment(
            name=project,
            test_repo_config=test_repo_config,
            feature_store=fs,
            data_source_creator=offline_creator,
        )

        try:
            yield environment
        finally:
            fs.teardown()
Loading