generating test environments based on test markers and fixtures
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
pyalex committed May 10, 2022
1 parent 1fd9c2d commit 155feee
Showing 19 changed files with 346 additions and 324 deletions.
@@ -4,11 +4,14 @@
 from feast.infra.offline_stores.contrib.trino_offline_store.tests.data_source import (
     TrinoSourceCreator,
 )
-from tests.integration.feature_repos.integration_test_repo_config import (
-    IntegrationTestRepoConfig,
+from tests.integration.feature_repos.repo_configuration import REDIS_CONFIG
+from tests.integration.feature_repos.universal.online_store.redis import (
+    RedisOnlineStoreCreator,
 )
 
-FULL_REPO_CONFIGS = [
-    IntegrationTestRepoConfig(offline_store_creator=SparkDataSourceCreator),
-    IntegrationTestRepoConfig(offline_store_creator=TrinoSourceCreator),
+AVAILABLE_OFFLINE_STORES = [
+    ("local", SparkDataSourceCreator),
+    ("local", TrinoSourceCreator),
 ]
+
+AVAILABLE_ONLINE_STORES = {"redis": (REDIS_CONFIG, RedisOnlineStoreCreator)}
@@ -1,14 +1,7 @@
 from feast.infra.offline_stores.contrib.postgres_offline_store.tests.data_source import (
     PostgreSQLDataSourceCreator,
 )
-from tests.integration.feature_repos.integration_test_repo_config import (
-    IntegrationTestRepoConfig,
-)
 
-FULL_REPO_CONFIGS = [
-    IntegrationTestRepoConfig(
-        provider="local",
-        offline_store_creator=PostgreSQLDataSourceCreator,
-        online_store_creator=PostgreSQLDataSourceCreator,
-    ),
-]
+AVAILABLE_OFFLINE_STORES = [("local", PostgreSQLDataSourceCreator)]
 
+AVAILABLE_ONLINE_STORES = {"postgres": (None, PostgreSQLDataSourceCreator)}
22 changes: 12 additions & 10 deletions sdk/python/feast/infra/online_stores/datastore.py
@@ -15,7 +15,7 @@
 import logging
 from datetime import datetime
 from multiprocessing.pool import ThreadPool
-from queue import Queue
+from queue import Empty, Queue
 from threading import Lock, Thread
 from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple

@@ -292,22 +292,24 @@ def increment(self):
 
     def worker(shared_counter):
         while True:
-            client.delete_multi(deletion_queue.get())
+            try:
+                job = deletion_queue.get(block=False)
+            except Empty:
+                return
+
+            client.delete_multi(job)
             shared_counter.increment()
             LOGGER.debug(
                 f"batch deletions completed: {shared_counter.value} ({shared_counter.value * BATCH_SIZE} total entries) & outstanding queue size: {deletion_queue.qsize()}"
             )
             deletion_queue.task_done()
 
-    for _ in range(NUM_THREADS):
-        Thread(target=worker, args=(status_info_counter,), daemon=True).start()
-
     query = client.query(kind="Row", ancestor=key)
-    while True:
-        entities = list(query.fetch(limit=BATCH_SIZE))
-        if not entities:
-            break
-        deletion_queue.put([entity.key for entity in entities])
+    for page in query.fetch().pages:
+        deletion_queue.put([entity.key for entity in page])
+
+    for _ in range(NUM_THREADS):
+        Thread(target=worker, args=(status_info_counter,)).start()
 
     deletion_queue.join()
@@ -17,6 +17,7 @@
 
 @pytest.mark.benchmark
 @pytest.mark.integration
+@pytest.mark.universal_online_stores
 def test_online_retrieval(environment, universal_data_sources, benchmark):
     fs = environment.feature_store
     entities, datasets, data_sources = universal_data_sources
192 changes: 131 additions & 61 deletions sdk/python/tests/conftest.py
@@ -13,7 +13,8 @@
 # limitations under the License.
 import logging
 import multiprocessing
-import time
+import socket
+from contextlib import closing
 from datetime import datetime, timedelta
 from multiprocessing import Process
 from sys import platform
@@ -24,19 +25,22 @@
 from _pytest.nodes import Item
 
 from feast import FeatureStore
+from feast.wait import wait_retry_backoff
 from tests.data.data_creator import create_dataset
 from tests.integration.feature_repos.integration_test_repo_config import (
     IntegrationTestRepoConfig,
 )
 from tests.integration.feature_repos.repo_configuration import (
-    FULL_REPO_CONFIGS,
-    REDIS_CLUSTER_CONFIG,
-    REDIS_CONFIG,
+    AVAILABLE_OFFLINE_STORES,
+    AVAILABLE_ONLINE_STORES,
     Environment,
     TestData,
     construct_test_environment,
     construct_universal_test_data,
 )
+from tests.integration.feature_repos.universal.data_sources.file import (
+    FileDataSourceCreator,
+)
 
 logger = logging.getLogger(__name__)
@@ -161,86 +165,152 @@ def start_test_local_server(repo_path: str, port: int):
     fs.serve("localhost", port, no_access_log=True)
 
 
-@pytest.fixture(
-    params=FULL_REPO_CONFIGS, scope="session", ids=[str(c) for c in FULL_REPO_CONFIGS]
-)
-def environment(request, worker_id: str):
+@pytest.fixture(scope="session")
+def environment(request, worker_id):
     e = construct_test_environment(
         request.param, worker_id=worker_id, fixture_request=request
     )
 
+    yield e
+
+    e.feature_store.teardown()
+    e.data_source_creator.teardown()
+    if e.online_store_creator:
+        e.online_store_creator.teardown()


+_config_cache = {}
+
+
+def pytest_generate_tests(metafunc):
+    if "environment" in metafunc.fixturenames:
+        markers = {m.name: m for m in metafunc.definition.own_markers}
+
+        if "universal_offline_stores" in markers:
+            offline_stores = AVAILABLE_OFFLINE_STORES
+        else:
+            # default offline store for testing the online store dimension
+            offline_stores = [("local", FileDataSourceCreator)]
+
+        online_stores = None
+        if "universal_online_stores" in markers:
+            # Online stores are explicitly requested
+            if "only" in markers["universal_online_stores"].kwargs:
+                online_stores = [
+                    AVAILABLE_ONLINE_STORES.get(store_name)
+                    for store_name in markers["universal_online_stores"].kwargs["only"]
+                    if store_name in AVAILABLE_ONLINE_STORES
+                ]
+            else:
+                online_stores = AVAILABLE_ONLINE_STORES.values()
+
+        if online_stores is None:
+            # No online stores requested -> use the default or first available
+            online_stores = [
+                AVAILABLE_ONLINE_STORES.get(
+                    "redis",
+                    AVAILABLE_ONLINE_STORES.get(
+                        "sqlite", next(iter(AVAILABLE_ONLINE_STORES.values()))
+                    ),
+                )
+            ]
+
+        extra_dimensions = [{}]
+
+        if "python_server" in metafunc.fixturenames:
+            extra_dimensions.extend(
+                [
+                    {"python_feature_server": True},
+                    # {"python_feature_server": True, "provider": "aws"},
+                ]
+            )
+
+        if "goserver" in markers:
+            extra_dimensions.append({"go_feature_retrieval": True})
+
+        configs = []
+        for provider, offline_store_creator in offline_stores:
+            for online_store, online_store_creator in online_stores:
+                for dim in extra_dimensions:
+                    config = {
+                        "provider": provider,
+                        "offline_store_creator": offline_store_creator,
+                        "online_store": online_store,
+                        "online_store_creator": online_store_creator,
+                        **dim,
+                    }
+                    # temporary: Go feature retrieval works only with redis
+                    if config.get("go_feature_retrieval") and (
+                        not isinstance(online_store, dict)
+                        or online_store["type"] != "redis"
+                    ):
+                        continue
+
+                    # AWS Lambda feature server works only with dynamodb
+                    if (
+                        config.get("python_feature_server")
+                        and config.get("provider") == "aws"
+                        and (
+                            not isinstance(online_store, dict)
+                            or online_store["type"] != "dynamodb"
+                        )
+                    ):
+                        continue
+
+                    c = IntegrationTestRepoConfig(**config)
+
+                    if c not in _config_cache:
+                        _config_cache[c] = c
+
+                    configs.append(_config_cache[c])
+
+        metafunc.parametrize(
+            "environment", configs, indirect=True, ids=[str(c) for c in configs]
+        )
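
With this hook in place, integration tests opt into environment dimensions through markers instead of a global FULL_REPO_CONFIGS list. A usage sketch, assuming the fixtures and markers defined in this test suite; the test bodies are placeholders:

import pytest


# Runs once per available offline store, paired with the default online store.
@pytest.mark.integration
@pytest.mark.universal_offline_stores
def test_history_retrieval(environment, universal_data_sources):
    ...


# Runs against the redis online store only, with the default file offline store.
@pytest.mark.integration
@pytest.mark.universal_online_stores(only=["redis"])
def test_online_reads(environment, universal_data_sources):
    ...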


+@pytest.fixture(scope="session")
+def python_server(environment):
     proc = Process(
         target=start_test_local_server,
-        args=(e.feature_store.repo_path, e.get_local_server_port()),
+        args=(environment.feature_store.repo_path, environment.get_local_server_port()),
         daemon=True,
     )
-    if e.python_feature_server and e.test_repo_config.provider == "local":
+    if (
+        environment.python_feature_server
+        and environment.test_repo_config.provider == "local"
+    ):
         proc.start()
         # Wait for server to start
-        time.sleep(3)
-
-    def cleanup():
-        e.feature_store.teardown()
-        if proc.is_alive():
-            proc.kill()
-        if e.online_store_creator:
-            e.online_store_creator.teardown()
-
-    request.addfinalizer(cleanup)
-
-    return e
-
-
-@pytest.fixture(
-    params=[REDIS_CONFIG, REDIS_CLUSTER_CONFIG],
-    scope="session",
-    ids=[str(c) for c in [REDIS_CONFIG, REDIS_CLUSTER_CONFIG]],
-)
-def local_redis_environment(request, worker_id):
-    e = construct_test_environment(
-        IntegrationTestRepoConfig(online_store=request.param),
-        worker_id=worker_id,
-        fixture_request=request,
-    )
-
-    def cleanup():
-        e.feature_store.teardown()
-
-    request.addfinalizer(cleanup)
-    return e
+        wait_retry_backoff(
+            lambda: (
+                None,
+                _check_port_open("localhost", environment.get_local_server_port()),
+            ),
+            timeout_secs=10,
+        )
+
+    yield
+
+    if proc.is_alive():
+        proc.kill()
+
+
+def _check_port_open(host, port) -> bool:
+    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
+        return sock.connect_ex((host, port)) == 0
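
The fixed time.sleep(3) is gone: the fixture now polls the server port, with wait_retry_backoff repeatedly invoking the lambda, which returns a (result, is_done) pair, until is_done is true or timeout_secs elapses. A self-contained sketch of the same polling idea without the feast helper; wait_for_server is an illustrative name:

import socket
import time
from contextlib import closing


def check_port_open(host: str, port: int) -> bool:
    # connect_ex returns 0 when the TCP connection succeeds
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        return sock.connect_ex((host, port)) == 0


def wait_for_server(host: str, port: int, timeout_secs: float = 10.0) -> None:
    """Poll until the server accepts connections, with a simple backoff."""
    deadline = time.monotonic() + timeout_secs
    delay = 0.1
    while time.monotonic() < deadline:
        if check_port_open(host, port):
            return
        time.sleep(delay)
        delay = min(delay * 2, 1.0)  # exponential backoff, capped at 1s
    raise TimeoutError(f"server at {host}:{port} did not start in {timeout_secs}s")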


 @pytest.fixture(scope="session")
-def universal_data_sources(request, environment) -> TestData:
-    def cleanup():
-        # logger.info("Running cleanup in %s, Request: %s", worker_id, request.param)
-        environment.data_source_creator.teardown()
-
-    request.addfinalizer(cleanup)
+def universal_data_sources(environment) -> TestData:
     return construct_universal_test_data(environment)
 
 
-@pytest.fixture(scope="session")
-def redis_universal_data_sources(request, local_redis_environment):
-    def cleanup():
-        # logger.info("Running cleanup in %s, Request: %s", worker_id, request.param)
-        local_redis_environment.data_source_creator.teardown()
-
-    request.addfinalizer(cleanup)
-    return construct_universal_test_data(local_redis_environment)
-
-
 @pytest.fixture(scope="session")
-def e2e_data_sources(environment: Environment, request):
+def e2e_data_sources(environment: Environment):
     df = create_dataset()
     data_source = environment.data_source_creator.create_data_source(
         df, environment.feature_store.project, field_mapping={"ts_1": "ts"},
     )
 
-    def cleanup():
-        environment.data_source_creator.teardown()
-        if environment.online_store_creator:
-            environment.online_store_creator.teardown()
-
-    request.addfinalizer(cleanup)
-
     return df, data_source