-
Notifications
You must be signed in to change notification settings - Fork 169
/
gcloud_utils.py
68 lines (50 loc) · 2.23 KB
/
gcloud_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"""
Google Cloud Util Functions:
- Upload to Google Storage Bucket
- Delete from Google Storage Bucket
- Generate signed URL
"""
import os
import datetime
from google.cloud import storage
from google.cloud.storage.blob import Blob
## Google Storage Client
client = storage.Client()
def upload_blob(bucket_name, source_file_name, destination_blob_name, content_type='', algo_unique_key=''):
"""Uploading File to Google Storage Bucket
Args:
bucket_name (str): Google Storage Bucket Name
source_file_name (str): Local Absolute Filpath to upload
destination_blob_name (str): File Name used to store in bucket
content_type (str, optional): The content type of the file being uploaded
algo_unique_key (str, optional): [description]. Defaults to ''. Algorithmia Data Source Bucket Unique Key
Returns:
[str]: Google Storage Object URI, if algorithmia key is given the same is modified.
"""
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name, content_type=content_type)
data_uri = blob.self_link
if algo_unique_key!="":
return os.path.join("gs+{}://{}".format(algo_unique_key, bucket_name), data_uri.split("/")[-1])
return os.path.join("gs://{}".format(bucket_name), data_uri.split("/")[-1])
def delete_blob(bucket_name, blob_name):
"""Deletes a blob from the bucket."""
# bucket_name = "your-bucket-name"
# blob_name = "your-object-name"
bucket = client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.delete()
print("Blob {} deleted.".format(blob_name))
def download_video(bucket_name, filename, output_filename):
bucket = client.get_bucket(bucket_name)
# Create a blob object from the filepath
blob = bucket.blob(filename)
# Download the file to a destination
blob.download_to_filename(output_filename)
return output_filename
def generate_signed_url(output_uri):
expiration_time = datetime.timedelta(minutes=5)
blob = Blob.from_string(output_uri, client=client)
signed_url = blob.generate_signed_url(expiration=expiration_time, version='v4', response_disposition='attachment')
return signed_url