integration test for k8s spark operator support
Signed-off-by: Oleg Avdeev <oleg.v.avdeev@gmail.com>
oavdeev committed Dec 17, 2020
1 parent 5b76f9a commit 93f5368
Showing 9 changed files with 77 additions and 8 deletions.
10 changes: 9 additions & 1 deletion infra/scripts/codebuild_runner.py
@@ -11,6 +11,7 @@
import sys
import argparse
import boto3
from botocore.config import Config


class LogTailer:
@@ -125,7 +126,14 @@ def source_version_from_prow_job_spec(job_spec: Dict[str, Any]) -> str:

async def run_build(project_name: str, source_version: str, source_location: str):
print(f"Building {project_name} at {source_version}", file=sys.stderr)
logs_client = boto3.client("logs", region_name="us-west-2")

config = Config(
retries = {
'max_attempts': 10,
}
)

logs_client = boto3.client("logs", region_name="us-west-2", config=config)
codebuild_client = boto3.client("codebuild", region_name="us-west-2")

print("Submitting the build..", file=sys.stderr)
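
The retry configuration above guards the log-tailing loop against throttled CloudWatch Logs calls. A minimal standalone sketch of the same botocore options; the "mode" key is shown only for illustration (the commit leaves it at its default):

import boto3
from botocore.config import Config

# Retry throttled/transient CloudWatch Logs calls up to 10 times;
# "standard" mode covers more retryable error cases than "legacy".
config = Config(retries={"max_attempts": 10, "mode": "standard"})
logs_client = boto3.client("logs", region_name="us-west-2", config=config)
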
15 changes: 15 additions & 0 deletions infra/scripts/setup-e2e-env-sparkop.sh
@@ -0,0 +1,15 @@
#!/bin/bash

make compile-protos-python

python -m pip install --upgrade pip==20.2 setuptools wheel

python -m pip install -qr sdk/python/requirements-dev.txt
python -m pip install -qr tests/requirements.txt

# Using mvn -q to make it less verbose. This step happens after the Docker containers were
# successfully built, so it should be unlikely to fail; therefore we likely won't need detailed logs.
echo "########## Building ingestion jar"
TIMEFORMAT='########## took %R seconds'

time make build-java-no-tests REVISION=develop MAVEN_EXTRA_OPTS="-q --no-transfer-progress"
20 changes: 20 additions & 0 deletions infra/scripts/test-end-to-end-sparkop.sh
@@ -0,0 +1,20 @@
#!/usr/bin/env bash

set -euo pipefail

pip install "s3fs" "boto3" "urllib3>=1.25.4"

export DISABLE_FEAST_SERVICE_FIXTURES=1
export DISABLE_SERVICE_FIXTURES=1

export FEAST_SPARK_K8S_NAMESPACE=sparkop

PYTHONPATH=sdk/python pytest tests/e2e/ \
--feast-version develop \
--core-url sparkop-feast-core:6565 \
--serving-url sparkop-feast-online-serving:6566 \
--env k8s \
--staging-path $STAGING_PATH \
--redis-url sparkop-redis-master.sparkop.svc.cluster.local:6379 \
--kafka-brokers sparkop-kafka.sparkop.svc.cluster.local:9092 \
-m "not bq"
2 changes: 1 addition & 1 deletion sdk/python/feast/constants.py
@@ -184,7 +184,7 @@ class ConfigOptions(metaclass=ConfigMeta):
SPARK_K8S_NAMESPACE = "default"

# expect k8s spark operator to be running in the same cluster as Feast
SPARK_K8S_USE_INCLUSTER_CONFIG = True
SPARK_K8S_USE_INCLUSTER_CONFIG = "True"

# SparkApplication resource template
SPARK_K8S_JOB_TEMPLATE_PATH = None
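
The default changes from the boolean True to the string "True" because ConfigOptions defaults are stored and parsed as strings. A minimal sketch of the parsing step; str_to_bool is a hypothetical stand-in for Feast's actual conversion:

def str_to_bool(value: str) -> bool:
    # Hypothetical helper: config values arrive as strings ("True"/"False").
    return value.strip().lower() in ("true", "1", "yes")

assert str_to_bool("True") is True
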
3 changes: 3 additions & 0 deletions sdk/python/feast/pyspark/launchers/k8s/k8s.py
@@ -223,6 +223,7 @@ def historical_feature_retrieval(
jars=[],
extra_metadata={METADATA_OUTPUT_URI: job_params.get_destination_path()},
arguments=job_params.get_arguments(),
namespace=self._namespace,
)

job_info = _submit_job(
@@ -276,6 +277,7 @@ def offline_to_online_ingestion(
jars=[],
extra_metadata={},
arguments=ingestion_job_params.get_arguments(),
namespace=self._namespace,
)

job_info = _submit_job(
@@ -317,6 +319,7 @@ def start_stream_to_online_ingestion(
jars=extra_jar_paths,
extra_metadata={METADATA_JOBHASH: job_hash},
arguments=ingestion_job_params.get_arguments(),
namespace=self._namespace,
)

job_info = _submit_job(
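
Threading the namespace through all three job types matters because SparkApplication is a namespaced custom resource: the resource metadata and the API call must agree on it. A hedged sketch using the standard Kubernetes Python client (group/version/plural are the Spark operator's; the body would be the dict built by _prepare_job_resource):

from kubernetes import client, config

config.load_incluster_config()  # matches SPARK_K8S_USE_INCLUSTER_CONFIG

api = client.CustomObjectsApi()
api.create_namespaced_custom_object(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    namespace="sparkop",  # assumption: the namespace the operator watches
    plural="sparkapplications",
    body={},  # placeholder for the resource built by _prepare_job_resource
)
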
9 changes: 7 additions & 2 deletions sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
@@ -112,14 +112,19 @@ def _prepare_job_resource(
jars: List[str],
extra_metadata: Dict[str, str],
arguments: List[str],
namespace: str,
) -> Dict[str, Any]:
""" Prepare SparkApplication custom resource configs """
job = deepcopy(job_template)

labels = {LABEL_JOBID: job_id, LABEL_JOBTYPE: job_type}

_add_keys(job, ("metadata", "labels"), labels)
_add_keys(job, ("metadata",), dict(name=_job_id_to_resource_name(job_id)))
_add_keys(
job,
("metadata",),
dict(name=_job_id_to_resource_name(job_id), namespace=namespace),
)
_add_keys(job, ("spec",), dict(mainClass=main_class))
_add_keys(job, ("spec",), dict(mainApplicationFile=main_application_file))
_add_keys(job, ("spec",), dict(arguments=arguments))
@@ -179,7 +184,7 @@ def _k8s_state_to_feast(k8s_state: str) -> SparkJobStatus:

def _resource_to_job_info(resource: Dict[str, Any]) -> JobInfo:
labels = resource["metadata"]["labels"]
sparkConf = resource["spec"].get("sparkConf")
sparkConf = resource["spec"].get("sparkConf", {})

if "status" in resource:
state = _k8s_state_to_feast(resource["status"]["applicationState"]["state"])
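
_add_keys itself is not shown in this diff; a hypothetical sketch of what it likely does, so the namespace change above is easier to read: walk (and create) the nested path, then merge the new keys at that level.

from typing import Any, Dict, Tuple

def _add_keys(
    resource: Dict[str, Any], path: Tuple[str, ...], items: Dict[str, Any]
) -> None:
    # Hypothetical reconstruction: descend into the nested dicts along
    # `path`, creating them as needed, then merge `items` at that level.
    for key in path:
        resource = resource.setdefault(key, {})
    for key, value in items.items():
        resource[key] = value
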
4 changes: 3 additions & 1 deletion tests/e2e/conftest.py
@@ -9,7 +9,9 @@ def pytest_addoption(parser):
parser.addoption("--job-service-url", action="store", default="localhost:6568")
parser.addoption("--kafka-brokers", action="store", default="localhost:9092")

parser.addoption("--env", action="store", help="local|aws|gcloud", default="local")
parser.addoption(
"--env", action="store", help="local|aws|gcloud|k8s", default="local"
)
parser.addoption("--with-job-service", action="store_true")
parser.addoption("--staging-path", action="store")
parser.addoption("--dataproc-cluster-name", action="store")
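
For reference, options registered here are consumed through pytest's built-in pytestconfig fixture; a minimal sketch of the standard pattern:

import pytest

@pytest.fixture(scope="session")
def test_env(pytestconfig):
    # Returns "local", "aws", "gcloud", or "k8s" depending on --env.
    return pytestconfig.getoption("env")
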
13 changes: 13 additions & 0 deletions tests/e2e/fixtures/client.py
@@ -79,6 +79,19 @@ def feast_client(
local_staging_path, "historical_output"
),
)
elif pytestconfig.getoption("env") == "k8s":
return Client(
core_url=f"{feast_core[0]}:{feast_core[1]}",
serving_url=f"{feast_serving[0]}:{feast_serving[1]}",
spark_launcher="k8s",
spark_staging_location=os.path.join(local_staging_path, "k8s"),
spark_ingestion_jar=ingestion_job_jar,
redis_host=pytestconfig.getoption("redis_url").split(":")[0],
redis_port=pytestconfig.getoption("redis_url").split(":")[1],
historical_feature_output_location=os.path.join(
local_staging_path, "historical_output"
),
)
else:
raise KeyError(f"Unknown environment {pytestconfig.getoption('env')}")

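
A minimal usage sketch, assuming standard pytest fixture injection: an e2e test gets the k8s-configured Client simply by naming the fixture.

def test_client_is_configured(feast_client):
    # With --env k8s, feast_client uses the "k8s" spark launcher and the
    # Redis host/port parsed from --redis-url.
    assert feast_client is not None
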
9 changes: 6 additions & 3 deletions tests/e2e/test_historical_features.py
@@ -1,6 +1,6 @@
from datetime import datetime, timedelta
from typing import Union
from urllib.parse import urlparse
from urllib.parse import urlparse, urlunparse

import gcsfs
import numpy as np
@@ -24,11 +24,14 @@ def read_parquet(uri):
files = ["gs://" + path for path in fs.glob(uri + "/part-*")]
ds = parquet.ParquetDataset(files, filesystem=fs)
return ds.read().to_pandas()
elif parsed_uri.scheme == "s3":
elif parsed_uri.scheme == "s3" or parsed_uri.scheme == "s3a":

s3uri = urlunparse(parsed_uri._replace(scheme="s3"))

import s3fs

fs = s3fs.S3FileSystem()
files = ["s3://" + path for path in fs.glob(uri + "/part-*")]
files = ["s3://" + path for path in fs.glob(s3uri + "/part-*")]
ds = parquet.ParquetDataset(files, filesystem=fs)
return ds.read().to_pandas()
else:
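
The s3a handling works by rewriting the scheme before globbing, since s3fs only understands s3:// while Spark writes s3a:// URIs (the Hadoop S3A connector's convention). A standalone illustration using only the standard library:

from urllib.parse import urlparse, urlunparse

uri = "s3a://bucket/staging/historical_output"
parsed = urlparse(uri)
if parsed.scheme == "s3a":
    uri = urlunparse(parsed._replace(scheme="s3"))

print(uri)  # -> s3://bucket/staging/historical_output
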
