-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoortwrapper.py
165 lines (144 loc) · 6.32 KB
/
oortwrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import os
import io
import boto3
from dotenv import load_dotenv
import botocore
import pandas as pd
from pandasql import sqldf
class OortWrapper:
    """
    Convenience wrapper around a boto3 ``s3`` client configured with a custom
    endpoint URL (intended for OORT storage).

    Configuration is read from the environment after ``load_dotenv()`` has
    loaded any local ``.env`` file:

    - ``OORT_ACCESS_KEY``   -> ``aws_access_key_id``
    - ``OORT_SECRET_KEY``   -> ``aws_secret_access_key``
    - ``OORT_ENDPOINT_URL`` -> ``endpoint_url``
    """

    def __init__(self):
        """
        Initialize the OortWrapper with access keys and endpoint URL.

        Unset environment variables are passed to ``boto3.client`` as ``None``.
        """
        load_dotenv()
        access_key = os.getenv('OORT_ACCESS_KEY')
        secret_key = os.getenv('OORT_SECRET_KEY')
        endpoint_url = os.getenv('OORT_ENDPOINT_URL')
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            endpoint_url=endpoint_url,
        )

    @staticmethod
    def _error_code(error):
        """Return the service error code carried by a botocore ClientError, or None."""
        return error.response.get('Error', {}).get('Code')

    def create_bucket(self, bucket_name):
        """
        Create a new bucket with the given name.

        :param bucket_name: The name of the bucket to create.
        :return: The response from the create_bucket API call, or ``None`` if
            the bucket already exists (a message is printed and creation is skipped).
        :raises botocore.exceptions.ClientError: For any other API error.
        """
        try:
            return self.s3_client.create_bucket(Bucket=bucket_name)
        except botocore.exceptions.ClientError as e:
            # Both codes are treated as "nothing to do". NOTE(review): on AWS S3,
            # BucketAlreadyExists means another account owns the name; confirm
            # that skipping (rather than failing) is intended for that case.
            if self._error_code(e) in ('BucketAlreadyExists', 'BucketAlreadyOwnedByYou'):
                print(f"Bucket '{bucket_name}' already exists. Skipping bucket creation.")
                return None
            raise

    def delete_bucket(self, bucket_name):
        """
        Delete the specified bucket.

        :param bucket_name: The name of the bucket to delete.
        :return: The response from the delete_bucket API call.
        :raises botocore.exceptions.ClientError: If the API call fails (an error
            message is printed first).
        """
        try:
            return self.s3_client.delete_bucket(Bucket=bucket_name)
        except botocore.exceptions.ClientError as e:
            print(f"Error deleting bucket '{bucket_name}': {e}")
            raise

    def list_buckets(self):
        """
        List all buckets.

        :return: A list of bucket names.
        """
        response = self.s3_client.list_buckets()
        return [bucket['Name'] for bucket in response['Buckets']]

    def list_objects(self, bucket_name):
        """
        List all objects in the specified bucket.

        A single ListObjectsV2 call returns at most one page of keys, so the
        paginator is used to walk every page and return the complete listing.

        :param bucket_name: The name of the bucket to list objects from.
        :return: A list of object keys (empty if the bucket has no objects).
        """
        paginator = self.s3_client.get_paginator('list_objects_v2')
        keys = []
        for page in paginator.paginate(Bucket=bucket_name):
            # Pages for an empty bucket (or an empty final page) have no 'Contents'.
            keys.extend(item['Key'] for item in page.get('Contents', []))
        return keys

    def put_object(self, bucket_name, key, data):
        """
        Put an object into the specified bucket.

        :param bucket_name: The name of the bucket to store the object in.
        :param key: The key (name) of the object.
        :param data: The data (content) of the object.
        :return: The response from the put_object API call.
        :raises botocore.exceptions.ClientError: If the API call fails (an error
            message is printed first).
        """
        try:
            return self.s3_client.put_object(Bucket=bucket_name, Key=key, Body=data)
        except botocore.exceptions.ClientError as e:
            print(f"Error putting object '{key}' in bucket '{bucket_name}': {e}")
            raise

    def get_object(self, bucket_name, key):
        """
        Get an object from the specified bucket.

        :param bucket_name: The name of the bucket to retrieve the object from.
        :param key: The key (name) of the object.
        :return: The data (content) of the object, read fully into memory.
        :raises botocore.exceptions.ClientError: If the API call fails (an error
            message is printed first).
        """
        try:
            response = self.s3_client.get_object(Bucket=bucket_name, Key=key)
            return response['Body'].read()
        except botocore.exceptions.ClientError as e:
            print(f"Error getting object '{key}' from bucket '{bucket_name}': {e}")
            raise

    def delete_object(self, bucket_name, key):
        """
        Delete an object from the specified bucket.

        :param bucket_name: The name of the bucket to delete the object from.
        :param key: The key (name) of the object.
        :return: The response from the delete_object API call.
        :raises botocore.exceptions.ClientError: If the API call fails (an error
            message is printed first).
        """
        try:
            return self.s3_client.delete_object(Bucket=bucket_name, Key=key)
        except botocore.exceptions.ClientError as e:
            print(f"Error deleting object '{key}' from bucket '{bucket_name}': {e}")
            raise

    def _load_csv(self, bucket_name, csv_key):
        """
        Download a UTF-8 CSV object and parse it into a DataFrame.

        Shared by ``query_csv`` and ``query_csv_pandasql``; prints a diagnostic
        for missing bucket / missing key / other API errors and re-raises.
        """
        try:
            response = self.s3_client.get_object(Bucket=bucket_name, Key=csv_key)
        except botocore.exceptions.ClientError as e:
            code = self._error_code(e)
            if code == 'NoSuchBucket':
                print(f"Bucket '{bucket_name}' does not exist.")
            elif code == 'NoSuchKey':
                print(f"CSV file with key '{csv_key}' does not exist in bucket '{bucket_name}'.")
            else:
                print(f"Error querying CSV file '{csv_key}' from bucket '{bucket_name}': {e}")
            raise
        csv_content = response['Body'].read().decode('utf-8')
        return pd.read_csv(io.StringIO(csv_content))

    def query_csv(self, bucket_name, csv_key):
        """
        Load a CSV file from the specified bucket into a Pandas DataFrame.

        :param bucket_name: The name of the bucket containing the CSV file.
        :param csv_key: The key (name) of the CSV file.
        :return: A Pandas DataFrame with the full contents of the CSV file.
        :raises botocore.exceptions.ClientError: If the object cannot be fetched.
        """
        return self._load_csv(bucket_name, csv_key)

    def query_csv_pandasql(self, bucket_name, csv_key, query):
        """
        Query a CSV file in the specified bucket using SQL and return the result as a Pandas DataFrame.

        The CSV contents are exposed to the query as a table named ``df``,
        e.g. ``SELECT * FROM df WHERE x > 1``.

        :param bucket_name: The name of the bucket containing the CSV file.
        :param csv_key: The key (name) of the CSV file.
        :param query: The SQL query to execute on the CSV file.
        :return: A Pandas DataFrame containing the result of the query.
        :raises botocore.exceptions.ClientError: If the object cannot be fetched.
        """
        df = self._load_csv(bucket_name, csv_key)
        # Pass an explicit environment instead of locals(): only the CSV table
        # should be visible to the query, not self / the raw response / etc.
        return sqldf(query, {'df': df})