stage only local files
Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
pyalex committed Nov 4, 2020
1 parent 934e359 commit e5903d0
Showing 1 changed file with 7 additions and 6 deletions.

sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -218,20 +218,21 @@ def __init__(
             client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
         )
 
-    def _stage_files(self, pyspark_script: str, job_id: str) -> str:
+    def _stage_file(self, file_path: str, job_id: str) -> str:
+        if not os.path.isfile(file_path):
+            return file_path
+
         staging_client = get_staging_client("gs")
-        blob_path = os.path.join(
-            self.remote_path, job_id, os.path.basename(pyspark_script),
-        )
-        staging_client.upload_file(pyspark_script, self.staging_bucket, blob_path)
+        blob_path = os.path.join(self.remote_path, job_id, os.path.basename(file_path),)
+        staging_client.upload_file(file_path, self.staging_bucket, blob_path)
 
         return f"gs://{self.staging_bucket}/{blob_path}"
 
     def dataproc_submit(
         self, job_params: SparkJobParameters
     ) -> Tuple[Job, Callable[[], Job], Callable[[], None]]:
         local_job_id = str(uuid.uuid4())
-        main_file_uri = self._stage_files(job_params.get_main_file_path(), local_job_id)
+        main_file_uri = self._stage_file(job_params.get_main_file_path(), local_job_id)
         job_config: Dict[str, Any] = {
             "reference": {"job_id": local_job_id},
             "placement": {"cluster_name": self.cluster_name},
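
For context, a minimal standalone sketch of the staging logic this commit introduces: only paths that exist as files on the local disk are uploaded to the staging bucket; anything else (for example, a gs:// URI that is already remote) is returned unchanged, since os.path.isfile() is False for it. The function name, bucket, and paths below are illustrative stand-ins rather than Feast API, and the upload call is stubbed out.

import os

def stage_file_sketch(file_path: str, staging_bucket: str, remote_path: str, job_id: str) -> str:
    # Mirror of the new _stage_file behavior: skip staging for non-local paths.
    if not os.path.isfile(file_path):
        # Not a file on the local disk -- assume it is already remotely
        # accessible (e.g. a gs:// URI) and hand it back untouched.
        return file_path
    blob_path = os.path.join(remote_path, job_id, os.path.basename(file_path))
    # The real code calls staging_client.upload_file(...) here; stubbed for the sketch.
    print(f"would upload {file_path} to gs://{staging_bucket}/{blob_path}")
    return f"gs://{staging_bucket}/{blob_path}"

# A remote URI passes through unchanged:
assert stage_file_sketch("gs://bucket/main.py", "stage-bucket", "jobs", "42") == "gs://bucket/main.py"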
