Add dataproc executor resource config (#1160)
* Add dataproc executor resource config

Signed-off-by: Terence <terencelimxp@gmail.com>

* Add default spark job executor values

Signed-off-by: Terence <terencelimxp@gmail.com>

* Fix e2e tests

Signed-off-by: Terence <terencelimxp@gmail.com>

* Shift spark configurations

Signed-off-by: Terence <terencelimxp@gmail.com>

* Update constants and docstrings

Signed-off-by: Terence <terencelimxp@gmail.com>
terryyylim authored and pyalex committed Nov 24, 2020
1 parent 286cb05 commit 89871f1
Showing 7 changed files with 61 additions and 7 deletions.
9 changes: 9 additions & 0 deletions sdk/python/feast/constants.py
@@ -160,6 +160,15 @@ class ConfigOptions(metaclass=ConfigMeta):
     #: Region of Dataproc cluster
     DATAPROC_REGION: Optional[str] = None

+    #: Number of executor instances for Dataproc cluster
+    DATAPROC_EXECUTOR_INSTANCES = "2"
+
+    #: Number of executor cores for Dataproc cluster
+    DATAPROC_EXECUTOR_CORES = "2"
+
+    #: Amount of executor memory for Dataproc cluster
+    DATAPROC_EXECUTOR_MEMORY = "2g"
+
     #: File format of historical retrieval features
     HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet"

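The three new options default to modest values ("2" instances, "2" cores, "2g" memory). Like the other ConfigOptions fields, they can be overridden through Feast's usual configuration paths; a minimal sketch, assuming the FEAST_<OPTION> environment-variable mapping that the e2e fixtures below rely on:

import os

# Hypothetical override of the new defaults via environment variables;
# the FEAST_* names mirror tests/e2e/fixtures/feast_services.py below.
os.environ["FEAST_DATAPROC_EXECUTOR_INSTANCES"] = "4"
os.environ["FEAST_DATAPROC_EXECUTOR_CORES"] = "4"
os.environ["FEAST_DATAPROC_EXECUTOR_MEMORY"] = "8g"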
11 changes: 7 additions & 4 deletions sdk/python/feast/pyspark/launcher.py
@@ -34,10 +34,13 @@ def _dataproc_launcher(config: Config) -> JobLauncher:
     from feast.pyspark.launchers import gcloud

     return gcloud.DataprocClusterLauncher(
-        config.get(opt.DATAPROC_CLUSTER_NAME),
-        config.get(opt.SPARK_STAGING_LOCATION),
-        config.get(opt.DATAPROC_REGION),
-        config.get(opt.DATAPROC_PROJECT),
+        cluster_name=config.get(opt.DATAPROC_CLUSTER_NAME),
+        staging_location=config.get(opt.SPARK_STAGING_LOCATION),
+        region=config.get(opt.DATAPROC_REGION),
+        project_id=config.get(opt.DATAPROC_PROJECT),
+        executor_instances=config.get(opt.DATAPROC_EXECUTOR_INSTANCES),
+        executor_cores=config.get(opt.DATAPROC_EXECUTOR_CORES),
+        executor_memory=config.get(opt.DATAPROC_EXECUTOR_MEMORY),
     )


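For reference, the launcher can also be constructed directly with the new keyword arguments; a sketch with placeholder values, mirroring the integration fixture at the end of this commit:

from feast.pyspark.launchers.gcloud import DataprocClusterLauncher

# All values below are hypothetical placeholders, not project defaults.
launcher = DataprocClusterLauncher(
    cluster_name="feast-dataproc",
    staging_location="gs://example-bucket/staging",
    region="us-central1",
    project_id="example-project",
    executor_instances="2",
    executor_cores="2",
    executor_memory="2g",
)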
27 changes: 24 additions & 3 deletions sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -200,7 +200,14 @@ class DataprocClusterLauncher(JobLauncher):
     JOB_HASH_LABEL_KEY = "feast_job_hash"

     def __init__(
-        self, cluster_name: str, staging_location: str, region: str, project_id: str,
+        self,
+        cluster_name: str,
+        staging_location: str,
+        region: str,
+        project_id: str,
+        executor_instances: str,
+        executor_cores: str,
+        executor_memory: str,
     ):
         """
         Initialize a dataproc job controller client, used internally for job submission and result
@@ -213,8 +220,14 @@ def __init__(
                 GCS directory for the storage of files generated by the launcher, such as the pyspark scripts.
             region (str):
                 Dataproc cluster region.
-            project_id (str:
+            project_id (str):
                 GCP project id for the dataproc cluster.
+            executor_instances (str):
+                Number of executor instances for the dataproc job.
+            executor_cores (str):
+                Number of executor cores for the dataproc job.
+            executor_memory (str):
+                Amount of executor memory for the dataproc job.
         """

         self.cluster_name = cluster_name
@@ -231,6 +244,9 @@ def __init__(
         self.job_client = JobControllerClient(
             client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
         )
+        self.executor_instances = executor_instances
+        self.executor_cores = executor_cores
+        self.executor_memory = executor_memory

     def _stage_file(self, file_path: str, job_id: str) -> str:
         if not os.path.isfile(file_path):
@@ -264,7 +280,12 @@ def dataproc_submit(
"jar_file_uris": [main_file_uri] + self.EXTERNAL_JARS,
"main_class": job_params.get_class_name(),
"args": job_params.get_arguments(),
"properties": {"spark.yarn.user.classpath.first": "true"},
"properties": {
"spark.yarn.user.classpath.first": "true",
"spark.executor.instances": self.executor_instances,
"spark.executor.cores": self.executor_cores,
"spark.executor.memory": self.executor_memory,
},
}
}
)
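The three values map one-to-one onto the standard Spark properties spark.executor.instances, spark.executor.cores, and spark.executor.memory. For comparison, a minimal sketch of the same sizing applied to a plain SparkSession, independent of Dataproc:

from pyspark.sql import SparkSession

# Same executor sizing expressed as ordinary Spark conf entries;
# the values mirror the new Feast defaults.
spark = (
    SparkSession.builder.appName("executor-sizing-demo")
    .config("spark.executor.instances", "2")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)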
3 changes: 3 additions & 0 deletions tests/e2e/conftest.py
@@ -17,6 +17,9 @@ def pytest_addoption(parser):
parser.addoption("--emr-cluster-id", action="store")
parser.addoption("--emr-region", action="store")
parser.addoption("--dataproc-project", action="store")
parser.addoption("--dataproc-executor-instances", action="store", default="2")
parser.addoption("--dataproc-executor-cores", action="store", default="2")
parser.addoption("--dataproc-executor-memory", action="store", default="2g")
parser.addoption("--ingestion-jar", action="store")
parser.addoption("--redis-url", action="store", default="localhost:6379")
parser.addoption("--redis-cluster", action="store_true")
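With the options registered, the e2e suite can request a differently sized job; a hypothetical invocation through pytest's Python entry point (the equivalent flags also work on the command line):

import pytest

# Hypothetical run with larger executors; the other required options
# (cluster name, region, project, etc.) are elided here.
pytest.main([
    "tests/e2e",
    "--dataproc-executor-instances=4",
    "--dataproc-executor-cores=4",
    "--dataproc-executor-memory=8g",
])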
9 changes: 9 additions & 0 deletions tests/e2e/fixtures/feast_services.py
@@ -185,6 +185,15 @@ def feast_jobservice(
     )
     env["FEAST_DATAPROC_PROJECT"] = pytestconfig.getoption("dataproc_project")
     env["FEAST_DATAPROC_REGION"] = pytestconfig.getoption("dataproc_region")
+    env["FEAST_DATAPROC_EXECUTOR_INSTANCES"] = pytestconfig.getoption(
+        "dataproc_executor_instances"
+    )
+    env["FEAST_DATAPROC_EXECUTOR_CORES"] = pytestconfig.getoption(
+        "dataproc_executor_cores"
+    )
+    env["FEAST_DATAPROC_EXECUTOR_MEMORY"] = pytestconfig.getoption(
+        "dataproc_executor_memory"
+    )
     env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join(
         global_staging_path, "dataproc"
     )
3 changes: 3 additions & 0 deletions tests/integration/conftest.py
@@ -3,5 +3,8 @@ def pytest_addoption(parser):
parser.addoption("--dataproc-region", action="store")
parser.addoption("--dataproc-project", action="store")
parser.addoption("--dataproc-staging-location", action="store")
parser.addoption("--dataproc-executor-instances", action="store", default="2")
parser.addoption("--dataproc-executor-cores", action="store", default="2")
parser.addoption("--dataproc-executor-memory", action="store", default="2g")
parser.addoption("--redis-url", action="store")
parser.addoption("--redis-cluster", action="store_true")
6 changes: 6 additions & 0 deletions tests/integration/fixtures/launchers.py
@@ -9,9 +9,15 @@ def dataproc_launcher(pytestconfig) -> DataprocClusterLauncher:
     region = pytestconfig.getoption("--dataproc-region")
     project_id = pytestconfig.getoption("--dataproc-project")
     staging_location = pytestconfig.getoption("--dataproc-staging-location")
+    executor_instances = pytestconfig.getoption("dataproc_executor_instances")
+    executor_cores = pytestconfig.getoption("dataproc_executor_cores")
+    executor_memory = pytestconfig.getoption("dataproc_executor_memory")
     return DataprocClusterLauncher(
         cluster_name=cluster_name,
         staging_location=staging_location,
         region=region,
         project_id=project_id,
+        executor_instances=executor_instances,
+        executor_cores=executor_cores,
+        executor_memory=executor_memory,
     )
