From e5903d0b87fc2583422deeb7b492cf0940bd4f49 Mon Sep 17 00:00:00 2001
From: Oleksii Moskalenko
Date: Wed, 4 Nov 2020 18:31:15 +0800
Subject: [PATCH] stage only local files

Signed-off-by: Oleksii Moskalenko
---
 .../feast/pyspark/launchers/gcloud/dataproc.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
index 3a428a9f26..ea4085c549 100644
--- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
+++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -218,12 +218,13 @@ def __init__(
             client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
         )
 
-    def _stage_files(self, pyspark_script: str, job_id: str) -> str:
+    def _stage_file(self, file_path: str, job_id: str) -> str:
+        if not os.path.isfile(file_path):
+            return file_path
+
         staging_client = get_staging_client("gs")
-        blob_path = os.path.join(
-            self.remote_path, job_id, os.path.basename(pyspark_script),
-        )
-        staging_client.upload_file(pyspark_script, self.staging_bucket, blob_path)
+        blob_path = os.path.join(self.remote_path, job_id, os.path.basename(file_path),)
+        staging_client.upload_file(file_path, self.staging_bucket, blob_path)
         return f"gs://{self.staging_bucket}/{blob_path}"
 
     def dataproc_submit(
@@ -231,7 +232,7 @@ def dataproc_submit(
         self, job_params: SparkJobParameters
     ) -> Tuple[Job, Callable[[], Job], Callable[[], None]]:
         local_job_id = str(uuid.uuid4())
-        main_file_uri = self._stage_files(job_params.get_main_file_path(), local_job_id)
+        main_file_uri = self._stage_file(job_params.get_main_file_path(), local_job_id)
         job_config: Dict[str, Any] = {
             "reference": {"job_id": local_job_id},
             "placement": {"cluster_name": self.cluster_name},
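
Note: the renamed _stage_file becomes a no-op for any path that is not a local
file. Anything os.path.isfile rejects (most importantly an already-remote URI
such as "gs://bucket/job.py") is returned unchanged, and only genuinely local
files are uploaded to the staging bucket. Below is a minimal, self-contained
sketch of that behavior; the StagingClient protocol and the free function
stage_file are illustrative stand-ins for Feast's get_staging_client("gs")
helper and the _stage_file method, not Feast's actual API.

import os
from typing import Protocol


class StagingClient(Protocol):
    """Assumed subset of the staging client interface used by the sketch."""

    def upload_file(self, local_path: str, bucket: str, remote_path: str) -> None: ...


def stage_file(
    staging_client: StagingClient,
    staging_bucket: str,
    remote_path: str,
    file_path: str,
    job_id: str,
) -> str:
    # Non-local paths (e.g. "gs://bucket/job.py") are assumed to already be
    # reachable by the Dataproc cluster, so they pass through untouched.
    # Note os.path.isfile is also False for nonexistent local paths, which
    # would likewise be returned unchanged.
    if not os.path.isfile(file_path):
        return file_path

    # Local files are uploaded under <remote_path>/<job_id>/<basename> and
    # the resulting gs:// URI is handed to the job submission.
    blob_path = os.path.join(remote_path, job_id, os.path.basename(file_path))
    staging_client.upload_file(file_path, staging_bucket, blob_path)
    return f"gs://{staging_bucket}/{blob_path}"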