diff --git a/infra/scripts/codebuild_runner.py b/infra/scripts/codebuild_runner.py
index 717118b9ce..382ce9494c 100755
--- a/infra/scripts/codebuild_runner.py
+++ b/infra/scripts/codebuild_runner.py
@@ -11,6 +11,7 @@
 import sys
 import argparse
 
 import boto3
+from botocore.config import Config
 
 class LogTailer:
@@ -125,7 +126,14 @@ def source_version_from_prow_job_spec(job_spec: Dict[str, Any]) -> str:
 
 async def run_build(project_name: str, source_version: str, source_location: str):
     print(f"Building {project_name} at {source_version}", file=sys.stderr)
-    logs_client = boto3.client("logs", region_name="us-west-2")
+
+    config = Config(
+        retries={
+            "max_attempts": 10,
+        }
+    )
+
+    logs_client = boto3.client("logs", region_name="us-west-2", config=config)
     codebuild_client = boto3.client("codebuild", region_name="us-west-2")
 
     print("Submitting the build..", file=sys.stderr)
diff --git a/infra/scripts/setup-e2e-env-sparkop.sh b/infra/scripts/setup-e2e-env-sparkop.sh
new file mode 100755
index 0000000000..dbc2859dae
--- /dev/null
+++ b/infra/scripts/setup-e2e-env-sparkop.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+make compile-protos-python
+
+python -m pip install --upgrade pip==20.2 setuptools wheel
+
+python -m pip install -qr sdk/python/requirements-dev.txt
+python -m pip install -qr tests/requirements.txt
+
+# Using mvn -q to make it less verbose. This step happens after docker containers were
+# successfully built, so it is unlikely to fail and we likely won't need detailed logs.
+echo "########## Building ingestion jar"
+TIMEFORMAT='########## took %R seconds'
+
+time make build-java-no-tests REVISION=develop MAVEN_EXTRA_OPTS="-q --no-transfer-progress"
diff --git a/infra/scripts/test-end-to-end-sparkop.sh b/infra/scripts/test-end-to-end-sparkop.sh
new file mode 100755
index 0000000000..035acc39f5
--- /dev/null
+++ b/infra/scripts/test-end-to-end-sparkop.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+pip install "s3fs" "boto3" "urllib3>=1.25.4"
+
+export DISABLE_FEAST_SERVICE_FIXTURES=1
+export DISABLE_SERVICE_FIXTURES=1
+
+export FEAST_SPARK_K8S_NAMESPACE=sparkop
+
+PYTHONPATH=sdk/python pytest tests/e2e/ \
+    --feast-version develop \
+    --core-url sparkop-feast-core:6565 \
+    --serving-url sparkop-feast-online-serving:6566 \
+    --env k8s \
+    --staging-path "$STAGING_PATH" \
+    --redis-url sparkop-redis-master.sparkop.svc.cluster.local:6379 \
+    --kafka-brokers sparkop-kafka.sparkop.svc.cluster.local:9092 \
+    -m "not bq"
\ No newline at end of file
diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py
index af5efca1df..0a5e04d78e 100644
--- a/sdk/python/feast/constants.py
+++ b/sdk/python/feast/constants.py
@@ -184,7 +184,7 @@ class ConfigOptions(metaclass=ConfigMeta):
     SPARK_K8S_NAMESPACE = "default"
 
     # expect k8s spark operator to be running in the same cluster as Feast
-    SPARK_K8S_USE_INCLUSTER_CONFIG = True
+    SPARK_K8S_USE_INCLUSTER_CONFIG = "True"
 
     # SparkApplication resource template
     SPARK_K8S_JOB_TEMPLATE_PATH = None
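Note on the constants.py change above: `ConfigOptions` defaults appear to flow through Feast's string-based config layer, so a bare boolean `True` breaks string parsing; the default is therefore the string "True". A minimal sketch of the kind of truthy-string parsing involved (hypothetical helper in the spirit of configparser.getboolean(), not the actual Feast accessor):

_TRUTHY = {"1", "true", "yes", "on"}

def get_boolean_option(raw: str) -> bool:
    # Parse a string-typed config default such as "True" into a bool.
    return raw.strip().lower() in _TRUTHY

assert get_boolean_option("True") is True  # works with the new string default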
diff --git a/sdk/python/feast/pyspark/launchers/k8s/k8s.py b/sdk/python/feast/pyspark/launchers/k8s/k8s.py
index 7e7dbb56c0..6132c046a9 100644
--- a/sdk/python/feast/pyspark/launchers/k8s/k8s.py
+++ b/sdk/python/feast/pyspark/launchers/k8s/k8s.py
@@ -223,6 +223,7 @@ def historical_feature_retrieval(
             jars=[],
             extra_metadata={METADATA_OUTPUT_URI: job_params.get_destination_path()},
             arguments=job_params.get_arguments(),
+            namespace=self._namespace,
         )
 
         job_info = _submit_job(
@@ -276,6 +277,7 @@ def offline_to_online_ingestion(
             jars=[],
             extra_metadata={},
             arguments=ingestion_job_params.get_arguments(),
+            namespace=self._namespace,
         )
 
         job_info = _submit_job(
@@ -317,6 +319,7 @@ def start_stream_to_online_ingestion(
             jars=extra_jar_paths,
             extra_metadata={METADATA_JOBHASH: job_hash},
             arguments=ingestion_job_params.get_arguments(),
+            namespace=self._namespace,
         )
 
         job_info = _submit_job(
diff --git a/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py b/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
index 7afaa64f65..5f57f19029 100644
--- a/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
+++ b/sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
@@ -112,6 +112,7 @@ def _prepare_job_resource(
     jars: List[str],
     extra_metadata: Dict[str, str],
     arguments: List[str],
+    namespace: str,
 ) -> Dict[str, Any]:
     """ Prepare SparkApplication custom resource configs """
     job = deepcopy(job_template)
@@ -119,7 +120,11 @@ def _prepare_job_resource(
     labels = {LABEL_JOBID: job_id, LABEL_JOBTYPE: job_type}
     _add_keys(job, ("metadata", "labels"), labels)
-    _add_keys(job, ("metadata",), dict(name=_job_id_to_resource_name(job_id)))
+    _add_keys(
+        job,
+        ("metadata",),
+        dict(name=_job_id_to_resource_name(job_id), namespace=namespace),
+    )
 
     _add_keys(job, ("spec",), dict(mainClass=main_class))
     _add_keys(job, ("spec",), dict(mainApplicationFile=main_application_file))
     _add_keys(job, ("spec",), dict(arguments=arguments))
@@ -179,7 +184,7 @@ def _k8s_state_to_feast(k8s_state: str) -> SparkJobStatus:
 
 def _resource_to_job_info(resource: Dict[str, Any]) -> JobInfo:
     labels = resource["metadata"]["labels"]
-    sparkConf = resource["spec"].get("sparkConf")
+    sparkConf = resource["spec"].get("sparkConf", {})
 
     if "status" in resource:
         state = _k8s_state_to_feast(resource["status"]["applicationState"]["state"])
diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py
index 6e340b49ef..58205e8fcf 100644
--- a/tests/e2e/conftest.py
+++ b/tests/e2e/conftest.py
@@ -9,7 +9,9 @@ def pytest_addoption(parser):
     parser.addoption("--job-service-url", action="store", default="localhost:6568")
     parser.addoption("--kafka-brokers", action="store", default="localhost:9092")
 
-    parser.addoption("--env", action="store", help="local|aws|gcloud", default="local")
+    parser.addoption(
+        "--env", action="store", help="local|aws|gcloud|k8s", default="local"
+    )
     parser.addoption("--with-job-service", action="store_true")
     parser.addoption("--staging-path", action="store")
     parser.addoption("--dataproc-cluster-name", action="store")
diff --git a/tests/e2e/fixtures/client.py b/tests/e2e/fixtures/client.py
index 264d5dc687..db70e4dc72 100644
--- a/tests/e2e/fixtures/client.py
+++ b/tests/e2e/fixtures/client.py
@@ -79,6 +79,19 @@ def feast_client(
                 local_staging_path, "historical_output"
             ),
         )
+    elif pytestconfig.getoption("env") == "k8s":
+        return Client(
+            core_url=f"{feast_core[0]}:{feast_core[1]}",
+            serving_url=f"{feast_serving[0]}:{feast_serving[1]}",
+            spark_launcher="k8s",
+            spark_staging_location=os.path.join(local_staging_path, "k8s"),
+            spark_ingestion_jar=ingestion_job_jar,
+            redis_host=pytestconfig.getoption("redis_url").split(":")[0],
+            redis_port=pytestconfig.getoption("redis_url").split(":")[1],
+            historical_feature_output_location=os.path.join(
+                local_staging_path, "historical_output"
+            ),
+        )
     else:
         raise KeyError(f"Unknown environment {pytestconfig.getoption('env')}")
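For reference, the redis host/port parsing in the new `k8s` branch of `feast_client` boils down to splitting the `--redis-url` option on ":". A standalone illustration (the URL value mirrors test-end-to-end-sparkop.sh and is only an example; rpartition does the same job with a single split):

redis_url = "sparkop-redis-master.sparkop.svc.cluster.local:6379"
redis_host, _, redis_port = redis_url.rpartition(":")
assert redis_host == "sparkop-redis-master.sparkop.svc.cluster.local"
assert redis_port == "6379"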
diff --git a/tests/e2e/test_historical_features.py b/tests/e2e/test_historical_features.py
index 9b054434b5..5cf2449582 100644
--- a/tests/e2e/test_historical_features.py
+++ b/tests/e2e/test_historical_features.py
@@ -1,6 +1,6 @@
 from datetime import datetime, timedelta
 from typing import Union
-from urllib.parse import urlparse
+from urllib.parse import urlparse, urlunparse
 
 import gcsfs
 import numpy as np
@@ -24,11 +24,14 @@ def read_parquet(uri):
         files = ["gs://" + path for path in fs.glob(uri + "/part-*")]
         ds = parquet.ParquetDataset(files, filesystem=fs)
         return ds.read().to_pandas()
-    elif parsed_uri.scheme == "s3":
+    elif parsed_uri.scheme in ("s3", "s3a"):
+
+        s3uri = urlunparse(parsed_uri._replace(scheme="s3"))
+
         import s3fs
 
         fs = s3fs.S3FileSystem()
-        files = ["s3://" + path for path in fs.glob(uri + "/part-*")]
+        files = ["s3://" + path for path in fs.glob(s3uri + "/part-*")]
         ds = parquet.ParquetDataset(files, filesystem=fs)
         return ds.read().to_pandas()
     else:
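The s3a handling above works because Spark writes s3a:// URIs while s3fs expects s3:// paths, and urlparse returns a namedtuple, so _replace() swaps only the scheme. A quick standalone check (bucket and path values are made up):

from urllib.parse import urlparse, urlunparse

uri = "s3a://example-bucket/staging/historical_output"
s3uri = urlunparse(urlparse(uri)._replace(scheme="s3"))
assert s3uri == "s3://example-bucket/staging/historical_output"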