diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 1ed416a34b..4d677bc38d 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -160,6 +160,15 @@ class ConfigOptions(metaclass=ConfigMeta): #: Region of Dataproc cluster DATAPROC_REGION: Optional[str] = None + #: No. of executor instances for Dataproc cluster + DATAPROC_EXECUTOR_INSTANCES = "2" + + #: No. of executor cores for Dataproc cluster + DATAPROC_EXECUTOR_CORES = "2" + + #: No. of executor memory for Dataproc cluster + DATAPROC_EXECUTOR_MEMORY = "2g" + #: File format of historical retrieval features HISTORICAL_FEATURE_OUTPUT_FORMAT: str = "parquet" diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py index 4e783e90bc..d928a4f129 100644 --- a/sdk/python/feast/pyspark/launcher.py +++ b/sdk/python/feast/pyspark/launcher.py @@ -34,10 +34,13 @@ def _dataproc_launcher(config: Config) -> JobLauncher: from feast.pyspark.launchers import gcloud return gcloud.DataprocClusterLauncher( - config.get(opt.DATAPROC_CLUSTER_NAME), - config.get(opt.SPARK_STAGING_LOCATION), - config.get(opt.DATAPROC_REGION), - config.get(opt.DATAPROC_PROJECT), + cluster_name=config.get(opt.DATAPROC_CLUSTER_NAME), + staging_location=config.get(opt.SPARK_STAGING_LOCATION), + region=config.get(opt.DATAPROC_REGION), + project_id=config.get(opt.DATAPROC_PROJECT), + executor_instances=config.get(opt.DATAPROC_EXECUTOR_INSTANCES), + executor_cores=config.get(opt.DATAPROC_EXECUTOR_CORES), + executor_memory=config.get(opt.DATAPROC_EXECUTOR_MEMORY), ) diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py index 9123829851..cfbfb82810 100644 --- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py +++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py @@ -200,7 +200,14 @@ class DataprocClusterLauncher(JobLauncher): JOB_HASH_LABEL_KEY = "feast_job_hash" def __init__( - self, cluster_name: str, staging_location: str, region: str, project_id: str, + self, + cluster_name: str, + staging_location: str, + region: str, + project_id: str, + executor_instances: str, + executor_cores: str, + executor_memory: str, ): """ Initialize a dataproc job controller client, used internally for job submission and result @@ -213,8 +220,14 @@ def __init__( GCS directory for the storage of files generated by the launcher, such as the pyspark scripts. region (str): Dataproc cluster region. - project_id (str: + project_id (str): GCP project id for the dataproc cluster. + executor_instances (str): + Number of executor instances for dataproc job. + executor_cores (str): + Number of cores for dataproc job. + executor_memory (str): + Amount of memory for dataproc job. """ self.cluster_name = cluster_name @@ -231,6 +244,9 @@ def __init__( self.job_client = JobControllerClient( client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"} ) + self.executor_instances = executor_instances + self.executor_cores = executor_cores + self.executor_memory = executor_memory def _stage_file(self, file_path: str, job_id: str) -> str: if not os.path.isfile(file_path): @@ -264,7 +280,12 @@ def dataproc_submit( "jar_file_uris": [main_file_uri] + self.EXTERNAL_JARS, "main_class": job_params.get_class_name(), "args": job_params.get_arguments(), - "properties": {"spark.yarn.user.classpath.first": "true"}, + "properties": { + "spark.yarn.user.classpath.first": "true", + "spark.executor.instances": self.executor_instances, + "spark.executor.cores": self.executor_cores, + "spark.executor.memory": self.executor_memory, + }, } } ) diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py index 17524573dd..6e340b49ef 100644 --- a/tests/e2e/conftest.py +++ b/tests/e2e/conftest.py @@ -17,6 +17,9 @@ def pytest_addoption(parser): parser.addoption("--emr-cluster-id", action="store") parser.addoption("--emr-region", action="store") parser.addoption("--dataproc-project", action="store") + parser.addoption("--dataproc-executor-instances", action="store", default="2") + parser.addoption("--dataproc-executor-cores", action="store", default="2") + parser.addoption("--dataproc-executor-memory", action="store", default="2g") parser.addoption("--ingestion-jar", action="store") parser.addoption("--redis-url", action="store", default="localhost:6379") parser.addoption("--redis-cluster", action="store_true") diff --git a/tests/e2e/fixtures/feast_services.py b/tests/e2e/fixtures/feast_services.py index eb085a2c01..f2afd2bf6d 100644 --- a/tests/e2e/fixtures/feast_services.py +++ b/tests/e2e/fixtures/feast_services.py @@ -187,6 +187,15 @@ def feast_jobservice( ) env["FEAST_DATAPROC_PROJECT"] = pytestconfig.getoption("dataproc_project") env["FEAST_DATAPROC_REGION"] = pytestconfig.getoption("dataproc_region") + env["FEAST_DATAPROC_EXECUTOR_INSTANCES"] = pytestconfig.getoption( + "dataproc_executor_instances" + ) + env["FEAST_DATAPROC_EXECUTOR_CORES"] = pytestconfig.getoption( + "dataproc_executor_cores" + ) + env["FEAST_DATAPROC_EXECUTOR_MEMORY"] = pytestconfig.getoption( + "dataproc_executor_memory" + ) env["FEAST_SPARK_STAGING_LOCATION"] = os.path.join( global_staging_path, "dataproc" ) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 88ab8743a5..a80e694829 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -3,5 +3,8 @@ def pytest_addoption(parser): parser.addoption("--dataproc-region", action="store") parser.addoption("--dataproc-project", action="store") parser.addoption("--dataproc-staging-location", action="store") + parser.addoption("--dataproc-executor-instances", action="store", default="2") + parser.addoption("--dataproc-executor-cores", action="store", default="2") + parser.addoption("--dataproc-executor-memory", action="store", default="2g") parser.addoption("--redis-url", action="store") parser.addoption("--redis-cluster", action="store_true") diff --git a/tests/integration/fixtures/launchers.py b/tests/integration/fixtures/launchers.py index ebe93172d1..d289d974ac 100644 --- a/tests/integration/fixtures/launchers.py +++ b/tests/integration/fixtures/launchers.py @@ -9,9 +9,15 @@ def dataproc_launcher(pytestconfig) -> DataprocClusterLauncher: region = pytestconfig.getoption("--dataproc-region") project_id = pytestconfig.getoption("--dataproc-project") staging_location = pytestconfig.getoption("--dataproc-staging-location") + executor_instances = pytestconfig.getoption("dataproc_executor_instances") + executor_cores = pytestconfig.getoption("dataproc_executor_cores") + executor_memory = pytestconfig.getoption("dataproc_executor_memory") return DataprocClusterLauncher( cluster_name=cluster_name, staging_location=staging_location, region=region, project_id=project_id, + executor_instances=executor_instances, + executor_cores=executor_cores, + executor_memory=executor_memory, )