
Commit

Use existing staging client for dataproc staging and improve staging client for s3 (#1063)

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
khorshuheng authored Oct 16, 2020
1 parent 7831768 commit 9decb2d
Showing 2 changed files with 42 additions and 9 deletions.
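For orientation, a minimal usage sketch of the staging-client abstraction this commit consolidates on; the call pattern is taken from the diffs below, while the bucket and file names are made-up placeholders:

    from feast.staging.storage_client import get_staging_client

    # The factory is keyed by URI scheme, as dataproc.py does with "gs" below.
    staging_client = get_staging_client("gs")

    # upload_file(local_path, bucket, remote_path), per the signature in storage_client.py;
    # "feast-staging" and the paths are placeholder values.
    staging_client.upload_file("/tmp/job.py", "feast-staging", "artifacts/job-id/job.py")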
9 changes: 4 additions & 5 deletions sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -4,7 +4,7 @@
 from urllib.parse import urlparse

 from google.api_core.operation import Operation
-from google.cloud import dataproc_v1, storage
+from google.cloud import dataproc_v1
 from google.cloud.dataproc_v1 import Job as DataprocJob
 from google.cloud.dataproc_v1 import JobStatus

@@ -18,6 +18,7 @@
     SparkJobParameters,
     SparkJobStatus,
 )
+from feast.staging.storage_client import get_staging_client


 class DataprocJobMixin:
@@ -113,13 +114,11 @@ def __init__(
         )

     def _stage_files(self, pyspark_script: str, job_id: str) -> str:
-        client = storage.Client()
-        bucket = client.get_bucket(self.staging_bucket)
+        staging_client = get_staging_client("gs")
         blob_path = os.path.join(
             self.remote_path, job_id, os.path.basename(pyspark_script),
         )
-        blob = bucket.blob(blob_path)
-        blob.upload_from_filename(pyspark_script)
+        staging_client.upload_file(pyspark_script, self.staging_bucket, blob_path)

         return f"gs://{self.staging_bucket}/{blob_path}"
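The two deleted google.cloud.storage calls and the single staging-client call are intended to be equivalent; a rough sketch of what the "gs" client's upload_file is assumed to do internally (an illustration reconstructed from the deleted lines, not the actual GCSClient in feast.staging.storage_client):

    from google.cloud import storage

    def gcs_upload_file(local_path: str, bucket: str, remote_path: str) -> None:
        # Same steps the deleted inline code performed in _stage_files:
        # resolve the bucket, point a blob at the target path, upload the local file.
        client = storage.Client()
        gcs_bucket = client.get_bucket(bucket)
        gcs_bucket.blob(remote_path).upload_from_filename(local_path)

With the upload delegated, _stage_files itself is left with only path construction and returning the gs:// URI.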
42 changes: 38 additions & 4 deletions sdk/python/feast/staging/storage_client.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+import hashlib
 import os
 import re
 import shutil
@@ -192,6 +191,13 @@ def list_files(self, bucket: str, path: str) -> List[str]:
         else:
             return [f"{S3}://{bucket}/{path.lstrip('/')}"]

+    def _hash_file(self, local_path: str):
+        h = hashlib.sha256()
+        with open(local_path, "rb") as f:
+            for block in iter(lambda: f.read(2 ** 20), b""):
+                h.update(block)
+        return h.hexdigest()
+
     def upload_file(self, local_path: str, bucket: str, remote_path: str):
         """
         Uploads file to s3.
@@ -201,8 +207,36 @@ def upload_file(self, local_path: str, bucket: str, remote_path: str):
             bucket (str): s3 Bucket name
             remote_path (str): relative path to the folder to which the files need to be uploaded
         """
-        with open(local_path, "rb") as file:
-            self.s3_client.upload_fileobj(file, bucket, remote_path)
+
+        sha256sum = self._hash_file(local_path)
+
+        import botocore
+
+        try:
+            head_response = self.s3_client.head_object(Bucket=bucket, Key=remote_path)
+            if head_response["Metadata"]["sha256sum"] == sha256sum:
+                # File already exists
+                return remote_path
+            else:
+                print(f"Uploading {local_path} to {remote_path}")
+                self.s3_client.upload_file(
+                    local_path,
+                    bucket,
+                    remote_path,
+                    ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
+                )
+                return remote_path
+        except botocore.exceptions.ClientError as e:
+            if e.response["Error"]["Code"] != "404":
+                raise
+
+            self.s3_client.upload_file(
+                local_path,
+                bucket,
+                remote_path,
+                ExtraArgs={"Metadata": {"sha256sum": sha256sum}},
+            )
+            return remote_path


 class LocalFSClient(AbstractStagingClient):
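A short usage sketch of the improved S3 client: the first upload attaches the file's sha256 as object metadata, and a repeat upload of unchanged content is skipped once head_object reports a matching checksum. The bucket and paths are placeholders, get_staging_client("s3") is assumed to resolve to the S3Client shown above, and the object at the key is assumed to have been written by this same method (otherwise the metadata lookup would fail):

    import hashlib

    from feast.staging.storage_client import get_staging_client

    staging_client = get_staging_client("s3")  # scheme-keyed factory, as with "gs" above

    # No object at the key yet: head_object raises a 404 ClientError, so the file is
    # uploaded with its sha256 stored in the object's metadata.
    staging_client.upload_file("/tmp/features.parquet", "my-bucket", "staging/features.parquet")

    # Unchanged content on the second call: the stored sha256sum matches the recomputed
    # hash and the upload is skipped.
    staging_client.upload_file("/tmp/features.parquet", "my-bucket", "staging/features.parquet")

    # _hash_file streams the file in 1 MiB blocks; the digest equals hashing it in one pass.
    with open("/tmp/features.parquet", "rb") as f:
        assert hashlib.sha256(f.read()).hexdigest() == staging_client._hash_file("/tmp/features.parquet")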
